class Fourtrack::Recorder
A class used as a destination for periodically writing out batch-written JSON formattable payloads. Can be used for stats logs, SQL replay logs and the like. Is thread safe and uses gzip compression. Writes are performed using a binary UNIX append, which with a small-ish record size should guarantee atomic append.
Constants
- NULL_LOGGER
Public Class Methods
new(output_path:, flush_after:, logger: NULL_LOGGER)
click to toggle source
# File lib/fourtrack/recorder.rb, line 8 def initialize(output_path:, flush_after:, logger: NULL_LOGGER) @output_path = File.expand_path(output_path) @pid_at_create = Process.pid @logger = logger @buf = [] @mux = Mutex.new @flush_every = flush_after # Attempt to open the file for writing, # which will raise an exception outright if we do not have access File.open(output_path, 'a') {} # and once we know we were able to open it, install an at_exit block for ourselves install_at_exit_hook! end
Public Instance Methods
<<(payload)
click to toggle source
# File lib/fourtrack/recorder.rb, line 27 def <<(payload) # Get the current PID. mypid = Process.pid len_so_far = @mux.synchronize { # If the current PID doesn't match the one # that was set at instantiation, it means the process was # forked and we now possible also hold records for the parent # process, which we have to discard (it is the responsibility # of the parent to flush it's records, not ours!). if mypid != @pid_at_create @pid_at_create = mypid @buf.clear end @buf << payload @buf.length } flush! if len_so_far > @flush_every self end
flush!()
click to toggle source
# File lib/fourtrack/recorder.rb, line 47 def flush! # Refuse to flush and empty the buffer if flush! is called # within a child and the object still has records pending mypid = Process.pid if mypid != @pid_at_create @logger.debug { "%s: Flush requested child PID %d, will inhibit flush and empty the record log first" % mypid } # Do not flush since we are in the child now @mux.synchronize { @buf.clear } return end io_buf = StringIO.new @mux.synchronize do @logger.debug { "%s: Compressing %d records from PID %d" % [self, @buf.length, Process.pid] } z = Zlib::GzipWriter.new(io_buf) @buf.each {|record| z.puts(record) } z.finish @buf.clear end File.open(output_path, 'ab') { |f| f << io_buf.string } @logger.debug { "%s: After flush to %s size %d" % [self, output_path, File.size(output_path)] } io_buf.truncate(0) end
pending?()
click to toggle source
# File lib/fourtrack/recorder.rb, line 22 def pending? len = @mux.synchronize { @buf.length } len.nonzero? end
Private Instance Methods
install_at_exit_hook!()
click to toggle source
# File lib/fourtrack/recorder.rb, line 80 def install_at_exit_hook! at_exit { flush! if pending? } end
output_path()
click to toggle source
# File lib/fourtrack/recorder.rb, line 76 def output_path @output_path end