Feature available on Fluent Bit >= 1.2
When working with structured messages (records), there are cases where we want to know whether a key exists, whether its value is null, or whether it has a value other than null.
Fluent Bit internal records are a binary serialization of maps with keys and values. A value can be null, which is a valid data type. Our SQL language provides the following statements that can be used in conditional statements:
The following SQL statement can be used to retrieve all records from stream test where the key called phone has a null value:
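A minimal sketch of such a query, assuming the stream processor's STREAM: selector is used to address the stream named test:

```sql
SELECT * FROM STREAM:test WHERE phone IS NULL;
```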
Similar to the example above, there are cases where we want to retrieve all records in which a certain key has a value other than null:
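As a sketch, reusing the stream test and the key phone from the previous case, the negated check could be written as:

```sql
SELECT * FROM STREAM:test WHERE phone IS NOT NULL;
```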
Another common use case is to check whether a certain key exists in the record. We provide specific record functions that can be used in the conditional part of the SQL statement. The prototype of the function that checks whether a key exists in the record is the following:
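Assuming the function keeps the name @record.contains used in the Fluent Bit stream processor reference, the prototype can be sketched as follows, where key is a placeholder for the key name to look up:

```sql
@record.contains(key)
```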
The following example queries all records that contain a key called phone:
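A sketch of that query, again assuming the stream is named test:

```sql
SELECT * FROM STREAM:test WHERE @record.contains(phone);
```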
| Concept | Description |
| --- | --- |
| Stream | A Stream represents a unique flow of data being ingested by an Input plugin. By default, Streams get a name made of the plugin name plus an internal numerical identifier, e.g. tail.0. The Stream name can be changed by setting the alias property. |
| Task | The Stream Processor configuration has the notion of Tasks, each representing an execution unit. In short: SQL queries are configured in a Task. |
| Results | When the Stream Processor runs a SQL query, results are generated. These results can be re-ingested into the main Fluent Bit pipeline or simply redirected to the standard output interface for debugging purposes. |
| Tag | Fluent Bit groups records and associates a Tag with them. Tags are used to define routing rules or, in the case of the stream processor, to attach to a specific Tag that matches a pattern. |
| Match | Matching rule that can use a wildcard to match specific records associated with a Tag. |
This article goes through very specific and simple steps to learn how the Stream Processor works. For simplicity, it uses a custom Docker image that contains the relevant components for testing.
This tutorial requires the following software components:
Fluent Bit >= v1.2.0
Docker Engine (not mandatory if you already have a Fluent Bit binary installed on your system)
In addition, download the following sample data file (130KB):
For all of the following steps, we will run Fluent Bit from the command line and, for simplicity, use the official Docker image.
The sample file contains JSON records. In this command, we append the Parsers configuration file and instruct the tail input plugin to parse the content as JSON:
The command above simply prints the parsed content to the standard output interface. The output shows the Tag associated with each record and an array with two fields: the record timestamp and the record map:
So far there is no stream processing; in step #3 we will start running some basic queries.
This command introduces a Stream Processor (SP) query through the -T option and changes the output plugin to null. This way, only the SP results appear on the standard output interface, which avoids confusion in the terminal.
The query above aims to retrieve all records in which the key named country has the value Chile and, for each match, to compose and output a record using only the key fields word and num:
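For reference, the query passed through -T in this step can be sketched as follows. It assumes the tail input keeps its default stream name tail.0 and that the records carry the keys word, num and country:

```sql
SELECT word, num FROM STREAM:tail.0 WHERE country='Chile';
```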
The following query is similar to the one in the previous step, but this time we will use the aggregation function called AVG() to get the average value of the records ingested:
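A sketch of such a query, assuming the averaged key is num and the same country filter as in the previous step:

```sql
SELECT AVG(num) FROM STREAM:tail.0 WHERE country='Chile';
```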
output:
Why did we get multiple records? Answer: When Fluent Bit processes the data, records come in chunks and the Stream Processor runs the query over each chunk. In this case the input plugin ingested 5 chunks of records and the SP processed the query for each chunk independently. To process multiple chunks at once, we have to group results within windows of time.
Grouping results aims to simplify data processing, and it becomes especially useful when combined with a defined window of time. The next query groups the results by country and calculates the average of the num value. The processing window is 1 second, which means: process all chunks that arrive within a 1-second window:
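One way to write this, as a sketch that assumes the default stream name tail.0 and omits any WHERE filter for brevity:

```sql
SELECT country, AVG(num)
  FROM STREAM:tail.0
  WINDOW TUMBLING (1 SECOND)
  GROUP BY country;
```

With the window in place, the chunks that arrive within the same second contribute to a single result per country instead of one result per chunk.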
output:
Now we look at a more real-world use case. Sending results to the standard output interface is good for learning purposes, but now we will instruct the Stream Processor to ingest the results as part of the Fluent Bit data pipeline and attach a Tag to them.
This can be done using the CREATE STREAM statement, which will also tag the results with the value sp-results. Note that the output plugin is now stdout, matching all records tagged with sp-results:
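A sketch of the statement follows. The stream name results is a hypothetical choice for illustration, and the SELECT part assumes the grouped query from the previous step:

```sql
CREATE STREAM results WITH (tag='sp-results')
  AS SELECT country, AVG(num)
       FROM STREAM:tail.0
       WINDOW TUMBLING (1 SECOND)
       GROUP BY country;
```

Because the tag property is set, the results re-enter the pipeline and can be routed like any other record, here to an output that matches sp-results.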
output:
Fluent Bit has the notion of streams, and every input plugin instance gets a default name. You can override that behavior by setting an alias. Check the alias parameter and the new stream name in the following example:
The Fluent Bit stream processor uses common SQL to perform record queries. The following sections describe the available features and give examples of each.
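As a sketch, assume the tail input was started with the alias property set to samples (a hypothetical alias name, for instance via -p alias=samples on the command line). The query then refers to the stream by that alias instead of tail.0:

```sql
SELECT country, AVG(num)
  FROM STREAM:samples
  WINDOW TUMBLING (1 SECOND)
  GROUP BY country;
```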
You can find the detailed query language syntax in BNF form here. The following section will be a brief introduction on how to write SQL queries for Fluent Bit stream processing.
Select keys from records coming from a stream or records matching a specific Tag pattern. Note that a simple SELECT statement not associated with a stream creation will send the results to the standard output interface (stdout), which is useful for debugging purposes.
The query allows filtering the results by applying a condition using the WHERE statement. The WINDOW and GROUP BY statements are explained later in the aggregation functions section.
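The general shape of a SELECT statement can be summarized with the following synopsis. This is a sketch based on the stream processor reference, not a runnable query: names such as results_statement, stream_name, match_rule, condition and groupby are placeholders, and square brackets mark optional clauses.

```sql
SELECT results_statement
  FROM STREAM:stream_name | TAG:match_rule
  [WINDOW TUMBLING (integer SECOND)]
  [WHERE condition]
  [GROUP BY groupby]
```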
Select all keys from records coming from a stream called apache:
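A minimal sketch using the STREAM: selector:

```sql
SELECT * FROM STREAM:apache;
```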
Select all keys from records which Tag starts with apache.:
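A minimal sketch using the TAG: selector with a wildcard pattern:

```sql
SELECT * FROM TAG:'apache.*';
```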
Since the TAG selector allows the use of wildcards, we put the value between single quotes.
Create a new stream of data using the results from the SELECT statement. The new stream can optionally be re-ingested into the Fluent Bit pipeline if the Tag property is set in the WITH statement.
Create a new stream called hello from stream called apache:
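As a sketch, with no WITH clause, so the results are not re-ingested under a Tag:

```sql
CREATE STREAM hello AS SELECT * FROM STREAM:apache;
```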
Create a new stream called hello for all records which original Tag starts with apache:
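The Tag-based variant can be sketched the same way, assuming the wildcard pattern apache.* is meant:

```sql
CREATE STREAM hello AS SELECT * FROM TAG:'apache.*';
```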
Aggregation functions are used in results_statement on the keys, allowing calculations over groups of records. The group of records that aggregation functions apply to is determined by the WINDOW keyword. When WINDOW is not specified, aggregation functions apply to the current buffer of records received, which may contain a non-deterministic number of elements. Aggregation functions can also be applied to records in a window of a specific time interval (see the syntax of WINDOW in the select statement).
Fluent Bit streaming currently supports tumbling windows, a non-overlapping window type. That is, a window of size 5 seconds performs aggregation computations on records over a 5-second interval and then starts new calculations for the next interval.
In addition, the syntax supports the GROUP BY statement, which groups the results by one or more keys when they have the same values.
Calculates the average of request sizes in POST requests.
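A sketch using AVG(), assuming the records of a stream named apache carry the hypothetical keys size and method:

```sql
SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST';
```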
Counts the number of records in 5-second windows, grouped by host IP address.
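A sketch using COUNT(), assuming the host IP address is stored under a key named host (a hypothetical key name):

```sql
SELECT host, COUNT(*)
  FROM STREAM:apache
  WINDOW TUMBLING (5 SECOND)
  GROUP BY host;
```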
Gets the minimum value of a key in a set of records.
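A sketch using MIN(), where key is a placeholder for the key to evaluate:

```sql
SELECT MIN(key) FROM STREAM:apache;
```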
Gets the maximum value of a key in a set of records.
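A sketch using MAX(), where key is again a placeholder:

```sql
SELECT MAX(key) FROM STREAM:apache;
```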
Calculates the sum of all values of key in a set of records.
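A sketch using SUM(), with key as a placeholder for a numeric key:

```sql
SELECT SUM(key) FROM STREAM:apache;
```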
Time functions add a new key with timing data to the record.
Add system time using format: %Y-%m-%d %H:%M:%S. Output example: 2019-03-09 21:36:05.
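As a sketch, assuming the function is exposed as NOW(), as in the stream processor reference:

```sql
SELECT NOW() FROM STREAM:apache;
```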
Add the current Unix timestamp to the record. Output example: 1552196165.
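As a sketch, assuming the function is exposed as UNIX_TIMESTAMP():

```sql
SELECT UNIX_TIMESTAMP() FROM STREAM:apache;
```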
Record functions append new keys to the record using values from the record context.
Append the Tag string associated with the record as a new key.
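As a sketch, assuming the function is exposed as RECORD_TAG():

```sql
SELECT RECORD_TAG() FROM STREAM:apache;
```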
Similar to conventional SQL statements, the WHERE condition is supported in the Fluent Bit query language. The language supports conditions over keys and subkeys, for instance:
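A sketch of a condition over two top-level keys, assuming hypothetical keys method, status and size in the records of the stream apache. Subkey access is not shown here:

```sql
SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST' AND status = 200;
```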
It is possible to check the existence of a key in the record using the record-specific function @record.contains:
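A sketch that only aggregates records in which the placeholder key named key is present:

```sql
SELECT MAX(key) FROM STREAM:apache WHERE @record.contains(key);
```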
And to check whether the value of a key is or is not NULL:
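Both checks can be sketched as follows, assuming a hypothetical key named user in the records of the stream apache:

```sql
SELECT * FROM STREAM:apache WHERE user IS NULL;
SELECT * FROM STREAM:apache WHERE user IS NOT NULL;
```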
Append a new key with the record Timestamp in double format: seconds.nanoseconds. Output example: 1552196165.705683.
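As a sketch, assuming this record function is exposed as RECORD_TIME():

```sql
SELECT RECORD_TIME() FROM STREAM:apache;
```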