class Fourtrack::Recorder

A class used as a destination for periodically writing out batch-written JSON formattable payloads. Can be used for stats logs, SQL replay logs and the like. Is thread safe and uses gzip compression. Writes are performed using a binary UNIX append, which with a small-ish record size should guarantee atomic append.

Constants

NULL_LOGGER

Public Class Methods

new(output_path:, flush_after:, logger: NULL_LOGGER) click to toggle source
# File lib/fourtrack/recorder.rb, line 8
def initialize(output_path:, flush_after:, logger: NULL_LOGGER)
  @output_path = File.expand_path(output_path)
  @pid_at_create = Process.pid
  @logger = logger
  @buf = []
  @mux = Mutex.new
  @flush_every = flush_after
  # Attempt to open the file for writing,
  # which will raise an exception outright if we do not have access
  File.open(output_path, 'a') {}
  # and once we know we were able to open it, install an at_exit block for ourselves
  install_at_exit_hook!
end

Public Instance Methods

<<(payload) click to toggle source
# File lib/fourtrack/recorder.rb, line 27
def <<(payload)
  # Get the current PID.
  mypid = Process.pid
  len_so_far  = @mux.synchronize {
    # If the current PID doesn't match the one
    # that was set at instantiation, it means the process was
    # forked and we now possible also hold records for the parent
    # process, which we have to discard (it is the responsibility
    # of the parent to flush it's records, not ours!).
    if mypid != @pid_at_create
      @pid_at_create = mypid
      @buf.clear
    end
    @buf << payload
    @buf.length
  }
  flush! if len_so_far > @flush_every
  self
end
flush!() click to toggle source
# File lib/fourtrack/recorder.rb, line 47
def flush!
  # Refuse to flush and empty the buffer if flush! is called
  # within a child and the object still has records pending
  mypid = Process.pid
  if mypid != @pid_at_create
    @logger.debug { "%s: Flush requested child PID %d, will inhibit flush and empty the record log first" % mypid }
    # Do not flush since we are in the child now
    @mux.synchronize { @buf.clear }
    return
  end

  io_buf = StringIO.new

  @mux.synchronize do
    @logger.debug { "%s: Compressing %d records from PID %d" % [self, @buf.length, Process.pid] }
    z = Zlib::GzipWriter.new(io_buf)
    @buf.each {|record| z.puts(record) }
    z.finish
    @buf.clear
  end

  File.open(output_path, 'ab') { |f| f << io_buf.string }
  @logger.debug { "%s: After flush to %s size %d" % [self, output_path, File.size(output_path)] }

  io_buf.truncate(0)
end
pending?() click to toggle source
# File lib/fourtrack/recorder.rb, line 22
def pending?
  len = @mux.synchronize { @buf.length }
  len.nonzero?
end

Private Instance Methods

install_at_exit_hook!() click to toggle source
# File lib/fourtrack/recorder.rb, line 80
def install_at_exit_hook!
  at_exit { flush! if pending? }
end
output_path() click to toggle source
# File lib/fourtrack/recorder.rb, line 76
def output_path
  @output_path
end