class IOPromise::ExecutorContext

Public Class Methods

current() click to toggle source
# File lib/iopromise/executor_context.rb, line 9
# Returns the shared ExecutorContext, building it on the first call and
# caching it in @context so later calls hand back the same object.
def current
  return @context if @context

  @context = ExecutorContext.new
end
new() click to toggle source
Calls superclass method
# File lib/iopromise/executor_context.rb, line 14
# Sets up an executor context with a fresh NIO selector, an empty
# registration queue and no known pools, then chains to the superclass
# initializer.
def initialize
  # IO multiplexer used by register_observer_io and select.
  @selector = NIO::Selector.new

  # Promises passed to #register that have not yet been handed to a pool.
  @pending_registrations = []

  # Pools that have received at least one promise; used as a set (pool => true).
  @pools = {}

  super
end

Public Instance Methods

register(promise) click to toggle source
# File lib/iopromise/executor_context.rb, line 30
# Queues +promise+ for registration with its pool (performed later by
# complete_pending_registrations) and, when a current cancel context
# exists, subscribes the promise to it.
#
# Returns whatever CancelContext#subscribe returns, or nil when there is no
# current cancel context.
def register(promise)
  @pending_registrations.push(promise)

  cancel_context = IOPromise::CancelContext.current
  cancel_context.subscribe(promise) unless cancel_context.nil?
end
register_observer_io(observer, io, interest) click to toggle source
# File lib/iopromise/executor_context.rb, line 24
# Registers +io+ with the selector for the given +interest+ and stores
# +observer+ as the resulting monitor's value, so that #select can route
# readiness notifications back to it.
#
# Returns the monitor produced by the selector.
def register_observer_io(observer, io, interest)
  @selector.register(io, interest).tap do |registered|
    registered.value = observer
  end
end
wait_for_all_data(end_when_complete: nil) click to toggle source
# File lib/iopromise/executor_context.rb, line 35
# Drives the IO loop until there is nothing left to wait for.
#
# Each iteration flushes queued registrations, lets every known pool make
# progress via +execute_continue+, and then blocks in #select until IO is
# ready or the shortest pool timeout elapses.
#
# end_when_complete - optional object responding to +cancelled?+ and
#                     +pending?+. When given, the loop returns as soon as it
#                     is no longer pending.
#
# Returns nil when +end_when_complete+ stopped being pending; otherwise the
# result of iterating @pools after calling +wait+ on every pool.
#
# Raises IOPromise::CancelledError if +end_when_complete+ is already
# cancelled on entry.
# Raises IOPromise::Error if the selector runs out of registered IO while
# +end_when_complete+ is still pending.
#
# Queued registrations are always flushed on the way out, including when an
# exception propagates.
def wait_for_all_data(end_when_complete: nil)
  awaiting_promise = !end_when_complete.nil?

  raise IOPromise::CancelledError if awaiting_promise && end_when_complete.cancelled?

  loop do
    complete_pending_registrations

    @pools.each_key(&:execute_continue)

    return if awaiting_promise && !end_when_complete.pending?

    break if @selector.empty?

    wait_time =
      if @pending_registrations.empty?
        # Shortest timeout any pool asks for; nil (no pool has one) is passed
        # through to the selector unchanged.
        @pools.each_key.map(&:select_timeout).compact.min
      else
        # Promises are waiting to be registered, so do not block at all and
        # go straight round the loop again.
        0
      end

    select(wait_time)
  end

  if awaiting_promise
    raise ::IOPromise::Error, 'Internal error: IO loop completed without fulfilling the desired promise'
  end

  @pools.each_key(&:wait)
ensure
  complete_pending_registrations
end

Private Instance Methods

complete_pending_registrations() click to toggle source
# File lib/iopromise/executor_context.rb, line 87
# Hands every queued promise that has not been cancelled to register_now.
#
# The queue is swapped for a fresh array before iterating, so promises
# registered while this runs land in the new queue rather than the one
# being walked.
#
# Returns nil when nothing was queued, otherwise the array that was drained.
def complete_pending_registrations
  unless @pending_registrations.empty?
    queued, @pending_registrations = @pending_registrations, []

    queued.each do |promise|
      next if promise.cancelled?

      register_now(promise)
    end
  end
end
register_now(promise) click to toggle source
# File lib/iopromise/executor_context.rb, line 96
# Registers +promise+ with the pool it reports via +execute_pool+ and
# records that pool in @pools so wait_for_all_data will service it.
#
# Returns true.
def register_now(promise)
  target_pool = promise.execute_pool
  target_pool.register(promise)
  @pools.store(target_pool, true)
end
select(wait_time) click to toggle source
# File lib/iopromise/executor_context.rb, line 80
# Waits on the selector for up to +wait_time+ and, for every monitor it
# yields, notifies the observer stored in the monitor's value by calling
# +monitor_ready+ with the monitor and its readiness.
#
# Returns whatever the selector's +select+ returns.
def select(wait_time)
  @selector.select(wait_time) do |ready|
    ready.value.monitor_ready(ready, ready.readiness)
  end
end