WebSocket
The websocket output plugin allows to flush your records into a WebSocket endpoint. For now the functionality is pretty basic and it issues a HTTP GET request to do the handshake, and then use TCP connections to send the data records in either JSON or MessagePack (or JSON) format.

Configuration Parameters

Key
Description
default
Host
IP address or hostname of the target WebScoket Server
127.0.0.1
Port
TCP port of the target WebScoket Server
80
URI
Specify an optional HTTP URI for the target websocket server, e.g: /something
/
Format
Specify the data format to be used in the HTTP request body, by default it uses msgpack. Other supported formats are json, json_stream and json_lines and gelf.
msgpack
json_date_key
Specify the name of the date field in output
date
json_date_format
Specify the format of the date. Supported formats are double, epoch, iso8601 (eg: 2018-05-30T09:39:52.000681Z) and java_sql_timestamp (eg: 2018-05-30 09:39:52.000681)
double

Getting Started

In order to insert records into a HTTP server, you can run the plugin from the command line or through the configuration file:

Command Line

The websocket plugin, can read the parameters from the command line in two ways, through the -p argument (property) or setting them directly through the service URI. The URI format is the following:
1
http://host:port/something
Copied!
Using the format specified, you could start Fluent Bit through:
1
$ fluent-bit -i cpu -t cpu -o websocket://192.168.2.3:80/something -m '*'
Copied!

Configuration File

In your main configuration file, append the following Input & Output sections:
1
[INPUT]
2
Name cpu
3
Tag cpu
4
5
[OUTPUT]
6
Name websocket
7
Match *
8
Host 192.168.2.3
9
Port 80
10
URI /something
11
Format json
Copied!
Websocket plugin is working with tcp keepalive mode, please refer to networking section for details. Since websocket is a stateful plugin, it will decide when to send out handshake to server side, for example when plugin just begins to work or after connection with server has been dropped. In general, the interval to init a new websocket handshake would be less than the keepalive interval. With that stratgy, it could detect and resume websocket connetions.

Testing

Configuration File

1
[INPUT]
2
Name tcp
3
Listen 0.0.0.0
4
Port 5170
5
Format json
6
[OUTPUT]
7
Name websocket
8
Match *
9
Host 127.0.0.1
10
Port 8080
11
URI /
12
Format json
13
workers 4
14
net.keepalive on
15
net.keepalive_idle_timeout 30
Copied!
Once Fluent Bit is running, you can send some messages using the netcat:
1
$ echo '{"key 1": 123456789, "key 2": "abcdefg"}' | nc 127.0.0.1 5170; sleep 35; echo '{"key 1": 123456789, "key 2": "abcdefg"}' | nc 127.0.0.1 5170
Copied!
In Fluent Bit we should see the following output:
1
bin/fluent-bit -c ../conf/out_ws.conf
2
Fluent Bit v1.7.0
3
* Copyright (C) 2019-2020 The Fluent Bit Authors
4
* Copyright (C) 2015-2018 Treasure Data
5
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
6
* https://fluentbit.io
7
8
[2021/02/05 22:17:09] [ info] [engine] started (pid=6056)
9
[2021/02/05 22:17:09] [ info] [storage] version=1.1.0, initializing...
10
[2021/02/05 22:17:09] [ info] [storage] in-memory
11
[2021/02/05 22:17:09] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
12
[2021/02/05 22:17:09] [ info] [input:tcp:tcp.0] listening on 0.0.0.0:5170
13
[2021/02/05 22:17:09] [ info] [out_ws] we have following parameter /, 127.0.0.1, 8080, 25
14
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #1 started
15
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #0 started
16
[2021/02/05 22:17:09] [ info] [sp] stream processor started
17
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #3 started
18
[2021/02/05 22:17:09] [ info] [output:websocket:websocket.0] worker #2 started
19
[2021/02/05 22:17:33] [ info] [out_ws] handshake for ws
20
[2021/02/05 22:18:08] [ warn] [engine] failed to flush chunk '6056-1612534687.673438119.flb', retry in 7 seconds: task_id=0, input=tcp.0 > output=websocket.0 (out_id=0)
21
[2021/02/05 22:18:15] [ info] [out_ws] handshake for ws
22
^C[2021/02/05 22:18:23] [engine] caught signal (SIGINT)
23
[2021/02/05 22:18:23] [ warn] [engine] service will stop in 5 seconds
24
[2021/02/05 22:18:27] [ info] [engine] service stopped
25
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #0 stopping...
26
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #0 stopped
27
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #1 stopping...
28
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #1 stopped
29
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #2 stopping...
30
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #2 stopped
31
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #3 stopping...
32
[2021/02/05 22:18:27] [ info] [output:websocket:websocket.0] thread worker #3 stopped
33
[2021/02/05 22:18:27] [ info] [out_ws] flb_ws_conf_destroy
Copied!

Scenario Description

From the output of fluent-bit log, we see that once data has been ingested into fluent bit, plugin would perform handshake. After a while, no data or traffic is undergoing, tcp connection would been abort. And then another piece of data arrived, a retry for websocket plugin has been triggered, with another handshake and data flush.
There is another scenario, once websocket server flaps in a short time, which means it goes down and up in a short time, fluent-bit would resume tcp connection immediately. But in that case, websocket output plugin is a malfunction state, it needs to restart fluent-bit to get back to work.