class PG::Pglogical::Client

Attributes

connection[RW]

Public Class Methods

new(connection) click to toggle source

@param connection [PostgreSQLAdapter] ActiveRecord database connection

# File lib/pg/pglogical/client.rb, line 9
# Stores the database connection that the other methods of this client use.
#
# @param connection [PostgreSQLAdapter] ActiveRecord database connection
def initialize(connection)
  @connection = connection
end

Public Instance Methods

disable() click to toggle source

Disables pglogical postgres extensions

# File lib/pg/pglogical/client.rb, line 34
# Disables the pglogical postgres extensions.
#
# "pglogical" is disabled first; "pglogical_origin" is disabled afterwards,
# and only when the server version is below 90_500.
def disable
  connection.disable_extension("pglogical")
  if connection.postgresql_version < 90_500
    connection.disable_extension("pglogical_origin")
  end
end
enable() click to toggle source

Enables pglogical postgres extensions

# File lib/pg/pglogical/client.rb, line 28
# Enables the pglogical postgres extensions.
#
# On servers whose version is below 90_500 the "pglogical_origin" extension
# is enabled before "pglogical" itself.
def enable
  if connection.postgresql_version < 90_500
    connection.enable_extension("pglogical_origin")
  end
  connection.enable_extension("pglogical")
end
enabled?() click to toggle source

Returns whether pglogical is currently enabled or not

@return [Boolean]

# File lib/pg/pglogical/client.rb, line 21
# Returns whether pglogical is currently enabled or not.
#
# pglogical counts as enabled when it is installed and the "pglogical"
# extension is enabled; servers older than 90_500 additionally need the
# "pglogical_origin" extension to be enabled.
#
# @return [Boolean]
def enabled?
  if installed? && connection.extension_enabled?("pglogical")
    connection.postgresql_version >= 90_500 || connection.extension_enabled?("pglogical_origin")
  else
    false
  end
end
installed?() click to toggle source

Returns whether the pglogical postgres extension is installed or not

# File lib/pg/pglogical/client.rb, line 14
# Returns whether the pglogical postgres extension is installed or not,
# by checking for a "pglogical" row in pg_available_extensions.
#
# @return the value select_value yields for the EXISTS query
#   (presumably a Boolean; depends on the adapter's type casting)
def installed?
  availability_sql = "SELECT EXISTS(SELECT * FROM pg_available_extensions WHERE name = 'pglogical')"
  connection.select_value(availability_sql)
end
lag_bytes() click to toggle source

Reports on replication lag from provider to subscriber nodes. This method must be run on the provider node.

@return [Array<Hash<String,String>>] List of returned lag and application names,

one for each replication process
# File lib/pg/pglogical/client.rb, line 47
      # Reports on replication lag from provider to subscriber nodes.
      # This method must be run on the provider node.
      #
      # PostgreSQL 10 renamed the "xlog"/"location" functions and columns to
      # "wal"/"lsn", so the identifiers are chosen from the server version.
      #
      # @return [Array<Hash<String,String>>] List of returned lag and application names,
      #   one for each replication process
      def lag_bytes
        if connection.postgresql_version >= 100_000
          diff_function    = "pg_wal_lsn_diff"
          current_function = "pg_current_wal_insert_lsn"
          flush_column     = "flush_lsn"
        else
          diff_function    = "pg_xlog_location_diff"
          current_function = "pg_current_xlog_insert_location"
          flush_column     = "flush_location"
        end

        typed_exec(<<-SQL).to_a
          SELECT
            #{diff_function}(#{current_function}(), #{flush_column}) AS lag_bytes,
            application_name
          FROM pg_stat_replication
        SQL
      end
node_create(name, dsn) click to toggle source

Creates a node

@param name [String] @param dsn [String] external connection string to the node

# File lib/pg/pglogical/client.rb, line 78
# Creates a node via pglogical.create_node.
#
# @param name [String]
# @param dsn [String] external connection string to the node
def node_create(name, dsn)
  create_sql = "SELECT pglogical.create_node($1, $2)"
  typed_exec(create_sql, name, dsn)
end
node_drop(name, ifexists = false) click to toggle source

Drops the node

@param name [String] @param ifexists [Boolean]

# File lib/pg/pglogical/client.rb, line 110
# Drops the node via pglogical.drop_node.
#
# @param name [String]
# @param ifexists [Boolean]
def node_drop(name, ifexists = false)
  drop_sql = "SELECT pglogical.drop_node($1, $2)"
  typed_exec(drop_sql, name, ifexists)
