class Dataflow::Nodes::Filter::DropWhileNode
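Groups the input records into sequences by a key (e.g. an id), orders each sequence (e.g. by time), and then applies the same logic as Ruby's Array#drop_while to it — from the start, the end, or both ends of the sequence depending on the drop mode. See: https://ruby-doc.org/core-2.4.0/Array.html#method-i-drop_while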
Constants
- VALID_MODES
- VALID_OPS
Public Instance Methods
compute_impl()
click to toggle source
# File lib/dataflow/nodes/filter/drop_while_node.rb, line 27
#
# Reads the distinct values of `id_key` from the first dependency, splits
# them into slices (about one per processor, capped by `limit_per_process`
# when that is positive) and hands each slice to process_ids through
# parallel_each.
#
# Returns nil without doing any work when the dependency is empty or when
# no distinct ids come back.
def compute_impl
  base_node = dependencies.first
  return if base_node.count.zero?

  ids = base_node.all(fields: [id_key]) do |results|
    results.distinct(id_key)
  end
  # With an empty id list count_per_process would be 0, and
  # each_slice(0) raises ArgumentError ("invalid slice size").
  return if ids.count.zero?

  count_per_process = (ids.count / Parallel.processor_count.to_f).ceil
  limit = limit_per_process.to_i
  count_per_process = [limit, count_per_process].min if limit.positive?

  parallel_each(ids.each_slice(count_per_process)) do |ids_slice|
    process_ids(node: base_node, ids: ids_slice)
  end
end
Private Instance Methods
drop_while(group)
click to toggle source
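Applies a single drop_while pass to the group: records are dropped from the front while the configured `op` comparison between the record's `field` value and `value` holds (the ordering ops le/lt/ge/gt also drop records whose field is nil).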
# File lib/dataflow/nodes/filter/drop_while_node.rb, line 78
#
# Drops records from the front of `group` while the configured comparison
# holds, and returns the remaining records.
#
# The compared value is read with `dig` using the tokens that
# record_dig_tokens builds for `field`. `op` is matched case-insensitively
# against eq, ne, le, lt, ge and gt.
#
# Raises Errors::InvalidConfigurationError for any other op.
def drop_while(group)
  tokens = record_dig_tokens(key: field, use_sym: dependencies.first.use_symbols?)

  # Ordering comparisons cannot involve nil, so a missing field counts as
  # "keep dropping" before the operator is ever applied.
  ordered = lambda do |operator|
    lambda do |rec|
      current = rec.dig(*tokens)
      current.nil? || current.public_send(operator, value)
    end
  end

  predicate =
    case op.to_s.downcase
    when 'eq' then ->(rec) { rec.dig(*tokens) == value }
    when 'ne' then ->(rec) { rec.dig(*tokens) != value }
    when 'le' then ordered.call(:<=)
    when 'lt' then ordered.call(:<)
    when 'ge' then ordered.call(:>=)
    when 'gt' then ordered.call(:>)
    else
      raise Errors::InvalidConfigurationError, "Invalid op key: #{op}"
    end

  group.drop_while(&predicate)
end
process_group(record_group)
click to toggle source
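Sorts the record group by `sort_by` (ascending, or descending when `sort_asc` is false), then drops the elements that satisfy the condition from the left end, the right end, or both ends, depending on `drop_mode`.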
# File lib/dataflow/nodes/filter/drop_while_node.rb, line 60
#
# Orders one id's records by the `sort_by` field (reversed unless
# `sort_asc`) and trims them with drop_while on the side(s) selected by
# `drop_mode`: 'both' trims the left end first, then the right end.
#
# Returns the trimmed array of records.
def process_group(record_group)
  tokens = record_dig_tokens(key: sort_by, use_sym: dependencies.first.use_symbols?)
  ordered = record_group.sort_by { |rec| rec.dig(*tokens) }
  ordered.reverse! unless sort_asc

  sides = drop_mode == 'both' ? %w[left right] : [drop_mode]

  sides.reduce(ordered) do |acc, side|
    if side == 'right'
      # Trim the tail by flipping the array, dropping from its front,
      # and flipping it back.
      drop_while(acc.reverse).reverse
    else
      drop_while(acc)
    end
  end
end
process_ids(node:, ids:)
click to toggle source
# File lib/dataflow/nodes/filter/drop_while_node.rb, line 47
#
# Loads every record of `node` whose `id_key` is one of `ids`, groups the
# records by that id, runs each group through process_group and writes the
# surviving (non-nil) records to data_node.
#
# node - the dependency node to read from (compute_impl passes
#        dependencies.first).
# ids  - one slice of distinct id values.
def process_ids(node:, ids:)
  records = node.all(where: { id_key => ids })

  # Read the id the same way process_group and drop_while read their
  # fields: via record_dig_tokens plus dig. A raw `record[id_key]` returns
  # nil for every record when records are symbol-keyed (presumably the
  # case when use_symbols? is true — TODO confirm) or when id_key is a
  # dotted/nested path. That would merge all ids of the slice into one
  # group.
  id_tokens = record_dig_tokens(key: id_key, use_sym: dependencies.first.use_symbols?)
  groups = records.group_by { |record| record.dig(*id_tokens) }

  result = groups.flat_map { |_id, group| process_group(group) }.compact
  data_node.add(records: result)
end