module Dataflow
Overrides constantize in active_support/inflector/methods.rb to rescue from Dataflow::Nodes::… name errors. In such cases, a generic Dataflow::Nodes::DataNode is returned instead. This is used within Mongoid to instantiate the correct node types.
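Below is a minimal sketch of one way such an override could be wired up, using a prepend-based monkey-patch; the module name and fallback condition are illustrative assumptions, not the gem's actual implementation.

require 'active_support/inflector'

# Hypothetical patch: fall back to a generic DataNode when an unknown
# Dataflow::Nodes::* constant is referenced (e.g. loaded from stored metadata).
module DataflowConstantizeFallback
  def constantize(camel_cased_word)
    super
  rescue NameError
    raise unless camel_cased_word.to_s.start_with?('Dataflow::Nodes::')
    Dataflow::Nodes::DataNode
  end
end

ActiveSupport::Inflector.singleton_class.prepend(DataflowConstantizeFallback)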
Constants
- CsvPath
- VERSION
Public Class Methods
clear_tmp_datasets()
Helper that clears unused datasets. NOTE: although a best effort is made not to delete datasets that are currently being written to, this is not safe to use while executing in parallel.
# File lib/dataflow-rb.rb, line 80
def self.clear_tmp_datasets
  Dataflow::Nodes::DataNode.all.each(&:safely_clear_write_dataset)
end
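For example, a periodic maintenance task might call it directly (sketch only; make sure no parallel computations are running):

# Clear the write (temporary) datasets of all data nodes.
Dataflow.clear_tmp_datasets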
compute_node(id)
Helper that tries to find a compute node by id and then by name.
# File lib/dataflow-rb.rb, line 71
def self.compute_node(id)
  Dataflow::Nodes::ComputeNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::ComputeNode.find_by(name: id)
end
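A usage sketch; 'monthly_report' is a hypothetical node name:

# Tries find(id) first, then falls back to a lookup by name.
node = Dataflow.compute_node('monthly_report')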
data_node(id)
Helper that tries to find a data node by id and then by name.
# File lib/dataflow-rb.rb, line 64
def self.data_node(id)
  Dataflow::Nodes::DataNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::DataNode.find_by(name: id)
end
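The same pattern applies to data nodes ('raw_events' is a hypothetical node name):

# Id lookup first, name lookup as a fallback.
raw_events = Dataflow.data_node('raw_events')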
export(nodes:, export_dir: './flows', include_data: false)
Exports nodes and their data. Use import to re-import them elsewhere.
# File lib/dataflow-rb.rb, line 85
def self.export(nodes:, export_dir: './flows', include_data: false)
  raise ArgumentError, 'nodes must be an array of nodes' unless nodes.is_a?(Array)

  # make a tmp folder with the export dir
  archive_name = "flow_#{Time.now.strftime("%Y-%m-%d_%H-%M-%S")}"
  tmp_dir = "#{export_dir}/#{archive_name}"
  `mkdir -p #{tmp_dir}`

  # export all the dependencies
  all_nodes = nodes + nodes.flat_map(&:all_dependencies)
  # and all the compute nodes' datasets
  all_nodes += all_nodes.select { |x| x.is_a?(Dataflow::Nodes::ComputeNode) }
                        .map { |x| x.data_node }

  # get all the nodes' metadata in the yaml format
  metadata_yaml = all_nodes.compact.uniq.map(&:metadata).to_yaml
  File.write("#{tmp_dir}/metadata.yaml", metadata_yaml)

  # add the datasets' data if necessary
  if include_data
    all_nodes.select { |x| x.is_a?(Dataflow::Nodes::DataNode) }
             .each { |x| x.dump_dataset(base_folder: tmp_dir) }
  end

  # pack all the content in a tar archive
  archive_path = "#{archive_name}.tar"
  `(cd #{export_dir} && tar -cvf #{archive_path} #{archive_name})`

  # clear the tmp folder
  `rm -rf #{tmp_dir}`

  "#{export_dir}/#{archive_path}"
end
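A usage sketch; the node name is hypothetical and the returned path follows the flow_<timestamp>.tar pattern built above:

node = Dataflow.compute_node('monthly_report')  # hypothetical node name
archive = Dataflow.export(nodes: [node], include_data: true)
# archive => e.g. "./flows/flow_2017-01-01_12-00-00.tar"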
import(archive_path:)
Imports nodes (and any included datasets) from a tar archive previously created with export.
# File lib/dataflow-rb.rb, line 117
def self.import(archive_path:)
  raise ArgumentError, 'expecting a tar archive file' unless archive_path.end_with?('.tar')

  # extract the tar
  folder_name = archive_path.split('/')[-1].split('.')[0]
  `tar -xvf #{archive_path}`

  # load and restore the content in the metadata.yaml
  metadata = YAML.load_file("#{folder_name}/metadata.yaml")

  # restore the nodes
  metadata.each do |m|
    klass = m[:_type].constantize

    # try to delete previously existing node
    begin
      previous_node = klass.find(m[:_id])
      previous_node.delete
    rescue Mongoid::Errors::DocumentNotFound
    end

    # create the node
    klass.create(m)
  end

  # look for dataset dumps and restore them
  filepaths = Dir["./#{folder_name}/**/*.gz"] + Dir["./#{folder_name}/**/*.dump"]

  filepaths.each do |filepath|
    # filepath: "./folder/db_name/dataset.1.gz"
    db_name = filepath.split('/')[2]
    dataset = filepath.split('/')[3].split('.')[0]
    n = Dataflow::Nodes::DataNode.find_by(db_name: db_name, name: dataset)
    n.restore_dataset(filepath: filepath)
  end

  # clean up the extracted folder
  `rm -rf #{folder_name}`
end
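A usage sketch on the receiving side; the archive path is hypothetical and should point to a file produced by export:

# Re-create the nodes and restore any dumped datasets from the archive.
Dataflow.import(archive_path: './flows/flow_2017-01-01_12-00-00.tar')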