class LogStash::Inputs::ZeroMQ
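Read events over a 0MQ socket. The socket type follows the configured topology: SUB for "pubsub", PULL for "pushpull", and PAIR for "pair".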
You need to have the 0mq 2.1.x library installed to be able to use this input plugin.
With the default settings, the plugin creates a subscriber that binds to tcp://127.0.0.1:2120 and waits for publishers to connect.
Public Instance Methods
register()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 74
# Prepares the 0MQ socket used by this input.
#
# Requires ffi-rzmq and the LogStash 0MQ helper, mixes
# LogStash::Util::ZeroMQ into the plugin class, creates a socket whose type
# is selected by @topology, sets ZMQ::LINGER to 1, applies @sockopt when it
# is set, calls setup for every entry of @address and, for the "pubsub"
# topology, installs the ZMQ::SUBSCRIBE filters.
#
# NOTE(review): context, setup, setopts and error_check are not defined in
# this file section; presumably they come from LogStash::Util::ZeroMQ —
# confirm.
def register
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  # Map the configured topology onto the receiving socket type. An
  # unrecognised topology leaves zmq_const nil, exactly as before.
  zmq_const =
    case @topology
    when "pair" then ZMQ::PAIR
    when "pushpull" then ZMQ::PULL
    when "pubsub" then ZMQ::SUB
    end

  @zsocket = context.socket(zmq_const)

  # The context string previously ended with a stray ")".
  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
              "while setting ZMQ::LINGER == 1")

  setopts(@zsocket, @sockopt) if @sockopt

  @address.each { |addr| setup(@zsocket, addr) }

  # Subscription filters are only installed for the pubsub topology.
  return unless @topology == "pubsub"

  if @topic.nil?
    # Subscribing with an empty prefix selects every message.
    @logger.debug("ZMQ - No topic provided. Subscribing to all messages")
    error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""),
                "while setting ZMQ::SUBSCRIBE")
  else
    @topic.each do |t|
      @logger.debug("ZMQ subscribing to topic: #{t}")
      error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t),
                  "while setting ZMQ::SUBSCRIBE == #{t}")
    end
  end
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 123
# Receive loop of the input.
#
# Reads messages from @zsocket forever, decodes each payload with @codec and
# pushes the resulting events onto output_queue. Every event gets a "host"
# field (the local hostname) unless the codec already set one, and is passed
# through decorate.
#
# A single-frame message is its own payload. For a multipart message the
# first frame is treated as the topic and discarded, and every remaining
# frame is decoded as a payload. Previously only the second frame was read,
# so on messages with four or more frames the leftovers were mistaken for a
# new message and some of them were silently dropped.
#
# output_queue - any object responding to << that accepts decoded events.
#
# Returns nil once LogStash::ShutdownSignal is raised. Any other
# StandardError is logged at debug level and the loop is restarted.
def run(output_queue)
  host = Socket.gethostname

  # Decodes one payload and enqueues every event the codec yields.
  emit = lambda do |payload|
    @codec.decode(payload) do |event|
      event["host"] ||= host
      decorate(event)
      output_queue << event
    end
  end

  begin
    loop do
      # Here's the unified receiver: the first frame is either the whole
      # message or the topic of a multipart message.
      first = ""
      error_check(@zsocket.recv_string(first), "in recv_string")
      @logger.debug("ZMQ receiving", :event => first)

      if @zsocket.more_parts?
        @logger.debug("Multipart message detected. Setting @message to second part. First part was: #{first}")

        # Drain the message completely so that no frame of this message is
        # left on the socket to be misread as the start of the next one.
        loop do
          part = ""
          error_check(@zsocket.recv_string(part), "in recv_string")
          @logger.debug("ZMQ receiving", :event => part)
          emit.call(part)
          break unless @zsocket.more_parts?
        end
      else
        emit.call(first)
      end
    end
  rescue LogStash::ShutdownSignal
    # shutdown: leave the receive loop and return nil
  rescue => e
    # Best effort: log and go back to receiving.
    @logger.debug("ZMQ Error", :subscriber => @zsocket,
                  :exception => e)
    retry
  end
end
server?()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 119
# Tells whether this input is configured with the "server" mode.
#
# Returns true when @mode equals "server", false otherwise.
def server?
  "server" == @mode
end
teardown()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 115
# Closes the 0MQ socket held in @zsocket and hands the result of the close
# call to error_check together with a description of the operation.
def teardown
  close_rc = @zsocket.close
  error_check(close_rc, "while closing the zmq socket")
end
Private Instance Methods
build_source_string()
click to toggle source
# File lib/logstash/inputs/zeromq.rb, line 162
# Builds the source identifier for this input.
#
# Returns a copy (clone) of the first entry of @address, so callers can
# modify the result without touching the configured address.
def build_source_string
  # The former assignment to an unused local `id` was dropped; the value
  # returned is the same.
  @address.first.clone
end