Fluent Bit: Official Manual
SlackGitHubCommunity MeetingsSandbox and LabsWebinars
2.2
2.2
  • Fluent Bit v2.2 Documentation
  • About
    • What is Fluent Bit?
    • A Brief History of Fluent Bit
    • Fluentd & Fluent Bit
    • License
  • Concepts
    • Key Concepts
    • Buffering
    • Data Pipeline
      • Input
      • Parser
      • Filter
      • Buffer
      • Router
      • Output
  • Installation
    • Getting Started with Fluent Bit
    • Upgrade Notes
    • Supported Platforms
    • Requirements
    • Sources
      • Download Source Code
      • Build and Install
      • Build with Static Configuration
    • Linux Packages
      • Amazon Linux
      • Redhat / CentOS
      • Debian
      • Ubuntu
      • Raspbian / Raspberry Pi
    • Docker
    • Containers on AWS
    • Amazon EC2
    • Kubernetes
    • macOS
    • Windows
    • Yocto / Embedded Linux
  • Administration
    • Configuring Fluent Bit
      • Classic mode
        • Format and Schema
        • Configuration File
        • Variables
        • Commands
        • Upstream Servers
        • Record Accessor
      • YAML Configuration
        • Configuration File
      • Unit Sizes
      • Multiline Parsing
    • Transport Security
    • Buffering & Storage
    • Backpressure
    • Scheduling and Retries
    • Networking
    • Memory Management
    • Monitoring
    • HTTP Proxy
    • Hot Reload
    • Troubleshooting
  • Local Testing
    • Validating your Data and Structure
    • Running a Logging Pipeline Locally
  • Data Pipeline
    • Pipeline Monitoring
    • Inputs
      • Collectd
      • CPU Log Based Metrics
      • Disk I/O Log Based Metrics
      • Docker Log Based Metrics
      • Docker Events
      • Dummy
      • Elasticsearch
      • Exec
      • Exec Wasi
      • Fluent Bit Metrics
      • Forward
      • Head
      • HTTP
      • Health
      • Kafka
      • Kernel Logs
      • Kubernetes Events
      • Memory Metrics
      • MQTT
      • Network I/O Log Based Metrics
      • NGINX Exporter Metrics
      • Node Exporter Metrics
      • Podman Metrics
      • Process Log Based Metrics
      • Process Exporter Metrics
      • Prometheus Scrape Metrics
      • Random
      • Serial Interface
      • Splunk
      • Standard Input
      • StatsD
      • Syslog
      • Systemd
      • Tail
      • TCP
      • Thermal
      • UDP
      • OpenTelemetry
      • Windows Event Log
      • Windows Event Log (winevtlog)
      • Windows Exporter Metrics
    • Parsers
      • Configuring Parser
      • JSON
      • Regular Expression
      • LTSV
      • Logfmt
      • Decoders
    • Filters
      • AWS Metadata
      • CheckList
      • ECS Metadata
      • Expect
      • GeoIP2 Filter
      • Grep
      • Kubernetes
      • Log to Metrics
      • Lua
      • Parser
      • Record Modifier
      • Modify
      • Multiline
      • Nest
      • Nightfall
      • Rewrite Tag
      • Standard Output
      • Sysinfo
      • Throttle
      • Type Converter
      • Tensorflow
      • Wasm
    • Outputs
      • Amazon CloudWatch
      • Amazon Kinesis Data Firehose
      • Amazon Kinesis Data Streams
      • Amazon S3
      • Azure Blob
      • Azure Data Explorer
      • Azure Log Analytics
      • Azure Logs Ingestion API
      • Counter
      • Datadog
      • Elasticsearch
      • File
      • FlowCounter
      • Forward
      • GELF
      • Google Chronicle
      • Google Cloud BigQuery
      • HTTP
      • InfluxDB
      • Kafka
      • Kafka REST Proxy
      • LogDNA
      • Loki
      • NATS
      • New Relic
      • NULL
      • Observe
      • Oracle Log Analytics
      • OpenSearch
      • OpenTelemetry
      • PostgreSQL
      • Prometheus Exporter
      • Prometheus Remote Write
      • SkyWalking
      • Slack
      • Splunk
      • Stackdriver
      • Standard Output
      • Syslog
      • TCP & TLS
      • Treasure Data
      • Vivo Exporter
      • WebSocket
  • Stream Processing
    • Introduction to Stream Processing
    • Overview
    • Changelog
    • Getting Started
      • Fluent Bit + SQL
      • Check Keys and NULL values
      • Hands On! 101
  • Fluent Bit for Developers
    • C Library API
    • Ingest Records Manually
    • Golang Output Plugins
    • WASM Filter Plugins
    • WASM Input Plugins
    • Developer guide for beginners on contributing to Fluent Bit
Powered by GitBook
On this page
  • Configuration Parameters
  • Getting Started
  • Command Line
  • Configuration File
  • Testing
  • Configuration File
  • Scenario Description

