WebSocket
The websocket output plugin lets you flush your records to a WebSocket endpoint. For now the functionality is basic: it issues an HTTP GET request to perform the handshake, and then uses the TCP connection to send the data records in either JSON or MessagePack format.
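To make the handshake step concrete, here is a minimal sketch of a generic RFC 6455 client handshake: the HTTP GET upgrade request, and the Sec-WebSocket-Accept value the server is expected to answer with. The function names are hypothetical and the headers shown are the generic ones from the RFC; this is not the plugin's actual code, and the exact bytes Fluent Bit sends may differ.

```python
import base64
import hashlib
import os

# Fixed GUID defined by RFC 6455 for deriving Sec-WebSocket-Accept.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def build_handshake_request(host, port, uri="/", key=None):
    """Return (request_bytes, key) for a generic WebSocket upgrade request.

    Hypothetical helper for illustration; not part of Fluent Bit.
    """
    if key is None:
        # The client key is 16 random bytes, base64-encoded.
        key = base64.b64encode(os.urandom(16)).decode("ascii")
    request = (
        f"GET {uri} HTTP/1.1\r\n"
        f"Host: {host}:{port}\r\n"
        "Upgrade: websocket\r\n"
        "Connection: Upgrade\r\n"
        f"Sec-WebSocket-Key: {key}\r\n"
        "Sec-WebSocket-Version: 13\r\n"
        "\r\n"
    )
    return request.encode("ascii"), key


def expected_accept(key):
    """Compute the Sec-WebSocket-Accept value a server should return for key."""
    digest = hashlib.sha1((key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


request, key = build_handshake_request("192.168.2.3", 80, "/something")
print(request.decode("ascii"))
print("expected Sec-WebSocket-Accept:", expected_accept(key))
```

After the server replies with status 101 and a matching Sec-WebSocket-Accept header, the same TCP connection carries the data frames.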

Configuration Parameters

| Key | Description | Default |
| --- | --- | --- |
| Host | IP address or hostname of the target WebSocket server | 127.0.0.1 |
| Port | TCP port of the target WebSocket server | 80 |
| URI | Specify an optional HTTP URI for the target WebSocket server, e.g. /something | / |
| Format | Specify the data format used for the records sent to the server. By default it uses msgpack. Other supported formats are json, json_stream, json_lines and gelf. | msgpack |
| json_date_key | Specify the name of the date field in the output | date |
| json_date_format | Specify the format of the date. Supported formats are double and iso8601 (e.g. 2018-05-30T09:39:52.000681Z) | double |
| idle_interval | Interval, in seconds, that the websocket output plugin uses to decide whether it needs to reconnect to the WebSocket server | 20 |
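As an illustration of the json_date_key and json_date_format parameters, the sketch below renders the same timestamp in both supported styles. It is an approximation in Python under stated assumptions, not Fluent Bit's implementation; the helper names and the sample record are hypothetical.

```python
from datetime import datetime, timezone


def format_date(seconds, microseconds, date_format="double"):
    """Render a timestamp as 'double' (epoch seconds) or 'iso8601' (UTC).

    Hypothetical helper for illustration; not part of Fluent Bit.
    """
    if date_format == "double":
        return seconds + microseconds / 1_000_000
    if date_format == "iso8601":
        moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
        moment = moment.replace(microsecond=microseconds)
        return moment.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    raise ValueError(f"unsupported date format: {date_format}")


def with_date(record, seconds, microseconds, date_key="date", date_format="double"):
    """Return a copy of record with the date stored under date_key."""
    return {date_key: format_date(seconds, microseconds, date_format), **record}


sample = {"key 1": 123456789}  # hypothetical record
print(with_date(sample, 1527673192, 681))
print(with_date(sample, 1527673192, 681, date_format="iso8601"))
```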

Getting Started

In order to send records to a WebSocket server, you can run the plugin from the command line or through the configuration file:

Command Line

The websocket plugin can read its parameters from the command line in two ways: through the -p argument (property), or by setting them directly through the service URI. The URI format is the following:
websocket://host:port/something
Using the format specified, you could start Fluent Bit through:
$ fluent-bit -i cpu -t cpu -o websocket://192.168.2.3:80/something -m '*'

Configuration File

In your main configuration file, append the following Input & Output sections:
[INPUT]
    Name cpu
    Tag cpu

[OUTPUT]
    Name websocket
    Match *
    Host 192.168.2.3
    Port 80
    URI /something
    Format json
    Idle_interval 21
The suggested value for Idle_interval is 20. The websocket plugin works with TCP keepalive mode; refer to the networking section for details.
By default, if there is no traffic for about 30 seconds, Fluent Bit aborts the TCP connection. As a result, if the websocket plugin wants to send data to the same server again, it has to reconnect. The Idle_interval parameter helps the plugin decide whether it needs to reconnect.
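The reconnect decision described above can be sketched as a comparison between the time since the last traffic and the idle interval. This is a simplified model, assuming the interval is measured in seconds and that the plugin reconnects once the idle time reaches it; the function name is hypothetical and the plugin's real logic may differ.

```python
import time


def needs_reconnect(last_activity, idle_interval=20.0, now=None):
    """Return True when the connection has been idle for idle_interval seconds.

    Hypothetical helper for illustration; timestamps are monotonic seconds.
    """
    if now is None:
        now = time.monotonic()
    return (now - last_activity) >= idle_interval


# With the suggested interval of 20, a connection idle for 35 seconds is
# treated as stale, while one idle for 5 seconds is reused.
print(needs_reconnect(last_activity=0.0, idle_interval=20, now=35.0))
print(needs_reconnect(last_activity=0.0, idle_interval=20, now=5.0))
```

Keeping the interval below the roughly 30-second keepalive timeout means the plugin performs a fresh handshake before it tries to write to a connection that Fluent Bit has already dropped.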

Testing

Configuration File

[INPUT]
    Name tcp
    Listen 0.0.0.0
    Port 5170
    Format json

[OUTPUT]
    Name websocket
    Match *
    Host 127.0.0.1
    Port 9000
    URI /
    Format json
    Idle_interval 21
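The configuration above expects a WebSocket server listening on 127.0.0.1:9000. If you do not have one at hand, the following is a minimal sketch of a receiver built only on the Python standard library. It follows the generic RFC 6455 handshake and frame layout; the function names are hypothetical, and it makes no assumptions about which opcode the plugin uses or whether it masks its frames. It does not handle fragmented messages or ping/pong, so treat it as a test aid only.

```python
import base64
import hashlib
import socket
import struct

WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"  # constant from RFC 6455


def accept_key(client_key):
    """Compute Sec-WebSocket-Accept for a client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((client_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


def decode_frame(buf):
    """Decode one frame; return (opcode, payload, bytes_consumed).

    Returns None if buf does not yet hold a complete frame.
    """
    if len(buf) < 2:
        return None
    opcode = buf[0] & 0x0F
    masked = bool(buf[1] & 0x80)
    length = buf[1] & 0x7F
    pos = 2
    if length == 126:  # 16-bit extended payload length
        if len(buf) < pos + 2:
            return None
        (length,) = struct.unpack("!H", buf[pos:pos + 2])
        pos += 2
    elif length == 127:  # 64-bit extended payload length
        if len(buf) < pos + 8:
            return None
        (length,) = struct.unpack("!Q", buf[pos:pos + 8])
        pos += 8
    mask = b""
    if masked:
        if len(buf) < pos + 4:
            return None
        mask = buf[pos:pos + 4]
        pos += 4
    if len(buf) < pos + length:
        return None
    payload = buf[pos:pos + length]
    if masked:
        payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
    return opcode, payload, pos + length


def serve(host="127.0.0.1", port=9000):
    """Accept one client at a time, answer the handshake, print payloads."""
    with socket.create_server((host, port)) as srv:
        while True:
            conn, _ = srv.accept()
            with conn:
                buf = b""
                while b"\r\n\r\n" not in buf:
                    chunk = conn.recv(4096)
                    if not chunk:
                        break
                    buf += chunk
                head, _, buf = buf.partition(b"\r\n\r\n")
                headers = {}
                for line in head.decode("latin-1").split("\r\n")[1:]:
                    name, _, value = line.partition(":")
                    headers[name.strip().lower()] = value.strip()
                key = headers.get("sec-websocket-key")
                if key is None:
                    continue  # not a WebSocket handshake; drop the client
                conn.sendall((
                    "HTTP/1.1 101 Switching Protocols\r\n"
                    "Upgrade: websocket\r\n"
                    "Connection: Upgrade\r\n"
                    f"Sec-WebSocket-Accept: {accept_key(key)}\r\n\r\n"
                ).encode("ascii"))
                while True:
                    frame = decode_frame(buf)
                    if frame is None:
                        chunk = conn.recv(4096)
                        if not chunk:
                            break
                        buf += chunk
                        continue
                    opcode, payload, used = frame
                    buf = buf[used:]
                    if opcode == 0x8:  # close frame
                        break
                    print(opcode, payload)
```

Call serve() in a separate terminal before starting Fluent Bit; it blocks and prints the opcode and payload of each frame it receives.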
Once Fluent Bit is running, you can send some messages using netcat:
$ echo '{"key 1": 123456789, "key 2": "abcdefg"}' | nc 127.0.0.1 5170; sleep 35; echo '{"key 1": 123456789, "key 2": "abcdefg"}' | nc 127.0.0.1 5170
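If netcat is not available, the same test can be approximated from Python. The sketch below is a rough equivalent of one echo-and-nc step in the command above, assuming the tcp input from the test configuration is listening on 127.0.0.1:5170; the helper names are hypothetical.

```python
import json
import socket


def encode_record(record):
    """Serialize a record as one JSON line, like echo piping into nc."""
    return json.dumps(record).encode("utf-8") + b"\n"


def send_record(record, host="127.0.0.1", port=5170):
    """Open a TCP connection, send one JSON record, and close it."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(encode_record(record))
```

To mirror the netcat command, call send_record({"key 1": 123456789, "key 2": "abcdefg"}), wait 35 seconds with time.sleep(35) so that the idle connection is dropped, and then call it again.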
In Fluent Bit we should see the following output:
bin/fluent-bit -c ../conf/out_ws.conf
Fluent Bit v1.5.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2020/07/07 03:39:31] [ info] [storage] version=1.0.4, initializing...
[2020/07/07 03:39:31] [ info] [storage] in-memory
[2020/07/07 03:39:31] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2020/07/07 03:39:31] [ info] [engine] started (pid=12734)
[2020/07/07 03:39:31] [ info] [input:tcp:tcp.0] listening on 0.0.0.0:5170
[2020/07/07 03:39:31] [ info] [out_ws] we have following parameter /, 127.0.0.1, 9000, 22
[2020/07/07 03:39:31] [ info] [sp] stream processor started
[2020/07/07 03:39:36] [ info] [out_ws] handshake for ws
[2020/07/07 03:40:11] [ warn] [engine] failed to flush chunk '12734-1594107609.72852130.flb', retry in 6 seconds: task_id=0, input=tcp.0 > output=websocket.0
[2020/07/07 03:40:17] [ info] [out_ws] handshake for ws
[2020/07/07 03:40:17] [ info] [engine] flush chunk '12734-1594107609.72852130.flb' succeeded at retry 1: task_id=1, input=tcp.0 > output=websocket.0

Scenario Description

From the Fluent Bit log output, we can see that once data has been ingested into Fluent Bit, the plugin performs the handshake. After a period with no data or traffic, the TCP connection is aborted. When another piece of data arrives, the websocket plugin retries, which triggers another handshake followed by the data flush.