class Evt::Bundled
Constants
- COLLECT_COUNTER_MAX
- MAXIMUM_TIMEOUT
Attributes
Public Class Methods
# File lib/evt/backends/bundled.rb, line 7 def initialize @readable = {} @writable = {} @waiting = {} @iovs = {} @lock = Mutex.new @blocking = 0 @ready = [] @collect_counter = 0 init_selector end
Public Instance Methods
Block the calling fiber. @parameter blocker [Object] What we are waiting on, informational only. @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds. @returns [Boolean] Whether the blocking operation was successful or not.
# File lib/evt/backends/bundled.rb, line 120 def block(blocker, timeout = nil) if timeout @waiting[Fiber.current] = current_time + timeout begin Fiber.yield ensure @waiting.delete(Fiber.current) end else @blocking += 1 begin Fiber.yield ensure @blocking -= 1 end end end
Invoked when the thread exits.
# File lib/evt/backends/bundled.rb, line 149 def close self.run end
Collect closed streams in readables and writables
# File lib/evt/backends/bundled.rb, line 154 def collect(force=false) if @collect_counter < COLLECT_COUNTER_MAX and !force @collect_counter += 1 return end @collect_counter = 0 @readable.keys.each do |io| @readable.delete(io) if io.closed? end @writable.keys.each do |io| @writable.delete(io) if io.closed? end @iovs.keys.each do |io| @iovs.delete(io) if io.closed? end end
# File lib/evt/backends/bundled.rb, line 88 def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end
Intercept the creation of a non-blocking fiber. @returns [Fiber]
# File lib/evt/backends/bundled.rb, line 177 def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end
Wait for the given file descriptor to match the specified events within the specified timeout. @parameter event [Integer] A bit mask of `IO::READABLE`,
`IO::WRITABLE` and `IO::PRIORITY`.
@parameter timeout [Numeric] The amount of time to wait for the event in seconds. @returns [Integer] The subset of events that are ready.
# File lib/evt/backends/bundled.rb, line 98 def io_wait(io, events, duration) # TODO: IO::PRIORITY @readable[io] = Fiber.current unless (events & IO::READABLE).zero? @writable[io] = Fiber.current unless (events & IO::WRITABLE).zero? self.register(io, events) Fiber.yield self.deregister(io) true end
Sleep the current task for the specified duration, or forever if not specified. @param duration [Numeric] The amount of time to sleep in seconds.
# File lib/evt/backends/bundled.rb, line 111 def kernel_sleep(duration = nil) self.block(:sleep, duration) true end
# File lib/evt/backends/bundled.rb, line 25 def next_timeout _fiber, timeout = @waiting.min_by{ |key, value| value } if timeout offset = (timeout - current_time) * 1000 # Use mililisecond return 0 if offset < 0 return offset if offset < MAXIMUM_TIMEOUT end MAXIMUM_TIMEOUT end
# File lib/evt/backends/bundled.rb, line 37 def run while @readable.any? or @writable.any? or @waiting.any? or @iovs.any? or @blocking.positive? readable, writable, iovs = self.wait readable&.each do |io| fiber = @readable.delete(io) fiber&.resume end writable&.each do |io| fiber = @writable.delete(io) fiber&.resume end unless iovs.nil? iovs&.each do |v| io, ret = v fiber = @iovs.delete(io) fiber&.resume(ret) end end collect if @waiting.any? time = current_time waiting, @waiting = @waiting, {} waiting.each do |fiber, timeout| if timeout <= time fiber.resume if fiber.is_a? Fiber and fiber.alive? else @waiting[fiber] = timeout end end end if @ready.any? ready = nil @lock.synchronize do ready, @ready = @ready, [] end ready.each do |fiber| fiber.resume if fiber.is_a? Fiber and fiber.alive? end end end end
Unblock the specified fiber. @parameter blocker [Object] What we are waiting on, informational only. @parameter fiber [Fiber] The fiber to unblock. @reentrant Thread safe.
# File lib/evt/backends/bundled.rb, line 142 def unblock(blocker, fiber) @lock.synchronize do @ready << fiber end end