class Pebbles::River::Worker

Implements a queue worker.

Attributes

handler[R]
queue_options[R]
river[R]

Public Class Methods

new(handler, options = {}) click to toggle source

Initializes worker with a handler. Options:

  • `queue`: Same queue options as `River#queue`.

  • `on_exception`: If provided, called with `exception` as an argument when a message could not be handled due to an exception. (Connection errors are not reported, however.)

  • `logger`: Optional logger. Defaults to stderr. Pass nil to disable.

  • `managed_acking`: If true, ack/nack handling is automatic; every message is automatically acked unless the handler returns false or the handler raises an exception, in which case it's nacked. If false, the handler must do the ack/nacking. Defaults to true.

  • `prefetch`: If specified, sets channel's prefetch count.

The handler must implement `call(payload, extra)`, where the payload is the message payload, and the extra argument contains message metadata as a hash. If the handler returns false, it is considered rejected, and will be nacked. Otherwise, the message with be acked.

# File lib/pebbles/river/worker.rb, line 35
def initialize(handler, options = {})
  options.assert_valid_keys(
    :queue,
    :logger,
    :on_exception,
    :managed_acking,
    :prefetch)

  unless handler.respond_to?(:call)
    raise ArgumentError.new('Handler must implement #call protocool')
  end

  @queue_options = (options[:queue] || {}).freeze
  if options[:managed_acking] != nil
    @managed_acking = !!options.fetch(:managed_acking, true)
  else
    @managed_acking = true
  end
  @dead_lettered = !!@queue_options[:dead_letter_routing_key]
  @on_exception = options[:on_exception] || ->(*args) { }
  @handler = handler
  @river = River.new(options.slice(:prefetch))
  @next_event_time = Time.now
  @rate_limiter = RateLimiter.new(1.0, 10)
  @logger = options.fetch(:logger, Logger.new($stderr))
end
run(handler, options = {}) click to toggle source
# File lib/pebbles/river/worker.rb, line 8
def run(handler, options = {})
  Worker.new(handler, options).run
end

Public Instance Methods

enabled?() click to toggle source

Are we enabled?

# File lib/pebbles/river/worker.rb, line 104
def enabled?
  @enabled
end
run() click to toggle source

Runs the handler. This will process the queue indefinitely.

# File lib/pebbles/river/worker.rb, line 91
def run
  @enabled = true
  while enabled? do
    run_once
  end
end
run_once() click to toggle source

Runs the handler once.

# File lib/pebbles/river/worker.rb, line 63
def run_once
  with_exceptions do
    now = Time.now

    if @next_event_time > now
      sleep(@next_event_time - now)
      now = Time.now
    end

    if should_run?
      if process_next
        @next_event_time = now
      else
        if @handler.respond_to?(:on_idle)
          with_exceptions do
            @handler.on_idle
          end
        end
        @next_event_time = now + 1
      end
    else
      @next_event_time = now + 5
    end
  end
  nil
end
stop() click to toggle source

Stops any concurrent run.

# File lib/pebbles/river/worker.rb, line 99
def stop
  @enabled = false
end

Private Instance Methods

ignore_exceptions() { || ... } click to toggle source
# File lib/pebbles/river/worker.rb, line 203
def ignore_exceptions(&block)
  yield
rescue => e
  if @logger
    @logger.warn("Ignoring exception (#{e.class}): #{e}")
  end
end
process_message(delivery_info, properties, content) click to toggle source
# File lib/pebbles/river/worker.rb, line 136
def process_message(delivery_info, properties, content)
  begin
    message = Message.new(content, delivery_info, queue)
  rescue InvalidPayloadError => e
    if @logger
      @logger.error("Invalid payload, ignoring message: #{e}")
      reject(delivery_info, requeue: false)
    end
  rescue => e
    ignore_exceptions do
      reject(delivery_info)
    end
    raise e
  else
    begin
      result = @handler.call(message)
    rescue Bunny::Exception
      raise
    rescue => e
      if @managed_acking
        ignore_exceptions do
          reject(delivery_info)
        end
      end
      raise e
    else
      if @managed_acking
        case result
          when false
            reject(delivery_info)
          else
            message.ack
        end
      end
    end
  end
end
process_next() click to toggle source
# File lib/pebbles/river/worker.rb, line 123
def process_next
  with_exceptions do
    queue.pop(manual_ack: true) do |delivery_info, properties, content|
      if delivery_info
        process_message(delivery_info, properties, content)
        return true
      else
        return false
      end
    end
  end
end
queue() click to toggle source
# File lib/pebbles/river/worker.rb, line 118
def queue
  @river.connect unless @river.connected?
  return @queue ||= @river.queue(@queue_options)
end
reject(delivery_info, requeue: nil) click to toggle source
# File lib/pebbles/river/worker.rb, line 174
def reject(delivery_info, requeue: nil)
  if requeue.nil?
    # Normally requeue, except if we are dead-lettering to another queue, where
    # requeue = false means to bounce it to DLX.
    requeue = !@dead_lettered
  end
  queue.channel.reject(delivery_info.delivery_tag.to_i, requeue)
end
should_run?() click to toggle source
# File lib/pebbles/river/worker.rb, line 110
def should_run?
  if @handler.respond_to?(:should_run?)
    @handler.should_run?
  else
    true
  end
end
with_exceptions() { || ... } click to toggle source
# File lib/pebbles/river/worker.rb, line 183
def with_exceptions(&block)
  begin
    yield
  rescue Bunny::Exception
    raise
  rescue Timeout::Error
    if @logger
      @logger.error("Timeout polling for messages (ignoring)")
    end
  rescue => e
    if @logger
      @logger.error("Exception (#{e.class}) while handling message: #{e}")
    end
    @rate_limiter.increment
    ignore_exceptions do
      @on_exception.call(e)
    end
  end
end