class Pebbles::River::Supervisor

A simple supervisor which runs workers in preforked pools of child processes. If a worker dies, a new child process is created.

Public Class Methods

new(name, options = {}) click to toggle source
Calls superclass method
# File lib/pebbles/river/supervisor.rb, line 10
# Builds a supervisor with the given name.
#
# Only +:logger+ and +:pid_file+ are forwarded to the superclass, together
# with a default +interval+ of 5. Accepted option keys are +:logger+,
# +:pid_file+, +:worker_count+ and +:worker+; the keys are checked with
# +assert_valid_keys+ after the superclass initializer has run.
#
# +:worker_count+ is the default number of workers per listener (1 when
# omitted).
def initialize(name, options = {})
  server_options = { interval: 5 }.merge(options.slice(:logger, :pid_file))
  super(name, server_options)

  options.assert_valid_keys(:logger, :pid_file, :worker_count, :worker)

  # The supervisor starts in the "recovering" state, which ensure_workers
  # clears once it sees every pool at full strength.
  @recovering = true
  @worker_modules = []
  @prefork_pools = []
  @worker_count = options[:worker_count] || 1
end

Public Instance Methods

add_listener(listener, queue_spec, worker_options = {}) click to toggle source

Add a listener. The listener must respond to `#call(message)`. The queue specification contains the queue parameters, such as its name; see `Pebbles::River::River#queue`. The worker options are the same as those accepted by `Pebbles::River::Worker.new`, plus:

  • `worker_count`: Number of parallel workers to run. Defaults to the global setting.

# File lib/pebbles/river/supervisor.rb, line 44
# Registers a listener for a queue.
#
# A Pebbles::River::Worker is built for the listener and wrapped in an
# anonymous module exposing +execute+; the module is recorded in
# @worker_modules as [queue name, worker count, module] for start_workers
# to consume.
#
# listener       - passed through to Pebbles::River::Worker.new.
# queue_spec     - stored under the worker's +:queue+ option; its +:name+
#                  is used for log lines and the process title.
# worker_options - extra worker options; +:worker_count+ is removed from a
#                  copy (the caller's hash is left untouched) and falls
#                  back to the supervisor-wide count.
def add_listener(listener, queue_spec, worker_options = {})
  overrides = worker_options.dup
  pool_size = overrides.delete(:worker_count) || @worker_count

  # Default exception handler; it calls the `logger` method when invoked.
  report_exception = ->(e) {
    if logger.respond_to?(:exception)
      logger.exception(e)
    else
      logger.error("Exception #{e.class}: #{e} #{e.backtrace.join("\n")}")
    end
  }

  # Caller-supplied options win over the defaults; the queue is always set last.
  settings = { logger: @logger, on_exception: report_exception }.merge(overrides)
  settings[:queue] = queue_spec

  worker = Pebbles::River::Worker.new(listener, settings)

  # Built here so that with_exception_logging resolves against the supervisor.
  run_worker = -> { with_exception_logging { worker.run } }

  queue_name = queue_spec[:name]
  title = "#{@name}: queue worker: #{queue_name}"
  # Captured as a local because `execute` runs with a different `self`.
  log = @logger

  worker_module = Module.new do
    define_method(:execute) do
      $0 = title
      trap('TERM') do
        log.info "[#{queue_name}] Worker received TERM, stopping"
        worker.stop
        exit(0)
      end
      run_worker.call
    end
  end

  @worker_modules.push([queue_name, pool_size, worker_module])
end
after_stopping() click to toggle source

From Servolux::Server

# File lib/pebbles/river/supervisor.rb, line 97
# Servolux::Server hook. Shuts down all worker pools and waits for the
# workers to exit (see shutdown_workers).
# NOTE(review): presumably invoked by Servolux once the server loop has
# stopped — confirm against the Servolux documentation.
def after_stopping
  shutdown_workers
end
before_starting() click to toggle source

From Servolux::Server

# File lib/pebbles/river/supervisor.rb, line 86
# Servolux::Server hook. Sets the process title to "<name>: master" and
# asks every prefork pool to bring itself up to its configured size,
# logging one line per pool.
def before_starting
  $0 = "#{name}: master"

  logger.info "Starting workers"
  @prefork_pools.each do |pool_name, pool|
    logger.info "[#{pool_name}] Starting workers"
    pool.ensure_worker_pool_size
  end
