class Dataflow::Nodes::Export::ToCsvNode
Export a dataset to CSV.
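The node reads the data of its first dependency and writes it out through a CsvAdapter. A minimal usage sketch, assuming the conventions of other Dataflow compute nodes (Mongoid-style .create, a #compute entry point, and the dependency_ids, query and keys attributes referenced by compute_impl below); the exact attribute names should be verified against ComputeNode:

  # Hypothetical usage sketch -- names are inferred from compute_impl,
  # not confirmed against the ComputeNode definition.
  data_node = Dataflow::Nodes::DataNode.find_by(name: 'users')

  export = Dataflow::Nodes::Export::ToCsvNode.create(
    name: 'users_to_csv',
    dependency_ids: [data_node.id],   # the first dependency is the node being exported
    query: '{}',                      # JSON filter string, parsed by compute_impl
    keys: []                          # leave empty to infer the schema from the data
  )

  export.compute                      # runs compute_impl and writes the CSV parts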
Public Instance Methods
compute_impl()
# File lib/dataflow/nodes/export/to_csv_node.rb, line 15
def compute_impl
  node = dependencies.first
  where = JSON.parse(query)

  # fetch the schema
  sch = if keys.present?
          keys.map { |k| [k, { type: 'string' }] }.to_h
        else
          node.infer_partial_schema(where: where, extended: true)
        end

  # create the dataset
  csv_adapter = Adapters::CsvAdapter.new(data_node: node)
  csv_adapter.set_schema(sch)
  csv_adapter.recreate_dataset

  # export in parallel
  max_per_process = 1000
  max_per_process = limit_per_process if limit_per_process > 0

  data_count = [node.count(where: where), 1].max
  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = node.ordered_system_id_queries(batch_size: count_per_process)
  system_id = node.send(:db_adapter).class::SYSTEM_ID

  parallel_each(queries.each_with_index) do |query, idx|
    # TODO: re-enable the on_export_progressed event
    # progress = (idx / queries.count.to_f * 100).ceil
    # on_export_progressed(pct_complete: progress)
    fields = transform_fields(node.db_backend, sch.keys)
    batch = node.all(where: query.merge(where), fields: fields, sort: { system_id => 1 })
    csv_adapter.save(records: batch, part: idx.to_s.rjust(queries.count.to_s.length, "0"))
  end

  # needed by the csv exporter to finalize into a single file
  csv_adapter.on_save_finished
end
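The batch size used above is the smaller of the per-process cap and an equal split of the matching records across processors. A small illustration of that arithmetic, with made-up numbers:

  # Illustration only: how count_per_process is derived for hypothetical inputs.
  max_per_process = 1000                    # default cap (or limit_per_process when set)
  data_count      = 10_500                  # records matching the query
  processors      = 8                       # Parallel.processor_count

  equal_split_per_process = (data_count / processors.to_f).ceil       # => 1313
  count_per_process = [max_per_process, equal_split_per_process].min  # => 1000
  # ordered_system_id_queries then yields ~11 batches of up to 1000 records,
  # each saved as a zero-padded part ("00".."10") and merged by on_save_finished.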
transform_fields(db_backend, keys)
Transform the keys into the fields that need to be selected on the backend. This is a fix specifically for selecting nested values on MongoDB.
# File lib/dataflow/nodes/export/to_csv_node.rb, line 59
def transform_fields(db_backend, keys)
  return keys unless db_backend == :mongodb
  # Replace the separator with a dot, and make sure we don't select individual
  # array keys... selecting those seems to break MongoDB.
  keys.map { |k| k.gsub(Dataflow::SchemaMixin::SEPARATOR, '.') }
      .map { |k| k.gsub(/\.[0-9]+/, '') }.uniq
end
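As an illustration of the rewriting above, assuming schema keys that use the SchemaMixin separator for nesting and array indices (the exact separator character is defined in Dataflow::SchemaMixin):

  sep  = Dataflow::SchemaMixin::SEPARATOR
  keys = ["address#{sep}0#{sep}city", "address#{sep}1#{sep}city", 'name']

  transform_fields(:mongodb, keys)
  # => ["address.city", "name"]   # separators become dots, array indices dropped, duplicates removed
  transform_fields(:postgresql, keys)
  # => keys returned untouched for non-MongoDB backends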