class Dataflow::Nodes::SnapshotNode

TODO: extend the unique node?

Public Instance Methods

add(records:) click to toggle source
Calls superclass method Dataflow::Nodes::DataNode#add
# File lib/dataflow/nodes/snapshot_node.rb, line 32
# Adds records to the node, skipping the ones that are redundant with
# what is already stored.
#
# @param records [Array] the documents to add; nil entries are ignored.
# @raise [ArgumentError] if +records+ is not an Array.
def add(records:)
  unless records.is_a?(Array)
    raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'."
  end

  # Work on a nil-free copy so the caller's array is never modified.
  documents = records.compact
  return if documents.blank?

  # TODO: create a chain of behavior "before add"
  rename_dotted_fields(records: documents)
  add_internal_timestamp(records: documents)

  # Normalize each document's updated_at value, then drop the document
  # when it is redundant with what is already stored.
  documents.delete_if do |document|
    convert_update_at_key(document)
    is_record_redundant?(record: document)
  end

  super(records: documents)
end
set_defaults() click to toggle source
Calls superclass method Dataflow::Nodes::DataNode#set_defaults
# File lib/dataflow/nodes/snapshot_node.rb, line 16
# Applies the parent defaults, then makes sure the index list contains the
# index definitions derived from index_key and updated_at_key, and gives
# updated_at a value when it has none.
def set_defaults
  super

  # Round-trip the current list through JSON so that symbol and string keys
  # end up in a single (string) representation.
  self.indexes = [] unless indexes
  self.indexes = JSON.parse(indexes.to_json)

  # Index definitions derived from the configured keys: one per key, plus a
  # unique compound index when both keys are set.
  derived = []
  derived << { 'key' => index_key } if index_key
  derived << { 'key' => updated_at_key } if updated_at_key
  derived << { 'key' => [index_key, updated_at_key], 'unique' => true } if index_key && updated_at_key

  # Append them and drop exact duplicates.
  self.indexes = (indexes + derived).uniq

  self.updated_at ||= Time.now
end

Private Instance Methods

convert_update_at_key(record) click to toggle source
# File lib/dataflow/nodes/snapshot_node.rb, line 70
# Makes sure the value stored under updated_at_key in +record+ is a Time.
# A value that already is a Time is left alone; anything else is replaced
# in place by its converted form.
def convert_update_at_key(record)
  raw_value = record[updated_at_key]
  return if raw_value.is_a?(Time)

  record[updated_at_key] =
    begin
      # try to parse as a string
      Time.parse(raw_value)
    rescue TypeError
      # try to parse as a timestamp
      Time.at(raw_value)
    end
end
is_record_redundant?(record:) click to toggle source

If a record with the same index key already exists and only its updated_at key (or the internal timestamp) has changed while the rest of its content is identical, the record is considered redundant.

# File lib/dataflow/nodes/snapshot_node.rb, line 53
# Tells whether +record+ only repeats what is already stored for its index
# key: same set of keys and same values, ignoring the updated_at key and
# the internal timestamp key.
#
# The key sets are compared regardless of order, so a document whose
# fields merely arrive in a different order is still seen as redundant.
#
# @param record [Hash] the candidate document.
# @return [Boolean] true when the stored document has the same content.
def is_record_redundant?(record:)
  # Looks up by index key, sorted by updated_at descending -- presumably
  # this yields the most recent stored document; confirm against db_adapter.
  previous_record = db_adapter.find(where: { index_key => record[index_key] },
                                    sort: { updated_at_key => -1 })
  return false if previous_record.blank?

  previous_keys = previous_record.keys
  current_keys = record.keys
  # Hash keys are unique, so equal sizes plus an empty difference means
  # both documents have exactly the same keys, whatever their order.
  same_keys = previous_keys.size == current_keys.size &&
              (previous_keys - current_keys).empty?
  return false unless same_keys

  # we allow the updated_at key to change, or the internal timestamp,
  # but most importantly, the rest of the content should be the same
  volatile_keys = [updated_at_key, internal_timestamp_key]
  (current_keys - volatile_keys).all? { |key| record[key] == previous_record[key] }
end