class Synapse::DuplicationRecorder

Records messages as they are sent to a bus so that duplicates can be tracked and prevented. Inspired by the de-duplication manager from Lokad.CQRS

This implementation is thread-safe

Public Class Methods

new() click to toggle source
# File lib/synapse/common/duplication.rb, line 7
def initialize
  @recorded = Hash.new
  @mutex = Mutex.new
end

Public Instance Methods

forget(message) click to toggle source

Forgets the given message

@param [Message] message @return [undefined]

# File lib/synapse/common/duplication.rb, line 39
def forget(message)
  @mutex.synchronize do
    @recorded.delete message.id
  end
end
forget_older_than(threshold) click to toggle source

Cleans up messages that are older than the given timestamp

@param [Time] threshold @return [undefined]

# File lib/synapse/common/duplication.rb, line 49
def forget_older_than(threshold)
  @mutex.synchronize do
    @recorded.delete_if do |message_id, timestamp|
      timestamp <= threshold
    end
  end
end
record(message) click to toggle source

Records the given message so that duplicates can be ignored

@raise [DuplicationError] If a duplicate message has been detected @param [Message] message @return [undefined]

# File lib/synapse/common/duplication.rb, line 17
def record(message)
  @mutex.synchronize do
    if @recorded.has_key? message.id
      raise DuplicationError
    end

    @recorded.store message.id, Time.now
  end
end
recorded?(message) click to toggle source

Returns true if the given message has been recorded

@param [Message] message @return [Boolean]

# File lib/synapse/common/duplication.rb, line 31
def recorded?(message)
  @recorded.has_key? message.id
end