class ArcFurnace::AllFieldsCSVSink

Public Class Methods

new(filename: , encoding: 'UTF-8', force_quotes: false) click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 8
# Sets up the sink's working state: an empty field-count table, a binary
# temp file that buffers rows through a MessagePack packer, and the CSV
# writer for the destination file.
#
# @param filename path handed to CSV.open; the file is opened in 'wb' mode
# @param encoding [String] forwarded to CSV.open as +encoding+
# @param force_quotes [Boolean] forwarded to CSV.open as +force_quotes+
def initialize(filename:, encoding: 'UTF-8', force_quotes: false)
  @fields = {}
  @tmp_file = Tempfile.new('intermediate_results', encoding: 'binary')
  # NOTE(review): +tmp_file+ is presumably a reader for @tmp_file declared
  # elsewhere in the class -- confirm.
  @packer = MessagePack::Packer.new(tmp_file)

  csv_options = { encoding: encoding, headers: true, force_quotes: force_quotes }
  @csv = CSV.open(filename, 'wb', **csv_options)
end

Public Instance Methods

finalize() click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 15
# Writes the buffered rows out as CSV and releases both file handles.
#
# Flushes the packer, rewinds the temp file, emits the header row, then
# replays every buffered hash through +write_row+.
#
# The CSV handle is closed and the temp file is closed and unlinked in an
# +ensure+ clause, so neither leaks when writing the header or a row raises.
# After this call the temp file no longer exists on disk.
#
# @return [nil]
def finalize
  packer.flush
  tmp_file.rewind

  write_header_row!

  MessagePack::Unpacker.new(tmp_file).each { |hash| write_row(hash) }

  nil
ensure
  csv.close
  # Tempfile#close! closes the handle and deletes the file immediately
  # instead of waiting for garbage collection.
  tmp_file.close!
end
row(hash) click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 29
# Buffers one row for later output.
#
# Rows for which +present?+ is false are ignored. Otherwise the per-field
# value counts are updated and the hash is written to the packer.
#
# @param hash the row to record, keyed by field
# @return the result of +packer.write+, or nil when the row was ignored
def row(hash)
  if hash.present?
    update_field_counts(hash)
    packer.write(hash)
  end
end

Private Instance Methods

update_field_counts(hash) click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 57
# Records, per key, the largest number of values seen so far.
#
# Each value is wrapped with Array.wrap and its length compared to the
# count stored in +fields+; the stored count is replaced only when the new
# one is strictly larger, so a key whose values are always empty is never
# added.
#
# @param hash the row whose keys and value counts are inspected
# @return the +hash+ argument
def update_field_counts(hash)
  hash.each_pair do |name, raw|
    width = Array.wrap(raw).length
    next unless width > (fields[name] || 0)

    fields[name] = width
  end
end
write_header_row!() click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 37
# Appends the header row to the CSV: every key in +fields+ is repeated
# once per recorded count, in the iteration order of +fields+.
#
# @return the result of +csv <<+
def write_header_row!
  csv << fields.flat_map { |name, width| width.times.map { name } }
end
write_row(hash) click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 45
# Appends one data row to the CSV, laid out to match the header.
#
# For every key in +fields+ the row's values are looked up under the key's
# string form, truncated to the recorded count, and padded with nil when
# the row has fewer values than that count.
#
# @param hash the row to write; values are read via string keys
# @return the result of +csv <<+
def write_row(hash)
  cells = fields.each_with_object([]) do |(name, width), out|
    supplied = Array.wrap(hash[name.to_s])
    out.concat(supplied[0, width] || [])
    # Pad short columns; a negative difference yields no iterations.
    (width - supplied.length).times { out.push(nil) }
  end
  csv << cells
end