Kafka
Kafka output plugin allows to ingest your records into an Apache Kafka service. This plugin use the official librdkafka C library (built-in dependency)
Configuration Parameters
Key | Description | default |
---|---|---|
format | Specify data format, options available: json, msgpack. | json |
message_key | Optional key to store the message | |
message_key_field | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | |
timestamp_key | Set the key to store the record timestamp | @timestamp |
timestamp_format | Specify timestamp format, should be 'double', 'iso8601' (seconds precision) or 'iso8601_ns' (fractional seconds precision) | double |
brokers | Single or multiple list of Kafka Brokers, e.g: 192.168.1.3:9092, 192.168.1.4:9092. | |
topics | Single entry or list of topics separated by comma (,) that Fluent Bit will use to send messages to Kafka. If only one topic is set, that one will be used for all records. Instead if multiple topics exists, the one set in the record by Topic_Key will be used. | fluent-bit |
topic_key | If multiple Topics exists, the value of Topic_Key in the record will indicate the topic to use. E.g: if Topic_Key is router and the record is {"key1": 123, "router": "route_2"}, Fluent Bit will use topic route_2. Note that if the value of Topic_Key is not present in Topics, then by default the first topic in the Topics list will indicate the topic to be used. | |
dynamic_topic | adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured | Off |
queue_full_retries | Fluent Bit queues data into rdkafka library, if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records. The | 10 |
rdkafka.{property} |
|
Setting
rdkafka.log.connection.close
tofalse
andrdkafka.request.required.acks
to 1 are examples of recommended settings of librdfkafka properties.
Getting Started
In order to insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file:
Command Line
The kafka plugin can read parameters through the -p argument (property), e.g:
Configuration File
In your main configuration file append the following Input & Output sections:
Avro Support
Fluent-bit comes with support for avro encoding for the out_kafka plugin. Avro support is optional and must be activated at build-time by using a build def with cmake: -DFLB_AVRO_ENCODER=On
such as in the following example which activates:
out_kafka with avro encoding
fluent-bit's prometheus
metrics via an embedded http endpoint
debugging support
builds the test suites
Kafka Configuration File with Avro Encoding
This is example fluent-bit config tails kubernetes logs, decorates the log lines with kubernetes metadata via the kubernetes filter, and then sends the fully decorated log lines to a kafka broker encoded with a specific avro schema.
Last updated