class Protobuf::Nats::ThreadPool

Public Class Methods

new(size, opts = {}) click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 5
def initialize(size, opts = {})
  @queue = ::Queue.new
  @active_work = 0

  # Callbacks
  @error_cb = lambda {|_error|}

  # Synchronization
  @mutex = ::Mutex.new
  @cb_mutex = ::Mutex.new

  # Let's get this party started
  queue_size = opts[:max_queue].to_i || 0
  @max_size = size + queue_size
  @max_workers = size
  @shutting_down = false
  @workers = []
  supervise_workers
end

Public Instance Methods

full?() click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 25
def full?
  @active_work >= @max_size
end
kill() click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 47
def kill
  @shutting_down = true
  @workers.map(&:kill)
end
on_error(&cb) click to toggle source

This callback is executed in a thread safe manner.

# File lib/protobuf/nats/thread_pool.rb, line 65
def on_error(&cb)
  @error_cb = cb
end
push(&work_cb) click to toggle source

This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.

# File lib/protobuf/nats/thread_pool.rb, line 31
def push(&work_cb)
  return false if full?
  return false if @shutting_down
  @queue << [:work, work_cb]
  @mutex.synchronize { @active_work += 1 }
  supervise_workers
  true
end
shutdown() click to toggle source

This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.

# File lib/protobuf/nats/thread_pool.rb, line 42
def shutdown
  @shutting_down = true
  @max_workers.times { @queue << [:stop, nil] }
end
size() click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 69
def size
  @active_work
end
wait_for_termination(seconds = nil) click to toggle source

This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.

# File lib/protobuf/nats/thread_pool.rb, line 54
def wait_for_termination(seconds = nil)
  started_at = ::Time.now
  loop do
    sleep 0.1
    break if seconds && (::Time.now - started_at) >= seconds
    break if @workers.empty?
    prune_dead_workers
  end
end

Private Instance Methods

prune_dead_workers() click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 75
def prune_dead_workers
  @workers = @workers.select(&:alive?)
end
spawn_worker() click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 87
def spawn_worker
  ::Thread.new do
    loop do
      type, cb = @queue.pop
      begin
        # Break if we're shutting down
        break if type == :stop
        # Perform work
        cb.call
        # Update stats
      rescue => error
        @cb_mutex.synchronize { @error_cb.call(error) }
      ensure
        @mutex.synchronize { @active_work -= 1 }
      end
    end
  end
end
supervise_workers() click to toggle source
# File lib/protobuf/nats/thread_pool.rb, line 79
def supervise_workers
  prune_dead_workers
  missing_worker_count = (@max_workers - @workers.size)
  missing_worker_count.times do
    @workers << spawn_worker
  end
end