class LogStash::Inputs::Relp

Read RELP events over a TCP socket.

For more information about RELP, see <http://www.rsyslog.com/doc/imrelp.html>.

This protocol implements application-level acknowledgements to help protect against message loss.

Message acks only cover a message up to the point where it is put into the queue for filters; anything lost after that point will not be retransmitted.

Public Class Methods

new(*args) click to toggle source
Calls superclass method LogStash::Inputs::Base::new
# File lib/logstash/inputs/relp.rb, line 32
# Builds the input by handing every argument, unchanged, to the superclass
# initializer. No additional state is set up here.
def initialize(*args)
  # Bare `super` re-sends the arguments this method was called with.
  super
end

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/relp.rb, line 37
# Logs the configured listen address and creates the RELP server.
#
# Reads @host and @port, and stores the new RelpServer in @relp_server.
# The server is constructed with 'syslog' as its only entry in the third
# argument.
def register
  listen_address = "#{@host}:#{@port}"
  @logger.info("Starting relp input listener", :address => listen_address)

  @relp_server = RelpServer.new(@host, @port, ['syslog'])
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/relp.rb, line 60
  # Accept loop for the RELP listener. Blocks the calling thread until the
  # input is torn down.
  #
  # The calling thread is recorded in @thread so that teardown can raise
  # Interrupted into it. Every accepted connection is served on its own
  # thread by relp_stream, and that thread always closes its socket when
  # the stream ends, whether by a clean disconnect, a RELP error, or any
  # other exception.
  #
  # output_queue - passed through to relp_stream, which appends decoded
  #                events to it.
  #
  # Raises IOError (re-raised) when the accept call fails and @interrupted
  # is not set, i.e. the failure was not caused by teardown.
  def run(output_queue)
    @thread = Thread.current
    loop do
      begin
        # Start a new thread for each connection.
        Thread.start(@relp_server.accept) do |client|
          rs = client[0]
          socket = client[1]
          # monkeypatch a 'peer' method onto the socket.
          socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
          peer = socket.peer
          @logger.debug("Relp Connection to #{peer} created")
          begin
            relp_stream(rs, socket, output_queue, peer)
          rescue Relp::ConnectionClosed
            @logger.debug("Relp Connection to #{peer} Closed")
          rescue Relp::RelpError => e
            @logger.warn("Relp error: #{e.class} #{e.message}")
            #TODO: Still not happy with this, are they all warn level?
            #Will this catch everything I want it to?
          ensure
            # The RELP spec says to close the connection on error. Nothing
            # else uses this socket once the thread ends, so release it
            # here instead of leaving it to the garbage collector.
            socket.close unless socket.closed?
          end
        end # Thread.start
      rescue Relp::InvalidCommand, Relp::InappropriateCommand => e
        @logger.warn("Relp client trying to open connection with something other than open:#{e.message}")
      rescue Relp::InsufficientCommands
        @logger.warn('Relp client incapable of syslog')
      rescue IOError, Interrupted
        if @interrupted
          # Intended shutdown, get out of the loop
          @relp_server.shutdown
          break
        else
          # Else it was a genuine IOError caused by something else, so propagate it up..
          raise
        end
      end
    end # loop
  end # def run

  # Requests shutdown of the accept loop.
  #
  # Sets @interrupted so that run treats the interruption as intentional,
  # then raises Interrupted in the thread that run recorded in @thread.
  # If run has not started yet (@thread is unset) only the flag is set,
  # rather than failing with NoMethodError on nil.
  def teardown
    @interrupted = true
    @thread.raise(Interrupted.new) if @thread
  end
end
teardown() click to toggle source
# File lib/logstash/inputs/relp.rb, line 100
# Requests shutdown of the accept loop.
#
# Sets @interrupted so that run treats the interruption as intentional,
# then raises Interrupted in the thread that run recorded in @thread.
# If run has not started yet (@thread is unset) only the flag is set,
# rather than failing with NoMethodError on nil.
def teardown
  @interrupted = true
  @thread.raise(Interrupted.new) if @thread
end

Private Instance Methods

relp_stream(relpserver,socket,output_queue,client_address) click to toggle source
# File lib/logstash/inputs/relp.rb, line 43
# Pumps syslog frames from one RELP client into the pipeline.
#
# Each pass reads a frame with relpserver.syslog_read, decodes the frame's
# "message" through @codec, and for every decoded event: decorates it,
# sets its "host" to client_address and appends it to output_queue. The
# frame's 'txnr' is then acknowledged back over the same socket.
#
# The loop has no exit condition of its own; it ends only when something
# inside it raises (run rescues Relp::ConnectionClosed and Relp::RelpError
# around this call).
def relp_stream(relpserver,socket,output_queue,client_address)
  loop do
    packet = relpserver.syslog_read(socket)
    payload = packet["message"]

    @codec.decode(payload) do |decoded|
      decorate(decoded)
      decoded["host"] = client_address
      output_queue << decoded
    end

    # Reaching this point means the events were handed to the filter
    # queue. Waiting for the outputs before acking does not look possible
    # without fundamentally breaking the plugin architecture, so the ack
    # is sent now.
    transaction = packet['txnr']
    relpserver.ack(socket, transaction)
  end
end