class Concurrent::ErlangActor::OnPool

Public Class Methods

new(channel, environment, name, executor) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1060
def initialize(channel, environment, name, executor)
  super channel, environment, name, executor
  @Executor       = executor
  @behaviour      = []
  @keep_behaviour = false
end

Public Instance Methods

receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1077
def receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block)
  clean_reply
  err = canonical_rules rules, timeout, timeout_value, given_block
  raise err if err

  @keep_behaviour = keep
  @timeout        = timeout
  @behaviour      = rules
  throw JUMP, [RECEIVE]
end
run(*args, &body) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1067
def run(*args, &body)
  body ||= -> { start }

  initial_signal_consumption
  inner_run(*args, &body).
      run(Run::TEST).
      then(&method(:after_termination)).
      rescue { |e| log Logger::ERROR, e }
end

Private Instance Methods

apply_behaviour(message) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1178
def apply_behaviour(message)
  @behaviour.each do |rule, job|
    if rule === message
      @behaviour = [] unless @keep_behaviour
      return eval_task(message, job)
    end
  end
  raise 'should not reach'
end
inner_run(*args, &body) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1094
def inner_run(*args, &body)
  first       = !!body
  future_body = -> message, _actor do
    kind, reason, value =
        if message.is_a?(::Array) && message.first == TERMINATE
          message
        else
          begin
            catch(JUMP) do
              [NOTHING,
               :normal,
               first ? @Environment.instance_exec(*args, &body) : apply_behaviour(message)]
            end
          rescue => e
            [TERMINATE, e, nil]
          end
        end

    case kind
    when TERMINATE
      send_exit_messages reason
      @Terminated.resolve(reason == :normal, value, reason)
      reason
    when RECEIVE
      Run[inner_run]
    when NOTHING
      if @behaviour.empty?
        send_exit_messages reason
        @Terminated.resolve(reason == :normal, value, reason)
        reason
      else
        Run[inner_run]
      end
    else
      raise "bad kind: #{kind.inspect}"
    end
  end

  if first
    Promises.future_on(@Executor, nil, self, &future_body)
  else
    internal_receive.run(Run::TEST).then(self, &future_body)
  end
end
internal_receive() click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1139
def internal_receive
  raise if @behaviour.empty?
  rules_matcher  = Or[*@behaviour.map(&:first)]
  matcher        = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m }
  start          = nil
  message_future = case @timeout
                   when 0
                     Promises.fulfilled_future @Mailbox.try_pop_matching(matcher, TIMEOUT)
                   when Numeric
                     pop   = @Mailbox.pop_op_matching(matcher)
                     start = Concurrent.monotonic_time
                     # FIXME (pitr-ch 30-Jan-2019): the scheduled future should be cancelled
                     (Promises.schedule(@timeout) { TIMEOUT } | pop).then(pop) do |message, p|
                       if message == TIMEOUT && !p.resolve(true, TIMEOUT, nil, false)
                         # timeout raced with probe resolution, take the value instead
                         p.value
                       else
                         message
                       end
                     end
                   when nil
                     @Mailbox.pop_op_matching(matcher)
                   else
                     raise
                   end

  message_future.then(start, self) do |message, s, _actor|
    log Logger::DEBUG, pid, got: message
    catch(JUMP) do
      if (message = consume_signal(message)) == NOTHING
        @timeout = [@timeout + s - Concurrent.monotonic_time, 0].max if s
        Run[internal_receive]
      else
        message
      end
    end
  end
end
terminate_self(reason, value) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1090
def terminate_self(reason, value)
  throw JUMP, [TERMINATE, reason, value]
end