class Concurrent::ErlangActor::Environment

A class providing environment and methods for actor bodies to run in.

Public Class Methods

new(actor, executor) click to toggle source
Calls superclass method
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 471
def initialize(actor, executor)
  super()
  @Actor           = actor
  @DefaultExecutor = executor
end

Public Instance Methods

default_executor() click to toggle source

@return [ExecutorService] a default executor which is picked by spawn call

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 465
def default_executor
  @DefaultExecutor
end
demonitor(reference, *options) click to toggle source

If MonitorRef is a reference which the calling actor obtained by calling {#monitor}, this monitoring is turned off. If the monitoring is already turned off, nothing happens.

Once demonitor has returned it is guaranteed that no {DownSignal} message due to the monitor will be placed in the caller’s message queue in the future. A {DownSignal} message might have been placed in the caller’s message queue prior to the call, though. Therefore, in most cases, it is advisable to remove such a ‘DOWN’ message from the message queue after monitoring has been stopped. ‘demonitor(reference, :flush)` can be used if this cleanup is wanted.

The behavior of this method can be viewed as two combined operations: asynchronously send a “demonitor signal” to the monitored actor and ignore any future results of the monitor.

Failure: It is an error if reference refers to a monitoring started by another actor. In that case it may raise an ArgumentError or go unnoticed.

Options:

  • ‘:flush` - Remove (one) {DownSignal} message, if there is one, from the caller’s message queue after monitoring has been stopped. Calling ‘demonitor(pid, :flush)` is equivalent to the following, but more efficient: “`ruby demonitor(pid) receive on(And[DownSignal, -> d { d.reference == reference}], true), timeout: 0, timeout_value: true “`

  • ‘info` The returned value is one of the following:

    • ‘true` - The monitor was found and removed. In this case no {DownSignal} message due to this monitor have been nor will be placed in the message queue of the caller.

    • ‘false` - The monitor was not found and could not be removed. This probably because someone already has placed a {DownSignal} message corresponding to this monitor in the caller’s message queue.

    If the info option is combined with the flush option, ‘false` will be returned if a flush was needed; otherwise, `true`.

@param [Reference] reference @param [:flush, :info] options @return [true, false]

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 332
def demonitor(reference, *options)
  @Actor.demonitor(reference, *options)
end
linked?(pid) click to toggle source

@!visibility private @return [true, false]

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 268
def linked?(pid)
  @Actor.linked? pid
end
monitor(pid) click to toggle source

The calling actor starts monitoring actor with given pid.

A {DownSignal} message will be sent to the monitoring actor if the actor with given pid dies, or if the actor with given pid does not exist.

The monitoring is turned off either when the {DownSignal} message is sent, or when {#demonitor} is called.

Making several calls to monitor for the same pid is not an error; it results in as many, completely independent, monitorings.

@return [Reference]

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 285
def monitor(pid)
  @Actor.monitor(pid)
end
monitoring?(reference) click to toggle source

@!visibility private

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 337
def monitoring?(reference)
  @Actor.monitoring? reference
end
name() click to toggle source

@return [#to_s] the name od the actor if provided to spawn method

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 154
def name
  pid.name
end
on(matcher, value = nil, &block) click to toggle source

Helper for constructing a {#receive} rules @see receive @example

receive on(Numeric) { |v| v.succ },
        on(ANY) { terminate :bad_message }
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 185
def on(matcher, value = nil, &block)
  @Actor.on matcher, value, &block
end
pid() click to toggle source

@return [Pid] the pid of this actor

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 149
def pid
  @Actor.pid
end
receive(*rules, timeout: nil, timeout_value: nil, **options, &block) click to toggle source

Receive a message.

@param [::Array(), ::Array(===), ::Array<::Array(===, Proc)>] rules

*   No rule - `receive`, `receive {|m| m.to_s}`
*   or single rule which can be combined with the supplied block -
    `receive(Numeric)`, `receive(Numeric) {|v| v.succ}`
*   or array of matcher-proc pairs -
    `receive on(Numeric) { |v| v*2 }, on(Symbol) { |c| do_command c }`

@param [Numeric] timeout

how long it should wait for the message

@param [Object] timeout_value

if rule `on(TIMEOUT) { do_something }` is not specified
then timeout_value is returned.

@return [Object, nothing]

depends on type of the actor.
On thread it blocks until message is available
then it returns the message (or a result of a called block).
On pool it stops executing and continues with a given block
when message becomes available.

@param [Hash] options

other options specific by type of the actor

@option options [true, false] :keep

Keep the rules and repeatedly call the associated blocks,
until receive is called again.

@yield [message] block

to process the message
if single matcher is supplied

@yieldparam [Object] message the received message @see ErlangActor Receiving chapter in the ErlangActor examples

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 218
def receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
  @Actor.receive(*rules, timeout: timeout, timeout_value: timeout_value, **options, &block)
end
reply(value) click to toggle source

Shortcut for fulfilling the reply, same as ‘reply_resolution true, value, nil`. @example

actor = Concurrent::ErlangActor.spawn(:on_thread) { reply receive * 2 }
actor.ask 2 #=> 4

@param [Object] value @return [true, false] did the sender ask, and was it resolved

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 405
def reply(value)
  # TODO (pitr-ch 08-Feb-2019): consider adding reply? which returns true,false if success, reply method will always return value
  reply_resolution true, value, nil
end
reply_resolution(fulfilled = true, value = nil, reason = nil) click to toggle source

Reply to the sender of the message currently being processed if the actor was asked instead of told. The reply is stored in a {Promises::ResolvableFuture} so the arguments are same as for {Promises::ResolvableFuture#resolve} method.

The reply may timeout, then this will fail with false.

@param [true, false] fulfilled @param [Object] value @param [Object] reason

@example

actor = Concurrent::ErlangActor.spawn(:on_thread) { reply_resolution true, receive * 2, nil }
actor.ask 2 #=> 4

@return [true, false] did the sender ask, and was it resolved before it timed out?

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 426
def reply_resolution(fulfilled = true, value = nil, reason = nil)
  @Actor.reply_resolution(fulfilled, value, reason)
end
spawn(*args, type: @Actor.class, channel: Promises::Channel.new, environment: Environment, name: nil, executor: default_executor, link: false, monitor: false, &body) click to toggle source

Creates an actor.

@param [Object] args arguments for the actor body @param [:on_thread, :on_pool] type

of the actor to be created.

@param [Channel] channel

The mailbox of the actor, by default it has unlimited capacity.
Crating the actor with a bounded queue is useful to create backpressure.
The channel can be shared with other abstractions
but actor has to be the only consumer
otherwise internal signals could be lost.

@param [Environment, Module] environment

A class which is used to run the body of the actor in.
It can either be a child of {Environment} or a module.
Module is extended to a new instance of environment,
therefore if there is many actors with this module
it is better to create a class and use it instead.

@param [#to_s] name of the actor.

Available by {Pid#name} or {Environment#name} and part of {Pid#to_s}.

@param [true, false] link

the created actor is atomically created and linked with the calling actor

@param [true, false] monitor

the created actor is atomically created and monitored by the calling actor

@param [ExecutorService] executor

The executor service to use to execute the actor on.
Applies only to :on_pool actor type.

@yield [*args] the body of the actor.

When actor is spawned this block is evaluated
until it terminates.
The on-thread actor requires a block.
The on-poll actor has a default `-> { start }`,
therefore if not block is given it executes a #start method
which needs to be provided with environment.

@return [Pid, ::Array(Pid, Reference)] a pid or a pid-reference pair when monitor is true @see www1.erlang.org/doc/man/erlang.html#spawn-1 @see www1.erlang.org/doc/man/erlang.html#spawn_link-1 @see www1.erlang.org/doc/man/erlang.html#spawn_monitor-1

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 378
def spawn(*args,
          type: @Actor.class,
          channel: Promises::Channel.new,
          environment: Environment,
          name: nil,
          executor: default_executor,
          link: false,
          monitor: false,
          &body)

  @Actor.spawn(*args,
               type:        type,
               channel:     channel,
               environment: environment,
               name:        name,
               executor:    executor,
               link:        link,
               monitor:     monitor,
               &body)
end
terminate(pid = nil, reason, value: nil) click to toggle source

If pid **is not** provided stops the execution of the calling actor with the exit reason.

If pid is provided, it sends an exit signal with exit reason to the actor identified by pid.

The following behavior apply if ‘reason` is any object except `:normal` or `:kill`. If pid is not trapping exits, pid itself will exit with exit reason. If pid is trapping exits, the exit signal is transformed into a message {Terminated} and delivered to the message queue of pid.

If reason is the Symbol ‘:normal`, pid will not exit. If it is trapping exits, the exit signal is transformed into a message {Terminated} and delivered to its message queue.

If reason is the Symbol ‘:kill`, that is if `exit(pid, :kill)` is called, an untrappable exit signal is sent to pid which will unconditionally exit with exit reason `:killed`.

Since evaluating this function causes the process to terminate, it has no return value.

@param [Pid] pid @param [Object, :normal, :kill] reason @param [Object] value @return [nothing] @see www1.erlang.org/doc/man/erlang.html#error-1 @see www1.erlang.org/doc/man/erlang.html#error-2

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 460
def terminate(pid = nil, reason, value: nil)
  @Actor.terminate pid, reason, value: value
end
terminated() click to toggle source

@!macro erlang_actor.terminated

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 144
def terminated
  @Actor.terminated
end
trap(value = true) click to toggle source

When trap is set to true, exit signals arriving to a actor are converted to {Terminated} messages, which can be received as ordinary messages. If trap is set to false, the actor exits if it receives an exit signal other than normal and the exit signal is propagated to its linked actors. Application actors should normally not trap exits.

@param [true, false] value @return [true, false] the old value of the flag @see www1.erlang.org/doc/man/erlang.html#process_flag-2

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 176
def trap(value = true)
  @Actor.trap(value)
end
traps?() click to toggle source

@return [true, false] does this actor trap exit messages? @see www1.erlang.org/doc/man/erlang.html#process_flag-2

# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 160
def traps?
  @Actor.traps?
end