class LogStash::Filters::Multiline
The multiline filter is for combining multiple events from a single source into the same event.
The original goal of this filter was to allow joining of multi-line messages from files into a single event, for example joining a Java exception and its stack trace messages into a single event.
TODO(sissel): Document any issues?

The config looks like this:
    filter {
      multiline {
        type => "type"
        pattern => "pattern, a regexp"
        negate => boolean
        what => "previous" or "next"
      }
    }
The 'pattern' is a regexp that should match whatever you believe indicates that the field is part of a multi-line event.
The 'what' must be “previous” or “next” and indicates the relation to the multi-line event.
The 'negate' can be “true” or “false” (defaults to false). If true, a message that does not match the pattern counts as a match for the multiline filter and the 'what' is applied; conversely, a message that does match the pattern is then treated as a non-match.
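The interaction between the pattern and 'negate' can be sketched in plain Ruby. This is only an illustration: `multiline_match?` is a hypothetical helper, not part of the plugin, and it mirrors the expression `(match and !@negate) || (!match and @negate)` from the filter source as an exclusive-or.

```ruby
# Hypothetical helper (not a Logstash API): decides whether the configured
# 'what' should be applied to a line, given the pattern and 'negate'.
def multiline_match?(line, pattern, negate = false)
  matched = pattern.match?(line)
  # Same truth table as (match and !negate) || (!match and negate)
  matched ^ negate
end

puts multiline_match?("    at Foo.bar(Foo.java:12)", /^\s/)         # true
puts multiline_match?("Exception in thread main", /^\s/)            # false
puts multiline_match?("Exception in thread main", /^\s/, true)      # true
puts multiline_match?("    at Foo.bar(Foo.java:12)", /^\s/, true)   # false
```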
For example, Java stack traces are multiline and usually have the message starting at the far left, with each subsequent line indented. Do this:
    filter {
      multiline {
        type => "somefiletype"
        pattern => "^\s"
        what => "previous"
      }
    }
This says that any line starting with whitespace belongs to the previous line.
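To make the effect of "previous" concrete, here is a minimal standalone sketch that joins raw lines the same way, without Logstash. It assumes plain strings instead of events; `join_previous` is a hypothetical name used only for illustration.

```ruby
# Hypothetical sketch (not the plugin's code): append every line that
# matches the pattern to the event started by the preceding lines.
def join_previous(lines, pattern)
  events = []
  lines.each do |line|
    if line =~ pattern && !events.empty?
      # Matching line: it belongs to the previous event.
      events.last << "\n" << line
    else
      # Non-matching line (or very first line): start a new event.
      events << line.dup
    end
  end
  events
end

lines = [
  "Exception in thread \"main\" java.lang.RuntimeException: boom",
  "    at Foo.bar(Foo.java:12)",
  "    at Foo.main(Foo.java:5)",
  "INFO next message"
]
events = join_previous(lines, /^\s/)
puts events.length   # 2
puts events.first    # the exception line followed by both "at ..." lines
```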
Another example is C line continuations (backslash). Here's how to do that:
    filter {
      multiline {
        type => "somefiletype"
        pattern => "\\$"
        what => "next"
      }
    }
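The "next" direction can be sketched the same way: a line that matches the pattern is held back and joined with whatever follows. This is an illustrative approximation on plain strings, and `join_next` is a hypothetical name, not a Logstash API. Unlike the filter, the sketch also emits a trailing unfinished continuation at the end of input.

```ruby
# Hypothetical sketch (not the plugin's code): a line matching the pattern
# is kept pending and joined with the line(s) that follow it.
def join_next(lines, pattern)
  events = []
  pending = nil
  lines.each do |line|
    pending = pending ? "#{pending}\n#{line}" : line.dup
    unless line =~ pattern
      # No continuation marker: the event is complete.
      events << pending
      pending = nil
    end
  end
  events << pending if pending # emit an unfinished continuation, if any
  events
end

lines = [
  'int total = 1 + \\',
  '            2;',
  'return total;'
]
events = join_next(lines, /\\$/)
puts events.length   # 2
```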
Public Class Methods
LogStash::Filters::Base::new
    # File lib/logstash/filters/multiline.rb, line 113
    def initialize(config = {})
      super

      @threadsafe = false

      # This filter needs to keep state.
      @types = Hash.new { |h,k| h[k] = [] }
      @pending = Hash.new
    end
Public Instance Methods
Flush any pending messages. This is generally used for unit testing only.
Note: flush is disabled now; it is preferable to use the multiline codec.
    # File lib/logstash/filters/multiline.rb, line 232
    def __flush
      events = []
      @pending.each do |key, value|
        value.uncancel
        events << value
      end
      @pending.clear
      return events
    end
    # File lib/logstash/filters/multiline.rb, line 153
    def filter(event)
      return unless filter?(event)

      if event["message"].is_a?(Array)
        match = @grok.match(event["message"].first)
      else
        match = @grok.match(event["message"])
      end
      key = event.sprintf(@stream_identity)
      pending = @pending[key]

      @logger.debug("Multiline", :pattern => @pattern, :message => event["message"],
                    :match => match, :negate => @negate)

      # Add negate option
      match = (match and !@negate) || (!match and @negate)

      case @what
      when "previous"
        if match
          event.tag "multiline"
          # previous previous line is part of this event.
          # append it to the event and cancel it
          if pending
            pending.append(event)
          else
            @pending[key] = event
          end
          event.cancel
        else
          # this line is not part of the previous event
          # if we have a pending event, it's done, send it.
          # put the current event into pending
          if pending
            tmp = event.to_hash
            event.overwrite(pending)
            @pending[key] = LogStash::Event.new(tmp)
          else
            @pending[key] = event
            event.cancel
          end # if/else pending
        end # if/else match
      when "next"
        if match
          event.tag "multiline"
          # this line is part of a multiline event, the next
          # line will be part, too, put it into pending.
          if pending
            pending.append(event)
          else
            @pending[key] = event
          end
          event.cancel
        else
          # if we have something in pending, join it with this message
          # and send it. otherwise, this is a new message and not part of
          # multiline, send it.
          if pending
            pending.append(event)
            event.overwrite(pending.to_hash)
            @pending.delete(key)
          end
        end # if/else match
      else
        # TODO(sissel): Make this part of the 'register' method.
        @logger.warn("Unknown multiline 'what' value.", :what => @what)
      end # case @what

      if !event.cancelled?
        event["message"] = event["message"].join("\n") if event["message"].is_a?(Array)
        event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array)
        filter_matched(event) if match
      end
    end
    # File lib/logstash/filters/multiline.rb, line 124
    def register
      require "grok-pure" # rubygem 'jls-grok'

      @grok = Grok.new

      @patterns_dir = @@patterns_path.to_a + @patterns_dir
      @patterns_dir.each do |path|
        # Can't read relative paths from jars, try to normalize away '../'
        while path =~ /file:\/.*\.jar!.*\/\.\.\//
          # replace /foo/bar/../baz => /foo/baz
          path = path.gsub(/[^\/]+\/\.\.\//, "")
        end

        if File.directory?(path)
          path = File.join(path, "*")
        end

        Dir.glob(path).each do |file|
          @logger.info("Grok loading patterns from file", :path => file)
          @grok.add_patterns_from_file(file)
        end
      end

      @grok.compile(@pattern)

      @logger.debug("Registered multiline plugin", :type => @type, :config => @config)
    end