class Datadog::Workers::AsyncTransport

Asynchronous worker that executes a +Send()+ operation after given seconds. Under the hood, it uses Concurrent::TimerTask so that the thread will perform a task at regular intervals. The thread can be stopped with the +stop()+ method and can start with the +start()+ method.

Constants

BACK_OFF_MAX
BACK_OFF_RATIO
DEFAULT_BUFFER_MAX_SIZE
DEFAULT_FLUSH_INTERVAL
DEFAULT_TIMEOUT
SHUTDOWN_TIMEOUT

Attributes

trace_buffer[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/ddtrace/workers.rb, line 27
def initialize(options = {})
  @transport = options[:transport]

  # Callbacks
  @trace_task = options[:on_trace]

  # Intervals
  interval = options.fetch(:interval, DEFAULT_FLUSH_INTERVAL)
  @flush_interval = interval
  @back_off = interval

  # Buffers
  buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  @trace_buffer = TraceBuffer.new(buffer_size)

  # Threading
  @shutdown = ConditionVariable.new
  @mutex = Mutex.new
  @worker = nil
  @run = false
end

Public Instance Methods

callback_traces() click to toggle source

Callback function that process traces and executes the +send_traces()+ method.

# File lib/ddtrace/workers.rb, line 50
def callback_traces
  return true if @trace_buffer.empty?

  begin
    traces = @trace_buffer.pop
    traces = Pipeline.process!(traces)
    @trace_task.call(traces, @transport) unless @trace_task.nil?
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog.logger.error(
      "Error during traces flush: dropped #{traces.length} items. Cause: #{e} Location: #{Array(e.backtrace).first}"
    )
  end
end
Also aliased as: flush_data
enqueue_trace(trace) click to toggle source

Enqueue an item in the trace internal buffer. This operation is thread-safe because uses the TraceBuffer data structure.

# File lib/ddtrace/workers.rb, line 102
def enqueue_trace(trace)
  @trace_buffer.push(trace)
end
join() click to toggle source

Block until executor shutdown is complete or until timeout seconds have passed.

# File lib/ddtrace/workers.rb, line 96
def join
  @worker.join(SHUTDOWN_TIMEOUT)
end
start() click to toggle source

Start the timer execution.

# File lib/ddtrace/workers.rb, line 68
def start
  @mutex.synchronize do
    return if @run

    @run = true
    Datadog.logger.debug { "Starting thread for: #{self}" }
    @worker = Thread.new { perform }
    @worker.name = self.class.name unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')

    nil
  end
end
stop() click to toggle source

Closes all available queues and waits for the trace buffer to flush

# File lib/ddtrace/workers.rb, line 82
def stop
  @mutex.synchronize do
    return unless @run

    @trace_buffer.close
    @run = false
    @shutdown.signal
  end

  join
  true
end

Private Instance Methods

flush_data()
Alias for: callback_traces
perform() click to toggle source
# File lib/ddtrace/workers.rb, line 110
def perform
  loop do
    @back_off = flush_data ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min

    @mutex.synchronize do
      return if !@run && @trace_buffer.empty?

      @shutdown.wait(@mutex, @back_off) if @run # do not wait when shutting down
    end
  end
end