class EventStore::EventAppender
Public Class Methods
new(aggregate)
click to toggle source
# File lib/event_store/event_appender.rb, line 4
# Creates an appender bound to a single aggregate.
#
# @param aggregate [Object] kept as-is in @aggregate; the other methods of this
#   class read its version, events and snapshot table names from it
def initialize(aggregate)
  @aggregate = aggregate
end
Public Instance Methods
append(raw_events)
click to toggle source
# File lib/event_store/event_appender.rb, line 8
# Prepares, validates and persists a batch of events inside one
# EventStore.db transaction, then refreshes the snapshot.
#
# The aggregate's version is re-read at the start of every call, so an
# appender that is reused for several batches never checks new events against
# a version memoized by an earlier call.
#
# @param raw_events [Enumerable] entries accepted by prepare_event
# @return [Object, nil] the value of the transaction block, i.e. whatever
#   @aggregate.events.multi_insert returned (assumes EventStore.db.transaction
#   passes the block value through -- confirm against the db adapter)
# @raise [ArgumentError] from prepare_event when an entry is nil
# @raise [StandardError] whatever validate! raises for an incomplete event, or
#   the error built by concurrency_error when an event's version is not newer
#   than the aggregate's current version
def append(raw_events)
  EventStore.db.transaction do
    # Always start from the aggregate's present version; a value cached by a
    # previous append would make the concurrency check too lenient.
    @current_version = @aggregate.version

    # All concurrency issues need to be checked before persisting any of the events
    # Otherwise, the newly appended events may raise erroneous concurrency errors
    prepared_events = raw_events.map do |raw_event|
      prepare_event(raw_event).tap do |event|
        validate!(event)
        raise concurrency_error(event) if has_concurrency_issue?(event)
      end
    end

    result = @aggregate.events.multi_insert(prepared_events)
    # A nil insert result is treated as "nothing persisted": skip the snapshot.
    store_snapshot(prepared_events) unless result.nil?
    result
  end
end
store_snapshot(prepared_events)
click to toggle source
# File lib/event_store/event_appender.rb, line 26
# Updates the Redis snapshot hashes with every prepared event that is newer
# than what is already recorded for its fully qualified name.
#
# Two hashes are written in one MULTI block:
# * @aggregate.snapshot_version_table -- event name => version, plus a
#   :current_version field holding the highest version accepted in this call
# * @aggregate.snapshot_table -- event name => version, serialized event and
#   occurred_at joined with EventStore::SNAPSHOT_DELIMITER
#
# Within a batch an event is also compared against versions accepted earlier in
# the same batch, so an older event listed after a newer one of the same name
# cannot overwrite the newer snapshot.
#
# @param prepared_events [Array<Hash>] hashes with :fully_qualified_name,
#   :version, :serialized_event and :occurred_at (the shape built by prepare_event)
# @return [Object, nil] the result of the Redis MULTI block, or nil when no
#   event was newer than the stored snapshot
def store_snapshot(prepared_events)
  redis = EventStore.redis
  known_versions = redis.hgetall(@aggregate.snapshot_version_table)
  known_versions.default = -1 # names never snapshotted compare as older than any event

  snapshot_fields = []
  version_fields = []
  newest_version = nil

  prepared_events.each do |event|
    name = event[:fully_qualified_name]
    version = event[:version].to_i
    next unless version > known_versions[name].to_i

    delimiter = EventStore::SNAPSHOT_DELIMITER
    snapshot_fields << name
    snapshot_fields << "#{event[:version]}#{delimiter}#{event[:serialized_event]}#{delimiter}#{event[:occurred_at]}"
    version_fields << name << event[:version]

    # Remember what this batch already accepted for the name.
    known_versions[name] = version
    newest_version = version if newest_version.nil? || version > newest_version
  end

  return if version_fields.empty?

  # Highest accepted version, independent of the order events were passed in.
  version_fields << :current_version << newest_version

  # Issue the commands on the block argument: newer redis-rb releases only
  # queue commands sent to it, older ones yield the client itself.
  redis.multi do |transaction|
    transaction.hmset(@aggregate.snapshot_version_table, version_fields)
    transaction.hmset(@aggregate.snapshot_table, snapshot_fields)
  end
end
Private Instance Methods
concurrency_error(event)
click to toggle source
# File lib/event_store/event_appender.rb, line 65
# Builds (but does not raise) the error describing a version conflict.
#
# @param event [Hash] prepared event; only event[:version] is read
# @return [ConcurrencyError] error whose message names the event's version and
#   the value of current_version
def concurrency_error(event)
  attempted_version = event[:version]
  message = "The version of the event being added (version #{attempted_version}) is <= the current version (version #{current_version})"
  ConcurrencyError.new(message)
end
current_version()
click to toggle source
# File lib/event_store/event_appender.rb, line 70
# Returns the aggregate's version, reading @aggregate.version only while no
# truthy value is cached in @current_version.
#
# @return [Object] the cached version (compared with event versions via <=
#   elsewhere in this class)
def current_version
  return @current_version if @current_version

  @current_version = @aggregate.version
end
Also aliased as: set_current_version
has_concurrency_issue?(event)
click to toggle source
# File lib/event_store/event_appender.rb, line 52
# Tells whether an event's version fails to advance past current_version.
#
# @param event [Hash] prepared event; only event[:version] is read
# @return [Boolean] true when the event's version is <= current_version
def has_concurrency_issue?(event)
  incoming_version = event[:version]
  incoming_version <= current_version
end
prepare_event(raw_event)
click to toggle source
# File lib/event_store/event_appender.rb, line 56
# Converts a raw event object into the attribute hash used for validation,
# concurrency checks, insertion and snapshotting.
#
# @param raw_event [#version, #aggregate_id, #occurred_at, #serialized_event, #fully_qualified_name]
# @return [Hash] with keys :version (Integer via to_i), :aggregate_id,
#   :occurred_at (UTC Time), :serialized_event (passed through
#   EventStore.escape_bytea) and :fully_qualified_name
# @raise [ArgumentError] when raw_event is nil or false
def prepare_event(raw_event)
  raise ArgumentError, "Cannot Append a Nil Event" unless raw_event

  {
    version: raw_event.version.to_i,
    aggregate_id: raw_event.aggregate_id,
    # Round-trip through to_s on purpose: it truncates microseconds, which
    # would otherwise break Time equality.
    occurred_at: Time.parse(raw_event.occurred_at.to_s).utc,
    serialized_event: EventStore.escape_bytea(raw_event.serialized_event),
    fully_qualified_name: raw_event.fully_qualified_name
  }
end
validate!(event_hash)
click to toggle source
# File lib/event_store/event_appender.rb, line 75
# Ensures a prepared event carries a non-blank value for every required
# attribute. A value counts as blank when its to_s is empty after stripping
# whitespace. Attributes are checked in the order listed below and the first
# blank one is reported.
#
# @param event_hash [Hash] prepared event keyed by symbol
# @return [Array<Symbol>] the list of required attribute names when all are present
# @raise [AttributeMissingError] naming the first blank attribute
def validate!(event_hash)
  required_attributes = [:aggregate_id, :fully_qualified_name, :occurred_at, :serialized_event, :version]

  required_attributes.each do |attribute_name|
    value_text = event_hash[attribute_name].to_s.strip
    raise AttributeMissingError, "value required for #{attribute_name}" if value_text.empty?
  end
end