class Sqreen::Kit::Signals::BatchCollector::ProcessingLoop
Public Class Methods
new(collector)
click to toggle source
@param [BatchCollector] collector
# File lib/sqreen/kit/signals/batch_collector.rb, line 66
# Sets up the loop state for the given collector: no signals accumulated
# yet and no pending deadline.
#
# @param collector [BatchCollector] collector this loop reads its queue
#   and settings from
def initialize(collector)
  @deadline = nil
  @next_batch = []
  @collector = collector
end
Public Instance Methods
flush_size()
click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 80
# Delegates to the collector's +flush_size+. Within this class the value
# is compared against the accumulated batch size to decide whether to
# submit.
#
# @return whatever +@collector.flush_size+ returns
def flush_size
  @collector.flush_size
end
max_batch_size()
click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 76
# Delegates to the collector's +max_batch_size+. Within this class the
# value bounds how many elements are drained from the queue into one
# batch.
#
# @return whatever +@collector.max_batch_size+ returns
def max_batch_size
  @collector.max_batch_size
end
queue()
click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 72
# Delegates to the collector's +queue+, the object this loop pops
# elements from.
#
# @return whatever +@collector.queue+ returns
def queue
  @collector.queue
end
run()
click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 84
# Drives the processing loop: keeps calling +run_loop_once+ until it
# returns a falsy value, then logs that the collector thread is exiting.
#
# @return whatever +logger.info+ returns
def run
  keep_running = true
  keep_running = run_loop_once while keep_running
  logger.info 'Collector thread exiting'
end
Private Instance Methods
run_loop_once()
click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 91
# Runs a single iteration of the processing loop.
#
# Pops one element from the queue, passing the current deadline (nil when
# no batch is in progress). Three outcomes:
#
# * +nil+: treated as "deadline passed"; the pending batch is submitted.
#   NOTE(review): this relies on +queue.pop(deadline)+ returning nil on
#   timeout, as the original comment implies. Confirm against the queue
#   implementation.
# * +EXIT_SENTINEL+: the pending batch, if any, is submitted so that
#   elements accumulated below +flush_size+ are not silently dropped on
#   shutdown, and the loop is told to stop.
# * anything else (a signal or a trace): appended to the batch, after
#   which the queue is drained without blocking until it is empty, the
#   batch reaches +max_batch_size+, or the exit sentinel shows up. The
#   batch is submitted once it holds at least +flush_size+ elements.
#
# @return [Boolean] false when the exit sentinel was received, true
#   otherwise
def run_loop_once
  el = queue.pop(@deadline)

  if el.nil?
    # deadline passed
    submit
    return true
  end

  if el.equal?(EXIT_SENTINEL)
    # flush whatever is still pending instead of losing it on exit
    submit unless @next_batch.empty?
    return false
  end

  # a signal or a trace
  # first object of a batch: set a deadline
  @deadline = Time.now.to_f + @collector.max_delay_s if @next_batch.empty?
  @next_batch << el

  # drain the queue completely
  until @next_batch.size >= max_batch_size || (el = queue.pop_nb).nil?
    if el.equal?(EXIT_SENTINEL)
      queue << EXIT_SENTINEL # push it back so the next iteration exits
      break
    end
    @next_batch << el
  end

  submit if @next_batch.size >= flush_size
  true
end
submit()
click to toggle source
# File lib/sqreen/kit/signals/batch_collector.rb, line 119
# Hands the accumulated batch to the collector's client and starts a new
# one. The deadline is cleared in every case. When nothing has been
# accumulated, no report is made.
#
# The batch is swapped out for a fresh array before reporting, so
# +@next_batch+ is already empty by the time +report_batch+ runs.
#
# @return [nil, Object] nil when the batch was empty, otherwise whatever
#   +report_batch+ returns
def submit
  logger.debug { "Batch submit. Batch size: #{@next_batch.size}" }
  @deadline = nil

  unless @next_batch.empty?
    outgoing, @next_batch = @next_batch, []
    @collector.auth_sig_client.report_batch(outgoing)
  end
end