class Dataflow::Nodes::DataNode
Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.
Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.
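A minimal usage sketch is shown below. It assumes the gem's persistence layer (Mongoid) is configured so that .create is available; the node name, backend, and record contents are illustrative.

  # Create a data node backed by MongoDB (illustrative settings).
  node = Dataflow::Nodes::DataNode.create(
    name: 'my_dataset',   # hypothetical node name
    db_backend: :mongodb
  )

  # Write a couple of records and read them back.
  node.add(records: [{ 'id' => 1 }, { 'id' => 2 }])
  node.all(where: { 'id' => 1 }) # => [{ 'id' => 1 }] (system id omitted by default)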
Public Instance Methods
Adds the given records to the dataset and updates the updated_at time.
@param records [Array] an array of the records to be added.
# File lib/dataflow/nodes/data_node.rb, line 187
def add(records:)
  raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'." unless records.is_a?(Array)

  records = records.compact
  return if records.blank?

  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
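For instance (a sketch; node is assumed to be an existing DataNode and the records are illustrative). Passing anything other than an Array raises an ArgumentError, and nil entries are compacted away:

  node.add(records: [
    { 'id' => 1, 'name' => 'alice' },
    { 'id' => 2, 'name' => 'bob' },
    nil                               # removed by the compact step
  ])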
Returns all the records from a dataset that match the options.
@param where [Hash] the condition to apply for retrieving the element.
e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.
@param fields [Array] Array of strings representing which fields to include.
e.g.: ['id', 'updated_at'] will only return these two fields.
@param sort [Hash] represents the sorting of the returned dataset.
e.g. { 'id' => 1, 'updated_at' => -1 } will sort by id ASC and by updated_at DESC.
@param limit [Integer] limits the number of records returned.
@param offset [Integer] starting offset of the records returned.
Use with limit to implement pagination.
@yield [db_client] When a block is passed, yields the db client on which .each
can be called to stream the results rather than loading everything into memory. Other methods can also be called depending on the backend, at the cost of backend portability (use at your own risk).
# File lib/dataflow/nodes/data_node.rb, line 147
def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit,
                 offset: offset, include_system_id: include_system_id, &block)
end
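For example (a sketch; node is an existing DataNode, and the field names and conditions are illustrative):

  # Fetch the 10 most recently updated records, returning only two fields.
  records = node.all(
    where:  { 'status' => 'active' },
    fields: ['id', 'updated_at'],
    sort:   { 'updated_at' => -1 },
    limit:  10
  )

  # Stream the results instead of loading everything into memory.
  node.all(where: { 'status' => 'active' }) do |client|
    client.each { |record| puts record['id'] }
  end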
Supports paginating efficiently through the dataset.
@param where [Hash] the condition to apply for retrieving the element.
e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record. IMPORTANT: do not use the system id in the query. It will be overwritten.
@param fields [Array] Array of strings representing which fields to include.
e.g.: ['id', 'updated_at'] will only return these two fields.
@param limit [Integer] limits the number of records returned.
@param cursor [String] indicates from which page the results should be returned.
@return [Hash] with 2 fields:
  - data [Array] that contains the fetched records
  - next_cursor [String] a string to pass into subsequent calls to fetch the next page of the data
# File lib/dataflow/nodes/data_node.rb, line 164
def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
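A paging loop might look like the following sketch (node is an existing DataNode; whether the returned hash uses string or symbol keys depends on the adapter, string keys are assumed here):

  cursor = nil
  loop do
    page = node.all_paginated(fields: ['id'], cursor: cursor)
    page['data'].each { |record| puts record['id'] }
    cursor = page['next_cursor']
    break if cursor.blank?
  end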
Clear the data that matches the options.
# File lib/dataflow/nodes/data_node.rb, line 197
def clear(where: {})
  db_adapter.delete(where: where)
end
Counts how many records match the condition, or all records if no condition is given.
@return [Integer] the record count.
# File lib/dataflow/nodes/data_node.rb, line 181
def count(where: {})
  db_adapter.count(where: where)
end
Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).
# File lib/dataflow/nodes/data_node.rb, line 229
def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end
Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.
@param dataset_type [Symbol] select on which dataset to apply the indexes. Can be :read or :write.
# File lib/dataflow/nodes/data_node.rb, line 221
def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
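Put together, the recommended ordering for a large import could look like this sketch (node and records are assumed to exist, and the node is assumed to use double buffering so the :write dataset is the one being filled):

  node.recreate_dataset(dataset_type: :write)
  node.create_unique_indexes(dataset_type: :write)     # constraints enforced before any data is added
  node.add(records: records)
  node.create_non_unique_indexes(dataset_type: :write) # applied after the bulk insert for performance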
# File lib/dataflow/nodes/data_node.rb, line 307
def db_indexes(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  db_adapter.retrieve_dataset_indexes(dataset)
end
# File lib/dataflow/nodes/data_node.rb, line 343
def drop_dataset!
  db_adapter.drop_dataset(write_dataset_name)
  return unless use_double_buffering
  db_adapter.drop_dataset(read_dataset_name)
end
Dump a backup of this dataset to a file.
@return [String] the filepath to the dump file. The filename is formatted as <node_name>.<read_dataset_idx>.<ext>
# File lib/dataflow/nodes/data_node.rb, line 352
def dump_dataset(base_folder: './dump')
  read_idx = 0
  read_idx = read_dataset_idx if use_double_buffering

  db_adapter.dump(base_folder: base_folder, read_dataset_idx: read_idx)
end
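For example (a sketch; the folder is illustrative):

  path = node.dump_dataset(base_folder: './backups')
  # => path to the dump file, named <node_name>.<read_dataset_idx>.<ext>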
# File lib/dataflow/nodes/data_node.rb, line 320
def explain_update(depth: 0, verbose: false)
  logger.log("#{'>' * (depth + 1)} #{name} [Dataset] | UPDATED = #{updated_at}")
end
# File lib/dataflow/nodes/data_node.rb, line 278
def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instantiate and export without saving anything
  Export::ToCsvNode.new(
    dependency_ids: [self],
    query: where.to_json,
    keys: keys
  ).compute_impl
  on_export_finished
end
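For example, exporting a filtered subset to the default CSV backend might look like the following sketch (the keys and condition are illustrative):

  node.export(keys: ['id', 'updated_at'], where: { 'status' => 'active' })
  node.export  # export everything using the defaults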
Finds and returns a single record from the dataset, based on the given options.
@param where [Hash] the condition to apply for retrieving the element.
e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.
@return [Hash] returns a single record from the dataset.
# File lib/dataflow/nodes/data_node.rb, line 127
def find(where: {})
  db_adapter.find(where: where)
end
When the dataset properties change, notify the adapter to handle the new settings.
# File lib/dataflow/nodes/data_node.rb, line 107
def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # if we're using double buffering, just wait for the next buffer
  # to be created to apply the changes.
  return if use_double_buffering

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end
# File lib/dataflow/nodes/data_node.rb, line 272
def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end
Retrieves some information about this node and its usage.
# File lib/dataflow/nodes/data_node.rb, line 290
def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    db_indexes: db_indexes(write_dataset: write_dataset),
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end
Returns a list of ordered (ASC) system ID queries.
@param batch_size [Integer] how many IDs to select per query.
These can be used to process the dataset in parallel by querying on a sub-section:
  queries = node.ordered_system_id_queries(batch_size: 1000)
  Parallel.each(queries) do |query|
    process(node.all(where: query))
  end
# File lib/dataflow/nodes/data_node.rb, line 175
def ordered_system_id_queries(batch_size:, where: {})
  db_adapter.ordered_system_id_queries(batch_size: batch_size, where: where)
end
# File lib/dataflow/nodes/data_node.rb, line 234
def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{read_dataset_idx}"
  else
    name
  end
end
Used to select which dataset to read from. A possible use case is to read from an old dataset name.
@param dataset [String] the dataset name to read from.
It must be a valid dataset name for the current settings.
# File lib/dataflow/nodes/data_node.rb, line 256
def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end
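For instance, to inspect the write buffer of a double-buffered node before swapping (a sketch; the assignment is ignored if the name is not valid for the current settings):

  node.read_dataset_name = node.write_dataset_name
  node.count  # queries now run against the write buffer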
Recreates a dataset.
@param dataset_type [Symbol] select which dataset to recreate. Can be :read or :write.
# File lib/dataflow/nodes/data_node.rb, line 210
def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end
Calls superclass method Dataflow::Node#required_by
# File lib/dataflow/nodes/data_node.rb, line 324
def required_by
  super + Dataflow::Nodes::ComputeNode.where(data_node_id: _id).map do |node|
    { node: node, type: 'dataset' }
  end
end
Restore a dump of this dataset.
@param filepath [String] the filepath to the dump file. The filename has to be formatted as <node_name>.<read_dataset_idx>.<ext>
# File lib/dataflow/nodes/data_node.rb, line 362
def restore_dataset(filepath:)
  filename = filepath.split('/')[-1]
  read_idx = if filename.count('.') < 2
               # for compatibility reasons: previously we were not
               # exporting the read idx in the filename
               0
             else
               filename.split('.')[1].to_i
             end

  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a single buffer dataset but node '#{name}' is expecting a double buffered one." if read_idx == 0 && use_double_buffering
  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a double buffer dataset but node '#{name}' is expecting a single buffered one." if read_idx > 0 && !use_double_buffering

  if use_double_buffering
    dataset_name = dataset_name_for_buffer(read_idx)
  else
    dataset_name = name
  end

  db_adapter.restore(filepath: filepath, dataset_name: dataset_name)
  self.read_dataset_idx = read_idx
  save

  db_adapter.update_settings(data_node: self)

  true
end
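For example, restoring a file produced by dump_dataset (a sketch; the node's buffering settings must match the dump, as described above):

  # 'path' is the filepath returned by dump_dataset,
  # i.e. ending in <node_name>.<read_dataset_idx>.<ext>
  node.restore_dataset(filepath: path)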
This is not safe if there is some parallel processing going on.
# File lib/dataflow/nodes/data_node.rb, line 331
def safely_clear_write_dataset
  # we can only clear the write dataset if we're using double buffering
  return unless use_double_buffering

  # check if there is any node that is currently computing to this dataset
  used_by = required_by.select { |x| x[:type] == 'dataset' && x[:node].locked_for_computing? }
  return if used_by.present?

  logger.log("Dropping #{db_name}.#{write_dataset_name} on #{db_backend}.")
  # TODO: lock the node?
  db_adapter.drop_dataset(write_dataset_name)
end
Sets the default parameters before creating the object.
# File lib/dataflow/nodes/data_node.rb, line 82
def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This is useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema

  # This is needed for the flow to compute properly
  self.updated_at = Time.now
end
# File lib/dataflow/nodes/data_node.rb, line 263
def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, "#swap_read_write_dataset_names! called on \"#{self.name}\" but \"use_double_buffering\" is not activated." unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
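A typical double-buffered refresh then ends with a swap, along these lines (a sketch; it assumes the write buffer was just rebuilt and filled, e.g. as in the indexing example above):

  node.swap_read_write_datasets!   # readers now see the freshly written data
  node.safely_clear_write_dataset  # optionally drop the now-stale former read buffer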
Update this node's schema.
# File lib/dataflow/nodes/data_node.rb, line 202
def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end
# File lib/dataflow/nodes/data_node.rb, line 316
def updated?
  true
end
# File lib/dataflow/nodes/data_node.rb, line 312
def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end
# File lib/dataflow/nodes/data_node.rb, line 244
def write_dataset_name
  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{write_dataset_idx}"
  else
    name
  end
end
Private Instance Methods
# File lib/dataflow/nodes/data_node.rb, line 431
def dataset_name_for_buffer(idx)
  "#{name}_#{double_buffer_prefix}#{idx}"
end
# File lib/dataflow/nodes/data_node.rb, line 392
def db_adapter(connection_opts = {})
  db_backend = connection_opts[:db_backend] || self.db_backend

  opts = connection_opts.deep_dup
  opts.delete(:db_backend)
  has_options = opts.present?

  case db_backend.downcase.to_s
  when 'mongodb'
    return Adapters::MongoDbAdapter.new(opts) if has_options
    @mongodb_adapter ||= Adapters::MongoDbAdapter.new(data_node: self)
    return @mongodb_adapter
  when 'csv'
    return Adapters::CsvAdapter.new(opts) if has_options
    @csv_adapter ||= Adapters::CsvAdapter.new(data_node: self)
    return @csv_adapter
  when 'mysql'
    opts[:adapter_type] = 'mysql'
    return Adapters::SqlAdapter.new(opts) if has_options
    @mysql_adapter ||= Adapters::MysqlAdapter.new(data_node: self, adapter_type: 'mysql')
    return @mysql_adapter
  when 'postgresql'
    opts[:adapter_type] = 'postgresql'
    return Adapters::SqlAdapter.new(opts) if has_options
    @postgresql_adapter ||= Adapters::PsqlAdapter.new(data_node: self, adapter_type: 'postgresql')
    return @postgresql_adapter
  end

  raise NotImplementedError, "'#{db_backend}' backend is not implemented."
end
# File lib/dataflow/nodes/data_node.rb, line 435
def logger
  @logger ||= Dataflow::Logger.new(prefix: 'Dataflow')
end
# File lib/dataflow/nodes/data_node.rb, line 423
def valid_dataset_names
  if use_double_buffering
    [dataset_name_for_buffer(read_dataset_idx), dataset_name_for_buffer(write_dataset_idx)]
  else
    [name]
  end
end