class Dataflow::Nodes::Filter::DropWhileNode

Groups records by a key (e.g. id), sorts each group (e.g. by time), and then applies the same logic as Ruby's Array#drop_while to each group. See: https://ruby-doc.org/core-2.4.0/Array.html#method-i-drop_while

Constants

VALID_MODES
VALID_OPS

Public Instance Methods

compute_impl() click to toggle source
# File lib/dataflow/nodes/filter/drop_while_node.rb, line 27
# Splits the distinct ids of the first dependency into slices and hands each
# slice to process_ids, using parallel_each.
#
# Slice size is ceil(distinct id count / Parallel.processor_count), capped by
# limit_per_process when that converts to a positive integer.
#
# Returns nil early when the dependency has no records or yields no distinct
# ids; otherwise returns whatever parallel_each returns.
def compute_impl
  base_node = dependencies.first
  records_count = base_node.count
  return if records_count == 0

  ids = base_node.all(fields: [id_key]) do |results|
    results.distinct(id_key)
  end
  ids_count = ids.count
  # Without this guard count_per_process would be 0 and each_slice(0)
  # raises ArgumentError ("invalid slice size").
  return if ids_count.zero?

  count_per_process = (ids_count / Parallel.processor_count.to_f).ceil
  limit = limit_per_process.to_i
  count_per_process = [limit, count_per_process].min if limit > 0

  parallel_each(ids.each_slice(count_per_process)) do |ids_slice|
    process_ids(node: base_node, ids: ids_slice)
  end
end

Private Instance Methods

drop_while(group) click to toggle source

Applies a single drop_while pass to the group, using the configured field, op, and value.

# File lib/dataflow/nodes/filter/drop_while_node.rb, line 78
# Runs one Array#drop_while pass over +group+.
#
# Each record's +field+ is read with the dig tokens from record_dig_tokens.
# It is compared against +value+ using +op+, matched case-insensitively
# ('eq', 'ne', 'le', 'lt', 'ge', 'gt'). For the four ordering ops a nil
# field value always counts as "drop"; 'eq' and 'ne' compare nil like any
# other value.
#
# @param group [Array] records, already ordered by the caller
# @return [Array] the records from the first non-matching one onwards
# @raise [Errors::InvalidConfigurationError] when +op+ is not recognised
def drop_while(group)
  dig_path = record_dig_tokens(key: field, use_sym: dependencies.first.use_symbols?)
  normalized_op = op.to_s.downcase
  ordering_ops = { 'le' => :<=, 'lt' => :<, 'ge' => :>=, 'gt' => :> }

  predicate =
    case normalized_op
    when 'eq' then ->(record) { record.dig(*dig_path) == value }
    when 'ne' then ->(record) { record.dig(*dig_path) != value }
    when *ordering_ops.keys
      operator = ordering_ops.fetch(normalized_op)
      lambda do |record|
        current = record.dig(*dig_path)
        # nil values are always dropped for ordering comparisons
        current.nil? || current.public_send(operator, value)
      end
    else
      raise Errors::InvalidConfigurationError, "Invalid op key: #{op}"
    end

  group.drop_while(&predicate)
end
process_group(record_group) click to toggle source

Sorts the record group, then drops the leading and/or trailing elements (as selected by drop_mode) that satisfy the condition.

# File lib/dataflow/nodes/filter/drop_while_node.rb, line 60
# Orders +record_group+ by the +sort_by+ field (ascending, or descending when
# sort_asc is falsy). It then trims matching records from one or both ends
# via drop_while.
#
# drop_mode 'both' trims the left end first, then the right end; any other
# value is treated as a single side, where only 'right' trims the tail.
#
# @param record_group [Array] records sharing the same id
# @return [Array] the trimmed, ordered records
def process_group(record_group)
  sort_path = record_dig_tokens(key: sort_by, use_sym: dependencies.first.use_symbols?)
  ordered = record_group.sort_by { |record| record.dig(*sort_path) }
  ordered = ordered.reverse unless sort_asc

  sides = drop_mode == 'both' ? %w(left right) : [drop_mode]
  sides.reduce(ordered) do |current, side|
    if side == 'right'
      # trim the tail: flip, drop from the head, flip back
      drop_while(current.reverse).reverse
    else
      drop_while(current)
    end
  end
end
process_ids(node:, ids:) click to toggle source
# File lib/dataflow/nodes/filter/drop_while_node.rb, line 47
# Loads the records of +node+ whose id_key is in +ids+ and groups them by
# that id. It runs process_group on each group and appends the surviving
# records (nils removed) to data_node.
#
# @param node [Object] the node to query (compute_impl passes dependencies.first)
# @param ids [Array] the slice of id values to process
# @return whatever data_node.add returns
def process_ids(node:, ids:)
  records = node.all(where: { id_key => ids })
  # Read the id the same way process_group/drop_while read their fields:
  # through record_dig_tokens, honouring the node's use_symbols? setting.
  # A plain x[id_key] returns nil for every record when records come back
  # with symbol keys, which lumps all records into a single group.
  # NOTE(review): presumably this also resolves dotted/nested keys the same
  # way the other fields do — confirm against record_dig_tokens.
  id_tokens = record_dig_tokens(key: id_key, use_sym: node.use_symbols?)
  groups = records.group_by { |record| record.dig(*id_tokens) }

  result = groups.flat_map { |_id, group| process_group(group) }.compact

  data_node.add(records: result)
end