class Datadog::Writer
Processor that sends traces and metadata to the agent
Constants
- DEPRECATION_WARN_ONLY_ONCE
Attributes
priority_sampler[R]
transport[R]
worker[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/ddtrace/writer.rb, line 25 def initialize(options = {}) # writer and transport parameters @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE) @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL) transport_options = options.fetch(:transport_options, {}) transport_options[:agent_settings] = options[:agent_settings] if options.key?(:agent_settings) # priority sampling if options[:priority_sampler] @priority_sampler = options[:priority_sampler] transport_options[:api_version] ||= Transport::HTTP::API::V4 end # transport and buffers @transport = options.fetch(:transport) do Transport::HTTP.default(**transport_options) end # handles the thread creation after an eventual fork @mutex_after_fork = Mutex.new @pid = nil @traces_flushed = 0 # one worker for traces @worker = nil # Once stopped, this writer instance cannot be restarted. # This allow for graceful shutdown, while preventing # the host application from inadvertently start new # threads during shutdown. @stopped = false end
Public Instance Methods
send_spans(traces, transport)
click to toggle source
flush spans to the trace-agent, handles spans only
# File lib/ddtrace/writer.rb, line 111 def send_spans(traces, transport) return true if traces.empty? # Inject hostname if configured to do so inject_hostname!(traces) if Datadog.configuration.report_hostname # Send traces and get responses responses = transport.send_traces(traces) # Tally up successful flushes responses.reject { |x| x.internal_error? || x.server_error? }.each do |response| @traces_flushed += response.trace_count end # Update priority sampler update_priority_sampler(responses.last) record_environment_information!(responses) # Return if server error occurred. !responses.find(&:server_error?) end
start()
click to toggle source
# File lib/ddtrace/writer.rb, line 60 def start @mutex_after_fork.synchronize do return false if @stopped pid = Process.pid return if @worker && pid == @pid @pid = pid start_worker true end end
stats()
click to toggle source
stats returns a dictionary of stats about the writer.
# File lib/ddtrace/writer.rb, line 172 def stats { traces_flushed: @traces_flushed, transport: @transport.stats } end
stop()
click to toggle source
Gracefully shuts down this writer.
Once stopped methods calls won't fail, but no internal work will be performed.
It is not possible to restart a stopped writer instance.
# File lib/ddtrace/writer.rb, line 93 def stop @mutex_after_fork.synchronize { stop_worker } end
write(trace, services = nil)
click to toggle source
enqueue the trace for submission to the API
# File lib/ddtrace/writer.rb, line 135 def write(trace, services = nil) unless services.nil? DEPRECATION_WARN_ONLY_ONCE.run do Datadog.logger.warn(%( write: Writing services has been deprecated and no longer need to be provided. write(traces, services) can be updated to write(traces) )) end end # In multiprocess environments, the main process initializes the +Writer+ instance and if # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new # processes will share the same +Writer+ until the first write (COW). Because of that, # each process owns a different copy of the +@buffer+ after each write and so the # +AsyncTransport+ will not send data to the trace agent. # # This check ensures that if a process doesn't own the current +Writer+, async workers # will be initialized again (but only once for each process). start if @worker.nil? || @pid != Process.pid # TODO: Remove this, and have the tracer pump traces directly to runtime metrics # instead of working through the trace writer. # Associate root span with runtime metrics if Datadog.configuration.runtime_metrics.enabled && !trace.empty? Datadog.runtime_metrics.associate_with_span(trace.first) end worker_local = @worker if worker_local worker_local.enqueue_trace(trace) elsif !@stopped Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete') end end
Private Instance Methods
inject_hostname!(traces)
click to toggle source
# File lib/ddtrace/writer.rb, line 181 def inject_hostname!(traces) traces.each do |trace| next if trace.first.nil? hostname = Datadog::Core::Environment::Socket.hostname trace.first.set_tag(Ext::NET::TAG_HOSTNAME, hostname) unless hostname.nil? || hostname.empty? end end
record_environment_information!(responses)
click to toggle source
# File lib/ddtrace/writer.rb, line 196 def record_environment_information!(responses) Diagnostics::EnvironmentLogger.log!(responses) end
start_worker()
click to toggle source
spawns a worker for spans; they share the same transport which is thread-safe
# File lib/ddtrace/writer.rb, line 75 def start_worker @trace_handler = ->(items, transport) { send_spans(items, transport) } @worker = Datadog::Workers::AsyncTransport.new( transport: @transport, buffer_size: @buff_size, on_trace: @trace_handler, interval: @flush_interval ) @worker.start end
stop_worker()
click to toggle source
# File lib/ddtrace/writer.rb, line 97 def stop_worker @stopped = true return if @worker.nil? @worker.stop @worker = nil true end
update_priority_sampler(response)
click to toggle source
# File lib/ddtrace/writer.rb, line 190 def update_priority_sampler(response) return unless response && !response.internal_error? && priority_sampler && response.service_rates priority_sampler.update(response.service_rates) end