class Dataflow::Nodes::SnapshotNode
TODO: extend the unique node?
Public Instance Methods
add(records:)
click to toggle source
Calls superclass method
Dataflow::Nodes::DataNode#add
# File lib/dataflow/nodes/snapshot_node.rb, line 32 def add(records:) raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'." unless records.is_a?(Array) records = records.compact return if records.blank? # TODO: create a chain of behavior "before add" rename_dotted_fields(records: records) add_internal_timestamp(records: records) records.delete_if do |record| convert_update_at_key(record) is_record_redundant?(record: record) end.compact super(records: records) end
set_defaults()
click to toggle source
Calls superclass method
Dataflow::Nodes::DataNode#set_defaults
# File lib/dataflow/nodes/snapshot_node.rb, line 16 def set_defaults super self.indexes ||= [] # get rid of keys/string confusion self.indexes = JSON.parse(self.indexes.to_json) # add keys for the index, updated_at and unique keys self.indexes += [{ 'key' => index_key }] if index_key self.indexes += [{ 'key' => updated_at_key }] if updated_at_key self.indexes += [{ 'key' => [index_key, updated_at_key], 'unique' => true }] if index_key && updated_at_key self.indexes.uniq! self.updated_at ||= Time.now end
Private Instance Methods
convert_update_at_key(record)
click to toggle source
# File lib/dataflow/nodes/snapshot_node.rb, line 70 def convert_update_at_key(record) return if record[updated_at_key].is_a?(Time) # try to parse as a string record[updated_at_key] = Time.parse(record[updated_at_key]) rescue TypeError # try to parse as a timestamp record[updated_at_key] = Time.at(record[updated_at_key]) end
is_record_redundant?(record:)
click to toggle source
If this record already exists, and only the updated_at key changed, but the rest of the content is the same, we will consider it to be redundant
# File lib/dataflow/nodes/snapshot_node.rb, line 53 def is_record_redundant?(record:) id = record[index_key] previous_record = db_adapter.find(where: { index_key => id }, sort: { updated_at_key => -1 }) return false if previous_record.blank? has_same_content = previous_record.keys == record.keys has_same_content &&= previous_record.keys.all? do |k| # we allow the updated_at key to change, or the mojaco time stamp next true if k == updated_at_key || k == internal_timestamp_key # but most importantly, the rest of the content should be the same record[k] == previous_record[k] end has_same_content end