class Sequent::Core::AggregateRoot
Base class for all your domain classes.
Provides load_from_history functionality, which rebuilds an aggregate
from its history, meaning a stream of events.
Attributes
event_stream[R]
id[R]
sequence_number[R]
uncommitted_events[R]
Private Class Methods
load_from_history(stream, events)
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 46
#
# Rebuilds an aggregate from +events+.
#
# When the first event is a SnapshotEvent, the aggregate is restored from the
# Base64-encoded, marshalled payload in its +data+ and only the remaining
# events are applied on top of it. Otherwise a bare instance is allocated
# (without calling +new+) and the full list is handed to the instance-level
# +load_from_history+.
#
# NOTE(review): Marshal.load deserializes arbitrary objects; this is only
# safe when the snapshot data comes from a trusted store.
#
# @param stream the event stream passed through to the instance-level loader
# @param events the events to rebuild the aggregate from
# @return the rebuilt aggregate
def self.load_from_history(stream, events)
  head, *tail = events

  unless head.is_a?(SnapshotEvent)
    # allocate without calling new
    return allocate.tap { |aggregate| aggregate.load_from_history(stream, events) }
  end

  # rubocop:disable Security/MarshalLoad
  restored = Marshal.load(Base64.decode64(head.data))
  # rubocop:enable Security/MarshalLoad
  tail.each { |event| restored.apply_event(event) }
  restored
end
new(id)
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 60
#
# Sets up a fresh aggregate: stores +id+, starts with no uncommitted events
# and sequence number 1, and creates the EventStream describing this
# aggregate (type name, id and the class-level default snapshot threshold).
#
# @param id the aggregate id
def initialize(id)
  @id = id
  @uncommitted_events = []
  @sequence_number = 1
  @event_stream = EventStream.new(
    aggregate_type: self.class.name,
    aggregate_id: id,
    snapshot_threshold: self.class.snapshot_default_threshold,
  )
end
stream_from_history(stream)
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 93
#
# Allocates an aggregate without calling +new+ and prepares it for streaming
# via +initialize_for_streaming+.
#
# @param stream the event stream to attach to the new aggregate
# @return the allocated aggregate
def self.stream_from_history(stream)
  allocate.tap { |aggregate| aggregate.initialize_for_streaming(stream) }
end
Private Instance Methods
apply(event, params = {})
click to toggle source
Provides subclasses with a nice DSL to 'apply' events via:

  def send_invoice
    apply InvoiceSentEvent, send_date: Time.now
  end
# File lib/sequent/core/aggregate_root.rb, line 129
#
# Applies an event to this aggregate and records it as uncommitted.
#
# @param event an event instance, or an event Class that is first turned into
#   an instance with +build_event+
# @param params attributes handed to +build_event+ (only used when +event+
#   is a Class)
# @return the list of uncommitted events, including the newly applied one
def apply(event, params = {})
  applied = event.is_a?(Class) ? build_event(event, params) : event
  apply_event(applied)
  @uncommitted_events << applied
end
apply_event(event)
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 112
#
# Passes +event+ to +handle_message+ and moves this aggregate's sequence
# number to one past the event's sequence number.
#
# @param event an event responding to +sequence_number+
# @return the new sequence number
def apply_event(event)
  handle_message(event)
  applied_number = event.sequence_number
  @sequence_number = applied_number + 1
end
apply_if_changed(event_class, args = {})
click to toggle source
Only apply the event if one of the attributes of the event changed.

  on NameSet do |event|
    @first_name = event.first_name
    @last_name = event.last_name
  end

  # The event is applied
  apply_if_changed NameSet, first_name: 'Ben', last_name: 'Vonk'

  # This event is not applied (the attributes already hold these values)
  apply_if_changed NameSet, first_name: 'Ben', last_name: 'Vonk'
# File lib/sequent/core/aggregate_root.rb, line 148
#
# Applies +event_class+ only when it would change something.
#
# With empty +args+ the event is always applied. Otherwise each key reported
# by the class-level +event_attribute_keys+ is compared: the instance
# variable of the same name against +args[key.to_sym]+. The event is applied
# as soon as at least one of them differs.
#
# @param event_class the event class to apply
# @param args the event attributes, looked up by symbol key
# @return the result of +apply+, or nil when nothing was applied
def apply_if_changed(event_class, args = {})
  return apply(event_class) if args.empty?

  changed = self.class.event_attribute_keys(event_class).any? do |key|
    instance_variable_get(:"@#{key}") != args[key.to_sym]
  end
  apply(event_class, args) if changed
end
build_event(event, params = {})
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 119
#
# Instantiates +event+ with +params+ plus this aggregate's id and current
# sequence number. The aggregate's own values win over any +aggregate_id+ or
# +sequence_number+ supplied in +params+.
#
# @param event the event class to instantiate
# @param params additional event attributes
# @return the new event instance
def build_event(event, params = {})
  attributes = params.merge(aggregate_id: @id, sequence_number: @sequence_number)
  event.new(attributes)
end
clear_events()
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 103
#
# Drops all uncommitted events by pointing +@uncommitted_events+ at a new
# empty array; the previous array object is left untouched.
#
# @return the new empty array
def clear_events
  @uncommitted_events = []
end
initialize_for_streaming(stream)
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 79
#
# Resets this aggregate for streaming: sequence number 1, no uncommitted
# events, and +stream+ as its event stream. The id is not touched here.
#
# @param stream the event stream to attach
# @return the given stream
def initialize_for_streaming(stream)
  @sequence_number = 1
  @uncommitted_events = []
  @event_stream = stream
end
load_from_history(stream, events)
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 69
#
# Rebuilds this aggregate's state from +events+: takes the id from the first
# event, resets the uncommitted events and sequence number, stores +stream+
# and then applies every event in order.
#
# @param stream the event stream to attach
# @param events the non-empty list of events to replay
# @raise [RuntimeError] 'Empty history' when +events+ is empty
# @return the given events
def load_from_history(stream, events)
  raise 'Empty history' if events.empty?

  @id = events.first.aggregate_id
  @uncommitted_events = []
  @sequence_number = 1
  @event_stream = stream
  events.each do |event|
    apply_event(event)
  end
end
stream_from_history(stream_events)
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 85
#
# Applies one streamed event to this aggregate. +stream_events+ is
# destructured as a (stream, event) pair; only the event is used. The id is
# taken from the event unless one is already set.
#
# @param stream_events a pair whose second element is the event to apply
# @raise [RuntimeError] 'Empty history' when the event is blank
# @return the result of +apply_event+
def stream_from_history(stream_events)
  _ignored_stream, streamed_event = stream_events
  raise 'Empty history' if streamed_event.blank?

  @id ||= streamed_event.aggregate_id
  apply_event(streamed_event)
end
take_snapshot!()
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 107
#
# Serializes this aggregate with Marshal, Base64-encodes the result and
# queues it as a SnapshotEvent among the uncommitted events.
#
# @return the list of uncommitted events, including the snapshot
def take_snapshot!
  serialized = Marshal.dump(self)
  encoded = Base64.encode64(serialized)
  snapshot = build_event(SnapshotEvent, data: encoded)
  @uncommitted_events << snapshot
end
to_s()
click to toggle source
# File lib/sequent/core/aggregate_root.rb, line 99
#
# Human-readable form: the class name and the aggregate id, separated by
# a colon and a space.
#
# @return [String]
def to_s
  type_name = self.class.name
  "#{type_name}: #{@id}"
end