class Arsenicum::Queue

Attributes

broker[R]
name[R]
router[R]
worker_count[R]

Public Class Methods

new(name, options)
# File lib/arsenicum/queue.rb, line 8
# Sets up a queue called +name+.
#
# +options+ is consumed destructively: the :worker_count and :router_class
# entries are removed from the hash that the caller passed in.
#
# The router is built before the broker, so a router class receives this
# queue while @broker is still unset.
def initialize(name, options)
  @name = name
  @worker_count = options.delete(:worker_count)

  router_class = options.delete(:router_class)
  @router = build_router(router_class)

  @broker = Arsenicum::Core::Broker.new(worker_count: worker_count, router: router)
end

Public Instance Methods

handle_failure(e, original_message)
# File lib/arsenicum/queue.rb, line 50
# Failure hook. +start+ calls it with the error raised by +pick+, and also
# hands it to the broker as the failure callback for a delegated message.
#
# This base implementation deliberately does nothing and returns nil.
def handle_failure(e, original_message)
  # TODO: implement correctly in your derived classes.
  nil
end
handle_success(original_message)
# File lib/arsenicum/queue.rb, line 46
# Success hook. +start+ hands it to the broker as the success callback for a
# delegated message, bound to that message's +original_message+.
#
# This base implementation deliberately does nothing and returns nil.
def handle_success(original_message)
  # TODO: implement correctly in your derived classes.
  nil
end
register(task)
# File lib/arsenicum/queue.rb, line 42
# Stores +task+ in the broker under the task's own id.
#
# Returns the task that was passed in.
def register(task)
  task_id = task.id
  broker[task_id] = task
end
start()
# File lib/arsenicum/queue.rb, line 15
# Runs the broker, then polls for messages in an endless loop and hands each
# one to the broker.
#
# Every pass calls +pick+ (not defined in this section; presumably supplied
# by a derived class -- confirm). Its result is destructured into
# +message+ and +original_message+:
#
# * if +pick+ raises a StandardError, the error is passed to
#   +handle_failure+ and the loop pauses before retrying;
# * if +message+ is nil or false, the loop pauses and polls again;
# * otherwise the message is delegated to the broker together with callbacks
#   that invoke +handle_success+ / +handle_failure+ for that message.
#
# The loop contains no exit condition of its own, so this method only ends
# when an exception escapes it.
def start
  Arsenicum::Logger.info{"[queue]Queue #{name} is now starting"}
  broker.run
  Arsenicum::Logger.info{"[queue]Queue #{name} start-up completed"}

  # Pause (seconds) used both for an empty queue and after a failed pick.
  idle_interval = 0.5

  loop do
    begin
      message, original_message = pick
    rescue => e
      # pick raised before anything was assigned, so there is no original
      # message to report: pass nil explicitly.
      handle_failure e, nil
      # Back off before retrying; without this a persistently failing pick
      # turns the loop into a busy spin that floods handle_failure.
      sleep(idle_interval)
      next
    end

    unless message
      sleep(idle_interval)
      next
    end

    Arsenicum::Logger.info{"Queue picked. message: #{message.inspect}"}
    on_success = -> { handle_success(original_message) }
    on_failure = -> error { handle_failure(error, original_message) }
    broker.delegate message, on_success, on_failure
  end
end
start_async()
# File lib/arsenicum/queue.rb, line 54
# Runs +start+ on a newly created thread.
#
# Returns the Thread object.
def start_async
  Thread.new do
    start
  end
end
stop()
# File lib/arsenicum/queue.rb, line 38
# Stops the broker and returns whatever the broker's +stop+ returns.
#
# NOTE(review): this only forwards to the broker; nothing here signals the
# polling loop in +start+ -- confirm whether that is intended.
def stop
  active_broker = broker
  active_broker.stop
end

Private Instance Methods

build_router(router_class)
# File lib/arsenicum/queue.rb, line 59
# Instantiates +router_class+ with this queue as its only argument.
#
# Returns nil, without instantiating anything, when +router_class+ is nil or
# false.
def build_router(router_class)
  router_class ? router_class.new(self) : nil
end