class Protobuf::Nats::ThreadPool
Public Class Methods
new(size, opts = {})
click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 5 def initialize(size, opts = {}) @queue = ::Queue.new @active_work = 0 # Callbacks @error_cb = lambda {|_error|} # Synchronization @mutex = ::Mutex.new @cb_mutex = ::Mutex.new # Let's get this party started queue_size = opts[:max_queue].to_i || 0 @max_size = size + queue_size @max_workers = size @shutting_down = false @workers = [] supervise_workers end
Public Instance Methods
full?()
click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 25 def full? @active_work >= @max_size end
kill()
click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 47 def kill @shutting_down = true @workers.map(&:kill) end
on_error(&cb)
click to toggle source
This callback is executed in a thread safe manner.
# File lib/protobuf/nats/thread_pool.rb, line 65 def on_error(&cb) @error_cb = cb end
push(&work_cb)
click to toggle source
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
# File lib/protobuf/nats/thread_pool.rb, line 31 def push(&work_cb) return false if full? return false if @shutting_down @queue << [:work, work_cb] @mutex.synchronize { @active_work += 1 } supervise_workers true end
shutdown()
click to toggle source
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
# File lib/protobuf/nats/thread_pool.rb, line 42 def shutdown @shutting_down = true @max_workers.times { @queue << [:stop, nil] } end
size()
click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 69 def size @active_work end
wait_for_termination(seconds = nil)
click to toggle source
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
# File lib/protobuf/nats/thread_pool.rb, line 54 def wait_for_termination(seconds = nil) started_at = ::Time.now loop do sleep 0.1 break if seconds && (::Time.now - started_at) >= seconds break if @workers.empty? prune_dead_workers end end
Private Instance Methods
prune_dead_workers()
click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 75 def prune_dead_workers @workers = @workers.select(&:alive?) end
spawn_worker()
click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 87 def spawn_worker ::Thread.new do loop do type, cb = @queue.pop begin # Break if we're shutting down break if type == :stop # Perform work cb.call # Update stats rescue => error @cb_mutex.synchronize { @error_cb.call(error) } ensure @mutex.synchronize { @active_work -= 1 } end end end end
supervise_workers()
click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 79 def supervise_workers prune_dead_workers missing_worker_count = (@max_workers - @workers.size) missing_worker_count.times do @workers << spawn_worker end end