class Pebbles::River::Supervisor
A simple supervisor which runs workers in preforked pools of child processes. If a worker dies, a new child process is created.
Public Class Methods
new(name, options = {})
click to toggle source
Calls superclass method
# File lib/pebbles/river/supervisor.rb, line 10
# Sets up the supervisor and its (initially empty) worker bookkeeping.
#
# name    - passed through unchanged to the superclass constructor.
# options - Hash; only :logger, :pid_file, :worker_count and :worker are
#           accepted. :logger and :pid_file are forwarded to the superclass
#           (on top of a default interval of 5); :worker_count becomes the
#           default number of workers per listener (1 when not given).
#           :worker is accepted by the key check but not read here.
def initialize(name, options = {})
  server_options = {interval: 5}.merge(options.slice(:logger, :pid_file))
  super(name, server_options)

  # Key validation deliberately stays after the superclass call, as before.
  options.assert_valid_keys(:logger, :pid_file, :worker_count, :worker)

  @recovering = true
  @worker_modules = []
  @prefork_pools = []
  @worker_count = options[:worker_count] || 1
end
Public Instance Methods
add_listener(listener, queue_spec, worker_options = {})
click to toggle source
Add a listener. The listener must support the `#call(message)` method. The queue specification contains the parameters naming the queue and so on; see `Pebbles::River::River#queue`. The worker options are the same as those used by `Pebbles::River::Worker.new`, plus:
- `worker_count`: Number of parallel workers to run for this listener. Defaults to the supervisor's own `worker_count` option (1 if that was not given).
# File lib/pebbles/river/supervisor.rb, line 44
# Registers a listener and records a worker module for it in
# @worker_modules as [queue name, worker count, module].
#
# listener       - handed to Pebbles::River::Worker.new as-is.
# queue_spec     - stored under the :queue worker option; its :name entry is
#                  used for the process title and log prefixes.
# worker_options - Hash of worker options. It is copied, not mutated. An
#                  optional :worker_count entry is removed from the copy and
#                  overrides the supervisor-wide count. Caller-supplied
#                  :logger / :on_exception entries win over the defaults.
def add_listener(listener, queue_spec, worker_options = {})
  worker_options = worker_options.dup
  worker_count = worker_options.delete(:worker_count) || @worker_count

  # Default exception handler. `logger` here is the method call on the
  # supervisor (there is no local variable of that name in this method).
  exception_handler = lambda do |e|
    if logger.respond_to?(:exception)
      logger.exception(e)
    else
      logger.error("Exception #{e.class}: #{e} #{e.backtrace.join("\n")}")
    end
  end

  defaults = { logger: @logger, on_exception: exception_handler }
  worker_options = defaults.merge(worker_options)
  worker_options[:queue] = queue_spec

  worker = Pebbles::River::Worker.new(listener, worker_options)
  run_worker = lambda { with_exception_logging { worker.run } }

  queue_name = queue_spec[:name]
  process_title = "#{@name}: queue worker: #{queue_name}"
  pool_logger = @logger

  # The module's #execute closes over the locals above: it renames the
  # process, installs a TERM handler that stops the worker and exits with
  # status 0, then runs the worker.
  # NOTE(review): the TERM handler logs from signal-trap context; confirm the
  # configured logger tolerates being called there.
  worker_module = Module.new do
    define_method(:execute) do
      $0 = process_title
      trap('TERM') do
        pool_logger.info "[#{queue_name}] Worker received TERM, stopping"
        worker.stop
        exit(0)
      end
      run_worker.call
    end
  end

  @worker_modules.push([queue_name, worker_count, worker_module])
end
after_stopping()
click to toggle source
From Servolux::Server
# File lib/pebbles/river/supervisor.rb, line 97
# Servolux::Server callback: stops every worker pool by delegating to
# #shutdown_workers (judging by the name, invoked once the server has
# stopped; confirm against Servolux).
def after_stopping
  shutdown_workers
end
before_starting()
click to toggle source
From Servolux::Server
# File lib/pebbles/river/supervisor.rb, line 86
# Servolux::Server callback: renames the current process to
# "<name>: master", then asks every pool in @prefork_pools to bring itself
# up to size via #ensure_worker_pool_size.
def before_starting
  $0 = "#{name}: master"

  logger.info "Starting workers"
  @prefork_pools.each do |pool_name, pool|
    logger.info "[#{pool_name}] Starting workers"
    pool.ensure_worker_pool_size
  end
