Logstash
Logstash is a pipeline for ingesting, filtering, and transforming data. It is written in JRuby and Java, so it is quite a headache to navigate between the Ruby code and the Java code. In Logstash, there are three main concepts: input, filter, and output. All of them are pluggable and configurable in the pipeline configuration file.
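For example, a minimal pipeline configuration (purely illustrative) wires one plugin of each type together:

```
input {
  stdin { }                                # read events from standard input
}

filter {
  mutate {
    add_field => { "source" => "stdin" }   # add a field to every event
  }
}

output {
  stdout { codec => rubydebug }            # print events in a readable form
}
```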
Pipeline
How does it work? Let’s dive into the code. Here we use v8.5.1.
- The main function is here. It calls the run method, and that is where the interesting things happen. It obtains a Ruby script, InputStream script = config.getScriptSource(), and then runs it. Where is this script? A few lines later, you see that the Ruby config comes from environment.rb. environment.rb simply calls LogStash::Runner.run, and runner.rb starts an agent.
- This agent acts like a controller. It mainly does two things. First, it starts a web server that provides the stats endpoints for Logstash. Second, it creates pipelines: it first figures out the pipeline creation actions from the configuration file and then executes these actions, i.e., the “converge” process. Let’s see how it constructs the pipeline actions. This part is delegated to state_resolver.rb. Following the call chain, you see it creates JavaPipelines. This is where Java and Ruby start to interleave. The base class of JavaPipeline, i.e., JavaBasePipeline, is defined in a Java file. The annotation @JRubyClass(name = "JavaBasePipeline") creates the mapping between the Java world and the Ruby world: a Ruby class subclasses a Java class! There are a lot of details that follow from this point. Read the subsections below, and see the annotation sketch right after this list.
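To make that Java/Ruby bridge concrete, here is a minimal, hypothetical sketch of how a Java class can be exposed to JRuby with these annotations. The class QueueHolder and its method body are invented for illustration, and registration of the class with the Ruby runtime is omitted; only the annotation mechanism mirrors what Logstash does for JavaBasePipeline.

```java
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;

// Hypothetical class exposed to the Ruby side under the name "QueueHolder".
// (Registering it with the Ruby runtime and an allocator is left out here.)
@JRubyClass(name = "QueueHolder")
public class QueueHolder extends RubyObject {

    public QueueHolder(final Ruby runtime, final RubyClass metaClass) {
        super(runtime, metaClass);
    }

    // Callable from Ruby as `open_queue`, like the method discussed below.
    @JRubyMethod(name = "open_queue")
    public IRubyObject openQueue(final ThreadContext context) {
        // ... create the queue and its read/write clients here ...
        return context.nil;
    }
}
```

On the Ruby side, a class mapped this way behaves like any other Ruby class, which is why a Ruby pipeline class can simply subclass the Java-defined base class.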
Pipeline queue
In the initialization function of JavaPipeline, it calls a function open_queue, which is a method in the base class. Notice how @JRubyMethod(name = "open_queue") connects the Java and Ruby worlds. This function basically creates a queue, plus an input client (write client) and a filter client (read client) for this queue. OK, at this point we have a taste of how Logstash works internally.
Let’s see what queue it creates and the capacity of this queue. The logic is here. Aha! It supports two modes: file mode and memory mode. File mode is marketed as persisted mode, so the memory mode has the potential of losing messages during an abrupt shutdown. Let’s focus on the memory mode, as it is the default. It creates a BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue; with capacity pipeline.batch.size * pipeline.workers (for example, with the default batch size of 125 and 8 workers, that is 1000 events). Therefore, the write client or the read client can get blocked if the other side is slow. This data can be found in the Logstash stats API. I want to stop here; there are already a lot of details, and you can finish the rest or read this article. A small, self-contained sketch of this blocking behavior follows.
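The sketch below is not Logstash code; it just uses a plain ArrayBlockingQueue and invented numbers to show why a bounded queue couples the input side and the filter/output side:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        int batchSize = 125;   // stand-in for pipeline.batch.size
        int workers = 8;       // stand-in for pipeline.workers

        // Same capacity rule as described above: batch size times workers.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(batchSize * workers);

        // Input side (write client): put() blocks once the queue is full,
        // which is exactly the backpressure described above.
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    queue.put("event-" + i);
                }
            } catch (InterruptedException ignored) { }
        });

        // Filter/output side (read client): a deliberately slow consumer.
        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                    Thread.sleep(10);   // simulate slow filtering/output
                }
            } catch (InterruptedException ignored) { }
        });

        writer.start();
        reader.start();
        Thread.sleep(2000);
        // The queue stays near capacity while the reader lags behind.
        System.out.println("queue size after 2s: " + queue.size());
        writer.interrupt();
        reader.interrupt();
    }
}
```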
Pipeline workers
Inside a JavaPipeline, you can have multiple workers that fetch messages from the BlockingQueue. The main logic is here. It first sets up the output plugins, i.e., maybe_setup_out_plugins, and then gets the number of workers from the config: pipeline_workers = safe_pipeline_worker_count. Then it creates that many worker threads.
worker_loops = pipeline_workers.times
  .map { Thread.new { init_worker_loop } }
  .map(&:value)
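Conceptually, each worker loop drains a batch of events from the shared queue, runs the filters on the batch, and hands the result to the outputs. The following is a simplified, hypothetical sketch of that loop (invented interfaces, not Logstash’s actual worker implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of what one pipeline worker does conceptually.
public class WorkerLoopSketch implements Runnable {
    private final BlockingQueue<String> queue;  // stand-in for the event queue
    private final int batchSize;

    public WorkerLoopSketch(BlockingQueue<String> queue, int batchSize) {
        this.queue = queue;
        this.batchSize = batchSize;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                List<String> batch = new ArrayList<>(batchSize);
                // Block for the first event, then drain up to a full batch.
                batch.add(queue.take());
                queue.drainTo(batch, batchSize - 1);

                filter(batch);   // apply filter plugins to the batch
                output(batch);   // hand the batch to the output plugins
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // shut down cleanly
        }
    }

    private void filter(List<String> batch) { /* ... */ }
    private void output(List<String> batch) { /* ... */ }
}
```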
Plugins
You can use bin/logstash-plugin list to list the currently installed plugins. Logstash plugins are traditionally written in Ruby.
Logstash 6.6.0 started to support native Java plugins. Unfortunately, the Elasticsearch output plugin is still written in Ruby.
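A few handy invocations of the plugin tool (flag names as I recall them for recent versions, so double-check against your release):

```
# show installed plugins together with their versions
bin/logstash-plugin list --verbose

# narrow the listing to plugins whose name matches a fragment
bin/logstash-plugin list elasticsearch

# install an additional plugin published on RubyGems
bin/logstash-plugin install logstash-filter-translate
```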
Ruby filter plugin
The Ruby filter plugin is my favorite plugin. It allows you to write inline Ruby code in the Logstash pipeline configuration file. The core part is this line, which calls your Ruby script for each event. Also, see the API of event here.
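A small illustrative example of the inline form, using the event get/set API mentioned above (the field names are made up):

```
filter {
  ruby {
    # runs for every event; event.get / event.set come from the Event API
    code => "
      secs = event.get('duration_in_seconds')
      event.set('duration_in_millis', secs.to_f * 1000) unless secs.nil?
    "
  }
}
```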
Metrics
You can configure Metricbeat to collect metrics. You can also query the stats endpoints directly.
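For example, assuming the default API binding on localhost:9600, the node stats endpoints expose the per-pipeline event and queue statistics mentioned earlier:

```
# per-pipeline stats (events in/filtered/out, queue info, plugin timings)
curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'

# JVM stats (heap usage, GC, threads)
curl -s 'http://localhost:9600/_node/stats/jvm?pretty'
```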