Kafka
The Kafka input plugin enables Fluent Bit to consume messages directly from one or more Apache Kafka topics. By subscribing to specified topics, this plugin efficiently collects and forwards Kafka messages for further processing within your Fluent Bit pipeline.
Starting with version 4.0.4, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.
This plugin uses the official librdkafka C library as a built-in dependency.
Configuration parameters
brokers
Single or multiple list of Kafka Brokers. For example: 192.168.1.3:9092
, 192.168.1.4:9092
.
none
topics
Single entry or list of comma-separated topics (,
) that Fluent Bit will subscribe to.
none
format
Serialization format of the messages. If set to json
, the payload will be parsed as JSON.
none
client_id
Client id passed to librdkafka.
none
group_id
Group id passed to librdkafka.
fluent-bit
poll_ms
Kafka brokers polling interval in milliseconds.
500
Buffer_Max_Size
Specify the maximum size of buffer per cycle to poll Kafka messages from subscribed topics. To increase throughput, specify larger size.
4M
Get started
To subscribe to or collect messages from Apache Kafka, run the plugin from the command line or through the configuration file as shown below.
Command line
The Kafka plugin can read parameters through the -p
argument (property):
$ fluent-bit -i kafka -o stdout -p brokers=192.168.1.3:9092 -p topics=some-topic
Configuration file (recommended)
In your main configuration file append the following:
pipeline:
inputs:
- name: kafka
brokers: 192.168.1.3:9092
topics: some-topic
poll_ms: 100
outputs:
- name: stdout
match: '*'
Example of using Kafka input and output plugins
The Fluent Bit source repository contains a full example of using Fluent Bit to process Kafka records:
pipeline:
inputs:
- name: kafka
brokers: kafka-broker:9092
topics: fb-source
poll_ms: 100
format: json
filters:
- name: lua
match: '*'
script: kafka.lua
call: modify_kafka_message
outputs:
- name: kafka
brokers: kafka-broker:9092
topics: fb-sink
The previous example will connect to the broker listening on kafka-broker:9092
and subscribe to the fb-source
topic, polling for new messages every 100 milliseconds.
Since the payload will be in JSON format, the plugin is configured to parse the payload with format json
.
Every message received is then processed with kafka.lua
and sent back to the fb-sink
topic of the same broker.
The example can be executed locally with make start
in the examples/kafka_filter
directory (docker/compose
is used).
AWS MSK IAM Authentication
Available since Fluent Bit v4.0.4
Fluent Bit supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM. This allows you to securely connect to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
Prerequisites
Build Requirements
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
The packages
libsasl2
andlibsasl2-dev
must be installed on your build environment.
Runtime Requirements
Network Access: Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
AWS Credentials: Provide credentials using any supported AWS method:
IAM roles (recommended for EC2, ECS, or EKS)
Environment variables (
AWS_ACCESS_KEY_ID
,AWS_SECRET_ACCESS_KEY
)AWS credentials file (
~/.aws/credentials
)Instance metadata service (IMDS)
Note these credentials are discovery by default when
aws_msk_iam
flag is enabled.IAM Permissions: The credentials must allow access to the target MSK cluster (see example policy below).
Configuration Parameters
aws_msk_iam
Enable AWS MSK IAM authentication
Boolean
No (default: false)
aws_msk_iam_cluster_arn
Full ARN of the MSK cluster for region extraction
String
Yes (if aws_msk_iam
is true)
Configuration Example
pipeline:
inputs:
- name: kafka
brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
topics: my-topic
aws_msk_iam: true
aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3
outputs:
- name: stdout
match: '*'
Example AWS IAM Policy
Note: IAM policies and permissions can be complex and may vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, please consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kafka-cluster:*",
"kafka-cluster:DescribeCluster",
"kafka-cluster:ReadData",
"kafka-cluster:DescribeTopic",
"kafka-cluster:Connect"
],
"Resource": "*"
}
]
}
Last updated
Was this helpful?