class Eventboss::Launcher
Launcher
Manages the lifecycle of the queue poller threads and the worker threads.
Constants
- DEFAULT_SHUTDOWN_ATTEMPTS
- DEFAULT_SHUTDOWN_DELAY
Public Class Methods
new(queues, client, options = {})
click to toggle source
# File lib/eventboss/launcher.rb, line 9
#
# Builds the launcher's shared state: a lock, a bounded bus, one poller per
# entry of +queues+ and +worker_count+ workers. Nothing is started here.
#
# @param queues  collection yielding (queue, listener) pairs; must respond to
#                +size+ and +each+
# @param client  handed unchanged to every poller
# @param options [Hash] launcher settings (:worker_count, :shutdown_attempts,
#                :shutdown_delay are read elsewhere in this class)
def initialize(queues, client, options = {})
  @options = options
  @queues = queues
  @client = client

  # Guards mutations of @pollers / @workers made from the *_stopped callbacks.
  @lock = Mutex.new
  # The bus holds at most ten entries per configured queue.
  @bus = SizedQueue.new(@queues.size * 10)

  @pollers = Set.new
  @queues.each do |queue, listener|
    @pollers.add(new_poller(queue, listener))
  end

  @workers = Set.new
  worker_count.times do |index|
    @workers.add(new_worker(index))
  end
end
Public Instance Methods
hard_shutdown()
click to toggle source
# File lib/eventboss/launcher.rb, line 43
#
# Calls +kill+ on every poller and worker that is still registered.
# Does nothing (and logs nothing) when both sets are already empty.
#
# @return [nil, Set] nil when there was nothing to kill, otherwise @workers
def hard_shutdown
  nothing_left = @pollers.empty? && @workers.empty?
  return if nothing_left

  logger.info('launcher') do
    "Killing remaining #{@pollers.size} pollers, #{@workers.size} workers"
  end

  @pollers.each { |poller| poller.kill }
  @workers.each { |worker| worker.kill }
end
poller_stopped(poller, restart: false)
click to toggle source
# File lib/eventboss/launcher.rb, line 59
#
# Callback for a poller that has finished: removes it from the registry and,
# when +restart+ is true, registers and starts a replacement for the same
# queue/listener pair.
#
# @param poller  the stopped poller; must respond to +id+, +queue+, +listener+
# @param restart [Boolean] whether to spin up a replacement poller
def poller_stopped(poller, restart: false)
  @lock.synchronize do
    @pollers.delete(poller)

    if restart
      replacement = new_poller(poller.queue, poller.listener)
      # Start first so a poller that fails to start is never registered.
      replacement.start
      @pollers.add(replacement)
    end
  end

  logger.debug('launcher') { "Poller #{poller.id} stopped, restart: #{restart}" }
end
start()
click to toggle source
# File lib/eventboss/launcher.rb, line 24
#
# Logs the worker/poller counts, then calls +start+ on every poller followed
# by every worker.
def start
  logger.info('launcher') do
    "Starting #{@workers.size} workers, #{@pollers.size} pollers"
  end

  @pollers.each { |poller| poller.start }
  @workers.each { |worker| worker.start }
end
stop()
click to toggle source
# File lib/eventboss/launcher.rb, line 31
#
# Graceful shutdown sequence. In order: clear the bus, ask every poller and
# worker to +terminate+, close the bus, wait (bounded) for them to
# deregister, then +hard_shutdown+ whatever is still registered.
def stop
  logger.info('launcher') { 'Gracefully shutdown' }

  @bus.clear
  @pollers.each { |poller| poller.terminate }
  @workers.each { |worker| worker.terminate }
  @bus.close

  wait_for_shutdown
  hard_shutdown
end
worker_stopped(worker, restart: false)
click to toggle source
# File lib/eventboss/launcher.rb, line 51
#
# Callback for a worker that has finished: removes it from the registry and,
# when +restart+ is true, registers and starts a replacement.
#
# @param worker  the stopped worker; must respond to +id+
# @param restart [Boolean] whether to spin up a replacement worker
def worker_stopped(worker, restart: false)
  @lock.synchronize do
    @workers.delete(worker)

    if restart
      # NOTE(review): new_worker interpolates its argument into "worker-#{id}".
      # If worker.id already carries that prefix the replacement's name gets a
      # doubled prefix — confirm against Worker#id.
      replacement = new_worker(worker.id)
      replacement.start
      @workers.add(replacement)
    end
  end

  logger.debug('launcher') { "Worker #{worker.id} stopped, restart: #{restart}" }
end
Private Instance Methods
new_poller(queue, listener)
click to toggle source
# File lib/eventboss/launcher.rb, line 77
#
# Builds (but does not start) a LongPoller wired to this launcher, the shared
# bus and the client.
#
# @param queue     passed through to LongPoller
# @param listener  passed through to LongPoller
def new_poller(queue, listener)
  poller_args = [self, @bus, @client, queue, listener]
  LongPoller.new(*poller_args)
end
new_worker(id)
click to toggle source
# File lib/eventboss/launcher.rb, line 73
#
# Builds (but does not start) a Worker named "worker-<id>" that is attached
# to this launcher and the shared bus.
#
# @param id  value interpolated into the worker's name
def new_worker(id)
  worker_name = "worker-#{id}"
  Worker.new(self, worker_name, @bus)
end
shutdown_attempts()
click to toggle source
# File lib/eventboss/launcher.rb, line 90
#
# Number of wait rounds allowed during graceful shutdown.
# Reads options[:shutdown_attempts]; a nil/false value falls back to
# DEFAULT_SHUTDOWN_ATTEMPTS.
#
# @return [Integer]
# @raise [ArgumentError, TypeError] when the value cannot be coerced by Integer()
def shutdown_attempts
  configured = @options[:shutdown_attempts]
  Integer(configured || DEFAULT_SHUTDOWN_ATTEMPTS)
end
shutdown_delay()
click to toggle source
# File lib/eventboss/launcher.rb, line 94
#
# Value passed to +sleep+ between graceful-shutdown checks.
# Reads options[:shutdown_delay]; a nil/false value falls back to
# DEFAULT_SHUTDOWN_DELAY. Integer() truncates Float values.
#
# @return [Integer]
# @raise [ArgumentError, TypeError] when the value cannot be coerced by Integer()
def shutdown_delay
  configured = @options[:shutdown_delay]
  Integer(configured || DEFAULT_SHUTDOWN_DELAY)
end
wait_for_shutdown()
click to toggle source
# File lib/eventboss/launcher.rb, line 81
#
# Blocks until both registries are empty or +shutdown_attempts+ rounds have
# elapsed, sleeping +shutdown_delay+ and logging the remaining counts after
# each round.
#
# @return [nil]
def wait_for_shutdown
  rounds = 0

  until @pollers.none? && @workers.none?
    rounds += 1
    break if rounds > shutdown_attempts

    sleep(shutdown_delay)
    logger.info('launcher') do
      "Waiting for #{@pollers.size} pollers, #{@workers.size} workers"
    end
  end
end
worker_count()
click to toggle source
# File lib/eventboss/launcher.rb, line 69
#
# Number of workers to create. Reads options[:worker_count], defaulting to 2
# when the key is absent.
#
# The value is coerced with Integer(), matching shutdown_attempts and
# shutdown_delay, so a numeric string (for example one taken from ENV or a CLI
# flag) works, and a malformed value fails here instead of later at `.times`.
#
# @return [Integer]
# @raise [ArgumentError, TypeError] when the value cannot be coerced by Integer()
def worker_count
  Integer(@options.fetch(:worker_count, 2))
end