end
node_dsn_update(name, dsn) click to toggle source

Updates a node connection string

NOTE: This method relies on the internals of the pglogical tables

rather than a published API.

NOTE: Disable subscriptions involving the node before calling this

method for a provider node in a subscriber database.

@param name [String] @param dsn [String] new external connection string to the node @return [Boolean] true if the dsn was updated, false otherwise

# File lib/pg/pglogical/client.rb, line 92
      # Updates a node connection string.
      #
      # NOTE: This method relies on the internals of the pglogical tables
      #   rather than a published API.
      #
      # NOTE: Disable subscriptions involving the node before calling this
      #   method for a provider node in a subscriber database.
      #
      # @param name [String]
      # @param dsn [String] new external connection string to the node
      # @return [Boolean] true if exactly one row was updated, false otherwise
      def node_dsn_update(name, dsn)
        update_sql = <<-SQL
          UPDATE pglogical.node_interface
          SET if_dsn = $2
          WHERE if_nodeid = (
            SELECT node_id
            FROM pglogical.node
            WHERE node_name = $1
          )
        SQL
        updated = typed_exec(update_sql, name, dsn)
        updated.cmd_tuples == 1
      end
nodes() click to toggle source
# File lib/pg/pglogical/client.rb, line 114
      # Lists the pglogical nodes together with their connection strings by
      # joining pglogical.node with pglogical.node_interface.
      #
      # @return the result of typed_exec; each row has the columns
      #   "name" and "conn_string"
      def nodes
        listing_sql = <<-SQL
          SELECT node_name AS name, if_dsn AS conn_string
          FROM pglogical.node join pglogical.node_interface
            ON if_nodeid = node_id
        SQL
        typed_exec(listing_sql)
      end
replication_set_add_all_tables(set_name, schema_names, sync = false) click to toggle source

Adds all tables in the given schemas to the replication set

@param set_name [String] replication set name @param schema_names [Array<String>] list of schema names @param sync [Boolean] sync table data to all the subscribers to the replication set

# File lib/pg/pglogical/client.rb, line 300
# Adds all tables in the given schemas to the replication set.
#
# @param set_name [String] replication set name
# @param schema_names [Array<String>] list of schema names
# @param sync [Boolean] sync table data to all the subscribers to the replication set
def replication_set_add_all_tables(set_name, schema_names, sync = false)
  add_all_sql = "SELECT pglogical.replication_set_add_all_tables($1, $2, $3)"
  typed_exec(add_all_sql, set_name, schema_names, sync)
end
replication_set_add_table(set_name, table_name, sync = false) click to toggle source

Adds a table to a replication set

@param set_name [String] replication set name @param table_name [String] table name to add @param sync [Boolean] sync the table on all subscribers to the given replication set

# File lib/pg/pglogical/client.rb, line 290
# Adds a table to a replication set.
#
# @param set_name [String] replication set name
# @param table_name [String] table name to add
# @param sync [Boolean] sync the table on all subscribers to the given replication set
def replication_set_add_table(set_name, table_name, sync = false)
  add_table_sql = "SELECT pglogical.replication_set_add_table($1, $2, $3)"
  typed_exec(add_table_sql, set_name, table_name, sync)
end
replication_set_alter(set_name, insert = true, update = true, delete = true, truncate = true) click to toggle source

Alters an existing replication set

@param set_name [String] replication set name @param insert [Boolean] replicate INSERT events @param update [Boolean] replicate UPDATE events @param delete [Boolean] replicate DELETE events @param truncate [Boolean] replicate TRUNCATE events

# File lib/pg/pglogical/client.rb, line 272
# Alters an existing replication set.
#
# @param set_name [String] replication set name
# @param insert [Boolean] replicate INSERT events
# @param update [Boolean] replicate UPDATE events
# @param delete [Boolean] replicate DELETE events
# @param truncate [Boolean] replicate TRUNCATE events
def replication_set_alter(set_name, insert = true, update = true, delete = true, truncate = true)
  event_flags = [insert, update, delete, truncate]
  typed_exec("SELECT pglogical.alter_replication_set($1, $2, $3, $4, $5)", set_name, *event_flags)
end
replication_set_create(set_name, insert = true, update = true, delete = true, truncate = true) click to toggle source

Creates a new replication set

