Hands On 101

Follow this tutorial to learn more about stream processing.

Requirements

This tutorial requires the following components:

  • Fluent Bit

  • Docker Engine

  • The sp-samples-1k.log sample file, which contains records in JSON format

Steps

These steps use the official Fluent Bit Docker image.
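
The commands in the following steps mount the sample file from your current working directory, so make sure sp-samples-1k.log is present there. If you don't have the image locally yet, you can pull it first. A minimal sketch, assuming the v1.8.2 tag used throughout this tutorial:

# Fetch the image used by the commands below
$ docker pull fluent/fluent-bit:1.8.2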

1. Fluent Bit version

Run the following command to confirm that Fluent Bit is installed and up-to-date:

$ docker run -ti fluent/fluent-bit:1.8.2 /fluent-bit/bin/fluent-bit --version
Fluent Bit v1.8.2

2. Parse sample files

The sample file contains JSON records. Run the following command to load the parsers.conf file and instruct the Tail input plugin to parse the content as JSON:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.8.2                                          \
     /fluent-bit/bin/fluent-bit -R /fluent-bit/etc/parsers.conf     \
                                -i tail -p path=/sp-samples-1k.log  \
                                        -p parser=json              \
                                        -p read_from_head=true      \
                                -o stdout -f 1

This command prints the parsed content to the standard output interface. The parsed content includes a tag associated with each record and an array with two fields: a timestamp and a record map:

Fluent Bit v1.8.2
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2019/05/08 13:34:16] [ info] [storage] initializing...
[2019/05/08 13:34:16] [ info] [storage] in-memory
[2019/05/08 13:34:16] [ info] [storage] normal synchronization mode, checksum disabled
[2019/05/08 13:34:16] [ info] [engine] started (pid=1)
[2019/05/08 13:34:16] [ info] [sp] stream processor started
[0] tail.0: [1557322456.315513208, {"date"=>"22/abr/2019:12:43:51 -0600", "ip"=>"73.113.230.135", "word"=>"balsamine", "country"=>"Japan", "flag"=>false, "num"=>96}]
[1] tail.0: [1557322456.315525280, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"242.212.128.227", "word"=>"inappendiculate", "country"=>"Chile", "flag"=>false, "num"=>15}]
[2] tail.0: [1557322456.315532364, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"85.61.182.212", "word"=>"elicits", "country"=>"Argentina", "flag"=>true, "num"=>73}]
[3] tail.0: [1557322456.315538969, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"124.192.66.23", "word"=>"Dwan", "country"=>"Germany", "flag"=>false, "num"=>67}]
[4] tail.0: [1557322456.315545150, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"18.135.244.142", "word"=>"chesil", "country"=>"Argentina", "flag"=>true, "num"=>19}]
[5] tail.0: [1557322456.315550927, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"132.113.203.169", "word"=>"fendered", "country"=>"United States", "flag"=>true, "num"=>53}]
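
For reference, the json parser requested with -p parser=json comes from the parsers.conf file shipped in the image. A sketch of the stock definition, which may vary slightly between versions:

[PARSER]
    # Interpret each line of the file as a JSON map
    Name        json
    Format      json
    Time_Key    time
    Time_Format %d/%b/%Y:%H:%M:%S %z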

3. Select specific record keys

Run the following command to create a stream processor query using the -T flag, and change the output to the null plugin. Because the regular output is discarded, only the stream processor results appear on the standard output interface, which keeps the terminal uncluttered.

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log           \
     fluent/fluent-bit:1.8.2                                             \
     /fluent-bit/bin/fluent-bit                                          \
         -R /fluent-bit/etc/parsers.conf                                 \
         -i tail                                                         \
             -p path=/sp-samples-1k.log                                  \
             -p parser=json                                              \
             -p read_from_head=true                                      \
         -T "SELECT word, num FROM STREAM:tail.0 WHERE country='Chile';" \
         -o null -f 1

The previous query retrieves all records whose country key contains the value Chile. For each match, it composes and outputs a record that contains only the keys word and num:

[0] [1557322913.263534, {"word"=>"Candide", "num"=>94}]
[0] [1557322913.263581, {"word"=>"delightfulness", "num"=>99}]
[0] [1557322913.263607, {"word"=>"effulges", "num"=>63}]
[0] [1557322913.263690, {"word"=>"febres", "num"=>21}]
[0] [1557322913.263706, {"word"=>"decasyllables", "num"=>76}]
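
The -T flag is handy for experimentation, but the same query can also live in a dedicated streams file referenced from the service configuration. A minimal sketch in the classic configuration format, with illustrative file names:

# fluent-bit.conf (excerpt): reference the streams file from the service section
[SERVICE]
    Flush        1
    Parsers_File parsers.conf
    Streams_File streams.conf

# streams.conf: each STREAM_TASK entry holds one stream processor query
[STREAM_TASK]
    Name   select_keys
    Exec   SELECT word, num FROM STREAM:tail.0 WHERE country='Chile';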

4. Calculate average value

Run the following command to use the AVG aggregation function to get the average value of ingested records:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log           \
     fluent/fluent-bit:1.8.2                                             \
     /fluent-bit/bin/fluent-bit                                          \
         -R /fluent-bit/etc/parsers.conf                                 \
         -i tail                                                         \
             -p path=/sp-samples-1k.log                                  \
             -p parser=json                                              \
             -p read_from_head=true                                      \
         -T "SELECT AVG(num) FROM STREAM:tail.0 WHERE country='Chile';"  \
         -o null -f 1

The previous query yields the following output:

[0] [1557323573.940149, {"AVG(num)"=>61.230770}]
[0] [1557323573.941890, {"AVG(num)"=>47.842106}]
[0] [1557323573.943544, {"AVG(num)"=>40.647060}]
[0] [1557323573.945086, {"AVG(num)"=>56.812500}]
[0] [1557323573.945130, {"AVG(num)"=>99.000000}]

The resulting output contains multiple records because Fluent Bit processes data in chunks, and the stream processor processes each chunk independently. To process multiple chunks at the same time, you can group results using time windows.

5. Group results and windows

Grouping results within a time window simplifies data processing. Run the following command to group results by country and calculate the average of num with a one-second processing window:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.8.2                                        \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
             -p read_from_head=true                                 \
         -T "SELECT country, AVG(num) FROM STREAM:tail.0            \
             WINDOW TUMBLING (1 SECOND)                             \
             WHERE country='Chile'                                  \
             GROUP BY country;"                                     \
         -o null -f 1

The previous query yields the following output:

[0] [1557324239.003211, {"country"=>"Chile", "AVG(num)"=>53.164558}]

6. Ingest stream processor results as a new stream of data

Next, instruct the stream processor to ingest results as part of the Fluent Bit data pipeline and assign a tag to each record.

Run the following command, which uses a CREATE STREAM statement to tag results with the sp-results tag, then outputs records with that tag to standard output:

$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log      \
     fluent/fluent-bit:1.8.2                                        \
     /fluent-bit/bin/fluent-bit                                     \
         -R /fluent-bit/etc/parsers.conf                            \
         -i tail                                                    \
             -p path=/sp-samples-1k.log                             \
             -p parser=json                                         \
             -p read_from_head=true                                 \
         -T "CREATE STREAM results WITH (tag='sp-results')          \
             AS                                                     \
               SELECT country, AVG(num) FROM STREAM:tail.0          \
               WINDOW TUMBLING (1 SECOND)                           \
               WHERE country='Chile'                                \
               GROUP BY country;"                                   \
         -o stdout -m 'sp-results' -f 1

The previous query yields the following results:

[0] sp-results: [1557325032.000160100, {"country"=>"Chile", "AVG(num)"=>53.164558}]
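
Once the query behaves as expected, the whole pipeline can move from CLI flags into configuration files. A minimal sketch in the classic configuration format, with illustrative file names and paths:

# fluent-bit.conf
[SERVICE]
    Flush        1
    Parsers_File parsers.conf
    Streams_File streams.conf

[INPUT]
    Name           tail
    Path           /sp-samples-1k.log
    Parser         json
    Read_from_Head true

[OUTPUT]
    # Print only the records tagged by the stream processor
    Name  stdout
    Match sp-results

# streams.conf
[STREAM_TASK]
    Name   results
    Exec   CREATE STREAM results WITH (tag='sp-results') AS SELECT country, AVG(num) FROM STREAM:tail.0 WINDOW TUMBLING (1 SECOND) WHERE country='Chile' GROUP BY country;

Starting Fluent Bit with fluent-bit -c fluent-bit.conf should then reproduce the output of the previous command.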