class LogStash::Outputs::ZeroMQ
Write events to a 0MQ PUB socket.
You need to have the 0mq 2.1.x library installed to be able to use this output plugin.
With the default settings, the plugin creates a publisher that connects to a subscriber bound to tcp://127.0.0.1:2120.
Public Instance Methods
publish(payload)
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 113
#
# Sends one already-encoded payload over the output's ZeroMQ socket.
#
# The value returned by @zsocket.send_string is handed to error_check.
# Any StandardError raised along the way is logged as a warning, together
# with the configured address, and is not re-raised.
#
# @param payload the encoded event; passed unchanged to @zsocket.send_string
def publish(payload)
  begin
    @logger.debug("0mq: sending", :event => payload) if @logger.debug?

    if @topology == "pubsub"
      # TODO(sissel): Need to figure out how to fit this into the codecs system.
      #@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
      #error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE),
      #            "in topic send_string")
    end

    send_result = @zsocket.send_string(payload)
    error_check(send_result, "in send_string")
  rescue => send_error
    # Best effort: report the failure and carry on instead of propagating it.
    @logger.warn("0mq output exception", :address => @address, :exception => send_error)
  end
end
receive(event)
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 107
#
# Hands an event to the codec, provided output?(event) accepts it.
#
# @param event the event to emit
# @return nil when output?(event) is falsy, otherwise whatever
#   @codec.encode(event) returns
def receive(event)
  @codec.encode(event) if output?(event)
end
register()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 61
#
# Prepares the output for use: loads the ZeroMQ bindings, creates the socket
# that matches the configured topology, applies socket options, sets the
# socket up for every configured address, and subscribes #publish to the
# codec's encoded events.
#
# NOTE(review): context, error_check, setopts and setup are not defined in
# this listing; presumably they come from the LogStash::Util::ZeroMQ mixin
# included below -- confirm.
#
# @raise [ArgumentError] when @topology is not "pair", "pushpull" or "pubsub"
def register
  # Loaded lazily so the bindings are only required when the plugin is used.
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  if server?
    workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!")
  end

  # Translate topology shorthand to socket types. An unknown value is
  # rejected here instead of handing nil to context.socket.
  zmq_const =
    case @topology
    when "pair" then ZMQ::PAIR
    when "pushpull" then ZMQ::PUSH
    when "pubsub" then ZMQ::PUB
    else
      raise ArgumentError, "0mq output: unsupported topology #{@topology.inspect}"
    end

  @zsocket = context.socket(zmq_const)
  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1")

  setopts(@zsocket, @sockopt) if @sockopt

  @address.each { |addr| setup(@zsocket, addr) }

  # Every payload the codec emits is sent through #publish.
  @codec.on_event(&method(:publish))
end
teardown()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 97
#
# Closes the ZeroMQ socket and passes the result of the close call to
# error_check.
#
# @return whatever error_check returns
def teardown
  close_result = @zsocket.close
  error_check(close_result, "while closing the socket")
end
Private Instance Methods
server?()
click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 102
#
# Reports whether the plugin is configured with mode "server".
#
# @return the result of comparing @mode with "server"
def server?
  mode = @mode
  mode == "server"
end