class Synapse::EventSourcing::EventSourcingRepository
Repository
that initializes the state of aggregates using events read from an event store and appends changes to aggregates to an event store
Attributes
@return [AggregateFactory]
@return [ConflictResolver]
@return [EventStore]
@return [SnapshotPolicy]
@return [SnapshotTaker]
@return [Array<EventStreamDecorator>]
Public Class Methods
@param [AggregateFactory] aggregate_factory
@param [EventStore] event_store
@param [LockManager] lock_manager @return [undefined]
# File lib/synapse/event_sourcing/repository.rb, line 28 def initialize(aggregate_factory, event_store, lock_manager) super lock_manager @aggregate_factory = aggregate_factory @event_store = event_store @stream_decorators = Array.new end
Protected Instance Methods
@return [Class]
# File lib/synapse/event_sourcing/repository.rb, line 105 def aggregate_type @aggregate_factory.aggregate_type end
@param [AggregateRoot] aggregate @return [undefined]
# File lib/synapse/event_sourcing/repository.rb, line 88 def delete_aggregate_with_lock(aggregate) save_aggregate_with_lock aggregate end
@raise [AggregateNotFoundError]
If the aggregate with the given identifier could not be found
@raise [AggregateDeletedError]
If the loaded aggregate has been marked as deleted
@raise [ConflictingModificationError]
If the expected version doesn't match the aggregate's actual version
@param [Object] aggregate_id @param [Integer] expected_version @return [AggregateRoot]
# File lib/synapse/event_sourcing/repository.rb, line 47 def perform_load(aggregate_id, expected_version) begin stream = @event_store.read_events type_identifier, aggregate_id rescue EventStore::StreamNotFoundError raise Repository::AggregateNotFoundError end @stream_decorators.each do |decorator| stream = decorator.decorate_for_read aggregate_type, aggregate_id, stream end aggregate = @aggregate_factory.create_aggregate aggregate_id, stream.peek stream = add_conflict_resolution stream, aggregate, expected_version aggregate.initialize_from_stream stream if aggregate.deleted? raise AggregateDeletedError.new type_identifier, aggregate_id end if expected_version && @conflict_resolver.nil? assert_version_expected aggregate, expected_version end aggregate end
@param [AggregateRoot] aggregate @return [undefined]
# File lib/synapse/event_sourcing/repository.rb, line 77 def post_registration(aggregate) if @snapshot_policy && @snapshot_taker listener = SnapshotUnitOfWorkListener.new type_identifier, aggregate, @snapshot_policy, @snapshot_taker register_listener listener end end
@param [AggregateRoot] aggregate @return [undefined]
# File lib/synapse/event_sourcing/repository.rb, line 94 def save_aggregate_with_lock(aggregate) stream = aggregate.uncommitted_events @stream_decorators.reverse_each do |decorator| stream = decorator.decorate_for_append type_identifier, aggregate, stream end @event_store.append_events type_identifier, stream aggregate.mark_committed end
@return [String]
# File lib/synapse/event_sourcing/repository.rb, line 110 def type_identifier @aggregate_factory.type_identifier end
Private Instance Methods
@param [DomainEventStream] stream @param [AggregateRoot] aggregate @param [Integer] expected_version @return [DomainEventStream]
# File lib/synapse/event_sourcing/repository.rb, line 120 def add_conflict_resolution(stream, aggregate, expected_version) unless expected_version && @conflict_resolver return stream end unseen_events = Array.new stream = CapturingEventStream.new stream, unseen_events, expected_version listener = ConflictResolvingUnitOfWorkListener.new aggregate, unseen_events, @conflict_resolver register_listener listener stream end