class Evt::Bundled

Constants

COLLECT_COUNTER_MAX
MAXIMUM_TIMEOUT

Attributes

readable[R]
waiting[R]
writable[R]

Public Class Methods

new() click to toggle source
# File lib/evt/backends/bundled.rb, line 7
def initialize
  @readable = {}
  @writable = {}
  @waiting = {}
  @iovs = {}
  
  @lock = Mutex.new
  @blocking = 0
  @ready = []
  @collect_counter = 0

  init_selector
end

Public Instance Methods

block(blocker, timeout = nil) click to toggle source

Block the calling fiber. @parameter blocker [Object] What we are waiting on, informational only. @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds. @returns [Boolean] Whether the blocking operation was successful or not.

# File lib/evt/backends/bundled.rb, line 120
def block(blocker, timeout = nil)
  if timeout
    @waiting[Fiber.current] = current_time + timeout
    begin
      Fiber.yield
    ensure
      @waiting.delete(Fiber.current)
    end
  else
    @blocking += 1
    begin
      Fiber.yield
    ensure
      @blocking -= 1
    end
  end
end
close() click to toggle source

Invoked when the thread exits.

# File lib/evt/backends/bundled.rb, line 149
def close
  self.run
end
collect(force=false) click to toggle source

Collect closed streams in readables and writables

# File lib/evt/backends/bundled.rb, line 154
def collect(force=false)
  if @collect_counter < COLLECT_COUNTER_MAX and !force
    @collect_counter += 1
    return
  end

  @collect_counter = 0

  @readable.keys.each do |io|
    @readable.delete(io) if io.closed?
  end
  
  @writable.keys.each do |io|
    @writable.delete(io) if io.closed?
  end

  @iovs.keys.each do |io|
    @iovs.delete(io) if io.closed?
  end
end
current_time() click to toggle source
# File lib/evt/backends/bundled.rb, line 88
def current_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
fiber(&block) click to toggle source

Intercept the creation of a non-blocking fiber. @returns [Fiber]

# File lib/evt/backends/bundled.rb, line 177
def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end
io_wait(io, events, duration) click to toggle source

Wait for the given file descriptor to match the specified events within the specified timeout. @parameter event [Integer] A bit mask of `IO::READABLE`,

`IO::WRITABLE` and `IO::PRIORITY`.

@parameter timeout [Numeric] The amount of time to wait for the event in seconds. @returns [Integer] The subset of events that are ready.

# File lib/evt/backends/bundled.rb, line 98
def io_wait(io, events, duration)
  # TODO: IO::PRIORITY
  @readable[io] = Fiber.current unless (events & IO::READABLE).zero?
  @writable[io] = Fiber.current unless (events & IO::WRITABLE).zero?
  self.register(io, events)
  Fiber.yield
  self.deregister(io)
  true
end
kernel_sleep(duration = nil) click to toggle source

Sleep the current task for the specified duration, or forever if not specified. @param duration [Numeric] The amount of time to sleep in seconds.

# File lib/evt/backends/bundled.rb, line 111
def kernel_sleep(duration = nil)
  self.block(:sleep, duration)
  true
end
next_timeout() click to toggle source
# File lib/evt/backends/bundled.rb, line 25
def next_timeout
  _fiber, timeout = @waiting.min_by{ |key, value| value }

  if timeout
    offset = (timeout - current_time) * 1000 # Use mililisecond
    return 0 if offset < 0
    return offset if offset < MAXIMUM_TIMEOUT
  end

  MAXIMUM_TIMEOUT
end
run() click to toggle source
# File lib/evt/backends/bundled.rb, line 37
def run
  while @readable.any? or @writable.any? or @waiting.any? or @iovs.any? or @blocking.positive?
    readable, writable, iovs = self.wait

    readable&.each do |io|
      fiber = @readable.delete(io)
      fiber&.resume
    end

    writable&.each do |io|
      fiber = @writable.delete(io)
      fiber&.resume
    end

    unless iovs.nil?
      iovs&.each do |v|
        io, ret = v
        fiber = @iovs.delete(io)
        fiber&.resume(ret)
      end
    end

    collect

    if @waiting.any?
      time = current_time
      waiting, @waiting = @waiting, {}

      waiting.each do |fiber, timeout|
        if timeout <= time
          fiber.resume if fiber.is_a? Fiber and fiber.alive?
        else
          @waiting[fiber] = timeout
        end
      end
    end

    if @ready.any?
      ready = nil

      @lock.synchronize do
        ready, @ready = @ready, []
      end

      ready.each do |fiber|
        fiber.resume if fiber.is_a? Fiber and fiber.alive?
      end
    end
  end
end
unblock(blocker, fiber) click to toggle source

Unblock the specified fiber. @parameter blocker [Object] What we are waiting on, informational only. @parameter fiber [Fiber] The fiber to unblock. @reentrant Thread safe.

# File lib/evt/backends/bundled.rb, line 142
def unblock(blocker, fiber)
  @lock.synchronize do
    @ready << fiber
  end
end