Logstash performance and configuration

Command

LS_HEAP_SIZE=2g ./logstash-5.1.1/bin/logstash -f posts.conf \
  --pipeline.batch.size 5000 --pipeline.workers 8

Input

JDBC

jdbc_default_timezone

SQL timestamp fields carry no timezone data, so set jdbc_default_timezone to tell the plugin which zone the values are in. You must use a canonical timezone ID, for example America/Denver.

input {
  # Poll a MySQL table over JDBC on a schedule and emit one event per returned row.
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.40-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # A MySQL JDBC URL needs the mysql subprotocol: jdbc:mysql://host:port/database?options
    jdbc_connection_string => "jdbc:mysql://11.22.33.44:3600/test?allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai"
    jdbc_user => "user"
    jdbc_password => "passwd"
    # Cron-style schedule: run the statement every minute.
    schedule => "* * * * *"
    # statement_filepath => "rank.sql"
    # :sql_last_value is bound by the plugin to the last tracked value.
    # NOTE(review): ${CONDITION} is presumably substituted from the environment - confirm.
    statement => "
    select user_id as uid, user_email as email from test where ${CONDITION} and create_time > :sql_last_value
    "
    # Track the create_time column of the last row instead of the time of the last run.
    # NOTE(review): create_time looks like a timestamp column; if the installed plugin
    # supports tracking_column_type, confirm whether it should be set to timestamp.
    use_column_value => true
    tracking_column => create_time
  }
}

Filter

  • grok and mutate can be used to type the data

Coercing datatype in Logstash

grok

%{NUMBER:num:int}

mutate

filter {
  mutate {
    # Cast the "num" field to an integer.
    convert => { "num" => "integer" }
  }
}
filter {
  ruby {
    # Load the digest library once at startup instead of on every event.
    init => "require 'digest'"
    # Per-event script: moves email and a hashed uid into @metadata, sums the
    # comma-separated reputations, then drops fields that should not be indexed.
    code => "
event.set('[@metadata][mail]', event.get('email'))
# Digest#update only accepts a String, so coerce uid first (a numeric id would raise TypeError).
# unpack('Q') reads the first 8 digest bytes as one unsigned 64-bit integer.
event.set('[@metadata][_id]', Digest::MD5.new.update(event.get('uid').to_s).digest.unpack('Q').first)
reputation = event.get('reputations').split(',').map(&:to_i)
event.set('reputation', reputation.reduce(0, :+))
event.remove('uid')
event.remove('email')
event.remove('@version')
event.remove('@timestamp')
"
  }
}

Output

output {
  # Print every event, including its @metadata, for debugging.
  stdout { codec => rubydebug { metadata => true } }
  file {
    path => "user.csv"
    # Nested field references use one bracket pair per level: [@metadata][uid].
    # NOTE(review): the ruby filter in these notes only sets [@metadata][_id] and
    # [@metadata][mail]; confirm that [@metadata][uid] is populated upstream.
    codec => line { format => "user %{[@metadata][uid]} %{nickname}" }
  }
  elasticsearch {
    hosts => ["1.2.3.4", "1.2.3.5"]
    # Partial update of the document addressed by document_id; with
    # doc_as_upsert disabled the event is not used to create a missing document.
    action => "update"
    doc_as_upsert => false
    index => "users"
    document_type => "doc"
    flush_size => 5000
    # Id computed by the ruby filter and carried in @metadata.
    document_id => "%{[@metadata][_id]}"
  }
}

Metrics

filter {
  metrics {
    # Meter passing events under the name "events"; the output section reads
    # the result as [events][rate_1m].
    meter => "events"
    # Tag used by the output conditional ("metric" in [tags]) to pick out metric events.
    add_tag => "metric"
  }
}

output {
  # NOTE(review): this output is unconditional, so events tagged "metric" appear
  # to be sent here as well - confirm that is intended.
  elasticsearch {
    "document_id" => "%{user_id}"
  }
  # Only events tagged "metric" are printed, as a one-line 1-minute rate.
  if "metric" in [tags] {
    stdout {
      codec => line { format => "rate: %{[events][rate_1m]}" }
    }
  }
  # null {}
}

Performance

