Explaining the Fluent Bit Processor


Fluent Bit recently introduced a new feature called Processors that allows users to apply data transformations and filtering to incoming data records before they are processed further in the pipeline. Here’s what you need to know to use them.

Phil Wilkins | Guest author

Phil Wilkins is a Cloud Evangelist with Oracle and has over three decades of experience in the software industry. He is the author of three previous books, including Logging in Action: With Fluentd, Kubernetes and more.


This post is an excerpt from the forthcoming book Fluent Bit with Kubernetes by Phil Wilkins and published by Manning Publications. It appears here by agreement with the publisher. You can also download an advance copy of the book for free.

What is the Fluent Bit Processor?

Fluent Bit 2.1.2 introduced additional customization logic for input and output plugins called Processors (not to be confused with the Stream Processor). Using a processor feels similar to using Lua as a custom filter in many respects, but there are some notable differences:

  • The processor can be viewed as being an ‘inline’ piece of logic that runs as part of the input or output plugin. As a result, the logic implemented can have a direct, material impact on the plugin’s behavior and performance, such as its ability to ingest data, so it should be treated with care.
  • The processor can be configured to interact with the different telemetry types Fluent Bit can handle since version 2.0 (logs, metrics, and traces).
  • The data being manipulated will not yet have been added to the buffer, and the actions of this logic could potentially prevent that from occurring.
  • We can only configure the processor feature using YAML.

As a result, this isn’t a fully-fledged custom plugin capability, but at the same time, it can have more potential impact.
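
The point about preventing data from reaching the buffer can be made concrete with a small Lua callback. This is a minimal sketch, assuming the Lua filter’s documented return-code convention (-1 drops the record, 0 keeps it unchanged, 1 means the timestamp and record were modified, 2 means only the record was modified) also applies when the filter runs as a processor. The function name drop_debug and the level and checked attributes are hypothetical and are not part of the example that follows.

```lua
-- Hypothetical callback: drop_debug, "level" and "checked" are
-- illustrative names, not taken from the book's example.
function drop_debug(tag, timestamp, record)
  if record["level"] == "debug" then
    -- Returning -1 asks Fluent Bit to drop the record entirely.
    return -1, timestamp, record
  end
  -- Mark the record so we can see that the callback touched it.
  record["checked"] = true
  -- Returning 2 signals that only the record changed, not the timestamp.
  return 2, timestamp, record
end

-- Exercise the callback outside Fluent Bit with stub records.
local dropped = drop_debug("test-tag", 0, { level = "debug" })
local kept, _, rec = drop_debug("test-tag", 0, { level = "info" })
print(dropped, kept, rec["checked"])
```

Because the callback is an ordinary Lua function, it can be tried with a plain Lua interpreter like this before being inlined into a configuration.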

Let’s look at an example of its use.

Processor example

So that we can easily separate the impact of the processor from the plugin, we’re going to work with two very simple plugins. As a source, we’ll take the `Random` input plugin, whose output we’ll then manipulate in the following ways:

  • The original random value will be copied to a new event attribute.
  • The random value will be divided by 2 (truncating any fractional parts).
  • We’ll capture the response from pinging Google.com.

As the Random plugin generates log events (rather than traces or metrics), we’ll need to tell the processor we want the events to be presented to us as log events. For the output, we’ll use stdout and again manipulate the contents in the following ways:

  • Truncate the response from the ping invocation.
  • Add an attribute with a value to the log event.

To use the processor, we need to introduce a new declaration block inside the plugin’s configuration (called processors), in which we first name the type of event we want to interact with (in our case logs, but we could also interact with metrics and traces). Once we identify the filter using the name attribute, we supply the same parameters as we would when defining a custom filter. For a Lua filter, we need to define the name of the function to invoke in the code (the call attribute). In this example, we’ve used the YAML facility to inline the code into the configuration with the code attribute.

We can see this as follows:

service:
  log_level: info
  http_server: on
  flush_interval: 1

pipeline:
  inputs:
    - name: random
      tag: test-tag
      interval_sec: 5                                                     #A
      processors:                                                         #B
        logs:                                                             #C
          - name: lua                                                     #D
            call: modify                                                  #E
            code: |                                                       #F
              function modify(tag, timestamp, record)
                local new_record = record
                new_record["original_val"] = record["rand_value"]         -- #G
                local num = tonumber(record["rand_value"])
                local newNum = string.format("%d", num/2)                 -- #H
                new_record["rand_value"] = newNum
                new_record["tag"] = tag
                local handler = io.popen("ping -c 1 -i 0.1 google.com")   -- #I
                new_record["ping"] = handler:read("*a")
                handler:close()
                return 1, timestamp, new_record
              end
  #filters:                                                               #J
  #  - name: stdout
  #    match: "*"
  outputs:
    - name: stdout
      match: "*"
      processors:                                                         #K
        logs:
          - name: lua
            call: modify_out
            code: |
              function modify_out(tag, timestamp, record)
                local new_record = record
                local search = record["ping"] or ""
                local start = string.find(search, " ms", 1, true)         -- #L
                if start then
                  new_record["ping"] = string.sub(search, 1, start + 2)
                end
                new_record["output"] = "new data"
                return 1, timestamp, new_record
              end

Code notes:

  • #A In this YAML configuration, we’ve set the random number generator input to run every 5 seconds. This means you’ll have time to evaluate the output generated.
  • #B We start the configuration for the Processor.
  • #C This tells Fluent Bit we’re interested in the log-based inputs (other options being traces and metrics).
  • #D We tell the Processor we want to use the Lua plugin (we could use other plugins here as well).
  • #E As with using Lua as a custom Filter, we need to identify the name of the function to invoke.
  • #F Rather than reference a separate file, we can embed the script into the YAML configuration. But be aware that any errors from LuaJIT will report line numbers relative to the start of the Lua code, not the start of the YAML file.
  • #G Copy the random value to a new attribute (original_val).
  • #H Divide the random number by two and write the result back into the data structure.
  • #I Ping google.com once with a very short delay.
  • #J We have this commented-out filter so that, if necessary, we can enable it to look at the final result of the input.
  • #K Inside the output plugin configuration, we declare another processor.
  • #L The output from pinging Google.com is lengthy, so we truncate the response string and update the attribute.
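
The truncation step in #L can be tried on its own, outside Fluent Bit. The following is a minimal sketch: truncate_ping is a hypothetical helper name, and the sample text is an assumed shape for ping output, modelled on the console output in this article rather than captured from a real run. It keeps everything up to and including the first " ms" and returns the text unchanged when no round-trip time is found (for example, when the ping fails).

```lua
-- Hypothetical helper: keep the ping response up to the first " ms".
function truncate_ping(text)
  -- Plain-text search (4th argument true), starting at position 1.
  local start = string.find(text, " ms", 1, true)
  if not start then
    -- No round-trip time found; leave the text as it is.
    return text
  end
  -- start points at the space, so start + 2 is the final "s" of " ms".
  return string.sub(text, 1, start + 2)
end

-- Assumed sample, modelled on the output shown in this article.
local sample = "PING google.com (142.250.178.14) 56(84) bytes of data.\n"
  .. "64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): "
  .. "icmp_seq=1 ttl=116 time=2.82 ms\n\n"
  .. "--- google.com ping statistics ---\n"
  .. "1 packets transmitted, 1 received, 0% packet loss\n"

print(truncate_ping(sample))
print(truncate_ping("ping: unknown host"))
```

Guarding against a missing match matters because string.find returns nil in that case, and adding 2 to nil would raise an error inside the processor.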

We can run this scenario very easily with the command:

fluent-bit -c processor-demo.yaml

We’ll see Fluent Bit generating output like this:

[0] test-tag: [[1707683937.869194269, {}], {"rand_value"=>"3468270981094447104", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data. #A
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=2.82 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>6936541962188894208}] #B
[0] test-tag: [[1707683942.869093656, {}], {"rand_value"=>"718867304183793408", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data.
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=3.41 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>1437734608367586816}]
[0] test-tag: [[1707683947.869064092, {}], {"rand_value"=>"-4611686018427387904", "ping"=>"PING google.com (142.250.178.14) 56(84) bytes of data. #C
64 bytes from lhr48s27-in-f14.1e100.net (142.250.178.14): icmp_seq=1 ttl=116 time=3.78 ms", "output"=>"new data", "tag"=>"test-tag", "original_val"=>-9223372036854775808}]

Output notes:

  • #A As a result of our processor, we can see an additional element containing the result of pinging Google.com.
  • #B The random number from the original plugin has been copied to a new attribute, and the original rand_value attribute has had its value modified.
  • #C The text normally produced by ping has been truncated prior to completing the output operation.

Note from this console output sample that we have two different generated numbers: the random one (original_val) and the result of dividing it by two (rand_value). We can also see the shortened output from the ping command.
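
In the sample output, rand_value is quoted while original_val is not. This appears to be because string.format returns a string, so the halved value is written back as text. The sketch below illustrates the same halve-and-truncate arithmetic in plain Lua. The helper name halve_to_string is hypothetical, and it uses math.floor and math.ceil with "%.0f" rather than the "%d" format used in the configuration, on the assumption that this form also runs on newer Lua versions, where "%d" rejects numbers with a fractional part.

```lua
-- Hypothetical helper: halve a value, discard any fraction, and return
-- the result as a string (mirroring what the processor writes back).
function halve_to_string(value)
  local half = tonumber(value) / 2
  -- Truncate toward zero: floor for positives, ceil for negatives.
  if half >= 0 then
    half = math.floor(half)
  else
    half = math.ceil(half)
  end
  return string.format("%.0f", half)
end

print(halve_to_string(7), type(halve_to_string(7)))
print(halve_to_string(-7))
print(halve_to_string("10"))
```

If a numeric attribute is preferred downstream, returning the truncated number itself instead of a formatted string would be one option.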


Learn more about Fluent Bit

If you enjoyed this article, be sure to download your free copy of Fluent Bit with Kubernetes by Phil Wilkins.

To learn more about Fluent Bit, visit Fluent Bit Academy, your destination for best practices and how-to’s on advanced processing, routing, and all things Fluent Bit.

About Fluent Bit and Chronosphere

With Chronosphere’s acquisition of Calyptia in 2024, Chronosphere became the primary corporate sponsor of Fluent Bit. Eduardo Silva — the original creator of Fluent Bit and co-founder of Calyptia — leads a team of Chronosphere engineers dedicated full-time to the project. Fluent Bit is a graduated project of the Cloud Native Computing Foundation (CNCF) under the umbrella of Fluentd, alongside other foundational technologies such as Kubernetes and Prometheus. Chronosphere is also a silver-level sponsor of the CNCF.
