module Datadog::Workers::Async::Thread

Adds threading behavior to workers so they can run tasks asynchronously.

Constants

FORK_POLICY_RESTART
FORK_POLICY_STOP
SHUTDOWN_TIMEOUT

Attributes

error[R]
fork_policy[W]
pid[R]
result[RW]

Public Class Methods

included(base) click to toggle source
# File lib/ddtrace/workers/async.rb, line 15
# Hook run when this module is included into +base+: prepends
# PrependedMethods onto the including class or module.
def self.included(base)
  base.send(:prepend, PrependedMethods)
end

Public Instance Methods

completed?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 68
# Reports whether the worker thread has finished without a failure.
#
# True only when a worker thread exists, its Thread#status is +false+,
# and no error has been recorded (see #error?).
def completed?
  thread = worker
  return false if thread.nil?
  return false unless thread.status == false

  !error?
end
error?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 62
# Reports whether an error has been recorded in @error.
#
# False when @error was never assigned or is nil; true for any other value.
def error?
  instance_variable_defined?(:@error) && !@error.nil?
end
failed?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 72
# Reports whether the worker thread has failed.
#
# True when a worker thread exists and its Thread#status is nil.
def failed?
  thread = worker
  return false if thread.nil?

  thread.status.nil?
end
fork_policy() click to toggle source
# File lib/ddtrace/workers/async.rb, line 80
# Returns the configured fork policy, defaulting to (and storing)
# FORK_POLICY_STOP when none has been set.
def fork_policy
  return @fork_policy if defined?(@fork_policy) && @fork_policy

  @fork_policy = FORK_POLICY_STOP
end
forked?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 76
# Reports whether the current process differs from the one recorded in #pid.
#
# False when no pid has been recorded yet.
def forked?
  recorded_pid = pid
  return false if recorded_pid.nil?

  recorded_pid != Process.pid
end
join(timeout = nil) click to toggle source
# File lib/ddtrace/workers/async.rb, line 33
# Waits for the worker thread to finish.
#
# timeout - limit passed straight to Thread#join (nil means no limit).
#
# Returns true when the worker is not running or finished within the
# timeout, false when Thread#join returned nil (timed out).
def join(timeout = nil)
  if running?
    joined = worker.join(timeout)
    !joined.nil?
  else
    true
  end
end
run_async?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 48
# Reports whether the async flag is set.
#
# True only when @run_async has been assigned and equals +true+.
def run_async?
  instance_variable_defined?(:@run_async) ? @run_async == true : false
end
running?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 58
# Reports whether a worker thread exists and is alive.
def running?
  thread = worker
  return false if thread.nil?

  thread.alive?
end
started?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 54
# Reports whether a worker exists and this process has not forked
# since it was recorded (see #forked?).
def started?
  !worker.nil? && !forked?
end
terminate() click to toggle source
# File lib/ddtrace/workers/async.rb, line 39
# Forcibly stops the worker thread.
#
# When the worker is running, clears the async flag, logs at debug level,
# and calls Thread#terminate on the worker.
#
# Returns true if a running worker was terminated, false otherwise.
def terminate
  if running?
    @run_async = false
    Datadog.logger.debug { "Forcibly terminating worker thread for: #{self}" }
    worker.terminate
    true
  else
    false
  end
end

Protected Instance Methods

after_fork() click to toggle source
# File lib/ddtrace/workers/async.rb, line 93
# Callback invoked by #stop_fork and #restart_after_fork once a fork has
# been detected. Intentionally a no-op here; returns nil.
def after_fork
  nil
end
mutex() click to toggle source
# File lib/ddtrace/workers/async.rb, line 89
# Returns the Mutex used by #start_async, creating it on first use.
def mutex
  return @mutex if defined?(@mutex) && @mutex

  @mutex = Mutex.new
end

Private Instance Methods

mutex_after_fork() click to toggle source
# File lib/ddtrace/workers/async.rb, line 102
# Returns the Mutex guarding the post-fork handling in #stop_fork and
# #restart_after_fork, creating it on first use.
def mutex_after_fork
  return @mutex_after_fork if defined?(@mutex_after_fork) && @mutex_after_fork

  @mutex_after_fork = Mutex.new
end
restart_after_fork(&block) click to toggle source
# File lib/ddtrace/workers/async.rb, line 161
# Restarts the worker in a forked process.
#
# While holding #mutex_after_fork: if the process is still considered forked,
# runs the #after_fork callback and then starts a new worker with the block.
# Does nothing when #forked? is false.
def restart_after_fork(&block)
  mutex_after_fork.synchronize do
    next unless forked?

    # Give the worker a chance to reset its own state first
    after_fork

    # Then spin the worker back up
    start_worker(&block)
  end
end
start_async(&block) click to toggle source
# File lib/ddtrace/workers/async.rb, line 110
# Starts the worker asynchronously if it is not already running.
#
# While holding #mutex:
# - does nothing when the worker thread is running;
# - in a forked process, applies the fork policy: FORK_POLICY_STOP calls
#   #stop_fork, FORK_POLICY_RESTART calls #restart_after_fork with the block;
# - otherwise starts the worker with the block unless #run_async? is
#   already true.
def start_async(&block)
  mutex.synchronize do
    if running?
      nil
    elsif forked?
      case fork_policy
      when FORK_POLICY_STOP then stop_fork
      when FORK_POLICY_RESTART then restart_after_fork(&block)
      end
    else
      start_worker(&block) unless run_async?
    end
  end
end
start_worker() { || ... } click to toggle source
# File lib/ddtrace/workers/async.rb, line 127
# Spawns the worker thread and runs the given block inside it.
#
# Before starting the thread this sets the async flag, records the current
# process id in @pid, and clears any previously recorded error. An exception
# raised by the block is stored in @error, logged at debug level, and then
# re-raised inside the thread.
#
# Yields with no arguments on the new thread.
# Returns nil.
def start_worker
  @run_async = true
  @pid = Process.pid
  @error = nil
  Datadog.logger.debug { "Starting thread for: #{self}" }

  @worker = ::Thread.new do
    begin
      yield
    # Exception (not only StandardError) is rescued deliberately: the error
    # is just recorded and logged here, then re-raised unchanged.
    # rubocop:disable Lint/RescueException
    rescue Exception => e
      # rubocop:enable Lint/RescueException
      @error = e
      # Block form, consistent with the other debug calls in this module, so
      # the logger can skip building the message when debug output is off
      # (assumes a ::Logger-compatible logger -- confirm).
      Datadog.logger.debug do
        "Worker thread error. Cause #{e.message} Location: #{Array(e.backtrace).first}"
      end
      raise
    end
  end
  # Thread naming is skipped on Rubies older than 2.3
  @worker.name = self.class.name unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')

  nil
end
stop_fork() click to toggle source
# File lib/ddtrace/workers/async.rb, line 148
# Turns the worker off in a forked process.
#
# While holding #mutex_after_fork: if the process is still considered forked,
# runs the #after_fork callback, records the current process id in @pid, and
# clears the async flag. Does nothing when #forked? is false.
def stop_fork
  mutex_after_fork.synchronize do
    next unless forked?

    # Give the worker a chance to reset its own state first
    after_fork

    # Adopt the current process and switch async mode off
    @pid = Process.pid
    @run_async = false
  end
end
worker() click to toggle source
# File lib/ddtrace/workers/async.rb, line 106
# Returns the worker thread stored in @worker, or nil when none has been
# started. Initializes @worker to nil if it holds no truthy value.
def worker
  @worker = nil unless defined?(@worker) && @worker
  @worker
end