@param set_name [String] new replication set name @param insert [Boolean] replicate INSERT events @param update [Boolean] replicate UPDATE events @param delete [Boolean] replicate DELETE events @param truncate [Boolean] replicate TRUNCATE events

# File lib/pg/pglogical/client.rb, line 260
# Creates a new replication set.
#
# @param set_name [String] new replication set name
# @param insert [Boolean] replicate INSERT events
# @param update [Boolean] replicate UPDATE events
# @param delete [Boolean] replicate DELETE events
# @param truncate [Boolean] replicate TRUNCATE events
def replication_set_create(set_name, insert = true, update = true, delete = true, truncate = true)
  event_flags = [insert, update, delete, truncate]
  typed_exec("SELECT pglogical.create_replication_set($1, $2, $3, $4, $5)", set_name, *event_flags)
end
replication_set_drop(set_name, ifexists = false) click to toggle source

Removes a replication set

@param set_name [String] replication set name @param ifexists [Boolean] if true an error is not thrown when the replication set does not exist

# File lib/pg/pglogical/client.rb, line 281
# Removes a replication set.
#
# @param set_name [String] replication set name
# @param ifexists [Boolean] if true an error is not thrown when the
#   replication set does not exist
def replication_set_drop(set_name, ifexists = false)
  drop_sql = "SELECT pglogical.drop_replication_set($1, $2)"
  typed_exec(drop_sql, set_name, ifexists)
end
replication_set_remove_table(set_name, table_name) click to toggle source

Removes a table from a replication set

@param set_name [String] replication set name @param table_name [String] table to remove

# File lib/pg/pglogical/client.rb, line 309
# Removes a table from a replication set.
#
# @param set_name [String] replication set name
# @param table_name [String] table to remove
def replication_set_remove_table(set_name, table_name)
  remove_sql = "SELECT pglogical.replication_set_remove_table($1, $2)"
  typed_exec(remove_sql, set_name, table_name)
end
replication_sets() click to toggle source

Lists the current replication sets

@return [Array<String>] the replication sets

# File lib/pg/pglogical/client.rb, line 249
# Lists the current replication sets.
#
# @return [Array<String>] the replication sets
def replication_sets
  set_rows = typed_exec("SELECT set_name FROM pglogical.replication_set").values
  set_rows.flatten
end
subscription_add_replication_set(name, set_name) click to toggle source

Adds a replication set to a subscription. Does not sync, only activates event consumption.

@param name [String] subscription name @param set_name [String] replication set name

# File lib/pg/pglogical/client.rb, line 192
# Adds a replication set to a subscription.
# Does not sync, only activates event consumption.
#
# @param name [String] subscription name
# @param set_name [String] replication set name
def subscription_add_replication_set(name, set_name)
  add_set_sql = "SELECT pglogical.alter_subscription_add_replication_set($1, $2)"
  typed_exec(add_set_sql, name, set_name)
end
subscription_create(name, dsn, replication_sets = %w(default default_insert_only), sync_structure = true, sync_data = true, forward_origins = ["all"]) click to toggle source

Creates a subscription to a provider node

@param name [String] subscription name @param dsn [String] provider node connection string @param replication_sets [Array<String>] replication set names to subscribe to @param sync_structure [Boolean] sync the schema structure when subscribing @param sync_data [Boolean] sync the data when subscribing @param forward_origins [Array<String>] names of non-provider nodes to replicate changes

from (cascading replication)
# File lib/pg/pglogical/client.rb, line 134
# Creates a subscription to a provider node.
#
# @param name [String] subscription name
# @param dsn [String] provider node connection string
# @param replication_sets [Array<String>] replication set names to subscribe to
# @param sync_structure [Boolean] sync the schema structure when subscribing
# @param sync_data [Boolean] sync the data when subscribing
# @param forward_origins [Array<String>] names of non-provider nodes to
#   replicate changes from (cascading replication)
def subscription_create(name, dsn, replication_sets = %w(default default_insert_only),
                        sync_structure = true, sync_data = true, forward_origins = ["all"])
  arguments = [name, dsn, replication_sets, sync_structure, sync_data, forward_origins]
  typed_exec("SELECT pglogical.create_subscription($1, $2, $3, $4, $5, $6)", *arguments)
end
subscription_disable(name, immediate = false) click to toggle source

Disables a subscription and disconnects it from the provider

@param name [String] subscription name @param immediate [Boolean] do not wait for the current transaction before stopping

