class Synapse::UnitOfWork::UnitOfWork

Default implementation of a unit of work

Public Class Methods

new(provider)

@param [UnitOfWorkProvider] provider
@return [undefined]

# File lib/synapse/uow/uow.rb, line 7
def initialize(provider)
  super

  @aggregates = Hash.new
  @events = Hash.new
  @listeners = UnitOfWorkListenerCollection.new
end

Public Instance Methods

publish_event(event, event_bus)

Buffers an event for publication to the given event bus until this unit of work is committed

@api public
@param [EventMessage] event
@param [EventBus] event_bus
@return [EventMessage] The event that will be published to the event bus

# File lib/synapse/uow/uow.rb, line 70
def publish_event(event, event_bus)
  event = @listeners.on_event_registered self, event
  event.tap do
    begin
      events = @events.fetch event_bus
    rescue KeyError
      events = @events.store event_bus, Array.new
    end

    events.push event
  end
end
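
The buffering behavior can be illustrated in isolation. The following is a minimal sketch, not Synapse code: `BufferingSketch` and its `buffered` reader are hypothetical stand-ins that copy only the per-bus buffering from the method above, using plain symbols and objects in place of real event messages and event buses, and omitting the listener notification.

```ruby
# Hypothetical stand-in: mirrors only the buffering logic of publish_event
class BufferingSketch
  # Exposes the buffer for inspection (not part of the documented API)
  attr_reader :buffered

  def initialize
    @buffered = Hash.new
  end

  def publish_event(event, event_bus)
    event.tap do
      begin
        events = @buffered.fetch event_bus
      rescue KeyError
        # First event for this bus: create its buffer
        events = @buffered.store event_bus, Array.new
      end

      events.push event
    end
  end
end

bus_a = Object.new
bus_b = Object.new
sketch = BufferingSketch.new

returned = sketch.publish_event :order_created, bus_a
sketch.publish_event :order_shipped, bus_a
sketch.publish_event :invoice_sent, bus_b
```

Events for the same bus accumulate in arrival order, and the method hands back the event it was given. In this sketch, `(@buffered[event_bus] ||= []).push event` would behave the same as the fetch/rescue pair.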

register_aggregate(aggregate, event_bus, &storage_callback)

Registers an aggregate with this unit of work

This unit of work adds an event listener to the aggregate so that any events generated are published to the given event bus when this unit of work is committed.

The provided storage callback is used to commit the aggregate to its respective underlying storage mechanism.

If there is already an aggregate registered with this unit of work of the same type and with the same identifier, that aggregate will be returned instead of the given aggregate.

@api public
@yield [AggregateRoot] Deferred until the aggregate is stored
@param [AggregateRoot] aggregate
@param [EventBus] event_bus
@param [Proc] storage_callback
@return [AggregateRoot]

# File lib/synapse/uow/uow.rb, line 50
def register_aggregate(aggregate, event_bus, &storage_callback)
  similar = find_similar_aggregate aggregate
  return similar if similar

  aggregate.add_registration_listener do |event|
    publish_event event, event_bus
  end

  @aggregates.store aggregate, storage_callback

  aggregate
end
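
To make the registration rules concrete, here is a minimal sketch under stated assumptions. `Account`, its `emit` helper, `RegistrationSketch` and the `published` reader are all hypothetical; they imitate only what the method above relies on (an `id`, `add_registration_listener`, and a stored callback), and events are recorded in an array rather than buffered for a real event bus.

```ruby
# Hypothetical aggregate exposing only what register_aggregate uses
class Account
  attr_reader :id

  def initialize(id)
    @id = id
    @listeners = []
  end

  def add_registration_listener(&listener)
    @listeners.push listener
  end

  # Hypothetical helper that simulates the aggregate raising an event
  def emit(event)
    @listeners.each { |listener| listener.call event }
  end
end

# Hypothetical stand-in for the unit of work
class RegistrationSketch
  attr_reader :published

  def initialize
    @aggregates = Hash.new
    @published = []
  end

  def register_aggregate(aggregate, event_bus, &storage_callback)
    similar = @aggregates.keys.find do |candidate|
      aggregate.class === candidate && aggregate.id == candidate.id
    end
    return similar if similar

    aggregate.add_registration_listener do |event|
      @published.push [event, event_bus]
    end

    @aggregates.store aggregate, storage_callback
    aggregate
  end

  # Runs each deferred storage callback, then forgets the aggregates
  def store_aggregates
    @aggregates.each_pair { |aggregate, callback| callback.call aggregate }
    @aggregates.clear
  end
end

uow = RegistrationSketch.new
first = Account.new 42
duplicate = Account.new 42
stored = []

registered = uow.register_aggregate(first, :bus) { |a| stored.push a.id }
again = uow.register_aggregate(duplicate, :bus) { |a| stored.push :duplicate }

first.emit :deposited
uow.store_aggregates
```

Both calls return `first`. Because the method returns before storing anything for a duplicate, the second callback is never invoked, so callers should keep working with the returned aggregate rather than the one they passed in.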

register_listener(listener)

Registers a listener that is notified of state changes in this unit of work

@api public
@param [UnitOfWorkListener] listener
@return [undefined]

# File lib/synapse/uow/uow.rb, line 28
def register_listener(listener)
  @listeners.push listener
end

transaction_manager=(transaction_manager)

Sets the transaction manager that will be used by this unit of work

@api public
@raise [RuntimeError] If unit of work has been started
@param [TransactionManager] transaction_manager
@return [undefined]

# File lib/synapse/uow/uow.rb, line 89
def transaction_manager=(transaction_manager)
  if started?
    raise 'Transaction manager not permitted to change after unit of work has started'
  end

  @transaction_manager = transaction_manager
end
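
The guard above, together with `transactional?`, can be exercised with a small sketch. `GuardSketch` and its `start` method are hypothetical: `started?` is not defined in the code shown here and is assumed to come from the parent class, so this stand-in tracks the started state itself.

```ruby
# Hypothetical stand-in; started? is assumed to come from the parent class
class GuardSketch
  def initialize
    @started = false
  end

  def start
    @started = true
  end

  def started?
    @started
  end

  def transaction_manager=(transaction_manager)
    if started?
      raise 'Transaction manager not permitted to change after unit of work has started'
    end

    @transaction_manager = transaction_manager
  end

  def transactional?
    !!@transaction_manager
  end
end

uow = GuardSketch.new
before = uow.transactional?

# Allowed: the unit of work has not been started yet
uow.transaction_manager = Object.new
after = uow.transactional?

uow.start

# Rejected: assignment after start raises RuntimeError
error = begin
  uow.transaction_manager = Object.new
  nil
rescue RuntimeError => e
  e
end
```

The transaction manager therefore has to be assigned before the unit of work is started; a later assignment raises and leaves the existing manager in place.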

transactional?()

Returns true if this unit of work is bound to a transaction

@api public
@return [Boolean]

# File lib/synapse/uow/uow.rb, line 19
def transactional?
  !!@transaction_manager
end

Protected Instance Methods

notify_cleanup()

@return [undefined]

# File lib/synapse/uow/uow.rb, line 128
def notify_cleanup
  @listeners.on_cleanup self
end

notify_prepare_commit()

@return [undefined]

# File lib/synapse/uow/uow.rb, line 133
def notify_prepare_commit
  @listeners.on_prepare_commit self, @aggregates.keys, @events
end

perform_commit()

@return [undefined]

# File lib/synapse/uow/uow.rb, line 100
def perform_commit
  publish_events
  commit_inner_units

  if transactional?
    @listeners.on_prepare_transaction_commit self, @transaction
    @transaction_manager.commit @transaction
  end

  @listeners.after_commit self
end

perform_rollback(cause = nil)

@param [Error] cause
@return [undefined]

# File lib/synapse/uow/uow.rb, line 114
def perform_rollback(cause = nil)
  @aggregates.clear
  @events.clear

  begin
    if @transaction
      @transaction_manager.rollback @transaction
    end
  ensure
    @listeners.on_rollback self, cause
  end
end
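
The `ensure` clause means listeners are told about the rollback even when the transaction manager itself fails. A minimal sketch of that control flow follows; `FailingTransactionManager`, `RollbackSketch` and the `notifications` array are hypothetical stand-ins, with the array taking the place of the listener collection.

```ruby
# Hypothetical transaction manager whose rollback always fails
class FailingTransactionManager
  def rollback(transaction)
    raise 'connection lost'
  end
end

# Hypothetical stand-in: records notifications instead of calling listeners
class RollbackSketch
  attr_reader :notifications

  def initialize(transaction_manager, transaction)
    @transaction_manager = transaction_manager
    @transaction = transaction
    @notifications = []
  end

  def perform_rollback(cause = nil)
    begin
      @transaction_manager.rollback @transaction if @transaction
    ensure
      # Runs whether or not the rollback above raised
      @notifications.push [:on_rollback, cause]
    end
  end
end

uow = RollbackSketch.new FailingTransactionManager.new, :tx
cause = StandardError.new 'command failed'

raised = begin
  uow.perform_rollback cause
  nil
rescue RuntimeError => e
  e
end
```

The failure from the transaction manager still propagates to the caller, but only after the rollback notification has been recorded.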

perform_start()

@return [undefined]

# File lib/synapse/uow/uow.rb, line 138
def perform_start
  if transactional?
    @transaction = @transaction_manager.start
  end

  @listeners.on_start self
end

store_aggregates()

@return [undefined]

# File lib/synapse/uow/uow.rb, line 147
def store_aggregates
  @aggregates.each_pair do |aggregate, storage_callback|
    storage_callback.call aggregate
  end
  @aggregates.clear
end

Private Instance Methods

find_similar_aggregate(aggregate)

Checks whether an aggregate of the same type and identifier as the given aggregate has previously been registered with this unit of work. If one is found, it is returned.

@param [AggregateRoot] aggregate
@return [AggregateRoot] Returns nil if no similar aggregate was found

# File lib/synapse/uow/uow.rb, line 161
def find_similar_aggregate(aggregate)
  @aggregates.each_key do |candidate|
    if aggregate.class === candidate && aggregate.id == candidate.id
      return candidate
    end
  end

  return
end
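
One detail of the comparison is worth spelling out: `aggregate.class === candidate` uses Ruby's `Module#===`, which is true when the candidate is an instance of that class or of one of its subclasses. The check is therefore not symmetric. The sketch below uses hypothetical `Order`, `RushOrder` and `Customer` structs and a standalone `similar?` helper that copies only the condition from the method above.

```ruby
# Hypothetical aggregate types used only for this illustration
Order = Struct.new(:id)
class RushOrder < Order; end
Customer = Struct.new(:id)

# Same condition as in find_similar_aggregate, extracted for clarity
def similar?(aggregate, candidate)
  aggregate.class === candidate && aggregate.id == candidate.id
end

same_type = similar?(Order.new(1), Order.new(1))
subclass_candidate = similar?(Order.new(1), RushOrder.new(1))
superclass_candidate = similar?(RushOrder.new(1), Order.new(1))
other_type = similar?(Order.new(1), Customer.new(1))
other_id = similar?(Order.new(1), Order.new(2))
```

Here `same_type` and `subclass_candidate` are true, while the remaining three are false: a registered subclass instance matches a lookup by its parent type, but not the other way around.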

publish_events()

Continually publishes all buffered events to their respective event buses until all events have been published

# File lib/synapse/uow/uow.rb, line 173
def publish_events
  until @events.empty?
    @events.keys.each do |event_bus|
      events = @events.delete event_bus
      event_bus.publish events
    end
  end
end
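
The outer `until` loop matters when publishing one batch causes further events to be buffered, for example by a handler that reacts to the first batch. The sketch below illustrates this under stated assumptions: `PublishSketch` and `RecordingBus` are hypothetical, `publish_events` is made public for demonstration, and the only thing assumed of an event bus is a `publish` method that accepts an array of events.

```ruby
# Hypothetical stand-in for the unit of work's buffer and publish loop
class PublishSketch
  def initialize
    @events = Hash.new
  end

  def publish_event(event, event_bus)
    (@events[event_bus] ||= []).push event
    event
  end

  def publish_events
    until @events.empty?
      # Iterate over a snapshot of the keys so the hash may change meanwhile
      @events.keys.each do |event_bus|
        events = @events.delete event_bus
        event_bus.publish events
      end
    end
  end
end

# Hypothetical bus that records each batch and optionally runs a hook
class RecordingBus
  attr_reader :batches

  def initialize(&on_publish)
    @batches = []
    @on_publish = on_publish
  end

  def publish(events)
    @batches.push events
    @on_publish.call events if @on_publish
  end
end

uow = PublishSketch.new
audit_bus = RecordingBus.new
order_bus = RecordingBus.new do |events|
  # Publishing the first batch buffers a follow-up event on another bus
  uow.publish_event :audit_logged, audit_bus if events.include? :order_created
end

uow.publish_event :order_created, order_bus
uow.publish_events
```

A single pass over the buses would have left `:audit_logged` in the buffer; the loop runs a second pass and delivers it to `audit_bus`.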