class Datadog::Writer

Processor that sends traces and metadata to the agent

Constants

DEPRECATION_WARN_ONLY_ONCE

Attributes

priority_sampler[R]
transport[R]
worker[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/ddtrace/writer.rb, line 25
def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
  @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)
  transport_options = options.fetch(:transport_options, {})

  transport_options[:agent_settings] = options[:agent_settings] if options.key?(:agent_settings)

  # priority sampling
  if options[:priority_sampler]
    @priority_sampler = options[:priority_sampler]
    transport_options[:api_version] ||= Transport::HTTP::API::V4
  end

  # transport and buffers
  @transport = options.fetch(:transport) do
    Transport::HTTP.default(**transport_options)
  end

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0

  # one worker for traces
  @worker = nil

  # Once stopped, this writer instance cannot be restarted.
  # This allow for graceful shutdown, while preventing
  # the host application from inadvertently start new
  # threads during shutdown.
  @stopped = false
end

Public Instance Methods

send_spans(traces, transport) click to toggle source

flush spans to the trace-agent, handles spans only

# File lib/ddtrace/writer.rb, line 111
def send_spans(traces, transport)
  return true if traces.empty?

  # Inject hostname if configured to do so
  inject_hostname!(traces) if Datadog.configuration.report_hostname

  # Send traces and get responses
  responses = transport.send_traces(traces)

  # Tally up successful flushes
  responses.reject { |x| x.internal_error? || x.server_error? }.each do |response|
    @traces_flushed += response.trace_count
  end

  # Update priority sampler
  update_priority_sampler(responses.last)

  record_environment_information!(responses)

  # Return if server error occurred.
  !responses.find(&:server_error?)
end
start() click to toggle source
# File lib/ddtrace/writer.rb, line 60
def start
  @mutex_after_fork.synchronize do
    return false if @stopped

    pid = Process.pid
    return if @worker && pid == @pid

    @pid = pid

    start_worker
    true
  end
end
stats() click to toggle source

stats returns a dictionary of stats about the writer.

# File lib/ddtrace/writer.rb, line 172
def stats
  {
    traces_flushed: @traces_flushed,
    transport: @transport.stats
  }
end
stop() click to toggle source

Gracefully shuts down this writer.

Once stopped methods calls won't fail, but no internal work will be performed.

It is not possible to restart a stopped writer instance.

# File lib/ddtrace/writer.rb, line 93
def stop
  @mutex_after_fork.synchronize { stop_worker }
end
write(trace, services = nil) click to toggle source

enqueue the trace for submission to the API

# File lib/ddtrace/writer.rb, line 135
def write(trace, services = nil)
  unless services.nil?
    DEPRECATION_WARN_ONLY_ONCE.run do
      Datadog.logger.warn(%(
        write: Writing services has been deprecated and no longer need to be provided.
        write(traces, services) can be updated to write(traces)
      ))
    end
  end

  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  start if @worker.nil? || @pid != Process.pid

  # TODO: Remove this, and have the tracer pump traces directly to runtime metrics
  #       instead of working through the trace writer.
  # Associate root span with runtime metrics
  if Datadog.configuration.runtime_metrics.enabled && !trace.empty?
    Datadog.runtime_metrics.associate_with_span(trace.first)
  end

  worker_local = @worker

  if worker_local
    worker_local.enqueue_trace(trace)
  elsif !@stopped
    Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete')
  end
end

Private Instance Methods

inject_hostname!(traces) click to toggle source
# File lib/ddtrace/writer.rb, line 181
def inject_hostname!(traces)
  traces.each do |trace|
    next if trace.first.nil?

    hostname = Datadog::Core::Environment::Socket.hostname
    trace.first.set_tag(Ext::NET::TAG_HOSTNAME, hostname) unless hostname.nil? || hostname.empty?
  end
end
record_environment_information!(responses) click to toggle source
# File lib/ddtrace/writer.rb, line 196
def record_environment_information!(responses)
  Diagnostics::EnvironmentLogger.log!(responses)
end
start_worker() click to toggle source

spawns a worker for spans; they share the same transport which is thread-safe

# File lib/ddtrace/writer.rb, line 75
def start_worker
  @trace_handler = ->(items, transport) { send_spans(items, transport) }
  @worker = Datadog::Workers::AsyncTransport.new(
    transport: @transport,
    buffer_size: @buff_size,
    on_trace: @trace_handler,
    interval: @flush_interval
  )

  @worker.start
end
stop_worker() click to toggle source
# File lib/ddtrace/writer.rb, line 97
def stop_worker
  @stopped = true

  return if @worker.nil?

  @worker.stop
  @worker = nil

  true
end
update_priority_sampler(response) click to toggle source
# File lib/ddtrace/writer.rb, line 190
def update_priority_sampler(response)
  return unless response && !response.internal_error? && priority_sampler && response.service_rates

  priority_sampler.update(response.service_rates)
end