class CrystalMQ::Consumer
Public Class Methods
new(host, topic, channel)
click to toggle source
# File lib/crystalmq.rb, line 40 def initialize(host, topic, channel) @topic = topic @channel = channel @host = host connect_socket rescue SocketError connect_socket retry end
Public Instance Methods
connect_socket()
click to toggle source
# File lib/crystalmq.rb, line 50 def connect_socket @socket = TCPSocket.new(@host, 1235) @socket.sync = true rescue SocketError retry end
consume() { |message| ... }
click to toggle source
# File lib/crystalmq.rb, line 57 def consume @socket.write(ConsumerPayload.new(@topic, @channel).to_msgpack) to_process = [] unpacker = MessagePack::Unpacker.new(@socket) loop do message = MessagePayload.from_msgpack(unpacker.read) yield message.message end @socket.close rescue SocketError connect_socket retry end