class Dataflow::Adapters::CsvAdapter

Interface between a data node and CSV files. Records are read from disk with SmarterCSV and written with Ruby's CSV library.

Attributes

settings[R]

Public Class Methods

new(args) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 13
# Build an adapter for the dataset described by +args+.
#
# Ensures the CSV output directory (Dataflow::CsvPath) exists, then
# initialises settings and the schema via update_settings.
#
# @param args [Hash] settings forwarded to update_settings.
def initialize(args)
  require 'fileutils'
  # Create the directory without spawning a shell: the path is never
  # interpolated into a command line, so spaces or shell metacharacters are
  # safe, and a failure raises instead of being silently ignored.
  FileUtils.mkdir_p(Dataflow::CsvPath)
  update_settings(args)
end

Public Instance Methods

all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false) click to toggle source

Retrieve all elements from a data node.

# File lib/dataflow/adapters/csv_adapter.rb, line 34
# Retrieve all elements from a data node.
#
# NOTE(review): the filtering and paging keywords (where, fields, sort,
# offset, limit, include_system_id) are accepted but currently ignored.
# Every row of the CSV file is returned.
#
# @return [Array] the rows parsed by SmarterCSV (called with
#   strings_as_keys: true), or [] when the CSV file does not exist.
def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false)
  SmarterCSV.process(file_path, strings_as_keys: true)
rescue Errno::ENOENT
  # No CSV file has been written yet: treat it as an empty dataset.
  []
end
count(where: {}) click to toggle source

Count the number of records.

# File lib/dataflow/adapters/csv_adapter.rb, line 41
# Count the records in the data node.
#
# @param where [Hash] forwarded unchanged to #all.
# @return [Integer] the number of records #all returns.
def count(where: {})
  records = all(where: where)
  records.count
end
create_indexes(*) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 65
# Accepts any arguments and does nothing: CSV files have no indexes to build.
def create_indexes(*_args)
  nil
end
find(where: opts = {}) click to toggle source

Retrieve a single element from a data node.

# File lib/dataflow/adapters/csv_adapter.rb, line 29
# Retrieve a single element from a data node.
#
# Not supported by the CSV adapter.
#
# @param where [Hash] ignored.
# @raise [NotImplementedError] always.
def find(where: {})
  raise NotImplementedError, '#find is not yet supported on CSV.'
end
on_save_finished() click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 50
# Hook run after all parts have been saved: merge the part files into the
# final CSV, using the schema's keys as the header row.
def on_save_finished
  columns = @schema.keys
  write_single_csv(keys: columns)
end
recreate_dataset(dataset: nil) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 58
# Reset the dataset by removing its files from disk.
#
# @param dataset [nil] unused.
# @return [Array<String>] the part file paths that were deleted.
def recreate_dataset(dataset: nil)
  merged = file_path
  delete_file(merged)
  # Part files are only merged and removed by on_save_finished, so some
  # may still be present here.
  leftovers = file_parts
  leftovers.each { |leftover| delete_file(leftover) }
end
remove(_opts = {}) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 54
# Remove records from the data node.
#
# Not supported by the CSV adapter.
#
# @param _opts [Hash] ignored.
# @raise [NotImplementedError] always; the message now names #remove
#   (it previously said #find, a copy-paste error).
def remove(_opts = {})
  raise NotImplementedError, '#remove is not yet supported on CSV.'
end
save(records:, part: nil) click to toggle source

Save the given records.

# File lib/dataflow/adapters/csv_adapter.rb, line 46
# Save the given records as one CSV part file, with columns ordered by the
# schema's keys.
#
# @param records [Array] records to write.
# @param part [Object, nil] part-file suffix; nil lets write_csv_part choose one.
def save(records:, part: nil)
  columns = @schema.keys
  write_csv_part(records, part: part, keys: columns)
end
set_schema(schema) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 24
# Replace the schema used to choose and order CSV columns.
#
# @param new_schema [Hash] schema whose keys become the CSV columns.
# @return [Hash] the schema that was stored.
def set_schema(new_schema)
  @schema = new_schema
  @schema
end
update_settings(args) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 19
# (Re)initialise the adapter's settings and reset its schema.
#
# @param args [Hash] forwarded to Dataflow::Adapters::Settings.new.
def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
  # save and on_save_finished call @schema.keys, so the empty default must be
  # a Hash. An Array here made saving before set_schema raise NoMethodError.
  @schema = {} # TODO: pre-fetch the csv's schema
end

Private Instance Methods

delete_file(path) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 69
# Delete +path+ if it exists; a missing file is not an error.
#
# @param path [String] file to delete.
# @return [Integer, nil] File.delete's result, or nil when the file was absent.
def delete_file(path)
  File.delete(path)
rescue Errno::ENOENT
  # No file present, no problem.
  nil
end
file_parts() click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 80
# Paths of the part files written by write_csv_part for this dataset
# ("<file_path>.part_*"), in lexicographic order.
#
# @return [Array<String>]
def file_parts
  Dir["#{file_path}.part_*"].sort
end
file_path() click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 75
# Path of the merged CSV file for this dataset:
# "<Dataflow::CsvPath>/<db_name>.<dataset_name>.csv".
def file_path
  db = settings.db_name
  dataset = settings.dataset_name
  format('%s/%s.%s.csv', Dataflow::CsvPath, db, dataset)
end
write_csv_part(data, keys:, part:) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 85
# Write +data+ to a single part file ("<file_path>.part_<part>").
#
# Each record is projected onto +keys+ in order. Every key is turned into a
# dig path by record_dig_tokens, and the value is read with #dig.
#
# @param data [Array] records to write.
# @param keys [Array] schema keys giving the column order.
# @param part [Object, nil] part-file suffix; a random hex string when nil.
def write_csv_part(data, keys:, part:)
  token_paths = keys.map { |key| record_dig_tokens(key: key) }
  rows = data.map do |record|
    token_paths.map { |path| record.dig(*path) }
  end

  suffix = part || SecureRandom.hex
  CSV.open("#{file_path}.part_#{suffix}", 'w') do |csv|
    rows.each { |row| csv << row }
  end
end
write_single_csv(keys:) click to toggle source
# File lib/dataflow/adapters/csv_adapter.rb, line 99
# Merge the header and all part files into the final CSV at file_path, then
# remove the intermediary files.
#
# @param keys [Array] column names written as the header row.
# @return [Array<String>] the intermediary files that were merged and deleted.
# @raise [SystemCallError] if a file cannot be read or written. Intermediary
#   files are kept in that case, so no data is lost.
def write_single_csv(keys:)
  # Export the headers into their own file so they are concatenated first.
  header_filepath = "#{file_path}.header"
  CSV.open(header_filepath, 'w') do |csv|
    csv << keys
  end

  files = [header_filepath] + file_parts

  # Concatenate in-process instead of shelling out to `cat`. Paths are never
  # interpolated into a shell command, so spaces and metacharacters are safe.
  # A failed copy also raises here. Previously the failure was ignored and
  # the part files were deleted anyway. 'wb' replaces any previous output.
  File.open(file_path, 'wb') do |output|
    files.each { |file| IO.copy_stream(file, output) }
  end

  # Remove the intermediary files only once the merge has succeeded.
  files.each { |file| delete_file(file) }
end