class Concurrent::Promises::Channel
A first in first out channel that accepts messages with push family of methods and returns messages with pop family of methods. Pop and push operations can be represented as futures, see {#pop_op} and {#push_op}. The capacity of the channel can be limited to support back pressure, use capacity option in {#initialize}. {#pop} method blocks ans {#pop_op} returns pending future if there is no message in the channel. If the capacity is limited the {#push} method blocks and {#push_op} returns pending future.
{include:file:docs-source/channel.out.md} @!macro warn.edge
Constants
- ANY
An object which matches anything (with ===)
- NOTHING
- UNLIMITED_CAPACITY
Default capacity of the
Channel
, makes it accept unlimited number of messages.
Public Class Methods
Create channel. @param [Integer, UNLIMITED_CAPACITY
] capacity the maximum number of messages which can be stored in the channel.
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 64 def initialize(capacity = UNLIMITED_CAPACITY) super() @Capacity = capacity @Mutex = Mutex.new # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle @Probes = [] @Messages = [] @PendingPush = [] end
@see select_matching
@return [::Array(Channel
, Object), nil]
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 340 def select_matching(matcher, channels, timeout = nil) channels.first.select_matching(matcher, channels[1..-1], timeout) end
@see select_op
@return [Future(::Array(Channel, Object))]
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 316 def select_op(channels, probe = Promises.resolvable_future) channels.first.select_op(channels[1..-1], probe) end
@see select_op_matching
@return [Future(::Array(Channel, Object))]
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 334 def select_op_matching(matcher, channels, probe = Promises.resolvable_future) channels.first.select_op_matching(matcher, channels[1..-1], probe) end
@see try_select
@return [::Array(Channel
, Object)]
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 310 def try_select(channels) channels.first.try_select(channels[1..-1]) end
@see try_select_matching
@return [::Array(Channel
, Object)]
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 328 def try_select_matching(matcher, channels) channels.first.try_select_matching(matcher, channels[1..-1]) end
Public Instance Methods
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 39 def <=>(other) 1 end
@return [Integer] Maximum capacity of the Channel
.
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 295 def capacity @Capacity end
@!visibility private
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 346 def partial_select_op(matcher, probe) @Mutex.synchronize { ns_pop_op(matcher, probe, true) } end
@!macro promises.channel.peek
Behaves as {#try_pop} but it does not remove the message from the channel @param [Object] no_value returned when there is no message available @return [Object, no_value] message or nil when there is no message
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 209 def peek(no_value = nil) peek_matching ANY, no_value end
@!macro promises.channel.peek @!macro promises.channel.param.matcher
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 215 def peek_matching(matcher, no_value = nil) @Mutex.synchronize do message = ns_shift_message matcher, false return message if message != NOTHING message = ns_consume_pending_push matcher, false return message != NOTHING ? message : no_value end end
@!macro promises.channel.pop
Blocks current thread until a message is available in the channel for popping. @!macro channel.warn.blocks @!macro channel.param.timeout @param [Object] timeout_value a value returned by the method when it times out @return [Object, nil] message or nil when timed out
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 177 def pop(timeout = nil, timeout_value = nil) pop_matching ANY, timeout, timeout_value end
@!macro promises.channel.pop @!macro promises.channel.param.matcher
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 183 def pop_matching(matcher, timeout = nil, timeout_value = nil) # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer # that it returns even if the buffer is full. User might expect that it has to be in the buffer first. probe = @Mutex.synchronize do message = ns_shift_message matcher if message == NOTHING message = ns_consume_pending_push matcher return message if message != NOTHING else new_message = ns_consume_pending_push ANY @Messages.push new_message unless new_message == NOTHING return message end probe = Promises.resolvable_future @Probes.push probe, false, matcher probe end probe.value!(timeout, timeout_value, [true, timeout_value, nil]) end
@!macro promises.channel.pop_op
Returns a future witch will become fulfilled with a value from the channel when one is available. @!macro chanel.operation_wait @param [ResolvableFuture] probe the future which will be fulfilled with a channel value @return [Future(Object)] the probe, its value will be the message when available.
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 160 def pop_op(probe = Promises.resolvable_future) @Mutex.synchronize { ns_pop_op(ANY, probe, false) } end
@!macro promises.channel.pop_op @!macro promises.channel.param.matcher
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 166 def pop_op_matching(matcher, probe = Promises.resolvable_future) @Mutex.synchronize { ns_pop_op(matcher, probe, false) } end
Blocks current thread until the message is pushed into the channel.
@!macro channel.warn.blocks @param [Object] message @!macro channel.param.timeout @return [self, true, false] self implies timeout was not used, true implies timeout was used
and it was pushed, false implies it was not pushed within timeout.
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 120 def push(message, timeout = nil) pushed_op = @Mutex.synchronize do return timeout ? true : self if ns_try_push(message) pushed = Promises.resolvable_future # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages @PendingPush.push message, pushed pushed end result = pushed_op.wait!(timeout, [true, self, nil]) result == pushed_op ? self : result end
Returns future which will fulfill when the message is pushed to the channel. @!macro chanel.operation_wait
If it is later waited on the operation with a timeout e.g.`channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later ```ruby pop_op = channel.pop_op if pop_op.wait(1) process_message pop_op.value else pop_op.then { |message| log_unprocessed_message message } end ``` or the operation can be prevented from completion after timing out by using `channel.pop_op.wait(1, [true, nil, nil])`. It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.
@param [Object] message @return [ResolvableFuture(self)]
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 101 def push_op(message) @Mutex.synchronize do if ns_try_push(message) Promises.fulfilled_future self else pushed = Promises.resolvable_future @PendingPush.push message, pushed return pushed end end end
@!macro promises.channel.select
As {#select_op} but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels. @!macro channel.warn.blocks @param [Channel, ::Array<Channel>] channels @!macro channel.param.timeout @return [::Array(Channel, Object), nil] message or nil when timed out @see #select_op
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 278 def select(channels, timeout = nil) select_matching ANY, channels, timeout end
@!macro promises.channel.select @!macro promises.channel.param.matcher
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 284 def select_matching(matcher, channels, timeout = nil) probe = select_op_matching(matcher, channels) probe.value!(timeout, nil, [true, nil, nil]) end
@!macro promises.channel.select_op
When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair. The returned channel is the origin of the message. @!macro chanel.operation_wait @param [Channel, ::Array<Channel>] channels @param [ResolvableFuture] probe the future which will be fulfilled with the message @return [ResolvableFuture(::Array(Channel, Object))] a future which is fulfilled with pair [channel, message] when one of the channels is available for reading
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 257 def select_op(channels, probe = Promises.resolvable_future) select_op_matching ANY, channels, probe end
@!macro promises.channel.select_op @!macro promises.channel.param.matcher
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 263 def select_op_matching(matcher, channels, probe = Promises.resolvable_future) [self, *channels].each { |ch| ch.partial_select_op matcher, probe } probe end
@return [Integer] The number of messages currently stored in the channel.
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 290 def size @Mutex.synchronize { @Messages.size } end
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 43 def to_s 'unlimited' end
@!macro promises.channel.try_pop
Pop a message from the channel if there is one available. @param [Object] no_value returned when there is no message available @return [Object, no_value] message or nil when there is no message
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 138 def try_pop(no_value = nil) try_pop_matching ANY, no_value end
@!macro promises.channel.try_pop @!macro promises.channel.param.matcher
@param [#===] matcher only consider message which matches `matcher === a_message`
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 145 def try_pop_matching(matcher, no_value = nil) @Mutex.synchronize do message = ns_shift_message matcher return message if message != NOTHING message = ns_consume_pending_push matcher return message != NOTHING ? message : no_value end end
Push the message into the channel if there is space available. @param [Object] message @return [true, false]
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 77 def try_push(message) @Mutex.synchronize { ns_try_push(message) } end
@!macro promises.channel.try_select
If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message. @param [Channel, ::Array<Channel>] channels @return [::Array(Channel, Object), nil] pair [channel, message] if one of the channels is available for reading
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 232 def try_select(channels) try_select_matching ANY, channels end
@!macro promises.channel.try_select @!macro promises.channel.param.matcher
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 238 def try_select_matching(matcher, channels) message = nil channel = [self, *channels].find do |ch| message = ch.try_pop_matching(matcher, NOTHING) message != NOTHING end channel ? [channel, message] : nil end
Private Instance Methods
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 397 def ns_consume_pending_push(matcher, remove = true) i = 0 while true message, pushed = @PendingPush[i, 2] return NOTHING unless pushed if matcher === message resolved = pushed.resolved? @PendingPush[i, 2] = [] if remove || resolved # can fail if timed-out, so try without error if remove ? pushed.fulfill(self, false) : !resolved # pushed fulfilled so actually push the message return message end end i += 2 end end
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 352 def ns_pop_op(matcher, probe, include_channel) message = ns_shift_message matcher # got message from buffer if message != NOTHING if probe.fulfill(include_channel ? [self, message] : message, false) new_message = ns_consume_pending_push ANY @Messages.push new_message unless new_message == NOTHING else @Messages.unshift message end return probe end # no message in buffer, try to pair with a pending push i = 0 while true message, pushed = @PendingPush[i, 2] break if pushed.nil? if matcher === message value = include_channel ? [self, message] : message if Promises::Resolvable.atomic_resolution(probe => [true, value, nil], pushed => [true, self, nil]) @PendingPush[i, 2] = [] return probe end if probe.resolved? return probe end # so pushed.resolved? has to be true, remove the push @PendingPush[i, 2] = [] end i += 2 end # no push to pair with # TODO (pitr-ch 11-Jan-2019): clear up probes when timed out, use callback @Probes.push probe, include_channel, matcher if probe.pending? return probe end
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 437 def ns_shift_message(matcher, remove = true) i = 0 while true message = @Messages.fetch(i, NOTHING) return NOTHING if message == NOTHING if matcher === message @Messages.delete_at i if remove return message end i += 1 end end
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 417 def ns_try_push(message) i = 0 while true probe, include_channel, matcher = @Probes[i, 3] break unless probe if matcher === message && probe.fulfill(include_channel ? [self, message] : message, false) @Probes[i, 3] = [] return true end i += 3 end if @Capacity > @Messages.size @Messages.push message true else false end end