If you own an app, you generate logs. And if you have multiple apps, the logs pile up. Their volume and complexity can grow to the point where even the standard features of popular logging pipelines like Fluentd or Vector fall short. That's why you might decide to write your own tool, a binary placed in the middle of your processing chain. On the surface, this might seem like a questionable choice, as it's generally best to stick to the pipeline's built-in domain-specific languages (DSLs) whenever possible. But by writing your own tool, you gain customization options and flexibility that the built-in features don't offer.
At one of my previous workplaces, we had a Fluentd pipeline with an executable as part of the standard rules. The pipeline typically starts by grabbing logs from Filebeat and parsing them as JSON.
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<source>
@type tail
path /app/filebeat/logs/json
pos_file /app/filebeat/logs/json.pos
read_lines_limit 100
tag filebeat.json
<parse>
@type json
</parse>
</source>
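Okay, so here's the deal. We're adding our logprocessor binary to process each line of input whose tag ends with .log (here, logs coming from files with the .log extension). Fluentd writes every matching event to the binary's stdin as a JSON line, and the match directive parses whatever the binary prints to stdout, so the result can be passed on to subsequent stages in the log pipeline.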
<match **.log>
@type exec_filter
child_respawn -1
command sh -c "logprocessor --json"
<format>
@type json
</format>
<parse>
@type json
</parse>
</match>
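The exec_filter block above implies a simple contract for the child process: one JSON object per line on stdin, one JSON object per line on stdout. Here is a minimal sketch of that contract in Python. It is not the original logprocessor; the names process_stream and transform, as well as the processed and parse_error fields, are hypothetical and exist only for illustration.

```python
import json
from typing import TextIO


def transform(record: dict) -> dict:
    """Hypothetical per-record step: mark the record so the effect is visible."""
    record["processed"] = True
    return record


def process_stream(inp: TextIO, out: TextIO) -> None:
    """Read one JSON object per line, transform it, write one JSON object per line."""
    for line in inp:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            # Keep unparseable input instead of dropping it silently.
            record = {"message": line, "parse_error": True}
        if not isinstance(record, dict):
            # A bare string or number is still valid JSON; wrap it.
            record = {"message": record}
        out.write(json.dumps(transform(record)) + "\n")
        out.flush()  # the parent reads line by line, so do not sit on a buffer
```

A real filter would call process_stream(sys.stdin, sys.stdout) as its entry point. Flushing after every record matters here: with a long-running child process, output stuck in a buffer never reaches the pipeline.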
Let's take a break for a moment. Why might we need a custom log processor? Well, for instance, this tool can read plain-text logs using regular expressions and combine non-obvious multiline messages. It can also verify the date/time format and log levels of logs generated by various applications written in different languages with their diverse standard library loggers, ultimately outputting the logs in a unified format.
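To make this more concrete, here is a small sketch of two of those tasks: gluing multiline messages together and unifying log levels. It assumes a plain-text line shape of "date time LEVEL message"; the regular expression, the LEVELS alias table, and the merge_multiline name are all illustrative assumptions, not the actual tool.

```python
import re
from typing import Iterable, Iterator

# Assumed header shape: "2024-02-04 00:36:12 WARN message text"
LINE_START = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>[A-Za-z]+)\s+(?P<msg>.*)$"
)

# Hypothetical alias table mapping per-language spellings to one vocabulary.
LEVELS = {
    "debug": "DEBUG",
    "info": "INFO",
    "warn": "WARNING",
    "warning": "WARNING",
    "err": "ERROR",
    "error": "ERROR",
    "fatal": "CRITICAL",
    "critical": "CRITICAL",
}


def merge_multiline(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one record per header line; other lines extend the previous record."""
    current = None
    for raw in lines:
        line = raw.rstrip("\n")
        match = LINE_START.match(line)
        if match:
            if current is not None:
                yield current
            current = {
                "timestamp": match.group("ts").replace(" ", "T"),
                "level": LEVELS.get(match.group("level").lower(), "UNKNOWN"),
                "message": match.group("msg"),
            }
        elif current is not None:
            # Continuation line, for example a stack trace frame.
            current["message"] += "\n" + line
        else:
            # Text before the first header: keep it rather than lose it.
            current = {"timestamp": None, "level": "UNKNOWN", "message": line}
    if current is not None:
        yield current
```

In this sketch, a stack trace that follows an error line ends up inside that error's message field instead of becoming several unrelated events.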
It's time to join the trend and head where all the cool kids are. Embrace the Vector engine (written in Rust) to up your performance and be a true hipster. You might think that all you need to do is replace the XML-like Fluentd config with TOML, but it's not that simple. The issue is that Vector has no command option to direct logs into the stdin of your executable. It only has exec sources, which work slightly differently.
Let's look at an example. You've defined a source that collects logs from Docker:
[sources.docker_json_logs]
type = "docker_logs"
docker_host = "unix:///var/run/docker.sock"
include_labels = ["format=json"]
And you have an exec source:
[sources.format_json_logs]
type = "exec"
command = ["sh", "-c", "logprocessor --json"]
Did you notice something odd? Both snippets define a source, and nothing connects them, so this example definitely won't work. We need to somehow pass the logs into the stdin of our executable.
A workable solution is to write the output of the first source to a file through a sink and then tail that file in the exec source:
[sinks.write_json_logs]
type = "file"
inputs = ["docker_json_logs"]
path = "/app/file"
encoding.codec = "ndjson"
[sources.format_json_logs]
type = "exec"
command = ["sh", "-c", "tail -f /app/file | logprocessor --json"]
mode = "streaming"
working_directory = "/logs/json"
streaming.respawn_on_exit = true
We're leveraging the exec source in streaming mode to continuously read the logs and redirect them to the desired location, the logprocessor's stdin. Whatever the binary prints to stdout becomes the events of the format_json_logs source, which then travel further down our pipeline.
The problem is that we now have a regular file, which means we have to rotate it and otherwise take care of it, which is far from ideal. But a timeless classic comes to our rescue: the special file known as a named pipe (FIFO), created with mkfifo.
Let's read the exact description from the Linux man pages:
Once you have created a FIFO special file in this way, any process can open it for reading or writing, in the same way as an ordinary file. However, it has to be open at both ends simultaneously before you can proceed to do any input or output operations on it. Opening a FIFO for reading normally blocks until some other process opens the same FIFO for writing, and vice versa.
In simpler terms, you can write to it and read from it like a file without using any disk space. It functions like a regular pipe, including the limited in-kernel buffer (64 KiB by default on Linux, which can be raised up to 1 MiB with the default system settings). However, with tail constantly reading from the pipe, you will likely never hit that limit.
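The blocking behavior from the man page is easy to see in a few lines of Python. This is only a sketch for Linux or macOS: the fifo_roundtrip name and the temporary directory are assumptions made for the demo, and os.mkfifo stands in for the mkfifo command.

```python
import os
import stat
import tempfile
import threading


def fifo_roundtrip(lines):
    """Send lines through a named pipe and return what the reader received."""
    workdir = tempfile.mkdtemp()
    path = os.path.join(workdir, "pipe")  # hypothetical location for the demo
    os.mkfifo(path)  # same effect as running `mkfifo pipe` in a shell

    # The file type is what `ls -l` shows as the leading "p".
    is_fifo = stat.S_ISFIFO(os.stat(path).st_mode)

    def writer():
        # open() blocks here until the reading end is opened as well.
        with open(path, "w") as w:
            for line in lines:
                w.write(line + "\n")

    thread = threading.Thread(target=writer)
    thread.start()
    with open(path) as r:  # blocks until the writer opens its end
        # Iteration stops at EOF, which arrives when the writer closes.
        received = [line.rstrip("\n") for line in r]
    thread.join()

    os.remove(path)
    os.rmdir(workdir)
    return is_fifo, received
```

The writer runs in a separate thread because opening only one end would block forever. In the pipeline, Vector's file sink plays the writer role and tail plays the reader.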
Keep an eye out for the letter p at the start of the ls -l output:
mkfifo pipe
ls -l pipe
prw-r--r-- 0 hackernooner hackernoon 4 Feb 00:36 -- pipe
Let's be a bit smarter and make a bash wrapper that redirects stderr to a file in case something goes wrong with the binary (we can also feed the logprocessor_error.log file into the log pipeline):
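#!/usr/bin/env bash
# Stream the FIFO from its first line into logprocessor; keep the binary's stderr in a separate file.
tail -f -n +1 pipe | RUST_LOG=debug logprocessor "$@" 2>logprocessor_error.log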
And add it to the Vector configuration section:
[sinks.write_json_logs]
type = "file"
inputs = ["docker_json_logs"]
path = "/app/pipe"
encoding.codec = "ndjson"
[sources.format_json_logs]
type = "exec"
command = ["sh", "-c", "logprocessor.sh --json"]
mode = "streaming"
working_directory = "/app"
streaming.respawn_on_exit = true
And there you have it! With this knowledge, you're now equipped to process each incoming log line through a custom executable log processor for both Fluentd and Vector pipeline tools. The power is in your hands!