class CrystalMQ::Consumer

Public Class Methods

new(host, topic, channel)
# File lib/crystalmq.rb, line 40
# Stores the connection settings and opens the broker connection.
#
# @param host [String] host name or address passed to TCPSocket.new by
#   connect_socket
# @param topic [Object] topic handed to ConsumerPayload when subscribing
#   (presumably a String; confirm against ConsumerPayload)
# @param channel [Object] channel handed to ConsumerPayload when subscribing
#   (presumably a String; confirm against ConsumerPayload)
def initialize(host, topic, channel)
  @host = host
  @topic = topic
  @channel = channel
  # connect_socket rescues SocketError itself and keeps retrying until it
  # succeeds, so no rescue/retry is needed at this level. A second
  # rescue here would only re-run connect_socket and orphan a socket.
  connect_socket
end

Public Instance Methods

connect_socket()
# File lib/crystalmq.rb, line 50
# Opens a TCP connection to @host on port 1235 and stores it in @socket
# with sync mode enabled. Retries indefinitely while TCPSocket.new raises
# SocketError, pausing +retry_delay+ seconds between attempts so a failing
# lookup does not spin the CPU.
#
# NOTE(review): only SocketError is retried; other connect failures (for
# example Errno::ECONNREFUSED) still propagate to the caller. Confirm that
# this is intended.
#
# @param retry_delay [Numeric] seconds to sleep before each retry
def connect_socket(retry_delay = 0.5)
  # When reconnecting, release the previous connection explicitly instead of
  # leaving the descriptor open until garbage collection.
  @socket.close if @socket && !@socket.closed?
  @socket = TCPSocket.new(@host, 1235)
  @socket.sync = true
rescue SocketError
  sleep retry_delay
  retry
end
consume() { |message| ... }
# File lib/crystalmq.rb, line 57
# Subscribes to @topic/@channel by writing a ConsumerPayload to the socket,
# then reads MessagePayload objects from it and yields the +message+ of each
# one to the block.
#
# The read loop runs until StopIteration is raised inside it (Kernel#loop
# rescues that and returns), after which the socket is closed. On
# SocketError the connection is re-established and the subscription is sent
# again.
#
# NOTE(review): only SocketError triggers a reconnect; other I/O failures
# (for example EOFError or Errno::ECONNRESET) propagate to the caller.
# Confirm that this is intended.
#
# @yieldparam message [Object] the +message+ attribute of each decoded
#   MessagePayload
# @return [Enumerator] when called without a block
def consume
  # Without a block there is nothing to yield to. Return an Enumerator before
  # touching the socket so that no message is read and then dropped.
  return enum_for(:consume) unless block_given?

  @socket.write(ConsumerPayload.new(@topic, @channel).to_msgpack)
  unpacker = MessagePack::Unpacker.new(@socket)

  loop do
    message = MessagePayload.from_msgpack(unpacker.read)
    yield message.message
  end
  @socket.close
rescue SocketError
  connect_socket
  retry
end