Logstash

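Logstash is a pipeline for ingesting, filtering, and transforming data. It is written in JRuby and Java, so navigating between the Ruby code and the Java code can be quite a headache. Logstash has three main concepts: input, filter, and output. All of them are pluggable and are configured in the pipeline configuration file. Note that the pipeline configuration uses Logstash's own syntax rather than YAML; the YAML file, logstash.yml, holds settings such as pipeline.workers and pipeline.batch.size.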

Pipeline

How does it work? Let’s dive into the code. Here we use v8.5.1.

  • The main function is here. It calls the run method, which is where the interesting things happen. It obtains a Ruby script as an input stream, InputStream script = config.getScriptSource(), and then runs it. Where does this script come from? A few lines later, you can see that the Ruby config points to environment.rb.
  • environment.rb simply calls LogStash::Runner.run, and runner.rb starts an agent.
  • This agent acts like a controller. It mainly does two things. First, it starts a web server that provides the stats endpoints for Logstash. Second, it creates pipelines. More precisely, it first works out the pipeline creation actions from the configuration file and then executes these actions, i.e., the “converge” process. Let’s see how it constructs the pipeline actions. This part is delegated to the file state_resolver.rb. Following the call chain, you can see that it creates JavaPipelines. This is where Java and Ruby start to interleave. The base class of JavaPipeline, i.e., JavaBasePipeline, is defined in a Java file. The annotation @JRubyClass(name = "JavaBasePipeline") creates the mapping between the Java world and the Ruby world. A Ruby class subclasses a Java class! A lot of details follow from this point. Read the subsections below!
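To make the “converge” idea concrete, here is a minimal sketch of resolving actions by comparing what is running with what the configuration asks for. This is not the code in state_resolver.rb: the Action struct, the resolve_actions method, and the idea of comparing a config hash string per pipeline are simplifications I made up for illustration.

```ruby
# Hypothetical, simplified action record; the real action classes in Logstash
# carry much more state than a type and a pipeline id.
Action = Struct.new(:type, :pipeline_id)

# running: pipeline id => hash of the config that is currently loaded
# desired: pipeline id => hash of the config found in the configuration files
# Returns the actions needed to move from `running` to `desired`.
def resolve_actions(running, desired)
  actions = []
  desired.each do |id, config_hash|
    if !running.key?(id)
      actions << Action.new(:create, id)   # new pipeline in the config
    elsif running[id] != config_hash
      actions << Action.new(:reload, id)   # config changed since it was loaded
    end
  end
  # Pipelines that are running but no longer configured get stopped.
  (running.keys - desired.keys).each { |id| actions << Action.new(:stop, id) }
  actions
end

running = { "main" => "abc", "old" => "111" }
desired = { "main" => "abd", "metrics" => "222" }
resolve_actions(running, desired).each { |a| puts "#{a.type} #{a.pipeline_id}" }
```

With the sample data, "main" is reloaded, "metrics" is created, and "old" is stopped. The agent would then execute each action in turn.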

Pipeline queue

The initializer of JavaPipeline calls open_queue, which is a method defined in the base class. Notice how @JRubyMethod(name = "open_queue") connects the Java and Ruby worlds. This function creates a queue, together with an input client (write client) and a filter client (read client) for that queue. OK. At this point, we have a taste of how Logstash works internally.

Let’s see what kind of queue it creates and what its capacity is. The logic is here. Aha! It supports two modes: file mode and memory mode. File mode is marketed as the persisted mode, so memory mode can lose messages during an abrupt shutdown. Let’s focus on memory mode, as it is the default. It creates a BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue; with capacity pipeline.batch.size * pipeline.workers. Therefore, the write client or the read client can get blocked if the other side is slow. Data on this is available from the Logstash stats API. I want to stop here. That is already a lot of detail; you can work through the rest yourself or read this article.
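The blocking behavior is easy to see with a bounded queue. The sketch below uses Ruby's SizedQueue as a stand-in for the Java BlockingQueue; the values of batch_size and workers and the try_push helper are assumptions for illustration, not Logstash code.

```ruby
# Assumed values standing in for pipeline.batch.size and pipeline.workers.
batch_size = 2
workers = 2
capacity = batch_size * workers

# Hypothetical helper: non-blocking push that reports whether the event fit.
# SizedQueue#push raises ThreadError in non-blocking mode when the queue is full.
def try_push(queue, event)
  queue.push(event, true)
  true
rescue ThreadError
  false
end

queue = SizedQueue.new(capacity)
capacity.times { |i| queue.push("event-#{i}") }   # writer fills the queue

puts try_push(queue, "event-4")   # full: a blocking push would wait here
queue.pop                         # reader takes one event off the queue
puts try_push(queue, "event-5")   # there is room again
```

A real write client would use the blocking form of push, so a slow reader simply stalls the inputs instead of dropping events.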

Pipeline workers

Inside a JavaPipeline, you can have multiple workers that fetch messages from the BlockingQueue. The main logic is here. It first sets up the output plugins, i.e., maybe_setup_out_plugins, and then gets the number of workers from the config: pipeline_workers = safe_pipeline_worker_count. Then it creates that many worker threads.

worker_loops = pipeline_workers.times
  .map { Thread.new { init_worker_loop } }
  .map(&:value)
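The same times / Thread.new / value pattern can be tried outside Logstash. Below is a self-contained sketch; worker_loop, run_workers, and the :shutdown marker are names I invented, and the "processing" is just upcasing a string. The point to notice is that Thread#value joins the thread and returns the value of its block, so the last map waits for every worker to finish.

```ruby
# Hypothetical worker: pops items until it sees the :shutdown marker and
# returns everything it processed.
def worker_loop(queue)
  processed = []
  loop do
    item = queue.pop
    break if item == :shutdown
    processed << item.upcase
  end
  processed
end

# Starts worker_count threads over a shared queue and collects their results.
def run_workers(items, worker_count)
  queue = Queue.new
  items.each { |item| queue << item }
  worker_count.times { queue << :shutdown }   # one marker per worker
  worker_count.times
    .map { Thread.new { worker_loop(queue) } }
    .map(&:value)                             # joins each thread
end

results = run_workers(%w[a b c d e], 3)
puts results.flatten.sort.inspect
```

Which worker ends up with which item depends on thread scheduling, so only the combined result is deterministic.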

Plugins

You can use bin/logstash-plugin list to list the currently installed plugins. Logstash plugins are written in Ruby.

Logstash started to support native Java plugins in 6.6.0. Unfortunately, the Elasticsearch output plugin is still written in Ruby.

Ruby filter plugin

The Ruby filter plugin is my favorite plugin. It allows you to write inline Ruby code in the Logstash pipeline configuration file. The core part is this line, which calls your Ruby script for each event. Also, see the event API here.
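To get a feel for what happens to the inline code, here is a rough sketch that compiles a code string into a callable once and then calls it for each event. StubEvent and compile_filter are hypothetical stand-ins written for this post, not the plugin's implementation; only the get and set method names mirror the real event API.

```ruby
# Hypothetical stand-in for a Logstash event; only get and set are modeled.
class StubEvent
  def initialize(fields)
    @fields = fields
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end
end

# Turns a string of Ruby code into a lambda that receives the event.
# This is an assumption about the general approach, not the plugin's code.
def compile_filter(code)
  eval("lambda { |event| #{code} }")
end

# The kind of code one might write inline in the filter's configuration.
inline_code = 'event.set("message_length", event.get("message").to_s.length)'
handler = compile_filter(inline_code)

events = [StubEvent.new("message" => "hello"), StubEvent.new("message" => "logstash")]
events.each { |event| handler.call(event) }
events.each { |event| puts event.get("message_length") }
```

Compiling once and calling per event keeps the per-event cost down to a method call.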

Metrics

You can configure Metricbeat to collect metrics. You can also query the stats endpoints directly.
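Querying the stats endpoints directly could look like the sketch below. It assumes the API listens on localhost:9600 and that the pipeline stats are served under /_node/stats/pipelines; adjust both to your setup. The fetch_stats and event_counts helpers are my own names, and the sample payload is made up and heavily trimmed.

```ruby
require "json"
require "net/http"
require "uri"

# Assumption: default API host and port; change this for your deployment.
STATS_URI = URI("http://localhost:9600/_node/stats/pipelines")

# Fetches and parses the stats document (needs a running Logstash).
def fetch_stats(uri = STATS_URI)
  JSON.parse(Net::HTTP.get(uri))
end

# Extracts the per-pipeline event counters from a parsed stats document.
def event_counts(stats)
  stats.fetch("pipelines", {}).map do |id, pipeline|
    events = pipeline.fetch("events", {})
    [id, { in: events["in"], filtered: events["filtered"], out: events["out"] }]
  end.to_h
end

# Made-up sample payload so the example runs without a Logstash instance.
sample = JSON.parse('{"pipelines":{"main":{"events":{"in":100,"filtered":98,"out":98}}}}')
puts event_counts(sample).inspect
```

Against a live instance, event_counts(fetch_stats) gives the same shape of result. A growing gap between in and out is a hint that filters or outputs are not keeping up.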

This post is licensed under CC BY 4.0 by the author.