class Arsenicum::Core::Broker

Constants

PROCESSOR_COUNT_DEFAULT

Attributes

available_workers[R]
default_task[RW]
mutex[R]
router[R]
tasks[R]
worker_count[R]
worker_options[R]
workers[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/arsenicum/core/broker.rb, line 12
# Builds a broker from an options hash.
#
# Recognized keys (all optional):
#   :worker_count   - converted with #to_i; falls back to PROCESSOR_COUNT_DEFAULT
#   :router         - stored as-is in @router
#   :serializer     - falls back to a new Arsenicum::Serializer::JSON
#   :formatter      - falls back to a new Arsenicum::Formatter
#   :worker_options - hash; the serializer and formatter are merged into a copy
#
# Neither +options+ nor the hash given under :worker_options is modified, so
# callers can safely reuse (or freeze) the hashes they pass in.
def initialize(options = {})
  @worker_count = (options[:worker_count] || PROCESSOR_COUNT_DEFAULT).to_i
  @tasks = {}
  @router = options[:router]

  serializer = options[:serializer] || Arsenicum::Serializer::JSON.new
  formatter  = options[:formatter]  || Arsenicum::Formatter.new
  # Non-destructive merge: build a fresh hash instead of merge!-ing into the caller's.
  @worker_options = (options[:worker_options] || {}).merge(serializer: serializer, formatter: formatter)
  # Starts at -1 because the index is incremented before every use, so the first index handed out is 0.
  @current_worker_index = -1
  @mutex = Mutex.new
end

Public Instance Methods

[](task_id) click to toggle source
# File lib/arsenicum/core/broker.rb, line 25
# Looks up the task registered under +task_id+ (converted with #to_sym).
# Falls back to +default_task+ when the lookup yields nil or false.
def [](task_id)
  registered = tasks[task_id.to_sym]
  registered || default_task
end
[]=(task_id, task) click to toggle source
# File lib/arsenicum/core/broker.rb, line 29
# Registers +task+ under +task_id+ (converted with #to_sym) in the tasks table.
# Returns the task that was stored.
def []=(task_id, task)
  tasks.store(task_id.to_sym, task)
end
Also aliased as: register
broker(success_handler, failure_handler, task_id, *parameters) click to toggle source
# File lib/arsenicum/core/broker.rb, line 42
# Hands a task to the next available worker.
#
# Blocks until +next_worker+ returns a worker, retrying every 0.5 seconds,
# then logs the task and forwards the handlers, the task id and the
# parameters to the worker's +ask_async+. Returns whatever +ask_async+ returns.
def broker(success_handler, failure_handler, task_id, *parameters)
  worker = next_worker
  until worker
    # No worker is free yet; wait briefly before polling again.
    sleep 0.5
    worker = next_worker
  end

  Arsenicum::Logger.info { "[broker][Task brokering]id=#{task_id}, params=#{parameters.inspect}" }
  worker.ask_async(success_handler, failure_handler, task_id, *parameters)
end
delegate(message, success_handler, failure_handler) click to toggle source
# File lib/arsenicum/core/broker.rb, line 54
# Routes +message+ through the router and brokers the resulting task.
#
# The router's +route+ result is destructured into a task id and parameters.
# The parameters object is passed to +broker+ as a single argument (not splatted).
def delegate(message, success_handler, failure_handler)
  task_id, parameters = router.route(message)
  broker(success_handler, failure_handler, task_id, parameters)
end
get_back_worker(worker) click to toggle source
# File lib/arsenicum/core/broker.rb, line 77
# Takes a worker back after it has been used.
#
# A worker that answers true to +active?+ is appended to the available pool.
# Any other worker is removed from the pools, stopped, and replaced with a
# freshly prepared one.
def get_back_worker(worker)
  unless worker.active?
    remove(worker)
    worker.stop
    return prepare_worker
  end

  available_workers << worker
end
register(task_id, task)
Alias for: []=
reload() click to toggle source
# File lib/arsenicum/core/broker.rb, line 68
# Restarts the worker pool: stops the current workers, empties both the
# available-worker list and the worker list, then prepares a new set.
# Returns the result of +prepare_workers+.
def reload
  stop

  [available_workers, workers].each(&:clear)

  prepare_workers
end
remove(worker) click to toggle source
# File lib/arsenicum/core/broker.rb, line 63
# Deletes +worker+ from the available-worker list and then from the worker list.
# Returns the result of the deletion from the worker list (nil when absent).
def remove(worker)
  [available_workers, workers].map { |pool| pool.delete(worker) }.last
end
run() click to toggle source
# File lib/arsenicum/core/broker.rb, line 35
# Starts the broker: resets both worker lists to fresh empty arrays and
# prepares the workers. Returns the result of +prepare_workers+.
def run
  @available_workers = []
  @workers = []

  prepare_workers
end
stop() click to toggle source
# File lib/arsenicum/core/broker.rb, line 59
# Calls +stop+ on every worker in the worker list. Returns the worker list.
def stop
  workers.each { |worker| worker.stop }
end

Private Instance Methods

next_worker() click to toggle source
# File lib/arsenicum/core/broker.rb, line 108
# Removes and returns the first available worker while holding the mutex.
# Returns nil when no worker is available.
def next_worker
  mutex.synchronize do
    available_workers.shift
  end
end
next_worker_index() click to toggle source
# File lib/arsenicum/core/broker.rb, line 97
# Increments the worker index while holding the mutex and returns the new value.
def next_worker_index
  mutex.synchronize { @current_worker_index += 1 }
end
prepare_worker() click to toggle source
# File lib/arsenicum/core/broker.rb, line 92
# Creates one Arsenicum::Core::Worker bound to this broker, giving it the next
# worker index and the shared worker options, and stocks it in the pools.
# Returns the result of +stock+.
def prepare_worker
  stock(Arsenicum::Core::Worker.new(self, next_worker_index, worker_options))
end
prepare_workers() click to toggle source
# File lib/arsenicum/core/broker.rb, line 88
# Prepares @worker_count workers, one +prepare_worker+ call each.
# Returns @worker_count (the receiver of +times+).
def prepare_workers
  @worker_count.times do
    prepare_worker
  end
end
serialize(value = {}) click to toggle source
# File lib/arsenicum/core/broker.rb, line 112
# Serializes +value+ (an empty hash by default) with the configured serializer.
#
# The serializer is read from +worker_options[:serializer]+, which is where the
# constructor stores it; the broker exposes no +serializer+ reader of its own,
# so calling a bare +serializer+ here would raise NameError.
def serialize(value = {})
  worker_options[:serializer].serialize(value)
end
stock(worker) click to toggle source
# File lib/arsenicum/core/broker.rb, line 103
# Adds +worker+ to the worker list and to the available-worker list.
# Returns the available-worker list.
def stock(worker)
  workers.push(worker)
  available_workers.push(worker)
end