class Datadog::Workers::AsyncTraceWriter

Writes traces to transport asynchronously, using a thread & buffer.

Constants

DEFAULT_BUFFER_MAX_SIZE
FORK_POLICY_ASYNC
FORK_POLICY_SYNC

Attributes

async[W]

Public Class Methods

new(options = {}) click to toggle source
Calls superclass method Datadog::Workers::TraceWriter::new
# File lib/ddtrace/workers/trace_writer.rb, line 97
def initialize(options = {})
  # Workers::TraceWriter settings
  super

  # Workers::Polling settings
  self.enabled = options.fetch(:enabled, true)

  # Workers::Async::Thread settings
  @async = true
  self.fork_policy = options.fetch(:fork_policy, FORK_POLICY_ASYNC)

  # Workers::IntervalLoop settings
  self.loop_base_interval = options[:interval] if options.key?(:interval)
  self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio)
  self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max)

  # Workers::Queue settings
  @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  self.buffer = TraceBuffer.new(@buffer_size)
end

Public Instance Methods

after_fork() click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 175
def after_fork
  # In multiprocess environments, forks will share the same buffer until its written to.
  # A.K.A. copy-on-write. We don't want forks to write traces generated from another process.
  # Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.)
  self.buffer = TraceBuffer.new(@buffer_size)

  # Switch to synchronous mode if configured to do so.
  # In some cases synchronous writing is preferred because the fork will be short lived.
  @async = false if @writer_fork_policy == FORK_POLICY_SYNC
end
async?() click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 151
def async?
  @async == true
end
dequeue() click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 140
def dequeue
  # Wrap results in Array because they are
  # splatted as args against TraceWriter#perform.
  [buffer.pop]
end
enqueue(trace) click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 136
def enqueue(trace)
  buffer.push(trace)
end
fork_policy=(policy) click to toggle source
Calls superclass method
# File lib/ddtrace/workers/trace_writer.rb, line 155
def fork_policy=(policy)
  # Translate to Workers::Async::Thread policy
  thread_fork_policy = case policy
                       when Workers::Async::Thread::FORK_POLICY_STOP
                         policy
                       when FORK_POLICY_SYNC
                         # Stop the async thread because the writer
                         # will bypass and run synchronously.
                         Workers::Async::Thread::FORK_POLICY_STOP
                       else
                         Workers::Async::Thread::FORK_POLICY_RESTART
                       end

  # Update thread fork policy
  super(thread_fork_policy)

  # Update local policy
  @writer_fork_policy = policy
end
perform(traces) click to toggle source

NOTE: perform is wrapped by other modules:

Polling --> Async --> IntervalLoop --> AsyncTraceWriter --> TraceWriter

WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#perform is spec'd to return the result from the writer, whereas this method always returns nil.

Calls superclass method Datadog::Workers::TraceWriter#perform
# File lib/ddtrace/workers/trace_writer.rb, line 123
def perform(traces)
  super(traces).tap do |responses|
    loop_back_off! if responses.find(&:server_error?)
  end

  nil
end
stop(*args) click to toggle source
Calls superclass method Datadog::Workers::Polling#stop
# File lib/ddtrace/workers/trace_writer.rb, line 131
def stop(*args)
  buffer.close if running?
  super
end
work_pending?() click to toggle source

Are there more traces to be processed next?

# File lib/ddtrace/workers/trace_writer.rb, line 147
def work_pending?
  !buffer.empty?
end
write(trace) click to toggle source

WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#write is spec'd to return the result from the writer, whereas this method returns something else when running in async mode.

# File lib/ddtrace/workers/trace_writer.rb, line 188
def write(trace)
  # Start worker thread. If the process has forked, it will trigger #after_fork to
  # reconfigure the worker accordingly.
  # NOTE: It's important we do this before queuing or it will drop the current trace,
  #       because #after_fork resets the buffer.
  perform

  # Queue the trace if running asynchronously, otherwise short-circuit and write it directly.
  async? ? enqueue(trace) : write_traces([trace])
end