class Arsenicum::Core::Broker
Constants
- PROCESSOR_COUNT_DEFAULT
Attributes
available_workers[R]
default_task[RW]
mutex[R]
router[R]
tasks[R]
worker_count[R]
worker_options[R]
workers[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/arsenicum/core/broker.rb, line 12
# Builds a broker from an options hash.
#
# Recognised keys (all optional):
# * +:worker_count+   - number of workers to prepare; converted with +to_i+,
#   falling back to PROCESSOR_COUNT_DEFAULT when absent.
# * +:router+         - object stored as the broker's router.
# * +:serializer+     - defaults to a new Arsenicum::Serializer::JSON.
# * +:formatter+      - defaults to a new Arsenicum::Formatter.
# * +:worker_options+ - hash of extra options handed to each worker.
#
# The given +options+ hash and its +:worker_options+ hash are only read:
# nothing is deleted from them and the serializer/formatter are merged into
# a fresh copy, so a hash shared by the caller is never modified.
def initialize(options = {})
  @worker_count = (options[:worker_count] || PROCESSOR_COUNT_DEFAULT).to_i
  @tasks        = {}
  @router       = options[:router]

  serializer = options[:serializer] || Arsenicum::Serializer::JSON.new
  formatter  = options[:formatter]  || Arsenicum::Formatter.new

  # Non-destructive merge: leaves the caller's :worker_options hash untouched.
  @worker_options = (options[:worker_options] || {}).merge(serializer: serializer, formatter: formatter)

  # Starts at -1 because the index is incremented before every use,
  # so the first index actually handed out is 0.
  @current_worker_index = -1
  @mutex = Mutex.new
end
Public Instance Methods
[](task_id)
click to toggle source
# File lib/arsenicum/core/broker.rb, line 25
# Looks up the task stored under +task_id+ (converted with +to_sym+).
# Returns +default_task+ when the lookup yields nil or false.
def [](task_id)
  registered = tasks[task_id.to_sym]
  return registered if registered

  default_task
end
[]=(task_id, task)
click to toggle source
# File lib/arsenicum/core/broker.rb, line 29
# Stores +task+ in the task table under +task_id+ converted with +to_sym+.
# Returns +task+.
def []=(task_id, task)
  key = task_id.to_sym
  tasks.store(key, task)
end
Also aliased as: register
broker(success_handler, failure_handler, task_id, *parameters)
click to toggle source
# File lib/arsenicum/core/broker.rb, line 42
# Hands a task to the next available worker.
#
# Keeps asking +next_worker+ for a worker, sleeping 0.5 seconds after each
# empty answer, until one is obtained. The task is then logged and passed to
# the worker through +ask_async+ together with both handlers.
def broker(success_handler, failure_handler, task_id, *parameters)
  worker = next_worker
  until worker
    sleep 0.5
    worker = next_worker
  end

  Arsenicum::Logger.info { "[broker][Task brokering]id=#{task_id}, params=#{parameters.inspect}" }
  worker.ask_async(success_handler, failure_handler, task_id, *parameters)
end
delegate(message, success_handler, failure_handler)
click to toggle source
# File lib/arsenicum/core/broker.rb, line 54
# Routes +message+ through the router and brokers the resulting task.
#
# The router's answer is destructured into a task id and its parameters.
# NOTE(review): +parameters+ is forwarded as one argument (not splatted),
# so +broker+ receives it wrapped in its rest-argument array - confirm this
# is what the worker expects.
def delegate(message, success_handler, failure_handler)
  routed = router.route(message)
  task_id, parameters = routed
  broker(success_handler, failure_handler, task_id, parameters)
end
get_back_worker(worker)
click to toggle source
# File lib/arsenicum/core/broker.rb, line 77
# Takes a worker back.
#
# A worker that reports +active?+ is appended to the available list again.
# Any other worker is removed from both lists, stopped, and replaced by a
# newly prepared worker.
def get_back_worker(worker)
  unless worker.active?
    remove(worker)
    worker.stop
    return prepare_worker
  end

  available_workers << worker
end
reload()
click to toggle source
# File lib/arsenicum/core/broker.rb, line 68
# Stops every worker, empties the available and full worker lists (in that
# order), then prepares a fresh set of workers.
def reload
  stop
  [available_workers, workers].each(&:clear)
  prepare_workers
end
remove(worker)
click to toggle source
# File lib/arsenicum/core/broker.rb, line 63
# Deletes +worker+ from the available list and then from the full worker
# list. Returns the result of the deletion from the full worker list.
def remove(worker)
  pools = [available_workers, workers]
  pools.map { |pool| pool.delete(worker) }.last
end
run()
click to toggle source
# File lib/arsenicum/core/broker.rb, line 35
# Resets both worker lists to new empty arrays and prepares the workers.
def run
  @available_workers = []
  @workers = []
  prepare_workers
end
stop()
click to toggle source
# File lib/arsenicum/core/broker.rb, line 59
# Calls +stop+ on every worker in the worker list.
def stop
  workers.each { |worker| worker.stop }
end
Private Instance Methods
next_worker()
click to toggle source
# File lib/arsenicum/core/broker.rb, line 108
# Removes and returns the first entry of the available-worker list while
# holding the broker mutex; yields nil when the list is empty.
def next_worker
  mutex.synchronize do
    available_workers.shift
  end
end
next_worker_index()
click to toggle source
# File lib/arsenicum/core/broker.rb, line 97
# Increments the worker index counter by one while holding the broker mutex
# and returns the new value.
def next_worker_index
  mutex.synchronize do
    @current_worker_index = @current_worker_index + 1
  end
end
prepare_worker()
click to toggle source
# File lib/arsenicum/core/broker.rb, line 92
# Creates one Arsenicum::Core::Worker bound to this broker, giving it the
# next worker index and the shared worker options, then stocks it.
def prepare_worker
  index = next_worker_index
  stock(Arsenicum::Core::Worker.new(self, index, worker_options))
end
prepare_workers()
click to toggle source
# File lib/arsenicum/core/broker.rb, line 88
# Prepares as many workers as the configured worker count.
def prepare_workers
  @worker_count.times do
    prepare_worker
  end
end
serialize(value = {})
click to toggle source
# File lib/arsenicum/core/broker.rb, line 112
# Serializes +value+ (an empty hash by default) with the broker's serializer.
#
# The serializer is read from +worker_options[:serializer]+, which is where
# +initialize+ stores it; the broker exposes no +serializer+ reader of its
# own. +fetch+ raises KeyError if the entry is missing.
def serialize(value = {})
  worker_options.fetch(:serializer).serialize(value)
end
stock(worker)
click to toggle source
# File lib/arsenicum/core/broker.rb, line 103
# Appends +worker+ to the full worker list and then to the available list.
# Returns the available list.
def stock(worker)
  workers.push(worker)
  available_workers.push(worker)
end