class LogStash::Inputs::Tcp

Read events over a TCP socket.

Like stdin and file inputs, each event is assumed to be one line of text.

Can either accept connections from clients or connect to a server, depending on `mode`.

Public Class Methods

new(*args) click to toggle source
Calls superclass method LogStash::Inputs::Base::new
# File lib/logstash/inputs/tcp.rb, line 57
# Builds the input by handing every constructor argument, unchanged, to the
# superclass initializer.
def initialize(*args)
  super
end

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 62
# Prepares the input before it runs.
#
# Loads the socket, timeout and openssl libraries, builds the SSL context
# when @ssl_enable is set, and, in server mode, binds the listening socket
# (wrapped in an SSL server when SSL is enabled).
#
# Raises Errno::EADDRINUSE (after logging it) when @host:@port is already
# bound.
def register
  require "socket"
  require "timeout"
  require "openssl"
  if @ssl_enable
    @ssl_context = OpenSSL::SSL::SSLContext.new
    @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
    # PKey.read picks the key class from the file contents, so EC and DSA
    # private keys load as well as RSA ones.
    @ssl_context.key = OpenSSL::PKey.read(File.read(@ssl_key), @ssl_key_passphrase)
    if @ssl_verify
      @cert_store = OpenSSL::X509::Store.new
      # Load the system default certificate path to the store
      @cert_store.set_default_paths
      # @ssl_cacert may name either a directory of CA certificates or a
      # single CA file.
      if File.directory?(@ssl_cacert)
        @cert_store.add_path(@ssl_cacert)
      else
        @cert_store.add_file(@ssl_cacert)
      end
      @ssl_context.cert_store = @cert_store
      # Require the peer to present a certificate and verify it.
      @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
    end
  end # @ssl_enable

  if server?
    @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
    begin
      @server_socket = TCPServer.new(@host, @port)
    rescue Errno::EADDRINUSE
      @logger.error("Could not start TCP server: Address in use",
                    :host => @host, :port => @port)
      raise
    end
    if @ssl_enable
      @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
    end # @ssl_enable
  end
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 149
# Entry point of the input: hands +output_queue+ to the server loop when
# running in server mode, otherwise to the client loop.
def run(output_queue)
  return run_server(output_queue) if server?

  run_client(output_queue)
end
run_client(output_queue) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 200
  # Client-mode main loop: connects to @host:@port (performing the SSL
  # handshake when @ssl_enable is set), passes the connection to
  # handle_socket, and reconnects once that returns.
  #
  # The loop has no exit condition of its own; it ends only when an
  # exception propagates out of it, for example a failed connect.
  def run_client(output_queue)
    @thread = Thread.current
    client_socket = nil
    while true
      tcp_socket = TCPSocket.new(@host, @port)
      client_socket = tcp_socket
      if @ssl_enable
        client_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, @ssl_context)
        begin
          client_socket.connect
        rescue OpenSSL::SSL::SSLError => ssle
          @logger.error("SSL Error", :exception => ssle,
                        :backtrace => ssle.backtrace)
          # Release the failed connection before retrying; otherwise every
          # failed handshake leaves an open TCP socket behind.
          tcp_socket.close
          client_socket = nil
          # NOTE(mrichar1): Hack to prevent hammering peer
          sleep(5)
          next
        end
      end
      # Mix a 'peer' method into this one socket object.
      client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
      @logger.debug("Opened connection", :client => "#{client_socket.peer}")
      handle_socket(client_socket, client_socket.peer, output_queue, @codec.clone)
      # handle_socket closes the socket itself, so nothing is left to clean up.
      client_socket = nil
    end # loop
  ensure
    # client_socket is nil when the connect itself failed; guarding here keeps
    # the original error from being replaced by a NoMethodError.
    client_socket.close if client_socket
  end # def run_client

  public
  # Asks a running server loop to stop: marks the input as interrupted and
  # raises LogStash::ShutdownSignal in the thread recorded in @thread.
  # Does nothing when the input is not in server mode.
  def teardown
    return unless server?

    @interrupted = true
    @thread.raise(LogStash::ShutdownSignal)
  end # def teardown
