module Datadog::Workers::Async::Thread
Adds threading behavior to workers to run tasks asynchronously. rubocop:disable Metrics/ModuleLength
Constants
- FORK_POLICY_RESTART
- FORK_POLICY_STOP
- SHUTDOWN_TIMEOUT
Attributes
error[R]
fork_policy[W]
pid[R]
result[RW]
Public Class Methods
included(base)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 15 def self.included(base) base.prepend(PrependedMethods) end
Public Instance Methods
completed?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 68 def completed? !worker.nil? && worker.status == false && !error? end
error?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 62 def error? return false unless instance_variable_defined?(:@error) !@error.nil? end
failed?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 72 def failed? !worker.nil? && worker.status.nil? end
fork_policy()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 80 def fork_policy @fork_policy ||= FORK_POLICY_STOP end
forked?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 76 def forked? !pid.nil? && pid != Process.pid end
join(timeout = nil)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 33 def join(timeout = nil) return true unless running? !worker.join(timeout).nil? end
run_async?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 48 def run_async? return false unless instance_variable_defined?(:@run_async) @run_async == true end
running?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 58 def running? !worker.nil? && worker.alive? end
started?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 54 def started? !(worker.nil? || forked?) end
terminate()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 39 def terminate return false unless running? @run_async = false Datadog.logger.debug { "Forcibly terminating worker thread for: #{self}" } worker.terminate true end
Protected Instance Methods
after_fork()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 93 def after_fork # Do nothing by default end
mutex()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 89 def mutex @mutex ||= Mutex.new end
Private Instance Methods
mutex_after_fork()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 102 def mutex_after_fork @mutex_after_fork ||= Mutex.new end
restart_after_fork(&block)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 161 def restart_after_fork(&block) mutex_after_fork.synchronize do if forked? # Trigger callback to allow workers to reset themselves accordingly after_fork # Start worker start_worker(&block) end end end
start_async(&block)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 110 def start_async(&block) mutex.synchronize do return if running? if forked? case fork_policy when FORK_POLICY_STOP stop_fork when FORK_POLICY_RESTART restart_after_fork(&block) end elsif !run_async? start_worker(&block) end end end
start_worker() { || ... }
click to toggle source
# File lib/ddtrace/workers/async.rb, line 127 def start_worker @run_async = true @pid = Process.pid @error = nil Datadog.logger.debug { "Starting thread for: #{self}" } @worker = ::Thread.new do begin yield # rubocop:disable Lint/RescueException rescue Exception => e @error = e Datadog.logger.debug("Worker thread error. Cause #{e.message} Location: #{Array(e.backtrace).first}") raise end end @worker.name = self.class.name unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') nil end
stop_fork()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 148 def stop_fork mutex_after_fork.synchronize do if forked? # Trigger callback to allow workers to reset themselves accordingly after_fork # Reset and turn off @pid = Process.pid @run_async = false end end end
worker()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 106 def worker @worker ||= nil end