class Concurrent::ErlangActor::AbstractActor
Public Class Methods
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 652
#
# Initializes the actor's bookkeeping state and builds its environment object.
#
# @param [Promises::Channel] mailbox stored as the actor's mailbox
# @param [Class, Module] environment either Environment itself or a subclass
#   of it (instantiated with +self+ and +executor+), or a module (used to
#   extend a fresh Environment instance)
# @param name passed to Pid.new together with +self+
# @param executor passed to the environment constructor
# @raise [ArgumentError] when +environment+ is neither an Environment class
#   nor a module
def initialize(mailbox, environment, name, executor)
  super()
  @Mailbox                = mailbox
  @Pid                    = Pid.new self, name
  @Linked                 = ::Set.new
  @Monitors               = {}
  @Monitoring             = {}
  @MonitoringLateDelivery = {}
  @Terminated             = Promises.resolvable_future
  @trap                   = false
  @reply                  = nil

  environment_class = environment.is_a?(Class) && environment <= Environment
  unless environment_class || environment.is_a?(Module)
    raise ArgumentError,
          "environment has to be a class inheriting from Environment or a module"
  end

  @Environment = if environment_class
                   environment.new self, executor
                 else
                   # Object#extend returns the receiver, i.e. the extended environment.
                   Environment.new(self, executor).extend(environment)
                 end
