Kafka Producer
The Kafka Producer output plugin lets you ingest your records into an Apache Kafka service. This plugin uses the official librdkafka C library.
In Fluent Bit 4.0.4 and later, the Kafka output plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.
Configuration parameters
This plugin supports the following parameters:
| Key | Description | Default |
| --- | --- | --- |
| aws_msk_iam | Enable AWS MSK IAM authentication. Requires Fluent Bit 4.0.4 or later. | false |
| aws_msk_iam_cluster_arn | Full ARN of the MSK cluster used for region extraction. Required when aws_msk_iam is enabled. | none |
| brokers | A single Kafka broker or a comma-separated list of brokers. For example, 192.168.1.3:9092, 192.168.1.4:9092. | none |
| client_id | Client ID to use when connecting to Kafka. | none |
| dynamic_topic | Adds unknown topics (found in topic_key) to topics. Only a default topic needs to be configured in topics. | false |
| format | Specify data format. Available formats: avro (requires Avro encoder build option), gelf, json, msgpack, raw. | json |
| gelf_full_message_key | Key to use as the long message for GELF format output. | none |
| gelf_host_key | Key to use as the host for GELF format output. | none |
| gelf_level_key | Key to use as the log level for GELF format output. | none |
| gelf_short_message_key | Key to use as the short message for GELF format output. | none |
| gelf_timestamp_key | Key to use as the timestamp for GELF format output. | none |
| group_id | Consumer group ID. | none |
| message_key | Optional key to store the message. | none |
| message_key_field | If set, the value of message_key_field in the record will indicate the message key. If not set or not found in the record, message_key is used if set. | none |
| queue_full_retries | Number of local retries to enqueue data when the rdkafka queue is full. The interval between retries is 1 second. Set to 0 for unlimited retries. | 10 |
| raw_log_key | When using the raw format, the value of raw_log_key in the record is sent to Kafka as the payload. | none |
| schema_id | Avro schema ID. Requires the Avro encoder build option. | none |
| schema_str | Avro schema string. Requires the Avro encoder build option. | none |
| timestamp_format | Specify the timestamp format. Allowed values: double, iso8601 (seconds precision), iso8601_ns (nanoseconds precision). | double |
| timestamp_key | Key to store the record timestamp. | @timestamp |
| topic_key | If multiple topics exist, the value of topic_key in the record indicates the topic to use. If the value isn't present in topics, the first topic in the list is used. | none |
| topics | Single topic or comma-separated list of topics that Fluent Bit will use to send messages to Kafka. If multiple topics are set, the topic_key field in the record selects the topic. | fluent-bit |
You can also set librdkafka properties by prefixing them with rdkafka. For example, setting rdkafka.log.connection.close to false and rdkafka.request.required.acks to 1 are recommended settings.
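As a minimal sketch, the two librdkafka properties mentioned here could sit alongside the regular plugin options in a YAML configuration file. The broker address and topic name are placeholders, and the cpu input is only there to generate records.

```yaml
pipeline:
  inputs:
    # Assumption: any input works here; cpu is used only to produce sample records.
    - name: cpu

  outputs:
    - name: kafka
      match: '*'
      # Placeholder broker and topic; replace with your own values.
      brokers: 192.168.1.3:9092
      topics: test
      # librdkafka properties are passed through with the rdkafka. prefix.
      rdkafka.log.connection.close: false
      rdkafka.request.required.acks: 1
```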
Get started
To insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file.
Command line
The Kafka plugin can read parameters through the -p argument (property):
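A command along the following lines illustrates the -p usage. It assumes the fluent-bit binary is on your PATH, uses the cpu input only to generate sample records, and treats the broker address and the topic name test as placeholders.

```shell
# Send CPU metrics to a Kafka broker; each -p sets one output plugin property.
fluent-bit -i cpu -o kafka -p brokers=192.168.1.3:9092 -p topics=test
```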
Configuration file
In your main configuration file, append the following:
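A minimal sketch of such a configuration in Fluent Bit's YAML format is shown here. The cpu input, the broker address, and the topic name are illustrative assumptions.

```yaml
pipeline:
  inputs:
    # Sample input that generates CPU usage records.
    - name: cpu

  outputs:
    - name: kafka
      match: '*'
      # Placeholder broker and topic; replace with your own values.
      brokers: 192.168.1.3:9092
      topics: test
```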
Avro support
Fluent Bit supports Avro encoding for the out_kafka plugin. Avro support is optional and must be activated at build time by passing the -DFLB_AVRO_ENCODER=On definition to cmake, for example in a build that activates the following:
out_kafka with Avro encoding
Fluent Bit Prometheus Metrics using an embedded HTTP endpoint
Debugging support
Builds the test suites
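A cmake invocation that enables the features listed above might look like the following. This is a sketch that assumes you run it from a build directory one level below the Fluent Bit source tree; adjust the trailing path for your layout.

```shell
# Kafka output + Avro encoder, embedded HTTP server (metrics),
# debug build, and the runtime and internal test suites.
cmake -DFLB_DEV=On \
      -DFLB_OUT_KAFKA=On \
      -DFLB_TLS=On \
      -DFLB_TESTS_RUNTIME=On \
      -DFLB_TESTS_INTERNAL=On \
      -DCMAKE_BUILD_TYPE=Debug \
      -DFLB_HTTP_SERVER=true \
      -DFLB_AVRO_ENCODER=On \
      ../
```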
Kafka configuration file with Avro encoding
In this example, the Fluent Bit configuration tails Kubernetes logs and updates the log lines with Kubernetes metadata using the Kubernetes filter. It then sends the updated log lines to a Kafka broker encoded with a specific Avro schema.
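The pipeline described above could be sketched as follows. The log path, the broker address, the topic, the schema ID, and the two-field Avro schema are all placeholders chosen for illustration; a real schema has to match the fields your records actually contain, including the Kubernetes metadata added by the filter.

```yaml
pipeline:
  inputs:
    # Tail container logs (path is an assumption; adjust for your nodes).
    - name: tail
      tag: kube.*
      path: /var/log/containers/*.log
      multiline.parser: docker, cri

  filters:
    # Enrich each record with Kubernetes metadata.
    - name: kubernetes
      match: 'kube.*'
      merge_log: on

  outputs:
    - name: kafka
      match: '*'
      brokers: 192.168.1.3:9092
      topics: test
      # Requires a build with -DFLB_AVRO_ENCODER=On.
      format: avro
      # Hypothetical schema and ID, shown only to illustrate the options.
      schema_id: some_schema_id
      schema_str: '{"name":"avro_logging","type":"record","fields":[{"name":"stream","type":"string"},{"name":"log","type":"string"}]}'
```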
Kafka configuration file with raw format
This example Fluent Bit configuration file creates example records with the payloadkey and msgkey keys. The msgkey value is used as the Kafka message key, and the payloadkey value as the payload.
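A configuration matching that description might look like the following sketch. The dummy input generates the example records; the tag, the record contents, the broker address, and the topic name are assumptions.

```yaml
pipeline:
  inputs:
    # Emit a fixed record containing both keys used by the output below.
    - name: dummy
      tag: example.data
      dummy: '{"payloadkey":"Data to send to kafka", "msgkey": "Key to use in the message"}'

  outputs:
    - name: kafka
      match: '*'
      brokers: 192.168.1.3:9092
      topics: test
      format: raw
      # The value of payloadkey becomes the Kafka message payload.
      raw_log_key: payloadkey
      # The value of msgkey becomes the Kafka message key.
      message_key_field: msgkey
```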
AWS MSK IAM authentication
Fluent Bit 4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This lets you securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
Prerequisites
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
Build Requirements
Linux/macOS
The packages libsasl2 and libsasl2-dev must be installed in your build environment.
Windows
No additional SASL libraries are required. Windows uses the built-in Security Support Provider Interface (SSPI) for SASL authentication, which only requires OpenSSL/TLS to be enabled.
Runtime Requirements:
Network Access: Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
AWS Credentials: Provide credentials using any supported AWS method:
IAM roles (recommended for EC2, ECS, or EKS)
Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
AWS credentials file (~/.aws/credentials)
Instance metadata service (IMDS)
These credentials are discovered by default when the aws_msk_iam flag is enabled.
IAM Permissions: The credentials must allow access to the target MSK cluster.
AWS MSK IAM configuration parameters
See aws_msk_iam and aws_msk_iam_cluster_arn in the configuration parameters table.
Configuration example
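The following sketch shows how the two MSK IAM parameters might be combined with the usual Kafka output settings. The broker hostname, the topic name, the AWS account ID, and the cluster ARN are placeholders, and port 9098 assumes the cluster exposes its IAM listener on the standard MSK port.

```yaml
pipeline:
  inputs:
    # Sample input that generates random records.
    - name: random

  outputs:
    - name: kafka
      match: '*'
      # Placeholder MSK bootstrap broker (IAM listener).
      brokers: b-1.my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
      topics: my-topic
      aws_msk_iam: true
      # Placeholder ARN; the region is extracted from this value.
      aws_msk_iam_cluster_arn: 'arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/<cluster-uuid>'
```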
AWS IAM policy
IAM policies and permissions can be complex and can vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:
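As an illustration only, a policy for a producer could grant the connect, describe-topic, and write-data actions on one cluster and one topic. The region, the account ID, the cluster name, the cluster UUID, and the topic name in the ARNs are placeholders, and your organization may require additional or narrower permissions.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/<cluster-uuid>",
        "arn:aws:kafka:us-east-1:123456789012:topic/my-cluster/<cluster-uuid>/my-topic"
      ]
    }
  ]
}
```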