class EmWorkerPool

Constants

OPTIONS_DEFAULT

Constants ============================================================

Public Class Methods

new(options = nil) click to toggle source

Creates a new instance of Pool with an optional set of options:

  • :worker_class - The class of worker to spawn when tasks arrive.

  • :workers_min - The minimum number of workers to have running.

  • :workers_max - The maximum number of workers to spawn.

  • :count_per_worker - Ratio of items in queue to workers.

  • :args - An array of arguments to be passed through to the workers.

# File lib/em-worker-pool.rb, line 30
def initialize(options = nil)
  @queue = Queue.new
  @workers = [ ]

  options = options ? OPTIONS_DEFAULT.merge(options) : OPTIONS_DEFAULT

  @worker_class = options[:worker_class]
  @workers_min = options[:workers_min]
  @workers_max = options[:workers_max]
  @count_per_worker = options[:count_per_worker]
  @args = options[:args]

  @workers_min.times do
    self.worker_create!
  end
end

Public Instance Methods

block_pop() click to toggle source

Makes a blocking call to pop an item from the queue, returning that item. If the queue is empty, also has the effect of sleeping the calling thread until something is pushed into the queue.

# File lib/em-worker-pool.rb, line 68
def block_pop
  @queue.pop
end
busy?() click to toggle source

Returns true if there is some outstanding work to be performed, false otherwise.

# File lib/em-worker-pool.rb, line 99
def busy?
  @queue.num_waiting < @workers.length
end
perform() click to toggle source

Schedules a block to be acted upon by the workers.

# File lib/em-worker-pool.rb, line 126
def perform
  @queue << Proc.new

  if (self.workers_count < self.workers_needed)
    self.worker_create!
  end

  true
end
queue?() click to toggle source

Returns true if anything is queued, false otherwise. Note that this does not count anything that might be active within a worker.

# File lib/em-worker-pool.rb, line 110
def queue?
  @queue.length > 0
end
queue_size() click to toggle source

Returns the number of items in the queue. Note that this does not count anything that might be active within a worker.

# File lib/em-worker-pool.rb, line 116
def queue_size
  @queue.size
end
report_exception!(worker, exception, block = nil) click to toggle source

Receives reports of exceptions from workers. Default behavior is to re-raise.

# File lib/em-worker-pool.rb, line 121
def report_exception!(worker, exception, block = nil)
  raise(exception)
end
waiting() click to toggle source

Returns the number of workers that are waiting for something to do.

# File lib/em-worker-pool.rb, line 104
def waiting
  @queue.num_waiting
end
worker_finished!(worker) click to toggle source

Called by a worker when it's finished. Should not be called otherwise.

# File lib/em-worker-pool.rb, line 137
def worker_finished!(worker)
  @workers.delete(worker)
  worker.join
end
worker_needed?(worker) click to toggle source
# File lib/em-worker-pool.rb, line 78
def worker_needed?(worker)
  @queue.length > 0 or @workers.length <= self.workers_needed
end
workers() click to toggle source

Returns an array of the current workers.

# File lib/em-worker-pool.rb, line 83
def workers
  @workers.dup
end
workers?() click to toggle source

Returns true if there are any workers, false otherwise.

# File lib/em-worker-pool.rb, line 93
def workers?
  @workers.any?
end
workers_count() click to toggle source

Returns the number of active worker threads.

# File lib/em-worker-pool.rb, line 48
def workers_count
  @workers.length
end
workers_needed() click to toggle source

Returns the number of workers required for the current loading.

# File lib/em-worker-pool.rb, line 53
def workers_needed
  n = ((@queue.length + @workers.length - @queue.num_waiting) / @count_per_worker)

  if (n > @workers_max)
    @workers_max
  elsif (n < @workers_min)
    @workers_min
  else
    n
  end  
end
workers_needed?() click to toggle source

Returns true if more workers are needed to satisfy the current backlog, or false otherwise.

# File lib/em-worker-pool.rb, line 74
def workers_needed?
  @workers.length < self.workers_needed
end

Protected Instance Methods

worker_create!() click to toggle source

Creates a new worker and puts it into the list of available workers.

# File lib/em-worker-pool.rb, line 144
def worker_create!
  @workers << worker_class.new(self, *@args)
end