Was this helpful?

Export as PDF
  1. Data Pipeline
  2. Outputs

WebSocket

Last updated 1 year ago

Was this helpful?

The websocket output plugin allows to flush your records into a WebSocket endpoint. For now the functionality is pretty basic and it issues a HTTP GET request to do the handshake, and then use TCP connections to send the data records in either JSON or (or JSON) format.

Configuration Parameters

Key
Description
default

Host

IP address or hostname of the target WebScoket Server

127.0.0.1

Port

TCP port of the target WebScoket Server

80

URI

Specify an optional HTTP URI for the target websocket server, e.g: /something

/

Format

Specify the data format to be used in the HTTP request body, by default it uses msgpack. Other supported formats are json, json_stream and json_lines and gelf.

msgpack

json_date_key

Specify the name of the date field in output

date

json_date_format

Specify the format of the date. Supported formats are double, epoch, iso8601 (eg: 2018-05-30T09:39:52.000681Z) and java_sql_timestamp (eg: 2018-05-30 09:39:52.000681)

double

Getting Started

In order to insert records into a HTTP server, you can run the plugin from the command line or through the configuration file:

Command Line

The websocket plugin, can read the parameters from the command line in two ways, through the -p argument (property) or setting them directly through the service URI. The URI format is the following:

http://host:port/something

Using the format specified, you could start Fluent Bit through:

$ fluent-bit -i cpu -t cpu -o websocket://192.168.2.3:80/something -m '*'

Configuration File

In your main configuration file, append the following Input & Output sections:

[INPUT]
    Name  cpu
    Tag   cpu

[OUTPUT]
    Name  websocket
    Match *
    Host  192.168.2.3
    Port  80
    URI   /something
    Format json

Testing

Configuration File

[INPUT]
    Name        tcp
    Listen      0.0.0.0
    Port        5170
    Format      json
[OUTPUT]
    Name           websocket
    Match          *
    Host           127.0.0.1
    Port           8080
    URI            /
    Format         json
    workers	   4
    net.keepalive               on
    net.keepalive_idle_timeout  30

Once Fluent Bit is running, you can send some messages using the netcat:

$ echo '{"key 1": 123456789, "key 2": "abcdefg"}' | nc 127.0.0.1 5170; sleep 35; echo '{"key 1": 123456789, "key 2": "abcdefg"}' | nc 127.0.0.1 5170
bin/fluent-bit   -c ../conf/out_ws.conf
Fluent Bit v1.7.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/02/05 22:17:09] [ info] [engine] started (pid=6056)
[2021/02/05 22:17:09] [ info] [storage] version=1.1.0, initializing...
[2021/02/05 22:17:09] [ info] [storage] in-memory
[2021/02/05 22:17:09] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/02/05 22:17:09] [ info] [input:tcp:tcp.0] listening on 0.0.0.0:5170
[2021/02/05 22:17:09] [ info] [out_ws] we have following parameter /, 127.0.0.1, 8080, 25
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #1 started
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #0 started
[2021/02/05 22:17:09] [ info] [sp] stream processor started
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #3 started
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #2 started
[2021/02/05 22:17:33] [ info] [out_ws] handshake for ws
[2021/02/05 22:18:08] [ warn] [engine] failed to flush chunk '6056-1612534687.673438119.flb', retry in 7 seconds: task_id=0, input=tcp.0 > output=websocket.0 (out_id=0)
[2021/02/05 22:18:15] [ info] [out_ws] handshake for ws
^C[2021/02/05 22:18:23] [engine] caught signal (SIGINT)
[2021/02/05 22:18:23] [ warn] [engine] service will stop in 5 seconds
[2021/02/05 22:18:27] [ info] [engine] service stopped
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #0 stopping...
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #0 stopped
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #1 stopping...
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #1 stopped
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #2 stopping...
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #2 stopped
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #3 stopping...
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #3 stopped
[2021/02/05 22:18:27] [ info] [out_ws] flb_ws_conf_destroy

Scenario Description

From the output of fluent-bit log, we see that once data has been ingested into fluent bit, plugin would perform handshake. After a while, no data or traffic is undergoing, tcp connection would been abort. And then another piece of data arrived, a retry for websocket plugin has been triggered, with another handshake and data flush.

There is another scenario, once websocket server flaps in a short time, which means it goes down and up in a short time, fluent-bit would resume tcp connection immediately. But in that case, websocket output plugin is a malfunction state, it needs to restart fluent-bit to get back to work.

Websocket plugin is working with tcp keepalive mode, please refer to section for details. Since websocket is a stateful plugin, it will decide when to send out handshake to server side, for example when plugin just begins to work or after connection with server has been dropped. In general, the interval to init a new websocket handshake would be less than the keepalive interval. With that strategy, it could detect and resume websocket connections.

In we should see the following output:

MessagePack
networking
Fluent Bit