Logstash performance and configuration

Command

LS_HEAP_SIZE=2g ./logstash-5.1.1/bin/logstash -f posts.conf \
  --pipeline.batch.size 5000 --pipeline.workers 8
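
Here -f points Logstash at the pipeline definition (posts.conf), and --pipeline.batch.size / --pipeline.workers override the defaults (125 events per batch, one worker per CPU core) from the command line. LS_HEAP_SIZE raises the JVM heap; in 5.x the heap can also be configured in config/jvm.options.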

Internal

Overview

The Logstash event processing pipeline has three stages: inputs -> filters -> outputs

  • Inputs generate events
  • Filters modify events
    • intermediary processing devices
    • can combine filters with conditionals to perform an action on events that meet certain criteria (see the sketch after this list)
    • grok (parse and structure arbitrary text; 120+ patterns built in)
    • mutate (general transformations on event fields: rename, remove, replace, modify fields)
    • geoip (add information about geographical location of IP addresses)
  • Outputs ship events elsewhere
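
A minimal sketch of combining these filters with a conditional; the type value, the nginx-style log format, and the field names are assumptions, not taken from this setup:

filter {
  if [type] == "nginx-access" {
    grok {
      # structure the raw line with a built-in pattern
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
      # add geographical information for the client IP
      source => "clientip"
    }
    mutate {
      # general field transformations: rename and remove
      rename => { "clientip" => "client_ip" }
      remove_field => [ "message" ]
    }
  }
}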

Inputs and outputs support codecs (e.g. json, protobuf, multiline) that encode or decode data as it enters or exits the pipeline, without the need for a separate filter.
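
For instance, a json codec on a tcp input or a multiline codec on a file input decodes the stream at the edge of the pipeline; the port, path and pattern below are illustrative only:

input {
  tcp {
    port => 5000
    # each received line is decoded as one JSON event
    codec => json
  }
  file {
    path => "/var/log/app/app.log"
    # lines starting with whitespace are folded into the previous event
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}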

Redis is often used as a “broker” in a centralized Logstash installation, which queues Logstash events from remote Logstash “shippers”.
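
A sketch of that topology, with an assumed broker address and list key: each shipper pushes events onto a Redis list, and the central indexer pops them off.

# on each remote shipper
output {
  redis {
    host => "10.0.0.1"        # assumed broker address
    data_type => "list"
    key => "logstash"
  }
}

# on the central indexer
input {
  redis {
    host => "10.0.0.1"
    data_type => "list"
    key => "logstash"
  }
}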

Data Resiliency

To guard against data loss and ensure that events flow through the pipeline without interruption, Logstash provides the following data resiliency features.

  • Persistent Queues (protect against data loss by storing events in an internal queue on disk)
    • Absorbs bursts of events without needing an external buffering mechanism like Redis or Apache Kafka
    • Provides an at-least-once delivery guarantee
  • Dead Letter Queues (provide on-disk storage for events that Logstash is unable to process; the dead_letter_queue input plugin can be used to reprocess those events later, as sketched below)
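
A hedged sketch of enabling both features (both arrived in later 5.x releases than the 5.1.1 used above; the DLQ path and pipeline id below are assumptions for this host):

# logstash.yml
queue.type: persisted
queue.max_bytes: 1gb
dead_letter_queue.enable: true

# pipeline that reprocesses failed events
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "main"
  }
}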

Table of Contents

Input

  • elasticsearch
  • file
  • graphite
  • http (receives events over HTTP(S))
  • http_poller (decodes the output of an HTTP API into events)
  • jdbc
  • jms
  • jmx
  • kafka
  • rabbitmq
  • redis
  • sqlite
  • syslog
  • tcp
  • udp
  • websocket

JDBC

jdbc_default_timezone

SQL does not allow for timezone data in timestamp fields. You must use a canonical timezone, America/Denver, for example.

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.40-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # note the jdbc:mysql:// scheme; a bare jdbc:// URL will not be accepted by the MySQL driver
    jdbc_connection_string => "jdbc:mysql://11.22.33.44:3600/test?allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai"
    jdbc_user => "user"
    jdbc_password => "passwd"
    # jdbc_default_timezone => "Asia/Shanghai"   # optional: canonical timezone for timestamp columns (see note above)
    schedule => "* * * * *"
    # statement_filepath => "rank.sql"
    statement => "
    select user_id as uid, user_email as email from test where ${CONDITION} and create_time > :sql_last_value
    "
    use_column_value => true
    tracking_column => "create_time"
    tracking_column_type => "timestamp"   # default is numeric; create_time is a timestamp
  }
}
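
With use_column_value enabled, :sql_last_value is replaced by the last seen value of tracking_column (persisted between runs in a metadata file, by default .logstash_jdbc_last_run in the home directory), so each scheduled run only selects rows newer than the previous one.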

Filter

  • grok and mutate can be used to coerce field datatypes

Coercing datatype in Logstash

grok

%{NUMBER:num:int}
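
In context, the pattern sits inside a grok filter; the message format below (a duration=... field) is made up for illustration:

filter {
  grok {
    # capture the number as an integer field named num
    match => { "message" => "duration=%{NUMBER:num:int}" }
  }
}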

mutate

filter {
  mutate {
    convert => { "num" => "integer" }
  }
}
filter {
  ruby {
    code => "
      require 'digest'
      # stash fields needed only by the output stage under @metadata
      event.set('[@metadata][mail]', event.get('email'))
      # derive a stable numeric id from uid (MD5 digest truncated to 64 bits)
      event.set('[@metadata][_id]', Digest::MD5.new.update(event.get('uid').to_s).digest.unpack('Q').first)
      # 'reputations' is expected to be a comma-separated string of integers
      reputation = event.get('reputations').split(',').map(&:to_i)
      event.set('reputation', reputation.reduce(0, :+))
      # drop fields that should not be indexed
      event.remove('uid')
      event.remove('email')
      event.remove('@version')
      event.remove('@timestamp')
    "
  }
}
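
Fields placed under [@metadata] are not written by outputs, so they can carry values such as the computed _id to the elasticsearch output (via document_id) without being indexed; the rubydebug codec below only displays them because metadata => true is set.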

Output

  • csv
  • elasticsearch
  • email
  • exec
  • file
  • graphite
  • http (sends events to a generic HTTP(S) endpoint)
  • influxdb
  • kafka
  • mongodb
  • rabbitmq
  • redis
  • solr_http
  • syslog
  • tcp
  • udp
  • webhdfs
  • websocket

Configuration

output {
  stdout { codec => rubydebug { metadata => true } }
  file {
    path => "user.csv"
    codec => line { format => "user %{[@metadata[uid]} %{nickname}" }
  }
  elasticsearch {
    hosts => ["1.2.3.4", "1.2.3.5"]
    action => "update"
    doc_as_upsert => false
    index => "users"
    document_type => "doc"
    flush_size => 5000
    document_id => "%{[@metadata][_id]}"
  }
}
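
With action => "update" and doc_as_upsert => false, updates for documents that do not yet exist fail; setting doc_as_upsert => true would make Elasticsearch create the document from the partial doc instead.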

Metrics
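
The metrics filter counts the events flowing through it and periodically flushes a new event carrying rate statistics ([events][rate_1m], [events][rate_5m], [events][rate_15m], [events][count]); tagging that event lets the output section route it to stdout while normal events go to Elasticsearch.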

filter {
  metrics {
    meter => "events"
    add_tag => "metric"
  }
}

output {
  elasticsearch {
    "document_id" => "%{user_id}"
  }
  if "metric" in [tags] {
    stdout {
      codec => line { format => "rate: %{[events][rate_1m]}" }
    }
  }
  # null {}
}

Performance

pipeline.workers => 48
pipeline.batch.size => 125
pipeline.batch.delay => 5
pipeline.max_inflight => 60000
  • remove mutate filter
    • 2.0 k/s -> 3.7 k/s
  • select * from table -> select fields from table
    • 2.0 k/s -> 11.0 k/s
  • flush_size
    • no effect
  • workers => 8, nproc => 8
    • 38 k/s, CPU 80%
  • workers => 8, nproc => 10
    • 45 k/s, CPU 95%
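
In Logstash 5.x the number of in-flight events is pipeline.workers * pipeline.batch.size (all held in memory unless the persistent queue is enabled), and Logstash logs a warning when that product exceeds 10,000, so very large batch sizes mostly trade heap for throughput.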

Output

Elasticsearch

filter {
    ruby {
        code => "require 'digest'
                 # derive a stable 64-bit document id from the id field
                 _id = Digest::MD5.new.update(event.get('id').to_s).digest.unpack('Q').first
                 event.set('[@metadata][_id]', _id)
                 event.remove('@version')
                 event.remove('@timestamp')
                "
    }
}

output {
    # stdout { codec => rubydebug { metadata => true } }
    elasticsearch {
        hosts => ['192.168.249.128', '192.168.249.196']
        action => 'update'
        doc_as_upsert => false
        index => 'posts-index'
        document_type => 'post'
        document_id => "%{[@metadata][_id]}"
    }
}
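
Patching perform_request in the HTTP client pool (shown below) is a quick, temporary way to print the bulk request body and see exactly what the elasticsearch output sends: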

logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb

def perform_request(method, path, params={}, body=nil)
  with_connection do |url|
    # temporary enabled to print body
    # p body
    resp = perform_request_to_url(url, method, path, params, body)
    [url, resp]
  end
end

File

output {
  file {
    path => 'stat.csv'
    codec => line { format => "%{[@metadata[_id]} %{timestamp} %{level} %{msg}" }
  }
}

Source

/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-file-4.0.1/lib/logstash/outputs/file.rb

/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-csv-3.0.2/lib/logstash/outputs/csv.rb

# CSV output.
#
# Write events to disk in CSV or other delimited format
# Based on the file output, many config values are shared
# Uses the Ruby csv library internally
class LogStash::Outputs::CSV < LogStash::Outputs::File

# This output writes events to files on disk. You can use fields
# from the event as parts of the filename and/or path.
#
# By default, this output writes one event per line in **json** format.
# You can customise the line format using the `line` codec like
# [source,ruby]
# output {
#  file {
#    path => ...
#    codec => line { format => "custom format: %{message}"}
#  }
# }
class LogStash::Outputs::File < LogStash::Outputs::Base
  concurrency :shared

  public
  def multi_receive_encoded(events_and_encoded)
    encoded_by_path = Hash.new {|h,k| h[k] = []}

    events_and_encoded.each do |event,encoded|
      file_output_path = event_path(event)
      encoded_by_path[file_output_path] << encoded
    end

    @io_mutex.synchronize do
      encoded_by_path.each do |path,chunks|
        fd = open(path)
        chunks.each {|chunk| fd.write(chunk) }
        fd.flush
      end
      
      close_stale_files
    end   
  end # def receive
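
multi_receive_encoded groups the already-encoded events by their target path, then writes and flushes each file while holding the IO mutex, and finally closes file descriptors that have gone stale.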

/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb

class LogStash::Outputs::Base < LogStash::Plugin

/usr/share/logstash/logstash-core/lib/logstash/plugin.rb

class LogStash::Plugin

  # close is called during shutdown, after the plugin worker
  # main task terminates
  def do_close
    @logger.debug("closing", :plugin => self.class.name)
    close
  end

  # Subclasses should implement this close method if you need to perform any
  # special tasks during shutdown (like flushing, etc.)
  def close
    # ..
  end

/usr/share/logstash/logstash-core/lib/logstash-core/logstash-core.rb

require "java"

module LogStash
end

require "logstash-core_jars"

# local dev setup
classes_dir = File.expand_path("../../../build/classes/main", __FILE__)
resources_dir = File.expand_path("../../../build/resources/main", __FILE__)

if File.directory?(classes_dir) && File.directory?(resources_dir)
  # if in local dev setup, add target to classpath
  $CLASSPATH << classes_dir unless $CLASSPATH.include?(classes_dir)
  $CLASSPATH << resources_dir unless $CLASSPATH.include?(resources_dir)
else
  # otherwise use included jar
  begin
    require "logstash-core/logstash-core.jar"
  rescue Exception => e
    raise("Error loading logstash-core/logstash-core.jar file, cause: #{e.message}")
  end
end

/usr/share/logstash/logstash-core/lib/logstash-core/logstash-core.jar

META-INF/
META-INF/MANIFEST.MF
org/
org/logstash/
org/logstash/log/
org/logstash/log/LogstashLogEventFactory.class
org/logstash/log/CustomLogEvent.class
org/logstash/log/CustomLogEventSerializer.class
org/logstash/log/LogstashMessageFactory.class
org/logstash/log/StructuredMessage.class
org/logstash/common/
org/logstash/common/io/
org/logstash/common/io/PageIO.class
org/logstash/common/io/BufferedChecksum.class
org/logstash/common/io/CheckpointIOFactory.class
org/logstash/common/io/FileCheckpointIO.class
org/logstash/common/io/BufferedChecksumStreamInput.class
org/logstash/common/io/ByteBufferPageIO.class
org/logstash/common/io/StreamInput.class
org/logstash/common/io/ByteBufferStreamInput.class
org/logstash/common/io/PageIOFactory.class
org/logstash/common/io/InputStreamStreamInput.class
org/logstash/common/io/Streams.class
org/logstash/common/io/wip/
org/logstash/common/io/wip/MemoryPageIOStream.class
org/logstash/common/io/ByteArrayStreamOutput.class
org/logstash/common/io/MemoryCheckpointIO.class
org/logstash/common/io/BufferedChecksumStreamOutput.class
org/logstash/common/io/CheckpointIO.class
org/logstash/common/io/StreamOutput.class
org/logstash/common/io/MmapPageIO.class
org/logstash/ackedqueue/
org/logstash/ackedqueue/Settings.class
org/logstash/ackedqueue/Queue$TailPageResult.class
org/logstash/ackedqueue/Checkpoint.class
org/logstash/ackedqueue/TailPage.class
org/logstash/ackedqueue/Batch.class
org/logstash/ackedqueue/Queue.class
org/logstash/ackedqueue/MemorySettings.class
org/logstash/ackedqueue/FileSettings.class
org/logstash/ackedqueue/Page.class
org/logstash/ackedqueue/SequencedList.class
org/logstash/ackedqueue/QueueRuntimeException.class
org/logstash/ackedqueue/Queueable.class
org/logstash/ackedqueue/HeadPage.class
log4j2.component.properties

References