end
@param [Promises::Channel] mailbox
Calls superclass method
Public Instance Methods
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 695
#
# Sends +message+ wrapped in an Ask signal and blocks on the probe's value.
#
# @param message the question payload
# @param timeout when truthy, bounds both the delivery (via #tell) and the
#   wait for the answer; when falsy the call waits without a limit
# @param timeout_value passed to +probe.value!+ as its second argument
#   (presumably returned when the wait times out — confirm against Promises)
# @return the result of +probe.value!+
# @raise [NoActor] when the actor is observed as terminated
def ask(message, timeout, timeout_value)
  log DEBUG, @Pid, asked: message
  raise NoActor.new(@Pid) if @Terminated.resolved?

  probe    = Promises.resolvable_future
  question = Ask.new(message, probe)

  unless timeout
    raise NoActor.new(@Pid) if @Terminated.resolved?
    tell question
    probe.reject NoActor.new(@Pid), false if @Terminated.resolved?
    return probe.value!
  end

  started_at = Concurrent.monotonic_time
  in_time    = tell question, timeout
  # Recheck: it could have terminated and drained its mailbox in the meantime.
  # This has to come before waiting, so that timeout_value is not returned
  # when the actor was actually terminated.
  raise NoActor.new(@Pid) if @Terminated.resolved?

  # Whatever is left of the timeout after delivery, never below zero.
  remaining = in_time ? timeout - (Concurrent.monotonic_time - started_at) : 0
  to_wait   = remaining >= 0 ? remaining : 0
  # TODO (pitr-ch 06-Feb-2019): allow negative timeout everywhere, interpret as 0
  probe.value! to_wait, timeout_value, [true, nil, nil]
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 726
#
# Non-blocking variant of asking: pushes an Ask signal via #tell_op and
# chains onto the resulting future.
#
# @param message the question payload
# @param probe the resolvable object handed to Ask.new; it is rejected with
#   NoActor when the actor is observed as terminated
# @return the result of +probe.reject+ when already terminated, otherwise the
#   flattened future that yields +probe+
def ask_op(message, probe)
  log DEBUG, @Pid, asked: message
  return probe.reject(NoActor.new(@Pid), false) if @Terminated.resolved?

  delivery = tell_op(Ask.new(message, probe))
  delivery.then do
    # The actor may have terminated while the question was being delivered.
    probe.reject NoActor.new(@Pid), false if @Terminated.resolved?
    probe
  end.flat
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 812
#
# Stops monitoring identified by +reference+.
#
# @param reference the key previously stored in @Monitoring
# @param options any of +:info+ and +:flush+
# @return [Boolean] without options always true; with +:flush+ and +:info+
#   false when a matching DownSignal was removed from the mailbox; with
#   +:info+ alone false when nothing was monitored under +reference+ or a
#   matching DownSignal is already waiting in the mailbox
# @raise [ArgumentError] when +options+ contains anything else
def demonitor(reference, *options)
  info  = options.delete :info
  flush = options.delete :flush
  raise ArgumentError, "bad options #{options}" unless options.empty?

  pid          = @Monitoring.delete reference
  demonitoring = !!pid
  pid.tell DeMonitor.new @Pid, reference if demonitoring

  if flush
    # remove (one) down message having reference from mailbox
    flushed = demonitoring &&
      !!@Mailbox.try_pop_matching(And[DownSignal, -> m { m.reference == reference }])
    return !(info && flushed)
  end

  return true unless info
  return false unless demonitoring

  if @Mailbox.peek_matching(And[DownSignal, -> m { m.reference == reference }])
    @MonitoringLateDelivery[reference] = pid # allow to deliver the message once
    return false
  end

  true
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 766
#
# Links this actor with +pid+. Linking to itself or to an already linked pid
# does nothing.
#
# @param pid the pid to link with
# @return [true]
# @raise [NoActor] when +pid+ is already terminated and this actor does not
#   trap; the pid is removed from the linked set first
def link(pid)
  return true if pid == @Pid
  return true unless @Linked.add? pid

  pid.tell Link.new(@Pid)
  return true unless pid.terminated.resolved?

  # no race since it only could get NoActor
  if @trap
    tell Terminate.new pid, NoActor.new(pid)
  else
    @Linked.delete pid
    raise NoActor.new(pid)
  end
  true
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 788
#
# @param pid the pid to look up
# @return [Boolean] whether +pid+ is in this actor's linked set
def linked?(pid)
  @Linked.include?(pid)
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 792
#
# Starts monitoring +pid+ and records it in @Monitoring under a new Reference.
#
# Sequence guarded against (monitoring side vs. monitored side), in order:
# send Monitor; terminated?; terminate before getting Monitor; drain signals
# including the Monitor.
#
# @param pid the pid to monitor
# @return the newly created reference
def monitor(pid)
  reference              = Reference.new
  @Monitoring[reference] = pid

  already_terminated = pid.terminated.resolved?
  # otherwise let it race
  pid.tell Monitor.new(@Pid, reference) unless already_terminated

  # Always report no-proc when terminated. The second check runs only after
  # Monitor was sent; no race, it cannot get anything else than NoActor.
  if already_terminated || pid.terminated.resolved?
    tell DownSignal.new(pid, reference, NoActor.new(pid))
  end

  reference
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 839
#
# @param reference a monitor reference
# @return [Boolean] whether +reference+ is a key of @Monitoring
def monitoring?(reference)
  @Monitoring.key?(reference)
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 757
#
# Builds a rule, a two-element array pairing a matcher with the thing to
# produce when it matches.
#
# The value is tested with +nil?+ rather than truthiness so that an explicit
# +false+ is kept as the rule's value instead of being replaced by the
# (absent) block.
#
# @param matcher the first element of the rule
# @param value a constant result for the rule; nil means "not supplied"
# @param block alternative to +value+
# @return [Array] +[matcher, value]+ or +[matcher, block]+
# @raise [ArgumentError] when both a block and a non-nil value are supplied
def on(matcher, value = nil, &block)
  value_given = !value.nil?
  raise ArgumentError, 'only one of block or value can be supplied' if block && value_given

  [matcher, value_given ? value : block]
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 742
#
# @return the Pid object created for this actor in #initialize
def pid
  @Pid
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 762
#
# Abstract hook: this class only declares the signature.
#
# @raise [NotImplementedError] always
def receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
  raise NotImplementedError
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 860
#
# Resolves the pending reply probe, if there is one.
#
# @param fulfilled forwarded to +@reply.resolve+
# @param value forwarded to +@reply.resolve+
# @param reason forwarded to +@reply.resolve+
# @return [Boolean] false when there is no pending reply, otherwise the
#   result of +resolve+ coerced to a boolean
def reply_resolution(fulfilled, value, reason)
  @reply ? !!@reply.resolve(fulfilled, value, reason, false) : false
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 843
#
# Creates another actor through ErlangActor.create, optionally links to and/or
# monitors it, then runs it.
#
# @param args forwarded to +actor.run+
# @param type forwarded to ErlangActor.create
# @param channel forwarded to ErlangActor.create
# @param environment forwarded to ErlangActor.create
# @param name forwarded to ErlangActor.create
# @param link when truthy, #link is called with the new pid before it runs
# @param monitor when truthy, #monitor is called with the new pid before it runs
# @param executor forwarded to ErlangActor.create
# @param body forwarded to +actor.run+ as its block
# @return the new pid, or +[pid, reference]+ when +monitor+ is truthy
def spawn(*args, type:, channel:, environment:, name:, link:, monitor:, executor:, &body)
  actor = ErlangActor.create type, channel, environment, name, executor
  pid   = actor.pid

  # +link+ and +monitor+ are both locals (flags) and methods; the
  # parenthesised form is always a method call.
  link(pid) if link
  ref = monitor ? monitor(pid) : nil

  actor.run(*args, &body)
  return pid unless monitor

  [pid, ref]
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 685
#
# Pushes +message+ into the mailbox.
#
# @param message the message to push
# @param timeout forwarded to +mailbox.push+
# @return when +timeout+ is truthy, the result of +mailbox.push+ (or false
#   when there is no mailbox any more); otherwise this actor's pid
def tell(message, timeout = nil)
  log DEBUG, @Pid, told: message

  # Read the ivar once; #after_termination sets it to nil.
  mailbox = @Mailbox
  return(timeout ? false : @Pid) unless mailbox

  push_result = mailbox.push message, timeout
  timeout ? push_result : @Pid
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 676
#
# Non-blocking variant of #tell built on +mailbox.push_op+.
#
# @param message the message to push
# @return a future yielding this actor's pid; already fulfilled when there is
#   no mailbox any more
def tell_op(message)
  log DEBUG, @Pid, told: message

  # Read the ivar once; #after_termination sets it to nil.
  mailbox = @Mailbox
  return Promises.fulfilled_future(@Pid) unless mailbox

  mailbox.push_op(message).then { @Pid }
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 865
#
# Terminates this actor, or asks another actor to terminate.
#
# @param pid target of the request; when omitted this actor terminates
#   itself through #terminate_self
# @param reason the termination reason; +:kill+ sends a Kill signal instead
#   of a Terminate signal
# @param value forwarded to #terminate_self (only used without +pid+)
# @return the result of +pid.tell+ or of #terminate_self
def terminate(pid = nil, reason, value: nil)
  return terminate_self(reason, value) unless pid

  # has to send it to itself even if pid equals self.pid
  signal = reason == :kill ? Kill.new(@Pid) : Terminate.new(@Pid, reason, false)
  pid.tell signal
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 738
#
# @return the result of +with_hidden_resolvable+ called on the termination
#   future created in #initialize
def terminated
  @Terminated.with_hidden_resolvable
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 750
#
# Sets the trap flag.
#
# @param value coerced to a strict boolean before being stored
# @return the previous value of the flag
def trap(value = true)
  previous = @trap
  @trap    = value ? true : false
  previous
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 746
#
# @return the current value of the trap flag
def traps?
  @trap
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 783
#
# Removes the link to +pid+ and notifies it with an UnLink signal, but only
# when a link was actually present.
#
# @Linked is a Set; Set#delete always returns the set itself (truthy), so the
# membership-reporting Set#delete? is required for the condition to be
# meaningful.
#
# @param pid the pid to unlink from
# @return [true]
def unlink(pid)
  pid.tell UnLink.new(@Pid) if @Linked.delete?(pid)
  true
