class LogStash::Inputs::Gelf

Read GELF messages as events over the network.

This input is a good choice if you already use graylog2 today.

The main reason for this input is to leverage existing GELF logging libraries, such as the GELF log4j appender.

Public Class Methods

new(params) click to toggle source
Calls superclass method LogStash::Inputs::Base::new
# File lib/logstash/inputs/gelf.rb, line 48
# Builds the input by handing +params+ to the superclass constructor, then
# switches off reverse DNS lookups for all sockets in this process.
#
# params - plugin parameters, forwarded unchanged to the superclass.
def initialize(params)
  super(params)

  # Process-wide setting: socket address lookups return numeric addresses
  # instead of triggering a reverse DNS query.
  BasicSocket.do_not_reverse_lookup = true
end

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/gelf.rb, line 54
# Prepares the input for use: loads the gelfd library and clears the UDP
# socket handle so no socket is considered open yet.
def register
  # Loaded here rather than at file load time, so the dependency is only
  # required once the plugin is registered.
  require "gelfd"

  @udp = nil
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/gelf.rb, line 60
# Runs the UDP listener, restarting it whenever it dies with a StandardError.
#
# output_queue - object handed to udp_listener to receive events.
#
# Each failure is logged as a warning, followed by a five second pause
# before the listener is started again. There is no upper bound on the
# number of restarts.
def run(output_queue)
  udp_listener(output_queue) # udp server
rescue => e
  @logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
  sleep(5)
  retry
end

Private Instance Methods

remap_gelf(event) click to toggle source
# File lib/logstash/inputs/gelf.rb, line 116
# Moves the GELF message text into the event's "message" field.
#
# "full_message" takes priority: it is copied to "message" and removed, and
# "short_message" is removed as well when it equals the new "message".
# Otherwise, if only "short_message" is set, it is copied to "message" and
# removed. An event with neither field is left untouched.
#
# event - object supporting [], []= and remove for its fields.
def remap_gelf(event)
  if (full_text = event["full_message"])
    event["message"] = full_text.dup
    event.remove("full_message")
    # Drop the short form only when it carries no extra information.
    event.remove("short_message") if event["short_message"] == event["message"]
  elsif (short_text = event["short_message"])
    event["message"] = short_text.dup
    event.remove("short_message")
  end
end
strip_leading_underscore(event) click to toggle source
# File lib/logstash/inputs/gelf.rb, line 130
# Renames every field whose name starts with "_" to the same name without
# that first character (for example "_foo" becomes "foo"), overwriting any
# field that already has the shorter name.
#
# event - object supporting to_hash, [], []= and remove for its fields.
def strip_leading_underscore(event)
  # Iterate over a snapshot of the keys, since fields are added and removed
  # while walking them.
  event.to_hash.keys.each do |field|
    if field[0, 1] == "_"
      bare_name = field[1..-1]
      event[bare_name] = event[field]
      event.remove(field)
    end
  end
end
udp_listener(output_queue) click to toggle source
# File lib/logstash/inputs/gelf.rb, line 72
# Binds a UDP socket to @host:@port and pushes an event onto +output_queue+
# for every complete GELF message received.
#
# A socket left over from a previous invocation is shut down first, so the
# method can be re-entered after a failure. Datagrams that Gelfd cannot
# parse, whose decoded payload is not valid JSON, or whose JSON is not an
# object are logged and skipped; they do not stop the listener.
#
# output_queue - object responding to << that receives each decorated event.
#
# Loops until an exception escapes. LogStash::ShutdownSignal is swallowed, so
# the method then returns normally; any other exception propagates to the
# caller. The socket is shut down on every exit path.
def udp_listener(output_queue)
  @logger.info("Starting gelf listener", :address => "#{@host}:#{@port}")

  # Best-effort cleanup of a socket from an earlier run.
  if @udp
    @udp.close_read rescue nil
    @udp.close_write rescue nil
  end

  @udp = UDPSocket.new(Socket::AF_INET)
  @udp.bind(@host, @port)

  while true
    line, client = @udp.recvfrom(8192)
    begin
      data = Gelfd::Parser.parse(line)
    rescue => ex
      @logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace)
      next
    end

    # Gelfd parser outputs null if it received and cached a non-final chunk
    next if data.nil?

    # A malformed payload must only cost this one message. Letting the error
    # escape would tear down the socket and stall the listener until the
    # caller restarts it.
    begin
      payload = JSON.parse(data)
    rescue JSON::ParserError => ex
      @logger.warn("Gelf message is not valid JSON, skipping", :exception => ex, :backtrace => ex.backtrace)
      next
    end

    # Valid JSON that is not an object (array, string, number, ...) carries
    # no fields to build an event from.
    unless payload.is_a?(Hash)
      @logger.warn("Gelf message is not a JSON object, skipping", :payload_class => payload.class.name)
      next
    end

    event = LogStash::Event.new(payload)
    # Fourth element of the sender address array returned by recvfrom.
    event["source_host"] = client[3]
    # A numeric "timestamp" is treated as seconds since the epoch and moved
    # to "@timestamp" as a UTC time.
    if event["timestamp"].is_a?(Numeric)
      event["@timestamp"] = Time.at(event["timestamp"]).gmtime
      event.remove("timestamp")
    end
    remap_gelf(event) if @remap
    strip_leading_underscore(event) if @strip_leading_underscore
    decorate(event)
    output_queue << event
  end
rescue LogStash::ShutdownSignal
  # Do nothing, shutdown.
ensure
  if @udp
    @udp.close_read rescue nil
    @udp.close_write rescue nil
  end
end