class EventStore::Aggregate

Attributes

event_table[R]
id[R]
snapshot_table[R]
snapshot_version_table[R]
type[R]

Public Class Methods

count() click to toggle source
# File lib/event_store/aggregate.rb, line 6
# Counts the event-table rows that remain after applying
# `distinct(:aggregate_id)`.
#
# @return whatever `count` on the resulting dataset returns
def self.count
  all_events = EventStore.db.from(EventStore.fully_qualified_table)
  all_events.distinct(:aggregate_id).count
end
ids(offset, limit) click to toggle source
# File lib/event_store/aggregate.rb, line 10
# Returns one page of aggregate ids, ordered by `aggregate_id`.
#
# @param offset passed as the second argument to the dataset's `limit`
# @param limit  passed as the first argument to the dataset's `limit`
# @return [Array] the `:aggregate_id` value of every fetched row
def self.ids(offset, limit)
  page = EventStore.db.from(EventStore.fully_qualified_table)
                   .distinct(:aggregate_id)
                   .select(:aggregate_id)
                   .order(:aggregate_id)
                   .limit(limit, offset)
  page.all.map { |row| row[:aggregate_id] }
end
new(id, type = EventStore.table_name) click to toggle source
# File lib/event_store/aggregate.rb, line 14
# Sets up an aggregate handle for the given id.
#
# @param id   aggregate identifier; interpolated into the snapshot key names
# @param type prefix for the snapshot key names (defaults to
#   `EventStore.table_name`)
def initialize(id, type = EventStore.table_name)
  @id, @type = id, type
  @schema = EventStore.schema
  @event_table = EventStore.fully_qualified_table

  # Names built from type and id; other methods in this class use them as
  # Redis keys.
  @snapshot_table = "#{@type}_snapshots_for_#{@id}"
  @snapshot_version_table = "#{@type}_snapshot_versions_for_#{@id}"
end

Public Instance Methods

delete_events!() click to toggle source
# File lib/event_store/aggregate.rb, line 76
# Calls `delete` on this aggregate's event dataset (see #events).
#
# @return whatever the dataset's `delete` returns
def delete_events!
  own_events = events
  own_events.delete
end
delete_snapshot!() click to toggle source
# File lib/event_store/aggregate.rb, line 72
# Removes both Redis keys belonging to this aggregate: the snapshot key and
# the snapshot-version key.
#
# @return whatever the Redis client's `del` returns
def delete_snapshot!
  snapshot_keys = [@snapshot_table, @snapshot_version_table]
  # The keys are handed over as a single Array argument.
  EventStore.redis.del(snapshot_keys)
end
event_stream() click to toggle source
# File lib/event_store/aggregate.rb, line 60
# Loads every event row of this aggregate (see #events) and replaces each
# row's `:serialized_event` with the result of `EventStore.unescape_bytea`.
#
# @return [Array] the fetched rows, modified in place
def event_stream
  events.all.map do |row|
    row[:serialized_event] = EventStore.unescape_bytea(row[:serialized_event])
    row
  end
end
event_stream_between(start_time, end_time, fully_qualified_names = []) click to toggle source
# File lib/event_store/aggregate.rb, line 54
# Loads this aggregate's events whose `occurred_at` lies in
# `start_time..end_time`, optionally restricted to given event names, and
# replaces each row's `:serialized_event` with the result of
# `EventStore.unescape_bytea`.
#
# @param start_time lower bound of the `occurred_at` range
# @param end_time   upper bound of the `occurred_at` range
# @param fully_qualified_names [Array, nil] when it is non-nil and `any?` is
#   true, an additional `fully_qualified_name` filter is applied
# @return [Array] the fetched rows, modified in place
def event_stream_between(start_time, end_time, fully_qualified_names = [])
  scoped = events.where(occurred_at: start_time..end_time)

  if fully_qualified_names && fully_qualified_names.any?
    scoped = scoped.where(fully_qualified_name: fully_qualified_names)
  end

  scoped.all.map do |row|
    row[:serialized_event] = EventStore.unescape_bytea(row[:serialized_event])
    row
  end
end
events() click to toggle source
# File lib/event_store/aggregate.rb, line 23
# Dataset of this aggregate's events, ordered by `:version`.
#
# The dataset is built on first use and cached in `@events_query`; later
# calls return the cached object.
#
# @return the dataset filtered to `aggregate_id == @id.to_s`
def events
  @events_query ||= begin
    own_rows = EventStore.db.from(@event_table).where(aggregate_id: @id.to_s)
    own_rows.order(:version)
  end
