Kafka
The Kafka output plugin allows you to ingest your records into an Apache Kafka service. This plugin uses the official librdkafka C library (built-in dependency).

Configuration Parameters

| Key | Description | Default |
| :--- | :--- | :--- |
| format | Specify data format. Available options: json, msgpack. | json |
| message_key | Optional key to store the message. | |
| message_key_field | If set, the value of Message_Key_Field in the record will indicate the message key. If not set, or not found in the record, Message_Key will be used (if set). | |
| timestamp_key | Set the key to store the record timestamp. | @timestamp |
| timestamp_format | 'iso8601' or 'double' | double |
| brokers | Single or multiple list of Kafka Brokers, e.g: 192.168.1.3:9092, 192.168.1.4:9092. | |
| topics | Single entry or list of topics separated by comma (,) that Fluent Bit will use to send messages to Kafka. If only one topic is set, it will be used for all records. If multiple topics exist, the one set in the record by Topic_Key will be used. | fluent-bit |
| topic_key | If multiple topics exist, the value of Topic_Key in the record will indicate the topic to use. E.g: if Topic_Key is router and the record is {"key1": 123, "router": "route_2"}, Fluent Bit will use the topic route_2. Note that if the value of Topic_Key is not present in Topics, then by default the first topic in the Topics list will be used. | |
| dynamic_topic | Adds unknown topics (found in Topic_Key) to Topics, so only a default topic needs to be configured in Topics. | Off |
| queue_full_retries | Fluent Bit queues data into the rdkafka library; if for some reason the underlying library cannot flush the records, the queue might fill up, blocking the addition of new records. The queue_full_retries option sets the number of local retries to enqueue the data. The default value is 10 times; the interval between each retry is 1 second. Setting queue_full_retries to 0 sets an unlimited number of retries. | 10 |
| rdkafka.{property} | {property} can be any librdkafka property. | |
Setting rdkafka.log.connection.close to false and rdkafka.request.required.acks to 1 are examples of recommended settings of librdkafka properties.
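
As a sketch of how topic routing fits together (the topic names, key, and broker address below are illustrative, not defaults), the following output section declares two known topics and selects between them per record via Topic_Key:

# Hypothetical routing example; names and address are placeholders
[OUTPUT]
    Name          kafka
    Match         *
    Brokers       192.168.1.3:9092
    # route_1 is the default (first) topic; route_2 is a second known topic
    Topics        route_1,route_2
    # read the destination topic from the "router" key of each record
    Topic_Key     router
    # unknown values found under "router" are added to Topics on the fly
    Dynamic_Topic On

With this configuration, a record like {"key1": 123, "router": "route_2"} is sent to route_2; because Dynamic_Topic is On, a router value not listed in Topics would be added as a new topic rather than falling back to route_1.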

Getting Started

In order to insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file:

Command Line

The kafka plugin can read the parameters from the command line through the -p argument (property), e.g:
$ fluent-bit -i cpu -o kafka -p brokers=192.168.1.3:9092 -p topics=test
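
Any other parameter from the table above, including rdkafka.{property} passthrough settings, can be supplied the same way with additional -p arguments. A minimal sketch, with illustrative values:

# each -p sets one output plugin property
$ fluent-bit -i cpu -o kafka \
    -p brokers=192.168.1.3:9092 \
    -p topics=test \
    -p timestamp_format=iso8601 \
    -p rdkafka.request.required.acks=1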

Configuration File

In your main configuration file append the following Input & Output sections:
[INPUT]
    Name cpu

[OUTPUT]
    Name    kafka
    Match   *
    Brokers 192.168.1.3:9092
    Topics  test
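
Building on that minimal example, the sketch below (values are illustrative, not required) also sets the format keys from the table above together with the two librdkafka properties recommended earlier:

[OUTPUT]
    Name    kafka
    Match   *
    Brokers 192.168.1.3:9092
    Topics  test
    # emit JSON payloads with an ISO 8601 timestamp under @timestamp
    Format           json
    Timestamp_Key    @timestamp
    Timestamp_Format iso8601
    # recommended librdkafka settings (see the note above)
    rdkafka.request.required.acks 1
    rdkafka.log.connection.close  false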

Avro Support

Fluent Bit comes with support for Avro encoding for the out_kafka plugin. Avro support is optional and must be activated at build time by using a build definition with cmake: -DFLB_AVRO_ENCODER=On, such as in the following example, which activates:
  • out_kafka with Avro encoding
  • fluent-bit's Prometheus metrics via an embedded HTTP endpoint
  • debugging support
  • the test suites
cmake -DFLB_DEV=On -DFLB_OUT_KAFKA=On -DFLB_TLS=On -DFLB_TESTS_RUNTIME=On -DFLB_TESTS_INTERNAL=On -DCMAKE_BUILD_TYPE=Debug -DFLB_HTTP_SERVER=true -DFLB_AVRO_ENCODER=On ../
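
The trailing ../ assumes the command is run from a build directory inside the source tree. A minimal sketch of the surrounding steps, assuming a fresh checkout of the fluent-bit repository:

# fetch the sources and create an out-of-tree build directory
$ git clone https://github.com/fluent/fluent-bit.git
$ cd fluent-bit
$ mkdir build && cd build
# configure with the flags shown above, then compile
$ cmake -DFLB_DEV=On -DFLB_OUT_KAFKA=On -DFLB_TLS=On -DFLB_TESTS_RUNTIME=On -DFLB_TESTS_INTERNAL=On -DCMAKE_BUILD_TYPE=Debug -DFLB_HTTP_SERVER=true -DFLB_AVRO_ENCODER=On ../
$ make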

Kafka Configuration File with Avro Encoding

This example Fluent Bit configuration tails Kubernetes logs, decorates the log lines with Kubernetes metadata via the kubernetes filter, and then sends the fully decorated log lines to a Kafka broker, encoded with a specific Avro schema.
[INPUT]
    Name tail
    Tag kube.*
    Alias some-alias
    Path /logdir/*.log
    DB /dbdir/some.db
    Skip_Long_Lines On
    Refresh_Interval 10
    Parser some-parser

[FILTER]
    Name kubernetes
    Match kube.*
    Kube_URL https://some_kube_api:443
    Kube_CA_File /certs/ca.crt
    Kube_Token_File /tokens/token
    Kube_Tag_Prefix kube.var.log.containers.
    Merge_Log On
    Merge_Log_Key log_processed

[OUTPUT]
    Name kafka
    Match *
    Brokers 192.168.1.3:9092
    Topics test
    Schema_str {"name":"avro_logging","type":"record","fields":[{"name":"timestamp","type":"string"},{"name":"stream","type":"string"},{"name":"log","type":"string"},{"name":"kubernetes","type":{"name":"krec","type":"record","fields":[{"name":"pod_name","type":"string"},{"name":"namespace_name","type":"string"},{"name":"pod_id","type":"string"},{"name":"labels","type":{"type":"map","values":"string"}},{"name":"annotations","type":{"type":"map","values":"string"}},{"name":"host","type":"string"},{"name":"container_name","type":"string"},{"name":"docker_id","type":"string"},{"name":"container_hash","type":"string"},{"name":"container_image","type":"string"}]}},{"name":"cluster_name","type":"string"},{"name":"fabric","type":"string"}]}
    Schema_id some_schema_id
    rdkafka.client.id some_client_id
    rdkafka.debug All
    rdkafka.enable.ssl.certificate.verification true

    rdkafka.ssl.certificate.location /certs/some.cert
    rdkafka.ssl.key.location /certs/some.key
    rdkafka.ssl.ca.location /certs/some-bundle.crt
    rdkafka.security.protocol ssl
    rdkafka.request.required.acks 1
    rdkafka.log.connection.close false

    Format avro
    rdkafka.log_level 7
    rdkafka.metadata.broker.list 192.168.1.3:9092