class Arsenicum::Queue
Attributes
broker[R]
name[R]
router[R]
worker_count[R]
Public Class Methods
new(name, options)
click to toggle source
# File lib/arsenicum/queue.rb, line 8 def initialize(name, options) @name = name @worker_count = options.delete(:worker_count) @router = build_router options.delete(:router_class) @broker = Arsenicum::Core::Broker.new worker_count: worker_count, router: router end
Public Instance Methods
handle_failure(e, original_message)
click to toggle source
# File lib/arsenicum/queue.rb, line 50 def handle_failure(e, original_message) #TODO implement correctly in your derived classes. end
handle_success(original_message)
click to toggle source
# File lib/arsenicum/queue.rb, line 46 def handle_success(original_message) #TODO implement correctly in your derived classes. end
register(task)
click to toggle source
# File lib/arsenicum/queue.rb, line 42 def register(task) broker[task.id] = task end
start()
click to toggle source
# File lib/arsenicum/queue.rb, line 15 def start Arsenicum::Logger.info{"[queue]Queue #{name} is now starting"} broker.run Arsenicum::Logger.info{"[queue]Queue #{name} start-up completed"} loop do begin (message, original_message) = pick rescue => e handle_failure e, original_message next end unless message sleep(0.5) next end Arsenicum::Logger.info{"Queue picked. message: #{message.inspect}"} broker.delegate message, -> { handle_success(original_message) }, -> e { handle_failure(e, original_message) } end end
start_async()
click to toggle source
# File lib/arsenicum/queue.rb, line 54 def start_async Thread.new{start} end
stop()
click to toggle source
# File lib/arsenicum/queue.rb, line 38 def stop broker.stop end
Private Instance Methods
build_router(router_class)
click to toggle source
# File lib/arsenicum/queue.rb, line 59 def build_router(router_class) return unless router_class router_class.new self end