end
events_from(version_number, max = nil) click to toggle source
# File lib/event_store/aggregate.rb, line 47
# Loads this aggregate's events whose version is at least `version_number`,
# replacing each row's `:serialized_event` with the result of
# `EventStore.unescape_bytea`.
#
# @param version_number lowest version to include; converted with `to_i`
# @param max passed straight to the dataset's `limit` (nil by default)
# @return [Array] the fetched rows, modified in place
def events_from(version_number, max = nil)
  limited = events.limit(max)
  minimum_version = version_number.to_i

  # NOTE(review): the block takes no parameters, so `version` is expected to
  # be resolved by the dataset (as a column) rather than as this class's
  # #version method -- confirm against the dataset library's block semantics.
  matching = limited.where { version >= minimum_version }

  matching.all.map do |row|
    row[:serialized_event] = EventStore.unescape_bytea(row[:serialized_event])
    row
  end
end
last_event() click to toggle source
# File lib/event_store/aggregate.rb, line 64
# Last element of #snapshot, which is sorted by version.
#
# @return the final snapshot entry, or nil when the snapshot is empty
def last_event
  ordered = snapshot
  ordered.last
end
rebuild_snapshot!() click to toggle source
# File lib/event_store/aggregate.rb, line 41
# Discards the current snapshot and stores a new one built from all of this
# aggregate's events.
#
# Before storing, each event's `:occurred_at` is run through
# `TimeHacker.translate_occurred_at_from_local_to_gmt` (by its name, a
# local-time to GMT conversion).
#
# @return whatever `EventAppender#store_snapshot` returns
def rebuild_snapshot!
  delete_snapshot!

  gmt_events = events.all.map do |row|
    row[:occurred_at] = TimeHacker.translate_occurred_at_from_local_to_gmt(row[:occurred_at])
    row
  end

  EventAppender.new(self).store_snapshot(gmt_events)
end
snapshot() click to toggle source
# File lib/event_store/aggregate.rb, line 27
# Reads the snapshot hash (rebuilding it first if it is empty, see
# #auto_rebuild_snapshot) and decodes it into SerializedEvent objects.
#
# Each hash key is used as the fully qualified event name. Each value is
# split on `EventStore::SNAPSHOT_DELIMITER`: the first field is the version,
# the second the serialized event, the last the occurred-at timestamp.
#
# @return [Array<SerializedEvent>] the decoded events sorted by version
def snapshot
  raw_snapshot = auto_rebuild_snapshot(read_raw_snapshot)

  decoded = raw_snapshot.each_pair.map do |fully_qualified_name, packed|
    fields           = packed.split(EventStore::SNAPSHOT_DELIMITER)
    version          = fields.first.to_i
    serialized_event = EventStore.unescape_bytea(fields[1])
    occurred_at      = Time.parse(fields.last)
    SerializedEvent.new(fully_qualified_name, serialized_event, version, occurred_at)
  end

  decoded.sort { |left, right| left.version <=> right.version }
end
version() click to toggle source
# File lib/event_store/aggregate.rb, line 68
# Current snapshot version, read from the `:current_version` field of the
# snapshot-version hash in Redis.
#
# @return [Integer] the stored value converted with `to_i`, or -1 when
#   nothing is stored
def version
  stored = EventStore.redis.hget(@snapshot_version_table, :current_version)
  stored ? stored.to_i : -1
end

Private Instance Methods

auto_rebuild_snapshot(events_hash) click to toggle source
# File lib/event_store/aggregate.rb, line 81
# Returns a usable snapshot hash, rebuilding the snapshot from the event
# table when the given one is empty but events exist.
#
# @param events_hash [Hash] raw snapshot as read by #read_raw_snapshot
# @return [Hash] `events_hash` itself when it is non-empty or when the
#   aggregate has no events; otherwise the freshly re-read snapshot
def auto_rebuild_snapshot(events_hash)
  return events_hash unless events_hash.empty?

  # `all` yields an Array (possibly empty), never nil, so the previous
  # `nil?` guard could not fire and an aggregate without events was rebuilt
  # on every read. Emptiness is the condition that was intended.
  return events_hash if events.select(:version).limit(1).all.empty?

  rebuild_snapshot!
  read_raw_snapshot
end
read_raw_snapshot() click to toggle source
# File lib/event_store/aggregate.rb, line 89
# Fetches every field of this aggregate's snapshot hash from Redis.
#
# @return whatever the Redis client's `hgetall` returns for the snapshot key
def read_raw_snapshot
  redis = EventStore.redis
  redis.hgetall(@snapshot_table)
end