module Dataflow

Overrides constantize in active_support/inflector/methods.rb to rescue from Dataflow::Nodes::… name errors. In such cases, a generic Dataflow::Nodes::DataNode is returned instead. This is used within Mongoid to instantiate the correct node types.
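
For illustration, the fallback behaves roughly as in the sketch below (constantize_node_type is a hypothetical name used only for this example; the gem patches constantize itself):

# Sketch only: the gem overrides ActiveSupport's constantize directly.
def constantize_node_type(name)
  name.constantize
rescue NameError
  # Unknown Dataflow::Nodes::* types degrade to the generic DataNode.
  raise unless name.start_with?('Dataflow::Nodes::')
  Dataflow::Nodes::DataNode
end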

Constants

CsvPath
VERSION

Public Class Methods

clear_tmp_datasets()

Helper that clears unused (temporary write) datasets. NOTE: although a best attempt is made not to delete datasets that are currently being written to, this is not safe to use while executing in parallel.

# File lib/dataflow-rb.rb, line 80
def self.clear_tmp_datasets
  Dataflow::Nodes::DataNode.all.each(&:safely_clear_write_dataset)
end
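
Example usage (a sketch; assumes Mongoid is configured and Dataflow nodes exist):

# Drop temporary write datasets left over from previous computations.
Dataflow.clear_tmp_datasets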
compute_node(id)

Helper that tries to find a compute node by id, then by name.

# File lib/dataflow-rb.rb, line 71
def self.compute_node(id)
  Dataflow::Nodes::ComputeNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::ComputeNode.find_by(name: id)
end
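
Example usage (the id and name below are placeholders):

# Try the Mongoid id first; fall back to looking the node up by name.
node = Dataflow.compute_node('my_compute_node')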
data_node(id)

Helper that tries to find a data node by id, then by name.

# File lib/dataflow-rb.rb, line 64
def self.data_node(id)
  Dataflow::Nodes::DataNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::DataNode.find_by(name: id)
end
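
Example usage (the name below is a placeholder):

node = Dataflow.data_node('my_dataset')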
export(nodes:, export_dir: './flows', include_data: false)

Exports the given nodes, all their dependencies and, if include_data is true, their datasets into a tar archive, and returns the archive's path. Use import to re-import them elsewhere.

# File lib/dataflow-rb.rb, line 85
def self.export(nodes:, export_dir: './flows', include_data: false)
  raise ArgumentError, 'nodes must be an array of nodes' unless nodes.is_a?(Array)
  # make a tmp folder with the export dir
  archive_name = "flow_#{Time.now.strftime("%Y-%m-%d_%H-%M-%S")}"
  tmp_dir = "#{export_dir}/#{archive_name}"
  `mkdir -p #{tmp_dir}`

  # export all the dependencies
  all_nodes = nodes + nodes.flat_map(&:all_dependencies)
  # and all the compute node's datasets
  all_nodes += all_nodes.select { |x| x.is_a?(Dataflow::Nodes::ComputeNode) }
                        .map { |x| x.data_node }
  # get all the nodes' metadata in the yaml format
  metadata_yaml = all_nodes.compact.uniq.map(&:metadata).to_yaml
  File.write("#{tmp_dir}/metadata.yaml", metadata_yaml)

  # add the dataset's data if necessary
  if include_data
    all_nodes.select { |x| x.is_a?(Dataflow::Nodes::DataNode) }
             .each { |x| x.dump_dataset(base_folder: tmp_dir) }
  end

  # pack all the content in a tar archive
  archive_path = "#{archive_name}.tar"
  `(cd #{export_dir} && tar -cvf #{archive_path} #{archive_name})`

  # clear the tmp folder
  `rm -rf #{tmp_dir}`

  "#{export_dir}/#{archive_path}"
end
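
Example usage (the node name is a placeholder; the returned path is timestamped):

node = Dataflow.data_node('my_dataset')
archive = Dataflow.export(nodes: [node], include_data: true)
# => e.g. './flows/flow_2017-06-01_12-00-00.tar'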
import(archive_path:)
# File lib/dataflow-rb.rb, line 117
def self.import(archive_path:)
  raise ArgumentError, 'expecting a tar archive file' unless archive_path.end_with?('.tar')

  # extract the tar
  folder_name = archive_path.split('/')[-1].split('.')[0]
  `tar -xvf #{archive_path}`

  # load and restore the content in the metadata.yaml
  metadata = YAML.load_file("#{folder_name}/metadata.yaml")

  # restore the nodes
  metadata.each do |m|
    klass = m[:_type].constantize

    # try to delete previously existing node
    begin
      previous_node = klass.find(m[:_id])
      previous_node.delete
    rescue Mongoid::Errors::DocumentNotFound
    end

    # create the node
    klass.create(m)
  end

  # look for dataset dumps and restore them
  filepaths = Dir["./#{folder_name}/**/*.gz"] + Dir["./#{folder_name}/**/*.dump"]

  filepaths.each do |filepath|
    # filepath: "./folder/db_name/dataset.1.gz"
    db_name = filepath.split('/')[2]
    dataset = filepath.split('/')[3].split('.')[0]
    n = Dataflow::Nodes::DataNode.find_by(db_name: db_name, name: dataset)
    n.restore_dataset(filepath: filepath)
  end

  # clean up the extracted folder
  `rm -rf #{folder_name}`
end
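
Restores the nodes described in the archive's metadata.yaml (replacing any that already exist) and reloads any dumped datasets. Example usage (the path is a placeholder matching what export returns):

Dataflow.import(archive_path: './flows/flow_2017-06-01_12-00-00.tar')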