class Synapse::DuplicationRecorder
Records messages as they are sent to a bus so that duplicates can be tracked and prevented. Inspired by the de-duplication manager from Lokad.CQRS
This implementation is thread-safe
Public Class Methods
new()
click to toggle source
# File lib/synapse/common/duplication.rb, line 7 def initialize @recorded = Hash.new @mutex = Mutex.new end
Public Instance Methods
forget(message)
click to toggle source
Forgets the given message
@param [Message] message @return [undefined]
# File lib/synapse/common/duplication.rb, line 39 def forget(message) @mutex.synchronize do @recorded.delete message.id end end
forget_older_than(threshold)
click to toggle source
Cleans up messages that are older than the given timestamp
@param [Time] threshold @return [undefined]
# File lib/synapse/common/duplication.rb, line 49 def forget_older_than(threshold) @mutex.synchronize do @recorded.delete_if do |message_id, timestamp| timestamp <= threshold end end end
record(message)
click to toggle source
Records the given message so that duplicates can be ignored
@raise [DuplicationError] If a duplicate message has been detected @param [Message] message @return [undefined]
# File lib/synapse/common/duplication.rb, line 17 def record(message) @mutex.synchronize do if @recorded.has_key? message.id raise DuplicationError end @recorded.store message.id, Time.now end end
recorded?(message)
click to toggle source
Returns true if the given message has been recorded
@param [Message] message @return [Boolean]
# File lib/synapse/common/duplication.rb, line 31 def recorded?(message) @recorded.has_key? message.id end