class Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor

The ReplayOptimizedPostgresPersistor is optimized for bulk loading records in a Postgres database.

Depending on the number of records, it either imports them via CSV or batches the statements using normal SQL.

Rebuilding the view state (or projection) of an aggregate typically consists of an initial insert, then many updates, and maybe a delete. With a normal Persistor (like ActiveRecordPersistor) each action is executed against the database. This persistor instead builds an in-memory store first and flushes it to the database at the end. This can significantly reduce the number of queries sent to the database. E.g. 1 insert followed by 6 updates becomes a single insert with this Persistor.
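The effect described above can be illustrated with a minimal sketch. BufferedStore and its statements list are hypothetical stand-ins invented for this example, not Sequent's actual implementation; they only show how updates applied in memory collapse into one insert at flush time.

```ruby
# Hypothetical, simplified stand-in for the in-memory store (not Sequent's code).
class BufferedStore
  attr_reader :statements

  def initialize
    @records = []     # rows held in memory until flush
    @statements = []  # stands in for the queries sent to the database
  end

  def create_record(attrs)
    @records << attrs.dup
  end

  # Updates are applied to the in-memory rows; nothing is sent to the database.
  def update_all_records(where, updates)
    @records.each do |record|
      record.merge!(updates) if where.all? { |key, value| record[key] == value }
    end
  end

  # Only the final state of each row is written.
  def flush
    @records.each { |record| @statements << [:insert, record] }
    @records.clear
  end
end

store = BufferedStore.new
store.create_record(aggregate_id: 'a1', amount: 0)
6.times { |i| store.update_all_records({ aggregate_id: 'a1' }, { amount: i + 1 }) }
store.flush
```

After the flush, store.statements holds a single insert carrying the final values, where a persistor that writes through would have issued seven statements.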

After a lot of experimenting this turned out to be the fastest way to do bulk inserts into the database. You can tweak the number of records in the CSV via insert_with_csv_size before it flushes to the database to gain (or lose) speed.
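As a rough sketch of how such a threshold might drive the choice between the two write paths: flush_plan below is a hypothetical helper, not part of Sequent, and it assumes that record sets larger than insert_with_csv_size go through CSV import while smaller ones use batched statements.

```ruby
# Hypothetical helper (not part of Sequent): picks a write strategy per record class.
# Assumption: sets larger than insert_with_csv_size use CSV import, the rest are batched.
def flush_plan(records_by_class, insert_with_csv_size)
  records_by_class.to_h do |record_class, records|
    strategy = records.size > insert_with_csv_size ? :csv_import : :batched_inserts
    [record_class, strategy]
  end
end

plan = flush_plan(
  {
    'InvoiceRecord' => Array.new(200) { {} },   # 200 placeholder rows
    'RecipientRecord' => Array.new(10) { {} },  # 10 placeholder rows
  },
  50,
)
```

Under these assumptions, raising the threshold moves more record classes onto the batched path and lowering it moves more onto CSV import, which is why measuring with your own data is the only reliable way to pick a value.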

It is highly recommended to create indices on the in-memory record_store to speed up processing. By default all records are indexed by aggregate_id if they have such a property.

Example:

class InvoiceProjector < Sequent::Core::Projector
  on RecipientMovedEvent do |event|
    update_all_records(
      InvoiceRecord,
      { aggregate_id: event.aggregate_id, recipient_id: event.recipient.aggregate_id },
      { recipient_street: event.recipient.street },
    )
  end
end

In this case it is wise to create an index on InvoiceRecord on the aggregate_id and recipient_id attributes, like you would in the database. Note that previous versions of this class supported multi-column indexes. These are now split into multiple single-column indexes, and the results of each index are combined using set intersection. This reduces the amount of memory used and makes it possible to use an index in more cases: whenever an indexed attribute is present in the where clause its index is used, so not all indexed attributes need to be present.
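The set-intersection lookup can be sketched as follows. ColumnIndexes is a hypothetical class written for illustration only; it assumes records are plain hashes and does not reflect how Sequent stores its indexes internally.

```ruby
require 'set'

# Hypothetical sketch of single-column indexes combined by set intersection.
class ColumnIndexes
  def initialize(indexed_columns)
    @records = []
    # column => { value => Set of positions in @records }
    @indexes = indexed_columns.to_h do |column|
      [column, Hash.new { |hash, value| hash[value] = Set.new }]
    end
  end

  def add(record)
    position = @records.size
    @records << record
    @indexes.each { |column, index| index[record[column]] << position }
  end

  def find(where)
    usable = where.keys & @indexes.keys # indexed columns present in the where clause
    candidates =
      if usable.empty?
        @records # no index applies, so scan everything
      else
        positions = usable
          .map { |column| @indexes[column].fetch(where[column]) { Set.new } }
          .reduce(:&) # intersect the per-column results
        positions.map { |position| @records[position] }
      end
    # Non-indexed attributes are still checked against the candidates.
    candidates.select { |record| where.all? { |key, value| record[key] == value } }
  end
end

indexes = ColumnIndexes.new(%i[aggregate_id recipient_id])
indexes.add({ aggregate_id: 'i1', recipient_id: 'r1', recipient_street: 'Old Street' })
indexes.add({ aggregate_id: 'i2', recipient_id: 'r1', recipient_street: 'Old Street' })
indexes.add({ aggregate_id: 'i3', recipient_id: 'r2', recipient_street: 'Other Street' })

both = indexes.find({ aggregate_id: 'i1', recipient_id: 'r1' })             # uses both indexes
one = indexes.find({ recipient_id: 'r1', recipient_street: 'Old Street' })  # uses one index
```

The second lookup shows the benefit mentioned above: only recipient_id is indexed, yet that index still narrows the candidates before the remaining attribute is compared.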

Example:

ReplayOptimizedPostgresPersistor.new(
  50,
  {InvoiceRecord => [:aggregate_id, :recipient_id]}
)