class LogStash::Inputs::ZeroMQ

Read events over a 0MQ SUB socket.

The 0MQ 2.1.x library must be installed to use this input plugin.

With the default settings, the plugin creates a subscriber socket bound to tcp://127.0.0.1:2120 that waits for publishers to connect.

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 74
# Creates and configures the 0MQ socket stored in @zsocket.
#
# Steps, in order:
# 1. Load the ffi-rzmq bindings and mix LogStash::Util::ZeroMQ into this class.
# 2. Create a socket whose type is chosen from @topology
#    ("pair" => PAIR, "pushpull" => PULL, "pubsub" => SUB).
# 3. Set ZMQ::LINGER to 1, then apply @sockopt via setopts when it is set.
# 4. Pass every entry of @address to setup.
# 5. For "pubsub" only, subscribe to each entry of @topic, or to the empty
#    prefix when @topic is nil.
#
# Raises ArgumentError when @topology is not one of the three known values,
# instead of handing a nil socket type to context.socket.
def register
  # Required lazily so the bindings are only loaded when register runs.
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  zmq_const =
    case @topology
    when "pair"     then ZMQ::PAIR
    when "pushpull" then ZMQ::PULL
    when "pubsub"   then ZMQ::SUB
    else
      raise ArgumentError, "Unsupported ZeroMQ topology: #{@topology.inspect}"
    end

  @zsocket = context.socket(zmq_const)
  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
              "while setting ZMQ::LINGER == 1")

  setopts(@zsocket, @sockopt) if @sockopt

  # NOTE(review): setup presumably binds or connects depending on the mode;
  # it comes from the mixin and is not visible here.
  @address.each { |addr| setup(@zsocket, addr) }

  # Subscriptions only apply to the SUB socket used for "pubsub".
  return unless @topology == "pubsub"

  if @topic.nil?
    @logger.debug("ZMQ - No topic provided. Subscribing to all messages")
    error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""),
                "while setting ZMQ::SUBSCRIBE")
  else
    @topic.each do |t|
      @logger.debug("ZMQ subscribing to topic: #{t}")
      error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t),
                  "while setting ZMQ::SUBSCRIBE == #{t}")
    end
  end
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 123
# Receive loop: reads messages from @zsocket, decodes them with @codec and
# pushes the resulting events onto output_queue.
#
# output_queue - any object responding to <<; receives each decoded event.
#
# A single-part message is decoded as-is. When the socket reports more parts,
# the first part is dropped (treated as the topic) and the second part becomes
# the payload.
#
# Returns nil once LogStash::ShutdownSignal is raised. Any other
# StandardError is logged at debug level and the loop is restarted.
def run(output_queue)
  local_host = Socket.gethostname
  begin
    loop do
      first_part = ""
      error_check(@zsocket.recv_string(first_part), "in recv_string")
      @logger.debug("ZMQ receiving", :event => first_part)
      payload = first_part

      # NOTE(review): only one extra part is read here; a message with more
      # than two parts would leave frames to be picked up by the next
      # iteration - confirm whether that is intended.
      if @zsocket.more_parts?
        @logger.debug("Multipart message detected. Setting @message to second part. First part was: #{first_part}")
        second_part = ''
        error_check(@zsocket.recv_string(second_part), "in recv_string")
        @logger.debug("ZMQ receiving", :event => second_part)
        payload = second_part
      end

      @codec.decode(payload) do |event|
        # Keep a host the codec already supplied; otherwise use ours.
        event["host"] = local_host unless event["host"]
        decorate(event)
        output_queue << event
      end
    end
  rescue LogStash::ShutdownSignal
    # Shutdown requested: leave the receive loop and fall out of the method.
    nil
  rescue => e
    # NOTE(review): the retry is unbounded and only logged at debug level, so
    # a persistent failure would loop without visible output - confirm intent.
    @logger.debug("ZMQ Error", :subscriber => @zsocket,
                  :exception => e)
    retry
  end
end
server?() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 119
# Predicate: true when @mode equals the string "server", false otherwise.
def server?
  "server" == @mode
end
teardown() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 115
# Closes @zsocket and passes the close result to error_check.
#
# Returns whatever error_check returns.
def teardown
  close_result = @zsocket.close
  error_check(close_result, "while closing the zmq socket")
end

Private Instance Methods

build_source_string() click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 162
# Returns a copy (via clone) of the first entry in @address, so callers can
# modify the result without touching the configured address.
def build_source_string
  @address.first.clone
end