class LogStash::Outputs::Tcp

Write events over a TCP socket.

Each event json is separated by a newline.

Can either accept connections from clients or connect to a server, depending on `mode`.

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/tcp.rb, line 134
def receive(event)
  return unless output?(event)

  #if @message_format
    #output = event.sprintf(@message_format) + "\n"
  #else
    #output = event.to_hash.to_json + "\n"
  #end
  
  @codec.encode(event)
end
register() click to toggle source
# File lib/logstash/outputs/tcp.rb, line 70
def register
  require "stud/try"
  if server?
    workers_not_supported

    @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
    @server_socket = TCPServer.new(@host, @port)
    @client_threads = []

    @accept_thread = Thread.new(@server_socket) do |server_socket|
      loop do
        client_thread = Thread.start(server_socket.accept) do |client_socket|
          client = Client.new(client_socket, @logger)
          Thread.current[:client] = client
          client.run
        end
        @client_threads << client_thread
      end
    end

    @codec.on_event do |payload|
      @client_threads.each do |client_thread|
        client_thread[:client].write(payload)
      end
      @client_threads.reject! {|t| !t.alive? }
    end
  else
    client_socket = nil
    @codec.on_event do |payload|
      begin
        client_socket = connect unless client_socket
        r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
        # don't expect any reads, but a readable socket might
        # mean the remote end closed, so read it and throw it away.
        # we'll get an EOFError if it happens.
        client_socket.sysread(16384) if r.any?

        # Now send the payload
        client_socket.syswrite(payload) if w.any?
      rescue => e
        @logger.warn("tcp output exception", :host => @host, :port => @port,
                     :exception => e, :backtrace => e.backtrace)
        client_socket.close rescue nil
        client_socket = nil
        sleep @reconnect_interval
        retry
      end
    end
  end
end

Private Instance Methods

connect() click to toggle source
# File lib/logstash/outputs/tcp.rb, line 122
def connect
  Stud::try do
    return TCPSocket.new(@host, @port)
  end
end
server?() click to toggle source
# File lib/logstash/outputs/tcp.rb, line 129
def server?
  @mode == "server"
end