Lua

The Lua filter allows you to modify the incoming records (even split one record into multiple records) using custom scripts.

Because a flexible filtering mechanism is often needed, Fluent Bit can be extended by writing custom filters in the Lua programming language. Using a Lua-based filter takes two steps:

  1. Configure the filter in the main configuration.

  2. Prepare a Lua script that the filter will use.

Configuration Parameters

The plugin supports the following configuration parameters:

| Key | Description |
| --- | --- |
| script | Path to the Lua script that will be used. This can be a relative path against the main configuration file. |
| call | Lua function name that will be triggered to do filtering. It's assumed that the function is declared inside the script parameter defined above. |
| type_int_key | If these keys are matched, the fields are converted to integers. To specify more than one key, delimit them by spaces. Starting from Fluent Bit v1.6, integer data types are preserved rather than converted to doubles as in previous versions. |
| type_array_key | If these keys are matched, the fields are handled as arrays. To specify more than one key, delimit them by spaces. This is useful when an array can be empty. |
| protected_mode | If enabled, the Lua script is executed in protected mode, which prevents Fluent Bit from crashing when an invalid Lua script is executed or the triggered Lua function throws an exception. Default: true. |
| time_as_table | By default, when the Lua script is invoked the record timestamp is passed as a floating-point number, which might lead to precision loss when it is converted back. If you need timestamp precision, enabling this option passes the timestamp as a Lua table with keys sec (seconds since epoch) and nsec (nanoseconds). |
| code | Inline Lua code to use instead of loading a script from a path. |
| enable_flb_null | If enabled, null is converted to flb_null in Lua. This is useful to prevent removing key/value pairs, since nil is a special value that removes a key from a map in Lua. Default: false. |
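For example, with time_as_table enabled the callback receives the timestamp as a table rather than a number. A minimal sketch (the copied key names are illustrative):

function cb_time(tag, ts, record)
    -- with time_as_table on, ts is a table instead of a double:
    -- ts["sec"] holds seconds since the epoch, ts["nsec"] the nanoseconds
    record["epoch_sec"]  = ts["sec"]
    record["epoch_nsec"] = ts["nsec"]
    return 2, ts, record  -- 2: keep the original timestamp, replace the record
end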

Getting Started

To test the filter, you can run the plugin from the command line or through the configuration file. The following examples use the dummy input plugin for data ingestion, invoke the Lua filter using the test.lua script, and call the cb_print() function, which only prints the same information to the standard output.
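The examples assume a test.lua script along these lines (a minimal sketch; the sample script bundled with the project may differ):

-- test.lua: print what the filter receives, then pass the record through
function cb_print(tag, timestamp, record)
    print(tag, timestamp)
    for key, value in pairs(record) do
        print(key, value)
    end
    return 0, timestamp, record  -- 0: keep the record unmodified
end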

Command Line

From the command line you can use the following options:

$ fluent-bit -i dummy -F lua -p script=test.lua -p call=cb_print -m '*' -o null

Configuration File

In your main configuration file append the following Input, Filter & Output sections:

In classic mode:

[INPUT]
    Name    dummy

[FILTER]
    Name    lua
    Match   *
    script  test.lua
    call    cb_print

[OUTPUT]
    Name    null
    Match   *

In YAML:

pipeline:
  inputs:
    - name: dummy
  filters:
    - name: lua
      match: '*'
      script: test.lua
      call:  cb_print
  outputs:
- name: 'null'
      match: '*'
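
Either variant can then be started by passing the configuration file with the -c option (file names are illustrative):

$ fluent-bit -c fluent-bit.conf   # classic mode
$ fluent-bit -c fluent-bit.yaml   # YAML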

Lua Script Filter API

The life cycle of the filter follows these steps:

  1. Upon tag matching, the filter may process or bypass the record.

  2. If the tag matches, the filter accepts the record and invokes the function defined in the call property, which is the name of a function defined in the Lua script.

  3. The Lua function is invoked and each record is passed to it in JSON format.

  4. Upon return, the return values are validated and the pipeline continues.

Callback Prototype

The Lua script can have one or multiple callbacks that can be used by this filter. The function prototype is as follows:

function cb_print(tag, timestamp, record)
    ...
    return code, timestamp, record
end

Function Arguments

| name | description |
| --- | --- |
| tag | Name of the tag associated with the incoming record. |
| timestamp | Unix timestamp with nanoseconds associated with the incoming record. The original format is a double (seconds.nanoseconds). |
| record | Lua table with the record content. |

Return Values

Each callback must return three values:

| name | data type | description |
| --- | --- | --- |
| code | integer | Represents the result and the action to take. If code equals -1, the record is dropped. If code equals 0, the record is kept unmodified. If code equals 1, both the original timestamp and record have been modified, so they are replaced by the returned timestamp (second return value) and record (third return value). If code equals 2, the original timestamp is kept but the record has been modified, so it is replaced by the returned record (third return value). Code 2 is supported from v1.4.3. |
| timestamp | double | If code equals 1, the original record timestamp is replaced with this new value. |
| record | table | If code equals 1 or 2, the original record is replaced with this new value. The value must be a valid Lua table. It can be an array of tables (an array of objects in JSON format), in which case the input record is effectively split into multiple records (see Record Split below). |
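A sketch illustrating the three most common return codes (the key names are hypothetical):

function cb_codes(tag, timestamp, record)
    if record["drop_me"] then
        return -1, timestamp, record    -- -1: drop the record
    elseif record["level"] == nil then
        return 0, timestamp, record     -- 0: keep the record unmodified
    else
        record["seen"] = true           -- mutate, then signal the change
        return 2, timestamp, record     -- 2: keep timestamp, replace record
    end
end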

Features

Inline configuration

The following example uses the code property to pass the Lua function inline, instead of loading it from a separate script file.

In classic mode:

[SERVICE]
	flush 1
	daemon off
	log_level debug

[INPUT]
	Name random
	Tag test
	Samples 10

[FILTER]
	Name Lua
	Match *
	call append_tag
	code function append_tag(tag, timestamp, record) new_record = record new_record["tag"] = tag return 1, timestamp, new_record end

[OUTPUT]
	Name stdout
	Match *

In YAML:

service:
    flush:           1
    daemon:          off
    log_level:       info

pipeline:
    inputs:
        - name:    random
          tag:     test
          samples: 10

    filters:
        - name:  lua
          match: "*"
          call:  append_tag
          code:  |
              function append_tag(tag, timestamp, record)
                 new_record = record
                 new_record["tag"] = tag
                 return 1, timestamp, new_record
              end

    outputs:
        - name:  stdout
          match: "*"

Number Type

Lua treats numbers as a double type, which means integer values such as user IDs and log levels will be converted to doubles. To avoid this type conversion, use the type_int_key property.
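For example, the following YAML fragment (field names are illustrative) keeps two integer fields from being converted:

  filters:
    - name: lua
      match: '*'
      script: test.lua
      call: cb_print
      # keep these fields as integers instead of converting them to doubles
      type_int_key: user_id level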

Protected Mode

Fluent Bit supports protected mode to prevent crashes if it executes an invalid Lua script. See Error Handling in Application Code in the Lua documentation for more information.
Code Examples

For functional examples of this interface, refer to the code samples provided in the source code of the project. The Fluent Bit smoke tests include examples to verify during CI.

Processing environment variables

Kubernetes pods generally have various environment variables set by the infrastructure automatically which may contain useful information.

In this example, we want to extract part of the Kubernetes cluster API name.

The environment variable is set like so:

KUBERNETES_SERVICE_HOST: api.sandboxbsh-a.project.domain.com

We want to extract the sandboxbsh name and add it to our record as a special key.

In classic mode:

[FILTER]
    Name                lua
    Alias               filter-iots-lua
    Match               iots_thread.*
    Script              filters.lua
    Call                set_landscape_deployment

In YAML:

  filters:
    - name: lua
      alias: filter-iots-lua
      match: iots_thread.*
      script: filters.lua
      call:  set_landscape_deployment

filters.lua:

    -- Use a Lua function to create some additional entries based
    -- on substrings from the kubernetes properties.
    function set_landscape_deployment(tag, timestamp, record)
        local landscape = os.getenv("KUBERNETES_SERVICE_HOST")
        if landscape then
            -- Strip the landscape name from this field, KUBERNETES_SERVICE_HOST
            -- Should be of this format
            -- api.sandboxbsh-a.project.domain.com
            -- Take off the leading "api."
            -- sandboxbsh-a.project.domain.com
            --print("landscape1:" .. landscape)
            landscape = landscape:gsub("^[^.]+.", "")
            --print("landscape2:" .. landscape)
            -- Take off everything including and after the - in the cluster name
            -- sandboxbsh
            landscape = landscape:gsub("-.*$", "")
            -- print("landscape3:" .. landscape)
            record["iot_landscape"] = landscape
        end
        -- 2 - replace existing record with this update
        return 2, timestamp, record
    end

See also Fluent Bit: PR 811, an example that combines Lua processing with the Kubernetes filter and demonstrates using environment variables with Lua regex and substitutions.

Record Split

The Lua callback function can return an array of tables (that is, an array of records) as its third return value. With this feature, the Lua filter can split one input record into multiple records according to custom logic.

For example:

Lua script

function cb_split(tag, timestamp, record)
    if record["x"] ~= nil then
        return 2, timestamp, record["x"]
    else
        return 2, timestamp, record
    end
end

Configuration

In classic mode:

[INPUT]
    Name    stdin

[FILTER]
    Name    lua
    Match   *
    script  test.lua
    call    cb_split

[OUTPUT]
    Name    stdout
    Match   *

In YAML:

pipeline:
  inputs:
    - name: stdin
  filters:
    - name: lua
      match: '*'
      script: test.lua
      call: cb_split
  outputs:
    - name: stdout
      match: '*'

Input

{"x": [ {"a1":"aa", "z1":"zz"}, {"b1":"bb", "x1":"xx"}, {"c1":"cc"} ]}
{"x": [ {"a2":"aa", "z2":"zz"}, {"b2":"bb", "x2":"xx"}, {"c2":"cc"} ]}
{"a3":"aa", "z3":"zz", "b3":"bb", "x3":"xx", "c3":"cc"}

Output

[0] stdin.0: [1538435928.310583591, {"a1"=>"aa", "z1"=>"zz"}]
[1] stdin.0: [1538435928.310583591, {"x1"=>"xx", "b1"=>"bb"}]
[2] stdin.0: [1538435928.310583591, {"c1"=>"cc"}]
[3] stdin.0: [1538435928.310588359, {"z2"=>"zz", "a2"=>"aa"}]
[4] stdin.0: [1538435928.310588359, {"b2"=>"bb", "x2"=>"xx"}]
[5] stdin.0: [1538435928.310588359, {"c2"=>"cc"}]
[6] stdin.0: [1538435928.310589790, {"z3"=>"zz", "x3"=>"xx", "c3"=>"cc", "a3"=>"aa", "b3"=>"bb"}]

Response code filtering

In this example, we want to filter Istio logs to exclude lines with response codes between 1 and 399. Istio is configured to write logs in JSON format.

Lua script

Script response_code_filter.lua

function cb_response_code_filter(tag, timestamp, record)
  response_code = record["response_code"]
  if (response_code == nil or response_code == '') then
    return 0,0,0
  elseif (response_code ~= 0 and response_code < 400) then
    return -1,0,0
  else
    return 0,0,0
  end
end

When code is 0 or -1, the second and third return values are ignored, so the script returns placeholder zeros for them.

Configuration

Configuration to get Istio logs and apply response code filter to them.

In classic mode:

[INPUT]
    Name                tail
    Path                /var/log/containers/*_istio-proxy-*.log
    multiline.parser    docker, cri
    Tag                 istio.*
    Mem_Buf_Limit       64MB
    Skip_Long_Lines     Off

[FILTER]
    Name                lua
    Match               istio.*
    Script              response_code_filter.lua
    call                cb_response_code_filter

[OUTPUT]
    Name                stdout
    Match               *

In YAML:

pipeline:
  inputs:
    - name: tail
      path: /var/log/containers/*_istio-proxy-*.log
      multiline.parser: 'docker, cri'
      tag: istio.*
      mem_buf_limit: 64MB
      skip_long_lines: off
  filters:
    - name: lua
      match: istio.*
      script: response_code_filter.lua
      call: cb_response_code_filter
  outputs:
    - name: stdout
      match: '*'

Input

{
    "log": {
        "response_code": 200,
        "bytes_sent": 111328341,
        "authority": "randomservice.randomservice",
        "duration": 14493,
        "request_id": "2e9d38f8-36a9-40a6-bdb2-47c8eb7d399d",
        "upstream_local_address": "10.11.82.178:42738",
        "downstream_local_address": "10.10.21.17:80",
        "upstream_cluster": "outbound|80||randomservice.svc.cluster.local",
        "x_forwarded_for": null,
        "route_name": "default",
        "upstream_host": "10.11.6.90:80",
        "user_agent": "RandomUserAgent",
        "response_code_details": "via_upstream",
        "downstream_remote_address": "10.11.82.178:51096",
        "bytes_received": 1148,
        "path": "/?parameter=random",
        "response_flags": "-",
        "start_time": "2022-07-28T11:16:51.663Z",
        "upstream_transport_failure_reason": null,
        "method": "POST",
        "connection_termination_details": null,
        "protocol": "HTTP/1.1",
        "requested_server_name": null,
        "upstream_service_time": "6161"
    },
    "stream": "stdout",
    "time": "2022-07-28T11:17:06.704109897Z"
}

Output

In the output, only messages with a response code of 0 or greater than 399 are shown.

Time format Conversion

The following example converts a field's datetime format to UTC ISO 8601 format.

Lua script

Script custom_datetime_format.lua

function convert_to_utc(tag, timestamp, record)
    local date_time = record["pub_date"]
    local new_record = record
    if date_time then
        if string.find(date_time, ",") then
            local pattern = "(%a+, %d+ %a+ %d+ %d+:%d+:%d+) ([+-]%d%d%d%d)"
            local date_part, zone_part = date_time:match(pattern)

            if date_part and zone_part then
                local command = string.format("date -u -d '%s %s' +%%Y-%%m-%%dT%%H:%%M:%%SZ", date_part, zone_part)
                local handle = io.popen(command)
                local result = handle:read("*a")
                handle:close()
                new_record["pub_date"] = result:match("%S+")
            end
        end
    end
    return 1, timestamp, new_record
end
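The script above shells out to the date command, so it requires a Unix-like host with that utility available. A pure-Lua alternative is sketched below; it assumes the same input format, and DST transitions on the host may shift the result by an hour in rare cases:

-- Map abbreviated month names from the input format to month numbers
local months = { Jan=1, Feb=2, Mar=3, Apr=4, May=5, Jun=6,
                 Jul=7, Aug=8, Sep=9, Oct=10, Nov=11, Dec=12 }

function convert_to_utc(tag, timestamp, record)
    local dt = record["pub_date"]
    if dt then
        local day, mon, year, hour, min, sec, sign, zh, zm = dt:match(
            "%a+, (%d+) (%a+) (%d+) (%d+):(%d+):(%d+) ([+-])(%d%d)(%d%d)")
        if day then
            -- os.time() interprets the fields as local time; compute the
            -- host's UTC offset so they can be treated as UTC instead
            local t = os.time({ year = tonumber(year), month = months[mon],
                                day = tonumber(day), hour = tonumber(hour),
                                min = tonumber(min), sec = tonumber(sec) })
            local utc_off = os.difftime(os.time(), os.time(os.date("!*t")))
            -- apply the offset carried by the record, e.g. +0600 -> 21600 s
            local zone = tonumber(zh) * 3600 + tonumber(zm) * 60
            if sign == "-" then zone = -zone end
            record["pub_date"] = os.date("!%Y-%m-%dT%H:%M:%SZ",
                                         t + utc_off - zone)
        end
    end
    return 2, timestamp, record
end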

Configuration

Use this configuration to obtain a JSON key with a datetime value, and then convert it to another format.

In classic mode:

[INPUT]
    Name    dummy
    Dummy   {"event": "Restock", "pub_date": "Tue, 30 Jul 2024 18:01:06 +0000"}
    Tag     event_category_a

[INPUT]
    Name    dummy
    Dummy   {"event": "Soldout", "pub_date": "Mon, 29 Jul 2024 10:15:00 +0600"}
    Tag     event_category_b


[FILTER]
    Name                lua
    Match               *
    Script              custom_datetime_format.lua
    call                convert_to_utc

[OUTPUT]
    Name                stdout
    Match               *

In YAML:

pipeline:
  inputs:
    - name: dummy
      dummy: '{"event": "Restock", "pub_date": "Tue, 30 Jul 2024 18:01:06 +0000"}'
      tag: event_category_a

    - name: dummy
      dummy: '{"event": "Soldout", "pub_date": "Mon, 29 Jul 2024 10:15:00 +0600"}'
      tag: event_category_b

  filters:
    - name: lua
      match: '*'
      code: |
        function convert_to_utc(tag, timestamp, record)
          local date_time = record["pub_date"]
          local new_record = record
          if date_time then
              if string.find(date_time, ",") then
                  local pattern = "(%a+, %d+ %a+ %d+ %d+:%d+:%d+) ([+-]%d%d%d%d)"
                  local date_part, zone_part = date_time:match(pattern)
                  if date_part and zone_part then
                      local command = string.format("date -u -d '%s %s' +%%Y-%%m-%%dT%%H:%%M:%%SZ", date_part, zone_part)
                      local handle = io.popen(command)
                      local result = handle:read("*a")
                      handle:close()
                      new_record["pub_date"] = result:match("%S+")
                  end
              end
          end
          return 1, timestamp, new_record
        end
      call: convert_to_utc

  outputs:
    - name: stdout
      match: '*'

Input

{"event": "Restock", "pub_date": "Tue, 30 Jul 2024 18:01:06 +0000"}

and

{"event": "Soldout", "pub_date": "Mon, 29 Jul 2024 10:15:00 +0600"}

Both records are generated by the dummy input plugin in this example.

Output

The output of this process shows the conversion of the datetimes from two timezones to ISO 8601 format in UTC.

...
[2024/08/01 00:56:25] [ info] [output:stdout:stdout.0] worker #0 started
[0] event_category_a: [[1722452186.727104902, {}], {"event"=>"Restock", "pub_date"=>"2024-07-30T18:01:06Z"}]
[0] event_category_b: [[1722452186.730255842, {}], {"event"=>"Soldout", "pub_date"=>"2024-07-29T04:15:00Z"}]
...

Using configuration variables

Fluent Bit supports the definition of configuration variables, which can be set in the following way:

env:
  myvar1: myvalue1

These variables can be accessed from the Lua code through the FLB_ENV Lua table. Since this is a Lua table, its entries can be accessed with the usual table syntax, for example FLB_ENV['A'].

Configuration

env:
  A: aaa
  B: bbb
  C: ccc

service:
    flush:           1
    log_level:       info

pipeline:
    inputs:
        - name:    random
          tag:     test
          samples: 10

    filters:
        - name:  lua
          match: "*"
          call:  append_tag
          code:  |
              function append_tag(tag, timestamp, record)
                 new_record = record
                 new_record["my_env"] = FLB_ENV
                 return 1, timestamp, new_record
              end

    outputs:
        - name:  stdout
          match: "*"

Output

test: [[1731990257.781970977, {}], {"my_env"=>{"A"=>"aaa", "C"=>"ccc", "HOSTNAME"=>"monox-2.lan", "B"=>"bbb"}, "rand_value"=>4805047635809401856}]
