class Dataflow::Nodes::DataNode

Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.

Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.

Public Instance Methods

add(records:)

Adds the given records to the dataset and updates the updated_at time.

@param records [Array] an array of the records to be added.

# File lib/dataflow/nodes/data_node.rb, line 187
def add(records:)
  raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'." unless records.is_a?(Array)
  records = records.compact
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
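
For example, appending a couple of documents (a minimal sketch; node is assumed to be an already-configured data node and the field names are purely illustrative):

  node.add(records: [
    { 'id' => 1, 'name' => 'alice' },
    { 'id' => 2, 'name' => 'bob' }
  ])
  node.count # => 2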

all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false, &block)

Returns all the records from the dataset that match the options.

@param where [Hash] the condition to apply for retrieving the elements.
  e.g.: { 'id' => 1 } will fetch a record with the id 1.
  An empty option hash will retrieve any record.
@param fields [Array] Array of strings representing which fields to include.
  e.g.: ['id', 'updated_at'] will only return these two fields.
@param sort [Hash] represents the sorting of the returned dataset.
  e.g.: { 'id' => 1, 'updated_at' => -1 } will sort by id ASC and by updated_at DESC.
@param limit [Integer] limits the amount of records returned.
@param offset [Integer] starting offset of the records returned.
  Use with limit to implement pagination.
@yield [db_client] When a block is passed, yields the db client on which .each
  can be called to stream the results rather than load everything in memory.
  Other methods can also be called depending on the backend,
  the downside being reduced back-end portability (use at your own risk).

# File lib/dataflow/nodes/data_node.rb, line 147
def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, include_system_id: include_system_id, &block)
end
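
A sketch of streaming results instead of loading them all in memory, using the block form described above (what the yielded client supports beyond .each depends on the backend):

  node.all(where: { 'active' => true }, fields: ['id'], sort: { 'id' => 1 }) do |client|
    client.each { |record| puts record['id'] }
  end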

all_paginated(where: {}, fields: [], cursor: nil)

Supports paginating efficiently through the dataset.

@param where [Hash] the condition to apply for retrieving the elements.
  e.g.: { 'id' => 1 } will fetch a record with the id 1.
  An empty option hash will retrieve any record.
  IMPORTANT: do not use the system id in the query. It will be overwritten.
@param fields [Array] Array of strings representing which fields to include.
  e.g.: ['id', 'updated_at'] will only return these two fields.
@param cursor [String] indicates from which page the results should be returned.
@return [Hash] with 2 fields:
  - data [Array] that contains the fetched records
  - next_cursor [String] a string to pass into the subsequent
                         calls to fetch the next page of the data

# File lib/dataflow/nodes/data_node.rb, line 164
def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
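
A sketch of walking through every page (the 'data'/'next_cursor' key style and the process helper are assumptions for illustration):

  cursor = nil
  loop do
    page = node.all_paginated(where: { 'active' => true }, cursor: cursor)
    page['data'].each { |record| process(record) }
    cursor = page['next_cursor']
    break if cursor.blank?
  end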

clear(where: {})

Clear the data that matches the options.

# File lib/dataflow/nodes/data_node.rb, line 197
def clear(where: {})
  db_adapter.delete(where: where)
end

count(where: {})

Counts how many records match the condition, or all records if no condition is given.

@return [Integer] the record count.

# File lib/dataflow/nodes/data_node.rb, line 181
def count(where: {})
  db_adapter.count(where: where)
end

create_non_unique_indexes(dataset_type: :read)

Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).

# File lib/dataflow/nodes/data_node.rb, line 229
def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end

create_unique_indexes(dataset_type: :read)

Applies unique indexes on the dataset. As this enforces constraints, it is best applied before adding any data.

@param dataset_type [Symbol] selects which dataset to apply the indexes on.
  Can be :read or :write.

# File lib/dataflow/nodes/data_node.rb, line 221
def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
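
Putting the two index methods together, a typical bulk-load sequence might look like the sketch below (assuming double buffering is enabled and that add writes to the write dataset):

  node.create_unique_indexes(dataset_type: :write)      # constraints first
  node.add(records: records)                            # bulk load
  node.create_non_unique_indexes(dataset_type: :write)  # performance indexes last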

db_indexes(write_dataset: false)
# File lib/dataflow/nodes/data_node.rb, line 307
def db_indexes(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  db_adapter.retrieve_dataset_indexes(dataset)
end

drop_dataset!()
# File lib/dataflow/nodes/data_node.rb, line 343
def drop_dataset!
  db_adapter.drop_dataset(write_dataset_name)
  return unless use_double_buffering
  db_adapter.drop_dataset(read_dataset_name)
end

dump_dataset(base_folder: './dump')

Dump a backup of this dataset to a file.

@return [String] the filepath to the dump file. The filename is
  formatted as <node_name>.<read_dataset_idx>.<ext>

# File lib/dataflow/nodes/data_node.rb, line 352
def dump_dataset(base_folder: './dump')
  read_idx = 0
  read_idx = read_dataset_idx if use_double_buffering

  db_adapter.dump(base_folder: base_folder, read_dataset_idx: read_idx)
end
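
For example, dumping into a custom folder and keeping the returned path for a later restore (a sketch):

  dump_path = node.dump_dataset(base_folder: './backups')
  # dump_path points at ./backups/<node_name>.<read_dataset_idx>.<ext>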

explain_update(depth: 0, verbose: false)
# File lib/dataflow/nodes/data_node.rb, line 320
def explain_update(depth: 0, verbose: false)
  logger.log("#{'>' * (depth + 1)} #{name} [Dataset] | UPDATED = #{updated_at}")
end

export(connection_opts: { db_backend: :csv }, keys: [], where: {})
# File lib/dataflow/nodes/data_node.rb, line 278
def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instantiate and export without saving anything
  Export::ToCsvNode.new(
    dependency_ids: [self],
    query: where.to_json,
    keys: keys
  ).compute_impl
  on_export_finished
end
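
As an illustration, exporting a filtered subset to CSV (the keys and query below are made up for the example):

  node.export(
    connection_opts: { db_backend: :csv },
    keys: ['id', 'updated_at'],
    where: { 'status' => 'published' }
  )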

find(where: {})

Finds and returns a single record from the dataset, based on the given options.

@param where [Hash] the condition to apply for retrieving the element.
  e.g.: { 'id' => 1 } will fetch a record with the id 1.
  An empty option hash will retrieve any record.
@return [Hash] returns a single record from the dataset.

# File lib/dataflow/nodes/data_node.rb, line 127
def find(where: {})
  db_adapter.find(where: where)
end

handle_dataset_settings_changed()

When the dataset properties change, notify the adapter to handle the new settings.

# File lib/dataflow/nodes/data_node.rb, line 107
def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # if we're using double buffering, just wait for the next buffer
  # to be created to apply the changes.
  return if use_double_buffering

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end

import(connection_opts: {}, keys: nil)
# File lib/dataflow/nodes/data_node.rb, line 272
def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end
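
A minimal sketch of re-importing data through another adapter; only :db_backend is shown since the remaining connection options depend on the adapter in use:

  # reads every record through the adapter selected by connection_opts
  # and adds them to this node's dataset
  node.import(connection_opts: { db_backend: :csv })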

info(write_dataset: false)

Retrieves some information about this node and its usage.

# File lib/dataflow/nodes/data_node.rb, line 290
def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    db_indexes: db_indexes(write_dataset: write_dataset),
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end

ordered_system_id_queries(batch_size:, where: {})

Returns a list of ordered (ASC) system ID queries.

@param batch_size [Integer] how many IDs to select per query.

These can be used to process the dataset in parallel by querying on a sub-section:

  queries = node.ordered_system_id_queries(batch_size: 1000)
  Parallel.each(queries) do |query|
    process(node.all(where: query))
  end

# File lib/dataflow/nodes/data_node.rb, line 175
def ordered_system_id_queries(batch_size:, where: {})
  db_adapter.ordered_system_id_queries(batch_size: batch_size, where: where)
end

read_dataset_name()
# File lib/dataflow/nodes/data_node.rb, line 234
def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{read_dataset_idx}"
  else
    name
  end
end

read_dataset_name=(dataset)

Used to select which dataset to read from. A possible use case is to read from an old dataset name.

@param dataset [String] the dataset name to read from.
  It must be a valid dataset name for the current settings.

# File lib/dataflow/nodes/data_node.rb, line 256
def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end

recreate_dataset(dataset_type: :read)

Recreates a dataset.

@param dataset_type [Symbol] selects which dataset to recreate.
  Can be :read or :write.

# File lib/dataflow/nodes/data_node.rb, line 210
def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end

required_by()
Calls superclass method Dataflow::Node#required_by
# File lib/dataflow/nodes/data_node.rb, line 324
def required_by
  super + Dataflow::Nodes::ComputeNode.where(data_node_id: _id).map do |node|
    { node: node, type: 'dataset' }
  end
end

restore_dataset(filepath:)

Restore a dump of this dataset.

@param filepath [String] the filepath to the dump file. The filename has
  to be formatted as <node_name>.<read_dataset_idx>.<ext>

# File lib/dataflow/nodes/data_node.rb, line 362
def restore_dataset(filepath:)
  filename = filepath.split('/')[-1]
  read_idx = if filename.count('.') < 2
               # for compatibility reasons: previously we were not
               # exporting the read idx in the filename
               0
             else
               filename.split('.')[1].to_i
             end

  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a single buffer dataset but node '#{name}' is expecting a double buffered one." if read_idx == 0 && use_double_buffering
  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a double buffer dataset but node '#{name}' is expecting a single buffered one." if read_idx > 0 && !use_double_buffering

  if use_double_buffering
    dataset_name = dataset_name_for_buffer(read_idx)
  else
    dataset_name = name
  end

  db_adapter.restore(filepath: filepath, dataset_name: dataset_name)
  self.read_dataset_idx = read_idx
  save

  db_adapter.update_settings(data_node: self)

  true
end
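
A restore sketch using a path produced by dump_dataset above (assuming the node's buffering settings match the dump):

  node.restore_dataset(filepath: dump_path)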

safely_clear_write_dataset()

Note: this is not safe if there is parallel processing going on.

# File lib/dataflow/nodes/data_node.rb, line 331
def safely_clear_write_dataset
  # we can only clear the write dataset if we're using double buffering
  return unless use_double_buffering
  # check if there is any node that is currently computing to this dataset
  used_by = required_by.select { |x| x[:type] == 'dataset' && x[:node].locked_for_computing? }
  return if used_by.present?

  logger.log("Dropping #{db_name}.#{write_dataset_name} on #{db_backend}.")
  # TODO: lock the node?
  db_adapter.drop_dataset(write_dataset_name)
end

set_defaults()

Sets the default parameters before creating the object.

# File lib/dataflow/nodes/data_node.rb, line 82
def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This is useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema

  # This is needed for the flow to compute properly
  self.updated_at = Time.now
end

swap_read_write_datasets!()
# File lib/dataflow/nodes/data_node.rb, line 263
def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, "#swap_read_write_datasets! called on \"#{self.name}\" but \"use_double_buffering\" is not activated." unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
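
A sketch of the double-buffering cycle this method completes (assuming use_double_buffering is enabled):

  node.recreate_dataset(dataset_type: :write)
  # ... write the new data into the write dataset ...
  node.swap_read_write_datasets!   # readers now see the new buffer
  node.safely_clear_write_dataset  # optionally drop the now-stale buffer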

update_schema(sch)

Update this node's schema.

# File lib/dataflow/nodes/data_node.rb, line 202
def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end

updated?()
# File lib/dataflow/nodes/data_node.rb, line 316
def updated?
  true
end

use_symbols?()
# File lib/dataflow/nodes/data_node.rb, line 312
def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end

write_dataset_name()
# File lib/dataflow/nodes/data_node.rb, line 244
def write_dataset_name
  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{write_dataset_idx}"
  else
    name
  end
end

Private Instance Methods

dataset_name_for_buffer(idx)
# File lib/dataflow/nodes/data_node.rb, line 431
def dataset_name_for_buffer(idx)
  "#{name}_#{double_buffer_prefix}#{idx}"
end

db_adapter(connection_opts = {})
# File lib/dataflow/nodes/data_node.rb, line 392
def db_adapter(connection_opts = {})
  db_backend = connection_opts[:db_backend] || self.db_backend

  opts = connection_opts.deep_dup
  opts.delete(:db_backend)
  has_options = opts.present?

  case db_backend.downcase.to_s
  when 'mongodb'
    return Adapters::MongoDbAdapter.new(opts) if has_options
    @mongodb_adapter ||= Adapters::MongoDbAdapter.new(data_node: self)
    return @mongodb_adapter
  when 'csv'
    return Adapters::CsvAdapter.new(opts) if has_options
    @csv_adapter ||= Adapters::CsvAdapter.new(data_node: self)
    return @csv_adapter
  when 'mysql'
    opts[:adapter_type] = 'mysql'
    return Adapters::SqlAdapter.new(opts) if has_options
    @mysql_adapter ||= Adapters::MysqlAdapter.new(data_node: self, adapter_type: 'mysql')
    return @mysql_adapter
  when 'postgresql'
    opts[:adapter_type] = 'postgresql'
    return Adapters::SqlAdapter.new(opts) if has_options
    @postgresql_adapter ||= Adapters::PsqlAdapter.new(data_node: self, adapter_type: 'postgresql')
    return @postgresql_adapter
  end

  raise NotImplementedError, "'#{db_backend}' backend is not implemented."
end

logger()
# File lib/dataflow/nodes/data_node.rb, line 435
def logger
  @logger ||= Dataflow::Logger.new(prefix: 'Dataflow')
end

valid_dataset_names()
# File lib/dataflow/nodes/data_node.rb, line 423
def valid_dataset_names
  if use_double_buffering
    [dataset_name_for_buffer(read_dataset_idx), dataset_name_for_buffer(write_dataset_idx)]
  else
    [name]
  end
end