class ArcFurnace::AllFieldsCSVSink
Public Class Methods
new(filename: , encoding: 'UTF-8', force_quotes: false)
click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 8
#
# Sets up the sink's working state: an empty field-count table, a binary
# temp file with a MessagePack packer writing into it, and the destination
# CSV opened for writing.
#
# @param filename [String] path of the CSV file to create
# @param encoding [String] encoding passed to CSV.open
# @param force_quotes [Boolean] passed through to CSV.open
def initialize(filename:, encoding: 'UTF-8', force_quotes: false)
  # Maps each field name to the largest number of values seen for it.
  @fields = {}

  # Intermediate storage: rows are packed here before the CSV is written.
  @tmp_file = Tempfile.new('intermediate_results', encoding: 'binary')
  @packer = MessagePack::Packer.new(tmp_file)

  csv_options = { encoding: encoding, headers: true, force_quotes: force_quotes }
  @csv = CSV.open(filename, 'wb', **csv_options)
end
Public Instance Methods
finalize()
click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 15
#
# Flushes the packer, rewinds the intermediate file, writes the header row,
# then replays every packed hash through write_row.
#
# Cleanup runs in +ensure+ so that the CSV handle is closed and the
# intermediate temp file is closed and removed even when writing raises;
# any such error still propagates to the caller.
def finalize
  packer.flush
  tmp_file.rewind
  write_header_row!
  MessagePack::Unpacker.new(tmp_file).each { |hash| write_row(hash) }
ensure
  begin
    csv.close
  ensure
    # close! both closes and unlinks the temp file; nested so it still runs
    # if closing the CSV itself fails.
    tmp_file.close!
  end
end
row(hash)
click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 29
#
# Accepts one row. Hashes that are not +present?+ are ignored; otherwise the
# per-field value counts are updated and the hash is written to the packer.
#
# @param hash [Hash] the row to record
def row(hash)
  if hash.present?
    update_field_counts(hash)
    packer.write(hash)
  end
end
Private Instance Methods
update_field_counts(hash)
click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 57
#
# Raises the recorded count for each key in +hash+ when this row carries
# more values for that key than any row seen so far. A count of zero never
# registers a key, because it cannot exceed the default of 0.
#
# @param hash [Hash] row whose keys and values are inspected
def update_field_counts(hash)
  hash.each do |key, values|
    seen = Array.wrap(values).size
    recorded = fields[key] || 0
    next unless seen > recorded

    fields[key] = seen
  end
end
write_header_row!()
click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 37
#
# Appends the header row to the CSV: each field name is repeated once per
# recorded value count, in the iteration order of +fields+.
def write_header_row!
  header_cells = fields.each_with_object([]) do |(name, width), cells|
    width.times { cells << name }
  end
  csv << header_cells
end
write_row(hash)
click to toggle source
# File lib/arc-furnace/all_fields_csv_sink.rb, line 45
#
# Appends one data row to the CSV. For every known field, up to +count+
# values are emitted, followed by nil cells so the field always occupies
# exactly +count+ columns.
#
# @param hash [Hash] row data; values are looked up by the field name's
#   string form (+key.to_s+)
def write_row(hash)
  cells = fields.each_with_object([]) do |(key, count), acc|
    values = Array.wrap(hash[key.to_s])
    acc.concat(values.slice(0, count) || [])

    # Pad with nils when this row has fewer values than the field's width.
    padding = count - values.length
    acc.concat([nil] * padding) if padding.positive?
  end
  csv << cells
end