class Plux::Reactor
Public Class Methods
new(count, worker)
click to toggle source
# File lib/plux/reactor.rb, line 3
#
# Builds the reactor's state and immediately starts its background threads.
#
# count  - number of worker threads that #process will start.
# worker - object whose +process+ method is called with every item taken
#          off the internal message queue (see #process).
def initialize(count, worker)
  @worker = worker
  @msg_q = Queue.new            # filled via the Worker objects built in #receive, drained in #process
  @count = count
  @nio = NIO::Selector.new
  @newly_accepted = Queue.new   # sockets handed over by #register, awaiting selector registration
  @closed = []                  # sockets to deregister; only touched by the #receive thread
  # Start the selector thread first, then the worker threads (same order as
  # the original one-line listing).
  receive
  process
end
Public Instance Methods
register(socket)
click to toggle source
# File lib/plux/reactor.rb, line 16
#
# Hands a socket over to the reactor.
#
# The socket is queued on @newly_accepted, then the selector is woken so
# the #receive loop can start another pass and pick the socket up instead
# of staying blocked in +select+.
#
# socket - the IO object to be watched for readability by #receive.
def register(socket)
  @newly_accepted << socket
  # Wake only after enqueueing, so the socket is already queued when the
  # selector loop resumes.
  @nio.wakeup
end
Private Instance Methods
process()
click to toggle source
# File lib/plux/reactor.rb, line 42
#
# Starts @count worker threads.
#
# Each thread loops forever: it blocks on @msg_q until an item is available
# and passes that item to @worker.process. The threads are not retained, so
# nothing in this method stops or joins them.
#
# Returns @count (the value of Integer#times).
def process
  @count.times do
    Thread.new do
      loop { @worker.process(@msg_q.deq) }
    end
  end
end
receive()
click to toggle source
# File lib/plux/reactor.rb, line 23
#
# Starts the selector thread and returns it.
#
# Each pass of the loop does three things, in this order:
# 1. deregisters every socket that the previous pass put on @closed;
# 2. registers every socket currently queued by #register for readability
#    (:r) and attaches a Worker, built from the socket and @msg_q, as the
#    monitor's value;
# 3. runs +select+ and calls +process+ on the value of each monitor yielded.
def receive
  Thread.new do
    loop do
      # Deregistration is deferred to here rather than done inside the
      # select block below.
      @closed.size.times { @nio.deregister(@closed.pop) }

      # Pop exactly as many sockets as the queue held when this pass began.
      # In the code shown here this thread is the queue's only consumer, so
      # the pops should not block.
      @newly_accepted.size.times do
        socket = @newly_accepted.pop
        mon = @nio.register(socket, :r)
        mon.value = Worker.new(socket, @msg_q)
      end

      @nio.select do |m|
        # A truthy result keeps the socket registered; a falsy one queues
        # it for deregistration on the next pass (presumably the
        # connection is finished; confirm against Worker#process).
        next if m.value.process
        @closed << m.io
      end
    end
  end
end