pipeline.workers => 48
pipeline.batch.size => 125
pipeline.batch.delay => 5
pipeline.max_inflight => 6000
  • remove mutate filter
    • 2.0 k/s -> 3.7 k/s
  • select * from table -> select fields from table
    • 2.0 k/s -> 11.0 k/s
  • flush_size
    • no effect
  • workers => 8, nproc => 8
    • 38 k/s, CPU 80%
  • workers => 8, nproc => 10
    • 45 k/s, CPU 95%

Output

Elasticsearch

filter {
    ruby {
    # Load the digest library once at startup instead of on every event.
    init => "require 'digest'"
    # Per-event script: derive a numeric document id from the id field and drop
    # @version / @timestamp before indexing.
    # MD5 is a constant inside the Digest module, so it is reached with :: (Digest.MD5
    # would be a method call and raise NoMethodError). The id is coerced with to_s
    # because Digest#update only accepts a String. unpack('Q') reads the first
    # 8 digest bytes as one unsigned 64-bit integer.
    code => "
             _id = Digest::MD5.new.update(event.get('id').to_s).digest.unpack('Q').first
             event.set('[@metadata][_id]', _id)
             event.remove('@version')
             event.remove('@timestamp')
            "
    }
}

output {
    # Uncomment to print every event, including @metadata, while debugging.
    # stdout { codec => rubydebug { metadata => true } }
    elasticsearch {
        hosts => ['192.168.249.128', '192.168.249.196']
        # Partial update of the document addressed by document_id; with
        # doc_as_upsert disabled the event is not used to create a missing document.
        action => 'update'
        doc_as_upsert => false
        index => 'posts-index'
        document_type => 'post'
        # Id computed by the ruby filter and carried in @metadata.
        document_id => "%{[@metadata][_id]}"
    }
}

logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-5.4.0-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb

# Sends one request through a connection supplied by with_connection.
#
# The block handed to with_connection receives a url, forwards the request to
# perform_request_to_url and evaluates to the pair [url, response].
#
# @param method the HTTP method, passed through unchanged
# @param path   the request path, passed through unchanged
# @param params request parameters (defaults to an empty hash)
# @param body   request body (defaults to nil)
def perform_request(method, path, params={}, body=nil)
  with_connection do |url|
    # Debug hook: uncomment temporarily to print the outgoing body.
    # p body
    [url, perform_request_to_url(url, method, path, params, body)]
  end
end

File

output {
  file {
    path => 'stat.csv'
    # One line per event. Nested field references use one bracket pair per
    # level, so the metadata id is written [@metadata][_id].
    codec => line { format => "%{[@metadata][_id]} %{timestamp} %{level} %{msg}" }
  }
}

Source

/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-file-4.0.1/lib/logstash/outputs/file.rb

/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-csv-3.0.2/lib/logstash/outputs/csv.rb

# CSV output.
#
# Write events to disk in CSV or other delimited format
# Based on the file output, many config values are shared
# Uses the Ruby csv library internally
class LogStash::Outputs::CSV < LogStash::Outputs::File
# This output writes events to files on disk. You can use fields
# from the event as parts of the filename and/or path.
#
# By default, this output writes one event per line in **json** format.
# You can customise the line format using the `line` codec like
# [source,ruby]
# output {
#  file {
#    path => ...
#    codec => line { format => "custom format: %{message}"}
#  }
# }
class LogStash::Outputs::File < LogStash::Outputs::Base
  concurrency :shared

  public
  # Writes a batch of already-encoded events to their destination files.
  #
  # Each (event, encoded) pair is first grouped by the path that event_path
  # returns for the event. Then, while holding @io_mutex, every path is opened
  # via open(path), its chunks are written in arrival order and the handle is
  # flushed. close_stale_files is called last, still inside the lock.
  #
  # @param events_and_encoded an enumerable of [event, encoded] pairs
  def multi_receive_encoded(events_and_encoded)
    pending = Hash.new { |hash, key| hash[key] = [] }
    events_and_encoded.each { |event, encoded| pending[event_path(event)] << encoded }

    @io_mutex.synchronize do
      pending.each_pair do |target, payloads|
        handle = open(target)
        payloads.each { |payload| handle.write(payload) }
        handle.flush
      end

      close_stale_files
    end
  end # def multi_receive_encoded

/usr/share/logstash/logstash-core/lib/logstash/outputs/base.rb

