class Sequent::Core::AggregateRoot

Base class for all your domain classes.

Provides load_from_history functionality so that an aggregate can be rebuilt from its history, meaning a stream of events.

Attributes

event_stream[R]
id[R]
sequence_number[R]
uncommitted_events[R]

Private Class Methods

load_from_history(stream, events) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 46
def self.load_from_history(stream, events)
  first, *rest = events
  if first.is_a? SnapshotEvent
    # rubocop:disable Security/MarshalLoad
    aggregate_root = Marshal.load(Base64.decode64(first.data))
    # rubocop:enable Security/MarshalLoad
    rest.each { |x| aggregate_root.apply_event(x) }
  else
    aggregate_root = allocate # allocate without calling new
    aggregate_root.load_from_history(stream, events)
  end
  aggregate_root
end
new(id) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 60
def initialize(id)
  @id = id
  @uncommitted_events = []
  @sequence_number = 1
  @event_stream = EventStream.new aggregate_type: self.class.name,
                                  aggregate_id: id,
                                  snapshot_threshold: self.class.snapshot_default_threshold
end
stream_from_history(stream) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 93
def self.stream_from_history(stream)
  aggregate_root = allocate
  aggregate_root.initialize_for_streaming(stream)
  aggregate_root
end

Private Instance Methods

apply(event, params = {}) click to toggle source

Provides subclasses with a nice DSL to 'apply' events, for example:

def send_invoice
  apply InvoiceSentEvent, send_date: Time.now
end
# File lib/sequent/core/aggregate_root.rb, line 129
def apply(event, params = {})
  event = build_event(event, params) if event.is_a?(Class)
  apply_event(event)
  @uncommitted_events << event
end
apply_event(event) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 112
def apply_event(event)
  handle_message(event)
  @sequence_number = event.sequence_number + 1
end
apply_if_changed(event_class, args = {}) click to toggle source

Only apply the event if one of the attributes of the event changed

on NameSet do |event|

@first_name = event.first_name
@last_name = event.last_name

end

# The event is applied
apply_if_changed NameSet, first_name: 'Ben', last_name: 'Vonk'

# This event is not applied, because none of its attributes differ from the current state
apply_if_changed NameSet, first_name: 'Ben', last_name: 'Vonk'

# File lib/sequent/core/aggregate_root.rb, line 148
def apply_if_changed(event_class, args = {})
  if args.empty?
    apply event_class
  elsif self.class
      .event_attribute_keys(event_class)
      .any? { |k| instance_variable_get(:"@#{k}") != args[k.to_sym] }
    apply event_class, args
  end
end
build_event(event, params = {}) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 119
def build_event(event, params = {})
  event.new(params.merge({aggregate_id: @id, sequence_number: @sequence_number}))
end
clear_events() click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 103
def clear_events
  @uncommitted_events = []
end
initialize_for_streaming(stream) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 79
def initialize_for_streaming(stream)
  @uncommitted_events = []
  @sequence_number = 1
  @event_stream = stream
end
load_from_history(stream, events) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 69
def load_from_history(stream, events)
  fail 'Empty history' if events.empty?

  @id = events.first.aggregate_id
  @uncommitted_events = []
  @sequence_number = 1
  @event_stream = stream
  events.each { |event| apply_event(event) }
end
stream_from_history(stream_events) click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 85
def stream_from_history(stream_events)
  _stream, event = stream_events
  fail 'Empty history' if event.blank?

  @id ||= event.aggregate_id
  apply_event(event)
end
take_snapshot!() click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 107
def take_snapshot!
  snapshot = build_event SnapshotEvent, data: Base64.encode64(Marshal.dump(self))
  @uncommitted_events << snapshot
end
to_s() click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 99
def to_s
  "#{self.class.name}: #{@id}"
end