class Sequent::Migrations::ViewSchema

ViewSchema is used for migration of your view_schema, for instance when you create new Projectors or change existing Projectors.

The following migrations are supported: replaying Projectors into new tables, and `alter_table` migrations on existing tables.

To maintain your migrations you need to:

  1. Create a class that extends `Sequent::Migrations::Projectors` and specify it in `Sequent.configuration.migrations_class_name`

  2. Define per version which migrations you want to execute. See the definition of `Sequent::Migrations::Projectors.versions` and `Sequent::Migrations::Projectors.version`

  3. Specify in Sequent where your SQL files reside (`Sequent.configuration.migration_sql_files_directory`)

  4. Ensure that you add %SUFFIX% to each name that needs to be unique in postgres (like TABLE names, INDEX names, PRIMARY KEYS). E.g. `create table foo%SUFFIX% (id serial NOT NULL, CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))`

  5. If you want to run an `alter_table` migration, ensure that a SQL file named `table_name_VERSION.sql` exists.

Example:

class AppMigrations < Sequent::Migrations::Projectors
  def self.version
    '3'
  end

  def self.versions
    {
      '1' => [Sequent.all_projectors],
      '2' => [
        UserProjector,
        InvoiceProjector,
      ],
      '3' => [
        Sequent::Migrations.alter_table(UserRecord)
      ]
    }
  end
end
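
Step 4 above can be illustrated with a small standalone sketch. It assumes the placeholder expands to `_VERSION` while a migration builds new tables (matching the `table_name_VERSION` naming described for migrate_online) and to nothing when tables are created directly, as create_view_tables does. The helper name `expand_suffix` is hypothetical and not part of Sequent.

```ruby
# Hypothetical helper, not part of Sequent: expands the %SUFFIX% placeholder
# in a migration SQL template.
def expand_suffix(sql, version = nil)
  # Assumption: versioned tables get "_<version>" appended, plain tables get nothing.
  suffix = version.nil? ? '' : "_#{version}"
  sql.gsub('%SUFFIX%', suffix)
end

template = 'create table foo%SUFFIX% (id serial NOT NULL, ' \
           'CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))'

versioned = expand_suffix(template, 3)
plain = expand_suffix(template)

puts versioned
puts plain
```

Because every unique name carries the placeholder, the versioned table and its constraint can exist next to the current ones without name clashes.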

Constants

LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE

Corresponds with the index on aggregate_id column in the event_records table

Since we replay in batches of the first 3 chars of the uuid we created an index on these 3 characters. Hence the name ;-)

This also means that the online replay is divided up into 16**3 groups. This might seem a lot for a starting event store, but as you get more events you will see that this partitions the work quite well.
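
The partitioning can be sketched in plain Ruby. The snippet below only illustrates the arithmetic and is not the library code; `PREFIX_LENGTH`, `ALL_PREFIXES` and `prefix_groups` are names chosen for this example.

```ruby
PREFIX_LENGTH = 3 # assumed to match the 3-character substring index described above

# All hexadecimal prefixes of PREFIX_LENGTH characters: "000" up to "fff".
ALL_PREFIXES = (0...16**PREFIX_LENGTH).map { |i| i.to_s(16).rjust(PREFIX_LENGTH, '0') }

# Splits the prefixes into 16**group_exponent equally sized groups.
def prefix_groups(group_exponent)
  number_of_groups = 16**group_exponent
  ALL_PREFIXES.each_slice(ALL_PREFIXES.length / number_of_groups).to_a
end

fine_groups = prefix_groups(3)   # one prefix per group
coarse_groups = prefix_groups(1) # 256 prefixes per group

puts fine_groups.size
puts coarse_groups.size
puts "#{coarse_groups.first.first}-#{coarse_groups.first.last}"
```

With an exponent of 3 every group holds a single prefix, so the work splits into 4096 small units; with an exponent of 1 the same prefixes are bundled into 16 larger units.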

Attributes

db_config[R]
logger[R]
view_schema[R]

Public Class Methods

create_view_schema_if_not_exists(env:)

@see create_view_schema_if_not_exists
@param env [String] The environment used for connecting the database

# File lib/sequent/migrations/view_schema.rb, line 92
def create_view_schema_if_not_exists(env:)
  fail ArgumentError, 'env is required' if env.blank?

  db_config = Sequent::Support::Database.read_config(env)
  Sequent::Support::Database.establish_connection(db_config)

  new(db_config: db_config).create_view_schema_if_not_exists
end
create_view_tables(env:)

@see create_view_tables
@param env [String] The environment used for connecting the database

# File lib/sequent/migrations/view_schema.rb, line 82
def create_view_tables(env:)
  fail ArgumentError, 'env is required' if env.blank?

  db_config = Sequent::Support::Database.read_config(env)
  Sequent::Support::Database.establish_connection(db_config)
  new(db_config: db_config).create_view_tables
end
new(db_config:)
# File lib/sequent/migrations/view_schema.rb, line 102
def initialize(db_config:)
  @db_config = db_config
  @view_schema = Sequent.configuration.view_schema_name
  @logger = Sequent.logger
end

Public Instance Methods

create_view_schema_if_not_exists()

Utility method that creates the view_schema and the metadata tables

This method is mainly useful during an initial setup of the view schema

# File lib/sequent/migrations/view_schema.rb, line 158
def create_view_schema_if_not_exists
  exec_sql(%(CREATE SCHEMA IF NOT EXISTS #{view_schema}))
  migrate_metadata_tables
end
create_view_tables()

Utility method that creates all tables in the view schema

This method is mainly useful in test scenarios, to create the entire view schema without replaying the events

# File lib/sequent/migrations/view_schema.rb, line 119
def create_view_tables
  create_view_schema_if_not_exists
  return if Sequent.migration_class == Sequent::Migrations::Projectors
  return if Sequent.new_version == current_version

  in_view_schema do
    Sequent::Core::Migratable.all.flat_map(&:managed_tables).each do |table|
      sql_file = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.sql"
      statements = sql_file_to_statements(sql_file) do |raw_sql|
        raw_sql.remove('%SUFFIX%')
      end
      statements.each { |statement| exec_sql(statement) }

      indexes_file_name = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.indexes.sql"
      if File.exist?(indexes_file_name)
        statements = sql_file_to_statements(indexes_file_name) { |raw_sql| raw_sql.remove('%SUFFIX%') }
        statements.each(&method(:exec_sql))
      end
    end
    Versions.create!(version: Sequent.new_version)
  end
end
current_version()

Returns the current version from the database

# File lib/sequent/migrations/view_schema.rb, line 110
def current_version
  Versions.current_version
end
executor()
# File lib/sequent/migrations/view_schema.rb, line 167
def executor
  @executor ||= Executor.new
end
migrate_offline()

Last part of a view schema migration

You have to ensure no events are being added to the event store while this method is running, for instance by putting your application in maintenance mode.

The offline part consists of:

  1. Replay all events not yet replayed since migrate_online

  2. Within a single transaction do:

     2.1 Rename current tables with the current version as SUFFIX
     2.2 Rename the new tables and remove the new version suffix
     2.3 Add the new version in the Versions table

  3. Update the versions table to complete the migration

If anything fails an exception is raised and everything is rolled back

When this method succeeds you can safely start the application from Sequent’s point of view.

# File lib/sequent/migrations/view_schema.rb, line 249
def migrate_offline
  return if Sequent.new_version == current_version

  ensure_version_correct!
  in_view_schema { Versions.start_offline!(Sequent.new_version) }
  Sequent.logger.info("Start migrate_offline for version #{Sequent.new_version}")

  executor.set_table_names_to_new_version(plan)

  # 1 replay events not yet replayed
  if plan.projectors.any?
    replay!(
      Sequent.configuration.offline_replay_persistor_class.new,
      groups: groups(group_exponent: 1),
      minimum_xact_id_inclusive: Versions.running.first.xmin_xact_id,
    )
  end

  in_view_schema do
    Sequent::ApplicationRecord.transaction do
      # 2.1, 2.2
      executor.execute_offline(plan, current_version)
      # 2.3 Create migration record
      Versions.end_offline!(Sequent.new_version)
    end
  end
  logger.info "Migrated to version #{Sequent.new_version}"
rescue ConcurrentMigration
  raise
rescue MigrationDone
  # no-op same as Sequent.new_version == current_version
rescue Exception => e # rubocop:disable Lint/RescueException
  rollback_migration
  raise e
end
migrate_online()

First part of a view schema migration

Call this method while your application is running. The online part consists of:

  1. Ensure any previous migrations are cleaned up

  2. Create new tables for the Projectors which need to be migrated to the new version. These tables will be called `table_name_VERSION`.

  3. Replay all events to populate the tables. It keeps track of all events that are already replayed.

  4. Reset the table names of the activerecord models (projections) back to their original values (so without the VERSION suffix)

If anything fails an exception is raised and everything is rolled back

@raise ConcurrentMigration if a migration is already running

# File lib/sequent/migrations/view_schema.rb, line 188
def migrate_online
  migrate_metadata_tables
  ensure_valid_plan!

  return if Sequent.new_version == current_version

  ensure_version_correct!

  Sequent.logger.info("Start migrate_online for version #{Sequent.new_version}")

  in_view_schema do
    Versions.start_online!(Sequent.new_version)

    drop_old_tables(Sequent.new_version)
    executor.execute_online(plan)
  end

  if plan.projectors.any?
    replay!(
      Sequent.configuration.online_replay_persistor_class.new,
      groups: groups,
      maximum_xact_id_exclusive: Versions.running.first.xmin_xact_id,
    )
  end

  in_view_schema do
    executor.create_indexes_after_execute_online(plan)
    executor.reset_table_names(plan)
    Versions.end_online!(Sequent.new_version)
  end
  Sequent.logger.info("Done migrate_online for version #{Sequent.new_version}")
rescue ConcurrentMigration
  # Do not rollback the migration when this is a concurrent migration as the other one is running
  raise
rescue InvalidMigrationDefinition
  # Do not roll back the migration since there is nothing to roll back
  raise
rescue Exception => e # rubocop:disable Lint/RescueException
  rollback_migration
  raise e
end
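
The two phases are meant to run in order: migrate_online while the application is serving traffic, then migrate_offline once no new events are being written. A minimal sketch of that sequencing follows. The `run_migration` method and the `maintenance` callable are hypothetical glue code, and `FakeViewSchema` is a stand-in used only so the snippet runs without a database; in a real application the first argument would be a `Sequent::Migrations::ViewSchema` instance.

```ruby
# Hypothetical orchestration helper, not part of Sequent.
# view_schema: any object responding to migrate_online and migrate_offline.
# maintenance: a callable that runs its block while the application is
#              prevented from writing new events (application specific).
def run_migration(view_schema, maintenance:)
  view_schema.migrate_online # application keeps running
  maintenance.call { view_schema.migrate_offline } # no new events allowed
end

# Stand-in that only records the order of the calls.
class FakeViewSchema
  attr_reader :calls

  def initialize
    @calls = []
  end

  def migrate_online
    @calls << :migrate_online
  end

  def migrate_offline
    @calls << :migrate_offline
  end
end

log = []
maintenance = lambda do |&block|
  log << :maintenance_on
  block.call
ensure
  log << :maintenance_off # always leave maintenance mode, even on failure
end

fake = FakeViewSchema.new
run_migration(fake, maintenance: maintenance)
p fake.calls
p log
```

Keeping the maintenance window around only the offline phase limits downtime to the final catch-up replay and the table renames.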
plan()
# File lib/sequent/migrations/view_schema.rb, line 163
def plan
  @plan ||= Planner.new(Sequent.migration_class.versions).plan(current_version, Sequent.new_version)
end
replay_all!(group_exponent: 1)

Utility method that replays events for all managed_tables from all Sequent::Core::Projectors

This method is mainly useful in test scenarios or development tasks

# File lib/sequent/migrations/view_schema.rb, line 146
def replay_all!(group_exponent: 1)
  replay!(
    Sequent.configuration.online_replay_persistor_class.new,
    projectors: Core::Migratable.projectors,
    groups: groups(group_exponent: group_exponent),
  )
end

Private Instance Methods

disconnect!()

shortcut methods

# File lib/sequent/migrations/view_schema.rb, line 478
def disconnect!
  Sequent::Support::Database.disconnect!
end
drop_old_tables(new_version)
# File lib/sequent/migrations/view_schema.rb, line 418
def drop_old_tables(new_version)
  versions_to_check = (current_version - 10)..new_version
  old_tables = versions_to_check.flat_map do |old_version|
    exec_sql(<<~SQL).flat_map(&:values)
      select table_name from information_schema.tables where table_schema = '#{Sequent.configuration.view_schema_name}' and table_name LIKE '%_#{old_version}'
    SQL
  end
  old_tables.each do |old_table|
    exec_sql("DROP TABLE #{Sequent.configuration.view_schema_name}.#{old_table} CASCADE")
  end
end
ensure_valid_plan!()
# File lib/sequent/migrations/view_schema.rb, line 287
def ensure_valid_plan!
  plan
end
ensure_version_correct!()
# File lib/sequent/migrations/view_schema.rb, line 300
def ensure_version_correct!
  create_view_schema_if_not_exists
  new_version = Sequent.new_version

  if new_version < current_version
    fail ArgumentError,
         "new_version [#{new_version}] must be greater or equal to current_version [#{current_version}]"
  end
end
establish_connection()
# File lib/sequent/migrations/view_schema.rb, line 482
def establish_connection
  Sequent::Support::Database.establish_connection(db_config)
end
event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
# File lib/sequent/migrations/view_schema.rb, line 454
def event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
  fail ArgumentError, 'aggregate_prefixes is mandatory' unless aggregate_prefixes.present?

  event_stream = Sequent.configuration.event_record_class.where(event_type: event_types)
  event_stream = event_stream.where(<<~SQL, aggregate_prefixes)
    substring(aggregate_id::text from 1 for #{LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE}) in (?)
  SQL
  if minimum_xact_id_inclusive && maximum_xact_id_exclusive
    event_stream = event_stream.where(
      'xact_id >= ? AND xact_id < ?',
      minimum_xact_id_inclusive,
      maximum_xact_id_exclusive,
    )
  elsif minimum_xact_id_inclusive
    event_stream = event_stream.where('xact_id >= ?', minimum_xact_id_inclusive)
  elsif maximum_xact_id_exclusive
    event_stream = event_stream.where('xact_id IS NULL OR xact_id < ?', maximum_xact_id_exclusive)
  end
  event_stream
    .order('aggregate_id ASC, sequence_number ASC')
    .select('id, event_type, event_json, sequence_number')
end
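
The `xact_id` branches split the event store into windows: migrate_online passes only a maximum (exclusive), migrate_offline only a minimum (inclusive). The pure-Ruby sketch below mirrors that filtering on an in-memory array to show that, assuming both phases see the same boundary value, every event falls in exactly one window. `Event` and `in_window?` are names invented for this illustration; the real filtering happens in SQL.

```ruby
Event = Struct.new(:id, :xact_id)

# Mirrors the branch logic of event_stream on plain Ruby objects.
# A nil xact_id stands in for SQL NULL, which never satisfies >= or <.
def in_window?(event, min_inclusive: nil, max_exclusive: nil)
  x = event.xact_id
  if min_inclusive && max_exclusive
    !x.nil? && x >= min_inclusive && x < max_exclusive
  elsif min_inclusive
    !x.nil? && x >= min_inclusive
  elsif max_exclusive
    x.nil? || x < max_exclusive
  else
    true
  end
end

events = [Event.new(1, nil), Event.new(2, 90), Event.new(3, 100), Event.new(4, 150)]
boundary = 100 # illustrative value

online = events.select { |e| in_window?(e, max_exclusive: boundary) }.map(&:id)
offline = events.select { |e| in_window?(e, min_inclusive: boundary) }.map(&:id)

p online
p offline
```

Events without an `xact_id` are picked up by the maximum-only branch, which is why that branch carries the extra `IS NULL` condition.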
groups(group_exponent: 3, limit: nil, offset: nil)
# File lib/sequent/migrations/view_schema.rb, line 390
def groups(group_exponent: 3, limit: nil, offset: nil)
  number_of_groups = 16**group_exponent
  groups = groups_of_aggregate_id_prefixes(number_of_groups)
  groups = groups.drop(offset) unless offset.nil?
  groups = groups.take(limit) unless limit.nil?
  groups
end
groups_of_aggregate_id_prefixes(number_of_groups)
# File lib/sequent/migrations/view_schema.rb, line 398
def groups_of_aggregate_id_prefixes(number_of_groups)
  all_prefixes = (0...16**LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE).to_a.map do |i|
    i.to_s(16)
  end
  all_prefixes = all_prefixes.map { |s| s.length == 3 ? s : "#{'0' * (3 - s.length)}#{s}" }

  logger.info "Number of groups #{number_of_groups}"

  logger.debug "Prefixes: #{all_prefixes.length}"
  if number_of_groups > all_prefixes.length
    fail "Can not have more groups #{number_of_groups} than number of prefixes #{all_prefixes.length}"
  end

  all_prefixes.each_slice(all_prefixes.length / number_of_groups).to_a
end
in_view_schema(&block)
# File lib/sequent/migrations/view_schema.rb, line 414
def in_view_schema(&block)
  Sequent::Support::Database.with_schema_search_path(view_schema, db_config, &block)
end
migrate_metadata_tables()
# File lib/sequent/migrations/view_schema.rb, line 291
def migrate_metadata_tables
  Sequent::ApplicationRecord.transaction do
    in_view_schema do
      exec_sql([Versions.migration_sql].join("\n"))
    end
  end
  Versions.reset_column_information
end
on_progress()
# File lib/sequent/migrations/view_schema.rb, line 430
def on_progress
  ->(progress, done, ids) do
    Sequent::Core::EventStore::PRINT_PROGRESS[progress, done, ids] if progress > 0
  end
end
replay!(replay_persistor, groups:, projectors: plan.projectors, minimum_xact_id_inclusive: nil, maximum_xact_id_exclusive: nil)
# File lib/sequent/migrations/view_schema.rb, line 310
def replay!(
  replay_persistor,
  groups:,
  projectors: plan.projectors,
  minimum_xact_id_inclusive: nil,
  maximum_xact_id_exclusive: nil
)
  logger.info "groups: #{groups.size}"

  with_sequent_config(replay_persistor, projectors) do
    logger.info 'Start replaying events'

    time("#{groups.size} groups replayed") do
      event_types = projectors.flat_map { |projector| projector.message_mapping.keys }.uniq.map(&:name)
      disconnect!

      @connected = false
      # using `map_with_index` because https://github.com/grosser/parallel/issues/175
      result = Parallel.map_with_index(
        groups,
        in_processes: Sequent.configuration.number_of_replay_processes,
      ) do |aggregate_prefixes, index|
        @connected ||= establish_connection
        msg = <<~EOS.chomp
          Group (#{aggregate_prefixes.first}-#{aggregate_prefixes.last}) #{index + 1}/#{groups.size} replayed
        EOS
        time(msg) do
          replay_events(
            aggregate_prefixes,
            event_types,
            minimum_xact_id_inclusive,
            maximum_xact_id_exclusive,
            replay_persistor,
            &on_progress
          )
        end
        nil
      rescue StandardError => e
        logger.error "Replaying failed for ids: ^#{aggregate_prefixes.first} - #{aggregate_prefixes.last}"
        logger.error '+++++++++++++++ ERROR +++++++++++++++'
        recursively_print(e)
        raise Parallel::Kill # immediately kill all sub-processes
      end
      establish_connection
      fail if result.nil?
    end
  end
end
replay_events(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive, replay_persistor, &on_progress)
# File lib/sequent/migrations/view_schema.rb, line 359
def replay_events(
  aggregate_prefixes,
  event_types,
  minimum_xact_id_inclusive,
  maximum_xact_id_exclusive,
  replay_persistor,
  &on_progress
)
  Sequent.configuration.event_store.replay_events_from_cursor(
    block_size: 1000,
    get_events: -> {
      event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
    },
    on_progress: on_progress,
  )

  replay_persistor.commit

  # Also commit all specific declared replay persistors on projectors.
  Sequent.configuration.event_handlers.select { |e| e.class.replay_persistor }.each(&:commit)
end
rollback_migration()
# File lib/sequent/migrations/view_schema.rb, line 381
def rollback_migration
  disconnect!
  establish_connection
  drop_old_tables(Sequent.new_version)

  executor.reset_table_names(plan)
  Versions.rollback!(Sequent.new_version)
end
with_sequent_config(replay_persistor, projectors, &block)
# File lib/sequent/migrations/view_schema.rb, line 436
def with_sequent_config(replay_persistor, projectors, &block)
  old_config = Sequent.configuration

  config = Sequent.configuration.dup

  replay_projectors = projectors.map do |projector_class|
    projector_class.new(projector_class.replay_persistor || replay_persistor)
  end
  config.transaction_provider = Sequent::Core::Transactions::NoTransactions.new
  config.event_handlers = replay_projectors

  Sequent::Configuration.restore(config)

  block.call
ensure
  Sequent::Configuration.restore(old_config)
end