class LogStash::Inputs::Log4j

Read events over a TCP socket from Log4j SocketAppender.

Can either accept connections from clients or connect to a server, depending on `mode`. Depending on mode, you need a matching SocketAppender or SocketHubAppender on the remote side

Public Class Methods

new(*args) click to toggle source
Calls superclass method LogStash::Inputs::Base::new
# File lib/logstash/inputs/log4j.rb, line 35
def initialize(*args)
  super(*args)
end

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/log4j.rb, line 40
def register
  require "java"
  require "jruby/serialization"

  if __FILE__ !~ /^(jar:)?file:\/\//
    if File.exists?("vendor/jar/elasticsearch-0.90.3/lib/log4j-1.2.17.jar")
      require "vendor/jar/elasticsearch-0.90.3/lib/log4j-1.2.17.jar"
    end
  end

  if server?
    @logger.info("Starting Log4j input listener", :address => "#{@host}:#{@port}")
    @server_socket = TCPServer.new(@host, @port)
  end
  @logger.info("Log4j input")
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/log4j.rb, line 113
  def run(output_queue)
    if server?
      loop do
        # Start a new thread for each connection.
        Thread.start(@server_socket.accept) do |s|
          # TODO(sissel): put this block in its own method.

          # monkeypatch a 'peer' method onto the socket.
          s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
          @logger.debug("Accepted connection", :client => s.peer,
                        :server => "#{@host}:#{@port}")
          handle_socket(s, output_queue)
        end # Thread.start
      end # loop
    else
      loop do
        client_socket = TCPSocket.new(@host, @port)
        client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
        @logger.debug("Opened connection", :client => "#{client_socket.peer}")
        handle_socket(client_socket, output_queue)
      end # loop
    end
  end # def run
end

Private Instance Methods

handle_socket(socket, output_queue) click to toggle source
# File lib/logstash/inputs/log4j.rb, line 58
def handle_socket(socket, output_queue)
  begin
    # JRubyObjectInputStream uses JRuby class path to find the class to de-serialize to
    ois = JRubyObjectInputStream.new(java.io.BufferedInputStream.new(socket.to_inputstream))
    loop do
      # NOTE: event_raw is org.apache.log4j.spi.LoggingEvent
      log4j_obj = ois.readObject
      event = LogStash::Event.new("message" => log4j_obj.getRenderedMessage)
      decorate(event)
      event["host"] = socket.peer
      event["path"] = log4j_obj.getLoggerName
      event["priority"] = log4j_obj.getLevel.toString
      event["logger_name"] = log4j_obj.getLoggerName
      event["thread"] = log4j_obj.getThreadName
      event["class"] = log4j_obj.getLocationInformation.getClassName
      event["file"] = log4j_obj.getLocationInformation.getFileName + ":" + log4j_obj.getLocationInformation.getLineNumber
      event["method"] = log4j_obj.getLocationInformation.getMethodName
      event["NDC"] = log4j_obj.getNDC if log4j_obj.getNDC
      event["stack_trace"] = log4j_obj.getThrowableStrRep.to_a.join("\n") if log4j_obj.getThrowableInformation
      
      # Add the MDC context properties to '@fields'
      if log4j_obj.getProperties
        log4j_obj.getPropertyKeySet.each do |key|
          event[key] = log4j_obj.getProperty(key)
        end  
      end  

      output_queue << event
    end # loop do
  rescue => e
    @logger.debug("Closing connection", :client => socket.peer,
                  :exception => e)
  rescue Timeout::Error
    @logger.debug("Closing connection after read timeout",
                  :client => socket.peer)
  end # begin
ensure
  begin
    socket.close
  rescue IOError
    pass
  end # begin
end
readline(socket) click to toggle source
# File lib/logstash/inputs/log4j.rb, line 108
def readline(socket)
  line = socket.readline
end
server?() click to toggle source
# File lib/logstash/inputs/log4j.rb, line 103
def server?
  @mode == "server"
end