# File lib/pg/pglogical/client.rb, line 153
# Disables a subscription and disconnects it from the provider.
#
# @param name [String] subscription name
# @param immediate [Boolean] do not wait for the current transaction before stopping
def subscription_disable(name, immediate = false)
  disable_sql = "SELECT pglogical.alter_subscription_disable($1, $2)"
  typed_exec(disable_sql, name, immediate)
end
subscription_drop(name, ifexists = false) click to toggle source

Disconnects the subscription and removes it

@param name [String] subscription name @param ifexists [Boolean] if true an error is not thrown when the subscription does not exist

# File lib/pg/pglogical/client.rb, line 144
# Disconnects the subscription and removes it.
#
# @param name [String] subscription name
# @param ifexists [Boolean] if true an error is not thrown when the
#   subscription does not exist
def subscription_drop(name, ifexists = false)
  drop_sql = "SELECT pglogical.drop_subscription($1, $2)"
  typed_exec(drop_sql, name, ifexists)
end
subscription_enable(name, immediate = false) click to toggle source

Enables a previously disabled subscription

@param name [String] subscription name @param immediate [Boolean] do not wait for the current transaction before starting

# File lib/pg/pglogical/client.rb, line 162
# Enables a previously disabled subscription.
#
# @param name [String] subscription name
# @param immediate [Boolean] do not wait for the current transaction before starting
def subscription_enable(name, immediate = false)
  enable_sql = "SELECT pglogical.alter_subscription_enable($1, $2)"
  typed_exec(enable_sql, name, immediate)
end
subscription_remove_replication_set(name, set_name) click to toggle source

Removes a replication set from a subscription

@param name [String] subscription name @param set_name [String] replication set name

# File lib/pg/pglogical/client.rb, line 201
# Removes a replication set from a subscription.
#
# @param name [String] subscription name
# @param set_name [String] replication set name
def subscription_remove_replication_set(name, set_name)
  remove_set_sql = "SELECT pglogical.alter_subscription_remove_replication_set($1, $2)"
  typed_exec(remove_set_sql, name, set_name)
end
subscription_resync_table(name, table, truncate = true) click to toggle source

Resyncs one existing table. By default the table is truncated before the sync.

@param name [String] subscription name @param table [String] name of the table to resync @param truncate [Boolean] truncate the table before resyncing (defaults to true)

# File lib/pg/pglogical/client.rb, line 182
# Resyncs one existing table.
#
# @param name [String] subscription name
# @param table [String] name of the table to resync
# @param truncate [Boolean] passed through as the third argument of
#   pglogical.alter_subscription_resynchronize_table; defaults to true
def subscription_resync_table(name, table, truncate = true)
  resync_sql = "SELECT pglogical.alter_subscription_resynchronize_table($1, $2, $3)"
  typed_exec(resync_sql, name, table, truncate)
end
subscription_show_status(name) click to toggle source

Shows status and basic information about a subscription

@param name [String] subscription name @return [Hash] a hash with the subscription information

keys:
  subscription_name
  status
  provider_node
  provider_dsn
  slot_name
  replication_sets
  forward_origins
  remote_replication_lsn(Log Sequence Number)
  local_replication_lsn(Log Sequence Number)
# File lib/pg/pglogical/client.rb, line 220
      # Shows status and basic information about a subscription.
      #
      # The "replication_sets" and "forward_origins" columns are read as
      # PostgreSQL array literals (e.g. "{a,b}") and converted to Ruby arrays
      # by dropping the surrounding braces and splitting on commas. A NULL
      # column becomes an empty array instead of raising NoMethodError.
      #
      # @param name [String] subscription name
      # @return [Hash] a hash with the subscription information
      #   keys:
      #     subscription_name
      #     status
      #     provider_node
      #     provider_dsn
      #     slot_name
      #     replication_sets
      #     forward_origins
      #     remote_replication_lsn(Log Sequence Number)
      #     local_replication_lsn(Log Sequence Number)
      def subscription_show_status(name)
        sql = <<-SQL
          SELECT sub.*, stat.remote_lsn AS remote_replication_lsn, stat.local_lsn AS local_replication_lsn
          FROM pglogical.show_subscription_status($1) sub
          LEFT JOIN pg_replication_origin_status stat
            ON sub.slot_name = stat.external_id
        SQL

        typed_exec(sql, name).first.tap do |s|
          %w(replication_sets forward_origins).each do |key|
            # A NULL array arrives as nil; indexing it would raise.
            s[key] = s[key].nil? ? [] : s[key][1..-2].split(",")
          end
        end
      end
