class EmWorkerPool
Constants
- OPTIONS_DEFAULT
Constants ============================================================¶ ↑
Public Class Methods
Creates a new instance of Pool with an optional set of options:
-
:worker_class - The class of worker to spawn when tasks arrive.
-
:workers_min - The minimum number of workers to have running.
-
:workers_max - The maximum number of workers to spawn.
-
:count_per_worker - Ratio of items in queue to workers.
-
:args - An array of arguments to be passed through to the workers.
# File lib/em-worker-pool.rb, line 30 def initialize(options = nil) @queue = Queue.new @workers = [ ] options = options ? OPTIONS_DEFAULT.merge(options) : OPTIONS_DEFAULT @worker_class = options[:worker_class] @workers_min = options[:workers_min] @workers_max = options[:workers_max] @count_per_worker = options[:count_per_worker] @args = options[:args] @workers_min.times do self.worker_create! end end
Public Instance Methods
Makes a blocking call to pop an item from the queue, returning that item. If the queue is empty, also has the effect of sleeping the calling thread until something is pushed into the queue.
# File lib/em-worker-pool.rb, line 68 def block_pop @queue.pop end
Returns true if there is some outstanding work to be performed, false otherwise.
# File lib/em-worker-pool.rb, line 99 def busy? @queue.num_waiting < @workers.length end
Schedules a block to be acted upon by the workers.
# File lib/em-worker-pool.rb, line 126 def perform @queue << Proc.new if (self.workers_count < self.workers_needed) self.worker_create! end true end
Returns true if anything is queued, false otherwise. Note that this does not count anything that might be active within a worker.
# File lib/em-worker-pool.rb, line 110 def queue? @queue.length > 0 end
Returns the number of items in the queue. Note that this does not count anything that might be active within a worker.
# File lib/em-worker-pool.rb, line 116 def queue_size @queue.size end
Receives reports of exceptions from workers. Default behavior is to re-raise.
# File lib/em-worker-pool.rb, line 121 def report_exception!(worker, exception, block = nil) raise(exception) end
Returns the number of workers that are waiting for something to do.
# File lib/em-worker-pool.rb, line 104 def waiting @queue.num_waiting end
Called by a worker when it's finished. Should not be called otherwise.
# File lib/em-worker-pool.rb, line 137 def worker_finished!(worker) @workers.delete(worker) worker.join end
# File lib/em-worker-pool.rb, line 78 def worker_needed?(worker) @queue.length > 0 or @workers.length <= self.workers_needed end
Returns an array of the current workers.
# File lib/em-worker-pool.rb, line 83 def workers @workers.dup end
Returns true if there are any workers, false otherwise.
# File lib/em-worker-pool.rb, line 93 def workers? @workers.any? end
Returns the number of active worker threads.
# File lib/em-worker-pool.rb, line 48 def workers_count @workers.length end
Returns the number of workers required for the current loading.
# File lib/em-worker-pool.rb, line 53 def workers_needed n = ((@queue.length + @workers.length - @queue.num_waiting) / @count_per_worker) if (n > @workers_max) @workers_max elsif (n < @workers_min) @workers_min else n end end
Returns true if more workers are needed to satisfy the current backlog, or false otherwise.
# File lib/em-worker-pool.rb, line 74 def workers_needed? @workers.length < self.workers_needed end
Protected Instance Methods
Creates a new worker and puts it into the list of available workers.
# File lib/em-worker-pool.rb, line 144 def worker_create! @workers << worker_class.new(self, *@args) end