class Synapse::EventSourcing::EventSourcingRepository

Repository that initializes the state of aggregates using events read from an event store and appends changes to aggregates to an event store

Attributes

aggregate_factory[R]

@return [AggregateFactory]

conflict_resolver[RW]

@return [ConflictResolver]

event_store[R]

@return [EventStore]

snapshot_policy[RW]

@return [SnapshotPolicy]

snapshot_taker[RW]

@return [SnapshotTaker]

stream_decorators[R]

@return [Array<EventStreamDecorator>]

Public Class Methods

new(aggregate_factory, event_store, lock_manager) click to toggle source

@param [AggregateFactory] aggregate_factory @param [EventStore] event_store @param [LockManager] lock_manager @return [undefined]

Calls superclass method
# File lib/synapse/event_sourcing/repository.rb, line 28
def initialize(aggregate_factory, event_store, lock_manager)
  super lock_manager

  @aggregate_factory = aggregate_factory
  @event_store = event_store
  @stream_decorators = Array.new
end

Protected Instance Methods

aggregate_type() click to toggle source

@return [Class]

# File lib/synapse/event_sourcing/repository.rb, line 105
def aggregate_type
  @aggregate_factory.aggregate_type
end
delete_aggregate_with_lock(aggregate) click to toggle source

@param [AggregateRoot] aggregate @return [undefined]

# File lib/synapse/event_sourcing/repository.rb, line 88
def delete_aggregate_with_lock(aggregate)
  save_aggregate_with_lock aggregate
end
perform_load(aggregate_id, expected_version) click to toggle source

@raise [AggregateNotFoundError]

If the aggregate with the given identifier could not be found

@raise [AggregateDeletedError]

If the loaded aggregate has been marked as deleted

@raise [ConflictingModificationError]

If the expected version doesn't match the aggregate's actual version

@param [Object] aggregate_id @param [Integer] expected_version @return [AggregateRoot]

# File lib/synapse/event_sourcing/repository.rb, line 47
def perform_load(aggregate_id, expected_version)
  begin
    stream = @event_store.read_events type_identifier, aggregate_id
  rescue EventStore::StreamNotFoundError
    raise Repository::AggregateNotFoundError
  end

  @stream_decorators.each do |decorator|
    stream = decorator.decorate_for_read aggregate_type, aggregate_id, stream
  end

  aggregate = @aggregate_factory.create_aggregate aggregate_id, stream.peek

  stream = add_conflict_resolution stream, aggregate, expected_version

  aggregate.initialize_from_stream stream

  if aggregate.deleted?
    raise AggregateDeletedError.new type_identifier, aggregate_id
  end

  if expected_version && @conflict_resolver.nil?
    assert_version_expected aggregate, expected_version
  end

  aggregate
end
post_registration(aggregate) click to toggle source

@param [AggregateRoot] aggregate @return [undefined]

# File lib/synapse/event_sourcing/repository.rb, line 77
def post_registration(aggregate)
  if @snapshot_policy && @snapshot_taker
    listener =
      SnapshotUnitOfWorkListener.new type_identifier, aggregate, @snapshot_policy, @snapshot_taker

    register_listener listener
  end
end
save_aggregate_with_lock(aggregate) click to toggle source

@param [AggregateRoot] aggregate @return [undefined]

# File lib/synapse/event_sourcing/repository.rb, line 94
def save_aggregate_with_lock(aggregate)
  stream = aggregate.uncommitted_events
  @stream_decorators.reverse_each do |decorator|
    stream = decorator.decorate_for_append type_identifier, aggregate, stream
  end

  @event_store.append_events type_identifier, stream
  aggregate.mark_committed
end
type_identifier() click to toggle source

@return [String]

# File lib/synapse/event_sourcing/repository.rb, line 110
def type_identifier
  @aggregate_factory.type_identifier
end

Private Instance Methods

add_conflict_resolution(stream, aggregate, expected_version) click to toggle source

@param [DomainEventStream] stream @param [AggregateRoot] aggregate @param [Integer] expected_version @return [DomainEventStream]

# File lib/synapse/event_sourcing/repository.rb, line 120
def add_conflict_resolution(stream, aggregate, expected_version)
  unless expected_version && @conflict_resolver
    return stream
  end

  unseen_events = Array.new

  stream = CapturingEventStream.new stream, unseen_events, expected_version
  listener = ConflictResolvingUnitOfWorkListener.new aggregate, unseen_events, @conflict_resolver

  register_listener listener

  stream
end