subscription_sync(name, truncate = false) click to toggle source

Syncs all unsynchronized tables in all sets in a single operation.

Command does not block

@param name [String] subscription name @param truncate [Boolean] truncate the tables before syncing

# File lib/pg/pglogical/client.rb, line 172
# Syncs all unsynchronized tables in all sets in a single operation.
# Command does not block.
#
# @param name [String] subscription name
# @param truncate [Boolean] truncate the tables before syncing
def subscription_sync(name, truncate = false)
  sync_sql = "SELECT pglogical.alter_subscription_synchronize($1, $2)"
  typed_exec(sync_sql, name, truncate)
end
subscriptions() click to toggle source

Shows the status of all configured subscriptions

@return [Array<Hash>] list of results from subscription_show_status

# File lib/pg/pglogical/client.rb, line 237
# Shows the status of all configured subscriptions.
#
# Reads every sub_name from pglogical.subscription and looks each one up
# with subscription_show_status.
#
# @return [Array<Hash>] list of results from subscription_show_status
def subscriptions
  subscription_names = connection.select_values("SELECT sub_name FROM pglogical.subscription")
  subscription_names.map { |sub_name| subscription_show_status(sub_name) }
end
tables_in_replication_set(set_name) click to toggle source

Lists the tables currently in the replication set

@param set_name [String] replication set name @return [Array<String>] names of the tables in the given set

# File lib/pg/pglogical/client.rb, line 318
      # Lists the tables currently in the replication set.
      #
      # The query selects the set_reloid column of
      # pglogical.replication_set_table for the named set.
      #
      # @param set_name [String] replication set name
      # @return [Array<String>] names of the tables in the given set
      def tables_in_replication_set(set_name)
        tables_sql = <<-SQL
          SELECT set_reloid
          FROM pglogical.replication_set_table
          JOIN pglogical.replication_set
            USING (set_id)
          WHERE set_name = $1
        SQL
        table_rows = typed_exec(tables_sql, set_name).values
        table_rows.flatten
      end
wal_retained_bytes() click to toggle source

Reports on replication bytes of WAL being retained for each replication slot. This method must be run on the provider node.

@return [Array<Hash<String,String>>] List of returned WAL bytes and replication slot names,

one for each replication process
# File lib/pg/pglogical/client.rb, line 61
      # Reports on replication bytes of WAL being retained for each
      # pglogical_output replication slot.
      # This method must be run on the provider node.
      #
      # PostgreSQL 10 renamed the "xlog"/"location" functions to "wal"/"lsn",
      # so the function names are chosen from the server version.
      #
      # @return [Array<Hash<String,String>>] List of returned WAL bytes and replication slot names,
      #   one for each replication process
      def wal_retained_bytes
        if connection.postgresql_version >= 100_000
          diff_function    = "pg_wal_lsn_diff"
          current_function = "pg_current_wal_insert_lsn"
        else
          diff_function    = "pg_xlog_location_diff"
          current_function = "pg_current_xlog_insert_location"
        end

        typed_exec(<<-SQL).to_a
          SELECT
            #{diff_function}(#{current_function}(), restart_lsn) AS retained_bytes,
            slot_name
          FROM pg_replication_slots
          WHERE plugin = 'pglogical_output'
        SQL
      end
with_replication_set_lock(set_name) { || ... } click to toggle source
# File lib/pg/pglogical/client.rb, line 328
      def with_replication_set_lock(set_name)
        connection.transaction(:requires_new => true) do
          typed_exec(<<-SQL, set_name)
            SELECT *
            FROM pglogical.replication_set
            WHERE set_name = $1
            FOR UPDATE
          SQL
          yield
        end
      end

Private Instance Methods

typed_exec(sql, *params) click to toggle source
# File lib/pg/pglogical/client.rb, line 342
# Executes sql on the raw PG connection, passing params together with a
# PG::BasicTypeMapForQueries built for that connection as the type map.
#
# @param sql [String] statement using $1, $2, ... placeholders
# @param params [Array] values bound to the placeholders
# @return the result of async_exec on the raw connection
def typed_exec(sql, *params)
  raw = connection.raw_connection
  raw.async_exec(sql, params, nil, PG::BasicTypeMapForQueries.new(raw))
end