class LogStash::Inputs::Unix

Read events over a UNIX socket.

Like stdin and file inputs, each event is assumed to be one line of text.

This input can either accept connections from clients or connect to a server, depending on `mode`.

Public Class Methods

new(*args) click to toggle source
Calls superclass method LogStash::Inputs::Base::new
# File lib/logstash/inputs/unix.rb, line 36
# Builds the input by handing every constructor argument, unchanged, to the
# superclass initializer.
def initialize(*args)
  # A bare `super` forwards exactly the arguments this method received.
  super
end

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/unix.rb, line 41
# Prepares the input. Loads the socket and timeout libraries and, in server
# mode only, binds a UNIXServer to @path and stores it in @server_socket.
#
# If the bind fails with Errno::EADDRINUSE or IOError:
# * without @force_unlink the failure is logged and re-raised;
# * with @force_unlink the file at @path is removed and the bind is attempted
#   once more; a second failure is logged and re-raised.
#
# Nothing is bound in client mode.
def register
  require "socket"
  require "timeout"

  return unless server?

  @logger.info("Starting unix input listener", :address => @path.to_s, :force_unlink => @force_unlink.to_s)

  begin
    @server_socket = UNIXServer.new(@path)
  rescue Errno::EADDRINUSE, IOError
    unless @force_unlink
      @logger.error("Could not start UNIX server: Address in use",
                    :path => @path)
      raise
    end

    # Remove whatever currently occupies the path, then try exactly once more.
    File.unlink(@path)
    begin
      @server_socket = UNIXServer.new(@path)
      return
    rescue Errno::EADDRINUSE, IOError
      @logger.error("!!!Could not start UNIX server: Address in use",
                    :path => @path)
      raise
    end
  end
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/unix.rb, line 113
  # Main loop of the input; decoded events are delivered to +output_queue+
  # through handle_socket.
  #
  # Server mode: accepts connections on @server_socket forever and starts one
  # thread per accepted connection, each running handle_socket. The loop only
  # ends when an IOError or Interrupted is rescued while @interrupted is set
  # (see teardown); any other IOError/Interrupted is re-raised to the caller.
  #
  # Client mode: repeatedly opens a UNIXSocket to @path and hands it to
  # handle_socket; when handle_socket returns, a new connection is opened.
  def run(output_queue)
    if server?
      # Remember the listening thread so teardown can raise Interrupted into it.
      @thread = Thread.current
      @client_threads = []
      # NOTE(review): threads are only ever appended to @client_threads here;
      # nothing in this method removes finished ones — confirm whether the list
      # can grow without bound on a long-running listener.
      loop do
        # Start a new thread for each connection.
        # The accept call is evaluated in this (listening) thread, as the
        # argument to Thread.start; only the block runs in the new thread.
        begin
          @client_threads << Thread.start(@server_socket.accept) do |s|
            # TODO(sissel): put this block in its own method.

            @logger.debug("Accepted connection",
                          :server => "#{@path}")
            begin
              handle_socket(s, output_queue)
            rescue Interrupted
              # Best-effort close; any error from close is deliberately ignored.
              s.close rescue nil
            end
          end # Thread.start
        rescue IOError, Interrupted
          if @interrupted
            # Intended shutdown, get out of the loop
            @server_socket.close
            # Raise an IOError inside every client thread started so far.
            @client_threads.each do |thread|
              thread.raise(IOError.new)
            end
            break
          else
            # Else it was a genuine IOError caused by something else, so propagate it up..
            raise
          end
        end
      end # loop
    else
      loop do
        client_socket = UNIXSocket.new(@path)
        # Mix LogStash::Util::SocketPeer into this one socket's singleton class.
        client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
        @logger.debug("Opened connection", :client => @path)
        handle_socket(client_socket, output_queue)
      end # loop
    end
  end # def run

  public
  # Shuts down a server-mode input: removes the socket file at @path, sets
  # @interrupted and raises Interrupted in the listening thread (@thread).
  # Does nothing in client mode.
  #
  # A socket file that is already missing no longer aborts the shutdown, and a
  # listener that was never started (@thread unset) is simply skipped.
  def teardown
    return unless server?

    begin
      File.unlink(@path)
    rescue Errno::ENOENT
      # The file is already gone, which is the state we wanted; keep going so
      # the listening thread is still told to stop.
      @logger.debug("UNIX socket file already removed", :path => @path)
    end

    @interrupted = true
    @thread.raise(Interrupted.new) if @thread
  end # def teardown
end
teardown() click to toggle source
# File lib/logstash/inputs/unix.rb, line 156
# Shuts down a server-mode input: removes the socket file at @path, sets
# @interrupted and raises Interrupted in the listening thread (@thread).
# Does nothing in client mode.
#
# A socket file that is already missing no longer aborts the shutdown, and a
# listener that was never started (@thread unset) is simply skipped.
def teardown
  return unless server?

  begin
    File.unlink(@path)
  rescue Errno::ENOENT
    # The file is already gone, which is the state we wanted; keep going so
    # the listening thread is still told to stop.
    @logger.debug("UNIX socket file already removed", :path => @path)
  end

  @interrupted = true
  @thread.raise(Interrupted.new) if @thread
end

Private Instance Methods

handle_socket(socket, output_queue) click to toggle source
# File lib/logstash/inputs/unix.rb, line 69
# Reads from one connected socket until reading fails, turning the data into
# events on +output_queue+.
#
# Each chunk (up to 16384 bytes per read) is handed to @codec; every event the
# codec yields is decorated, gets "host" set to the local hostname and "path"
# set to @path, and is appended to +output_queue+.
#
# socket       - object responding to readpartial and close
# output_queue - object responding to <<
#
# When @data_timeout is -1 reads block indefinitely; otherwise each read is
# wrapped in Timeout.timeout(@data_timeout).
#
# A read timeout or any other StandardError raised while reading or decoding
# ends the loop and is logged at debug level rather than propagated. The
# socket is closed on the way out in every case.
def handle_socket(socket, output_queue)
  hostname = Socket.gethostname
  loop do
    buf = nil
    # NOTE(petef): the timeout only hits after the line is read
    # or socket dies
    # TODO(sissel): Why do we have a timeout here? What's the point?
    if @data_timeout == -1
      buf = socket.readpartial(16384)
    else
      Timeout::timeout(@data_timeout) do
        buf = socket.readpartial(16384)
      end
    end
    @codec.decode(buf) do |event|
      decorate(event)
      event["host"] = hostname
      event["path"] = @path
      output_queue << event
    end
  end # loop do
rescue Timeout::Error
  # Must come before the generic rescue: Timeout::Error is a StandardError,
  # so a preceding `rescue => e` would swallow it and this branch would never run.
  @logger.debug("Closing connection after read timeout",
                :path => @path)
rescue => e
  @logger.debug("Closing connection", :path => @path,
                :exception => e, :backtrace => e.backtrace)
ensure
  begin
    socket.close
  rescue IOError
    # Already closed (or closing failed at the IO level); nothing left to do.
  end
end
server?() click to toggle source
# File lib/logstash/inputs/unix.rb, line 108
# True when this input is configured with @mode equal to "server", i.e. it
# listens for connections instead of connecting out.
def server?
  server_mode = "server"
  @mode == server_mode
end