class Dataflow::Nodes::Filter::NewestNode

Selects, for each distinct value of the id key, the record with the newest value of the date key.

Private Instance Methods

compute_impl() click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 20
# Splits the distinct id values of the first dependency into slices and
# hands each slice to +process_ids+ through +parallel_each+.
#
# The slice size is ceil(distinct ids / Parallel.processor_count). When
# +limit_per_process+ converts to a positive integer, the slice size is
# capped at that value.
#
# Returns early (doing nothing) when the base node has no records or when
# no distinct id values are found.
def compute_impl
  base_node = dependencies.first
  return if base_node.count == 0

  ids = base_node.all(fields: [id_key]) do |results|
    results.distinct(id_key)
  end
  # Records can exist without yielding any distinct id. A slice size of 0
  # would make each_slice raise ArgumentError, and there is nothing to do.
  return if ids.empty?

  count_per_process = (ids.count / Parallel.processor_count.to_f).ceil
  limit = limit_per_process.to_i
  count_per_process = [limit, count_per_process].min if limit > 0

  parallel_each(ids.each_slice(count_per_process)) do |ids_slice|
    process_ids(node: base_node, ids: ids_slice)
  end
end
ensure_keys_are_set!() click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 15
# Validates the configuration: both +id_key+ and +date_key+ must be
# non-blank. The id key is checked first.
#
# Returns nil when both keys are set.
# Raises Errors::InvalidConfigurationError naming the first blank key.
def ensure_keys_are_set!
  if id_key.blank?
    raise Errors::InvalidConfigurationError, 'Id key must be set.'
  end
  return unless date_key.blank?

  raise Errors::InvalidConfigurationError, 'Date key must be set.'
end
filter_by_newest(groups:, date_key:) click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 54
# Picks the newest entry from each group.
#
# groups   - Enumerable of [key, entries] pairs (e.g. the Hash produced by
#            Array#group_by); only the entries are used.
# date_key - key under which each entry stores its date. A Time is used
#            as-is; any other value is passed to Timeliness.parse.
#
# Returns an Array with one entry per group, in group order.
def filter_by_newest(groups:, date_key:)
  groups.map do |_, entries|
    # max_by finds the newest entry in a single pass, instead of sorting
    # the whole group just to take its last element.
    entries.max_by do |entry|
      date = entry[date_key]
      date.is_a?(Time) ? date : Timeliness.parse(date)
    end
  end
end
process_ids(node:, ids:) click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 38
# Handles one slice of ids in three steps:
#
# 1. Loads only the id and date fields for +ids+ from +node+.
# 2. Keeps the newest partial record per id (via filter_by_newest).
# 3. Looks up each full record by its exact (id, date) pair, drops misses,
#    and adds the rest to +data_node+.
def process_ids(node:, ids:)
  projection = node.all(where: { id_key => ids }, fields: [id_key, date_key])
  rows_by_id = projection.group_by { |row| row[id_key] }
  newest_rows = filter_by_newest(groups: rows_by_id, date_key: date_key)

  full_records = newest_rows.map do |row|
    node.find(where: { id_key => row[id_key], date_key => row[date_key] })
  end

  data_node.add(records: full_records.compact)
end