class Dataflow::Nodes::Filter::NewestNode
Selects the newest record (according to the date key) among the records that share the same id key.
Private Instance Methods
compute_impl()
click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 20
#
# Splits the distinct ids of the first dependency into slices and hands each
# slice to #process_ids through parallel_each.
#
# Returns early (doing nothing) when the source node has no records or when
# no distinct id could be collected; the latter would otherwise produce a
# slice size of 0 and make each_slice raise ArgumentError.
def compute_impl
  base_node = dependencies.first
  return if base_node.count.zero?

  ids = base_node.all(fields: [id_key]) do |results|
    results.distinct(id_key)
  end

  ids_count = ids.count
  return if ids_count.zero?

  # Spread the ids evenly over the available processors...
  count_per_process = (ids_count / Parallel.processor_count.to_f).ceil
  # ...but never exceed limit_per_process when a positive limit is set.
  limit = limit_per_process.to_i
  count_per_process = [limit, count_per_process].min if limit > 0

  # NOTE: for a serial run (e.g. while debugging), iterate with
  # `ids.each_slice(count_per_process) do |ids_slice|` instead of parallel_each.
  parallel_each(ids.each_slice(count_per_process)) do |ids_slice|
    process_ids(node: base_node, ids: ids_slice)
  end
end
ensure_keys_are_set!()
click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 15
#
# Validates the node configuration: both the id key and the date key are
# required.
#
# Raises Errors::InvalidConfigurationError when either key is blank; the id
# key is checked first.
def ensure_keys_are_set!
  if id_key.blank?
    raise Errors::InvalidConfigurationError, 'Id key must be set.'
  end

  if date_key.blank?
    raise Errors::InvalidConfigurationError, 'Date key must be set.'
  end
end
filter_by_newest(groups:, date_key:)
click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 54
#
# Picks, for every group, the entry with the greatest date.
#
# groups   - a Hash (or any enumerable of [key, entries] pairs); only the
#            entries are used.
# date_key - the key under which each entry stores its date. Values that are
#            not Time instances are converted with Timeliness.parse before
#            being compared.
#
# Returns an Array holding one entry per group (nil for an empty group).
def filter_by_newest(groups:, date_key:)
  groups.map do |_, entries|
    # max_by finds the newest entry in a single pass; no full sort needed.
    entries.max_by do |entry|
      date = entry[date_key]
      date.is_a?(Time) ? date : Timeliness.parse(date)
    end
  end
end
process_ids(node:, ids:)
click to toggle source
# File lib/dataflow/nodes/filter/newest_node.rb, line 38
#
# Handles one slice of ids: looks up the id/date pairs of every matching
# record, keeps the newest pair per id, fetches the corresponding record from
# the node and adds the found records to data_node.
#
# node - the node to read from; must respond to all(where:, fields:) and
#        find(where:).
# ids  - the id values to process.
def process_ids(node:, ids:)
  # Only the id and date fields are fetched here; records are loaded below.
  rows = node.all(where: { id_key => ids }, fields: [id_key, date_key])
  rows_by_id = rows.group_by { |row| row[id_key] }
  newest_rows = filter_by_newest(groups: rows_by_id, date_key: date_key)

  found = newest_rows.map do |row|
    node.find(where: { id_key => row[id_key], date_key => row[date_key] })
  end

  # Lookups that returned nil are dropped before the records are added.
  data_node.add(records: found.compact)
end