class Dataflow::Nodes::ComputeNode

Represents a computation. It may store its output in a separate data node, and it depends on other data nodes to compute its own data.

Public Class Methods

data_node_opts()
# File lib/dataflow/nodes/compute_node.rb, line 30
def data_node_opts
  @data_node_opts || {}
end
dependency_opts()
# File lib/dataflow/nodes/compute_node.rb, line 26
def dependency_opts
  @dependency_opts || {}
end
ensure_data_node_exists()

DSL to ensure that a data node is set before a compute node can be recomputed (as the node will presumably use it to store its data).

# File lib/dataflow/nodes/compute_node.rb, line 52
def ensure_data_node_exists
  @data_node_opts = { ensure_exists: true }
end
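
For instance, a minimal sketch of the DSL in use (MyComputeNode is a hypothetical subclass):

  class MyComputeNode < Dataflow::Nodes::ComputeNode
    ensure_data_node_exists # validation will fail if no data node is set
  end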
ensure_dependencies(opts)

DSL to be used while making compute nodes. It supports enforcing validations by checking whether there is exactly, at_least (min) or at_most (max) a given number of dependencies.

Usage:

  class MyComputeNode < ComputeNode
    ensure_dependencies exactly: 1 # could be e.g.: min: 3, or max: 5
  end

# File lib/dataflow/nodes/compute_node.rb, line 40
def ensure_dependencies(opts)
  raise Dataflow::Errors::InvalidConfigurationError, "ensure_dependencies must be given a hash. Received: #{opts.class}" unless opts.is_a?(Hash)
  valid_keys = %i(exactly min max).freeze
  has_attributes = (valid_keys - opts.keys).count < valid_keys.count
  raise Dataflow::Errors::InvalidConfigurationError, "ensure_dependencies must have at least one of 'min', 'max' or 'exactly' attributes set. Given: #{opts.keys}" unless has_attributes

  add_property(:dependency_ids, opts)
  @dependency_opts = opts
end

Public Instance Methods

all_dependencies()

Retrieves the whole dependency tree.

# File lib/dataflow/nodes/compute_node.rb, line 149
def all_dependencies
  (dependencies + dependencies.flat_map(&:all_dependencies)).uniq
end
compute(depth: 0, force_compute: false, source: nil)

Computes this node's data if it is not already up to date. Acquires a computing lock before computing; if the lock is already held, waits until the other computation finishes or times out.

@param force_compute [Boolean] if true, computes even if the node is already up to date.
# File lib/dataflow/nodes/compute_node.rb, line 240
def compute(depth: 0, force_compute: false, source: nil)
  has_compute_lock = false
  validate!

  if updated? && !force_compute
    logger.log("#{'>' * (depth + 1)} #{name} is up-to-date.")
    return
  end

  has_compute_lock = acquire_computing_lock!
  if has_compute_lock
    logger.log("#{'>' * (depth + 1)} #{name} started computing.")
    on_computing_started
    start_time = Time.now

    if data_node.present? && clear_data_on_compute != data_node.use_double_buffering
      # make sure the data node has compatible settings
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end

    send_heartbeat
    pre_compute(force_compute: force_compute)

    # update this node's schema with the necessary fields
    data_node&.update_schema(required_schema)

    if clear_data_on_compute
      # Pre-compute, we recreate the table, the unique indexes
      data_node&.recreate_dataset(dataset_type: :write)
      data_node&.create_unique_indexes(dataset_type: :write)
    end

    send_heartbeat
    Executor.execute(self)

    if clear_data_on_compute
      # Post-compute, delay creating other indexes for insert speed
      data_node&.create_non_unique_indexes(dataset_type: :write)
      # swap read/write datasets
      data_node&.swap_read_write_datasets!
    end

    set_last_compute_starting_time(start_time)
    duration = Time.now - start_time
    logger.log("#{'>' * (depth + 1)} #{name} took #{duration} seconds to compute.")
    on_computing_finished(state: 'computed')
    true
  else
    logger.log("#{'>' * (depth + 1)} [IS AWAITING] #{name}.")
    await_computing!
    logger.log("#{'>' * (depth + 1)} [IS DONE AWAITING] #{name}.")
  end

rescue Errors::RemoteExecutionError => e
  on_computing_finished(state: 'error', error: e) if has_compute_lock
  logger.error(error: e, custom_message: "#{name} failed computing remotely.")
rescue StandardError => e
  on_computing_finished(state: 'error', error: e) if has_compute_lock
  logger.error(error: e, custom_message: "#{name} failed computing.")
  raise
ensure
  release_computing_lock! if has_compute_lock
  true
end
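
A sketch of a typical call, assuming the node was fetched beforehand (find_by is the standard Mongoid finder; the node name is hypothetical):

  node = Dataflow::Nodes::ComputeNode.find_by(name: 'my_node')
  node.compute(force_compute: true) # compute even if the node is up to date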
data_node()

Fetches the data node if it is set.

# File lib/dataflow/nodes/compute_node.rb, line 136
def data_node
  @data_node ||= Dataflow::Nodes::DataNode.find(data_node_id) if data_node_id.present?
end
dependencies(reload: false)

Override the relation because self.dependencies is not ordered.

# File lib/dataflow/nodes/compute_node.rb, line 141
def dependencies(reload: false)
  return @dependencies if @dependencies.present? && !reload
  @dependencies = dependency_ids.map do |x|
    Dataflow::Node.find(x)
  end
end
execute_local_batch_computation(batch_params)

Interface to execute a part (batch) of this node locally. This method is called when the framework needs to execute a batch on a worker. Override it when needed to execute a batch based on the given params. If you override it, you may want to override make_batch_params as well.

# File lib/dataflow/nodes/compute_node.rb, line 352
def execute_local_batch_computation(batch_params)
  records = dependencies.first.all(where: batch_params)
  compute_batch(records: records)
end
execute_local_computation()

Interface to execute this node locally

# File lib/dataflow/nodes/compute_node.rb, line 344
def execute_local_computation
  compute_impl
end
execution_valid?(uuid)
# File lib/dataflow/nodes/compute_node.rb, line 334
def execution_valid?(uuid)
  execution_uuid.to_s == uuid.to_s
end
explain_update(depth: 0, verbose: false)

Logs the dependency tree's update times and whether each node needs to be updated. Useful to understand why a given node had to be recomputed.

# File lib/dataflow/nodes/compute_node.rb, line 172
def explain_update(depth: 0, verbose: false)
  if depth == 0 || !updated? || verbose
    logger.log("#{'>' * (depth + 1)} #{name} [COMPUTE] | #{updated? ? 'UPDATED' : 'OLD'} = #{updated_at}")
  end

  return if updated? && !verbose

  dependencies.each do |dependency|
    dependency.explain_update(depth: depth + 1, verbose: verbose)
  end
  true
end
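
Typical console usage (a sketch):

  node.explain_update(verbose: true) # logs the update state of the whole dependency tree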
force_computing_lock_release!()

Force the release of this node's computing lock. Do not use unless there is a problem with the lock.

# File lib/dataflow/nodes/compute_node.rb, line 330
def force_computing_lock_release!
  release_computing_lock!
end
locked_for_computing?()

Check this node's locking status. @return [Boolean] whether this node is locked or not.

# File lib/dataflow/nodes/compute_node.rb, line 324
def locked_for_computing?
  computing_state == 'computing'
end
make_batch_params()

Interface used to retrieve the params for scheduled batches. Override when needed. The default implementation is to make queries that ensure the full processing of the first dependency's records. @return [Array] of params that are passed to scheduled batches.

# File lib/dataflow/nodes/compute_node.rb, line 361
def make_batch_params
  make_batch_queries(node: dependencies.first)
end
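
A sketch of an override that batches over a different dependency instead (purely illustrative):

  def make_batch_params
    make_batch_queries(node: dependencies[1]) # batch over the second dependency
  end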
needs_automatic_recomputing?()

Checks whether automatic recomputing is needed. @return [Boolean]

# File lib/dataflow/nodes/compute_node.rb, line 196
def needs_automatic_recomputing?
  interval = recompute_interval.to_i
  return false if interval <= 0
  return false if updated?
  return false if locked_for_computing?
  return true if updated_at.blank?

  updated_at + interval.seconds < Time.now
end
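
For example, with recompute_interval set to 3600, a node whose last compute started more than an hour ago (and that is neither updated nor locked) reports true. A polling scheduler might use it like this (a sketch, not part of the library):

  Dataflow::Nodes::ComputeNode.all.each do |node|
    node.recompute if node.needs_automatic_recomputing?
  end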
recompute(depth: 0, force_recompute: false)

Updates the dependencies that need to be updated, then computes this node's own data.

@param force_recompute [Boolean] if true, computes even if the node is already up to date.
# File lib/dataflow/nodes/compute_node.rb, line 210
def recompute(depth: 0, force_recompute: false)
  send_heartbeat
  logger.log("#{'>' * (depth + 1)} #{name} started recomputing...")
  start_time = Time.now

  parallel_each(dependencies) do |dependency|
    logger.log("#{'>' * (depth + 1)} #{name} checking deps: #{dependency.name}...")
    if !dependency.updated? || force_recompute
      dependency.recompute(depth: depth + 1, force_recompute: force_recompute)
    end
    send_heartbeat
  end

  # Dependencies data may have changed in a child process.
  # Reload to make sure we have the latest metadata.
  logger.log("#{'>' * (depth + 1)} #{name} reloading dependencies...")
  dependencies(reload: true)

  compute(depth: depth, force_compute: force_recompute)
  logger.log("#{'>' * (depth + 1)} #{name} took #{Time.now - start_time} seconds to recompute.")

  true
end
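
A sketch of refreshing a whole dependency tree from its top node (the node name is hypothetical):

  report = Dataflow::Nodes::ComputeNode.find_by(name: 'final_report')
  report.recompute # recomputes stale dependencies first, then this node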
schema()

Keep a compatible interface with the data node

# File lib/dataflow/nodes/compute_node.rb, line 339
def schema
  required_schema
end
set_defaults()

Sets the default parameters before creating the object.

# File lib/dataflow/nodes/compute_node.rb, line 110
def set_defaults
  # support setting the fields with a Document rather
  # than an ObjectId. Handle the transformations here:
  if data_node_id.present?
    self.data_node_id = data_node_id._id unless data_node_id.is_a?(BSON::ObjectId)

    # the data node use_double_buffering setting
    # must match clear_data_on_compute:
    if data_node.use_double_buffering != clear_data_on_compute
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end
  end

  # Again support having an ObjectId or a document.
  self.dependency_ids = dependency_ids.map { |dep|
    next dep if dep.is_a? BSON::ObjectId
    dep._id
  }

  # Update the data node schema with the required schema
  # for this computed node.
  data_node&.update_schema(required_schema)
end
updated?()

Returns false if any of our dependencies has been updated after our last update. We define a compute node's last update as the time it started its last successful update (instead of the time it completed it, as dependencies may have changed in the meantime). @return [Boolean]

# File lib/dataflow/nodes/compute_node.rb, line 159
def updated?
  return false if updated_at.blank?

  dependencies.each do |dependency|
    return false unless dependency.updated?
    return false if dependency.updated_at > updated_at
  end
  true
end
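
For example, if a dependency finished updating at 10:05 but this node last started computing at 10:00, updated? returns false until this node is recomputed.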
updated_at()

Keep a uniform interface with a DataNode.

# File lib/dataflow/nodes/compute_node.rb, line 186
def updated_at
  last_compute_starting_time
end
updated_at=(val)
# File lib/dataflow/nodes/compute_node.rb, line 190
def updated_at=(val)
  self.last_compute_starting_time = val
end
valid_for_computation?()

Checks whether this node can compute or not. Errors are added to the active model errors. @return [Boolean] true if it has no errors and can be computed.

# File lib/dataflow/nodes/compute_node.rb, line 309
def valid_for_computation?
  # Perform additional checks: also add errors to "self.errors"
  opts = self.class.dependency_opts
  ensure_exact_dependencies(count: opts[:exactly]) if opts.key?(:exactly)
  ensure_at_most_dependencies(count: opts[:max])   if opts.key?(:max)
  ensure_at_least_dependencies(count: opts[:min])  if opts.key?(:min)
  ensure_no_cyclic_dependencies
  ensure_keys_are_set
  ensure_data_node_exists if self.class.data_node_opts[:ensure_exists]

  errors.count == 0
end
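
A sketch of surfacing the validation errors before computing:

  unless node.valid_for_computation?
    puts node.errors.full_messages # standard ActiveModel errors API
  end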

Private Instance Methods

acquire_computing_lock!()
# File lib/dataflow/nodes/compute_node.rb, line 424
def acquire_computing_lock!
  # make sure that any pending changes are saved.
  save

  compute_state = {
    computing_state: 'computing',
    computing_started_at: Time.now,
    execution_uuid: BSON::ObjectId.new
  }
  find_query = { _id: _id, computing_state: { '$ne' => 'computing' } }
  update_query = { '$set' => compute_state }

  # send a query directly to avoid mongoid's caching layers
  res = Dataflow::Nodes::ComputeNode.where(find_query).find_one_and_update(update_query)

  # reload the model data after the query above
  reload

  # the query is atomic so if res != nil, we acquired the lock
  !res.nil?
end
await_computing!()
# File lib/dataflow/nodes/compute_node.rb, line 460
def await_computing!
  max_wait_time = 15.minutes
  while Time.now < last_heartbeat_time + max_wait_time
    sleep 5
    # reloads with the data stored on mongodb:
    # something may have been changed by another process.
    reload
    return unless locked_for_computing?
  end

  raise StandardError, "Awaiting computing on #{name} reached timeout."
end
compute_batch(records:)

This is an interface only. Override it when you can implement the computation in terms of the first dependency's records. @param records [Array] a batch of records from the first dependency. @return [Array] an array of results to be pushed to the data node (if set).

# File lib/dataflow/nodes/compute_node.rb, line 382
def compute_batch(records:)
  []
end
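
A minimal sketch of a subclass implementing it; the 'value' field and the doubling logic are illustrative only:

  class DoublingNode < Dataflow::Nodes::ComputeNode
    ensure_dependencies exactly: 1
    ensure_data_node_exists

    def compute_batch(records:)
      # map each batch of the first dependency's records to new records
      records.map { |r| { 'id' => r['id'], 'value' => r['value'].to_i * 2 } }
    end
  end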
compute_impl()

Default compute implementation:

  • recreate the table

  • compute the records

  • save them to the DB

(The process may be overridden on a per-node basis if needed.) Override it if you need a completely custom compute implementation.

# File lib/dataflow/nodes/compute_node.rb, line 373
def compute_impl
  process_parallel(node: dependencies.first)
end
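
A sketch of a fully custom override that skips the parallel batching (transform is a hypothetical helper, not part of the library):

  def compute_impl
    records = dependencies.first.all(where: {})
    data_node&.add(records: records.map { |r| transform(r) })
  end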
disconnect_db_clients()
# File lib/dataflow/nodes/compute_node.rb, line 604
def disconnect_db_clients
  Dataflow::Adapters::SqlAdapter.disconnect_clients
  Dataflow::Adapters::MongoDbAdapter.disconnect_clients
  Mongoid.disconnect_clients
end
ensure_at_least_dependencies(count:)
# File lib/dataflow/nodes/compute_node.rb, line 537
def ensure_at_least_dependencies(count:)
  # we need to use .size, not .count
  # for the mongo relation to work as expected
  current_count = (dependency_ids || []).size
  return if current_count >= count

  error_msg = "Expecting at least #{count} dependencies. Has #{current_count} dependencies."
  errors.add(:dependency_ids, error_msg)
end
ensure_at_most_dependencies(count:)
# File lib/dataflow/nodes/compute_node.rb, line 547
def ensure_at_most_dependencies(count:)
  # we need to use .size, not .count
  # for the mongo relation to work as expected
  current_count = (dependency_ids || []).size
  return if current_count <= count

  error_msg = "Expecting at most #{count} dependencies. Has #{current_count} dependencies."
  errors.add(:dependency_ids, error_msg)
end
ensure_data_node_exists()
# File lib/dataflow/nodes/compute_node.rb, line 568
def ensure_data_node_exists
  if data_node_id.blank?
    error_msg = 'Expecting a data node to be set.'
    errors.add(:data_node_id, error_msg)
    return
  end

  # the data node id is present. Check whether it can be found
  Dataflow::Nodes::DataNode.find(data_node.id)
rescue Mongoid::Errors::DocumentNotFound
  # it was not found:
  error_msg = "No data node was found for Id: '#{data_node_id}'."
  errors.add(:data_node_id, error_msg)
end
ensure_exact_dependencies(count:)
# File lib/dataflow/nodes/compute_node.rb, line 527
def ensure_exact_dependencies(count:)
  # we need to use .size, not .count
  # for the mongo relation to work as expected
  current_count = (dependency_ids || []).size
  return if current_count == count

  error_msg = "Expecting exactly #{count} dependencies. Has #{current_count} dependencies."
  errors.add(:dependency_ids, error_msg)
end
ensure_keys_are_set()
# File lib/dataflow/nodes/compute_node.rb, line 557
def ensure_keys_are_set
  required_keys = self.class.properties.select { |_k, opts| opts[:required_for_computing] }
  required_keys.each do |key, opts|
    errors.add(key, "#{self.class}.#{key} must be set for computing.") if self[key].nil?
    if opts[:values].is_a?(Array)
      # make sure the key's value is one of the possible values
      errors.add(key, "#{self.class}.#{key} must be set to one of #{opts[:values].join(', ')}. Given: #{self[key]}") unless opts[:values].include?(self[key])
    end
  end
end
ensure_no_cyclic_dependencies()

Dependency validations

# File lib/dataflow/nodes/compute_node.rb, line 501
def ensure_no_cyclic_dependencies
  node_map = Dataflow::Nodes::ComputeNode.all.map { |n| [n._id, n] }.to_h

  dep_ids = (dependency_ids || [])
  dep_ids.each do |dependency_id|
    next unless has_dependency_in_hierarchy?(node_map[dependency_id], dependency_id, node_map)
    error_msg = "Dependency to node #{dependency_id} ('#{node_map[dependency_id].name}') is cylic."
    errors.add(:dependency_ids, error_msg)
  end
end
ensure_no_cyclic_dependencies!()
# File lib/dataflow/nodes/compute_node.rb, line 522
def ensure_no_cyclic_dependencies!
  ensure_no_cyclic_dependencies
  raise_dependendy_errors_if_needed!
end
has_dependency_in_hierarchy?(node, dependency_id, node_map)
# File lib/dataflow/nodes/compute_node.rb, line 512
def has_dependency_in_hierarchy?(node, dependency_id, node_map)
  return false if node.blank?
  # if we reach a node that has no more deps, then we did not find
  # the given dependency_id in the hierarchy
  return true if (node.dependency_ids || []).include?(dependency_id)
  (node.dependency_ids || []).any? do |dep_id|
    has_dependency_in_hierarchy?(node_map[dep_id], dependency_id, node_map)
  end
end
logger()
# File lib/dataflow/nodes/compute_node.rb, line 610
def logger
  @logger ||= Dataflow::Logger.new(prefix: 'Dataflow')
end
make_batch_queries(node:, where: {})

Makes queries that support traversing the node's records in parallel without overlap.

# File lib/dataflow/nodes/compute_node.rb, line 411
def make_batch_queries(node:, where: {})
  return [] if node.blank?
  record_count = node.count
  return [] if record_count == 0

  equal_split_per_process = (record_count / Parallel.processor_count.to_f).ceil
  count_per_process = equal_split_per_process
  limit = limit_per_process.to_i
  count_per_process = [limit, equal_split_per_process].min if limit > 0

  node.ordered_system_id_queries(batch_size: count_per_process, where: where)
end
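
For example, with 10,000 records and 4 processors, each query covers ceil(10_000 / 4.0) = 2500 records, unless limit_per_process is set to something lower.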
parallel_each(itr) { |*args| ... }
# File lib/dataflow/nodes/compute_node.rb, line 583
def parallel_each(itr)
  # before fork: always disconnect currently used connections.
  disconnect_db_clients

  # set to true to debug code in the iteration
  is_debugging_impl = ENV['DEBUG_DATAFLOW']
  opts = if is_debugging_impl
           # this will turn off the parallel processing
           { in_processes: 0 }
         elsif max_parallel_processes > 0
           { in_processes: max_parallel_processes }
         else
           {}
         end

  Parallel.each(itr, opts) do |*args|
    yield(*args)
    disconnect_db_clients
  end
end
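
When debugging, parallelism can be disabled from the calling process before triggering a computation (a sketch; any non-empty value works since the code checks truthiness):

  ENV['DEBUG_DATAFLOW'] = '1' # forces in_processes: 0, i.e. serial execution
  node.recompute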
pre_compute(force_compute:)

Interface only. Re-implement for node-specific behavior that should run before computing.

# File lib/dataflow/nodes/compute_node.rb, line 474
def pre_compute(force_compute:); end
process_parallel(node:) { |records| ... }
# File lib/dataflow/nodes/compute_node.rb, line 386
def process_parallel(node:)
  queries = make_batch_queries(node: node)
  return if queries.blank?

  queries_count = queries.count
  parallel_each(queries.each_with_index) do |query, idx|
    send_heartbeat

    progress = (idx / queries_count.to_f * 100).ceil
    on_computing_progressed(pct_complete: progress)
    logger.log("Executing #{name} [Batch #{idx}/#{queries_count}]")

    records = node.all(where: query)

    new_records = if block_given?
                    yield records
                  else
                    compute_batch(records: records)
                  end

    data_node&.add(records: new_records)
  end
end
release_computing_lock!()
# File lib/dataflow/nodes/compute_node.rb, line 446
def release_computing_lock!
  # make sure that any pending changes are saved.
  save

  find_query = { _id: _id }
  update_query = { '$set' => { computing_state: nil, computing_started_at: nil, execution_uuid: nil } }

  # send a query directly to avoid mongoid's caching layers
  Dataflow::Nodes::ComputeNode.where(find_query).find_one_and_update(update_query)

  # reload the model data after the query above
  reload
end
required_schema()

Override to define a required schema.

# File lib/dataflow/nodes/compute_node.rb, line 477
def required_schema
  data_node&.schema
end
send_heartbeat()
# File lib/dataflow/nodes/compute_node.rb, line 481
def send_heartbeat
  update_query = { '$set' => { last_heartbeat_time: Time.now } }
  Dataflow::Nodes::ComputeNode.where(_id: _id)
                              .find_one_and_update(update_query)
end
set_last_compute_starting_time(time)
# File lib/dataflow/nodes/compute_node.rb, line 487
def set_last_compute_starting_time(time)
  # this is just to avoid the reload.
  # But this change will not be propagated across processes
  self.last_compute_starting_time = time
  # update directly on the DB
  update_query = { '$set' => { last_compute_starting_time: time } }
  Dataflow::Nodes::ComputeNode.where(_id: _id)
                              .find_one_and_update(update_query)
end