class LogStash::Outputs::Base < LogStash::Plugin

/usr/share/logstash/logstash-core/lib/logstash/plugin.rb

class LogStash::Plugin

  # Shutdown entry point: called during shutdown, after the plugin worker's
  # main task has terminated. Logs a debug line naming the plugin class and
  # then delegates to #close, whose result is returned.
  def do_close
    plugin_name = self.class.name
    @logger.debug("closing", plugin: plugin_name)
    close
  end

  # Hook invoked by do_close. The default implementation does nothing.
  # Subclasses should implement this close method if they need to perform any
  # special tasks during shutdown (like flushing, etc.)
  def close
    # intentionally empty: no-op unless a subclass overrides it
  end

/usr/share/logstash/logstash-core/lib/logstash-core/logstash-core.rb

# Loader for the Java side of logstash-core: makes the compiled classes
# available either from a local build tree or from the packaged jar.
require "java"

# Ensure the LogStash namespace exists before anything below is loaded.
module LogStash
end

require "logstash-core_jars"

# local dev setup: build output directories resolved relative to this file
classes_dir = File.expand_path("../../../build/classes/main", __FILE__)
resources_dir = File.expand_path("../../../build/resources/main", __FILE__)

# Both directories must exist for the build tree to be used.
if File.directory?(classes_dir) && File.directory?(resources_dir)
  # if in local dev setup, add target to classpath (skipping entries already present)
  $CLASSPATH << classes_dir unless $CLASSPATH.include?(classes_dir)
  $CLASSPATH << resources_dir unless $CLASSPATH.include?(resources_dir)
else
  # otherwise use included jar
  begin
    require "logstash-core/logstash-core.jar"
  # Exception (not just StandardError) is rescued because a failed require
  # raises LoadError, which is not a StandardError; the failure is re-raised
  # with a message naming the jar.
  rescue Exception => e
    raise("Error loading logstash-core/logstash-core.jar file, cause: #{e.message}")
  end
end

/usr/share/logstash/logstash-core/lib/logstash-core/logstash-core.jar

META-INF/
META-INF/MANIFEST.MF
org/
org/logstash/
org/logstash/log/
org/logstash/log/LogstashLogEventFactory.class
org/logstash/log/CustomLogEvent.class
org/logstash/log/CustomLogEventSerializer.class
org/logstash/log/LogstashMessageFactory.class
org/logstash/log/StructuredMessage.class
org/logstash/common/
org/logstash/common/io/
org/logstash/common/io/PageIO.class
org/logstash/common/io/BufferedChecksum.class
org/logstash/common/io/CheckpointIOFactory.class
org/logstash/common/io/FileCheckpointIO.class
org/logstash/common/io/BufferedChecksumStreamInput.class
org/logstash/common/io/ByteBufferPageIO.class
org/logstash/common/io/StreamInput.class
org/logstash/common/io/ByteBufferStreamInput.class
org/logstash/common/io/PageIOFactory.class
org/logstash/common/io/InputStreamStreamInput.class
org/logstash/common/io/Streams.class
org/logstash/common/io/wip/
org/logstash/common/io/wip/MemoryPageIOStream.class
org/logstash/common/io/ByteArrayStreamOutput.class
org/logstash/common/io/MemoryCheckpointIO.class
org/logstash/common/io/BufferedChecksumStreamOutput.class
org/logstash/common/io/CheckpointIO.class
org/logstash/common/io/StreamOutput.class
org/logstash/common/io/MmapPageIO.class
org/logstash/ackedqueue/
org/logstash/ackedqueue/Settings.class
org/logstash/ackedqueue/Queue$TailPageResult.class
org/logstash/ackedqueue/Checkpoint.class
org/logstash/ackedqueue/TailPage.class
org/logstash/ackedqueue/Batch.class
org/logstash/ackedqueue/Queue.class
org/logstash/ackedqueue/MemorySettings.class
org/logstash/ackedqueue/FileSettings.class
org/logstash/ackedqueue/Page.class
org/logstash/ackedqueue/SequencedList.class
org/logstash/ackedqueue/QueueRuntimeException.class
org/logstash/ackedqueue/Queueable.class
org/logstash/ackedqueue/HeadPage.class
log4j2.component.properties