class EventStore::EventAppender

Public Class Methods

new(aggregate) click to toggle source
# File lib/event_store/event_appender.rb, line 4
# Builds an appender bound to a single aggregate.
#
# aggregate - the object the other methods of this class read from
#             (its version, events and snapshot table names).
def initialize(aggregate)
  @aggregate = aggregate
end

Public Instance Methods

append(raw_events) click to toggle source
# File lib/event_store/event_appender.rb, line 8
# Prepares, validates and version-checks every raw event, then bulk-inserts the
# whole batch via @aggregate.events.multi_insert. Everything runs inside an
# EventStore.db.transaction block.
#
# raw_events - collection of raw events; must respond to #map.
#
# The transaction block evaluates to the multi_insert result. store_snapshot is
# called with the prepared events only when that result is not nil.
#
# Raises whatever prepare_event or validate! raise, or the error built by
# concurrency_error when has_concurrency_issue? reports a conflict.
def append(raw_events)
  EventStore.db.transaction do
    set_current_version

    # Every event must pass its concurrency check before any of them is
    # persisted; otherwise the freshly appended events could trigger
    # erroneous concurrency errors.
    events_to_insert = raw_events.map do |raw_event|
      prepare_event(raw_event).tap do |candidate|
        validate!(candidate)
        raise concurrency_error(candidate) if has_concurrency_issue?(candidate)
      end
    end

    inserted = @aggregate.events.multi_insert(events_to_insert)
    store_snapshot(events_to_insert) unless inserted.nil?
    inserted
  end
end
store_snapshot(prepared_events) click to toggle source
# File lib/event_store/event_appender.rb, line 26
# Writes the given events into the Redis snapshot hashes when they are newer
# than the versions already recorded there.
#
# prepared_events - enumerable of event hashes carrying :fully_qualified_name,
#                   :version, :serialized_event and :occurred_at.
#
# An event qualifies when its :version is greater than the version stored for
# its fully qualified name in @aggregate.snapshot_version_table (names with no
# stored version count as -1). For each qualifying event the version table
# receives name => version, and the snapshot table receives
# name => "version<DELIM>serialized_event<DELIM>occurred_at", where DELIM is
# EventStore::SNAPSHOT_DELIMITER. The :current_version field is set to the
# version of the last qualifying event.
#
# Returns nil when no event qualifies; otherwise the value of the MULTI block.
def store_snapshot(prepared_events)
  r = EventStore.redis
  current_version_numbers = r.hgetall(@aggregate.snapshot_version_table)
  current_version_numbers.default = -1

  valid_snapshot_events   = []
  valid_snapshot_versions = []
  prepared_events.each do |event|
    name    = event[:fully_qualified_name]
    version = event[:version]
    next unless version.to_i > current_version_numbers[name].to_i

    snapshot_value = version.to_s + EventStore::SNAPSHOT_DELIMITER +
                     event[:serialized_event] + EventStore::SNAPSHOT_DELIMITER +
                     event[:occurred_at].to_s
    valid_snapshot_events.push(name, snapshot_value)
    valid_snapshot_versions.push(name, version)
  end
  return if valid_snapshot_versions.empty?

  last_version = valid_snapshot_versions.last
  valid_snapshot_versions.push(:current_version, last_version.to_i)

  # Issue the commands on the object yielded by #multi rather than on the
  # outer client: newer redis-rb releases only queue commands sent to the
  # yielded transaction, while older releases yield the client itself, so this
  # form works with both.
  r.multi do |transaction|
    transaction.hmset(@aggregate.snapshot_version_table, valid_snapshot_versions)
    transaction.hmset(@aggregate.snapshot_table, valid_snapshot_events)
  end
end

Private Instance Methods

concurrency_error(event) click to toggle source
# File lib/event_store/event_appender.rb, line 65
# Builds (but does not raise) a ConcurrencyError describing a version conflict.
#
# event - event hash; only event[:version] is read.
#
# Returns a ConcurrencyError whose message names the event's version and the
# value of current_version.
def concurrency_error(event)
  attempted_version = event[:version]
  message = "The version of the event being added (version #{attempted_version}) is <= the current version (version #{current_version})"
  ConcurrencyError.new(message)
end
current_version() click to toggle source
# File lib/event_store/event_appender.rb, line 70
# Returns the aggregate's version, memoised in @current_version.
#
# @aggregate.version is read on the first call and again on any later call
# where the cached value is nil or false; otherwise the cached value is
# returned as-is.
def current_version
  return @current_version if @current_version

  @current_version = @aggregate.version
end
Also aliased as: set_current_version
has_concurrency_issue?(event) click to toggle source
# File lib/event_store/event_appender.rb, line 52
# Reports whether an event's version conflicts with the current version.
#
# event - event hash; only event[:version] is read.
#
# Returns true when event[:version] is less than or equal to current_version.
def has_concurrency_issue?(event)
  incoming_version = event[:version]
  incoming_version <= current_version
end
prepare_event(raw_event) click to toggle source
# File lib/event_store/event_appender.rb, line 56
# Converts a raw event object into the hash form used by the rest of this class.
#
# raw_event - object responding to #version, #aggregate_id, #occurred_at,
#             #serialized_event and #fully_qualified_name.
#
# Returns a Hash with the keys :version, :aggregate_id, :occurred_at,
# :serialized_event and :fully_qualified_name (in that order).
# Raises ArgumentError when raw_event is nil or false.
def prepare_event(raw_event)
  raise ArgumentError, "Cannot Append a Nil Event" unless raw_event

  version      = raw_event.version.to_i
  aggregate_id = raw_event.aggregate_id
  # Round-tripping through to_s drops the microseconds, which would otherwise
  # break Time equality.
  occurred_at  = Time.parse(raw_event.occurred_at.to_s).utc
  payload      = EventStore.escape_bytea(raw_event.serialized_event)

  {
    version: version,
    aggregate_id: aggregate_id,
    occurred_at: occurred_at,
    serialized_event: payload,
    fully_qualified_name: raw_event.fully_qualified_name
  }
end
set_current_version()
Alias for: current_version
validate!(event_hash) click to toggle source
# File lib/event_store/event_appender.rb, line 75
# Ensures every required attribute of a prepared event has a non-blank value.
#
# event_hash - event hash to check.
#
# An attribute counts as missing when its to_s form is empty after stripping
# whitespace. Attributes are checked in the listed order and the first missing
# one is reported.
#
# Raises AttributeMissingError naming the first missing attribute.
def validate!(event_hash)
  required_attributes = [:aggregate_id, :fully_qualified_name, :occurred_at, :serialized_event, :version]
  required_attributes.each do |attribute_name|
    text = event_hash[attribute_name].to_s
    next unless text.strip.empty?

    raise AttributeMissingError, "value required for #{attribute_name}"
  end
end