AMQP consumers are entities that handle messages delivered to them by the AMQP broker (a “push API”, as opposed to a “pull API”). Every consumer is associated with a queue. A consumer can be exclusive (no other consumer can be registered for the same queue) or non-exclusive (consumers share the queue). When a queue has multiple consumers, messages are distributed among them in a round-robin manner, subject to the channel-level prefetch setting.
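The round-robin distribution described above can be modelled without a broker. The sketch below is a toy model, not the broker's or the amqp gem's implementation: `SimConsumer` and `dispatch` are hypothetical names, and the prefetch setting is assumed to act as a cap on unacknowledged messages per consumer.

```ruby
# Toy model of broker-side dispatch; not part of the amqp gem.
SimConsumer = Struct.new(:tag, :prefetch, :unacked) do
  # A consumer can take another message only while below its prefetch limit.
  def ready?
    unacked.size < prefetch
  end
end

# Hands messages to consumers in round-robin order, skipping consumers that
# have reached their prefetch limit. Returns the messages that could not be
# delivered because every consumer was full.
def dispatch(messages, consumers)
  pending = messages.dup
  cursor  = 0

  until pending.empty?
    # Look for the next ready consumer, starting at the cursor.
    offset = (0...consumers.size).find do |i|
      consumers[(cursor + i) % consumers.size].ready?
    end
    break if offset.nil? # every consumer is at its prefetch limit

    index = (cursor + offset) % consumers.size
    consumers[index].unacked << pending.shift
    cursor = (index + 1) % consumers.size
  end

  pending
end

a = SimConsumer.new("a", 2, [])
b = SimConsumer.new("b", 1, [])
leftover = dispatch(%w[m1 m2 m3 m4], [a, b])
```

In this model, a message that cannot be delivered stays queued until a consumer frees capacity by acknowledging an earlier message.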
@see AMQP::Queue @see AMQP::Queue#subscribe @see AMQP::Queue#cancel
@return [Hash] Custom subscription metadata
@return [AMQP::Channel] Channel this consumer uses
@return [String] Consumer tag, unique consumer identifier
@return [AMQP::Queue] Queue messages are consumed from
# File lib/amqp/consumer.rb, line 49
def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
  @callbacks    = Hash.new

  @channel      = channel            || raise(ArgumentError, "channel is nil")
  @connection   = channel.connection || raise(ArgumentError, "connection is nil")
  @queue        = queue              || raise(ArgumentError, "queue is nil")
  @consumer_tag = consumer_tag       || self.class.tag_generator.generate_for(queue)
  @exclusive    = exclusive
  @no_ack       = no_ack
  @arguments    = arguments

  @no_local     = no_local

  self.register_with_channel
  self.register_with_queue
end
@return [AMQP::ConsumerTagGenerator] Consumer tag generator
# File lib/amqp/consumer.rb, line 38
def self.tag_generator
  @tag_generator ||= AMQP::ConsumerTagGenerator.new
end
Assigns the consumer tag generator that will be used by consumer instances.
@param [AMQP::ConsumerTagGenerator] generator Consumer tag generator to use
@return [AMQP::ConsumerTagGenerator] Provided argument
# File lib/amqp/consumer.rb, line 44
def self.tag_generator=(generator)
  @tag_generator = generator
end
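A custom generator can be sketched as follows. The only contract taken from the source above is that the constructor calls `generate_for(queue)` on the generator and uses the result as the consumer tag. `SequentialTagGenerator` and `FakeQueue` are hypothetical names introduced for illustration, and `FakeQueue` merely stands in for AMQP::Queue so the sketch runs without a broker.

```ruby
# Hypothetical generator; it only needs to respond to #generate_for(queue)
# and return a tag string.
class SequentialTagGenerator
  def initialize(prefix = "worker")
    @prefix  = prefix
    @counter = 0
  end

  # Builds "<prefix>.<queue name>.<n>", incrementing n on every call.
  def generate_for(queue)
    @counter += 1
    "#{@prefix}.#{queue.name}.#{@counter}"
  end
end

# Stand-in for AMQP::Queue (assumption: only #name is needed here).
FakeQueue = Struct.new(:name)

generator  = SequentialTagGenerator.new("billing")
first_tag  = generator.generate_for(FakeQueue.new("invoices"))
second_tag = generator.generate_for(FakeQueue.new("invoices"))

# With the amqp gem loaded, the generator would be installed with:
#   AMQP::Consumer.tag_generator = generator
```

Because resubscribe asks the generator for a fresh tag, a generator of this kind should return a different value on every call. The counter above is not thread-safe.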
Acknowledge a delivery tag. @return [Consumer] self
@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
# File lib/amqp/consumer.rb, line 199
def acknowledge(delivery_tag)
  @channel.acknowledge(delivery_tag)

  self
end
Called by the associated connection object when the AMQP connection has been re-established (for example, after a network failure).
@api plugin
# File lib/amqp/consumer.rb, line 274
def auto_recover
  self.exec_callback_yielding_self(:before_recovery)
  self.resubscribe
  self.exec_callback_yielding_self(:after_recovery)
end
Defines a callback that will be executed after the TCP connection is recovered following a network failure but before the AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces any defined earlier).
@api public
# File lib/amqp/consumer.rb, line 254
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
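The "last definition wins" behaviour of before_recovery contrasts with on_delivery and on_cancel, which append handlers. A minimal sketch of the two registration styles follows. `CallbackRegistry` is a hypothetical class written for illustration; its method names mirror the calls in the listings, but the gem's own callback code may differ in detail.

```ruby
# Hypothetical registry illustrating "redefine" versus "append" semantics.
class CallbackRegistry
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Replaces whatever was registered under +name+ (last definition wins).
  def redefine_callback(name, &block)
    @callbacks[name] = [block]
    self
  end

  # Adds another handler under +name+, keeping the existing ones.
  def append_callback(name, &block)
    @callbacks[name] << block
    self
  end

  # Calls every handler registered under +name+ and collects the results.
  def exec_callback(name, *args)
    @callbacks[name].map { |callback| callback.call(*args) }
  end
end

registry = CallbackRegistry.new
registry.redefine_callback(:before_recovery) { :first }
registry.redefine_callback(:before_recovery) { :second }
registry.append_callback(:delivery) { |payload| "a:#{payload}" }
registry.append_callback(:delivery) { |payload| "b:#{payload}" }

recovery_results = registry.exec_callback(:before_recovery) # only the last block runs
delivery_results = registry.exec_callback(:delivery, "hi")  # both blocks run
```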
Legacy {AMQP::Queue} API compatibility. @private @deprecated
# File lib/amqp/consumer.rb, line 140
def callback
  if @callbacks[:delivery]
    @callbacks[:delivery].first
  end
end
@return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 113
def cancel(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))

      if !nowait
        self.redefine_callback(:cancel, &block)
        @channel.consumers_awaiting_cancel_ok.push(self)
      end

      self
    end
  end

  self
end
Begin consuming messages from the queue @return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 75
def consume(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments))

      if !nowait
        self.redefine_callback(:consume, &block)
        @channel.consumers_awaiting_consume_ok.push(self)
      end

      self
    end
  end

  self
end
@return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)
# File lib/amqp/consumer.rb, line 67
def exclusive?
  !!@exclusive
end

# File lib/amqp/consumer.rb, line 186
def handle_cancel(basic_cancel)
  self.exec_callback(:scancel, basic_cancel)
end

# File lib/amqp/consumer.rb, line 300
def handle_cancel_ok(cancel_ok)
  self.exec_callback_once(:cancel, cancel_ok)

  self.unregister_with_channel
  self.unregister_with_queue

  @consumer_tag = nil

  # detach from object graph so that this object will be garbage-collected
  @queue        = nil
  @channel      = nil
  @connection   = nil

  self.clear_callbacks(:delivery)
  self.clear_callbacks(:consume)
  self.clear_callbacks(:cancel)
  self.clear_callbacks(:scancel)
end
@private
# File lib/amqp/consumer.rb, line 244
def handle_connection_interruption(method = nil)
  self.exec_callback_yielding_self(:after_connection_interruption)
end

# File lib/amqp/consumer.rb, line 296
def handle_consume_ok(consume_ok)
  self.exec_callback_once(:consume, consume_ok)
end
Implementation
# File lib/amqp/consumer.rb, line 292
def handle_delivery(basic_deliver, metadata, payload)
  self.exec_callback(:delivery, basic_deliver, metadata, payload)
end
@return [String] Readable representation of relevant object state.
# File lib/amqp/consumer.rb, line 175
def inspect
  "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
end

# File lib/amqp/consumer.rb, line 180
def on_cancel(&block)
  self.append_callback(:scancel, &block)

  self
end
Defines a callback that will be executed after the TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces any defined earlier).
@api public
# File lib/amqp/consumer.rb, line 238
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
Register a block that will be used to handle delivered messages.
@return [AMQP::Consumer] self @see AMQP::Queue#subscribe
# File lib/amqp/consumer.rb, line 151
def on_delivery(&block)
  # We have to maintain this multiple arities jazz
  # because older versions of this gem are used in examples in at least 3
  # books published by O'Reilly :(. MK.
  delivery_shim = Proc.new { |basic_deliver, headers, payload|
    case block.arity
    when 1 then
      block.call(payload)
    when 2 then
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload)
    else
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
    end
  }

  self.append_callback(:delivery, &delivery_shim)

  self
end
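The arity-based dispatch in on_delivery can be tried in isolation. The sketch below is a simplified model rather than the gem's code: `wrap_handler` is a hypothetical helper, a plain Hash stands in for the Header object, and a single delivery tag stands in for the longer argument list passed in the final branch.

```ruby
# Simplified model of arity-based dispatch. The wrapper always receives
# three values and forwards only as many as the handler block declares.
def wrap_handler(&block)
  proc do |header, payload, delivery_tag|
    case block.arity
    when 1 then block.call(payload)
    when 2 then block.call(header, payload)
    else        block.call(header, payload, delivery_tag)
    end
  end
end

payload_only = wrap_handler { |payload| payload.upcase }
with_header  = wrap_handler { |header, payload| "#{header[:type]}: #{payload}" }
with_tag     = wrap_handler { |header, payload, tag| [header[:type], payload, tag] }

r1 = payload_only.call({ type: "event" }, "ping", 7)
r2 = with_header.call({ type: "event" }, "ping", 7)
r3 = with_tag.call({ type: "event" }, "ping", 7)
```

A block that declares no parameters (arity 0) or a splat (negative arity) falls through to the `else` branch, both here and in the listing above.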
Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces any defined earlier).
@api public
# File lib/amqp/consumer.rb, line 224
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
@return [Consumer] self
@api public @see bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
# File lib/amqp/consumer.rb, line 210
def reject(delivery_tag, requeue = true)
  @channel.reject(delivery_tag, requeue)

  self
end
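Since acknowledge and reject both delegate to the channel and return the consumer, a handler can acknowledge on success and reject on failure. The following is a broker-free sketch of that pattern. `RecordingChannel`, `SketchConsumer` and `process` are hypothetical names, and `SketchConsumer` mirrors only the two delegating methods shown in the listings.

```ruby
# Stand-in for AMQP::Channel that records calls instead of talking to a broker.
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def acknowledge(delivery_tag)
    @calls << [:ack, delivery_tag]
  end

  def reject(delivery_tag, requeue = true)
    @calls << [:reject, delivery_tag, requeue]
  end
end

# Mirrors only the delegation and "return self" behaviour of the real methods.
class SketchConsumer
  def initialize(channel)
    @channel = channel
  end

  def acknowledge(delivery_tag)
    @channel.acknowledge(delivery_tag)
    self
  end

  def reject(delivery_tag, requeue = true)
    @channel.reject(delivery_tag, requeue)
    self
  end
end

# Hypothetical handler: acknowledge on success, reject and requeue on failure.
def process(consumer, delivery_tag, payload)
  Integer(payload) # stands in for real work; raises ArgumentError on bad input
  consumer.acknowledge(delivery_tag)
rescue ArgumentError
  consumer.reject(delivery_tag, true)
end

channel  = RecordingChannel.new
consumer = SketchConsumer.new(channel)
process(consumer, 1, "42")
process(consumer, 2, "not a number")
chained = consumer.acknowledge(3).reject(4, false) # both methods return self
```

Explicit acknowledgements of this kind are only meaningful when the consumer was created with `no_ack` set to false.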
Used by automatic recovery code. @api plugin @return [AMQP::Consumer] self
# File lib/amqp/consumer.rb, line 95
def resubscribe(&block)
  @channel.once_open do
    @queue.once_declared do
      self.unregister_with_channel
      @consumer_tag = self.class.tag_generator.generate_for(@queue)
      self.register_with_channel

      @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments))
      self.redefine_callback(:consume, &block) if block

      self
    end
  end

  self
end
@private
# File lib/amqp/consumer.rb, line 264
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)
end
@private
# File lib/amqp/consumer.rb, line 259
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery)
end
{AMQP::Queue} API compatibility.
@return [Boolean] true if this consumer is active (subscribed for message delivery) @api public
# File lib/amqp/consumer.rb, line 133
def subscribed?
  !@callbacks[:delivery].empty?
end
# File lib/amqp/consumer.rb, line 283
def to_s
  "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>"
end

# File lib/amqp/consumer.rb, line 353
def register_with_channel
  @channel.consumers[@consumer_tag] = self
end

# File lib/amqp/consumer.rb, line 357
def register_with_queue
  @queue.consumers[@consumer_tag] = self
end

# File lib/amqp/consumer.rb, line 361
def unregister_with_channel
  @channel.consumers.delete(@consumer_tag)
end

# File lib/amqp/consumer.rb, line 365
def unregister_with_queue
  @queue.consumers.delete(@consumer_tag)
end