Kafka

The Kafka input plugin enables Fluent Bit to consume messages directly from one or more Apache Kafka topics. By subscribing to specified topics, this plugin efficiently collects and forwards Kafka messages for further processing within your Fluent Bit pipeline.

Starting with version 4.0.4, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.

This plugin uses the official librdkafka C library as a built-in dependency.

Configuration parameters

| Key | Description | Default |
| --- | --- | --- |
| brokers | A single Kafka broker or a comma-separated list of brokers. For example: 192.168.1.3:9092, 192.168.1.4:9092. | none |
| topics | A single topic or a comma-separated list of topics that Fluent Bit subscribes to. | none |
| format | Serialization format of the messages. If set to json, the payload is parsed as JSON. | none |
| client_id | Client ID passed to librdkafka. | none |
| group_id | Group ID passed to librdkafka. | fluent-bit |
| poll_ms | Interval, in milliseconds, at which Kafka brokers are polled. | 500 |
| buffer_max_size | Maximum size of the buffer used per polling cycle when reading messages from subscribed topics. Specify a larger size to increase throughput. | 4M |
| rdkafka.{property} | Sets a librdkafka configuration property, where {property} is the name of any librdkafka property. | none |
| threaded | Indicates whether to run this input in its own thread. | false |
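As an illustration of the rdkafka.{property} passthrough, the following sketch sets a consumer group and forwards two settings to librdkafka. The broker address, topic, and group name are placeholders, and the example assumes that the standard librdkafka consumer properties auto.offset.reset and session.timeout.ms suit your deployment. Check the librdkafka configuration reference before relying on them.

```yaml
pipeline:
  inputs:
    - name: kafka
      brokers: 192.168.1.3:9092
      topics: some-topic
      # Placeholder consumer group name (the default is fluent-bit)
      group_id: my-consumer-group
      # Everything after the rdkafka. prefix is handed to librdkafka.
      # Assumption: start from the oldest available offset when the
      # group has no committed offset yet.
      rdkafka.auto.offset.reset: earliest
      # Assumption: a 30-second consumer session timeout.
      rdkafka.session.timeout.ms: 30000

  outputs:
    - name: stdout
      match: '*'
```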

Get started

To subscribe to or collect messages from Apache Kafka, run the plugin from the command line or through the configuration file as shown in the following examples.

Command line

The Kafka plugin can read parameters through the -p argument (property):

fluent-bit -i kafka -o stdout -p brokers=192.168.1.3:9092 -p topics=some-topic

Configuration file

In your main configuration file, append the following:

pipeline:
  inputs:
    - name: kafka
      brokers: 192.168.1.3:9092
      topics: some-topic
      poll_ms: 100

  outputs:
    - name: stdout
      match: '*'
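The same pattern extends to several brokers and topics. The sketch below is a variation under assumed values: the second broker address and the topic names are placeholders, and the larger buffer and dedicated thread are illustrative tuning choices rather than recommendations.

```yaml
pipeline:
  inputs:
    - name: kafka
      # Comma-separated broker list (placeholder addresses)
      brokers: 192.168.1.3:9092,192.168.1.4:9092
      # Comma-separated topic list (placeholder names)
      topics: app-logs,audit-logs
      # Parse each message payload as JSON
      format: json
      # Double the default 4M per-cycle buffer
      buffer_max_size: 8M
      # Run this input in its own thread
      threaded: true

  outputs:
    - name: stdout
      match: '*'
```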

Example of using Kafka input and output plugins

The Fluent Bit source repository contains a full example of using Fluent Bit to process Kafka records:

pipeline:
  inputs:
    - name: kafka
      brokers: kafka-broker:9092
      topics: fb-source
      poll_ms: 100
      format: json

  filters:
    - name: lua
      match: '*'
      script: kafka.lua
      call: modify_kafka_message

  outputs:
    - name: kafka
      brokers: kafka-broker:9092
      topics: fb-sink

The previous example will connect to the broker listening on kafka-broker:9092 and subscribe to the fb-source topic, polling for new messages every 100 milliseconds.

Since the payload will be in JSON format, the plugin is configured to parse the payload with format json.

Every message received is then processed with kafka.lua and sent back to the fb-sink topic of the same broker.

To run the example locally, run make start in the examples/kafka_filter directory. The example requires Docker Compose.

AWS MSK IAM authentication

Fluent Bit v4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM. This lets you securely connect to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.

Build requirements

If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:

  • The libsasl2 and libsasl2-dev packages must be installed in your build environment.

Runtime requirements

  • Network access: Fluent Bit must be able to reach your MSK broker endpoints, which depends on your AWS VPC configuration.

  • AWS credentials: Provide AWS credentials using any of the following supported methods. Fluent Bit discovers them automatically when aws_msk_iam is enabled.

    • IAM roles (recommended for EC2, ECS, or EKS)

    • Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

    • AWS credentials file (~/.aws/credentials)

    • Instance metadata service (IMDS)

  • IAM permissions: The credentials must allow access to the target MSK cluster, as shown in the Example AWS IAM policy section.

Configuration parameters

| Property | Description | Default |
| --- | --- | --- |
| aws_msk_iam | If true, enables AWS MSK IAM authentication. Possible values: true, false. | false |
| aws_msk_iam_cluster_arn | Full ARN of the MSK cluster. Fluent Bit extracts the AWS region from this value. Required if aws_msk_iam is true. | none |

Configuration example

pipeline:
  inputs:
    - name: kafka
      brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
      topics: my-topic
      aws_msk_iam: true
      aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3

  outputs:
    - name: stdout
      match: '*'

Example AWS IAM policy

IAM policies and permissions can be complex and might vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult your AWS administrator or an AWS expert who is familiar with MSK and IAM security.

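The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. The following example policy is permissive rather than minimal: the kafka-cluster:* wildcard already covers the individual actions listed after it, and "Resource": "*" applies the policy to every cluster. Narrow both for production use.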

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:ReadData",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:Connect"
      ],
      "Resource": "*"
    }
  ]
}
