class Plux::Reactor

Public Class Methods

new(count, worker)
# File lib/plux/reactor.rb, line 3
# Builds the reactor and immediately starts its background threads.
#
# count  - number of worker threads spawned by #process.
# worker - object whose #process(msg) is called for every message taken
#          off the internal message queue.
def initialize(count, worker)
  # Caller-supplied configuration.
  @count = count
  @worker = worker

  # Queue drained by the worker threads; also handed to each per-socket
  # Worker created in #receive.
  @msg_q = Queue.new

  # State owned by the selector loop in #receive.
  @closed = []
  @newly_accepted = Queue.new
  @nio = NIO::Selector.new

  # Threads are started last so every instance variable above already
  # exists when they begin running.
  receive
  process
end

Public Instance Methods

register(socket)
# File lib/plux/reactor.rb, line 16
# Hands a socket over to the reactor.
#
# The socket is appended to @newly_accepted, which the #receive loop drains
# at the top of each iteration, and then the selector is woken
# (presumably so a select call that is currently blocking returns and the
# loop can pick the socket up -- see nio4r's Selector#wakeup).
#
# Returns whatever @nio.wakeup returns.
def register(socket)
  @newly_accepted.push(socket)
  @nio.wakeup
end

Private Instance Methods

process()
# File lib/plux/reactor.rb, line 42
# Spawns @count worker threads.
#
# Each thread loops forever: it blocks until a message is available on
# @msg_q, then passes that message to @worker.process.
#
# Returns @count (the receiver of Integer#times).
def process
  @count.times do
    Thread.new do
      loop do
        message = @msg_q.pop
        @worker.process(message)
      end
    end
  end
end
receive()
# File lib/plux/reactor.rb, line 23
# Starts the selector thread and returns it.
#
# Every pass of the loop does three things, in this order:
#   1. deregisters each socket collected in @closed during the previous pass;
#   2. registers the sockets currently waiting in @newly_accepted with the
#      selector (interest :r) and attaches a Worker to each monitor;
#   3. runs @nio.select, calling #process on the Worker of every monitor it
#      yields; a falsy result queues that monitor's io in @closed.
def receive
  Thread.new do
    loop do
      # @closed is only touched by this thread, so draining until empty is safe.
      @nio.deregister(@closed.pop) until @closed.empty?

      # Take a snapshot of the backlog size: only that many sockets are
      # popped, so sockets pushed meanwhile wait for the next pass and
      # pop never blocks.
      backlog = @newly_accepted.size
      backlog.times do
        io = @newly_accepted.pop
        monitor = @nio.register(io, :r)
        monitor.value = Worker.new(io, @msg_q)
      end

      @nio.select do |ready|
        # Deregistration is deferred to the top of the next pass.
        @closed << ready.io unless ready.value.process
      end
    end
  end
end