end
run() click to toggle source

From Servolux::Server

# File lib/pebbles/river/supervisor.rb, line 107
# Servolux::Server hook. Performs one supervision pass: checks the worker
# pools (logging and re-raising any error) and then pauses for one second.
# The pause is skipped when the pass raises.
def run
  with_exception_logging { ensure_workers }
  sleep 1
end
start_workers() click to toggle source
# File lib/pebbles/river/supervisor.rb, line 21
# Creates a Servolux::Prefork pool for every registered listener whose
# worker count is greater than zero and records it in @prefork_pools as
# [queue name, pool]. Listeners with a count of zero are only logged as
# disabled.
#
# Raises ConfigurationError when no listeners have been added.
def start_workers
  raise ConfigurationError.new("No listeners configured") if @worker_modules.empty?

  @worker_modules.each do |queue_name, minimum, worker_module|
    unless minimum > 0
      logger.info "[#{queue_name}] Workers disabled"
      next
    end

    pool = Servolux::Prefork.new(min_workers: minimum, module: worker_module)
    @prefork_pools << [queue_name, pool]
  end
end
usr2() click to toggle source

From Servolux::Server

# File lib/pebbles/river/supervisor.rb, line 102
# Servolux::Server hook. Shuts down all worker pools and waits for the
# workers to exit (see shutdown_workers).
# NOTE(review): presumably the handler for the USR2 signal — confirm
# against the Servolux documentation.
def usr2
  shutdown_workers
end

Private Instance Methods

ensure_workers() click to toggle source
# File lib/pebbles/river/supervisor.rb, line 116
# Runs one supervision pass over every prefork pool:
#
# 1. notes whether the pool was at its minimum size before pruning,
# 2. prunes the pool,
# 3. logs an error if a pool that was at strength has dropped below its
#    minimum after pruning,
# 4. spawns workers one at a time until the pool is back at its minimum,
#    flagging the supervisor as recovering.
#
# "All workers up" is logged once when the supervisor was recovering and
# every pool was already at strength at the start of this pass.
def ensure_workers
  all_at_strength = true

  @prefork_pools.each do |pool_name, pool|
    was_at_strength = !pool.below_minimum_workers?
    all_at_strength = false unless was_at_strength

    pool.prune_workers

    if was_at_strength && pool.below_minimum_workers?
      logger.error "[#{pool_name}] One or more worker died"
    end

    while pool.below_minimum_workers?
      @recovering = true
      logger.info "[#{pool_name}] Too few workers (" \
        "#{pool.live_worker_count} alive, #{pool.dead_worker_count} dead), spawning another"
      pool.add_workers(1)
    end
  end

  return unless @recovering && all_at_strength

  @recovering = false
  logger.info "All workers up"
end
shutdown_workers() click to toggle source
# File lib/pebbles/river/supervisor.rb, line 145
# Asks every prefork pool to stop, then blocks until no live workers remain.
#
# The pools are polled every 0.25 seconds and a progress line is logged at
# most once every 5 seconds. There is no timeout: the method returns only
# when every pool reports zero live workers.
def shutdown_workers
  logger.info "Telling all workers to shut down"
  @prefork_pools.each { |_name, pool| pool.stop }

  # Elapsed time is measured on the monotonic clock so that wall-clock
  # adjustments while waiting cannot suppress or spam the progress log.
  last_logged_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    remaining = @prefork_pools.sum { |_name, pool| pool.live_worker_count }
    break if remaining == 0

    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    if now - last_logged_at > 5
      logger.info "Still waiting for #{remaining} workers to quit..."
      last_logged_at = now
    end

    sleep 0.25
  end
end
with_exception_logging() { || ... } click to toggle source
# File lib/pebbles/river/supervisor.rb, line 165
# Yields to the given block and returns its value. Any StandardError raised
# by the block is logged and then re-raised unchanged.
#
# When the logger responds to +exception+ the error object is handed to it
# directly; otherwise the inspected error and its backtrace are written as
# two separate error lines.
def with_exception_logging(&block)
  begin
    yield
  rescue => error
    if logger.respond_to?(:exception)
      logger.exception(error)
    else
      logger.error(error.inspect)
      logger.error(error.backtrace.join("\n"))
    end
    raise
  end
end