Fluent Bit: Official Manual
Fluent Bit + SQL

Last updated 2 years ago


The Fluent Bit stream processor uses common SQL to perform record queries. The following sections describe the available features and provide examples.

Statements

You can find the detailed query language syntax in BNF form. The following sections provide a brief introduction to writing SQL queries for Fluent Bit stream processing.
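Queries can be tried out interactively before being placed in a configuration file. The fluent-bit command line accepts a stream processor task through the -T flag; the sketch below assumes the default stream naming, where the first cpu input instance is exposed as cpu.0:

```shell
# Run a stream processor task against the cpu input;
# results of the plain SELECT are printed to stdout.
fluent-bit -i cpu -T "SELECT * FROM STREAM:cpu.0;" -o null -f 1
```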

SELECT Statement

Synopsis

SELECT results_statement
  FROM STREAM:stream_name | TAG:match_rule
  [WINDOW TUMBLING (integer SECOND)]
  [WHERE condition]
  [GROUP BY groupby]

Description

Selects keys from records coming from a stream, or from records matching a specific Tag pattern. Note that a simple SELECT statement not associated with a stream creation sends its results to the standard output interface (stdout), which is useful for debugging.

The query allows filtering the results by applying a condition using the WHERE statement. The WINDOW and GROUP BY statements are explained below, in the aggregation functions section.

Examples

Select all keys from records coming from a stream called apache:

SELECT * FROM STREAM:apache;

Select the code key from records whose Tag starts with apache.:

SELECT code AS http_status FROM TAG:'apache.*';

Since the TAG selector allows the use of wildcards, we put the value between single quotes.
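In a deployment, SELECT statements like the ones above are usually kept in a stream processor file that the main configuration references through the Streams_File key. A minimal sketch, with illustrative file, path, and task names:

```sql
# fluent-bit.conf
[SERVICE]
    Flush        1
    Streams_File stream_processor.conf

[INPUT]
    Name  tail
    Path  /var/log/apache2/access.log
    Tag   apache

# stream_processor.conf
[STREAM_TASK]
    Name  apache_all
    Exec  SELECT * FROM TAG:'apache.*';
```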

CREATE STREAM Statement

Synopsis

CREATE STREAM stream_name
  [WITH (property_name=value, [...])]
  AS select_statement

Description

Creates a new stream of data using the results from the SELECT statement. The newly created stream can optionally be re-ingested back into the Fluent Bit pipeline if the Tag property is set in the WITH statement.

Examples

Create a new stream called hello from a stream called apache:

CREATE STREAM hello AS SELECT * FROM STREAM:apache;

Create a new stream called hello for all records whose original Tag starts with apache:

CREATE STREAM hello AS SELECT * FROM TAG:'apache.*';
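As noted in the description, setting the Tag property in the WITH statement re-ingests the new stream's records back into the pipeline under that Tag. A sketch, where the tag value hello.data is illustrative:

```sql
CREATE STREAM hello
  WITH (tag='hello.data')
  AS SELECT * FROM STREAM:apache;
```

Records produced by this stream can then be routed like any other records, for example with an output whose Match is hello.data.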

Aggregation Functions

Aggregation functions are applied to keys in the results_statement, performing calculations on groups of records. The group of records an aggregation function applies to is determined by the WINDOW keyword. When WINDOW is not specified, aggregation functions apply to the current buffer of received records, which may contain a non-deterministic number of elements. Aggregation functions can also be applied to records in a window of a specific time interval (see the WINDOW syntax in the SELECT statement).

Fluent Bit streaming currently supports a tumbling window, which is a non-overlapping window type: a window of size 5 seconds performs aggregation computations on records during that 5-second interval, then starts new calculations for the next interval.

In addition, the syntax supports the GROUP BY statement, which groups the results by one or more keys that have the same values.
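Several aggregation functions can be combined with WINDOW and GROUP BY in a single query. A sketch with illustrative key names, computing per-host minimum, maximum, and average request sizes over 10-second windows:

```sql
SELECT host, MIN(size), MAX(size), AVG(size)
  FROM STREAM:apache
  WINDOW TUMBLING (10 SECOND)
  GROUP BY host;
```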

AVG

Synopsis

SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST' ;

Description

Calculates the average of request sizes in POST requests.

COUNT

Synopsis

SELECT host, COUNT(*) FROM STREAM:apache WINDOW TUMBLING (5 SECOND) GROUP BY host;

Description

Counts the number of records in 5-second windows, grouped by host IP address.

MIN

Synopsis

SELECT MIN(key) FROM STREAM:apache;

Description

Gets the minimum value of a key in a set of records.

MAX

Synopsis

SELECT MAX(key) FROM STREAM:apache;

Description

Gets the maximum value of a key in a set of records.

SUM

Synopsis

SELECT SUM(key) FROM STREAM:apache;

Description

Calculates the sum of all values of key in a set of records.

Time Functions

Time functions add a new key to the record containing timing data.

NOW

Synopsis

SELECT NOW() FROM STREAM:apache;

Description

Adds the current system time using the format %Y-%m-%d %H:%M:%S. Output example: 2019-03-09 21:36:05.

UNIX_TIMESTAMP

Synopsis

SELECT UNIX_TIMESTAMP() FROM STREAM:apache;

Description

Adds the current Unix timestamp to the record. Output example: 1552196165.

Record Functions

Record functions append new keys to the record using values from the record context.

RECORD_TAG

Synopsis

SELECT RECORD_TAG() FROM STREAM:apache;

Description

Appends the Tag string associated with the record as a new key.

RECORD_TIME

Synopsis

SELECT RECORD_TIME() FROM STREAM:apache;

Description

Appends a new key with the record timestamp in double format: seconds.nanoseconds. Output example: 1552196165.705683.

WHERE Condition

As in conventional SQL statements, the WHERE condition is supported in the Fluent Bit query language. The language supports conditions over keys and subkeys, for instance:

SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST' AND status = 200;

It is possible to check the existence of a key in the record using the record-specific function @record.contains:

SELECT MAX(key) FROM STREAM:apache WHERE @record.contains(key);

And to check whether the value of a key is or is not NULL:

SELECT MAX(key) FROM STREAM:apache WHERE key IS NULL;
SELECT * FROM STREAM:apache WHERE user IS NOT NULL;
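Conditions over subkeys can be expressed with bracket notation on the parent key. A sketch with illustrative key names, assuming bracket-style sub-key access:

```sql
SELECT * FROM STREAM:apache WHERE request['method'] = 'POST';
```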
