class LogStash::Filters::Elasticsearch
Search elasticsearch for a previous log event and copy some fields from it into the current event. Below is a complete example of how this filter might be used. Whenever logstash receives an “end” event, it uses this elasticsearch filter to find the matching “start” event based on some operation identifier. Then it copies the @timestamp field from the “start” event into a new field on the “end” event. Finally, using a combination of the “date” filter and the “ruby” filter, we calculate the time duration in hours between the two events.
if [type] == "end" { elasticsearch { hosts => ["es-server"] query => "type:start AND operation:%{[opid]}" fields => ["@timestamp", "started"] } date { match => ["[started]", "ISO8601"] target => "[started]" } ruby { code => "event['duration_hrs'] = (event['@timestamp'] - event['started']) / 3600 rescue nil" } }
Public Instance Methods
filter(event)
click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 55 def filter(event) return unless filter?(event) begin query_str = event.sprintf(@query) results = @client.search q: query_str, sort: @sort, size: 1 @fields.each do |old, new| event[new] = results['hits']['hits'][0]['_source'][old] end filter_matched(event) rescue => e @logger.warn("Failed to query elasticsearch for previous event", :query => query_str, :event => event, :error => e) end end
register()
click to toggle source
# File lib/logstash/filters/elasticsearch.rb, line 47 def register require "elasticsearch" @logger.info("New ElasticSearch filter", :hosts => @hosts) @client = Elasticsearch::Client.new hosts: @hosts end