class Chore::Queues::SQS::Consumer

SQS Consumer for Chore. Requests messages from SQS and passes them to be worked on. Also controls deleting completed messages within SQS.

Public Class Methods

new(queue_name, opts={})

@param [String] queue_name Name of SQS queue
@param [Hash] opts Options

Calls superclass method Chore::Consumer::new
# File lib/chore/queues/sqs/consumer.rb, line 19
def initialize(queue_name, opts={})
  super(queue_name, opts)
  raise Chore::TerribleMistake, "Cannot specify a queue polling size greater than 10" if sqs_polling_amount > 10
end
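
The guard in the constructor reflects the SQS limit of 10 messages per receive request. A minimal stand-alone sketch of the same check follows; it does not load Chore, and `TerribleMistake`, `SketchConsumer`, and `SQS_MAX_BATCH` are hypothetical stand-ins rather than the library's own definitions.

```ruby
# Stand-alone sketch; every name here is a hypothetical stand-in.
class TerribleMistake < StandardError; end

SQS_MAX_BATCH = 10 # SQS accepts at most 10 messages per receive request

class SketchConsumer
  attr_reader :queue_name, :polling_size

  def initialize(queue_name, polling_size)
    # Fail fast at construction time instead of on the first poll.
    if polling_size > SQS_MAX_BATCH
      raise TerribleMistake, "Cannot specify a queue polling size greater than 10"
    end

    @queue_name = queue_name
    @polling_size = polling_size
  end
end

SketchConsumer.new("example_queue", 10) # accepted

begin
  SketchConsumer.new("example_queue", 11)
rescue TerribleMistake => e
  puts e.message
end
```

Raising in the constructor means a misconfigured polling size surfaces at startup rather than as a failed request later.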
reset_connection!()

Flags the API client connection for reset by recording the current time in @@reset_at, so we know when the last reset was requested. The connection itself is re-initialized on the next call to queue.

# File lib/chore/queues/sqs/consumer.rb, line 25
def self.reset_connection!
  @@reset_at = Time.now
end

Public Instance Methods

complete(message_id, receipt_handle)

Deletes the given message from the SQS queue.

@param [String] message_id Unique ID of the SQS message
@param [String] receipt_handle Receipt handle (unique per consume request) of the SQS message

# File lib/chore/queues/sqs/consumer.rb, line 60
def complete(message_id, receipt_handle)
  Chore.logger.debug "Completing (deleting): #{message_id}"
  queue.delete_messages(entries: [{ id: message_id, receipt_handle: receipt_handle }])
end
consume(&handler)

Begins requesting messages from SQS and invokes +&handler+ for each message received.

@param [Block] &handler Message handler, used by the calling context (worker) to create and assign a UnitOfWork

@return [Array<Aws::SQS::Message>]

# File lib/chore/queues/sqs/consumer.rb, line 34
def consume(&handler)
  while running?
    begin
      messages = handle_messages(&handler)
      sleep(Chore.config.consumer_sleep_interval) if messages.empty?
    rescue Aws::SQS::Errors::NonExistentQueue => e
      Chore.logger.error "You specified a queue '#{queue_name}' that does not exist. You must create the queue before starting Chore. Shutting down..."
      raise Chore::TerribleMistake
    rescue => e
      Chore.logger.error { "SQSConsumer#Consume: #{e.inspect} #{e.backtrace * "\n"}" }
    end
  end
end
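
The loop above polls, sleeps when a poll returns nothing, and logs unexpected errors without stopping. A simplified stand-alone sketch of that shape follows. The `poll` method, its `batches` argument (a pre-built list standing in for successive SQS responses), and the `sleep_interval` keyword are all hypothetical; the real method loops on `running?` and calls `handle_messages`.

```ruby
# Hypothetical poll loop. Each element of `batches` stands in for the
# result of one receive request; an empty array means "nothing available".
def poll(batches, sleep_interval: 0.01)
  idle_polls = 0

  until batches.empty? # stands in for `while running?`
    begin
      messages = batches.shift
      if messages.empty?
        idle_polls += 1
        sleep(sleep_interval) # back off when the queue had nothing
      else
        messages.each { |message| yield message }
      end
    rescue StandardError => e
      # Log and keep polling, as the generic rescue in consume does.
      warn "poll error: #{e.inspect}"
    end
  end

  idle_polls
end

seen = []
idle = poll([["a"], [], ["b", "c"]]) { |message| seen << message }
puts "handled #{seen.inspect}, idle polls: #{idle}"
```

Sleeping only on empty responses keeps throughput high under load while avoiding a tight request loop against an empty queue.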
delay(item, backoff_calc)

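Delays retry of a job by the number of seconds that backoff_calc returns. The delay is applied by changing the message's visibility timeout in SQS.

@param [UnitOfWork] item Item to be delayed
@param [Block] backoff_calc Code that determines the backoff; called with the item.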

# File lib/chore/queues/sqs/consumer.rb, line 69
def delay(item, backoff_calc)
  delay = backoff_calc.call(item)
  Chore.logger.debug "Delaying #{item.id} by #{delay} seconds"

  queue.change_message_visibility_batch(entries: [
    { id: item.id, receipt_handle: item.receipt_handle, visibility_timeout: delay },
  ])

  return delay
end
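
Since `delay` only requires that `backoff_calc` respond to `call` and return a number of seconds, an exponential backoff can be sketched as a lambda. This is an illustration, not Chore's own backoff: `Item` is a hypothetical stand-in for UnitOfWork, the `previous_attempts` field is an assumption, and `BACKOFF_CALC` and `MAX_VISIBILITY_TIMEOUT` are names introduced here. The cap reflects the SQS maximum visibility timeout of 12 hours.

```ruby
# Hypothetical stand-in for UnitOfWork; only the fields used here.
Item = Struct.new(:id, :previous_attempts)

MAX_VISIBILITY_TIMEOUT = 43_200 # SQS upper limit: 12 hours, in seconds

# Exponential backoff: doubles with each earlier attempt, capped at the
# SQS maximum so the visibility change is not rejected.
BACKOFF_CALC = lambda do |item|
  [2**(item.previous_attempts + 1), MAX_VISIBILITY_TIMEOUT].min
end

[0, 4, 20].each do |attempts|
  item = Item.new("msg-1", attempts)
  puts "after #{attempts} earlier attempt(s): #{BACKOFF_CALC.call(item)}s"
end
```

A callable of this shape would be passed as the second argument to `delay`, which returns the computed value after applying it.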
reject(message_id)

Unimplemented. Rejects the given message from SQS.

@param [String] message_id Unique ID of the SQS message

@return nil

# File lib/chore/queues/sqs/consumer.rb, line 53
def reject(message_id)
end

Private Instance Methods

handle_messages(&block)

Requests messages from SQS and invokes the provided +&block+ for each one that is not a duplicate. The :on_fetch hook is then run for every message, duplicates included.

@param [Block] &block Message handler, passed along by consume

@return [Array<Aws::SQS::Message>]

# File lib/chore/queues/sqs/consumer.rb, line 88
def handle_messages(&block)
  msg = queue.receive_messages(:max_number_of_messages => sqs_polling_amount, :attribute_names => ['ApproximateReceiveCount'])
  messages = *msg

  messages.each do |message|
    unless duplicate_message?(message.message_id, message.queue_url, queue_timeout)
      block.call(message.message_id, message.receipt_handle, queue_name, queue_timeout, message.body, message.attributes['ApproximateReceiveCount'].to_i - 1)
    end
    Chore.run_hooks_for(:on_fetch, message.receipt_handle, message.body)
  end

  messages
end
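
The block receives six positional arguments, the last of which is the number of previous attempts, derived by subtracting one from SQS's `ApproximateReceiveCount`. The sketch below mirrors that argument order without touching AWS; `FakeMessage` and `dispatch` are hypothetical stand-ins, and duplicate detection and hooks are omitted.

```ruby
# Hypothetical stand-in for Aws::SQS::Message, for illustration only.
FakeMessage = Struct.new(:message_id, :receipt_handle, :body, :attributes)

# Yields the same six arguments, in the same order, that handle_messages
# passes to its block.
def dispatch(message, queue_name, queue_timeout)
  # The receive count includes the current delivery, so subtract one to
  # get the number of earlier attempts.
  previous_attempts = message.attributes['ApproximateReceiveCount'].to_i - 1

  yield message.message_id, message.receipt_handle, queue_name,
        queue_timeout, message.body, previous_attempts
end

message = FakeMessage.new("id-1", "handle-1", '{"job":"demo"}',
                          { 'ApproximateReceiveCount' => '3' })

dispatch(message, "example_queue", 30) do |id, _handle, queue, _timeout, _body, attempts|
  puts "#{id} from #{queue}: #{attempts} earlier attempt(s)"
end
```

SQS returns attribute values as strings, which is why the count is converted with `to_i` before the subtraction.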
queue()

Retrieves the SQS queue object. The result is cached to avoid round trips on subsequent calls.

If reset_connection! has been called since the last connection was made, the connection is re-initialized and the cached client and queue object are cleared.

@return [Aws::SQS::Queue]

# File lib/chore/queues/sqs/consumer.rb, line 108
def queue
  if !@sqs_last_connected || (@@reset_at && @@reset_at >= @sqs_last_connected)
    Aws.empty_connection_pools!
    @sqs = nil
    @sqs_last_connected = Time.now
    @queue = nil
  end

  @queue_url ||= sqs.get_queue_url(queue_name: @queue_name).queue_url
  @queue ||= Aws::SQS::Queue.new(url: @queue_url, client: sqs)
end
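
The reset mechanism compares a class-wide timestamp with a per-instance one, so a single `reset_connection!` call causes every consumer to rebuild its client lazily on next use. A minimal sketch of that pattern, independent of AWS, follows. `SketchConnection`, `client`, `build_client`, and `builds` are hypothetical names, and a plain `Object` stands in for the API client.

```ruby
class SketchConnection
  @@reset_at = nil

  # Records when a reset was requested; nothing is torn down here.
  def self.reset_connection!
    @@reset_at = Time.now
  end

  attr_reader :builds

  def initialize
    @builds = 0
  end

  # Returns the cached client, rebuilding it if this instance has never
  # connected or a reset was requested since it last connected.
  def client
    if !@last_connected || (@@reset_at && @@reset_at >= @last_connected)
      @client = nil
      @last_connected = Time.now
    end

    @client ||= build_client
  end

  private

  def build_client
    @builds += 1
    Object.new # placeholder for a real API client
  end
end

connection = SketchConnection.new
first = connection.client
puts first.equal?(connection.client) # same cached object

SketchConnection.reset_connection!
puts first.equal?(connection.client) # rebuilt after the reset
```

Deferring the rebuild to the next access avoids coordinating between instances: each one notices the newer timestamp on its own.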
queue_timeout()

The visibility timeout (in seconds) of the queue

@return [Integer]

# File lib/chore/queues/sqs/consumer.rb, line 123
def queue_timeout
  @queue_timeout ||= queue.attributes['VisibilityTimeout'].to_i
end
sqs()

SQS API client object

@return [Aws::SQS::Client]

# File lib/chore/queues/sqs/consumer.rb, line 130
def sqs
  @sqs ||= Chore::Queues::SQS.sqs_client
end
sqs_polling_amount()

Maximum number of messages to retrieve on each request

@return [Integer]

# File lib/chore/queues/sqs/consumer.rb, line 137
def sqs_polling_amount
  Chore.config.queue_polling_size
end