class LogStash::Outputs::ZeroMQ

Write events to a 0MQ PUB socket.

You need to have the 0mq 2.1.x library installed to be able to use this output plugin.

The default settings will create a publisher connecting to a subscriber bound to tcp://127.0.0.1:2120

Public Instance Methods

publish(payload) click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 113
def publish(payload)
  @logger.debug? && @logger.debug("0mq: sending", :event => payload)
  if @topology == "pubsub"
    # TODO(sissel): Need to figure out how to fit this into the codecs system.
    #@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
    #error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE),
                #"in topic send_string")
  end
  error_check(@zsocket.send_string(payload), "in send_string")
rescue => e
  @logger.warn("0mq output exception", :address => @address, :exception => e)
end
receive(event) click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 107
def receive(event)
  return unless output?(event)

  @codec.encode(event)
end
register() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 61
def register
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  if @mode == "server"
    workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!")
  end

  # Translate topology shorthand to socket types
  case @topology
  when "pair"
    zmq_const = ZMQ::PAIR
  when "pushpull"
    zmq_const = ZMQ::PUSH
  when "pubsub"
    zmq_const = ZMQ::PUB
  end # case socket_type

  @zsocket = context.socket(zmq_const)

  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
              "while setting ZMQ::LINGER == 1)")

  if @sockopt
    setopts(@zsocket, @sockopt)
  end

  @address.each do |addr|
    setup(@zsocket, addr)
  end

  @codec.on_event(&method(:publish))
end
teardown() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 97
def teardown
  error_check(@zsocket.close, "while closing the socket")
end

Private Instance Methods

server?() click to toggle source
# File lib/logstash/outputs/zeromq.rb, line 102
def server?
  @mode == "server"
end