end
run()
click to toggle source
From Servolux::Server
# File lib/pebbles/river/supervisor.rb, line 107
# Servolux::Server callback: performs one supervision pass (#ensure_workers),
# logging and re-raising any StandardError, then sleeps for one second.
def run
  with_exception_logging { ensure_workers }
  sleep 1
end
start_workers()
click to toggle source
# File lib/pebbles/river/supervisor.rb, line 21
# Builds one Servolux::Prefork pool per registered listener and appends it to
# @prefork_pools as [name, pool]. Listeners whose worker count is not
# greater than zero get no pool; that is logged instead.
#
# Raises ConfigurationError when no listeners have been added.
def start_workers
  raise ConfigurationError, "No listeners configured" if @worker_modules.empty?

  @worker_modules.each do |queue_name, min_workers, worker_module|
    unless min_workers > 0
      logger.info "[#{queue_name}] Workers disabled"
      next
    end

    pool = Servolux::Prefork.new(min_workers: min_workers, module: worker_module)
    @prefork_pools.push([queue_name, pool])
  end
end
usr2()
click to toggle source
From Servolux::Server
# File lib/pebbles/river/supervisor.rb, line 102
# Servolux::Server callback (presumably the USR2 signal hook; confirm against
# Servolux): stops every worker pool by delegating to #shutdown_workers.
def usr2
  shutdown_workers
end
Private Instance Methods
ensure_workers()
click to toggle source
# File lib/pebbles/river/supervisor.rb, line 116
# One supervision pass over every pool in @prefork_pools: prunes workers,
# reports pools that dropped below their minimum during pruning, and spawns
# workers one at a time until each pool is back at its minimum.
#
# @recovering is set whenever a worker has to be spawned. It is cleared, with
# an "All workers up" log line, on the first pass in which every pool was
# already at its minimum when the pass began.
def ensure_workers
  all_pools_full = true

  @prefork_pools.each do |pool_name, pool|
    # Sampled before pruning so a loss detected by the prune can be reported.
    was_full = !pool.below_minimum_workers?
    all_pools_full = false unless was_full

    pool.prune_workers
    if was_full && pool.below_minimum_workers?
      logger.error "[#{pool_name}] One or more worker died"
    end

    while pool.below_minimum_workers?
      @recovering = true
      logger.info "[#{pool_name}] Too few workers (" \
        "#{pool.live_worker_count} alive, #{pool.dead_worker_count} dead), spawning another"
      pool.add_workers(1)
    end
  end

  return unless @recovering && all_pools_full

  @recovering = false
  logger.info "All workers up"
end
shutdown_workers()
click to toggle source
# File lib/pebbles/river/supervisor.rb, line 145
# Calls #stop on every pool in @prefork_pools, then blocks until the pools
# report no live workers in total. While waiting it polls every 0.25 seconds
# and logs a progress line when more than 5 seconds have passed since the
# last one.
#
# Elapsed time is measured with the monotonic clock, so wall-clock
# adjustments can neither suppress nor flood the progress log.
def shutdown_workers
  logger.info "Telling all workers to shut down"
  @prefork_pools.each { |_name, prefork| prefork.stop }

  last_logged_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    count = @prefork_pools.inject(0) { |sum, (_name, prefork)| sum + prefork.live_worker_count }
    break if count == 0

    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    if now - last_logged_at > 5
      logger.info "Still waiting for #{count} workers to quit..."
      last_logged_at = now
    end
    sleep 0.25
  end
end
with_exception_logging() { || ... }
click to toggle source
# File lib/pebbles/river/supervisor.rb, line 165
# Yields to the given block. Any StandardError it raises is logged and then
# re-raised unchanged. Loggers that respond to #exception receive the
# exception object itself; others get two #error calls, first the
# exception's #inspect output and then its backtrace joined by newlines.
def with_exception_logging(&block)
  begin
    yield
  rescue StandardError => error
    if logger.respond_to?(:exception)
      logger.exception(error)
    else
      logger.error(error.inspect)
      logger.error(error.backtrace.join("\n"))
    end
    raise
  end
end