class IOPromise::ExecutorContext
Public Class Methods
current()
click to toggle source
# File lib/iopromise/executor_context.rb, line 9 def current @context ||= ExecutorContext.new end
new()
click to toggle source
Calls superclass method
# File lib/iopromise/executor_context.rb, line 14 def initialize @pools = {} @pending_registrations = [] @selector = NIO::Selector.new super end
Public Instance Methods
register(promise)
click to toggle source
# File lib/iopromise/executor_context.rb, line 30 def register(promise) @pending_registrations << promise IOPromise::CancelContext.current&.subscribe(promise) end
register_observer_io(observer, io, interest)
click to toggle source
# File lib/iopromise/executor_context.rb, line 24 def register_observer_io(observer, io, interest) monitor = @selector.register(io, interest) monitor.value = observer monitor end
wait_for_all_data(end_when_complete: nil)
click to toggle source
# File lib/iopromise/executor_context.rb, line 35 def wait_for_all_data(end_when_complete: nil) unless end_when_complete.nil? raise IOPromise::CancelledError if end_when_complete.cancelled? end loop do complete_pending_registrations @pools.each do |pool, _| pool.execute_continue end unless end_when_complete.nil? return unless end_when_complete.pending? end break if @selector.empty? # if we have any pending promises to register, we'll not block at all so we immediately continue unless @pending_registrations.empty? wait_time = 0 else wait_time = nil @pools.each do |pool, _| timeout = pool.select_timeout wait_time = timeout if wait_time.nil? || (!timeout.nil? && timeout < wait_time) end end ready_count = select(wait_time) end unless end_when_complete.nil? raise ::IOPromise::Error.new('Internal error: IO loop completed without fulfilling the desired promise') else @pools.each do |pool, _| pool.wait end end ensure complete_pending_registrations end
Private Instance Methods
complete_pending_registrations()
click to toggle source
# File lib/iopromise/executor_context.rb, line 87 def complete_pending_registrations return if @pending_registrations.empty? pending = @pending_registrations @pending_registrations = [] pending.each do |promise| register_now(promise) unless promise.cancelled? end end
register_now(promise)
click to toggle source
# File lib/iopromise/executor_context.rb, line 96 def register_now(promise) pool = promise.execute_pool pool.register(promise) @pools[pool] = true end
select(wait_time)
click to toggle source
# File lib/iopromise/executor_context.rb, line 80 def select(wait_time) @selector.select(wait_time) do |monitor| observer = monitor.value observer.monitor_ready(monitor, monitor.readiness) end end