Kafka

Kafka output plugin allows to ingest your records into an Apache Kafka service. This plugin use the official librdkafka C library (built-in dependency)

Configuration Parameters

Setting rdkafka.log.connection.close to false and rdkafka.request.required.acks to 1 are examples of recommended settings of librdfkafka properties.

Getting Started

In order to insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file:

Command Line

The kafka plugin, can read the parameters from the command line in two ways, through the -p argument (property), e.g:

$ fluent-bit -i cpu -o kafka -p brokers=192.168.1.3:9092 -p topics=test

Configuration File

In your main configuration file append the following Input & Output sections:

[INPUT]
    Name  cpu

[OUTPUT]
    Name        kafka
    Match       *
    Brokers     192.168.1.3:9092
    Topics      test

Avro Support

Fluent-bit comes with support for avro encoding for the out_kafka plugin. Avro support is optional and must be activated at build-time by using a build def with cmake: -DFLB_AVRO_ENCODER=On such as in the following example which activates:

  • out_kafka with avro encoding

  • fluent-bit's prometheus

  • metrics via an embedded http endpoint

  • debugging support

  • builds the test suites

cmake -DFLB_DEV=On -DFLB_OUT_KAFKA=On -DFLB_TLS=On -DFLB_TESTS_RUNTIME=On -DFLB_TESTS_INTERNAL=On -DCMAKE_BUILD_TYPE=Debug -DFLB_HTTP_SERVER=true -DFLB_AVRO_ENCODER=On ../

Kafka Configuration File with Avro Encoding

This is example fluent-bit config tails kubernetes logs, decorates the log lines with kubernetes metadata via the kubernetes filter, and then sends the fully decorated log lines to a kafka broker encoded with a specific avro schema.

[INPUT]
    Name              tail
    Tag               kube.*
    Alias             some-alias
    Path              /logdir/*.log
    DB                /dbdir/some.db
    Skip_Long_Lines   On
    Refresh_Interval  10
    Parser some-parser

[FILTER]
    Name                kubernetes
    Match               kube.*
    Kube_URL            https://some_kube_api:443
    Kube_CA_File        /certs/ca.crt
    Kube_Token_File     /tokens/token
    Kube_Tag_Prefix     kube.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed

[OUTPUT]
    Name        kafka
    Match       *
    Brokers     192.168.1.3:9092
    Topics      test
    Schema_str  {"name":"avro_logging","type":"record","fields":[{"name":"timestamp","type":"string"},{"name":"stream","type":"string"},{"name":"log","type":"string"},{"name":"kubernetes","type":{"name":"krec","type":"record","fields":[{"name":"pod_name","type":"string"},{"name":"namespace_name","type":"string"},{"name":"pod_id","type":"string"},{"name":"labels","type":{"type":"map","values":"string"}},{"name":"annotations","type":{"type":"map","values":"string"}},{"name":"host","type":"string"},{"name":"container_name","type":"string"},{"name":"docker_id","type":"string"},{"name":"container_hash","type":"string"},{"name":"container_image","type":"string"}]}},{"name":"cluster_name","type":"string"},{"name":"fabric","type":"string"}]}
    Schema_id some_schema_id
    rdkafka.client.id some_client_id
    rdkafka.debug All
    rdkafka.enable.ssl.certificate.verification true

    rdkafka.ssl.certificate.location /certs/some.cert
    rdkafka.ssl.key.location /certs/some.key
    rdkafka.ssl.ca.location /certs/some-bundle.crt
    rdkafka.security.protocol ssl
    rdkafka.request.required.acks 1
    rdkafka.log.connection.close false

    Format avro
    rdkafka.log_level 7
    rdkafka.metadata.broker.list 192.168.1.3:9092

Last updated