end
Private Instance Methods
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1031
#
# Cleans up after the actor finished: rejects the pending reply, drains the
# mailbox answering the signals that expect an answer, and drops the mailbox.
#
# @param final_reason only logged here
# @return [nil]
def after_termination(final_reason)
  log DEBUG, @Pid, terminated: final_reason
  clean_reply NoActor.new(@Pid)

  until (message = @Mailbox.try_pop(NOTHING)) == NOTHING
    case message
    when Monitor
      # The actor is terminated so we must return NoActor,
      # even though we still know the reason.
      # Otherwise it would return different reasons non-deterministically.
      message.from.tell DownSignal.new(@Pid, message.reference, NoActor.new(@Pid))
    when Link
      # same as for Monitor
      # NOTE(review): unlike the Monitor branch this sends the bare NoActor,
      # not a signal wrapping it — confirm this is intended.
      message.from.tell NoActor.new(@Pid)
    when Ask
      message.probe.reject(NoActor.new(@Pid), false)
    end
    # normal messages and other signals are thrown away
  end

  @Mailbox = nil
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 930
#
# @return [Boolean] whether a reply probe is currently pending
def asked?
  @reply ? true : false
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 880
#
# Normalizes +rules+ in place into a list of full rules (as built by #on).
#
# * no rules: a single ANY rule using the block (or identity) is added;
# * one entry that is not a two-element array: it is treated as a matcher and
#   paired with the block (or identity);
# * otherwise the entries are taken as full rules and a block is not allowed.
#
# With a truthy +timeout+ a TIMEOUT rule is additionally put first: the last
# TIMEOUT rule already present, or a new one producing +timeout_value+.
#
# @param [Array] rules mutated in place
# @param timeout only its truthiness is used here
# @param timeout_value value of the generated TIMEOUT rule
# @param given_block the block passed by the caller, or nil
# @return [ArgumentError, nil] an error instance (returned, not raised) when
#   a block accompanies full rules, nil on success
def canonical_rules(rules, timeout, timeout_value, given_block)
  handler = given_block || -> v { v }
  single  = rules.first

  if rules.empty?
    rules.push(on(ANY, &handler))
  elsif rules.size == 1 && !(single.is_a?(::Array) && single.size == 2)
    rules.replace([on(single, &handler)])
  elsif given_block
    return ArgumentError.new 'a block cannot be given if full rules are used'
  end

  if timeout
    # TIMEOUT rule has to be first, to prevent any picking it up ANY
    existing = rules.rindex { |matcher, _| matcher == TIMEOUT }
    rules.unshift(existing ? rules[existing] : on(TIMEOUT, timeout_value))
  end

  nil
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 934
#
# Rejects and forgets the pending reply probe, if there is one.
#
# @param reason the rejection reason, NoReply unless given
# @return [nil]
def clean_reply(reason = NoReply)
  return unless @reply

  @reply.reject(reason, false)
  @reply = nil
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 987
#
# Processes an exit message (it is destructured into +from+ and +reason+).
#
# An exit message flagged as +link_terminated+ is only honoured while +from+
# is still in the linked set, and the link is removed on the way. @Linked is
# a Set, so Set#delete? is used: Set#delete always returns the set itself and
# would make every such message look like it came from a linked actor.
#
# @param exit_message responds to +link_terminated+ and destructures into
#   +[from, reason]+
# @return a Terminated message when trapping, NOTHING when the message is
#   ignored, otherwise the result of #terminate
def consume_exit(exit_message)
  from, reason = exit_message

  if exit_message.link_terminated && !@Linked.delete?(from)
    # Sequence (link side vs. exiting side), in order: send Link; terminate;
    # terminated?; drain signals. Draining generates a second Terminated,
    # which is dropped here: the exit message was already processed.
    return NOTHING
  end

  return Terminated.new(from, reason) if @trap

  case reason
  when :normal
    # A normal exit of another actor is ignored; only our own terminates us.
    from == @Pid ? terminate(:normal) : NOTHING
  else
    terminate reason
  end
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 941
#
# Applies the effect of a signal, or passes a regular message through.
#
# @param message anything popped from the mailbox
# @return +message+ itself when it is not an AbstractSignal; the question
#   payload for Ask; a Down message for an expected DownSignal; NOTHING for
#   signals that only update bookkeeping; otherwise whatever #terminate or
#   #consume_exit returns
# @raise [RuntimeError] for an AbstractSignal of an unhandled class
def consume_signal(message)
  # regular message
  return message unless AbstractSignal === message

  case message
  when Ask
    @reply = message.probe
    message.message
  when Link
    @Linked.add message.from
    NOTHING
  when UnLink
    @Linked.delete message.from
    NOTHING
  when Monitor
    @Monitors[message.reference] = message.from
    NOTHING
  when DeMonitor
    @Monitors.delete message.reference
    NOTHING
  when Kill
    terminate :killed
  when DownSignal
    expected = @Monitoring.delete(message.reference) ||
      @MonitoringLateDelivery.delete(message.reference)
    # Ignore the down message if no longer monitoring. This also covers the
    # sequence (monitoring side vs. monitored side): send Monitor; terminate;
    # terminated?; drain signals — draining generates a second DOWN, which is
    # dropped here because it was already reported as :noproc.
    expected ? Down.new(message.from, message.reference, message.info) : NOTHING
  when Terminate
    consume_exit message
  else
    raise "unknown message #{message}"
  end
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 913
#
# Produces the result of a matched rule.
#
# @param message passed to +job+ when it is a Proc
# @param job a Proc evaluated with the environment as +self+, or any other
#   object, which is returned as is
# @return the Proc's result or +job+ itself
def eval_task(message, job)
  return job unless job.is_a? Proc

  @Environment.instance_exec message, &job
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1019
#
# Pops messages from the mailbox until +try_pop+ returns a falsy value and
# feeds each one to #consume_signal.
#
# @return [nil]
# @raise [RuntimeError] when #consume_signal returns anything but NOTHING
def initial_signal_consumption
  while (message = @Mailbox.try_pop)
    next if consume_signal(message) == NOTHING

    raise 'it was not consumable signal'
  end
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 921
#
# Notifies every linked pid with a Terminate signal and every monitoring pid
# with a DownSignal, emptying both collections afterwards.
#
# @param reason carried by every signal sent
# @return the emptied @Monitors hash
def send_exit_messages(reason)
  @Linked.each { |pid| pid.tell Terminate.new(@Pid, reason) }
  @Linked.clear

  @Monitors.each { |reference, pid| pid.tell DownSignal.new(@Pid, reference, reason) }
  @Monitors.clear
end
Source
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1027
#
# Abstract hook called by #terminate when no pid is given; this class only
# declares the signature.
#
# @raise [NotImplementedError] always
def terminate_self(reason, value)
  raise NotImplementedError
end