class Chore::Queues::SQS::Consumer
SQS
Consumer
for Chore
. Requests messages from SQS
and passes them to be worked on. Also controls deleting completed messages within SQS
.
Public Class Methods
@param [String] queue_name Name of SQS
queue @param [Hash] opts Options
Chore::Consumer::new
# File lib/chore/queues/sqs/consumer.rb, line 19 def initialize(queue_name, opts={}) super(queue_name, opts) raise Chore::TerribleMistake, "Cannot specify a queue polling size greater than 10" if sqs_polling_amount > 10 end
Resets the API client connection and provides @@reset_at so we know when the last time that was done
# File lib/chore/queues/sqs/consumer.rb, line 25 def self.reset_connection! @@reset_at = Time.now end
Public Instance Methods
Deletes the given message from the SQS
queue
@param [String] message_id Unique ID of the SQS
message @param [Hash] receipt_handle Receipt handle (unique per consume request) of the SQS
message
# File lib/chore/queues/sqs/consumer.rb, line 60 def complete(message_id, receipt_handle) Chore.logger.debug "Completing (deleting): #{message_id}" queue.delete_messages(entries: [{ id: message_id, receipt_handle: receipt_handle }]) end
Begins requesting messages from SQS
, which will invoke the +&handler+ over each message
@param [Block] &handler Message handler, used by the calling context (worker) to create & assigns a UnitOfWork
@return [Array<Aws::SQS::Message>]
# File lib/chore/queues/sqs/consumer.rb, line 34 def consume(&handler) while running? begin messages = handle_messages(&handler) sleep (Chore.config.consumer_sleep_interval) if messages.empty? rescue Aws::SQS::Errors::NonExistentQueue => e Chore.logger.error "You specified a queue '#{queue_name}' that does not exist. You must create the queue before starting Chore. Shutting down..." raise Chore::TerribleMistake rescue => e Chore.logger.error { "SQSConsumer#Consume: #{e.inspect} #{e.backtrace * "\n"}" } end end end
Delays retry of a job by backoff_calc
seconds.
@param [UnitOfWork] item Item to be delayed @param [Block] backoff_calc Code that determines the backoff.
# File lib/chore/queues/sqs/consumer.rb, line 69 def delay(item, backoff_calc) delay = backoff_calc.call(item) Chore.logger.debug "Delaying #{item.id} by #{delay} seconds" queue.change_message_visibility_batch(entries: [ { id: item.id, receipt_handle: item.receipt_handle, visibility_timeout: delay }, ]) return delay end
Private Instance Methods
Requests messages from SQS
, and invokes the provided +&block+ over each one. Afterwards, the :on_fetch hook will be invoked, per message
@param [Block] &handler Message handler, passed along by consume
@return [Array<Aws::SQS::Message>]
# File lib/chore/queues/sqs/consumer.rb, line 88 def handle_messages(&block) msg = queue.receive_messages(:max_number_of_messages => sqs_polling_amount, :attribute_names => ['ApproximateReceiveCount']) messages = *msg messages.each do |message| unless duplicate_message?(message.message_id, message.queue_url, queue_timeout) block.call(message.message_id, message.receipt_handle, queue_name, queue_timeout, message.body, message.attributes['ApproximateReceiveCount'].to_i - 1) end Chore.run_hooks_for(:on_fetch, message.receipt_handle, message.body) end messages end
Retrieves the SQS
queue object. The method will cache the results to prevent round trips on subsequent calls
If reset_connection!
has been called, this will result in the connection being re-initialized, as well as clear any cached results from prior calls
@return [Aws::SQS::Queue]
# File lib/chore/queues/sqs/consumer.rb, line 108 def queue if !@sqs_last_connected || (@@reset_at && @@reset_at >= @sqs_last_connected) Aws.empty_connection_pools! @sqs = nil @sqs_last_connected = Time.now @queue = nil end @queue_url ||= sqs.get_queue_url(queue_name: @queue_name).queue_url @queue ||= Aws::SQS::Queue.new(url: @queue_url, client: sqs) end
The visibility timeout (in seconds) of the queue
@return [Integer]
# File lib/chore/queues/sqs/consumer.rb, line 123 def queue_timeout @queue_timeout ||= queue.attributes['VisibilityTimeout'].to_i end
SQS
API client object
@return [Aws::SQS::Client]
# File lib/chore/queues/sqs/consumer.rb, line 130 def sqs @sqs ||= Chore::Queues::SQS.sqs_client end
Maximum number of messages to retrieve on each request
@return [Integer]
# File lib/chore/queues/sqs/consumer.rb, line 137 def sqs_polling_amount Chore.config.queue_polling_size end