Fluent Bit recently introduced a new feature called Processors that allows users to apply data transformations and filtering to incoming data records before they are processed further in the pipeline. Here’s what you need to know to use them.
On: May 7, 2024
Phil Wilkins is a Cloud Evangelist with Oracle and has over three decades of experience in the software industry. He is the author of three previous books, including Logging in Action: With Fluentd, Kubernetes and more.
This post is an excerpt from the forthcoming book Fluent Bit with Kubernetes by Phil Wilkins, published by Manning Publications. It appears here by agreement with the publisher. You can also download an advance copy of the book for free.
Fluent Bit 2.1.2 introduced additional customization logic for input and output plugins called Processors (not to be confused with the Stream Processor). In many respects, it has a similar feel to using Lua as a custom filter, but there are some notable differences. Most visibly, a processor is declared inside a specific input or output plugin rather than as a separate, tag-matched step in the pipeline.
As a result, this isn’t a fully-fledged custom plugin capability, but at the same time, it can have more potential impact.
Let’s look at an example of its use.
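So that we can easily separate the impact of the processor from the plugin, we’re going to work with two very simple plugins. As a source, we’ll take the `Random` input plugin, whose events we’ll then manipulate in the following ways: copy the generated value into a new `original_val` attribute, replace `rand_value` with half of its value, add the event’s tag as a `tag` attribute, and add a `ping` attribute holding the output of a single ping call.
As the Random plugin generates log events (rather than traces or metrics), we’ll need to tell the processor we want the events to be presented to us as log events. For the output, we’ll use `stdout` and again manipulate the contents in the following ways: trim the `ping` attribute so that it ends at the first round-trip time, and add an `output` attribute with a fixed value.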
To use the processor, we need to introduce a new declaration block (called `processors`), in which we first name the type of event we want to interact with (in our case `logs`, but with the ability to also interact with metrics and traces). Once we identify the filter using the `name` attribute, we follow the same parameters as we would if defining a custom filter. For a Lua filter, we need to define the name of the function to invoke in the code (the `call` attribute). In this example, we’ve used the YAML facility to inline the code into the configuration with the `code` attribute.
We can see this as follows:
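service:
  log_level: info
  http_server: on
  flush_interval: 1
pipeline:
  inputs:
    - name: random
      tag: test-tag
      interval_sec: 5        # generate an event every 5 seconds
      processors:            # processors attached to this input
        logs:                # handle the events as logs
          - name: lua        # use the Lua filter as the processor
            call: modify     # Lua function to invoke
            # Lua source inlined as a YAML block scalar
            code: |
              function modify(tag, timestamp, record)
                local new_record = record
                -- keep the generated value under a new attribute
                new_record["original_val"] = record["rand_value"]
                local num = tonumber(record["rand_value"])
                -- halve the value and store it as a string
                local newNum = string.format("%d", num/2)
                new_record["rand_value"] = newNum
                new_record["tag"] = tag
                -- run a single ping and capture its console output
                local handler = io.popen("ping -c 1 -i 0.1 google.com")
                new_record["ping"] = handler:read("*a")
                handler:close()
                -- 1 = record and timestamp have been modified
                return 1, timestamp, new_record
              end
  # Optional: uncomment to print events at the filter stage
  #filters:
  #  - name: stdout
  #    match: "*"
  outputs:
    - name: stdout
      match: "*"
      processors:            # processors attached to this output
        logs:
          - name: lua
            call: modify_out
            code: |
              function modify_out(tag, timestamp, record)
                local new_record = record
                local search = record["ping"] or ""
                -- plain-text search for the first " ms" (end of the round-trip time)
                local start = string.find(search, " ms", 1, true)
                if start then
                  new_record["ping"] = string.sub(search, 1, start + 2)
                end
                new_record["output"] = "new data"
                return 1, timestamp, new_record
              end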
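Because the processor logic is ordinary Lua, it can be tried outside Fluent Bit before it is wired into the configuration. The following is a minimal sketch under some assumptions: it reuses the shape of the `modify` function but leaves out the ping call so that it needs no network access, the sample record and timestamp are made up for illustration, and `math.floor` is an addition so the `%d` format also works on newer standalone Lua interpreters.

```lua
-- Standalone sketch of the input processor logic (ping call omitted).
function modify(tag, timestamp, record)
  local new_record = record
  -- keep the generated value under a new attribute
  new_record["original_val"] = record["rand_value"]
  local num = tonumber(record["rand_value"])
  -- math.floor is an added safeguard so "%d" always receives a whole number
  new_record["rand_value"] = string.format("%d", math.floor(num / 2))
  new_record["tag"] = tag
  -- 1 = record and timestamp have been modified
  return 1, timestamp, new_record
end

-- Hypothetical sample event; Fluent Bit supplies these arguments itself.
local code, ts, rec = modify("test-tag", 1707683937.869, { rand_value = 84 })
print(code, rec["original_val"], rec["rand_value"], rec["tag"])
```

Running this with a plain `lua` interpreter prints the return code followed by the original value, the halved value (as a string), and the tag, which makes it easy to check the transformation before starting the pipeline.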
We can run this scenario very easily with the command:
fluent-bit -c processor-demo.yaml
We’ll see Fluent Bit generating output like this:
[0] test-tag: [[1707683937.869194269, {}], {"rand_value"=>"3468270981094447104", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data.
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=2.82 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>6936541962188894208}]
[0] test-tag: [[1707683942.869093656, {}], {"rand_value"=>"718867304183793408", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data.
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=3.41 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>1437734608367586816}]
[0] test-tag: [[1707683947.869064092, {}], {"rand_value"=>"-4611686018427387904", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data.
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=3.78 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>-9223372036854775808}]
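Note from this console output sample that each record now carries two numbers: the originally generated random value in `original_val` and the result of dividing it by two in `rand_value`. We can also see the shortened output from the ping command, along with the `tag` and `output` attributes added by the two processors.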
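The shortening of the ping output can be checked in isolation as well. The sketch below applies the same `string.find` and `string.sub` approach to the first ping result from the console sample above. The helper name `trim_ping` is hypothetical, and the statistics lines appended to the sample are an assumption about what a typical ping run prints after the reply line.

```lua
-- Hypothetical helper: cut the text just after the first " ms".
function trim_ping(text)
  -- fourth argument true = plain-text search, no pattern matching
  local start = string.find(text, " ms", 1, true)
  if not start then
    return text -- nothing to trim, e.g. when the ping failed
  end
  -- start points at the space, so start + 2 is the final "s" of "ms"
  return string.sub(text, 1, start + 2)
end

-- Reply lines copied from the console sample; the trailing statistics
-- lines are assumed, not taken from the original output.
local sample = "PING google.com (142.250.178.14) 56(84) bytes of data.\n"
  .. "64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): "
  .. "icmp_seq=1 ttl=116 time=2.82 ms\n\n"
  .. "--- google.com ping statistics ---\n"
  .. "1 packets transmitted, 1 received, 0% packet loss, time 0ms\n"

print(trim_ping(sample))
```

The trimmed text ends at `time=2.82 ms`, matching the `ping` attribute seen in the output. Returning the input unchanged when no match is found avoids a runtime error in the processor if the ping command produces no reply.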
If you enjoyed this article, be sure to download your free copy of Fluent Bit with Kubernetes by Phil Wilkins.
To learn more about Fluent Bit, visit Fluent Bit Academy, your destination for best practices and how-to’s on advanced processing, routing, and all things Fluent Bit.
With Chronosphere’s acquisition of Calyptia in 2024, Chronosphere became the primary corporate sponsor of Fluent Bit. Eduardo Silva — the original creator of Fluent Bit and co-founder of Calyptia — leads a team of Chronosphere engineers dedicated full-time to the project. Fluent Bit is a graduated project of the Cloud Native Computing Foundation (CNCF) under the umbrella of Fluentd, alongside other foundational technologies such as Kubernetes and Prometheus. Chronosphere is also a silver-level sponsor of the CNCF.