The Lua filter allows you to modify the incoming records (even split one record into multiple records) using custom Lua scripts.
To provide a flexible filtering mechanism, Fluent Bit can be extended with custom filters written in the Lua programming language. Using a Lua-based filter takes two steps:
Configure the Filter in the main configuration
Prepare a Lua script that will be used by the Filter
Configuration Parameters
The plugin supports the following configuration parameters:
| Key | Description |
| --- | ----------- |
| script | Path to the Lua script that will be used by the filter. |
| call | Name of the Lua function defined in the script that the filter will invoke. |
| code | Inline Lua code to use instead of loading an external script file. |
| alias | An alias name for this filter instance. |
| type_int_key | Keys whose values should be kept as integers instead of being converted to Lua doubles. |
Getting Started
To test the filter, you can run the plugin from the command line or through the configuration file. The following examples use the dummy input plugin for data ingestion, invoke the Lua filter using the test.lua script, and call the cb_print() function, which prints the incoming record to standard output:
Command Line
From the command line you can use the following options:
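A sketch of such an invocation, using the dummy input, the test.lua script, and the cb_print callback named in this section (`-i` selects the input, `-F` the filter, `-p` sets filter properties, `-m` the match pattern, and `-o` the output):

```shell
fluent-bit -i dummy -F lua -p script=test.lua -p call=cb_print -m '*' -o stdout
```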
The life cycle of the filter has the following steps:

1. Upon tag matching by this filter, it may process or bypass the record.
2. If the tag matches, the filter accepts the record and invokes the function defined in the call property, which is the name of a function defined in the Lua script.
3. The Lua function is invoked, and each record is passed in JSON format.
4. Upon return, the return value is validated and the pipeline continues.
Callback Prototype
The Lua script can have one or multiple callbacks that can be used by this filter. The function prototype is as follows:
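A minimal sketch of a callback, following the shape used by the examples elsewhere in this document (the callback receives the tag, the timestamp, and the record as a Lua table, and returns a status code, a timestamp, and a record):

```lua
-- Callback prototype: 'tag' is the record tag, 'timestamp' the event
-- time, and 'record' a Lua table holding the record keys and values.
function cb_print(tag, timestamp, record)
  print(tag)
  -- Return code 1 signals the record was modified; 0 keeps the
  -- original record, 2 keeps the original timestamp with the modified
  -- record, and -1 drops the record.
  return 1, timestamp, record
end
```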
```text
[SERVICE]
    flush     1
    daemon    off
    log_level debug

[INPUT]
    Name    random
    Tag     test
    Samples 10

[FILTER]
    Name  Lua
    Match *
    call  append_tag
    code  function append_tag(tag, timestamp, record) new_record = record new_record["tag"] = tag return 1, timestamp, new_record end

[OUTPUT]
    Name  stdout
    Match *
```
```yaml
service:
  flush: 1
  daemon: off
  log_level: info
pipeline:
  inputs:
    - name: random
      tag: test
      samples: 10
  filters:
    - name: lua
      match: "*"
      call: append_tag
      code: |
        function append_tag(tag, timestamp, record)
          new_record = record
          new_record["tag"] = tag
          return 1, timestamp, new_record
        end
  outputs:
    - name: stdout
      match: "*"
```
Environment variable processing
The following example combines Lua processing with the Kubernetes filter, and demonstrates using environment variables with Lua regular expressions and substitutions.
Kubernetes pods generally have various environment variables set by the infrastructure automatically which may contain useful information.
In this example, we want to extract part of the Kubernetes cluster API name.
The environment variable is set like so: KUBERNETES_SERVICE_HOST: api.sandboxbsh-a.project.domain.com
We want to extract the sandboxbsh name and add it to our record as a special key.
```text
[FILTER]
    Name   lua
    Alias  filter-iots-lua
    Match  iots_thread.*
    Script filters.lua
    Call   set_landscape_deployment
```
```lua
-- Use a Lua function to create some additional entries based
-- on substrings from the kubernetes properties.
function set_landscape_deployment(tag, timestamp, record)
  local landscape = os.getenv("KUBERNETES_SERVICE_HOST")
  if landscape then
    -- Strip the landscape name from this field, KUBERNETES_SERVICE_HOST
    -- Should be of this format
    -- api.sandboxbsh-a.project.domain.com
    -- Take off the leading "api."
    -- sandboxbsh-a.project.domain.com
    --print("landscape1:" .. landscape)
    landscape = landscape:gsub("^[^.]+.", "")
    --print("landscape2:" .. landscape)
    -- Take off everything including and after the - in the cluster name
    -- sandboxbsh
    landscape = landscape:gsub("-.*$", "")
    -- print("landscape3:" .. landscape)
    record["iot_landscape"] = landscape
  end
  -- 2 - replace existing record with this update
  return 2, timestamp, record
end
```
Number Type
Lua treats numbers as double. This means an integer field (for example, IDs or log levels) will be converted to a double. To avoid this type conversion, use the type_int_key property.
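For example, a filter entry could declare which keys must stay integers (the key names id and level here are hypothetical, chosen only for illustration; multiple keys are delimited by spaces):

```text
[FILTER]
    Name         lua
    Match        *
    script       test.lua
    call         cb_print
    type_int_key id level
```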
The Lua callback function can return an array of tables (that is, an array of records) in its third return value. With this feature, the Lua filter can split one input record into multiple records according to custom logic.
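A minimal sketch of this splitting behavior, assuming a hypothetical msg key that holds a whitespace-separated message; one output record is emitted per token:

```lua
-- Sketch: split one incoming record into multiple records.
-- The "msg" key is an assumption for this example, not part of the
-- Fluent Bit API.
function cb_split(tag, timestamp, record)
  local message = record["msg"]
  if message then
    local records = {}
    -- Build one output record per whitespace-separated token.
    for word in string.gmatch(message, "%S+") do
      table.insert(records, { msg = word })
    end
    -- Returning an array of tables with code 1 emits each table as
    -- its own record in the pipeline.
    return 1, timestamp, records
  end
  -- Return code 0 keeps the original record untouched.
  return 0, timestamp, record
end
```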
In this example, we want to filter Istio logs to exclude lines with response codes between 1 and 399. Istio is configured to write logs in JSON format.

The following configuration gets Istio logs and applies the response code filter to them:
```text
[INPUT]
    Name             tail
    Path             /var/log/containers/*_istio-proxy-*.log
    multiline.parser docker, cri
    Tag              istio.*
    Mem_Buf_Limit    64MB
    Skip_Long_Lines  Off

[FILTER]
    Name   lua
    Match  istio.*
    Script response_code_filter.lua
    call   cb_response_code_filter

[OUTPUT]
    Name  stdout
    Match *
```
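The referenced response_code_filter.lua script is not shown above; a minimal sketch could look like the following, assuming the Istio access log record carries the status in a response_code field:

```lua
-- response_code_filter.lua (sketch): drop records whose response code
-- is between 1 and 399; keep everything else unmodified.
function cb_response_code_filter(tag, timestamp, record)
  local code = record["response_code"]
  if code and code >= 1 and code <= 399 then
    -- Return code -1 tells Fluent Bit to drop this record.
    return -1, 0, 0
  end
  -- Return code 0 keeps the original record untouched.
  return 0, timestamp, record
end
```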
Use this configuration to obtain a JSON key with a datetime, and then convert it to another format.
```text
[INPUT]
    Name  dummy
    Dummy {"event": "Restock", "pub_date": "Tue, 30 Jul 2024 18:01:06 +0000"}
    Tag   event_category_a

[INPUT]
    Name  dummy
    Dummy {"event": "Soldout", "pub_date": "Mon, 29 Jul 2024 10:15:00 +0600"}
    Tag   event_category_b

[FILTER]
    Name   lua
    Match  *
    Script custom_datetime_format.lua
    call   convert_to_utc

[OUTPUT]
    Name  stdout
    Match *
```
```yaml
pipeline:
  inputs:
    - name: dummy
      dummy: '{"event": "Restock", "pub_date": "Tue, 30 Jul 2024 18:01:06 +0000"}'
      tag: event_category_a
    - name: dummy
      dummy: '{"event": "Soldout", "pub_date": "Mon, 29 Jul 2024 10:15:00 +0600"}'
      tag: event_category_b
  filters:
    - name: lua
      match: '*'
      code: |
        function convert_to_utc(tag, timestamp, record)
          local date_time = record["pub_date"]
          local new_record = record
          if date_time then
            if string.find(date_time, ",") then
              local pattern = "(%a+, %d+ %a+ %d+ %d+:%d+:%d+) ([+-]%d%d%d%d)"
              local date_part, zone_part = date_time:match(pattern)
              if date_part and zone_part then
                local command = string.format("date -u -d '%s %s' +%%Y-%%m-%%dT%%H:%%M:%%SZ", date_part, zone_part)
                local handle = io.popen(command)
                local result = handle:read("*a")
                handle:close()
                new_record["pub_date"] = result:match("%S+")
              end
            end
          end
          return 1, timestamp, new_record
        end
      call: convert_to_utc
  outputs:
    - name: stdout
      match: '*'
```