Fluent Bit + Wasm = Control
Fluent Bit is a widely-used open source project that enables you to collect logs, metrics, and traces from various sources, filter and transform them, and then forward them to multiple destinations. With over ten billion Docker pulls, Fluent Bit has established itself as a preferred choice for log processing, collecting, and shipping.
Fluent Bit’s pluggable architecture allows users to tailor Fluent Bit according to their specific logging requirements, whether it’s collecting logs from different sources, transforming log data, or shipping them to diverse destinations.
One of Fluent Bit’s newer additions is the Wasm plugin, which allows users to create their own custom filter using WebAssembly (Wasm) to process logs before routing them to their desired location. Wasm is a low-level, binary instruction format designed to be a portable target for the compilation of high-level languages like C, C++, Golang, and others, enabling deployment on the web and other environments. Its portability, language flexibility, and near-native execution speed make it a popular choice for developers.
In this post we’ll cover how Wasm can be used with Fluent Bit to extend Fluent Bit’s capabilities, enabling users to implement custom logic and functionalities. With the integration of Wasm, Fluent Bit can address a range of unique and sophisticated use cases.
We’ll also walk through a sample use case: dynamically routing logs based on Kubernetes labels.
Prerequisites
- Kubernetes Cluster: We will deploy Fluent Bit in a Kubernetes cluster and ship logs of application containers inside Kubernetes. We will be using an EKS cluster, but any cluster will suffice.
- Kubectl and Helm CLI: Installed on your local machine.
- Golang (1.17 / 1.18): Wasm plugins will be written using Golang.
- Tinygo (v0.24.0 or later): For building Wasm programs.
- Familiarity with Fluent Bit concepts: If you’re unfamiliar with concepts such as inputs, outputs, parsers, and filters, please refer to the official documentation.
Understanding the use case
In enterprise environments, you often need to route application logs to multiple destinations based on some criteria. Fluent Bit routes logs based upon configuring tag
and match
fields for each plugin.
In order to route logs to multiple destinations, we need to configure the following:
- an input plugin to read logs from all application containers with a generic source
tag
. - multiple output plugins (sending logs to different destinations) each with a specific
match
value. - a filtering or transformation mechanism that examines the log record and modifies the source tag with an appropriate destination tag.
- a filter to add Kubernetes metadata to each log record. This metadata will be used to evaluate the destination tag.
In Fluent Bit, the filtering mechanism can be performed using the Rewrite Tag
filter, which evaluates a log record against a regular expression. If the expression passes, then the source tag is modified.
But this filter operates only with regular expressions (regex). This restricts us from defining very intricate conditions. To illustrate, consider a Kubernetes Pod having the labels below:
apiVersion: v1
kind: Pod
metadata:
name: my-sample-pod
labels:
region: "ap-south-1"
tenant: "private-a1"
has-sensitive-data: "true"
...
We may have a requirement that if an application has label tenant
equal to private-a1
and region
equal to ap-south-1
and has-sensitive-data
equal to true
then we need to route these logs to S3 in ap-south-1 region.
Evaluating these conditions using regex would be extremely difficult.
However, with the Fluent Bit Wasm plugin, we can create a filter that facilitates retagging under multiple conditions. These conditions can incorporate logical and arithmetic operations, spanning several expressions — essentially, the capabilities we harness in conventional programming.
With the use case explained, let’s start by writing the Wasm program.
Writing the Wasm program
Fluent Bit has no additional requirements for executing Wasm plugins. We just need to write a program in a language that can compile to Wasm. We’ll be using Golang, but it could just as easily be done in any other language that Wasm supports.
Here is our Wasm filter written in Golang that supports our use case.
package main
import (
"fmt"
"unsafe"
"github.com/valyala/fastjson"
)
//export go_filter
func go_filter(tag *uint8, tag_len uint, time_sec uint, time_nsec uint, record *uint8, record_len uint) *uint8 {
// btag := unsafe.Slice(tag, tag_len) // Note, requires Go 1.17 (tinygo 0.20)
// now := time.Unix(int64(time_sec), int64(time_nsec))
brecord := unsafe.Slice(record, record_len)
br := string(brecord)
var p fastjson.Parser
value, err := p.Parse(br)
if err != nil {
fmt.Println(err)
return nil
}
obj, err := value.Object()
if err != nil {
fmt.Println(err)
return nil
}
kubernetesObj := obj.Get("kubernetes")
if kubernetesObj == nil {
s := obj.String()
s += string(rune(0)) // Note: explicit null terminator.
rv := []byte(s)
return &rv[0]
}
labelsObj := kubernetesObj.GetObject("labels")
if labelsObj == nil {
s := obj.String()
s += string(rune(0)) // Note: explicit null terminator.
rv := []byte(s)
return &rv[0]
}
var ar fastjson.Arena
var region, tenant, hasSensitiveData, newTag string
if labelsObj.Get("region") != nil {
region = labelsObj.Get("region").String()
}
if labelsObj.Get("tenant") != nil {
tenant = labelsObj.Get("tenant").String()
}
if labelsObj.Get("has-sensitive-data") != nil {
hasSensitiveData = labelsObj.Get("has-sensitive-data").String()
}
if region == "\"ap-south-1\"" && tenant == "\"private-a1\"" {
newTag = "private-a1-elastic-ap-south-1"
if hasSensitiveData == "\"true\"" {
newTag = "private-a1-s3-ap-south-1"
}
} else if region == "\"ap-south-1\"" {
newTag = "general-elastic-ap-south-1"
if hasSensitiveData == "\"true\"" {
newTag = "general-s3-ap-south-1"
}
} else {
newTag = "general-elastic"
if hasSensitiveData == "\"true\"" {
newTag = "general-s3"
}
}
obj.Set("new_tag", ar.NewString(newTag))
s := obj.String()
s += string(rune(0)) // Note: explicit null terminator.
rv := []byte(s)
return &rv[0]
}
func main() {}
Program explanation
The core logic is written in the function go_filter
. This function name will also be used during Wasm plugin configuration.
The Wasm plugin must have the following function signature:
//export go_filter
func go_filter(tag *uint8, tag_len uint, time_sec uint, time_nsec uint, record *uint8, record_len uint) *uint8
Note: The comment //export go_filter
on function is required, and it should be the same as the function name.
Using the function parameters we will have access to the original log record
, tag
, and timestamp
. Here is an example log record:
{
"log": "2023-10-02T06:52:52.843524746Z stdout F 122.30.117.241 - - [02/Oct/2023:06:52:23 +0000] GET /vortals HTTP/1.0 204 12615",
"kubernetes": {
"pod_name": "app-3-5f8ff8f8c6-xv5lw",
"namespace_name": "default",
"pod_id": "afddd062-c1d9-4333-b3d5-36abd61990eb",
"labels": {
"app": "app-3",
"pod-template-hash": "5f8ff8f8c6"
},
"host": "ip-172-16-18-216.ap-south-1.compute.internal",
"container_name": "app-3",
"docker_id": "f8236c0b81a8926343d982cf94422d24fe882a4b19ec3db2b03482b1d30c8055",
"container_hash": "docker.io/mingrammer/flog@sha256:44180f8610fab7d4c29ff233a79e19cf28bd425c1737aa59c72c1f66613fdf41",
"container_image": "docker.io/mingrammer/flog:0.4.3"
}
}
The Wasm plugin only allows for modifying the log record. However, to support our use case we have to modify the source tag based on some criteria. Therefore, instead of modifying the tag via the Wasm plugin, we’ll evaluate the tag value and insert a new_tag
field in the original log. Using this new_tag
, we utilize the Rewrite Tag
filter for retagging based on simple equality checks.
Processing the record:
The function parameter record
is of type byte slice — which presumably contains a JSON string — is converted to a Go string.
This string is then parsed using the fastjson
package.
Kubernetes and labels:
The function checks if the parsed JSON has a kubernetes
key. If not, the original JSON is returned.
If the kubernetes
key exists, then it checks for a nested labels
key.
Check and assign labels:
It looks for specific labels like region
, tenant
, and has-sensitive-data
within the labels
object.
Depending on the values of these labels, it determines a newTag
.
Determine newTag
:
If the region
is ap-south-1
and the tenant
is private-a1
, it sets newTag
to private-a1-elastic-ap-south-1
. However, if the has-sensitive-data
label is set to true
, newTag
will instead be private-a1-s3-ap-south-1
.
If only the region
is ap-south-1
without the specific tenant
value, the logic is similar but uses the “general” prefix.
For any other region, the default is general-elastic
, unless has-sensitive-data
is true
, in which case it becomes general-s3
.
Modify and Return:
The determined newTag
is added to the original JSON. The modified record will look like this:
The function then converts the modified JSON string back to a byte slice and returns a pointer to its first byte.
Note that there’s an explicit null terminator added to the end of the string before converting it back to a byte slice. This is necessary for compatibility with whatever system reads this output, perhaps a C/C++ framework.
The main
function is empty because the primary function here (go_filter
) is meant to be exported and used as a plugin.
For more info on writing Wasm plugins, follow the official documentation.
Instructions for compiling the Wasm program
Initialize a new Golang project using the below command:
mkdir go-filter && go mod init go-filter
Copy the above Golang program in a file called filter.go
.
With our filter program written, let’s compile it using tinygo
:
tinygo build -wasm-abi=generic -target=wasi -o filter.wasm filter.go
Configuring Fluent Bit To use the Wasm plugin
Ultimately, we are building a pipeline that is structured like this:
Here’s the Fluent Bit configuration that enables the log processing pipeline depicted above:
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
Exclude_Path /var/log/containers/*default_fluent-bit*
[FILTER]
Name kubernetes
Match kube.*
[FILTER]
Name wasm
match kube.*
Wasm_Path /fluent-bit/etc/filter.wasm
Function_Name go_filter
[FILTER]
Name rewrite_tag
Match kube.*
Rule $new_tag private-a1-s3-ap-south-1 private-a1-s3-ap-south-1 false
Rule $new_tag private-a1-elastic-ap-south-1 private-a1-elastic-ap-south-1 false
Rule $new_tag general-s3-ap-south-1 general-s3-ap-south-1 false
Rule $new_tag general-elastic-ap-south-1 general-elastic-ap-south-1 false
Rule $new_tag general-s3 general-s3 false
Rule $new_tag general-elastic general-elastic false
[OUTPUT]
Name stdout
Match kube.*
[OUTPUT]
Name stdout
Match private-a1-s3-ap-south-1
[OUTPUT]
Name stdout
Match private-a1-elastic-ap-south-1
[OUTPUT]
Name stdout
Match general-s3-ap-south-1
[OUTPUT]
Name stdout
Match general-elastic-ap-south-1
[OUTPUT]
Name stdout
Match general-s3
[OUTPUT]
Name stdout
Match general-elastic
Breaking down the configuration above, we define one input section:
- Tail: This input section captures all container logs and tags them with
kube.*
.
The filter section applies three filters:
- Kubernetes Filter: This filter appends Kubernetes metadata to all logs aligned with the
kube.*
tag. - Custom Wasm Filter: This section selects all the logs that match the tag
kube.*
and appendsnew_tag
to it, whose value is evaluated as per the criterias we discussed above. - Rewrite Tag Filter: This section selects all the logs that match the tag
kube.*
and applies a processing rule to them. The configuration value ofRule
field is mapped to the format$KEY REGEX NEW_TAG KEEP
- $KEY: The key represents the name of the record key that holds the value that we want to use to match our regular expression. In our case, it is
new_tag
. - Regex: We use simple equality regular expression. As Wasm plugin already evaluated the conditions.
- New Tag: If our regular expression matches the value of the defined key in the rule, we apply a new Tag for that specific record
- Keep: If a rule matches, the filter emits a copy of the record with the newly defined Tag. The
keep
property takes a boolean value to determine whether the original record with the old Tag should be preserved and continue in the pipeline or be discarded. In our case, we will be setting it tofalse
- $KEY: The key represents the name of the record key that holds the value that we want to use to match our regular expression. In our case, it is
For more information about the rewrite_tag
plugin, check the official documentation.
The output section of the configuration identifies multiple destinations.
We have configured seven plugins, each with a different match
value. All of them use the stdout
plugin, which sends data on Fluent Bit’s standard output. This is done for demonstration purposes only—in a practical scenario we would have sent it to S3, Elasticsearch, or some other destination.
Instructions for configuring Fluent Bit
Creating Kubernetes configmap:
For Fluent Bit to access the Wasm plugin, the plugin should be available as a file in the container. We will create a Kubernetes configmap, which contains the required Wasm file, and mount it to the Fluent Bit container as a file. Go to the directory where filter.wasm
exists and execute the below command:
kubectl create configmap wasm-filter --from-file=filter.wasm --namespace=default
Add the Fluent Bit Helm repo:
Use the command below to add the Fluent Bit Helm repository:
helm repo add fluent <https://fluent.github.io/helm-charts>
Override the default configuration:
Create a file called values.yaml
with the following contents:
config:
inputs: |
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
Exclude_Path /var/log/containers/*default_fluent-bit*
filters: |
[FILTER]
Name kubernetes
Match kube.*
[FILTER]
Name wasm
match kube.*
Wasm_Path /fluent-bit/etc/wasm/config/filter.wasm
Function_Name go_filter
[FILTER]
Name rewrite_tag
Match kube.*
Rule $new_tag private-a1-s3-ap-south-1 private-a1-s3-ap-south-1 false
Rule $new_tag private-a1-elastic-ap-south-1 private-a1-elastic-ap-south-1 false
Rule $new_tag general-s3-ap-south-1 general-s3-ap-south-1 false
Rule $new_tag general-elastic-ap-south-1 general-elastic-ap-south-1 false
Rule $new_tag general-s3 general-s3 false
Rule $new_tag general-elastic general-elastic false
outputs: |
[OUTPUT]
Name stdout
Match kube.*
[OUTPUT]
Name stdout
Match private-a1-s3-ap-south-1
[OUTPUT]
Name stdout
Match private-a1-elastic-ap-south-1
[OUTPUT]
Name stdout
Match general-s3-ap-south-1
[OUTPUT]
Name stdout
Match general-elastic-ap-south-1
[OUTPUT]
Name stdout
Match general-s3
[OUTPUT]
Name stdout
Match general-elastic
extraVolumes:
- name: wasm-filter
configMap:
name: wasm-filter
extraVolumeMounts:
- name: wasm-filter
mountPath: /fluent-bit/etc/wasm/config
Deploy Fluent Bit
Use the command below:
helm upgrade -i fluent-bit fluent/fluent-bit --values values.yaml
Wait for the Fluent Bit pods to run
Ensure that the Fluent Bit pods reach the Running
state.
kubectl get pods
Verify Fluent Bit logs
Use the command below to check Fluent Bit logs:
kubectl logs -f
You should be able to view the modified tags as shown in the above below.
Conclusion & learning more
In this post, we examined how to use Fluent Bit and Wasm to dynamically route logs using Kubernetes labels.
The Wasm plugin is just one option for processing data with Fluent Bit. If you are interested in exploring more about Fluent Bit’s ability to process and transform streaming data we recommend the following:
- “Fluent Bit: Advanced Processing” — This on-demand webinar provides an introduction to processing with Fluent Bit and demonstrates best practices and real-world examples for redaction, reduction, enrichment, and tagging of log data.
- “Creating custom processing rules for Fluent Bit with Lua” — In addition to support for Wasm, Fluent Bit also supports custom scripts written in Lua. This step-by-step tutorial walks you through several examples.
To learn even more about Fluent Bit, check out Fluent Bit Academy, your destination for best practices and how-tos on advanced processing, routing, and all things Fluent Bit.
About Fluent Bit and Chronosphere
With Chronosphere’s acquisition of Calyptia in 2024, Chronosphere became the primary corporate sponsor of Fluent Bit. Eduardo Silva — the original creator of Fluent Bit and co-founder of Calyptia — leads a team of Chronosphere engineers dedicated full-time to the project, ensuring its continuous development and improvement.
Fluent Bit is a graduated project of the Cloud Native Computing Foundation (CNCF) under the umbrella of Fluentd, alongside other foundational technologies such as Kubernetes and Prometheus. Chronosphere is also a silver-level sponsor of the CNCF.