class Concurrent::Throttle
A tool for managing the concurrency level of tasks. The maximum capacity is set in the constructor. Each acquire lowers the available capacity and each release increases it. When no capacity is available, the current thread is either blocked or given an event which is resolved once capacity becomes available.
The more common usage of the Throttle is with a proxy executor, `a_throttle.on(Concurrent.global_io_executor)`. Anything executed on the proxy executor is throttled and runs on the given executor. There can be more than one proxy executor. All abstractions which execute tasks have an option to specify an executor, so the proxy executor can be injected into any abstraction to throttle its concurrency level.
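The proxy-executor idea can be illustrated with the standard library alone. The sketch below is not the library's implementation: `SketchProxy`, `peak_concurrency`, and the thread-per-task "executor" lambda are hypothetical names made up for this example, and the sketch blocks inside the worker thread, which is a simplification.

```ruby
# Hypothetical stand-in for a throttled proxy executor (standard library only).
class SketchProxy
  def initialize(capacity, executor)
    @slots = Queue.new
    capacity.times { @slots << :slot } # one token per unit of capacity
    @executor = executor
  end

  # Forward the task to the wrapped executor; it runs only while holding a slot.
  def post(&task)
    @executor.call do
      slot = @slots.pop   # blocks while no capacity is available
      begin
        task.call
      ensure
        @slots << slot    # hand the capacity back
      end
    end
  end
end

# Runs `tasks` short jobs through the proxy and reports the highest number
# that were observed running at the same time.
def peak_concurrency(capacity, tasks)
  threads = []
  thread_per_task = ->(&job) { threads << Thread.new(&job) } # assumed executor
  proxy = SketchProxy.new(capacity, thread_per_task)
  running = 0
  peak = 0
  stats = Mutex.new
  tasks.times do
    proxy.post do
      stats.synchronize do
        running += 1
        peak = [peak, running].max
      end
      sleep 0.01
      stats.synchronize { running -= 1 }
    end
  end
  threads.each(&:join)
  peak
end

puts peak_concurrency(2, 6) # never more than 2
```

Because the throttling lives in the proxy, the posted tasks themselves need no knowledge of the limit, which is why a proxy can be handed to any abstraction that accepts an executor.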
Public Class Methods
Creates a throttle.
@param [Integer] capacity How many tasks using this throttle can run at the same time.
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 37
def initialize(capacity)
  super()
  @MaxCapacity = capacity
  @Queue = LockFreeQueue.new
  @executor_cache = [nil, nil]
  self.capacity = capacity
end
Public Instance Methods
Blocks the current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling {#release}. If a block is passed, it is called after the capacity is acquired, and the capacity is released automatically after the block is executed.
@param [Numeric] timeout the maximum time in seconds to wait.
@yield [] block to execute after the capacity is acquired
@return [Object, self, true, false]
* With no timeout and no block, it returns self
* With no timeout and a block, it returns the result of the block
* With a timeout and no block, it returns true when acquired and false when timed out
* With a timeout and a block, it returns the result of the block, or nil on timing out
@see release
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 63
def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    # release immediately when acquired later after the timeout since it is unused
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end

  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end
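The four return-value cases of `acquire` are easier to follow with a small model. The sketch below reproduces only the return conventions documented above, using a `Mutex` and a `ConditionVariable` instead of the library's events; `ReturnSketch` and its private `take` helper are hypothetical names, not part of concurrent-ruby.

```ruby
# Hypothetical model of the documented return conventions (standard library only).
class ReturnSketch
  def initialize(capacity)
    @capacity = capacity
    @mutex = Mutex.new
    @freed = ConditionVariable.new
  end

  def acquire(timeout = nil)
    acquired = take(timeout)
    # No block: self without a timeout, true/false with one.
    return (timeout ? acquired : self) unless block_given?
    # Block given but the wait timed out: the block is not called.
    return nil unless acquired

    begin
      yield
    ensure
      release # capacity is returned automatically after the block
    end
  end

  def release
    @mutex.synchronize do
      @capacity += 1
      @freed.signal
    end
    self
  end

  private

  # Waits for a free slot; returns false if the timeout elapses first.
  def take(timeout)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && now.call + timeout
    @mutex.synchronize do
      while @capacity.zero?
        remaining = deadline && deadline - now.call
        return false if remaining && remaining <= 0
        @freed.wait(@mutex, remaining)
      end
      @capacity -= 1
      true
    end
  end
end

throttle = ReturnSketch.new(1)
throttle.acquire                 # => throttle itself, the only slot is now held
throttle.acquire(0.01)           # => false, timed out
throttle.acquire(0.01) { :ran }  # => nil, timed out so the block never ran
throttle.release
throttle.acquire { :ran }        # => :ran, slot released again afterwards
throttle.acquire(0.01)           # => true
```

Note that only the block forms release capacity on their own; the plain forms leave the caller responsible for calling `release`.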
@!visibility private
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 140
def acquire_or_event
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity - 1
      if current_capacity > 0
        return nil
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end
@return [Integer] The available capacity.
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 30
def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end
Uses the executor provided by {#on}, so all events and futures created using factory methods on this object are throttled. Overrides {Promises::FactoryMethods#default_executor}.
@return [ExecutorService]
@see Promises::FactoryMethods#default_executor
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 183
def default_executor
  on(super)
end
@return [Integer] The maximum capacity.
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 46
def max_capacity
  @MaxCapacity
end
@param [ExecutorService] executor
@return [ExecutorService] An executor which wraps the given executor and allows tasks to be posted only as the available capacity in the throttle allows.
@example throttling future
a_future.then_on(a_throttle.on(:io)) { a_throttled_task }
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 162
def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache

  if current_executor.nil?
    # cache first proxy
    proxy_executor = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    # do not cache more than 1 executor
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end
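One consequence of the listing above is that only the proxy for the first executor is cached; proxies for any other executor are built anew on every call. The sketch below mirrors just that caching rule with hypothetical stand-ins (`ProxySketch`, `CachingSketch`) and plain symbols in place of real executors.

```ruby
# Hypothetical stand-ins; symbols play the role of executors.
ProxySketch = Struct.new(:throttle, :executor)

class CachingSketch
  def initialize
    @executor_cache = [nil, nil]
  end

  def on(executor)
    cached_executor, cached_proxy = @executor_cache
    return cached_proxy if cached_executor == executor && cached_proxy

    proxy = ProxySketch.new(self, executor)
    # Only the very first executor seen gets its proxy cached.
    @executor_cache = [executor, proxy] if cached_executor.nil?
    proxy
  end
end

throttle = CachingSketch.new
throttle.on(:io).equal?(throttle.on(:io))     # => true, the first proxy is reused
throttle.on(:fast).equal?(throttle.on(:fast)) # => false, a new proxy each time
```

Code that throttles a second executor repeatedly may therefore want to keep its own reference to the proxy rather than calling `on` each time.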
Releases previously acquired capacity back to the Throttle. Has to be called exactly once for each acquired capacity.
@return [self]
@see acquire_operation, acquire, try_acquire
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 118
def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        # release called after trigger which pushed a trigger, busy wait is ok
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
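Read together with `acquire_or_event`, the listing above suggests that the capacity counter is allowed to go negative, with its magnitude counting queued waiters, and that `release` resolves one waiter whenever it increments from a negative value. The single-threaded sketch below models that bookkeeping. `CounterSketch` and `EventSketch` are hypothetical, and the compare-and-set is emulated with a `Mutex` rather than a real atomic field.

```ruby
# Hypothetical event with just enough behaviour for the demonstration.
class EventSketch
  def initialize
    @resolved = false
  end

  def resolve
    @resolved = true
  end

  def resolved?
    @resolved
  end
end

class CounterSketch
  attr_reader :capacity

  def initialize(capacity)
    @capacity = capacity
    @queue = Queue.new
    @lock = Mutex.new
  end

  # Returns nil when capacity was taken, or an event to wait on otherwise.
  def acquire_or_event
    loop do
      current = @capacity
      next unless compare_and_set(current, current - 1)
      return nil if current > 0

      event = EventSketch.new
      @queue.push(event)
      return event
    end
  end

  def release
    loop do
      current = @capacity
      next unless compare_and_set(current, current + 1)
      @queue.pop.resolve if current < 0 # hand the slot to one waiter
      return self
    end
  end

  # Negative values mean "waiters queued", so report them as zero.
  def available_capacity
    [@capacity, 0].max
  end

  private

  # Mutex-based emulation of an atomic compare-and-set (an assumption).
  def compare_and_set(expected, value)
    @lock.synchronize do
      return false unless @capacity == expected

      @capacity = value
      true
    end
  end
end

throttle = CounterSketch.new(1)
throttle.acquire_or_event           # => nil, capacity goes 1 -> 0
waiter = throttle.acquire_or_event  # an event, capacity goes 0 -> -1
throttle.capacity                   # => -1
throttle.available_capacity         # => 0
throttle.release                    # capacity goes -1 -> 0 and resolves waiter
waiter.resolved?                    # => true
```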
@return [String] Short string representation.
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 133
def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end
Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling {#release}.
@return [true, false]
@see release
# File lib/concurrent-ruby-edge/concurrent/edge/throttle.rb, line 102
def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end
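Since `try_acquire` never blocks, a typical pattern is to do the work only when a slot is free and to skip or defer it otherwise, pairing every successful call with exactly one `release`. The sketch below shows that pattern against a hypothetical `TrySketch` model; `run_if_free` is likewise an illustrative helper, not a library method.

```ruby
# Hypothetical model: capacity is only taken when it is positive.
class TrySketch
  def initialize(capacity)
    @capacity = capacity
    @lock = Mutex.new
  end

  def try_acquire
    @lock.synchronize do
      return false unless @capacity > 0

      @capacity -= 1
      true
    end
  end

  def release
    @lock.synchronize { @capacity += 1 }
    self
  end
end

# Runs the block only if a slot is free; releases exactly once afterwards.
def run_if_free(throttle)
  return :skipped unless throttle.try_acquire

  begin
    yield
  ensure
    throttle.release
  end
end

throttle = TrySketch.new(1)
# The inner call finds the only slot taken by the outer one and is skipped.
run_if_free(throttle) { run_if_free(throttle) { :inner } } # => :skipped
run_if_free(throttle) { :done }                            # => :done
```

The `ensure` clause keeps the acquire/release pairing intact even when the block raises.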