class Chore::Consumer

Base class for a Chore Consumer. Provides the interface that a Chore::Consumer implementation should adhere to.

Attributes

queue_name[RW]

Public Class Methods

new(queue_name, opts={})

@param [String] queue_name Name of queue to be consumed from
@param [Hash] opts

# File lib/chore/consumer.rb, line 15
def initialize(queue_name, opts={})
  @queue_name = queue_name
  @running = true
end
reset_connection!()

Resets the underlying connection for all consumers of this class. Useful when the consumer is used across a fork. Consumers should override this method, but doing so is not required.

# File lib/chore/consumer.rb, line 22
def self.reset_connection!
end
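
As a minimal sketch of such an override, the following assumes a consumer that memoizes one connection per class. The Chore::Consumer definition here is a stand-in that mirrors the source shown above so the example runs without the chore gem; CachedConnectionConsumer and its connection method are hypothetical names, not part of Chore.

```ruby
# Stand-in mirroring the documented base class (assumption: lets the sketch
# run without the chore gem installed).
module Chore
  class Consumer
    attr_accessor :queue_name

    def initialize(queue_name, opts={})
      @queue_name = queue_name
      @running = true
    end

    def self.reset_connection!
    end
  end
end

# Hypothetical consumer that caches one connection object per class.
class CachedConnectionConsumer < Chore::Consumer
  class << self
    # Lazily build and memoize a fake connection object.
    def connection
      @connection ||= Object.new
    end

    # Drop the cached connection so the next call to .connection builds a new one.
    def reset_connection!
      @connection = nil
    end
  end
end

before = CachedConnectionConsumer.connection
CachedConnectionConsumer.reset_connection!   # e.g. called in a child process after fork
after = CachedConnectionConsumer.connection
puts before.equal?(after) # false
```

Keeping the connection at the class level means a single call resets it for every instance, which matches the "all consumers of this class" wording above.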

Public Instance Methods

complete(message_id, receipt_handle)

Complete should mark a message as finished.

@param [String] message_id Unique ID of the message
@param [Hash] receipt_handle Unique ID of the consuming transaction in non-filesystem implementations

# File lib/chore/consumer.rb, line 46
def complete(message_id, receipt_handle)
  raise NotImplementedError
end
consume(&handler)

Consume takes a block with an arity of two. The two params are |message_id, message_body|, where message_id is any object that the consumer needs in order to act on the message later (reject, complete, etc.).

@param [Block] &handler Message handler, used by the calling context (worker) to create and assign a UnitOfWork

# File lib/chore/consumer.rb, line 30
def consume(&handler)
  raise NotImplementedError
end
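
A subclass might implement consume along these lines. This is a sketch only: the Chore::Consumer definition is a stand-in mirroring the source on this page so the example runs without the chore gem, and ArrayConsumer with its :messages option is a hypothetical in-memory queue, not something Chore provides. The block arity follows the description above.

```ruby
# Stand-in mirroring the documented base class (assumption: lets the sketch
# run without the chore gem installed).
module Chore
  class Consumer
    attr_accessor :queue_name

    def initialize(queue_name, opts={})
      @queue_name = queue_name
      @running = true
    end

    def running?
      @running
    end

    def stop
      @running = false
    end
  end
end

# Hypothetical consumer backed by an in-memory array of [id, body] pairs.
class ArrayConsumer < Chore::Consumer
  def initialize(queue_name, opts={})
    super
    @messages = opts.fetch(:messages, [])
  end

  # Hand each pending message to the handler as (message_id, message_body),
  # stopping early if the consumer is no longer running.
  def consume(&handler)
    while running? && (message = @messages.shift)
      handler.call(*message)
    end
  end
end

seen = []
consumer = ArrayConsumer.new('demo', :messages => [['id-1', 'hello'], ['id-2', 'world']])
consumer.consume { |message_id, message_body| seen << [message_id, message_body] }
```

After the call, seen holds both pairs in queue order.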
dupe_detector()

Returns the memoized instance of the duplicate detection implementation class.

@return [DuplicateDetector]

# File lib/chore/consumer.rb, line 83
def dupe_detector
  @dupes ||= DuplicateDetector.new({:servers => Chore.config.dedupe_servers,
                                    :dupe_on_cache_failure => false})
end
duplicate_message?(dedupe_key, klass, queue_timeout)

Determines whether this message has already been seen.

@param [String] dedupe_key
@param [Class] klass
@param [Integer] queue_timeout

@return [TrueClass, FalseClass]

# File lib/chore/consumer.rb, line 76
def duplicate_message?(dedupe_key, klass, queue_timeout)
  dupe_detector.found_duplicate?(:id=>dedupe_key, :queue=>klass.to_s, :visibility_timeout=>queue_timeout)
end
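
To illustrate how duplicate_message? delegates to dupe_detector, the sketch below swaps in an in-memory detector. Everything besides duplicate_message? itself is an assumption: the Chore::Consumer definition is a stand-in so the example runs without the chore gem, and InMemoryDetector and DedupingConsumer are hypothetical. The fake detector ignores :visibility_timeout entirely.

```ruby
# Stand-in mirroring the documented base class (assumption: lets the sketch
# run without the chore gem installed).
module Chore
  class Consumer
    def initialize(queue_name, opts={})
      @queue_name = queue_name
      @running = true
    end

    # Copied from the source shown above.
    def duplicate_message?(dedupe_key, klass, queue_timeout)
      dupe_detector.found_duplicate?(:id=>dedupe_key, :queue=>klass.to_s, :visibility_timeout=>queue_timeout)
    end
  end
end

# Hypothetical detector that remembers [queue, id] pairs in a hash.
# It never expires entries, so :visibility_timeout is unused here.
class InMemoryDetector
  def initialize
    @seen = {}
  end

  # Returns false the first time a pair is seen and true afterwards.
  def found_duplicate?(msg)
    key = [msg[:queue], msg[:id]]
    return true if @seen.key?(key)
    @seen[key] = true
    false
  end
end

# Hypothetical consumer that overrides dupe_detector with the fake above.
class DedupingConsumer < Chore::Consumer
  def dupe_detector
    @dupes ||= InMemoryDetector.new
  end
end

consumer = DedupingConsumer.new('demo')
first = consumer.duplicate_message?('abc', DedupingConsumer, 30)
second = consumer.duplicate_message?('abc', DedupingConsumer, 30)
puts first  # false
puts second # true
```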
provide_work(n)

Returns up to n units of work.

@param n

# File lib/chore/consumer.rb, line 65
def provide_work(n)
  raise NotImplementedError
end
reject(message_id)

Reject should put a message back on a queue to be processed again later. It takes a message_id as returned via consume.

@param [String] message_id Unique ID of the message

# File lib/chore/consumer.rb, line 38
def reject(message_id)
  raise NotImplementedError
end
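
The relationship between reject and complete can be sketched with a consumer that tracks in-flight messages. This is an illustration under stated assumptions: the Chore::Consumer definition is a stand-in so the example runs without the chore gem, and InFlightConsumer with its enqueue and checkout helpers is hypothetical.

```ruby
# Stand-in mirroring the documented base class (assumption: lets the sketch
# run without the chore gem installed).
module Chore
  class Consumer
    def initialize(queue_name, opts={})
      @queue_name = queue_name
      @running = true
    end
  end
end

# Hypothetical consumer that keeps pending and in-flight messages in memory.
class InFlightConsumer < Chore::Consumer
  attr_reader :pending, :in_flight

  def initialize(queue_name, opts={})
    super
    @pending = []    # [id, body] pairs waiting to be handed out
    @in_flight = {}  # id => body for messages handed out but not finished
  end

  # Hypothetical helper: add a message to the queue.
  def enqueue(id, body)
    @pending << [id, body]
  end

  # Hypothetical helper: hand out the next message and mark it in flight.
  def checkout
    id, body = @pending.shift
    @in_flight[id] = body if id
    id
  end

  # Put an in-flight message back on the queue for a later attempt.
  def reject(message_id)
    body = @in_flight.delete(message_id)
    @pending << [message_id, body] if body
  end

  # Mark a message as finished; receipt_handle is unused in this sketch.
  def complete(message_id, receipt_handle)
    @in_flight.delete(message_id)
  end
end

consumer = InFlightConsumer.new('demo')
consumer.enqueue('id-1', 'payload')
id = consumer.checkout
consumer.reject(id)          # message returns to the pending list
id = consumer.checkout
consumer.complete(id, nil)   # message is gone for good
```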
running?()

Returns true if the Consumer is currently running

@return [TrueClass, FalseClass]

# File lib/chore/consumer.rb, line 58
def running?
  @running
end
stop()

Perform any shutdown behavior and stop consuming messages

# File lib/chore/consumer.rb, line 51
def stop
  @running = false
end
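
A subclass that needs its own shutdown behavior could override stop and call super, roughly as below. The Chore::Consumer definition is a stand-in mirroring the source on this page so the example runs without the chore gem; ClosingConsumer and its closed flag are hypothetical and stand for releasing a real resource such as a socket.

```ruby
# Stand-in mirroring the documented base class (assumption: lets the sketch
# run without the chore gem installed).
module Chore
  class Consumer
    def initialize(queue_name, opts={})
      @queue_name = queue_name
      @running = true
    end

    def running?
      @running
    end

    def stop
      @running = false
    end
  end
end

# Hypothetical consumer with cleanup to perform on shutdown.
class ClosingConsumer < Chore::Consumer
  attr_reader :closed

  def initialize(queue_name, opts={})
    super
    @closed = false
  end

  # Release resources first, then let the base class flip the running flag.
  def stop
    @closed = true
    super
  end
end

consumer = ClosingConsumer.new('demo')
puts consumer.running? # true
consumer.stop
puts consumer.running? # false
puts consumer.closed   # true
```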

Private Instance Methods

handle_messages(&handler)

Gets messages from queue implementation and invokes the provided block over each one. Afterwards, the :on_fetch hook will be invoked per message. This block call provides data necessary for the worker (calling context) to populate a UnitOfWork struct.

@param [Block] &handler Message handler, passed along by consume

# File lib/chore/consumer.rb, line 95
def handle_messages(&handler)
  raise NotImplementedError
end
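
The flow described above, where the block runs for each message and a fetch hook fires afterwards, can be sketched as follows. The hook mechanism here is an assumption made for illustration: a lambda passed through a hypothetical :on_fetch option stands in for Chore's real hook system, the Chore::Consumer definition is a stand-in so the example runs without the chore gem, and HookedConsumer is hypothetical.

```ruby
# Stand-in mirroring the documented base class (assumption: lets the sketch
# run without the chore gem installed).
module Chore
  class Consumer
    def initialize(queue_name, opts={})
      @queue_name = queue_name
      @running = true
    end
  end
end

# Hypothetical consumer whose consume delegates to a private handle_messages.
class HookedConsumer < Chore::Consumer
  def initialize(queue_name, opts={})
    super
    @messages = opts.fetch(:messages, [])
    # Hypothetical stand-in for the :on_fetch hook; defaults to a no-op.
    @on_fetch = opts.fetch(:on_fetch, ->(id, body) {})
  end

  def consume(&handler)
    handle_messages(&handler)
  end

  private

  # Invoke the handler for each message, then fire the fetch hook for it.
  def handle_messages(&handler)
    @messages.each do |id, body|
      handler.call(id, body)
      @on_fetch.call(id, body)
    end
  end
end

log = []
consumer = HookedConsumer.new(
  'demo',
  :messages => [['id-1', 'a'], ['id-2', 'b']],
  :on_fetch => ->(id, body) { log << [:hook, id] }
)
consumer.consume { |id, body| log << [:handler, id] }
```

For each message, log records the handler entry followed by the hook entry. Keeping handle_messages private means callers only go through consume.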