class Datadog::Workers::AsyncTransport
Asynchronous worker that executes a +Send()+ operation after given seconds. Under the hood, it uses Concurrent::TimerTask
so that the thread will perform a task at regular intervals. The thread can be stopped with the +stop()+ method and can start with the +start()+ method.
Constants
- BACK_OFF_MAX
- BACK_OFF_RATIO
- DEFAULT_BUFFER_MAX_SIZE
- DEFAULT_FLUSH_INTERVAL
- DEFAULT_TIMEOUT
- SHUTDOWN_TIMEOUT
Attributes
trace_buffer[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/ddtrace/workers.rb, line 27 def initialize(options = {}) @transport = options[:transport] # Callbacks @trace_task = options[:on_trace] # Intervals interval = options.fetch(:interval, DEFAULT_FLUSH_INTERVAL) @flush_interval = interval @back_off = interval # Buffers buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE) @trace_buffer = TraceBuffer.new(buffer_size) # Threading @shutdown = ConditionVariable.new @mutex = Mutex.new @worker = nil @run = false end
Public Instance Methods
callback_traces()
click to toggle source
Callback function that process traces and executes the +send_traces()+ method.
# File lib/ddtrace/workers.rb, line 50 def callback_traces return true if @trace_buffer.empty? begin traces = @trace_buffer.pop traces = Pipeline.process!(traces) @trace_task.call(traces, @transport) unless @trace_task.nil? rescue StandardError => e # ensures that the thread will not die because of an exception. # TODO[manu]: findout the reason and reschedule the send if it's not # a fatal exception Datadog.logger.error( "Error during traces flush: dropped #{traces.length} items. Cause: #{e} Location: #{Array(e.backtrace).first}" ) end end
Also aliased as: flush_data
enqueue_trace(trace)
click to toggle source
Enqueue an item in the trace internal buffer. This operation is thread-safe because uses the TraceBuffer
data structure.
# File lib/ddtrace/workers.rb, line 102 def enqueue_trace(trace) @trace_buffer.push(trace) end
join()
click to toggle source
Block until executor shutdown is complete or until timeout seconds have passed.
# File lib/ddtrace/workers.rb, line 96 def join @worker.join(SHUTDOWN_TIMEOUT) end
start()
click to toggle source
Start the timer execution.
# File lib/ddtrace/workers.rb, line 68 def start @mutex.synchronize do return if @run @run = true Datadog.logger.debug { "Starting thread for: #{self}" } @worker = Thread.new { perform } @worker.name = self.class.name unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') nil end end
stop()
click to toggle source
Closes all available queues and waits for the trace buffer to flush
# File lib/ddtrace/workers.rb, line 82 def stop @mutex.synchronize do return unless @run @trace_buffer.close @run = false @shutdown.signal end join true end
Private Instance Methods
perform()
click to toggle source
# File lib/ddtrace/workers.rb, line 110 def perform loop do @back_off = flush_data ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min @mutex.synchronize do return if !@run && @trace_buffer.empty? @shutdown.wait(@mutex, @back_off) if @run # do not wait when shutting down end end end