
Fluent Bit + SQL

Fluent Bit stream processor uses common SQL to perform record queries. The following sections describe the available features, with examples.

Statements

You can find the detailed query language syntax in BNF form here. The following sections are a brief introduction to writing SQL queries for Fluent Bit stream processing.

SELECT Statement

Synopsis

SELECT results_statement
  FROM STREAM:stream_name | TAG:match_rule
  [WINDOW TUMBLING (integer SECOND)]
  [WHERE condition]
  [GROUP BY groupby]

Description

Select keys from records coming from a stream, or from records matching a specific Tag pattern. Note that a simple SELECT statement not associated with a stream creation will send the results to the standard output interface (stdout), which is useful for debugging purposes.

The query allows filtering the results by applying a condition with the WHERE statement. The WINDOW and GROUP BY statements are explained later in the aggregation functions section.
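
For reference, all three optional clauses can appear together. A sketch, assuming a stream named apache with keys host, method, and size:

SELECT host, AVG(size)
  FROM STREAM:apache
  WINDOW TUMBLING (10 SECOND)
  WHERE method = 'POST'
  GROUP BY host;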

Examples

Select all keys from records coming from a stream called apache:

SELECT * FROM STREAM:apache;

Select the code key (aliased as http_status) from records whose Tag starts with apache.:

SELECT code AS http_status FROM TAG:'apache.*';

Since the TAG selector allows the use of wildcards, we put the value between single quotes.

CREATE STREAM Statement

Synopsis

CREATE STREAM stream_name
  [WITH (property_name=value, [...])]
  AS select_statement

Description

Create a new stream of data using the results of the SELECT statement. The newly created stream can optionally be re-ingested back into the Fluent Bit pipeline if the tag property is set in the WITH statement.

Examples

Create a new stream called hello from stream called apache:

CREATE STREAM hello AS SELECT * FROM STREAM:apache;

Create a new stream called hello for all records which original Tag starts with apache:

CREATE STREAM hello AS SELECT * FROM TAG:'apache.*';
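
Create a new stream called results and re-ingest its records into the pipeline under the Tag sp-results by setting the tag property (the same pattern appears in the Hands On section below):

CREATE STREAM results WITH (tag='sp-results') AS SELECT * FROM STREAM:apache;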

Aggregation Functions

Aggregation functions are used in the results_statement on keys, and perform data calculations on groups of records. The group of records an aggregation function applies to is determined by the WINDOW keyword. When WINDOW is not specified, aggregation functions apply to the current buffer of received records, which may contain a non-deterministic number of elements. Aggregation functions can also be applied to records in a window of a specific time interval (see the syntax of WINDOW in the SELECT statement).

Fluent Bit streaming currently supports the tumbling window, which is a non-overlapping window type. That means a window of size 5 seconds performs aggregation computations on records over a 5-second interval, and then starts new calculations for the next interval.

In addition, the syntax supports the GROUP BY statement, which groups the results by one or more keys when they have the same values.
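
A sketch grouping by two keys, assuming the apache stream carries host and method keys:

SELECT host, method, COUNT(*)
  FROM STREAM:apache
  WINDOW TUMBLING (5 SECOND)
  GROUP BY host, method;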

AVG

Synopsis

SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST';

Description

Calculates the average of request sizes in POST requests.

COUNT

Synopsis

SELECT host, COUNT(*) FROM STREAM:apache WINDOW TUMBLING (5 SECOND) GROUP BY host;

Description

Counts the number of records in 5-second windows, grouped by host IP address.

MIN

Synopsis

SELECT MIN(key) FROM STREAM:apache;

Description

Gets the minimum value of a key in a set of records.

MAX

Synopsis

SELECT MAX(key) FROM STREAM:apache;

Description

Gets the maximum value of a key in a set of records.

SUM

Synopsis

SELECT SUM(key) FROM STREAM:apache;

Description

Calculates the sum of all values of key in a set of records.

Time Functions

Time functions add a new key to the record with timing data.

NOW

Synopsis

SELECT NOW() FROM STREAM:apache;

Description

Adds the system time using the format %Y-%m-%d %H:%M:%S. Output example: 2019-03-09 21:36:05.

UNIX_TIMESTAMP

Synopsis

SELECT UNIX_TIMESTAMP() FROM STREAM:apache;

Description

Adds the current Unix timestamp to the record. Output example: 1552196165.

Record Functions

Record functions append new keys to the record using values from the record context.

RECORD_TAG

Synopsis

SELECT RECORD_TAG() FROM STREAM:apache;

Description

Appends the Tag string associated with the record as a new key.
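
For instance, the Tag can be emitted alongside a regular key; a sketch (assuming record functions can be mixed with plain keys, as aggregation functions are in the examples above):

SELECT RECORD_TAG(), code FROM TAG:'apache.*';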

RECORD_TIME

Synopsis

SELECT RECORD_TIME() FROM STREAM:apache;

Description

Appends a new key with the record timestamp in double format: seconds.nanoseconds. Output example: 1552196165.705683.

WHERE Condition

As in conventional SQL statements, the WHERE condition is supported in the Fluent Bit query language. The language supports conditions over keys and subkeys, for instance:

SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST' AND status = 200;

It is possible to check the existence of a key in the record using the record-specific function @record.contains:

SELECT MAX(key) FROM STREAM:apache WHERE @record.contains(key);

And to check whether the value of a key is or is not NULL:

SELECT MAX(key) FROM STREAM:apache WHERE key IS NULL;
SELECT * FROM STREAM:apache WHERE user IS NOT NULL;

Getting Started

The following guide assumes that you are familiar with Fluent Bit. If that is not the case, we suggest you review the official manual first:

  • Fluent Bit Manual

Requirements

  • Fluent Bit >= v1.1.0 or Fluent Bit from Git master

  • Basic understanding of Structured Query Language (SQL)

Technical Concepts

  • Stream: A Stream represents a unique flow of data being ingested by an Input plugin. By default, Streams are named using the plugin name plus an internal numerical identifier, e.g. tail.0. The Stream name can be changed by setting the alias property.

  • Task: The Stream Processor configuration has the notion of Tasks, which represent an execution unit; in short, SQL queries are configured in a Task.

  • Results: When the Stream Processor runs a SQL query, results are generated. These results can be re-ingested back into the main Fluent Bit pipeline or simply redirected to the standard output interface for debugging purposes.

  • Tag: Fluent Bit groups records and associates a Tag to them. Tags are used to define routing rules or, in the case of the stream processor, to attach to records whose Tag matches a pattern.

  • Match: A matching rule that can use a wildcard to match specific records associated with a Tag.
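
For example, the Stream name used in queries can be pinned by setting an alias on the input plugin. A minimal configuration sketch (the log path is hypothetical):

[INPUT]
    Name  tail
    Path  /var/log/apache2/access.log
    Alias apache

With this alias in place, queries can refer to STREAM:apache instead of STREAM:tail.0.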

Check Keys and NULL values

Feature available on Fluent Bit >= 1.2

When working with structured messages (records), there are certain cases where we want to know if a key exists, if its value is null, or if it has a value other than null.

Fluent Bit internal records are a binary serialization of maps with keys and values. A value can be null, which is a valid data type. Our SQL language provides the following statements that can be applied in conditionals:

Check if a key value IS NULL

The following SQL statement can be used to retrieve all records from stream test where the key called phone has a null value:

SELECT * FROM STREAM:test WHERE phone IS NULL;

Check if a key value IS NOT NULL

Similar to the example above, there are cases where we want to retrieve all records where a certain key has a value other than null:

SELECT * FROM STREAM:test WHERE phone IS NOT NULL;

Check if a key exists

Another common use case is to check if a certain key exists in the record. We provide specific record functions that can be used in the conditional part of the SQL statement. The prototype of the function that checks if a key exists in the record is the following:

@record.contains(key)

The following example queries all records that contain a key called phone:

SELECT * FROM STREAM:test WHERE @record.contains(phone);
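
These checks can be combined with the AND keyword shown earlier. A sketch that returns only records where phone exists and is not null:

SELECT * FROM STREAM:test WHERE @record.contains(phone) AND phone IS NOT NULL;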

Hands On! 101

This article goes through very specific and simple steps to learn how the Stream Processor works. For simplicity, it uses the official Docker image, which contains the relevant components for testing.

Requirements

The following tutorial requires the following software components:

  • Fluent Bit >= v1.2.0

  • Docker Engine (not mandatory if you already have the Fluent Bit binary installed on your system)

In addition, download the following data sample file (130KB):

  • https://fluentbit.io/samples/sp-samples-1k.log

Stream Processing using the command line

For all of the next steps we will run Fluent Bit from the command line, and for simplicity we will use the official Docker image.

1. Check the Fluent Bit version:

$ docker run -ti fluent/fluent-bit:1.4 /fluent-bit/bin/fluent-bit --version
Fluent Bit v1.4.0

2. Parse the sample file

The sample file contains JSON records. In this command, we load the Parsers configuration file and instruct the tail input plugin to parse the content as json:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.4                                          \
     /fluent-bit/bin/fluent-bit -R /fluent-bit/etc/parsers.conf     \
                                -i tail -p path=/sp-samples-1k.log  \
                                        -p parser=json              \
                                -o stdout -f 1

The command above will simply print the parsed content to the standard output interface. The output will include the Tag associated with each record and an array with two fields: the record timestamp and the record map:

Fluent Bit v1.4.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2019/05/08 13:34:16] [ info] [storage] initializing...
[2019/05/08 13:34:16] [ info] [storage] in-memory
[2019/05/08 13:34:16] [ info] [storage] normal synchronization mode, checksum disabled
[2019/05/08 13:34:16] [ info] [engine] started (pid=1)
[2019/05/08 13:34:16] [ info] [sp] stream processor started
[0] tail.0: [1557322456.315513208, {"date"=>"22/abr/2019:12:43:51 -0600", "ip"=>"73.113.230.135", "word"=>"balsamine", "country"=>"Japan", "flag"=>false, "num"=>96}]
[1] tail.0: [1557322456.315525280, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"242.212.128.227", "word"=>"inappendiculate", "country"=>"Chile", "flag"=>false, "num"=>15}]
[2] tail.0: [1557322456.315532364, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"85.61.182.212", "word"=>"elicits", "country"=>"Argentina", "flag"=>true, "num"=>73}]
[3] tail.0: [1557322456.315538969, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"124.192.66.23", "word"=>"Dwan", "country"=>"Germany", "flag"=>false, "num"=>67}]
[4] tail.0: [1557322456.315545150, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"18.135.244.142", "word"=>"chesil", "country"=>"Argentina", "flag"=>true, "num"=>19}]
[5] tail.0: [1557322456.315550927, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"132.113.203.169", "word"=>"fendered", "country"=>"United States", "flag"=>true, "num"=>53}]

So far there is no stream processing; in step 3 we will start running some basic queries.

3. Selecting specific record keys

This command introduces a Stream Processor (SP) query through the -T option and changes the output plugin to null. This is done so the SP results appear on the standard output interface without confusion in the terminal.

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log           \
     fluent/fluent-bit:1.2                                               \
     /fluent-bit/bin/fluent-bit                                          \
         -R /fluent-bit/etc/parsers.conf                                 \
         -i tail                                                         \
             -p path=/sp-samples-1k.log                                  \
             -p parser=json                                              \
         -T "SELECT word, num FROM STREAM:tail.0 WHERE country='Chile';" \
         -o null -f 1

The query above retrieves all records where the key named country matches the value Chile, and for each match composes and outputs a record using only the keys word and num:

[0] [1557322913.263534, {"word"=>"Candide", "num"=>94}]
[0] [1557322913.263581, {"word"=>"delightfulness", "num"=>99}]
[0] [1557322913.263607, {"word"=>"effulges", "num"=>63}]
[0] [1557322913.263690, {"word"=>"febres", "num"=>21}]
[0] [1557322913.263706, {"word"=>"decasyllables", "num"=>76}]

4. Calculate Average Value

The following query is similar to the one in the previous step, but this time we will use the aggregation function called AVG() to get the average value of the records ingested:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log           \
     fluent/fluent-bit:1.2                                               \
     /fluent-bit/bin/fluent-bit                                          \
         -R /fluent-bit/etc/parsers.conf                                 \
         -i tail                                                         \
             -p path=/sp-samples-1k.log                                  \
             -p parser=json                                              \
         -T "SELECT AVG(num) FROM STREAM:tail.0 WHERE country='Chile';"  \
         -o null -f 1

output:

[0] [1557323573.940149, {"AVG(num)"=>61.230770}]
[0] [1557323573.941890, {"AVG(num)"=>47.842106}]
[0] [1557323573.943544, {"AVG(num)"=>40.647060}]
[0] [1557323573.945086, {"AVG(num)"=>56.812500}]
[0] [1557323573.945130, {"AVG(num)"=>99.000000}]

Why did we get multiple records? Answer: when Fluent Bit processes the data, records come in chunks and the Stream Processor runs the query over each chunk, so the input plugin ingested 5 chunks of records and the SP processed the query for each chunk independently. To process multiple chunks at once we have to group results over windows of time.

5. Grouping Results and Window

Grouping results aims to simplify data processing, and when used with a defined window of time we can achieve great things. The next query groups the results by country and calculates the average of the num value; the processing window is 1 second, which basically means: process all incoming chunks arriving within a 1-second window:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.2                                          \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
         -T "SELECT country, AVG(num) FROM STREAM:tail.0            \
             WINDOW TUMBLING (1 SECOND)                             \
             WHERE country='Chile'                                  \
             GROUP BY country;"                                     \
         -o null -f 1

output:

[0] [1557324239.003211, {"country"=>"Chile", "AVG(num)"=>53.164558}]

6. Ingest Stream Processor results as a new Stream of Data

Now we move to a more real-world use case. Sending results to the standard output interface is good for learning purposes, but now we will instruct the Stream Processor to ingest results as part of the Fluent Bit data pipeline and attach a Tag to them.

This can be done using the CREATE STREAM statement, which will also tag the results with the sp-results value. Note that the output plugin parameter is now stdout, matching all records tagged with sp-results:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.2                                          \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
         -T "CREATE STREAM results WITH (tag='sp-results')          \
             AS                                                     \
               SELECT country, AVG(num) FROM STREAM:tail.0          \
               WINDOW TUMBLING (1 SECOND)                           \
               WHERE country='Chile'                                \
               GROUP BY country;"                                   \
         -o stdout -m 'sp-results' -f 1

output:

[0] sp-results: [1557325032.000160100, {"country"=>"Chile", "AVG(num)"=>53.164558}]
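
The same pipeline can also be expressed with configuration files instead of command-line flags. A sketch, assuming a main configuration file plus a separate streams file (file names are illustrative):

# fluent-bit.conf
[SERVICE]
    Flush        1
    Parsers_File parsers.conf
    Streams_File stream_processor.conf

[INPUT]
    Name   tail
    Path   /sp-samples-1k.log
    Parser json

[OUTPUT]
    Name  stdout
    Match sp-results

# stream_processor.conf
[STREAM_TASK]
    Name  results
    Exec  CREATE STREAM results WITH (tag='sp-results') AS SELECT country, AVG(num) FROM STREAM:tail.0 WINDOW TUMBLING (1 SECOND) WHERE country='Chile' GROUP BY country;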

F.A.Q

Where does the STREAM name come from?

Fluent Bit has the notion of streams, and every input plugin instance gets a default name. You can override that behavior by setting an alias. Check the alias parameter and the new stream name in the following example:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.4                                          \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
             -p alias=samples                                       \
         -T "CREATE STREAM results WITH (tag='sp-results')          \
             AS                                                     \
               SELECT country, AVG(num) FROM STREAM:samples         \
               WINDOW TUMBLING (1 SECOND)                           \
               WHERE country='Chile'                                \
               GROUP BY country;"                                   \
         -o stdout -m 'sp-results' -f 1