Filters

AWS Metadata

The AWS filter enriches logs with AWS metadata. Currently the plugin adds the EC2 instance ID and availability zone to log records. To use this plugin, you must be running in EC2 and have the instance metadata service enabled.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description
Default

Note: If you run Fluent Bit in a container, you may have to use instance metadata v1. The plugin behaves the same regardless of which version is used.

Command Line

Configuration File

EC2 Tags

EC2 Tags are a useful feature that enables you to label and organize your EC2 instances with custom-defined key-value pairs. These tags are commonly used for resource management, cost allocation, and automation. Consequently, including them in the logs generated by Fluent Bit is often essential.

To achieve this, the AWS filter can be configured with tags_enabled true to tag logs with the relevant EC2 instance tags. This setup ensures that logs are appropriately tagged, making it easier to manage and analyze them based on specific criteria.

Requirements

To use the tags_enabled true functionality in Fluent Bit, the instance-metadata-tags option must be enabled on the EC2 instance where Fluent Bit is running. Without this option enabled, Fluent Bit will not be able to retrieve the tags associated with the EC2 instance. This does not mean that Fluent Bit will fail or stop working altogether. If the option is not enabled, Fluent Bit will continue to operate normally and capture other values, such as the EC2 instance ID or availability zone, based on its configuration.
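If the instance-metadata-tags option is not yet enabled, it can be turned on from the EC2 console or with the AWS CLI. The following is a minimal sketch; the instance ID and region are placeholders you must replace:

$ aws ec2 modify-instance-metadata-options \
    --instance-id i-0123456789abcdef0 \
    --instance-metadata-tags enabled \
    --region us-west-2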

Example

tags_include

Assume that our EC2 instance has many tags, some of which have lengthy values that are irrelevant to the logs we want to collect. Only two tags, department and project, seem to be valuable for our purpose. Here is a configuration which reflects this requirement:

If we run Fluent Bit, what will the logs look like? Here is an example of what the logs might contain:

tags_exclude

Suppose our EC2 instance has three tags: Name:fluent-bit-docs-example, project:fluentbit, and department:it. In this example, we want to exclude the department tag since we consider it redundant. This is because all of our projects belong to the it department, and we do not need to waste storage space on redundant labels.

Here is an example configuration that achieves this:

The resulting logs might look like this:

ami_id

The EC2 instance image id.

false

account_id

The account ID of the current EC2 instance.

false

hostname

The hostname of the current EC2 instance.

false

vpc_id

The VPC ID of the current EC2 instance.

false

tags_enabled

Specifies whether to attach EC2 instance tags. The EC2 instance must have the instance-metadata-tags option enabled (it is disabled by default).

false

tags_include

Defines a list of specific EC2 tag keys to inject into the logs. Tag keys must be separated by the "," character. Tags not present in this list will be ignored. Example: Name,tag1,tag2.

tags_exclude

Defines a list of specific EC2 tag keys not to inject into the logs. Tag keys must be separated by the "," character. Tags not present in this list will be injected into the logs. If both tags_include and tags_exclude are specified, the configuration is invalid and the plugin fails. Example: Name,tag1,tag2

imds_version

Specify which version of the instance metadata service to use. Valid values are 'v1' or 'v2'.

v2

az

The availability zone; for example, "us-east-1a".

true

ec2_instance_id

The EC2 instance ID.

true

ec2_instance_type

The EC2 instance type.

false

private_ip

The EC2 instance private IP.


false

$ bin/fluent-bit -c /PATH_TO_CONF_FILE/fluent-bit.conf

[2020/01/17 07:57:17] [ info] [engine] started (pid=32744)
[0] dummy: [1579247838.000171227, {"message"=>"dummy", "az"=>"us-west-2c", "ec2_instance_id"=>"i-0c862eca9038f5aae", "ec2_instance_type"=>"t2.medium", "private_ip"=>"172.31.6.59", "vpc_id"=>"vpc-7ea11c06", "ami_id"=>"ami-0841edc20334f9287", "account_id"=>"YOUR_ACCOUNT_ID", "hostname"=>"ip-172-31-6-59.us-west-2.compute.internal"}]
[0] dummy: [1601274509.970235760, {"message"=>"dummy", "az"=>"us-west-2c", "ec2_instance_id"=>"i-0c862eca9038f5aae", "ec2_instance_type"=>"t2.medium", "private_ip"=>"172.31.6.59", "vpc_id"=>"vpc-7ea11c06", "ami_id"=>"ami-0841edc20334f9287", "account_id"=>"YOUR_ACCOUNT_ID", "hostname"=>"ip-172-31-6-59.us-west-2.compute.internal"}]
[INPUT]
    Name dummy
    Tag dummy

[FILTER]
    Name aws
    Match *
    imds_version v1
    az true
    ec2_instance_id true
    ec2_instance_type true
    private_ip true
    ami_id true
    account_id true
    hostname true
    vpc_id true
    tags_enabled true

[OUTPUT]
    Name stdout
    Match *
[FILTER]
    Name aws
    Match *
    tags_enabled true
    tags_include department,project
{"log"=>"fluentbit is awesome", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "department"=>"it", "project"=>"fluentbit"}
[FILTER]
    Name aws
    Match *
    tags_enabled true
    tags_exclude department
{"log"=>"aws is awesome", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"fluent-bit-docs-example", "project"=>"fluentbit"}

Standard Output

The stdout filter plugin prints the data that flows through it to standard output, which can be very useful while debugging.

The plugin has no configuration parameters and is very simple to use.

Command Line

$ fluent-bit -i cpu -F stdout -m '*' -o null

This command gathers CPU usage metrics and prints them out in a human-readable way as they flow through the stdout filter.

Fluent Bit v1.x.x
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/06/04 14:53:59] [ info] [engine] started (pid=3236719)
[2021/06/04 14:53:59] [ info] [storage] version=1.1.1, initializing...
[2021/06/04 14:53:59] [ info] [storage] in-memory
[2021/06/04 14:53:59] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/06/04 14:53:59] [ info] [sp] stream processor started
[0] cpu.0: [1622789640.379532062, {"cpu_p"=>9.000000, "user_p"=>6.500000, "system_p"=>2.500000, "cpu0.p_cpu"=>8.000000, "cpu0.p_user"=>6.000000, "cpu0.p_system"=>2.000000, "cpu1.p_cpu"=>9.000000, "cpu1.p_user"=>6.000000, "cpu1.p_system"=>3.000000}]
[0] cpu.0: [1622789641.379529426, {"cpu_p"=>22.500000, "user_p"=>18.000000, "system_p"=>4.500000, "cpu0.p_cpu"=>34.000000, "cpu0.p_user"=>30.000000, "cpu0.p_system"=>4.000000, "cpu1.p_cpu"=>11.000000, "cpu1.p_user"=>6.000000, "cpu1.p_system"=>5.000000}]
[0] cpu.0: [1622789642.379544020, {"cpu_p"=>26.500000, "user_p"=>16.000000, "system_p"=>10.500000, "cpu0.p_cpu"=>30.000000, "cpu0.p_user"=>24.000000, "cpu0.p_system"=>6.000000, "cpu1.p_cpu"=>22.000000, "cpu1.p_user"=>8.000000, "cpu1.p_system"=>14.000000}]
[0] cpu.0: [1622789643.379507371, {"cpu_p"=>39.500000, "user_p"=>34.500000, "system_p"=>5.000000, "cpu0.p_cpu"=>52.000000, "cpu0.p_user"=>48.000000, "cpu0.p_system"=>4.000000, "cpu1.p_cpu"=>28.000000, "cpu1.p_user"=>21.000000, "cpu1.p_system"=>7.000000}]
^C[2021/06/04 14:54:04] [engine] caught signal (SIGINT)
[2021/06/04 14:54:04] [ info] [input] pausing cpu.0
[2021/06/04 14:54:04] [ warn] [engine] service will stop in 5 seconds
[2021/06/04 14:54:08] [ info] [engine] service stopped

GeoIP2 Filter

Look up Geo data from IP

The GeoIP2 filter allows you to enrich the incoming data stream using location data from the GeoIP2 database.

Configuration Parameters

This plugin supports the following configuration parameters:

Key
Description

Getting Started

The following configuration will process the incoming remote_addr field and append country information retrieved from the GeoLite2 database.

Each Record parameter above specifies the following triplet:

  1. The field name to be added to records (country)

  2. The lookup key to process (remote_addr)

  3. The query for GeoIP2 database (%{country.names.en})

By running Fluent Bit with the configuration above, you will see the following output:

Note that the GeoLite2-City.mmdb database is available from MaxMind's official site.

Expect

Made for testing: make sure that your records contain the expected key and values

The expect filter plugin allows you to validate that records match certain criteria in their structure, such as validating that a key exists or that it has a specific value.

This page describes only the available configuration properties. For a detailed explanation of its usage and use cases, please refer to the following page:

  • Validating your Data and Structure

Configuration Parameters

The plugin supports the following configuration parameters:

Property
Description

Getting Started

As mentioned at the top, refer to the Validating your Data and Structure page for specific details on the usage of this filter.
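In the meantime, here is a minimal sketch that combines properties documented for this filter (key_exists and action); the dummy record is illustrative, and depending on your Fluent Bit version the key may need to be referenced with the record accessor form (for example $color):

[INPUT]
    name  dummy
    dummy {"color": "blue"}

[FILTER]
    name       expect
    match      *
    key_exists color
    action     warn

[OUTPUT]
    name  stdout
    match *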

Sysinfo

The Sysinfo filter plugin allows you to append system information such as the Fluent Bit version or the hostname.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description
Supported platform

Wasm

Use Wasm programs as a filter

The Wasm filter allows you to modify incoming records using Wasm (WebAssembly) technology.

Because a flexible filtering mechanism is needed, it is now possible to extend Fluent Bit's capabilities by writing custom filters as built Wasm programs executed by its runtime. Using a Wasm-based filter takes two steps:

  1. (Optional) Compile the Wasm program as an AOT (Ahead Of Time) object to optimize the Wasm execution pipeline

  2. Configure the Filter in the main configuration

action

Action to take when a rule does not match. The available options are warn, exit and result_key. With warn, a warning message is sent to the logging layer when a mismatch of the rules above is found; exit makes Fluent Bit abort with status code 255; result_key adds a matching result to each record.

result_key

Specify the key name for the matching result. This key is used only when action is result_key.

key_exists

Check if a key with a given name exists in the record.

key_not_exists

Check if a key does not exist in the record.

key_val_is_null

Check that the value of the key is NULL.

key_val_is_not_null

Check that the value of the key is NOT NULL.

key_val_eq

Check that the value of the key equals the given value in the configuration.

Validating your Data and Structure

database

Path to the GeoIP2 database.

lookup_key

Field name to process

record

Defines the KEY LOOKUP_KEY VALUE triplet. See below for how to set up this option.


Prepare a Wasm program that will be used by the Filter

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description

Wasm_Path

Path to the built Wasm program that will be used. This can be a relative path against the main configuration file.

Function_Name

Wasm function name that will be triggered to do filtering. It's assumed that the function is built inside the Wasm program specified above.

Accessible_Paths

Specify the whitelist of paths that can be accessed from Wasm programs.

Configuration Examples

Here is a configuration example.

Wasm
[INPUT]
    Name   dummy
    Dummy  {"remote_addr": "8.8.8.8"}

[FILTER]
    Name geoip2
    Match *
    Database GeoLite2-City.mmdb
    Lookup_key remote_addr
    Record country remote_addr %{country.names.en}
    Record isocode remote_addr %{country.iso_code}

[OUTPUT]
    Name   stdout
    Match  *
{"remote_addr": "8.8.8.8", "country": "United States", "isocode": "US"}
[INPUT]
    Name   dummy
    Tag    dummy.local

[FILTER]
    Name wasm
    Match dummy.*
    WASM_Path /path/to/wasm_program.wasm
    Function_Name filter_function_name
    Accessible_Paths .,/path/to/accessible

[OUTPUT]
    Name   stdout
    Match  *

fluentbit_version_key

Specify the key name for the Fluent Bit version.

All

os_name_key

Specify the key name for the OS name, e.g. linux, win64 or macos.

All

hostname_key

Specify the key name for the hostname.

All

os_version_key

Specify the key name for the OS version. It is not supported on some platforms.

Linux

kernel_version_key

Specify the key name for the kernel version. It is not supported on some platforms.

Linux

Some properties are only supported on specific platforms.

Getting Started

In order to start filtering records, you can run the filter from the command line or through the configuration file.

The following configuration file appends the Fluent Bit version and OS name to each record.

[INPUT]
    Name dummy
    Tag test

[FILTER]
    Name sysinfo
    Match *
    Fluentbit_version_key flb_ver
    Os_name_key os_name

[OUTPUT]
    name stdout
    match *
pipeline:
    inputs:
        - name: dummy
          tag: test
    filters:
        - name: sysinfo
          match: '*'

You can also run the filter from the command line.

The output will be

CheckList

The CheckList plugin looks up whether a value in a specified list exists, and then allows the addition of a record to indicate if it was found. Introduced in version 1.8.4.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description

Example Configuration

In the following configuration we will read a file test1.log that includes the following values:

Additionally, we will use the following lookup file, which contains a list of malicious IPs (ip_list.txt):

In the configuration we are using $remote_addr as the lookup key, and 7.7.7.7 is a malicious IP. This means the output for the last record would look like the following:

Type Converter

The Type Converter filter plugin allows you to convert data types and append new key-value pairs.

This plugin is useful in combination with plugins which expect incoming string values, e.g. filter_grep and filter_modify.

Configuration Parameters

The plugin supports the following configuration parameters. Each entry consists of four parts:

<config_parameter> <src_key_name> <dst_key_name> <dst_data_type>

dst_data_type allows int, uint, float and string.

e.g. int_key id id_str string

Key
Description

Getting Started

In order to start filtering records, you can run the filter from the command line or through the configuration file.

This is a sample in_mem record to filter.

The in_mem plugin outputs uint values and filter_type_converter converts them into string type.

Convert uint to string

You can also run the filter from the command line.

The output will be

Nightfall

The Nightfall filter scans logs for sensitive data and redacts the sensitive portions. This filter supports scanning for various sensitive information, ranging from API keys and personally identifiable information (PII) to custom regexes you define. You can configure what to scan for in the Nightfall Dashboard.

Due to a typo, this filter is not enabled by default in 1.9.0. It must be enabled by setting the flag -DFLB_FILTER_NIGHTFALL=ON when building. This is fixed in 1.9.1 and above.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description
Default

Command Line

Configuration File

Record Modifier

The Record Modifier filter plugin allows you to append fields or to exclude specific fields.

Configuration Parameters

The plugin supports the following configuration parameters. Note that Remove_key and Allowlist_key are mutually exclusive.

Key
Description
fluent-bit -i dummy -o stdout -F sysinfo -m '*' -p fluentbit_version_key=flb_ver -p os_name_key=os_name
[0] dummy.0: [[1699172858.989654355, {}], {"message"=>"dummy", "flb_ver"=>"2.2.0", "os_name"=>"linux"}]
          Fluentbit_version_key: flb_ver
          Os_name_key: os_name
    outputs:
        - name: stdout
          match: '*'

file

The single-value file that Fluent Bit will use as a lookup table to determine if the specified lookup_key exists.

lookup_key

The specific key to look up and determine if it exists. Supports record accessor syntax.

record

The record to add if the lookup_key is found in the specified file. Note you may add multiple record parameters.

mode

Set the check mode. exact and partial are supported. Default: exact.

print_query_time

Print to stdout the elapsed query time for every matched record. Default: false.

ignore_case

Compare strings by ignoring case. Default: false

int_key

This parameter is for integer source.

uint_key

This parameter is for unsigned integer source.

float_key

This parameter is for float source.

str_key

This parameter is for string source.

Debug level between 0 (nothing) and 4 (every detail).

0

tls.verify

When enabled, turns on certificate validation when connecting to the Nightfall API.

true

tls.ca_path

Absolute path to root certificates, required if tls.verify is true.

nightfall_api_key

The Nightfall API key to scan your logs with, obtainable from the Nightfall Dashboard

policy_id

The Nightfall dev platform policy to scan your logs with, configurable in the Nightfall Dashboard.

sampling_rate

The rate controlling how much of your logs you wish to be scanned, must be a float between (0,1]. 1 means all logs will be scanned. Useful for avoiding rate limits in conjunction with Fluent Bit's match rule.

1

tls.debug

[INPUT]
    name           tail
    tag            test1
    path           test1.log
    read_from_head true
    parser         json

[FILTER]
    name       checklist
    match      test1
    file       ip_list.txt
    lookup_key $remote_addr
    record     ioc    abc
    record     badurl null
    log_level  debug

[OUTPUT]
    name       stdout
    match      test1
{"remote_addr": true, "ioc":"false", "url":"https://badurl.com/payload.htm","badurl":"no"}
{"remote_addr": "7.7.7.2", "ioc":"false", "url":"https://badurl.com/payload.htm","badurl":"no"}
{"remote_addr": "7.7.7.3", "ioc":"false", "url":"https://badurl.com/payload.htm","badurl":"no"}
{"remote_addr": "7.7.7.4", "ioc":"false", "url":"https://badurl.com/payload.htm","badurl":"no"}
{"remote_addr": "7.7.7.5", "ioc":"false", "url":"https://badurl.com/payload.htm","badurl":"no"}
{"remote_addr": "7.7.7.6", "ioc":"false", "url":"https://badurl.com/payload.htm","badurl":"no"}
{"remote_addr": "7.7.7.7", "ioc":"false", "url":"https://badurl.com/payload.htm","badurl":"no"}
1.2.3.4
6.6.4.232
7.7.7.7
{"remote_addr": "7.7.7.7", "ioc":"abc", "url":"https://badurl.com/payload.htm","badurl":"null"}
{"Mem.total"=>1016024, "Mem.used"=>716672, "Mem.free"=>299352, "Swap.total"=>2064380, "Swap.used"=>32656, "Swap.free"=>2031724}
[INPUT]
    Name mem

[FILTER]
    Name type_converter
    Match *
    uint_key Mem.total Mem.total_str string
    uint_key Mem.used  Mem.used_str  string
    uint_key Mem.free  Mem.free_str  string

[OUTPUT]
    Name stdout
    Match *
pipeline:
    inputs:
        - name: mem
    filters:
        - name: type_converter
          match: '*'
          uint_key:
            - Mem.total Mem.total_str string
            - Mem.used  Mem.used_str  string
            - Mem.free  Mem.free_str  string
    outputs:
        - name: stdout
          match: '*'
$ fluent-bit -i mem -o stdout -F type_converter -p 'uint_key=Mem.total Mem.total_str string' -p 'uint_key=Mem.used Mem.used_str string' -p 'uint_key=Mem.free Mem.free_str string' -m '*'
[0] mem.0: [1639915154.160159749, {"Mem.total"=>8146052, "Mem.used"=>4513564, "Mem.free"=>3632488, "Swap.total"=>1918356, "Swap.used"=>0, "Swap.free"=>1918356, "Mem.total_str"=>"8146052", "Mem.used_str"=>"4513564", "Mem.free_str"=>"3632488"}]
$ bin/fluent-bit -c /PATH_TO_CONF_FILE/fluent-bit.conf

[2022/02/09 19:46:22] [ info] [engine] started (pid=53844)
[2022/02/09 19:46:22] [ info] [storage] version=1.1.5, initializing...
[2022/02/09 19:46:22] [ info] [storage] in-memory
[2022/02/09 19:46:22] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2022/02/09 19:46:22] [ info] [cmetrics] version=0.2.2
[2022/02/09 19:46:22] [ info] [input:http:http.0] listening on 0.0.0.0:8000
[2022/02/09 19:46:22] [ info] [sp] stream processor started
[2022/02/09 19:46:30] [ info] [filter:nightfall:nightfall.0] Nightfall request http_do=0, HTTP Status: 200
[0] app.log: [1644464790.280412000, {"A"=>"there is nothing sensitive here", "B"=>[{"A"=>"my credit card number is *******************"}, {"A"=>"*********** is my social security."}], "C"=>false, "D"=>"key ********************"}]
[2022/02/09 19:47:25] [ info] [filter:nightfall:nightfall.0] Nightfall request http_do=0, HTTP Status: 200
[0] app.log: [1644464845.675431000, {"A"=>"a very safe string"}]
[INPUT]
    name http
    host 0.0.0.0
    port 8000

[FILTER]
    Name nightfall
    Match *
    nightfall_api_key <API key>
    policy_id 5991946b-1cc8-4c38-9240-72677029a3f7
    sampling_rate 1
    tls.ca_path /etc/ssl/certs

[OUTPUT]
    Name stdout

Record

Append fields. This parameter needs a key and value pair.

Remove_key

If the key is matched, that field is removed.

Allowlist_key

If the key is not matched, that field is removed.

Whitelist_key

An alias of Allowlist_key for backwards compatibility.

Uuid_key

If set, the plugin appends a UUID to each record. The value assigned becomes the key in the map.
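For example, a minimal sketch that appends a UUID under an arbitrary key name (log_uuid is only an illustrative choice):

[FILTER]
    Name     record_modifier
    Match    *
    Uuid_key log_uuid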

Getting Started

In order to start filtering records, you can run the filter from the command line or through the configuration file.

This is a sample in_mem record to filter.

Append fields

The following configuration file appends the product name and hostname (via an environment variable) to the record.

[INPUT]
    Name mem
    Tag  mem.local

[OUTPUT]
    Name  stdout
    Match *

[FILTER]
    Name record_modifier
    Match *
    Record hostname ${HOSTNAME}
    Record product Awesome_Tool
pipeline:
    inputs:
        - name: mem
          tag: mem.local
    filters:
        - name: record_modifier
          match: '*'

You can also run the filter from the command line.

The output will be

Remove fields with Remove_key

The following configuration file removes the 'Swap.*' fields.

You can also run the filter from the command line.

The output will be

Remove fields with Allowlist_key

The following configuration file retains only the 'Mem.*' fields.

You can also run the filter from the command line.

The output will be

Grep

Select or exclude records per patterns

The Grep Filter plugin allows you to match or exclude specific records based on regular expression patterns for values or nested values.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Value Format
Description

Throttle

The Throttle filter plugin sets the average Rate of messages per Interval, based on a leaky bucket and sliding window algorithm. In case of a flood, it will leak messages within a certain rate.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Value Format

Tensorflow

Tensorflow

The Tensorflow filter allows running Machine Learning inference tasks on the records of data coming from input plugins or the stream processor. This filter uses Tensorflow Lite as the inference engine, and requires the Tensorflow Lite shared library to be present during build and at runtime.

Tensorflow Lite is a lightweight open-source deep learning framework that is used for mobile and IoT applications. Tensorflow Lite only handles inference (not training); therefore, it loads pre-trained models (.tflite files) that are converted into the Tensorflow Lite format (FlatBuffer). You can read more about converting Tensorflow models in the Tensorflow documentation.

[INPUT]
    Name mem
    Tag  mem.local

[OUTPUT]
    Name  stdout
    Match *

[FILTER]
    Name record_modifier
    Match *
    Remove_key Swap.total
    Remove_key Swap.used
    Remove_key Swap.free
pipeline:
    inputs:
        - name: mem
          tag: mem.local
    filters:
        - name: record_modifier
          match: '*'
          remove_key: 
             - Swap.total
             - Swap.used
             - Swap.free
    outputs:
        - name: stdout
          match: '*'
[INPUT]
    Name mem
    Tag  mem.local

[OUTPUT]
    Name  stdout
    Match *

[FILTER]
    Name record_modifier
    Match *
    Allowlist_key Mem.total
    Allowlist_key Mem.used
    Allowlist_key Mem.free
pipeline:
    inputs:
        - name: mem
          tag: mem.local
    filters:
        - name: record_modifier
          match: '*'
          Allowlist_key: 
             - Mem.total
             - Mem.used
             - Mem.free
    outputs:
        - name: stdout
          match: '*'
{"Mem.total"=>1016024, "Mem.used"=>716672, "Mem.free"=>299352, "Swap.total"=>2064380, "Swap.used"=>32656, "Swap.free"=>2031724}
$ fluent-bit -i mem -o stdout -F record_modifier -p 'Record=hostname ${HOSTNAME}' -p 'Record=product Awesome_Tool' -m '*'
[0] mem.local: [1492436882.000000000, {"Mem.total"=>1016024, "Mem.used"=>716672, "Mem.free"=>299352, "Swap.total"=>2064380, "Swap.used"=>32656, "Swap.free"=>2031724, "hostname"=>"localhost.localdomain", "product"=>"Awesome_Tool"}]
$ fluent-bit -i mem -o stdout -F  record_modifier -p 'Remove_key=Swap.total' -p 'Remove_key=Swap.free' -p 'Remove_key=Swap.used' -m '*'
[0] mem.local: [1492436998.000000000, {"Mem.total"=>1016024, "Mem.used"=>716672, "Mem.free"=>295332}]
$ fluent-bit -i mem -o stdout -F  record_modifier -p 'Allowlist_key=Mem.total' -p 'Allowlist_key=Mem.free' -p 'Allowlist_key=Mem.used' -m '*'
[0] mem.local: [1492436998.000000000, {"Mem.total"=>1016024, "Mem.used"=>716672, "Mem.free"=>295332}]
          record:
             - hostname ${HOSTNAME}
             - product Awesome_Tool
    outputs:
        - name: stdout
          match: '*'

Regex

KEY REGEX

Keep records in which the content of KEY matches the regular expression.

Exclude

KEY REGEX

Exclude records in which the content of KEY matches the regular expression.

Logical_Op

Operation

Specify which logical operator to use. AND, OR and legacy are allowed as an Operation. The default is legacy for backwards compatibility. In legacy mode the behaviour is either AND or OR depending on whether the grep is including (uses AND) or excluding (uses OR). Only available from 2.1+.

Record Accessor Enabled

This plugin enables the Record Accessor feature to specify the KEY. Using the record accessor is suggested if you want to match values against nested values.

Getting Started

In order to start filtering records, you can run the filter from the command line or through the configuration file. The following example assumes that you have a file called lines.txt with the following content:

Command Line

Note: using the command line mode requires special attention to quote the regular expressions properly. It's suggested to use a configuration file.

The following command will load the tail plugin and read the content of the lines.txt file. Then the grep filter will apply a regular expression rule over the log field (created by the tail plugin) and only pass records whose field value starts with aa:

Configuration File

The filter allows you to use multiple rules, which are applied in order; you can have as many Regex and Exclude entries as required.

Nested fields example

If you want to match or exclude records based on nested values, you can use a Record Accessor format as the KEY name. Consider the following record example:

If you want to exclude records that match a given nested field (for example kubernetes.labels.app), you can use the following rule:

Excluding records missing/invalid fields

It may be that in your processing pipeline you want to drop records that are missing certain keys.

A simple way to do this is to use a Regex rule whose pattern matches anything; a record that is missing the key will fail the check and be excluded.
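For example, the following sketch keeps only records that contain an iot_timestamp key with any value; records without that key fail the Regex rule and are dropped (the key name is illustrative):

[FILTER]
    Name   grep
    Match  *
    Regex  iot_timestamp .*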

Here is an example that checks for a specific valid value for the key as well:

The specified key iot_timestamp must match the expected expression; if it does not, or is missing or empty, the record will be excluded.

Multiple conditions

If you want to set multiple Regex or Exclude rules, you can use the Logical_Op property to apply a logical conjunction or disjunction.

Note: If Logical_Op is set, setting both Regex and Exclude results in an error.

The output will be:

Description

Rate

Integer

Amount of messages allowed per Interval.

Window

Integer

Amount of intervals to calculate average over. Default 5.

Interval

String

Time interval, expressed in "sleep" format, e.g. 3s, 1.5m, 0.5h, etc.

Print_Status

Bool

Whether to print status messages with the current rate and limits to the information logs.

Functional description

Let's imagine we have configured:

and we received 1 message in the first second, 3 messages in the second, and 5 in the third. As you can see, even though Window is actually 5, we use a "slow" start to prevent flooding during startup.

But as soon as we reach Window size * Interval, we will have a true sliding window with aggregation over the complete window.

When the average over the window becomes greater than Rate, we will start dropping messages, so that

will become:

As you can see, the last pane of the window was overwritten and 1 message was dropped.

Interval vs Window size

You might have noticed the possibility of configuring the Interval of the Window shift. It is counterintuitive, but there is a difference between the following two examples:

and

Even though both examples allow a maximum Rate of 60 messages per minute, the first example may get all 60 messages within the first second and will drop all the rest for the entire minute:

While the second example will not allow more than 1 message per second every second, making the output rate smoother:

It may drop some data if the rate is ragged. We recommend using a bigger Interval and Rate for streams of rare but important events, while keeping Window big and Interval small for constantly intensive inputs.

Command Line

Note: It's suggested to use a configuration file.

The following command will load the tail plugin and read the content of the lines.txt file. Then the throttle filter will apply a rate limit and only pass records which are read below the configured rate:

Configuration File

The example above will pass 1000 messages per second on average over 300 seconds.

[SERVICE]
    parsers_file /path/to/parsers.conf

[INPUT]
    name   tail
    path   lines.txt
    parser json

[FILTER]
    name   grep
    match  *
    regex  log aa

[OUTPUT]
    name   stdout
    match  *
service:
    parsers_file: /path/to/parsers.conf
pipeline:
    inputs:
        - name: tail
          path: lines.txt
          parser: json
    filters:
        - name: grep
          match: '*'
          regex: log aa
    outputs:
        - name: stdout
          match: '*'
[FILTER]
    Name    grep
    Match   *
    Exclude $kubernetes['labels']['app'] myapp
    filters:
        - name: grep
          match: '*'
          exclude: $kubernetes['labels']['app'] myapp
# Use Grep to verify the contents of the iot_timestamp value.
# If the iot_timestamp key does not exist, this will fail
# and exclude the row.
[FILTER]
    Name                     grep
    Alias                    filter-iots-grep
    Match                    iots_thread.*
    Regex                    iot_timestamp ^\d{4}-\d{2}-\d{2}
    filters:
        - name: grep
          alias: filter-iots-grep
          match: iots_thread.*
          regex: iot_timestamp ^\d{4}-\d{2}-\d{2}
[INPUT]
    Name dummy
    Dummy {"endpoint":"localhost", "value":"something"}
    Tag dummy

[FILTER]
    Name grep
    Match *
    Logical_Op or
    Regex value something
    Regex value error

[OUTPUT]
    Name stdout
pipeline:
    inputs:
        - name: dummy
          dummy: '{"endpoint":"localhost", "value":"something"}'
          tag: dummy
    filters:
        - name: grep
          match: '*'
          logical_op: or
          regex:
            - value something
            - value error
    outputs:
        - name: stdout
{"log": "aaa"}
{"log": "aab"}
{"log": "bbb"}
{"log": "ccc"}
{"log": "ddd"}
{"log": "eee"}
{"log": "fff"}
{"log": "ggg"}
$ bin/fluent-bit -i tail -p 'path=lines.txt' -F grep -p 'regex=log aa' -m '*' -o stdout
{
    "log": "something",
    "kubernetes": {
        "pod_name": "myapp-0",
        "namespace_name": "default",
        "pod_id": "216cd7ae-1c7e-11e8-bb40-000c298df552",
        "labels": {
            "app": "myapp"
        },
        "host": "minikube",
        "container_name": "myapp",
        "docker_id": "370face382c7603fdd309d8c6aaaf434fd98b92421ce"
    }
}
Fluent Bit v2.0.9
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2023/01/22 09:46:49] [ info] [fluent bit] version=2.0.9, commit=16eae10786, pid=33268
[2023/01/22 09:46:49] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/01/22 09:46:49] [ info] [cmetrics] version=0.5.8
[2023/01/22 09:46:49] [ info] [ctraces ] version=0.2.7
[2023/01/22 09:46:49] [ info] [input:dummy:dummy.0] initializing
[2023/01/22 09:46:49] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2023/01/22 09:46:49] [ info] [filter:grep:grep.0] OR mode
[2023/01/22 09:46:49] [ info] [sp] stream processor started
[2023/01/22 09:46:49] [ info] [output:stdout:stdout.0] worker #0 started
[0] dummy: [1674348410.558341857, {"endpoint"=>"localhost", "value"=>"something"}]
[0] dummy: [1674348411.546425499, {"endpoint"=>"localhost", "value"=>"something"}]
Rate 5
Window 5
Interval 1s
+-------+-+-+-+ 
|1|3|5| | | | | 
+-------+-+-+-+ 
|  3  |         average = 3, and not 1.8 if you calculate 0 for last 2 panes. 
+-----+
+-------------+ 
|1|3|5|7|3|4| | 
+-------------+ 
  |  4.4    |   
  ----------+
+-------------+
|1|3|5|7|3|4|7|
+-------------+
    |   5.2   |
    +---------+
+-------------+
|1|3|5|7|3|4|6|
+-------------+
    |   5     |
    +---------+
Rate 60
Window 5
Interval 1m
Rate 1
Window 300
Interval 1s
XX        XX        XX
XX        XX        XX
XX        XX        XX
XX        XX        XX
XX        XX        XX
XX        XX        XX
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  X    X     X    X    X    X
XXXX XXXX  XXXX XXXX XXXX XXXX
+-+-+-+-+-+--+-+-+-+-+-+-+-+-+-+
$ bin/fluent-bit -i tail -p 'path=lines.txt' -F throttle -p 'rate=1' -m '*' -o stdout
[INPUT]
    Name   tail
    Path   lines.txt

[FILTER]
    Name     throttle
    Match    *
    Rate     1000
    Window   300
    Interval 1s

[OUTPUT]
    Name   stdout
    Match  *

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description
Default

input_field

Specify the name of the field in the record to apply inference on.

model_file

Path to the model file (.tflite) to be loaded by Tensorflow Lite.

include_input_fields

Include all input fields in the filter's output.

True

Creating Tensorflow Lite shared library

Clone the Tensorflow repository, install the bazel package manager, and run the following command to create the shared library:

The script creates the shared library bazel-bin/tensorflow/lite/c/libtensorflowlite_c.so. You need to copy the library to a location (such as /usr/lib) that can be used by Fluent Bit.
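For example, assuming a standard Linux library path (adjust as needed for your system):

$ sudo cp bazel-bin/tensorflow/lite/c/libtensorflowlite_c.so /usr/lib/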

Building Fluent Bit with Tensorflow filter plugin

The Tensorflow filter plugin is disabled by default. You need to build Fluent Bit with the Tensorflow plugin enabled. In addition, it requires access to the Tensorflow Lite header files to compile. Therefore, you also need to pass the path of the Tensorflow source code on your machine to the build script:

Command line

If the Tensorflow plugin initializes correctly, it reports successful creation of the interpreter and prints a summary of the model's input/output types and dimensions.

Configuration File

Limitations

  1. Currently supports single-input models

  2. Uses Tensorflow 2.3 header files


Parser

The Parser Filter plugin allows for parsing fields in event records.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description
Default
$ bazel build -c opt //tensorflow/lite/c:tensorflowlite_c  # see https://github.com/tensorflow/tensorflow/tree/master/tensorflow/lite/c
cmake -DFLB_FILTER_TENSORFLOW=On -DTensorflow_DIR=<AddressOfTensorflowSourceCode> ...
$ bin/fluent-bit -i mqtt -p 'tag=mqtt.data' -F tensorflow -m '*' -p 'input_field=image' -p 'model_file=/home/user/model.tflite' -p 'include_input_fields=false' -p 'normalization_value=255' -o stdout
[2020/08/04 20:00:00] [ info] Tensorflow Lite interpreter created!
[2020/08/04 20:00:00] [ info] [tensorflow] ===== input #1 =====
[2020/08/04 20:00:00] [ info] [tensorflow] type: FLOAT32  dimensions: {1, 224, 224, 3}
[2020/08/04 20:00:00] [ info] [tensorflow] ===== output #1 ====
[2020/08/04 20:00:00] [ info] [tensorflow] type: FLOAT32  dimensions: {1, 2}
[SERVICE]
    Flush        1
    Daemon       Off
    Log_Level    info

[INPUT]
    Name mqtt
    Tag mqtt.data

[FILTER]
    Name tensorflow
    Match mqtt.data
    input_field image
    model_file /home/m/model.tflite
    include_input_fields false
    normalization_value 255

[OUTPUT]
    Name stdout
    Match *

normalization_value

Divide input values by normalization_value.

Key_Name

Specify the field name in the record to parse.

Parser

Specify the parser name to interpret the field. Multiple Parser entries are allowed (one per line).

Preserve_Key

Keep original Key_Name field in the parsed result. If false, the field will be removed.

False

Reserve_Data

Keep all other original fields in the parsed result. If false, all other original fields will be removed.

False

Getting Started

Configuration File

This is an example of parsing a record {"data":"100 0.5 true This is example"}.

The plugin needs a parser file which defines how to parse each field.

The path of the parser file should be written in the configuration file under the [SERVICE] section.

The output is

You can see that the record {"data":"100 0.5 true This is example"} is parsed.

Preserve original fields

By default, the parser plugin only keeps the parsed fields in its output.

If you enable Reserve_Data, all other fields are preserved:

This will produce the output:

If you enable Reserve_Data and Preserve_Key, the original key field will be preserved as well:

This will produce the following output:

[PARSER]
    Name dummy_test
    Format regex
    Regex ^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$
[SERVICE]
    Parsers_File /path/to/parsers.conf

[INPUT]
    Name dummy
    Tag  dummy.data
    Dummy {"data":"100 0.5 true This is example"}

[FILTER]
    Name parser
    Match dummy.*
    Key_Name data
    Parser dummy_test

[OUTPUT]
    Name stdout
    Match *
$ fluent-bit -c dummy.conf
Fluent Bit v1.x.x
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2017/07/06 22:33:12] [ info] [engine] started
[0] dummy.data: [1499347993.001371317, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}]
[1] dummy.data: [1499347994.001303118, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}]
[2] dummy.data: [1499347995.001296133, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}]
[3] dummy.data: [1499347996.001320284, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}]
[PARSER]
    Name dummy_test
    Format regex
    Regex ^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$
[SERVICE]
    Parsers_File /path/to/parsers.conf

[INPUT]
    Name dummy
    Tag  dummy.data
    Dummy {"data":"100 0.5 true This is example", "key1":"value1", "key2":"value2"}

[FILTER]
    Name parser
    Match dummy.*
    Key_Name data
    Parser dummy_test
    Reserve_Data On
$ fluent-bit -c dummy.conf
Fluent-Bit v0.12.0
Copyright (C) Treasure Data

[2017/07/06 22:33:12] [ info] [engine] started
[0] dummy.data: [1499347993.001371317, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}, "key1":"value1", "key2":"value2"]
[1] dummy.data: [1499347994.001303118, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}, "key1":"value1", "key2":"value2"]
[2] dummy.data: [1499347995.001296133, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}, "key1":"value1", "key2":"value2"]
[3] dummy.data: [1499347996.001320284, {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example"}, "key1":"value1", "key2":"value2"]
[PARSER]
    Name dummy_test
    Format regex
    Regex ^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$
[SERVICE]
    Parsers_File /path/to/parsers.conf

[INPUT]
    Name dummy
    Tag  dummy.data
    Dummy {"data":"100 0.5 true This is example", "key1":"value1", "key2":"value2"}

[FILTER]
    Name parser
    Match dummy.*
    Key_Name data
    Parser dummy_test
    Reserve_Data On
    Preserve_Key On

[OUTPUT]
    Name stdout
    Match *
$ fluent-bit -c dummy.conf
Fluent Bit v2.1.1
* Copyright (C) 2015-2022 The Fluent Bit Authors
...
...
[0] dummy.data: [[1687122778.299116136, {}], {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example", "data"=>"100 0.5 true This is example", "key1"=>"value1", "key2"=>"value2"}]
[0] dummy.data: [[1687122779.296906553, {}], {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example", "data"=>"100 0.5 true This is example", "key1"=>"value1", "key2"=>"value2"}]
[0] dummy.data: [[1687122780.297475803, {}], {"INT"=>"100", "FLOAT"=>"0.5", "BOOL"=>"true", "STRING"=>"This is example", "data"=>"100 0.5 true This is example", "key1"=>"value1", "key2"=>"value2"}]

ECS Metadata

The ECS filter enriches logs with AWS Elastic Container Service metadata. The plugin can enrich logs with task, cluster and container metadata. The plugin uses the ECS Agent introspection API to obtain metadata. This filter only works with the ECS EC2 launch type: Fluent Bit must be running on an ECS EC2 Container Instance and have access to the ECS Agent introspection API. The filter is not supported on ECS Fargate. To obtain metadata on ECS Fargate, use the built-in FireLens metadata or the AWS for Fluent Bit init project.

Configuration Parameters

The plugin supports the following configuration parameters:

Rewrite Tag

Powerful and flexible routing

Tags are what make routing possible. Tags are set in the configuration of the Input definitions where the records are generated, but there are certain scenarios where it might be useful to modify the Tag in the pipeline so we can perform more advanced and flexible routing.

The rewrite_tag filter allows you to re-emit a record under a new Tag. Once a record has been re-emitted, the original record can be preserved or discarded.

How it Works

It works by defining rules that match specific record key content against a regular expression. If a match exists, a new record with the defined Tag will be emitted, entering from the beginning of the pipeline.

Multiple rules can be specified and they are processed in order until one of them matches.

The new Tag to define can be composed by:

  • Alphabet characters & Numbers

  • Original Tag string or part of it

  • Regular expression group captures

  • Any key or sub-key of the processed record

  • Environment variables

Configuration Parameters

The rewrite_tag filter supports the following configuration parameters:

Key
Description

Rule

Defines the matching criteria and the format of the Tag for the matching record. The Rule format has four components: KEY REGEX NEW_TAG KEEP. For more specific details of the Rule format and its composition, read the next section.

Emitter_Name

When the filter emits a record under the new Tag, there is an internal emitter plugin that takes care of the job. Since this emitter exposes metrics like any other component of the pipeline, you can use this property to configure an optional name for it.

Emitter_Storage.type

Define a buffering mechanism for the new records created. Note these records are part of the emitter plugin. This option supports the values memory (default) or filesystem. If the destination for the new records might face backpressure due to latency or a slow network, we strongly recommend enabling filesystem mode.

Emitter_Mem_Buf_Limit

Set a limit on the amount of memory the tag rewrite emitter can consume if the outputs provide backpressure. The default for this limit is 10M. The pipeline will pause once the buffer exceeds the value of this setting. For example, if the value is set to 10M then the pipeline will pause if the buffer exceeds 10M. The pipeline will remain paused until the output drains the buffer below the 10M limit.
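As an illustrative sketch combining the properties above with the Rule syntax described below, a filter that enables filesystem buffering for its emitter and raises the memory limit might look like this (values are examples, not recommendations):

[FILTER]
    Name                  rewrite_tag
    Match                 test_tag
    Rule                  $tool ^(fluent)$ from.$TAG.new false
    Emitter_Name          re_emitted
    Emitter_Storage.type  filesystem
    Emitter_Mem_Buf_Limit 20M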

Rules

A rule aims to define matching criteria and specify how to create a new Tag for a record. You can define one or multiple rules in the same configuration section. The rules have the following format:

Key

The key represents the name of the record key that holds the value that we want to use to match our regular expression. A key name is specified and prefixed with a $. Consider the following structured record (formatted for readability):

If we wanted to match against the value of the key name, we must use $name. The key selector is flexible enough to allow matching nested levels of sub-maps in the structure. If we wanted to check the value of the nested key s2, we can do so by specifying $ss['s1']['s2']. In short:

  • $name = "abc-123"

  • $ss['s1']['s2'] = "flb"

Note that a key must point to a value that contains a string; it's not valid for numbers, booleans, maps or arrays.

Regex

Using a simple regular expression we can specify a matching pattern to use against the value of the key specified above. We can also take advantage of group capturing to create custom placeholder values.

If we wanted to match any record whose $name contains a value of the format string-number, like the example provided above, we might use:

Note that in our example we are using parentheses; this means that we are specifying groups of data. If the pattern matches the value, placeholders will be created that can be consumed by the NEW_TAG section.

If $name equals abc-123, then the following placeholders will be created:

  • $0 = "abc-123"

  • $1 = "abc"

  • $2 = "123"

If the regular expression does not match an incoming record, the rule will be skipped and the next rule (if any) will be processed.

New Tag

If a regular expression has matched the value of the defined key in the rule, we are ready to compose a new Tag for that specific record. The tag is a concatenated string that can contain any of the following characters: a-z, A-Z, 0-9 and .-,.

A Tag can take any string value from the matching record, the original tag itself, an environment variable, or a general placeholder.

Consider the following incoming data for the rule:

  • Tag = aa.bb.cc

  • Record = {"name": "abc-123", "ss": {"s1": {"s2": "flb"}}}

  • Environment variable $HOSTNAME = fluent

With this information we could create a very customized Tag for our record, like the following:

The expected Tag to be generated will be:

We make use of placeholders, record content and environment variables.

Keep

If a rule matches, the filter will emit a copy of the record with the newly defined Tag. The KEEP property takes a boolean value to define whether the original record with the old Tag must be preserved and continue in the pipeline, or be discarded.

You can use true or false to decide the expected behavior. There is no default value and this is a mandatory field in the rule.

Configuration Example

The following configuration example will emit a dummy (hand-crafted) record; the filter will rewrite the tag, discard the old record, and print the new record to the standard output interface:

The original tag test_tag will be rewritten as from.test_tag.new.fluent.bit.out:

Monitoring

As described in the Monitoring section, every component of the Fluent Bit pipeline exposes metrics. The basic metrics exposed by this filter are drop_records and add_records; they summarize the total number of records dropped from the incoming data chunk and the number of new records added.

Since rewrite_tag emits new records that go through the beginning of the pipeline, it exposes an additional metric called emit_records that summarizes the total number of emitted records.

Understanding the Metrics

Using the configuration provided above, if we query the metrics exposed in the HTTP interface we will see the following:

Command:

Metrics output:

The dummy input generated two records, the filter dropped two from the chunks and emitted two new ones under a different Tag.

The records generated are handled by the internal Emitter, so the new records are summarized in the Emitter metrics; take a look at the entry called emitter_for_rewrite_tag.0.

What is the Emitter?

The Emitter is an internal Fluent Bit plugin that allows other components of the pipeline to emit custom records. In this case, rewrite_tag creates an Emitter instance and uses it exclusively to emit records; that way we can have granular control over who is emitting what.

The Emitter name in the metrics can be changed by setting the Emitter_Name configuration property described above.

[SERVICE]
    Flush     1
    Log_Level info

[INPUT]
    NAME   dummy
    Dummy  {"tool": "fluent", "sub": {"s1": {"s2": "bit"}}}
    Tag    test_tag

[FILTER]
    Name          rewrite_tag
    Match         test_tag
    Rule          $tool ^(fluent)$  from.$TAG.new.$tool.$sub['s1']['s2'].out false
    Emitter_Name  re_emitted

[OUTPUT]
    Name   stdout
    Match  from.*
service:
    flush: 1
    log_level: info
pipeline:
    inputs:
        - name: dummy
          tag:  test_tag
          dummy: '{"tool": "fluent", "sub": {"s1": {"s2": "bit"}}}'
    filters:
        - name: rewrite_tag
          match: test_tag
          rule: $tool ^(fluent)$  from.$TAG.new.$tool.$sub['s1']['s2'].out false
          emitter_name: re_emitted
    outputs:
        - name: stdout
          match: from.*
$KEY  REGEX  NEW_TAG  KEEP
{
  "name": "abc-123",
  "ss": {
    "s1": {
      "s2": "flb"
    }
  }
}
^([a-z]+)-([0-9]+)$
newtag.$TAG.$TAG[1].$1.$ss['s1']['s2'].out.${HOSTNAME}
newtag.aa.bb.cc.bb.abc.flb.out.fluent
$ bin/fluent-bit -c example.conf
Fluent Bit v1.x.x
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

...
[0] from.test_tag.new.fluent.bit.out: [1580436933.000050569, {"tool"=>"fluent", "sub"=>{"s1"=>{"s2"=>"bit"}}}]
$ curl  http://127.0.0.1:2020/api/v1/metrics/ | jq
{
  "input": {
    "dummy.0": {
      "records": 2,
      "bytes": 80
    },
    "emitter_for_rewrite_tag.0": {
      "records": 1,
      "bytes": 40
    }
  },
  "filter": {
    "rewrite_tag.0": {
      "drop_records": 2,
      "add_records": 0,
      "emit_records": 2
    }
  },
  "output": {
    "stdout.0": {
      "proc_records": 1,
      "proc_bytes": 40,
      "errors": 0,
      "retries": 0,
      "retries_failed": 0
    }
  }
}
Key
Description
Default

Add

This parameter is similar to the ADD option in the modify filter. You can specify it any number of times and it takes two arguments, a KEY name and VALUE. The value uses Fluent Bit record_accessor syntax to create a template that uses ECS metadata values. See the list below for supported metadata templating keys. This option is designed to give you full control over both the key names for metadata and the format of metadata values. See the examples below for more.

No default

ECS_Tag_Prefix

This parameter is similar to the Kube_Tag_Prefix option in the Kubernetes filter and performs the same function. The full log tag should be prefixed with this string, and after the prefix the filter must find the next characters in the tag to be the Docker Container Short ID (the first 12 characters of the full container ID). The filter uses this to identify which container the log came from so it can find which task it is a part of. See the design section below for more information. If not specified, it defaults to the empty string, meaning that the tag must be prefixed with the 12-character container short ID. If you just want to attach cluster metadata to system/OS logs from processes that do not run as part of containers or ECS Tasks, then do not set this parameter and enable the Cluster_Metadata_Only option.

empty string

Cluster_Metadata_Only

When enabled, the plugin will only attempt to attach cluster metadata values. This is useful if you want to attach cluster metadata to system/OS logs from processes that do not run as part of containers or ECS Tasks.

Off

ECS_Meta_Cache_TTL

The filter builds a hash table in memory mapping each unique container short ID to its metadata. This option sets a max TTL for objects in the hash table. You should set this if you have frequent container/task restarts. For example, if your cluster runs short-running batch jobs that complete in less than 10 minutes, there is no reason to keep any stored metadata longer than 10 minutes, so you would set this parameter to "10m".

1h (1 hour)
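As an illustrative sketch combining parameters documented on this page, a filter that caches metadata for at most 10 minutes might look like this:

[FILTER]
    Name               ecs
    Match              *
    ECS_Tag_Prefix     ecs.var.lib.docker.containers.
    ECS_Meta_Cache_TTL 10m
    ADD cluster $ClusterName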

Supported Templating Variables for the ADD option

The following template variables can be used for values with the Add option. See the tutorial below for examples.

Variable
Description
Supported with Cluster_Metadata_Only On

$ClusterName

The ECS cluster name. Fluent Bit is running on EC2 instance(s) that are part of this cluster.

Yes

$ContainerInstanceARN

The full ARN of the ECS EC2 Container Instance. This is the instance that Fluent Bit is running on.

Yes

$ContainerInstanceID

The ID of the ECS EC2 Container Instance.

Yes

Configuration File

Example 1: Attach Task ID and cluster name to container logs

The output log should be similar to:

Example 2: Attach customized resource name to container logs

The output log would be similar to:

Notice that the template variables in the value for the resource key are separated by dot characters; only dots and commas (. and ,) can come after a template variable. For more information, please check the Record accessor limitations section.

Example 3: Attach cluster metadata to non-container logs

This example shows a use case for the Cluster_Metadata_Only option: attaching cluster metadata to ECS Agent logs.

[INPUT]
    Name                tail
    Tag                 ecs.*
    Path                /var/lib/docker/containers/*/*.log
    Docker_Mode         On
    Docker_Mode_Flush   5
    Docker_Mode_Parser  container_firstline
    Parser              docker
    DB                  /var/fluent-bit/state/flb_container.db
    Mem_Buf_Limit       50MB
    Skip_Long_Lines     On
    Refresh_Interval    10
    Rotate_Wait         30
    storage.type        filesystem
    Read_From_Head      Off

[FILTER]
    Name ecs
    Match *
    ECS_Tag_Prefix ecs.var.lib.docker.containers.
    ADD ecs_task_id $TaskID
    ADD cluster $ClusterName

[OUTPUT]
    Name stdout
    Match *
    Format json_lines
{
    "date":1665003546.0,
    "log":"some message from your container",
    "ecs_task_id" "1234567890abcdefghijklmnop",
    "cluster": "your_cluster_name",
}
[INPUT]
    Name                tail
    Tag                 ecs.*
    Path                /var/lib/docker/containers/*/*.log
    Docker_Mode         On
    Docker_Mode_Flush   5
    Docker_Mode_Parser  container_firstline
    Parser              docker
    DB                  /var/fluent-bit/state/flb_container.db
    Mem_Buf_Limit       50MB
    Skip_Long_Lines     On
    Refresh_Interval    10
    Rotate_Wait         30
    storage.type        filesystem
    Read_From_Head      Off

[FILTER]
    Name ecs
    Match *
    ECS_Tag_Prefix ecs.var.lib.docker.containers.
    ADD resource $ClusterName.$TaskDefinitionFamily.$TaskID.$ECSContainerName

[OUTPUT]
    Name stdout
    Match *
    Format json_lines
{
    "date":1665003546.0,
    "log":"some message from your container",
    "resource" "cluster.family.1234567890abcdefghijklmnop.app",
}
[INPUT]
    Name                tail
    Tag                 ecsagent.*
    Path                /var/log/ecs/*
    DB                  /var/fluent-bit/state/flb_ecs.db
    Mem_Buf_Limit       50MB
    Skip_Long_Lines     On
    Refresh_Interval    10
    Rotate_Wait         30
    storage.type        filesystem
    # Collect all logs on instance
    Read_From_Head      On

[FILTER]
    Name ecs
    Match *
    Cluster_Metadata_Only On
    ADD cluster $ClusterName

[OUTPUT]
    Name stdout
    Match *
    Format json_lines

$ECSAgentVersion

The Version string of the ECS Agent that is running on the container instance.

Yes

$ECSContainerName

The name of the container from which the log originated. This is the name in your ECS Task Definition.

No

$DockerContainerName

The name of the container from which the log originated. This is the name obtained from Docker and is the name shown if you run docker ps on the instance.

No

$ContainerID

The ID of the container from which the log originated. This is the full 64 character long container ID.

No

$TaskDefinitionFamily

The family name of the task definition for the task from which the log originated.

No

$TaskDefinitionVersion

The version/revision of the task definition for the task from which the log originated.

No

$TaskID

The ID of the ECS Task from which the log originated.

No

$TaskARN

The full ARN of the ECS Task from which the log originated.

No

modify filter
record_accessor
Kubernetes filter

Multiline

Concatenate Multiline or Stack trace log messages. Available on Fluent Bit >= v1.8.2.

The Multiline Filter helps to concatenate messages that originally belong to one context but were split across multiple records or log lines. Common examples are stack traces or applications that print logs in multiple lines.

As part of the built-in functionality, and without major configuration effort, you can enable one of our built-in parsers, which provide auto-detection and multi-format support:

  • go

  • python

  • ruby

  • java (Google Cloud Platform Java stacktrace format)

Some comments about this filter:

  • The usage of this filter depends on a previous configuration of a Multiline Parser definition.

  • If you wish to concatenate messages read from a log file, it is highly recommended to use the multiline support in the Tail plugin itself, since performing the concatenation while reading the log file is more performant. Concatenating messages originally split by the Docker or CRI container engines is also supported in the Tail plugin.

This filter only performs buffering that persists across different chunks when buffer is enabled. Otherwise, the filter processes one chunk at a time, which is not suitable for most inputs, since they might send multiline messages in separate chunks.

When buffering is enabled, the filter does not immediately emit the messages it receives. It uses the in_emitter plugin, the same as the Rewrite Tag filter, and emits messages once they are fully concatenated or a timeout is reached.

Since concatenated records are re-emitted to the head of the Fluent Bit log pipeline, you cannot configure multiple multiline filter definitions that match the same tags. This would cause an infinite loop in the Fluent Bit pipeline; to use multiple parsers on the same logs, configure a single filter definition with a comma-separated list of parsers for multiline.parser. For more, see issue #5235.

For the same reason, the multiline filter should be the first filter. Logs are re-emitted by the multiline filter to the head of the pipeline; the filter will ignore its own re-emitted records, but other filters won't. If there are filters before the multiline filter, they will be applied twice.
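
For illustration, a minimal sketch of this ordering with a tail input and a grep filter placed after the multiline filter (the path, tag, parser, and grep expression here are placeholders, not taken from the examples below):

[INPUT]
    # placeholder input; the multiline filter works best with tail without buffering
    name                  tail
    path                  /var/log/app.log
    tag                   app.log

[FILTER]
    # the multiline filter runs first; it ignores its own re-emitted records
    name                  multiline
    match                 app.*
    multiline.key_content log
    multiline.parser      go

[FILTER]
    # any other filter comes afterwards, so it only sees concatenated records
    name                  grep
    match                 app.*
    regex                 log panic

[OUTPUT]
    name                  stdout
    match                 *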

Configuration Parameters

The plugin supports the following configuration parameters:

Property
Description

Configuration Example

The following example aims to parse a log file called test.log that contains some full lines, a custom Java stacktrace and a Go stacktrace.

The following example files can be located at:

Example files content:

This is the primary Fluent Bit configuration file. It includes the parsers_multiline.conf and tails the file test.log by applying the multiline parsers multiline-regex-test and go. Then it sends the processed records to the standard output.

This second file defines a multiline parser for the example. Note that a second multiline parser called go is used in fluent-bit.conf, but this one is a built-in parser.

An example file with multiline and multiformat content:

By running Fluent Bit with the given configuration file you will obtain:

The lines that did not match a pattern are not considered as part of the multiline message, while the ones that matched the rules were concatenated properly.

Docker Partial Message Use Case

When Fluent Bit is consuming logs from a container runtime, such as Docker, these logs will be split above a certain limit, usually 16KB. If your application emits a 100K log line, it will be split into 7 partial messages. If you are using the Fluentd Docker Log Driver to send the logs to Fluent Bit, they might look like this:

Fluent Bit can re-combine these logs that were split by the runtime and remove the partial message fields. The filter example below is for this use case.

The two options for mode are mutually exclusive in the filter. If you set the mode to partial_message then the multiline.parser option is not allowed.

Log to Metrics

Generate metrics from logs

The Log To Metrics Filter plugin allows you to generate log-derived metrics. It currently supports modes to count records, provide a gauge for field values, or create a histogram. You can also match or exclude specific records based on regular expression patterns for values or nested values. This filter plugin does not actually act as a record filter and does not change or drop records. All records will pass through this filter untouched, and the generated metrics will be emitted into a separate metric pipeline.

Please note that this plugin is an experimental feature and is not recommended for production use. Configuration parameters and plugin functionality are subject to change without notice.

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Description
Mandatory
Value Format

Getting Started

The following example takes records from two dummy inputs and counts all messages passing through the log_to_metrics filter. It then generates metric records which are provided to the prometheus_exporter:

Configuration - Counter

You can then use, for example, the curl command to retrieve the generated metric:

Configuration - Gauge

The gauge mode requires a value_field to specify which field the metric values are taken from. In this example we also apply a regex filter and enable the kubernetes_mode option:

You can then use, for example, the curl command to retrieve the generated metric:

As you can see in the output, only one line is printed: the records from the first input plugin are ignored because they do not match the regex.

The filter also allows you to use multiple rules, which are applied in order; you can have as many Regex and Exclude entries as required (see the grep filter plugin).

If you execute the above curl command multiple times, you will see that in this example the metric value stays at 60, because the messages generated by the dummy plugin do not change. In a real-world scenario the values would change, and the metric would report the last processed value.

Metric label_values

As you can see, the label sets defined by add_label and label_field are added to the metric. The lines in the metric represent every combination of labels; only combinations that are actually used are displayed here. To see this, you can add another dummy input to your configuration.

The metric output would then look like:

You can also see that all the Kubernetes labels have been attached to the metric accordingly.

Configuration - Histogram

Similar to the gauge mode, histogram mode requires a value_field to specify which field the metric values are taken from. In this example we also apply a regex filter and enable the kubernetes_mode option:

You can then use, for example, the curl command to retrieve the generated metric:

As you can see in the output, the default buckets are 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 and +Inf, into which the values are sorted. A sum and a counter are also part of this metric. You can specify your own buckets in the configuration, as in the following example:

Please note that the +Inf bucket will always be included implicitly. The buckets in a histogram are cumulative, so a value added to one bucket is also added to all larger buckets.

You can also see that all the Kubernetes labels have been attached to the metric, identical to the behavior of label_field described in the previous chapter. That results in two label sets for the histogram.

Nest

The Nest Filter plugin allows you to operate on or with nested data. Its modes of operation are

  • nest - Take a set of records and place them in a map

  • lift - Take a map by key and lift its records up

emitter_name

Name for the emitter input instance which re-emits the completed records at the beginning of the pipeline.

emitter_storage.type

The storage type for the emitter input instance. This option supports the values memory (default) and filesystem.

emitter_mem_buf_limit

Set a limit on the amount of memory the emitter can consume if the outputs provide backpressure. The default limit is 10M. The pipeline pauses once the buffer exceeds this limit and remains paused until the output drains the buffer back below it.

multiline.parser

Specify one or multiple Multiline Parser definitions to apply to the content. You can specify multiple multiline parsers to detect different formats by separating them with a comma.

multiline.key_content

Key name that holds the content to process. Note that a Multiline Parser definition can already specify the key_content to use, but this option allows you to override that value for the purposes of the filter.

mode

Mode can be parser for regex concatenation, or partial_message to concatenate split Docker logs.

buffer

Enable buffered mode. In buffered mode, the filter can concatenate multilines from inputs that ingest records one by one (for example, Forward), rather than in chunks, re-emitting them into the beginning of the pipeline (with the same tag) using the in_emitter instance. With buffer off, this filter will not work with most inputs, except tail.

flush_ms

Flush time for pending multiline records. Defaults to 2000.
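
As a minimal sketch of these buffering options working together with an input that ingests records one by one (the forward listener address, parser choice, and emitter name are illustrative assumptions):

[INPUT]
    # forward delivers records one by one, so buffered mode is required
    name   forward
    listen 0.0.0.0
    port   24224

[FILTER]
    name                  multiline
    match                 *
    multiline.key_content log
    multiline.parser      java
    buffer                on
    flush_ms              2000
    emitter_name          multiline_emitter
    emitter_storage.type  memory
    emitter_mem_buf_limit 10M

[OUTPUT]
    name  stdout
    match *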

Multiline Parser
Tail plugin
Tail plugin
Rewrite Tag Filter
#5235
https://github.com/fluent/fluent-bit/tree/master/documentation/examples/multiline/filter_multiline
Fluentd Docker Log Driver
[SERVICE]
    flush                 1
    log_level             info
    parsers_file          parsers_multiline.conf

[INPUT]
    name                  tail
    path                  test.log
    read_from_head        true

[FILTER]
    name                  multiline
    match                 *
    multiline.key_content log
    multiline.parser      go, multiline-regex-test

[OUTPUT]
    name                  stdout
    match                 *
    
[MULTILINE_PARSER]
    name          multiline-regex-test
    type          regex
    flush_timeout 1000
    #
    # Regex rules for multiline parsing
    # ---------------------------------
    #
    # configuration hints:
    #
    #  - first state always has the name: start_state
    #  - every field in the rule must be inside double quotes
    #
    # rules |   state name  | regex pattern                  | next state
    # ------|---------------|--------------------------------------------
    rule      "start_state"   "/([A-Za-z]+ \d+ \d+\:\d+\:\d+)(.*)/"  "cont"
    rule      "cont"          "/^\s+at.*/"                     "cont"
    
single line...
Dec 14 06:41:08 Exception in thread "main" java.lang.RuntimeException: Something has gone wrong, aborting!
    at com.myproject.module.MyProject.badMethod(MyProject.java:22)
    at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)
    at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)
    at com.myproject.module.MyProject.someMethod(MyProject.java:10)
    at com.myproject.module.MyProject.main(MyProject.java:6)
another line...
panic: my panic

goroutine 4 [running]:
panic(0x45cb40, 0x47ad70)
  /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c
main.main.func1(0xc420024120)
  foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1
created by main.main
  foo.go:5 +0x58

goroutine 1 [chan receive]:
runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c
runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e
runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)
  /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4
runtime.chanrecv1(0xc420024120, 0x0)
  /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b
main.main()
  foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef
runtime.main()
  /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1

goroutine 2 [force gc (idle)]:
runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c
runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e
runtime.forcegchelper()
  /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1
created by runtime.init.4
  /usr/local/go/src/runtime/proc.go:227 +0x35

goroutine 3 [GC sweep wait]:
runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c
runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e
runtime.bgsweep(0xc42001e150)
  /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1
created by runtime.gcenable
  /usr/local/go/src/runtime/mgc.go:216 +0x58
one more line, no multiline
$ fluent-bit -c fluent-bit.conf 

[0] tail.0: [1626736433.143567481, {"log"=>"single line..."}]
[1] tail.0: [1626736433.143570538, {"log"=>"Dec 14 06:41:08 Exception in thread "main" java.lang.RuntimeException: Something has gone wrong, aborting!
    at com.myproject.module.MyProject.badMethod(MyProject.java:22)
    at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)
    at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)
    at com.myproject.module.MyProject.someMethod(MyProject.java:10)
    at com.myproject.module.MyProject.main(MyProject.java:6)"}]
[2] tail.0: [1626736433.143572538, {"log"=>"another line..."}]
[3] tail.0: [1626736433.143572894, {"log"=>"panic: my panic

goroutine 4 [running]:
panic(0x45cb40, 0x47ad70)
  /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c
main.main.func1(0xc420024120)
  foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1
created by main.main
  foo.go:5 +0x58

goroutine 1 [chan receive]:
runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c
runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e
runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)
  /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4
runtime.chanrecv1(0xc420024120, 0x0)
  /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b
main.main()
  foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef
runtime.main()
  /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1

goroutine 2 [force gc (idle)]:
runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c
runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e
runtime.forcegchelper()
  /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1
created by runtime.init.4
  /usr/local/go/src/runtime/proc.go:227 +0x35

goroutine 3 [GC sweep wait]:
runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c
runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e
runtime.bgsweep(0xc42001e150)
  /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1
created by runtime.gcenable
  /usr/local/go/src/runtime/mgc.go:216 +0x58"}]
[4] tail.0: [1626736433.143585473, {"log"=>"one more line, no multiline"}]
{"source": "stdout", "log": "... omitted for brevity...", "partial_message": "true", "partial_id": "dc37eb08b4242c41757d4cd995d983d1cdda4589193755a22fcf47a638317da0", "partial_ordinal": "1", "partial_last": "false", "container_id": "a96998303938eab6087a7f8487ca40350f2c252559bc6047569a0b11b936f0f2", "container_name": "/hopeful_taussig"}]
[FILTER]
     name                  multiline
     match                 *
     multiline.key_content log
     mode                  partial_message

Yes

metric_description

Sets a help text for the metric.

Yes

bucket

Defines a bucket for histogram

Yes, for mode histogram

e.g. 0.75

add_label

Add a custom label NAME and set the value to the value of KEY

label_field

Includes a record field as label dimension in the metric.

Name of record key. Supports notation for nested fields.

value_field

Specify the record field that holds a numerical value

Yes, for modes [gauge and histogram]

Name of record key. Supports notation for nested fields.

kubernetes_mode

If enabled, it will automatically put pod_id, pod_name, namespace_name, docker_id and container_name into the metric as labels. This option is intended to be used in combination with the kubernetes filter plugin, which fills those fields.

Regex

Include records in which the content of KEY matches the regular expression.

KEY REGEX

Exclude

Exclude records in which the content of KEY matches the regular expression.

KEY REGEX

tag

Defines the tag for the generated metrics record

Yes

metric_mode

Defines the mode for the metric. Valid values are [counter, gauge or histogram]

Yes

metric_name

grep
the previous chapter

Sets the name of the metric.

Example usage (nest)

As an example using JSON notation, to nest keys matching the Wildcard value Key* under a new key NestKey the transformation becomes,

Example (input)

Example (output)

Example usage (lift)

As an example using JSON notation, to lift keys nested under the Nested_under value NestKey* the transformation becomes,

Example (input)

Example (output)

Configuration Parameters

The plugin supports the following configuration parameters:

Key
Value Format
Operation
Description

Operation

ENUM [nest or lift]

Select the operation nest or lift

Wildcard

FIELD WILDCARD

nest

Nest records which field matches the wildcard

Nest_under

FIELD STRING

nest

Getting Started

In order to start filtering records, you can run the filter from the command line or through the configuration file. The following invokes the Memory Usage Input Plugin, which outputs the following (example),

Example #1 - nest

Command Line

Note: Using the command line mode requires quotes to parse the wildcard properly. The use of a configuration file is recommended.

The following command will load the mem plugin. Then the nest filter will match the wildcard rule to the keys and nest the keys matching Mem.* under the new key NEST.

Configuration File

Result

The output of both the command line and configuration invocations should be identical and result in the following output.

Example #2 - nest and lift undo

This example nests all Mem.* and Swap.* items under the Stats key and then reverses these actions with a lift operation. The output appears unchanged.

Configuration File

Result

Example #3 - nest 3 levels deep

This example takes the keys starting with Mem.* and nests them under LAYER1, which itself is then nested under LAYER2, which is nested under LAYER3.

Configuration File

Result

Example #4 - multiple nest and lift filters with prefix

This example starts with the 3-level deep nesting of Example #3 and applies the lift filter three times to reverse the operations. The end result is that all records are at the top level, without nesting, again. One prefix is added for each level that is lifted.

Configuration file

Result

Modify

The Modify Filter plugin allows you to change records using rules and conditions.

Example usage

As an example using JSON notation to,

  • Rename Key2 to RenamedKey

  • Add a key OtherKey with value Value3 if OtherKey does not yet exist

Example (input)

Example (output)

Configuration Parameters

Rules

The plugin supports the following rules:

Operation
Parameter 1
Parameter 2
Description
  • Rules are case insensitive, parameters are not

  • Any number of rules can be set in a filter instance.

  • Rules are applied in the order they appear, with each rule operating on the result of the previous rule.

Conditions

The plugin supports the following conditions:

Condition
Parameter
Parameter 2
Description
  • Conditions are case insensitive, parameters are not

  • Any number of conditions can be set.

  • Conditions apply to the whole filter instance and all its rules. Not to individual rules.

  • All conditions have to be true for the rules to be applied.

Example #1 - Add and Rename

In order to start filtering records, you can run the filter from the command line or through the configuration file. The following invokes the Memory Usage Input Plugin (mem), which outputs the following (example):

Using command Line

Note: Using the command line mode requires quotes to parse the wildcard properly. The use of a configuration file is recommended.

Configuration File

Result

The output of both the command line and configuration invocations should be identical and result in the following output.

Example #2 - Conditionally Add and Remove

Configuration File

Result

Example #3 - Emoji

Configuration File

Result

[SERVICE]
    flush              1
    log_level          info

[INPUT]
    Name               dummy
    Dummy              {"message":"dummy", "kubernetes":{"namespace_name": "default", "docker_id": "abc123", "pod_name": "pod1", "container_name": "mycontainer", "pod_id": "def456", "labels":{"app": "app1"}}, "duration": 20, "color": "red", "shape": "circle"}
    Tag                dummy.log

[INPUT]
    Name               dummy
    Dummy              {"message":"hello", "kubernetes":{"namespace_name": "default", "docker_id": "abc123", "pod_name": "pod1", "container_name": "mycontainer", "pod_id": "def456", "labels":{"app": "app1"}}, "duration": 60, "color": "blue", "shape": "square"}
    Tag                dummy.log2

[FILTER]
    name               log_to_metrics
    match              dummy.log*
    tag                test_metric
    metric_mode        counter
    metric_name        count_all_dummy_messages
    metric_description This metric counts dummy messages

[OUTPUT]
    name               prometheus_exporter
    match              *
    host               0.0.0.0
    port               2021
> curl -s http://127.0.0.1:2021/metrics


# HELP log_metric_counter_count_all_dummy_messages This metric counts dummy messages
# TYPE log_metric_counter_count_all_dummy_messages counter
log_metric_counter_count_all_dummy_messages 49
[FILTER]
    name               log_to_metrics
    match              dummy.log*
    tag                test_metric
    metric_mode        gauge
    metric_name        current_duration
    metric_description This metric shows the current duration
    value_field        duration
    kubernetes_mode    on
    regex              message .*el.*
    add_label          app $kubernetes['labels']['app']
    label_field        color
    label_field        shape
> curl -s http://127.0.0.1:2021/metrics


# HELP log_metric_gauge_current_duration This metric shows the current duration
# TYPE log_metric_gauge_current_duration gauge
log_metric_gauge_current_duration{namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="square"} 60
> curl -s http://127.0.0.1:2021/metrics

# HELP log_metric_gauge_current_duration This metric shows the current duration
# TYPE log_metric_gauge_current_duration gauge
log_metric_gauge_current_duration{namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="square"} 60
log_metric_gauge_current_duration{namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 20
[FILTER]
    name               log_to_metrics
    match              dummy.log*
    tag                test_metric
    metric_mode        histogram
    metric_name        current_duration
    metric_description This metric shows the request duration
    value_field        duration
    kubernetes_mode    on
    regex              message .*el.*
    add_label          app $kubernetes['labels']['app']
    label_field        color
    label_field        shape
> curl -s http://127.0.0.1:2021/metrics


# HELP log_metric_histogram_current_duration This metric shows the request duration
# TYPE log_metric_histogram_current_duration histogram
log_metric_histogram_current_duration_bucket{le="0.005",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.01",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.025",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.05",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.1",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.25",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.5",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="1.0",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="2.5",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="5.0",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="10.0",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="+Inf",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 28
log_metric_histogram_current_duration_sum{namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 560
log_metric_histogram_current_duration_count{namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="red",shape="circle"} 28
log_metric_histogram_current_duration_bucket{le="0.005",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.01",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.025",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.05",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.1",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.25",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="0.5",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="1.0",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="2.5",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="5.0",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="10.0",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 0
log_metric_histogram_current_duration_bucket{le="+Inf",namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 27
log_metric_histogram_current_duration_sum{namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 1620
log_metric_histogram_current_duration_count{namespace_name="default",pod_name="pod1",container_name="mycontainer",docker_id="abc123",pod_id="def456",app="app1",color="blue",shape="circle"} 27
[FILTER]
    name               log_to_metrics
    match              dummy.log*
    tag                test_metric
    metric_mode        histogram
    metric_name        current_duration
    metric_description This metric shows the HTTP request duration as histogram in milliseconds
    value_field        duration
    kubernetes_mode    on
    bucket             1
    bucket             5
    bucket             10
    bucket             50
    bucket             100
    bucket             250
    bucket             500
    bucket             1000
    regex              message .*el.*
    label_field        color
    label_field        shape
[INPUT]
    Name mem
    Tag  mem.local

[OUTPUT]
    Name  stdout
    Match *

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard Mem.*
    Nest_under Memstats
    Remove_prefix Mem.
pipeline:
    inputs:
        - name: mem
          tag: mem.local
    filters:
        - name: nest
          match: '*'
          operation: nest
          wildcard: Mem.*
          nest_under: Memstats
          remove_prefix: Mem.
    outputs:
        - name: stdout
          match: '*'
[INPUT]
    Name mem
    Tag  mem.local

[OUTPUT]
    Name  stdout
    Match *

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard Mem.*
    Wildcard Swap.*
    Nest_under Stats
    Add_prefix NESTED

[FILTER]
    Name nest
    Match *
    Operation lift
    Nested_under Stats
    Remove_prefix NESTED
pipeline:
    inputs:
        - name: mem
          tag: mem.local
    filters:
        - name: nest
          match: '*'
          Operation: nest
          Wildcard:
            - Mem.*
            - Swap.*
          Nest_under: Stats
          Add_prefix: NESTED
        - name: nest
          match: '*'
          Operation: lift
          Nested_under: Stats
          Remove_prefix: NESTED
    outputs:
        - name: stdout
          match: '*'
[INPUT]
    Name mem
    Tag  mem.local

[OUTPUT]
    Name  stdout
    Match *

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard Mem.*
    Nest_under LAYER1

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard LAYER1*
    Nest_under LAYER2

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard LAYER2*
    Nest_under LAYER3
pipeline:
    inputs:
        - name: mem
          tag: mem.local
    filters:
        - name: nest
          match: '*'
          Operation: nest
          Wildcard: Mem.*
          Nest_under: LAYER1
        - name: nest
          match: '*'
          Operation: nest
          Wildcard: LAYER1*
          Nest_under: LAYER2
        - name: nest
          match: '*'
          Operation: nest
          Wildcard: LAYER2*
          Nest_under: LAYER3
    outputs:
        - name: stdout
          match: '*'
[INPUT]
    Name mem
    Tag  mem.local

[OUTPUT]
    Name  stdout
    Match *

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard Mem.*
    Nest_under LAYER1

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard LAYER1*
    Nest_under LAYER2

[FILTER]
    Name nest
    Match *
    Operation nest
    Wildcard LAYER2*
    Nest_under LAYER3

[FILTER]
    Name nest
    Match *
    Operation lift
    Nested_under LAYER3
    Add_prefix Lifted3_

[FILTER]
    Name nest
    Match *
    Operation lift
    Nested_under Lifted3_LAYER2
    Add_prefix Lifted3_Lifted2_

[FILTER]
    Name nest
    Match *
    Operation lift
    Nested_under Lifted3_Lifted2_LAYER1
    Add_prefix Lifted3_Lifted2_Lifted1_
pipeline:
    inputs:
        - name: mem
          tag: mem.local
    filters:
        - name: nest
          match: '*'
          Operation: nest
          Wildcard: Mem.*
          Nest_under: LAYER1
        - name: nest
          match: '*'
          Operation: nest
          Wildcard: LAYER1*
          Nest_under: LAYER2
        - name: nest
          match: '*'
          Operation: nest
          Wildcard: LAYER2*
          Nest_under: LAYER3
        - name: nest
          match: '*'
          Operation: lift
          Nested_under: LAYER3
          Add_prefix: Lifted3_
        - name: nest
          match: '*'
          Operation: lift
          Nested_under: Lifted3_LAYER2
          Add_prefix: Lifted3_Lifted2_
        - name: nest
          match: '*'
          Operation: lift
          Nested_under: Lifted3_Lifted2_LAYER1
          Add_prefix: Lifted3_Lifted2_Lifted1_
    outputs:
        - name: stdout
          match: '*'
{
  "Key1"     : "Value1",
  "Key2"     : "Value2",
  "OtherKey" : "Value3"
}
{
  "OtherKey" : "Value3"
  "NestKey"  : {
    "Key1"     : "Value1",
    "Key2"     : "Value2",
  }
}
{
  "OtherKey" : "Value3"
  "NestKey"  : {
    "Key1"     : "Value1",
    "Key2"     : "Value2",
  }
}
{
  "Key1"     : "Value1",
  "Key2"     : "Value2",
  "OtherKey" : "Value3"
}
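
The JSON transformations above correspond to nest filter definitions along these lines (a sketch using the key names from the examples; the Match value is an assumption):

[FILTER]
    # nest: place keys matching Key* under the new map NestKey
    Name       nest
    Match      *
    Operation  nest
    Wildcard   Key*
    Nest_under NestKey

[FILTER]
    # lift: bring keys nested under NestKey back to the top level
    Name         nest
    Match        *
    Operation    lift
    Nested_under NestKey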
[0] memory: [1488543156, {"Mem.total"=>1016044, "Mem.used"=>841388, "Mem.free"=>174656, "Swap.total"=>2064380, "Swap.used"=>139888, "Swap.free"=>1924492}]
$ bin/fluent-bit -i mem -p 'tag=mem.local' -F nest -p 'Operation=nest' -p 'Wildcard=Mem.*' -p 'Nest_under=Memstats' -p 'Remove_prefix=Mem.' -m '*' -o stdout
[2018/04/06 01:35:13] [ info] [engine] started
[0] mem.local: [1522978514.007359767, {"Swap.total"=>1046524, "Swap.used"=>0, "Swap.free"=>1046524, "Memstats"=>{"total"=>4050908, "used"=>714984, "free"=>3335924}}]
[2018/06/21 17:42:37] [ info] [engine] started (pid=17285)
[0] mem.local: [1529566958.000940636, {"Mem.total"=>8053656, "Mem.used"=>6940380, "Mem.free"=>1113276, "Swap.total"=>16532988, "Swap.used"=>1286772, "Swap.free"=>15246216}]
[0] mem.local: [1524795923.009867831, {"Swap.total"=>1046524, "Swap.used"=>0, "Swap.free"=>1046524, "LAYER3"=>{"LAYER2"=>{"LAYER1"=>{"Mem.total"=>4050908, "Mem.used"=>1112036, "Mem.free"=>2938872}}}}]


{
  "Swap.total"=>1046524,
  "Swap.used"=>0,
  "Swap.free"=>1046524,
  "LAYER3"=>{
    "LAYER2"=>{
      "LAYER1"=>{
        "Mem.total"=>4050908,
        "Mem.used"=>1112036,
        "Mem.free"=>2938872
      }
    }
  }
}
[0] mem.local: [1524862951.013414798, {"Swap.total"=>1046524, "Swap.used"=>0, "Swap.free"=>1046524, "Lifted3_Lifted2_Lifted1_Mem.total"=>4050908, "Lifted3_Lifted2_Lifted1_Mem.used"=>1253912, "Lifted3_Lifted2_Lifted1_Mem.free"=>2796996}]


{
  "Swap.total"=>1046524, 
  "Swap.used"=>0, 
  "Swap.free"=>1046524, 
  "Lifted3_Lifted2_Lifted1_Mem.total"=>4050908, 
  "Lifted3_Lifted2_Lifted1_Mem.used"=>1253912, 
  "Lifted3_Lifted2_Lifted1_Mem.free"=>2796996
}

Nest records matching the Wildcard under this key

Nested_under

FIELD STRING

lift

Lift records nested under the Nested_under key

Add_prefix

FIELD STRING

ANY

Prefix affected keys with this string

Remove_prefix

FIELD STRING

ANY

Remove prefix from affected keys if it matches this string

Record Accessor
Record Accessor
kubernetes

Remove_wildcard

WILDCARD:KEY

NONE

Remove all key/value pairs with key matching wildcard KEY

Remove_regex

REGEXP:KEY

NONE

Remove all key/value pairs with key matching regexp KEY

Rename

STRING:KEY

STRING:RENAMED_KEY

Rename a key/value pair with key KEY to RENAMED_KEY if KEY exists AND RENAMED_KEY does not exist

Hard_rename

STRING:KEY

STRING:RENAMED_KEY

Rename a key/value pair with key KEY to RENAMED_KEY if KEY exists. If RENAMED_KEY already exists, this field is overwritten

Copy

STRING:KEY

STRING:COPIED_KEY

Copy a key/value pair with key KEY to COPIED_KEY if KEY exists AND COPIED_KEY does not exist

Hard_copy

STRING:KEY

STRING:COPIED_KEY

Copy a key/value pair with key KEY to COPIED_KEY if KEY exists. If COPIED_KEY already exists, this field is overwritten

Move_to_start

WILDCARD:KEY

NONE

Move key/value pairs with keys matching KEY to the start of the message

Move_to_end

WILDCARD:KEY

NONE

Move key/value pairs with keys matching KEY to the end of the message

No_key_matches

REGEXP:KEY

NONE

Is true if no key matches regex KEY

Key_value_equals

STRING:KEY

STRING:VALUE

Is true if KEY exists and its value is VALUE

Key_value_does_not_equal

STRING:KEY

STRING:VALUE

Is true if KEY exists and its value is not VALUE

Key_value_matches

STRING:KEY

REGEXP:VALUE

Is true if key KEY exists and its value matches VALUE

Key_value_does_not_match

STRING:KEY

REGEXP:VALUE

Is true if key KEY exists and its value does not match VALUE

Matching_keys_have_matching_values

REGEXP:KEY

REGEXP:VALUE

Is true if all keys matching KEY have values that match VALUE

Matching_keys_do_not_have_matching_values

REGEXP:KEY

REGEXP:VALUE

Is true if all keys matching KEY have values that do not match VALUE

  • You can set a Record Accessor as STRING:KEY for nested keys.

  • Set

    STRING:KEY

    STRING:VALUE

    Add a key/value pair with key KEY and value VALUE. If KEY already exists, this field is overwritten

    Add

    STRING:KEY

    STRING:VALUE

    Add a key/value pair with key KEY and value VALUE if KEY does not exist

    Remove

    STRING:KEY

    NONE

    Key_exists

    STRING:KEY

    NONE

    Is true if KEY exists

    Key_does_not_exist

    STRING:KEY

    NONE

    Is true if KEY does not exist

    A_key_matches

    REGEXP:KEY

    NONE

    Memory Usage Input Plugin

    Remove a key/value pair with key KEY if it exists

    Is true if a key matches regex KEY

    {
      "Key1"     : "Value1",
      "Key2"     : "Value2"
    }
    {
      "Key1"       : "Value1",
      "RenamedKey" : "Value2",
      "OtherKey"   : "Value3"
    }
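
    A modify filter definition matching the example above might look like the following sketch (the Match value is an assumption):

    [FILTER]
        # rename Key2 and add OtherKey only if it does not exist yet
        Name   modify
        Match  *
        Rename Key2 RenamedKey
        Add    OtherKey Value3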
    [0] memory: [1488543156, {"Mem.total"=>1016044, "Mem.used"=>841388, "Mem.free"=>174656, "Swap.total"=>2064380, "Swap.used"=>139888, "Swap.free"=>1924492}]
    [1] memory: [1488543157, {"Mem.total"=>1016044, "Mem.used"=>841420, "Mem.free"=>174624, "Swap.total"=>2064380, "Swap.used"=>139888, "Swap.free"=>1924492}]
    [2] memory: [1488543158, {"Mem.total"=>1016044, "Mem.used"=>841420, "Mem.free"=>174624, "Swap.total"=>2064380, "Swap.used"=>139888, "Swap.free"=>1924492}]
    [3] memory: [1488543159, {"Mem.total"=>1016044, "Mem.used"=>841420, "Mem.free"=>174624, "Swap.total"=>2064380, "Swap.used"=>139888, "Swap.free"=>1924492}]
    bin/fluent-bit -i mem \
      -p 'tag=mem.local' \
      -F modify \
      -p 'Add=Service1 SOMEVALUE' \
      -p 'Add=Service2 SOMEVALUE3' \
      -p 'Add=Mem.total2 TOTALMEM2' \
      -p 'Rename=Mem.free MEMFREE' \
      -p 'Rename=Mem.used MEMUSED' \
      -p 'Rename=Swap.total SWAPTOTAL' \
      -p 'Add=Mem.total TOTALMEM' \
      -m '*' \
      -o stdout
    [INPUT]
        Name mem
        Tag  mem.local
    
    [OUTPUT]
        Name  stdout
        Match *
    
    [FILTER]
        Name modify
        Match *
        Add Service1 SOMEVALUE
        Add Service3 SOMEVALUE3
        Add Mem.total2 TOTALMEM2
        Rename Mem.free MEMFREE
        Rename Mem.used MEMUSED
        Rename Swap.total SWAPTOTAL
        Add Mem.total TOTALMEM
    pipeline:
        inputs:
            - name: mem
              tag: mem.local
        filters:
            - name: modify
              match: '*'
              Add:
                - Service1 SOMEVALUE
                - Service3 SOMEVALUE3
                - Mem.total2 TOTALMEM2
                - Mem.total TOTALMEM
              Rename:
                - Mem.free MEMFREE
                - Mem.used MEMUSED
                - Swap.total SWAPTOTAL
        outputs:
            - name: stdout
              match: '*'
    [2018/04/06 01:35:13] [ info] [engine] started
    [0] mem.local: [1522980610.006892802, {"Mem.total"=>4050908, "MEMUSED"=>738100, "MEMFREE"=>3312808, "SWAPTOTAL"=>1046524, "Swap.used"=>0, "Swap.free"=>1046524, "Service1"=>"SOMEVALUE", "Service3"=>"SOMEVALUE3", "Mem.total2"=>"TOTALMEM2"}]
    [1] mem.local: [1522980611.000658288, {"Mem.total"=>4050908, "MEMUSED"=>738068, "MEMFREE"=>3312840, "SWAPTOTAL"=>1046524, "Swap.used"=>0, "Swap.free"=>1046524, "Service1"=>"SOMEVALUE", "Service3"=>"SOMEVALUE3", "Mem.total2"=>"TOTALMEM2"}]
    [2] mem.local: [1522980612.000307652, {"Mem.total"=>4050908, "MEMUSED"=>738068, "MEMFREE"=>3312840, "SWAPTOTAL"=>1046524, "Swap.used"=>0, "Swap.free"=>1046524, "Service1"=>"SOMEVALUE", "Service3"=>"SOMEVALUE3", "Mem.total2"=>"TOTALMEM2"}]
    [3] mem.local: [1522980613.000122671, {"Mem.total"=>4050908, "MEMUSED"=>738068, "MEMFREE"=>3312840, "SWAPTOTAL"=>1046524, "Swap.used"=>0, "Swap.free"=>1046524, "Service1"=>"SOMEVALUE", "Service3"=>"SOMEVALUE3", "Mem.total2"=>"TOTALMEM2"}]
    [INPUT]
        Name mem
        Tag  mem.local
        Interval_Sec 1
    
    [FILTER]
        Name    modify
        Match   mem.*
    
        Condition Key_Does_Not_Exist cpustats
        Condition Key_Exists Mem.used
    
        Set cpustats UNKNOWN
    
    [FILTER]
        Name    modify
        Match   mem.*
    
        Condition Key_Value_Does_Not_Equal cpustats KNOWN
    
        Add sourcetype memstats
    
    [FILTER]
        Name    modify
        Match   mem.*
    
        Condition Key_Value_Equals cpustats UNKNOWN
    
        Remove_wildcard Mem
        Remove_wildcard Swap
        Add cpustats_more STILL_UNKNOWN
    
    [OUTPUT]
        Name           stdout
        Match          *
    pipeline:
        inputs:
            - name: mem
              tag: mem.local
              interval_sec: 1
        filters:
            - name: modify
              match: mem.*
              Condition:
                - Key_Does_Not_Exist cpustats
                - Key_Exists Mem.used
              Set: cpustats UNKNOWN
            - name: modify
              match: mem.*
              Condition: Key_Value_Does_Not_Equal cpustats KNOWN
              Add: sourcetype memstats
            - name: modify
              match: mem.*
              Condition: Key_Value_Equals cpustats UNKNOWN
              Remove_wildcard:
                - Mem
                - Swap
              Add: cpustats_more STILL_UNKNOWN
        outputs:
            - name: stdout
              match: '*'
    [2018/06/14 07:37:34] [ info] [engine] started (pid=1493)
    [0] mem.local: [1528925855.000223110, {"cpustats"=>"UNKNOWN", "sourcetype"=>"memstats", "cpustats_more"=>"STILL_UNKNOWN"}]
    [1] mem.local: [1528925856.000064516, {"cpustats"=>"UNKNOWN", "sourcetype"=>"memstats", "cpustats_more"=>"STILL_UNKNOWN"}]
    [2] mem.local: [1528925857.000165965, {"cpustats"=>"UNKNOWN", "sourcetype"=>"memstats", "cpustats_more"=>"STILL_UNKNOWN"}]
    [3] mem.local: [1528925858.000152319, {"cpustats"=>"UNKNOWN", "sourcetype"=>"memstats", "cpustats_more"=>"STILL_UNKNOWN"}]
    [INPUT]
        Name mem
        Tag  mem.local
    
    [OUTPUT]
        Name  stdout
        Match *
    
    [FILTER]
        Name modify
        Match *
    
        Remove_Wildcard Mem
        Remove_Wildcard Swap
        Set This_plugin_is_on 🔥
        Set 🔥 is_hot
        Copy 🔥 💦
        Rename  💦 ❄️
        Set ❄️ is_cold
        Set 💦 is_wet
    pipeline:
        inputs:
            - name: mem
              tag: mem.local
              interval_sec: 1
        filters:
            - name: modify
              match: mem.*
              Remove_wildcard:
                - Mem
                - Swap
              Set:
                - This_plugin_is_on 🔥
                - 🔥 is_hot
              Copy: 🔥 💦
              Rename:  💦 ❄️
              Set:
                - ❄️ is_cold
                - 💦 is_wet
        outputs:
            - name: stdout
              match: '*'
    [2018/06/14 07:46:11] [ info] [engine] started (pid=21875)
    [0] mem.local: [1528926372.000197916, {"This_plugin_is_on"=>"🔥", "🔥"=>"is_hot", "❄️"=>"is_cold", "💦"=>"is_wet"}]
    [1] mem.local: [1528926373.000107868, {"This_plugin_is_on"=>"🔥", "🔥"=>"is_hot", "❄️"=>"is_cold", "💦"=>"is_wet"}]
    [2] mem.local: [1528926374.000181042, {"This_plugin_is_on"=>"🔥", "🔥"=>"is_hot", "❄️"=>"is_cold", "💦"=>"is_wet"}]
    [3] mem.local: [1528926375.000090841, {"This_plugin_is_on"=>"🔥", "🔥"=>"is_hot", "❄️"=>"is_cold", "💦"=>"is_wet"}]
    [0] mem.local: [1528926376.000610974, {"This_plugin_is_on"=>"🔥", "🔥"=>"is_hot", "❄️"=>"is_cold", "💦"=>"is_wet"}]

    Lua

    The Lua filter allows you to modify the incoming records (even split one record into multiple records) using custom Lua scripts.

    Due to the necessity of having a flexible filtering mechanism, it is possible to extend Fluent Bit capabilities by writing custom filters using the Lua programming language. A Lua-based filter requires two steps:

    1. Configure the Filter in the main configuration

    2. Prepare a Lua script that will be used by the Filter

    Configuration Parameters

    The plugin supports the following configuration parameters:

    Key
    Description

    Getting Started

    In order to test the filter, you can run the plugin from the command line or through the configuration file. The following examples use the dummy input plugin for data ingestion, invoke the Lua filter using the test.lua script, and call the cb_print() function, which only prints the same information to the standard output:

    Command Line

    From the command line you can use the following options:

    Configuration File

    In your main configuration file append the following Input, Filter & Output sections:

    Lua Script Filter API

    The life cycle of a filter has the following steps:

    1. Upon Tag matching by this filter, it may process or bypass the record.

    2. If the tag matches, it will accept the record and invoke the function defined in the call property, which is the name of a function defined in the Lua script.

    3. Invoke Lua function and pass each record in JSON format.

    4. Upon return, validate the return value and continue the pipeline.

    Callback Prototype

    The Lua script can have one or multiple callbacks that can be used by this filter. The function prototype is as follows:

    Function Arguments

    name
    description

    Return Values

    Each callback must return three values:

    name
    data type
    description

    Code Examples

    For functional examples of this interface, please refer to the code samples provided in the source code of the project located here:

    Inline configuration

    The Fluent Bit smoke tests include examples to verify during CI.

    In classic mode:

    Environment variable processing

    As an example that combines a bit of Lua processing with the Kubernetes filter, the following demonstrates using environment variables with Lua regex and substitutions.

    Kubernetes pods generally have various environment variables set by the infrastructure automatically which may contain useful information.

    In this example, we want to extract part of the Kubernetes cluster API name.

    The environment variable is set like so: KUBERNETES_SERVICE_HOST: api.sandboxbsh-a.project.domain.com

    We want to extract the sandboxbsh name and add it to our record as a special key.

    Number Type

    Lua treats numbers as doubles. This means an integer field (e.g. IDs, log levels) will be converted to a double. To avoid this type conversion, the type_int_key property is available.
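
    As a minimal sketch (the key names log_level and request_id are placeholders for integer fields you want to preserve):

    [FILTER]
        # keep the listed keys as integers instead of converting them to doubles
        Name         lua
        Match        *
        script       test.lua
        call         cb_print
        type_int_key log_level request_id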

    Protected Mode

    Fluent Bit supports protected mode to prevent crashes when executing an invalid Lua script. See also Error Handling in Application Code.

    Record Split

    The Lua callback function can return an array of tables (i.e., an array of records) as its third return value. With this feature, the Lua filter can split one input record into multiple records according to custom logic.

    For example:

    Lua script

    Configuration

    Input

    Output

    See also Fluent Bit: PR 811.

    Response code filtering

    In this example, we want to filter Istio logs to exclude lines with response codes between 1 and 399. Istio is configured to write the logs in JSON format.

    Lua script

    Script response_code_filter.lua

    Configuration

    Configuration to get istio logs and apply response code filter to them.

    Input

    Output

    In the output only the messages with response code 0 or greater than 399 are shown.

    By default, when the Lua script is invoked, the record timestamp is passed as a floating point number, which might lead to precision loss when it is converted back. If you need timestamp precision, enabling this option will pass the timestamp as a Lua table with keys sec for seconds since epoch and nsec for nanoseconds.
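
    A minimal sketch of this option with an inline callback that copies the two timestamp components into the record (the function and key names are illustrative):

    [FILTER]
        Name          lua
        Match         *
        time_as_table on
        call          cb_ts
        code          function cb_ts(tag, ts, record) record["epoch_sec"] = ts["sec"] record["epoch_nsec"] = ts["nsec"] return 2, ts, record end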

    code

    Inline LUA code instead of loading from a path via script.

    enable_flb_null

    If enabled, null will be converted to flb_null in Lua. This is useful to prevent removing key/value pairs, since nil is a special value that removes a key from a map in Lua. Default is false.


    script

    Path to the Lua script that will be used. This can be a relative path against the main configuration file.

    call

    Lua function name that will be triggered to do filtering. It's assumed that the function is declared inside the script parameter defined above.

    type_int_key

    If these keys are matched, the fields are converted to integer. If more than one key, delimit by space. Note that starting from Fluent Bit v1.6 integer data types are preserved and not converted to double as in previous versions.

    type_array_key

    If these keys are matched, the fields are handled as an array. If more than one key, delimit by space. This is useful when the array can be empty.

    protected_mode

    If enabled, the Lua script will be executed in protected mode. This prevents Fluent Bit from crashing when an invalid Lua script is executed or the triggered Lua function throws an exception. Default is true.

    tag

    Name of the tag associated with the incoming record.

    timestamp

    Unix timestamp with nanoseconds associated with the incoming record. The original format is a double (seconds.nanoseconds)

    record

    Lua table with the record content

    code

    integer

    The code return value represents the result and the further action that may follow. If code equals -1, the record will be dropped. If code equals 0, the record will not be modified. If code equals 1, the original timestamp and record have been modified, so they must be replaced by the returned values from timestamp (second return value) and record (third return value). If code equals 2, the original timestamp is not modified and the record has been modified, so it must be replaced by the returned value from record (third return value). Code 2 is supported from v1.4.3.

    timestamp

    double

    If code equals 1, the original record timestamp will be replaced with this new value.

    record

    table

    If code equals 1, the original record information will be replaced with this new value. Note that the record value must be a valid Lua table. This value can be an array of tables (i.e., array of objects in JSON format), and in that case the input record is effectively split into multiple records. (see below for more details)
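
    For illustration, here is a callback that exercises these return codes: it drops records without a message key (code -1) and otherwise adds a field while keeping the original timestamp (code 2). The function and key names are placeholders:

    [FILTER]
        Name  lua
        Match *
        call  cb_codes
        code  function cb_codes(tag, timestamp, record) if record["message"] == nil then return -1, timestamp, record end record["seen"] = "yes" return 2, timestamp, record end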

    dummy
    test.lua
    cb_print()
    https://github.com/fluent/fluent-bit/tree/master/scripts
    Fluent Bit smoke tests
    Kubernetes filter
    Error Handling in Application Code
    Fluent Bit: PR 811

    time_as_table

    $ fluent-bit -i dummy -F lua -p script=test.lua -p call=cb_print -m '*' -o null
    [INPUT]
        Name    dummy
    
    [FILTER]
        Name    lua
        Match   *
        script  test.lua
        call    cb_print
    
    [OUTPUT]
        Name    null
        Match   *
    function cb_print(tag, timestamp, record)
        ...
        return code, timestamp, record
    end
    service:
        flush:           1
        daemon:          off
        log_level:       info
    
    pipeline:
        inputs:
            - name:    random
              tag:     test
              samples: 10
    
        filters:
            - name:  lua
              match: "*"
              call:  append_tag
              code:  |
                  function append_tag(tag, timestamp, record)
                     new_record = record
                     new_record["tag"] = tag
                     return 1, timestamp, new_record
                  end
    
        outputs:
            - name:  stdout
              match: "*"
    [SERVICE]
    	flush 1
    	daemon off
    	log_level debug
    
    [INPUT]
    	Name random
    	Tag test
    	Samples 10
    
    [FILTER]
    	Name Lua
    	Match *
    	call append_tag
    	code function append_tag(tag, timestamp, record) new_record = record new_record["tag"] = tag return 1, timestamp, new_record end
    
    [OUTPUT]
    	Name stdout
    	Match *
          [FILTER]
              Name                lua
              Alias               filter-iots-lua
              Match               iots_thread.*
              Script              filters.lua
              Call                set_landscape_deployment
    
      filters.lua: |
        -- Use a Lua function to create some additional entries based
        -- on substrings from the kubernetes properties.
        function set_landscape_deployment(tag, timestamp, record)
            local landscape = os.getenv("KUBERNETES_SERVICE_HOST")
            if landscape then
                -- Strip the landscape name from this field, KUBERNETES_SERVICE_HOST
                -- Should be of this format
                -- api.sandboxbsh-a.project.domain.com
                -- Take off the leading "api."
                -- sandboxbsh-a.project.domain.com
                --print("landscape1:" .. landscape)
                landscape = landscape:gsub("^[^.]+.", "")
                --print("landscape2:" .. landscape)
                -- Take off everything including and after the - in the cluster name
                -- sandboxbsh
                landscape = landscape:gsub("-.*$", "")
                -- print("landscape3:" .. landscape)
                record["iot_landscape"] = landscape
            end
            -- 2 - replace existing record with this update
            return 2, timestamp, record
        end
    function cb_split(tag, timestamp, record)
        -- If the record contains an "x" key holding an array of maps,
        -- return that array so the input record is split into one record
        -- per element (code 2 keeps the original timestamp).
        if record["x"] ~= nil then
            return 2, timestamp, record["x"]
        else
            -- Otherwise return the record unchanged
            return 2, timestamp, record
        end
    end
    [Input]
        Name    stdin
    
    [Filter]
        Name    lua
        Match   *
        script  test.lua
        call    cb_split
    
    [Output]
        Name    stdout
        Match   *
    {"x": [ {"a1":"aa", "z1":"zz"}, {"b1":"bb", "x1":"xx"}, {"c1":"cc"} ]}
    {"x": [ {"a2":"aa", "z2":"zz"}, {"b2":"bb", "x2":"xx"}, {"c2":"cc"} ]}
    {"a3":"aa", "z3":"zz", "b3":"bb", "x3":"xx", "c3":"cc"}
    [0] stdin.0: [1538435928.310583591, {"a1"=>"aa", "z1"=>"zz"}]
    [1] stdin.0: [1538435928.310583591, {"x1"=>"xx", "b1"=>"bb"}]
    [2] stdin.0: [1538435928.310583591, {"c1"=>"cc"}]
    [3] stdin.0: [1538435928.310588359, {"z2"=>"zz", "a2"=>"aa"}]
    [4] stdin.0: [1538435928.310588359, {"b2"=>"bb", "x2"=>"xx"}]
    [5] stdin.0: [1538435928.310588359, {"c2"=>"cc"}]
    [6] stdin.0: [1538435928.310589790, {"z3"=>"zz", "x3"=>"xx", "c3"=>"cc", "a3"=>"aa", "b3"=>"bb"}]
    function cb_response_code_filter(tag, timestamp, record)
      response_code = record["response_code"]
      -- No response code present: keep the record untouched (code 0)
      if (response_code == nil or response_code == '') then
        return 0,0,0
      -- Successful responses (non-zero and below 400): drop the record (code -1)
      elseif (response_code ~= 0 and response_code < 400) then
        return -1,0,0
      -- Anything else (response codes >= 400, or 0): keep the record
      else
        return 0,0,0
      end
    end
        [INPUT]
            Name                tail
            Path                /var/log/containers/*_istio-proxy-*.log
            multiline.parser    docker, cri
            Tag                 istio.*
            Mem_Buf_Limit       64MB
            Skip_Long_Lines     Off
    
        [FILTER]
            Name                lua
            Match               istio.*
            Script              response_code_filter.lua
            call                cb_response_code_filter
    
        [OUTPUT]
            Name                stdout
            Match               *
    {
        "log": {
            "response_code": 200,
            "bytes_sent": 111328341,
            "authority": "randomservice.randomservice",
            "duration": 14493,
            "request_id": "2e9d38f8-36a9-40a6-bdb2-47c8eb7d399d",
            "upstream_local_address": "10.11.82.178:42738",
            "downstream_local_address": "10.10.21.17:80",
            "upstream_cluster": "outbound|80||randomservice.svc.cluster.local",
            "x_forwarded_for": null,
            "route_name": "default",
            "upstream_host": "10.11.6.90:80",
            "user_agent": "RandomUserAgent",
            "response_code_details": "via_upstream",
            "downstream_remote_address": "10.11.82.178:51096",
            "bytes_received": 1148,
            "path": "/?parameter=random",
            "response_flags": "-",
            "start_time": "2022-07-28T11:16:51.663Z",
            "upstream_transport_failure_reason": null,
            "method": "POST",
            "connection_termination_details": null,
            "protocol": "HTTP/1.1",
            "requested_server_name": null,
            "upstream_service_time": "6161"
        },
        "stream": "stdout",
        "time": "2022-07-28T11:17:06.704109897Z"
    }

    Kubernetes

    The Fluent Bit Kubernetes Filter allows you to enrich your log files with Kubernetes metadata.

    When Fluent Bit is deployed in Kubernetes as a DaemonSet and configured to read the log files from the containers (using tail or systemd input plugins), this filter aims to perform the following operations:

    • Analyze the Tag and extract the following metadata:

      • Pod Name

      • Namespace

      • Container Name

      • Container ID

    • Query Kubernetes API Server to obtain extra metadata for the POD in question:

      • Pod ID

      • Labels

      • Annotations

    The data is cached locally in memory and appended to each record.

    Configuration Parameters

    The plugin supports the following configuration parameters:

    Key
    Description
    Default

    Processing the 'log' value

    Kubernetes Filter aims to provide several ways to process the data contained in the log key. The following explanation of the workflow assumes that your original Docker parser defined in parsers.conf is as follows:

    Since Fluent Bit v1.2, we do not suggest the use of decoders (Decode_Field_As) if you are using an Elasticsearch database in the output, in order to avoid data type conflicts.

    To perform processing of the log key, it's mandatory to enable the Merge_Log configuration property in this filter. The following processing order is then applied:

    • If the Pod suggests a parser, the filter will use that parser to process the content of log.

    • If the option Merge_Parser was set and the Pod did not suggest a parser, process the log content using the parser set in the configuration.

    • If the Pod did not suggest a parser and Merge_Parser is not set, try to handle the content as JSON.

    If log value processing fails, the value is left untouched. The options above are exclusive, not chained: the filter will try only one of them, not all of them.
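    As a sketch of the second case above, the filter snippet below enables Merge_Log and supplies a fallback parser via Merge_Parser for Pods that do not suggest one; the parser name my_app_json is an assumption for this example and must exist in your parsers file.

    [FILTER]
        Name          kubernetes
        Match         kube.*
        Merge_Log     On
        Merge_Parser  my_app_json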

    Kubernetes Annotations

    A flexible feature of the Fluent Bit Kubernetes filter is that it allows Kubernetes Pods to suggest certain behaviors for the log processor pipeline when processing their records. At the moment it supports:

    • Suggest a pre-defined parser

    • Request to exclude logs

    The following annotations are available:

    Annotation
    Description
    Default

    Annotation Examples in Pod definition

    Suggest a parser

    The following Pod definition runs a Pod that emits Apache logs to the standard output. In the Annotations it suggests that the data should be processed using the pre-defined parser called apache:

    Request to exclude logs

    There are certain situations where the user would like to request that the log processor simply skip the logs from the Pod in question:

    Note that the annotation value is a boolean that can be true or false, and it must be quoted.

    Workflow of Tail + Kubernetes Filter

    Kubernetes Filter depends on either the Tail or Systemd input plugins to process and enrich records with Kubernetes metadata. Here we will explain the workflow of Tail and how its configuration correlates with the Kubernetes filter. Consider the following configuration example (just for demo purposes, not for production):

    In the input section, the plugin will monitor all files ending in .log in path /var/log/containers/. For every file it will read every line and apply the docker parser. Then the records are emitted to the next step with an expanded tag.

    Tail supports tag expansion, which means that if a tag has a star character (*), it will be replaced with the absolute path of the monitored file, so if your file name and path is:

    then the Tag for every record of that file becomes:

    Note that slashes are replaced with dots.

    When Kubernetes Filter runs, it will try to match all records that start with kube. (note the ending dot), so records from the file mentioned above will hit the matching rule and the filter will try to enrich the records.

    Kubernetes Filter does not care where the logs come from, but it does care about the absolute name of the monitored file, because that information contains the pod name and namespace name that are used to retrieve the metadata associated with the running Pod from the Kubernetes Master/API Server.

    If you have large pod specifications (which can be caused by a large number of environment variables, etc.), be sure to increase the Buffer_Size parameter of the kubernetes filter. If object sizes exceed this buffer, some metadata will fail to be injected into the logs.
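    For example, a sketch of raising the limit from the default 32k might look like the following; the 256k value is only an illustrative guess and should be sized to your largest pod specification (a value of 0 removes the limit entirely):

    [FILTER]
        Name         kubernetes
        Match        kube.*
        Buffer_Size  256k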

    If the configuration property Kube_Tag_Prefix was configured (available on Fluent Bit >= 1.1.x), it will use that value to remove the prefix that was appended to the Tag in the previous Input section. Note that the configuration property defaults to kube.var.log.containers., so the previous Tag content will be transformed from:

    to:

    The transformation above does not modify the original Tag; it just creates a new representation for the filter to perform the metadata lookup.

    That new value is used by the filter to look up the pod name and namespace; for that purpose it uses an internal regular expression:

    If you want to know more details, check the source code of that definition.

    You can see how this operation is performed on the Rubular.com web site; check the following demo link: https://rubular.com/r/HZz3tYAahj6JCd

    Custom Regex

    Under certain, uncommon conditions, a user may want to alter that hard-coded regular expression; for that purpose the option Regex_Parser can be used (documented above).

    Final Comments

    At this point the filter is able to gather the values of pod_name and namespace. With that information it will check in the local cache (an internal hash table) whether some metadata for that key pair already exists; if so, it will enrich the record with the cached metadata, otherwise it will connect to the Kubernetes Master/API Server and retrieve that information.

    Optional Feature: Using Kubelet to Get Metadata

    There is a reported issue where the kube-apiserver can fall over and become unresponsive when the cluster is too large and too many requests are sent to it. With this feature, the Fluent Bit Kubernetes filter sends requests to the kubelet /pods endpoint instead of the kube-apiserver to retrieve pod information and uses it to enrich the logs. Since the kubelet runs locally on each node, requests are answered faster and each node only receives one request at a time. This saves kube-apiserver capacity to handle other requests. When this feature is enabled, you should see no difference in the Kubernetes metadata added to logs, but the kube-apiserver bottleneck should be avoided when the cluster is large.

    Configuration Setup

    Some configuration setup is needed for this feature.

    Role Configuration for Fluent Bit DaemonSet Example:

    The difference is that using the kubelet requires a special permission for the resource nodes/proxy to allow the HTTP request. When creating the Role or ClusterRole, you need to add nodes/proxy to the resources in the rule.

    Fluent Bit Configuration Example:

    In the Fluent Bit configuration, you need to set Use_Kubelet to true to enable this feature.

    DaemonSet config Example:

    The key point is to set hostNetwork to true and dnsPolicy to ClusterFirstWithHostNet so the Fluent Bit DaemonSet can call the kubelet locally. Otherwise it cannot resolve the DNS name for the kubelet.

    Now you are good to use this new feature!

    Verify that the Use_Kubelet option is working

    Basically, you should see no difference in your experience of enriching your log files with Kubernetes metadata.

    To check whether Fluent Bit is using the kubelet, look at the Fluent Bit logs; there should be a line like this:

    And if you are in debug mode, you could see more:

    Troubleshooting

    The following section goes over specific log messages you may run into and how to solve them to ensure that Fluent Bit's Kubernetes filter is operating properly.

    I can't see metadata appended to my pod or other Kubernetes objects

    If you are not seeing metadata added to your Kubernetes logs and see the following in your log messages, you may be facing connectivity issues with the Kubernetes API server.

    Potential fix #1: Check Kubernetes roles

    When Fluent Bit is deployed as a DaemonSet, it generally runs with specific roles that allow the application to talk to the Kubernetes API server. If you are deployed in a more restricted environment, check that all the Kubernetes roles are set correctly.

    You can test this by running the following command (replace fluentbit-system with the namespace where your fluentbit is installed):

    If the roles are configured correctly, it should simply respond with yes.

    For instance, using Azure AKS, running the above command may respond with:

    If you have connectivity to the API server but still get "could not get meta for POD", debug logging might give you a message containing Azure does not have opinion for this user. In that case, the following subject may need to be included in the fluentbit ClusterRoleBinding, appended to the subjects array:

    Potential fix #2: Check Kubernetes IPv6

    There may be cases where IPv6 is enabled in the environment and you need to enable it within Fluent Bit as well. In the [SERVICE] section, set the option ipv6 to on.
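    A minimal sketch of that setting in classic configuration format:

    [SERVICE]
        Flush  1
        ipv6   On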

    Potential fix #3: Check connectivity to Kube_URL

    By default, Kube_URL is set to https://kubernetes.default.svc:443. Ensure that you have connectivity to this endpoint from within the cluster and that there are no special permissions interfering with the connection.

    I can't see new objects getting metadata

    In some cases, you may see only some objects being appended with metadata while other objects are not enriched. This can occur when local data is cached and does not contain the correct ID for the Kubernetes object that requires enrichment. For most Kubernetes objects the Kubernetes API server is updated, which will then be reflected in Fluent Bit logs; however, in some cases for Pod objects this refresh to the Kubernetes API server can be skipped, causing metadata to be missed.
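    If you hit this, one hedged workaround is to give cached entries a finite lifetime with Kube_Meta_Cache_TTL so that stale metadata is eventually re-fetched from the API server; the 60s value below is only an example.

    [FILTER]
        Name                 kubernetes
        Match                kube.*
        Kube_Meta_Cache_TTL  60s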

    Kube_CA_Path

    Absolute path to scan for certificate files

    Kube_Token_File

    Token file

    /var/run/secrets/kubernetes.io/serviceaccount/token

    Kube_Tag_Prefix

    When the source records come from the Tail input plugin, this option specifies the prefix used in the Tail configuration.

    kube.var.log.containers.

    Merge_Log

    When enabled, it checks if the log field content is a JSON string map; if so, it appends the map fields as part of the log structure.

    Off

    Merge_Log_Key

    When Merge_Log is enabled, the filter assumes the log field of the incoming message is a JSON string and makes a structured representation of it at the same level as the log field in the map. If Merge_Log_Key is set (a string name), all the new structured fields taken from the original log content are instead inserted under this new key.

    Merge_Log_Trim

    When Merge_Log is enabled, trim (remove possible \n or \r) field values.

    On

    Merge_Parser

    Optional parser name to specify how to parse the data contained in the log key. Recommended use is for developers or testing only.

    Keep_Log

    When Keep_Log is disabled, the log field is removed from the incoming message once it has been successfully merged (Merge_Log must be enabled as well).

    On

    tls.debug

    Debug level between 0 (nothing) and 4 (every detail).

    -1

    tls.verify

    When enabled, turns on certificate validation when connecting to the Kubernetes API server.

    On

    Use_Journal

    When enabled, the filter reads logs coming in Journald format.

    Off

    Cache_Use_Docker_Id

    When enabled, metadata will be fetched from K8s when docker_id is changed.

    Off

    Regex_Parser

    Set an alternative Parser to process the record Tag and extract pod_name, namespace_name, container_name and docker_id. The parser must be registered in a parsers file (refer to parser filter-kube-test as an example).

    K8S-Logging.Parser

    Allow Kubernetes Pods to suggest a pre-defined Parser (read more about it in Kubernetes Annotations section)

    Off

    K8S-Logging.Exclude

    Allow Kubernetes Pods to exclude their logs from the log processor (read more about it in Kubernetes Annotations section).

    Off

    Labels

    Include Kubernetes resource labels in the extra metadata.

    On

    Annotations

    Include Kubernetes resource annotations in the extra metadata.

    On

    Kube_meta_preload_cache_dir

    If set, Kubernetes meta-data can be cached/pre-loaded from files in JSON format in this directory, named as namespace-pod.meta

    Dummy_Meta

    If set, use dummy-meta data (for test/dev purposes)

    Off

    DNS_Retries

    DNS lookup retries N times until the network starts working

    6

    DNS_Wait_Time

    DNS lookup interval between network status checks

    30

    Use_Kubelet

    This is an optional feature flag to get metadata information from the kubelet instead of calling the Kube Server API to enrich the log. This could mitigate the Kube API heavy traffic issue for large clusters.

    Off

    Kubelet_Port

    kubelet port to use for the HTTP request; this only works when Use_Kubelet is set to On.

    10250

    Kubelet_Host

    kubelet host to use for the HTTP request; this only works when Use_Kubelet is set to On.

    127.0.0.1

    Kube_Meta_Cache_TTL

    Configurable TTL for K8s cached metadata. By default, it is set to 0, which means the TTL for cache entries is disabled and cache entries are evicted at random when capacity is reached. To enable this option, set the value to a time interval. For example, set it to 60 or 60s and cache entries created more than 60 seconds ago will be evicted.

    0

    Kube_Token_TTL

    Configurable 'time to live' for the K8s token. By default, it is set to 600 seconds. After this time, the token is reloaded from Kube_Token_File or from the Kube_Token_Command.

    600

    Kube_Token_Command

    Command to get the Kubernetes authorization token. By default, it is NULL and the token file will be used to get the token. If you want to manually choose a command to get it, you can set the command here. For example, run aws-iam-authenticator -i your-cluster-name token --token-only to get the token. This option is currently Linux-only.

    Buffer_Size

    Set the buffer size for the HTTP client when reading responses from the Kubernetes API server. The value must conform to the Unit Size specification. A value of 0 results in no limit, and the buffer will expand as needed. Note that if pod specifications exceed the buffer limit, the API response will be discarded when retrieving metadata, and some Kubernetes metadata will fail to be injected into the logs.

    32k

    Kube_URL

    API Server end-point

    https://kubernetes.default.svc:443

    Kube_CA_File

    CA certificate file

    /var/run/secrets/kubernetes.io/serviceaccount/ca.crt

    fluentbit.io/parser[_stream][-container]

    Suggest a pre-defined parser. The parser must already be registered in Fluent Bit. This option is only processed if the Fluent Bit configuration (Kubernetes Filter) has enabled the option K8S-Logging.Parser. If present, the stream (stdout or stderr) restricts the annotation to that specific stream. If present, the container restricts the annotation to a specific container in the Pod.

    fluentbit.io/exclude[_stream][-container]

    Request Fluent Bit to exclude (or not) the logs generated by the Pod. This option is only processed if the Fluent Bit configuration (Kubernetes Filter) has enabled the option K8S-Logging.Exclude.

    False



    [PARSER]
        Name         docker
        Format       json
        Time_Key     time
        Time_Format  %Y-%m-%dT%H:%M:%S.%L
        Time_Keep    On
    apiVersion: v1
    kind: Pod
    metadata:
      name: apache-logs
      labels:
        app: apache-logs
      annotations:
        fluentbit.io/parser: apache
    spec:
      containers:
      - name: apache
        image: edsiper/apache_logs
    apiVersion: v1
    kind: Pod
    metadata:
      name: apache-logs
      labels:
        app: apache-logs
      annotations:
        fluentbit.io/exclude: "true"
    spec:
      containers:
      - name: apache
        image: edsiper/apache_logs
    [INPUT]
        Name    tail
        Tag     kube.*
        Path    /var/log/containers/*.log
        Parser  docker
    
    [FILTER]
        Name             kubernetes
        Match            kube.*
        Kube_URL         https://kubernetes.default.svc:443
        Kube_CA_File     /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File  /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix  kube.var.log.containers.
        Merge_Log        On
        Merge_Log_Key    log_processed
    /var/log/container/apache-logs-annotated_default_apache-aeeccc7a9f00f6e4e066aeff0434cf80621215071f1b20a51e8340aa7c35eac6.log
    kube.var.log.containers.apache-logs-annotated_default_apache-aeeccc7a9f00f6e4e066aeff0434cf80621215071f1b20a51e8340aa7c35eac6.log
    kube.var.log.containers.apache-logs-annotated_default_apache-aeeccc7a9f00f6e4e066aeff0434cf80621215071f1b20a51e8340aa7c35eac6.log
    apache-logs-annotated_default_apache-aeeccc7a9f00f6e4e066aeff0434cf80621215071f1b20a51e8340aa7c35eac6.log
    (?<pod_name>[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace_name>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$
    ---
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: fluentbitds
      namespace: fluentbit-system
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: fluentbit
    rules:
      - apiGroups: [""]
        resources:
          - namespaces
          - pods
          - nodes
          - nodes/proxy
        verbs: 
          - get
          - list
          - watch
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: fluentbit
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: fluentbit
    subjects:
      - kind: ServiceAccount
        name: fluentbitds
        namespace: fluentbit-system
        [INPUT]
            Name              tail
            Tag               kube.*
            Path              /var/log/containers/*.log
            DB                /var/log/flb_kube.db
            Parser            docker
            Docker_Mode       On
            Mem_Buf_Limit     50MB
            Skip_Long_Lines   On
            Refresh_Interval  10
    
        [FILTER]
            Name                kubernetes
            Match               kube.*
            Kube_URL            https://kubernetes.default.svc.cluster.local:443
            Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
            Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
            Merge_Log           On
            Buffer_Size         0
            Use_Kubelet         true
            Kubelet_Port        10250
    ---
    apiVersion: apps/v1
    kind: DaemonSet
    metadata:
      name: fluentbit
      namespace: fluentbit-system
      labels:
        app.kubernetes.io/name: fluentbit
    spec:
      selector:
        matchLabels:
          name: fluentbit
      template:
        metadata:
          labels:
            name: fluentbit
        spec:
          serviceAccountName: fluentbitds
          containers:
            - name: fluent-bit
              imagePullPolicy: Always
              image: fluent/fluent-bit:latest
              volumeMounts:
                - name: varlog
                  mountPath: /var/log
                - name: varlibdockercontainers
                  mountPath: /var/lib/docker/containers
                  readOnly: true
                - name: fluentbit-config
                  mountPath: /fluent-bit/etc/
              resources:
                limits:
                  memory: 1500Mi
                requests:
                  cpu: 500m
                  memory: 500Mi
          hostNetwork: true
          dnsPolicy: ClusterFirstWithHostNet
          volumes:
            - name: varlog
              hostPath:
                path: /var/log
            - name: varlibdockercontainers
              hostPath:
                path: /var/lib/docker/containers
            - name: fluentbit-config
              configMap:
                name: fluentbit-config
    [ info] [filter:kubernetes:kubernetes.0] testing connectivity with Kubelet...
    [debug] [filter:kubernetes:kubernetes.0] Send out request to Kubelet for pods information.
    [debug] [filter:kubernetes:kubernetes.0] Request (ns=<namespace>, pod=node name) http_do=0, HTTP Status: 200
    [ info] [filter:kubernetes:kubernetes.0] connectivity OK
    [2021/02/05 10:33:35] [debug] [filter:kubernetes:kubernetes.0] Request (ns=<Namespace>, pod=<podName>) http_do=0, HTTP Status: 200
    [2021/02/05 10:33:35] [debug] [filter:kubernetes:kubernetes.0] kubelet find pod: <podName> and ns: <Namespace> match
    [2020/10/15 03:48:57] [ info] [filter_kube] testing connectivity with API server...
    [2020/10/15 03:48:57] [error] [filter_kube] upstream connection error
    [2020/10/15 03:48:57] [ warn] [filter_kube] could not get meta for POD
    kubectl auth can-i list pods --as=system:serviceaccount:fluentbit-system:fluentbit
    no - Azure does not have opinion for this user.
    - apiGroup: rbac.authorization.k8s.io
      kind: Group
      name: system:serviceaccounts