end
run_server(output_queue) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 157
  # Server-mode main loop: accepts connections on @server_socket and runs
  # handle_socket for each one in its own thread.
  #
  # The loop ends when accept fails with IOError or LogStash::ShutdownSignal
  # while @interrupted is set: the listener is closed and the signal is
  # raised in the remaining client threads. An IOError without @interrupted
  # is re-raised to the caller.
  def run_server(output_queue)
    @thread = Thread.current
    @client_threads = []
    loop do
      begin
        # Drop handlers whose connection has already finished, so a
        # long-running listener does not retain one Thread per past client.
        @client_threads.delete_if { |thread| !thread.alive? }

        # Start a new thread for each connection.
        @client_threads << Thread.start(@server_socket.accept) do |s|
          # TODO(sissel): put this block in its own method.

          # monkeypatch a 'peer' method onto the socket.
          s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
          @logger.debug("Accepted connection", :client => s.peer,
                        :server => "#{@host}:#{@port}")
          begin
            handle_socket(s, s.peer, output_queue, @codec.clone)
          rescue Interrupted
            # Best-effort close; the socket may already be closed.
            s.close rescue nil
          end
        end # Thread.start
      rescue OpenSSL::SSL::SSLError => ssle
        # NOTE(mrichar1): This doesn't return a useful error message for some reason
        @logger.error("SSL Error", :exception => ssle,
                      :backtrace => ssle.backtrace)
      rescue IOError, LogStash::ShutdownSignal
        if @interrupted
          # Intended shutdown, get out of the loop
          @server_socket.close
          @client_threads.each do |thread|
            thread.raise(LogStash::ShutdownSignal)
          end
          break
        else
          # Else it was a genuine IOError caused by something else, so propagate it up..
          raise
        end
      end
    end # loop
  rescue LogStash::ShutdownSignal
    # nothing to do
  ensure
    # The shutdown branch has usually closed the listener already. A second
    # close can raise IOError, which must not escape an intended shutdown.
    begin
      @server_socket.close
    rescue IOError
      # already closed
    end
  end # def run_server

  # Client-mode main loop: connects to @host:@port (performing the SSL
  # handshake when @ssl_enable is set), passes the connection to
  # handle_socket, and reconnects once that returns.
  #
  # The loop has no exit condition of its own; it ends only when an
  # exception propagates out of it, for example a failed connect.
  def run_client(output_queue)
    @thread = Thread.current
    client_socket = nil
    while true
      tcp_socket = TCPSocket.new(@host, @port)
      client_socket = tcp_socket
      if @ssl_enable
        client_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, @ssl_context)
        begin
          client_socket.connect
        rescue OpenSSL::SSL::SSLError => ssle
          @logger.error("SSL Error", :exception => ssle,
                        :backtrace => ssle.backtrace)
          # Release the failed connection before retrying; otherwise every
          # failed handshake leaves an open TCP socket behind.
          tcp_socket.close
          client_socket = nil
          # NOTE(mrichar1): Hack to prevent hammering peer
          sleep(5)
          next
        end
      end
      # Mix a 'peer' method into this one socket object.
      client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
      @logger.debug("Opened connection", :client => "#{client_socket.peer}")
      handle_socket(client_socket, client_socket.peer, output_queue, @codec.clone)
      # handle_socket closes the socket itself, so nothing is left to clean up.
      client_socket = nil
    end # loop
  ensure
    # client_socket is nil when the connect itself failed; guarding here keeps
    # the original error from being replaced by a NoMethodError.
    client_socket.close if client_socket
  end # def run_client

  public
  # Asks a running server loop to stop: marks the input as interrupted and
  # raises LogStash::ShutdownSignal in the thread recorded in @thread.
  # Does nothing when the input is not in server mode.
  def teardown
    return unless server?

    @interrupted = true
    @thread.raise(LogStash::ShutdownSignal)
  end # def teardown
end 
teardown() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 225
# Asks a running server loop to stop: marks the input as interrupted and
# raises LogStash::ShutdownSignal in the thread recorded in @thread.
# Does nothing when the input is not in server mode.
def teardown
  return unless server?

  @interrupted = true
  @thread.raise(LogStash::ShutdownSignal)
end

Private Instance Methods

handle_socket(socket, client_address, output_queue, codec) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 100
# Pumps one connection: reads chunks from +socket+, decodes them with
# +codec+, and pushes every resulting event onto +output_queue+ tagged with
# +client_address+ (plus the peer certificate subject when SSL verification
# is on).
#
# The read loop ends only through an exception. Any StandardError (end of
# stream, timeout, ...) is rescued: the codec is flushed if it supports it,
# the error is logged at debug level, and the socket is closed.
def handle_socket(socket, client_address, output_queue, codec)
  # Shared by decode and flush: decorate the event, tag it, queue it.
  publish = proc do |event|
    decorate(event)
    event["host"] = client_address
    event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify
    output_queue << event
  end

  while true
    # NOTE(petef): the timeout only hits after the line is read
    # or socket dies
    # TODO(sissel): Why do we have a timeout here? What's the point?
    chunk =
      if @data_timeout == -1
        # -1 disables the read timeout.
        read(socket)
      else
        Timeout::timeout(@data_timeout) { read(socket) }
      end
    codec.decode(chunk, &publish)
  end # while true
rescue => error
  # Give the codec a chance to emit whatever it still has buffered.
  codec.flush(&publish) if codec.respond_to?(:flush)

  @logger.debug("An error occurred. Closing connection",
                :client => socket.peer, :exception => error)
ensure
  begin
    socket.close
  rescue IOError
    # socket was already closed; nothing to do
  end # begin
end
read(socket) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 144
# Performs one sysread on +socket+, asking for at most 16384 bytes, and
# returns whatever data that call yields.
def read(socket)
  socket.sysread(16384)
end
server?() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 139
# True when the configured @mode is the string "server"; false for any
# other value.
def server?
  "server" == @mode
end