class BackPressure::GatedExecutor
A {GatedExecutor} is an implementation of {BackPressure::Executor} that allows external control of back-pressure state, and is useful when non-blocking APIs provide hooks for identifying when they should block.
@author Ry Biesemeyer <identity@yaauie.com> @since 1.0.0
@example Using a GatedExecutor
with a non-blocking API
gated_executor = BackPressure::GatedExecutor.new non_blocking_api_client.on_connection_blocked { gated_executor.engage_back_pressure } non_blocking_api_client.on_connection_unblocked { gated_executor.remove_back_pressure } 16.times do Thread.new do loop do message = queue.pop gated_executor.execute { non_blocking_api_client.push(message) } end end end
Constants
- DEFAULT_REASON
Public Class Methods
@param logger [Logger]: logger on which to emit (optional) @param description [String]: description for logs (optional) @param log_threshold [Number]: silences blockage warnings for durations
less than the provided value (default `0`).
@yield [gated_executor] if a block is provided, the newly-created
instance is yielded to the given block after being initialised.
@yieldparam [self] @yieldreturn [void]
# File lib/back_pressure/gated_executor.rb, line 59 def initialize(logger: nil, description: nil, log_threshold: 1) @logger = logger @desc = (description ? description.dup : "#{self.class.name}<#{__id__}>").freeze @log_threshold = log_threshold @control_mutex = Mutex.new @control_condv = ConditionVariable.new @blocked_threads = Set.new @blocked_threads_mutex = Mutex.new yield(self) if block_given? end
Public Instance Methods
Helper method for determining if back-pressure is currently engaged.
@return [Boolean]
# File lib/back_pressure/gated_executor.rb, line 119 def back_pressure_engaged? @control_mutex.synchronize { @back_pressure_engaged } end
(see Executor#blocked?
)
# File lib/back_pressure/gated_executor.rb, line 162 def blocked? blocked_threads.any? end
(see Executor#blocked_threads
)
# File lib/back_pressure/gated_executor.rb, line 156 def blocked_threads @blocked_threads_mutex.synchronize { @blocked_threads.dup.freeze } end
Engages back-pressure and immediately returns; threads that send this instance `GatedExecutor#execute` will be blocked until back-pressure is removed.
@param reason [String]: the reason back-pressure is being applied, to be
included in the log message (optional).
@return [void]
# File lib/back_pressure/gated_executor.rb, line 84 def engage_back_pressure(reason=DEFAULT_REASON) @control_mutex.synchronize do if !@back_pressure_engaged @back_pressure_engaged = true @logger && @logger.info("#{@desc} back-pressure engaged (#{reason})") else @logger && @logger.debug("#{@desc} attempted to engage back-pressure when it is already engaged (#{reason})") end end end
(see Executor#execute
)
@note Care must be taken to ensure that back-pressure control is executed
outside of this block, as the block provided is not executed while back-pressure is engaged.
# File lib/back_pressure/gated_executor.rb, line 129 def execute(blocking_time_limit=nil) fail(ArgumentError, 'block required!') unless block_given? if !@back_pressure_engaged || block_until_back_pressure_removed(blocking_time_limit) yield return true else return false end end
(see Executor#execute!
)
@note Care must be taken to ensure that back-pressure control is executed
outside of this block, as the block provided is not executed while back-pressure is engaged.
# File lib/back_pressure/gated_executor.rb, line 146 def execute!(blocking_time_limit=nil) execute(blocking_time_limit) do return yield end fail(ExecutionExpired) end
Removes back-pressure, waking any threads that are currently blocked by back-pressure, and immediately returns.
@note No guarantee of ordering are made with regard to threads that
are blocked at the instant back-pressure is removed.
@return [void]
# File lib/back_pressure/gated_executor.rb, line 103 def remove_back_pressure(reason=DEFAULT_REASON) @control_mutex.synchronize do if @back_pressure_engaged @back_pressure_engaged = false @logger && @logger.info("#{@desc} back-pressure removed (#{reason})") @control_condv.broadcast # wakeup _all_ waiting threads else @logger && @logger.debug("#{@desc} attempted to remove back-pressure when it not engaged (#{reason})") end end end
Private Instance Methods
Blocks while back-pressure is engaged, immediately unblocking all threads as soon as the back-pressure is removed.
@api private
@param blocking_time_limit [Number]: the maximum time to wait, in seconds,
when back-pressure is being applied, before aborting (optional).
@return [Boolean] returns `true` as soon as back-pressure is released,
or `false` if a provided `blocking_time_limit` was reached before back-ressure could be released.
# File lib/back_pressure/gated_executor.rb, line 181 def block_until_back_pressure_removed(blocking_time_limit=nil) @blocked_threads_mutex.synchronize { @blocked_threads.add(Thread.current) } timeout = [0.5, blocking_time_limit].compact.min start = Time.now thread_id = Thread.current.to_s loop do should_block = @control_mutex.synchronize do @control_condv.wait(@control_mutex, timeout) @back_pressure_engaged end break unless should_block block_duration = Time.now - start if @logger && block_duration > @log_threshold @logger.warn("#{@desc} has been blocked for #{block_duration.round(2)}s... (#{thread_id})") end if blocking_time_limit && block_duration > blocking_time_limit @logger && @logger.warn("#{@desc} blocking back-pressure exceeded limit of #{blocking_time_limit}s (#{thread_id})") return false end timeout = [30, (timeout * 2), blocking_time_limit && (blocking_time_limit-block_duration)].compact.min end return true ensure @blocked_threads_mutex.synchronize { @blocked_threads.delete(Thread.current) } end