class Datadog::Workers::AsyncTraceWriter
Writes traces to transport asynchronously, using a thread & buffer.
Constants
- DEFAULT_BUFFER_MAX_SIZE
- FORK_POLICY_ASYNC
- FORK_POLICY_SYNC
Attributes
async[W]
Public Class Methods
new(options = {})
click to toggle source
Calls superclass method
Datadog::Workers::TraceWriter::new
# File lib/ddtrace/workers/trace_writer.rb, line 97 def initialize(options = {}) # Workers::TraceWriter settings super # Workers::Polling settings self.enabled = options.fetch(:enabled, true) # Workers::Async::Thread settings @async = true self.fork_policy = options.fetch(:fork_policy, FORK_POLICY_ASYNC) # Workers::IntervalLoop settings self.loop_base_interval = options[:interval] if options.key?(:interval) self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio) self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max) # Workers::Queue settings @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE) self.buffer = TraceBuffer.new(@buffer_size) end
Public Instance Methods
after_fork()
click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 175 def after_fork # In multiprocess environments, forks will share the same buffer until its written to. # A.K.A. copy-on-write. We don't want forks to write traces generated from another process. # Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.) self.buffer = TraceBuffer.new(@buffer_size) # Switch to synchronous mode if configured to do so. # In some cases synchronous writing is preferred because the fork will be short lived. @async = false if @writer_fork_policy == FORK_POLICY_SYNC end
async?()
click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 151 def async? @async == true end
dequeue()
click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 140 def dequeue # Wrap results in Array because they are # splatted as args against TraceWriter#perform. [buffer.pop] end
enqueue(trace)
click to toggle source
# File lib/ddtrace/workers/trace_writer.rb, line 136 def enqueue(trace) buffer.push(trace) end
fork_policy=(policy)
click to toggle source
Calls superclass method
# File lib/ddtrace/workers/trace_writer.rb, line 155 def fork_policy=(policy) # Translate to Workers::Async::Thread policy thread_fork_policy = case policy when Workers::Async::Thread::FORK_POLICY_STOP policy when FORK_POLICY_SYNC # Stop the async thread because the writer # will bypass and run synchronously. Workers::Async::Thread::FORK_POLICY_STOP else Workers::Async::Thread::FORK_POLICY_RESTART end # Update thread fork policy super(thread_fork_policy) # Update local policy @writer_fork_policy = policy end
perform(traces)
click to toggle source
NOTE: perform
is wrapped by other modules:
Polling --> Async --> IntervalLoop --> AsyncTraceWriter --> TraceWriter
WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#perform
is spec'd to return the result from the writer, whereas this method always returns nil.
Calls superclass method
Datadog::Workers::TraceWriter#perform
# File lib/ddtrace/workers/trace_writer.rb, line 123 def perform(traces) super(traces).tap do |responses| loop_back_off! if responses.find(&:server_error?) end nil end
stop(*args)
click to toggle source
Calls superclass method
Datadog::Workers::Polling#stop
# File lib/ddtrace/workers/trace_writer.rb, line 131 def stop(*args) buffer.close if running? super end
work_pending?()
click to toggle source
Are there more traces to be processed next?
# File lib/ddtrace/workers/trace_writer.rb, line 147 def work_pending? !buffer.empty? end
write(trace)
click to toggle source
WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#write
is spec'd to return the result from the writer, whereas this method returns something else when running in async mode.
# File lib/ddtrace/workers/trace_writer.rb, line 188 def write(trace) # Start worker thread. If the process has forked, it will trigger #after_fork to # reconfigure the worker accordingly. # NOTE: It's important we do this before queuing or it will drop the current trace, # because #after_fork resets the buffer. perform # Queue the trace if running asynchronously, otherwise short-circuit and write it directly. async? ? enqueue(trace) : write_traces([trace]) end