class Dataflow::Adapters::CsvAdapter
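Interface between a data node and CSV files. Records are written to part files under Dataflow::CsvPath and merged into a single CSV file (with a header row) when saving finishes; reads go through SmarterCSV.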
Attributes
settings[R]
Public Class Methods
new(args)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 13
# Build an adapter from +args+ (forwarded to #update_settings).
#
# Ensures the Dataflow::CsvPath directory exists before any CSV file is
# written into it.
def initialize(args)
  # FileUtils.mkdir_p is the in-process equivalent of `mkdir -p`: no shell is
  # spawned (so paths with spaces or shell metacharacters are handled), and it
  # raises instead of silently failing when the directory cannot be created.
  require 'fileutils'
  FileUtils.mkdir_p(Dataflow::CsvPath)
  update_settings(args)
end
Public Instance Methods
all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false)
click to toggle source
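Retrieve all records from the dataset's CSV file. The where, fields, sort, offset, limit and include_system_id arguments are currently ignored.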
# File lib/dataflow/adapters/csv_adapter.rb, line 34
# Read every record from the dataset's CSV file.
#
# The +where+, +fields+, +sort+, +offset+, +limit+ and +include_system_id+
# arguments are accepted for interface compatibility with other adapters but
# are not applied here.
#
# @return whatever SmarterCSV.process yields for the file (presumably an Array
#   of row Hashes keyed by header strings, given strings_as_keys: true), or []
#   when the CSV file does not exist yet.
def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false)
  begin
    SmarterCSV.process(file_path, strings_as_keys: true)
  rescue Errno::ENOENT
    # Nothing has been saved yet: an absent file means an empty dataset.
    []
  end
end
count(where: {})
click to toggle source
Count the number of records in the dataset's CSV file (the where filter is currently ignored).
# File lib/dataflow/adapters/csv_adapter.rb, line 41
# Number of records returned by #all for the given +where+ filter.
def count(where: {})
  records = all(where: where)
  records.count
end
create_indexes(*)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 65
# CSV files have no index support: accept any arguments and do nothing.
def create_indexes(*)
  nil
end
find(where: opts = {})
click to toggle source
Retrieve a single element from a data node. Not supported for CSV: this always raises NotImplementedError.
# File lib/dataflow/adapters/csv_adapter.rb, line 29
# Single-record lookup is not implemented for CSV datasets.
#
# @param where [Hash] accepted for interface compatibility; unused.
# @raise [NotImplementedError] always.
def find(where: {})
  raise NotImplementedError, '#find is not yet supported on CSV.'
end
on_save_finished()
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 50
# Merge the header and all part files written by #save into the final CSV,
# using the schema's keys as the header row.
def on_save_finished
  header_keys = @schema.keys
  write_single_csv(keys: header_keys)
end
recreate_dataset(dataset: nil)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 58
# Reset the dataset on disk: remove the merged CSV file, then any part files
# left over from an unfinished save. +dataset+ is unused.
def recreate_dataset(dataset: nil)
  delete_file(file_path)
  file_parts.each(&method(:delete_file))
end
remove(_opts = {})
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 54
# Record removal is not implemented for CSV datasets.
#
# @param _opts [Hash] accepted for interface compatibility; unused.
# @raise [NotImplementedError] always.
def remove(_opts = {})
  # The message previously named #find (copy-paste slip); name the real method.
  raise NotImplementedError, '#remove is not yet supported on CSV.'
end
save(records:, part: nil)
click to toggle source
Save the given records as a new CSV part file; the parts are merged into the final CSV file by on_save_finished.
# File lib/dataflow/adapters/csv_adapter.rb, line 46
# Write +records+ to a CSV part file, one column per schema key.
#
# @param records [Array] records to write.
# @param part [Object, nil] part-file suffix; nil lets #write_csv_part pick one.
def save(records:, part: nil)
  column_keys = @schema.keys
  write_csv_part(records, keys: column_keys, part: part)
end
set_schema(schema)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 24
# Store the schema whose keys become the CSV columns (see #save and
# #on_save_finished, which read +@schema.keys+). Returns +schema+.
def set_schema(schema)
  @schema = schema
end
update_settings(args)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 19
# Rebuild the adapter settings from +args+ and reset the schema.
def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
  # Start from an empty Hash rather than an Array: #save and #on_save_finished
  # call @schema.keys, which Array does not implement, so saving before
  # #set_schema was called used to raise NoMethodError.
  @schema = {} # TODO: pre-fetch the csv's schema
end
Private Instance Methods
delete_file(path)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 69
# Delete +path+ if it exists; a missing file is not an error.
def delete_file(path)
  File.delete(path)
rescue Errno::ENOENT
  # Already gone, which is exactly the state the caller wants.
  nil
end
file_parts()
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 80
# Part files written by #write_csv_part for this dataset (named
# "<file_path>.part_<suffix>"), sorted by path.
#
# @return [Array<String>] possibly empty.
def file_parts
  Dir["#{file_path}.part_*"].sort
end
file_path()
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 75
# Location of the merged CSV file: "<CsvPath>/<db_name>.<dataset_name>.csv".
def file_path
  db = settings.db_name
  dataset = settings.dataset_name
  "#{Dataflow::CsvPath}/#{db}.#{dataset}.csv"
end
write_csv_part(data, keys:, part:)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 85
# Write +data+ to one CSV part file, one column per entry in +keys+.
#
# Each key is turned into a token list by #record_dig_tokens and looked up
# with Hash#dig, so a missing path yields an empty cell. (Presumably keys may
# address nested fields; confirm against record_dig_tokens.)
#
# @param data [Array<Hash>] records to write.
# @param keys [Array] column keys, in output order.
# @param part [Object, nil] part-file suffix; a random hex string when nil.
def write_csv_part(data, keys:, part:)
  dig_paths = keys.map { |key| record_dig_tokens(key: key) }
  # Build every row before touching the file.
  rows = data.map { |record| dig_paths.map { |path| record.dig(*path) } }
  suffix = part || SecureRandom.hex
  CSV.open("#{file_path}.part_#{suffix}", 'w') do |csv|
    rows.each { |row| csv << row }
  end
end
write_single_csv(keys:)
click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 99
# Assemble the final CSV file from a header row and every part file.
#
# Writes +keys+ to "<file_path>.header", replaces any existing file at
# #file_path with the header followed by each part file (in #file_parts order),
# then deletes the header and part files.
#
# @param keys [Array] header row values.
# @return [Array<String>] the intermediary files that were merged and removed.
def write_single_csv(keys:)
  header_filepath = "#{file_path}.header"
  CSV.open(header_filepath, 'w') { |csv| csv << keys }

  # make sure the destination file is deleted
  delete_file(file_path)

  files = [header_filepath] + file_parts
  # Concatenate in-process instead of shelling out to `cat` per file: no shell
  # quoting issues for paths with spaces/metacharacters, one open handle for
  # the output, and read errors surface as exceptions instead of being ignored.
  File.open(file_path, 'wb') do |output|
    files.each { |file| IO.copy_stream(file, output) }
  end

  # remove the intermediary files
  files.each { |file| delete_file(file) }
end