Fluent Bit: Official Manual
SlackGitHubCommunity MeetingsSandbox and LabsWebinars
1.5
1.5
  • Fluent Bit v1.5 Documentation
  • About
    • What is Fluent Bit ?
    • A Brief History of Fluent Bit
    • Fluentd & Fluent Bit
    • License
  • Concepts
    • Key Concepts
    • Buffering
    • Data Pipeline
      • Input
      • Parser
      • Filter
      • Buffer
      • Router
      • Output
  • Installation
    • Upgrade Notes
    • Supported Platforms
    • Requirements
    • Sources
      • Download Source Code
      • Build and Install
      • Build with Static Configuration
    • Linux Packages
      • Amazon Linux
      • Redhat / CentOS
      • Debian
      • Ubuntu
      • Raspbian / Raspberry Pi
    • Docker
    • Containers on AWS
    • Amazon EC2
    • Kubernetes
    • Yocto / Embedded Linux
    • Windows
  • Administration
    • Configuring Fluent Bit
      • Format and Schema
      • Configuration File
      • Variables
      • Commands
      • Upstream Servers
      • Unit Sizes
      • Record Accessor
    • Security
    • Buffering & Storage
    • Backpressure
    • Scheduling and Retries
    • Networking
    • Memory Management
    • Monitoring
    • Dump Internals / Signal
  • Local Testing
    • Validating your Data and Structure
    • Running a Logging Pipeline Locally
  • Data Pipeline
    • Inputs
      • Collectd
      • CPU Metrics
      • Disk I/O Metrics
      • Docker Events
      • Dummy
      • Exec
      • Forward
      • Head
      • Health
      • Kernel Logs
      • Memory Metrics
      • MQTT
      • Network I/O Metrics
      • Process
      • Random
      • Serial Interface
      • Standard Input
      • Syslog
      • Systemd
      • Tail
      • TCP
      • Thermal
      • Windows Event Log
    • Parsers
      • JSON
      • Regular Expression
      • LTSV
      • Logfmt
      • Decoders
    • Filters
      • AWS Metadata
      • Expect
      • Grep
      • Kubernetes
      • Lua
      • Parser
      • Record Modifier
      • Rewrite Tag
      • Standard Output
      • Throttle
      • Nest
      • Modify
    • Outputs
      • Amazon CloudWatch
      • Azure
      • BigQuery
      • Counter
      • Datadog
      • Elasticsearch
      • File
      • FlowCounter
      • Forward
      • GELF
      • HTTP
      • InfluxDB
      • Kafka
      • Kafka REST Proxy
      • LogDNA
      • NATS
      • New Relic
      • NULL
      • PostgreSQL
      • Stackdriver
      • Standard Output
      • Splunk
      • Syslog
      • TCP & TLS
      • Treasure Data
  • Stream Processing
    • Introduction to Stream Processing
    • Overview
    • Changelog
    • Getting Started
      • Fluent Bit + SQL
      • Check Keys and NULL values
      • Hands On! 101
  • Fluent Bit for Developers
    • C Library API
    • Ingest Records Manually
    • Golang Output Plugins
    • Developer guide for beginners on contributing to Fluent Bit
Powered by GitBook
On this page
  • Data Format
  • Usage

Was this helpful?

Export as PDF
  1. Fluent Bit for Developers

Ingest Records Manually

There are some cases where Fluent Bit library is used to send records from the caller application to some destination, this process is called manual data ingestion.

For this purpose a specific input plugin called lib exists and can be using in conjunction with the flb_lib_push() API function.

Data Format

The lib input plugin expect the data comes in a fixed JSON format as follows:

[UNIX_TIMESTAMP, MAP]

Every record must be a JSON array that contains at least two entries. The first one is the UNIX_TIMESTAMP which is a number representing time associated to the event generation (Epoch time) and the second entry is a JSON map with a list of key/values. A valid entry can be the following:

[1449505010, {"key1": "some value", "key2": false}]

Usage

The following C code snippet shows how to insert a few JSON records into a running Fluent Bit engine:

#include <fluent-bit.h>

#define JSON_1   "[1449505010, {\"key1\": \"some value\"}]"
#define JSON_2   "[1449505620, {\"key1\": \"some new value\"}]"

int main()
{
    int ret;
    int in_ffd;
    int out_ffd;
    flb_ctx_t *ctx;

    /* Create library context */
    ctx = flb_create();
    if (!ctx) {
        return -1;
    }

    /* Enable the input plugin for manual data ingestion */
    in_ffd = flb_input(ctx, "lib", NULL);
    if (in_ffd == -1) {
        flb_destroy(ctx);
        return -1;
    }

    /* Enable output plugin 'stdout' (print records to the standard output) */
    out_ffd = flb_output(ctx, "stdout", NULL);
    if (out_ffd == -1) {
        flb_destroy(ctx);
        return -1;
    }

    /* Start the engine */
    ret = flb_start(ctx);
    if (ret == -1) {
        flb_destroy(ctx);
        return -1;
    }

    /* Ingest data manually */
    flb_lib_push(ctx, in_ffd, JSON_1, sizeof(JSON_1) - 1);
    flb_lib_push(ctx, in_ffd, JSON_2, sizeof(JSON_2) - 1);

    /* Stop the engine (5 seconds to flush remaining data) */
    flb_stop(ctx);

    /* Destroy library context, release all resources */
    flb_destroy(ctx);

    return 0;
}

Last updated 4 years ago

Was this helpful?