class Rake::DataTask::Postgres

Public Class Methods

new(options) click to toggle source

Connect to a PostgreSQL database.

If this class has already opened a connection to the same host, port, and database as the same user, that connection is reused for this instance.

@param [Hash] options the connection parameters
@option options [String] 'host' the server hostname or IP address (default 'localhost')
@option options [Integer] 'port' the server port number (default 5432)
@option options [String] 'database' the database name
@option options [String] 'username' the name of the database user to connect as
@option options [String] 'password' the database user's password (default '')
@return [Postgres] an instance of this adapter

# File lib/data_task/adapters/postgres.rb, line 34
# Open a connection to a PostgreSQL server, or reuse a cached one.
#
# Connections are cached per class, keyed on host, port, database and
# username (the password is not part of the key).
#
# @param options [Hash] string-keyed settings: 'host' (default 'localhost'),
#   'port' (default 5432), 'database', 'username', 'password' (default '')
def initialize options
  conn_options = {
    :host => options['host'] || 'localhost',
    :port => options['port'] || 5432,
    :database => options['database'],
    :username => options['username']
  }

  @connection = self.class.persisted_connection(conn_options)

  if @connection.nil?
    # Nothing cached for these settings: open a new connection using the
    # positional (host, port, options, tty, dbname, user, password) form.
    @connection = PG::Connection.new(
      conn_options[:host],
      conn_options[:port],
      nil,
      nil,
      conn_options[:database],
      conn_options[:username],
      options['password'] || ''
    )

    # Send server notices to the log on a single line each: messages that
    # begin with "ERROR:" at error level, everything else at info level.
    @connection.set_notice_processor do |msg|
      severity = msg =~ /^ERROR:/ ? :error : :info
      LOG.public_send(severity, 'psql') { msg.gsub(/\n/,'; ') }
    end

    self.class.persist_connection(@connection, conn_options)
  end

  # Create the tracking table unless it already exists.
  set_up_tracking unless tracking_tables?
end

Public Instance Methods

[](name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 72
# Wrap the relation called +name+ in a Data object bound to this adapter.
#
# @param name [String] relation name
# @return [Data]
def [](name)
  adapter = self
  Data.new(name, adapter)
end
create_data(table_name, data_definition, column_definitions, track_table=true)
Alias for: create_table
create_table(table_name, data_definition, column_definitions, track_table=true) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 208
      # (Re)create a table, dropping any existing table of that name first.
      #
      # @param table_name [String] table to create
      # @param data_definition [String, nil] query whose result fills the table
      #   ("create table ... as <query>"); nil creates it empty
      # @param column_definitions [String] text placed right after the table name,
      #   e.g. " (id integer, name text)"
      # @param track_table [Boolean] when true, install the tracking rules and
      #   record the creation in the tracking table
      def create_table table_name, data_definition, column_definitions, track_table=true
        drop_table table_name
        as_clause = data_definition.nil? ? nil : "as #{data_definition}"
        execute <<-EOSQL
          create table #{table_name} #{column_definitions}
          #{ as_clause }
        EOSQL
        return unless track_table

        create_tracking_rules(table_name)
        track_creation table_name, 0
      end
Also aliased as: create_data
create_view(view_name, view_definition) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 222
      # (Re)create a view, dropping any existing view of that name first.
      #
      # @param view_name [String] name of the view
      # @param view_definition [String] the query the view is defined as
      def create_view view_name, view_definition
        drop_view(view_name)
        execute(<<-EOSQL)
          create view #{view_name} as
          #{view_definition}
        EOSQL
      end
data_exists?(table_name, options = {})
Alias for: table_exists?
data_mtime(qualified_table_name)
Alias for: table_mtime
drop_data(table_name)
Alias for: drop_table
drop_table(table_name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 176
# Drop a table, cascading to dependent objects, if it exists. Then remove its
# row from the tracking table. The tracking table itself is not tracked, so
# that step is skipped when dropping it.
#
# @param table_name [String] table to drop
def drop_table table_name
  execute "drop table if exists #{table_name} cascade"
  is_tracker = table_name.casecmp(TABLE_TRACKER_NAME) == 0
  track_drop table_name unless is_tracker
end
Also aliased as: drop_data
drop_view(view_name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 230
# Drop a view, cascading to dependent objects, if it exists.
#
# @param view_name [String] view to drop
def drop_view view_name
  statement = "drop view if exists #{view_name} cascade"
  execute statement
end
execute(sql) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 94
# Run +sql+ on the connection and return the result rows.
#
# On a database error the current transaction is rolled back and the original
# exception is re-raised. A missing tracking table gets an extra explanatory
# log line.
#
# @param sql [String] the statement(s) to run
# @return [Array<Array>] the result rows (PG::Result#values)
# @raise [PG::Error] the error raised while running +sql+
def execute sql
  connect if @connection.nil?

  begin
    @connection.exec(sql).values
  rescue PG::Error => e
    case e
    when PG::UndefinedTable
      if /ERROR:  relation "(last_operations|.*\.last_operations)" does not exist/ =~ e.message
        LOG.error "Tracking is not set up in this schema. Set up tracking in this schema first."
      end
    else
      LOG.info e.message.chomp
    end

    # Roll back on the raw connection rather than recursing into #execute.
    # If the rollback itself fails (e.g. the connection is gone), a recursive
    # #execute would rescue that error and try to roll back again, forever,
    # until the stack overflows. A failed rollback is logged, and the
    # original error is still the one raised.
    begin
      @connection.exec "rollback;"
    rescue PG::Error => rollback_error
      LOG.info rollback_error.message.chomp
    end

    raise e
  end
end
operations_supported() click to toggle source
# File lib/data_task/adapters/postgres.rb, line 234
# Split the tracked operations into those the database records itself and
# those the application has to record.
#
# @return [Hash{Symbol => Array<Symbol>}] :by_db and :by_app operation lists
def operations_supported
  by_db = operations_supported_by_db
  by_app = [:truncate, :create].reject { |op| by_db.include?(op) }
  { :by_db => by_db, :by_app => by_app }
end
reset_tracking(options = {}) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 144
# Truncate the tracking table (via #truncate_table) inside the given search
# path.
#
# @param options [Hash] :search_path - schema(s) whose tracking table to
#   reset; defaults to the current search path
def reset_tracking options = {}
  schemas = options[:search_path]
  schemas ||= search_path.join(',')
  with_search_path(schemas) { truncate_table TABLE_TRACKER_NAME }
end
set_up_tracking(options = {}) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 123
# (Re)create the tracking table, dropping any existing one first.
#
# @param options [Hash] :search_path - schema(s) in which to create the
#   table; defaults to the current search path
def set_up_tracking options = {}
  tear_down_tracking options

  with_search_path(options[:search_path] || search_path.join(',')) do
    column_sql = table_tracker_columns.map { |col, col_defn| "#{col} #{col_defn[:data_type]}" }
    # The final +false+ keeps create_table from installing tracking rules
    # on the tracker itself.
    create_table TABLE_TRACKER_NAME, nil, " (#{column_sql.join(', ')})", false
  end
end
table_exists?(table_name, options = {}) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 198
# @param table_name [String] "table" or "schema.table"
# @param options [Hash] passed through to #relation_exists?
# @return [Boolean] whether a table with this name exists
def table_exists? table_name, options = {}
  relation_exists?(table_name, :table, options)
end
Also aliased as: data_exists?
table_mtime(qualified_table_name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 151
      # Time of the last tracked operation on a table.
      #
      # @param qualified_table_name [String] "table" or "schema.table"; an
      #   unqualified name is resolved to the first search-path schema containing it
      # @return the value Sql.get_single_time extracts from max(time) (presumably a
      #   Time, or nil when nothing is recorded), or nil when the table's schema
      #   cannot be determined because the table does not exist
      def table_mtime qualified_table_name
        schema_name, table_name = parse_schema_and_table_name(qualified_table_name)
        schema_name ||= first_schema_for(table_name)

        # first_schema_for returns nil when the table does not exist. Without this
        # guard the statements below would be built with an empty schema
        # ("set search_path to " and "from .<tracker>"), both SQL syntax errors.
        return if schema_name.nil?

        with_search_path(schema_name) do
          Sql.get_single_time(
            execute <<-EOSQL
              select max(time)
              from #{schema_name}.#{TABLE_TRACKER_NAME}
              where relation_name = '#{table_name}'
            EOSQL
          )
        end
      end
Also aliased as: data_mtime
table_tracker_columns() click to toggle source
Calls superclass method
# File lib/data_task/adapters/postgres.rb, line 76
# Tracking-table column definitions, adjusted for PostgreSQL.
#
# Starts from the superclass definitions. Every enumerated value is upcased,
# because the system tables store these values in upper case. The :table
# relation type becomes 'BASE TABLE', and :time is stored as
# 'timestamp with time zone'.
#
# NOTE(review): the hash returned by super is modified in place. If super
# hands out a shared/cached structure, other users will see these changes.
def table_tracker_columns
  cols = super

  cols.each_value do |col_defn|
    next unless col_defn.key?(:values)
    enum_values = col_defn[:values]
    enum_values.keys.each { |name| enum_values[name] = enum_values[name].upcase }
  end

  cols[:relation_type][:values][:table] = 'BASE TABLE'
  cols[:time][:data_type] = :'timestamp with time zone'
  cols
end
tear_down_tracking(options = {}) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 137
# Drop the tracking table.
#
# @param options [Hash] :search_path - schema(s) to drop it from; defaults to
#   the current search path
def tear_down_tracking options = {}
  schemas = if options[:search_path]
              options[:search_path]
            else
              search_path.join(',')
            end
  with_search_path(schemas) { drop_table TABLE_TRACKER_NAME }
end
track_drop(table_name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 182
      # Forget a dropped table by deleting its row from the tracking table. For a
      # qualified name this is the tracking table in the same schema; otherwise
      # it is the unqualified tracking table. Does nothing if that tracking
      # table does not exist.
      #
      # @param table_name [String] "table" or "schema.table"
      def track_drop table_name
        schema_name, unqualified_table_name = parse_schema_and_table_name(table_name)
        table_tracker_name =
          if schema_name.nil?
            TABLE_TRACKER_NAME
          else
            "#{schema_name}.#{TABLE_TRACKER_NAME}"
          end
        return unless table_exists?(table_tracker_name)

        execute <<-EOSQL
            delete from #{table_tracker_name}
            where
              relation_name = '#{unqualified_table_name}' and 
              relation_type = '#{relation_type_values[:table]}'
        EOSQL
      end
tracking_tables?() click to toggle source
# File lib/data_task/adapters/postgres.rb, line 119
# @return [Boolean] whether the tracking table exists (looked up by its
#   unqualified name through #data_exists?)
def tracking_tables?
  tracker = TABLE_TRACKER_NAME
  data_exists? tracker
end
truncate_data(table_name)
Alias for: truncate_table
truncate_table(table_name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 168
# Delete every row of a table and record the truncation in the tracking
# table.
#
# The tracking table itself is truncated as well; #reset_tracking relies on
# this. For the tracker only the bookkeeping step is skipped, mirroring
# #drop_table, because the tracker does not track itself.
#
# @param table_name [String] "table" or "schema.table"
def truncate_table table_name
  execute "truncate table #{table_name}"
  return if table_name.casecmp(TABLE_TRACKER_NAME) == 0
  track_truncate table_name
end
Also aliased as: truncate_data
view_exists?(view_name, options = {}) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 204
# @param view_name [String] "view" or "schema.view"
# @param options [Hash] passed through to #relation_exists?
# @return [Boolean] whether a view with this name exists
def view_exists? view_name, options = {}
  relation_exists?(view_name, :view, options)
end
with_role(role) { || ... } click to toggle source
# File lib/data_task/adapters/postgres.rb, line 249
# Run the block with the session switched to +role+, then switch back to the
# user who was current beforehand.
#
# The original role is restored in an +ensure+, so it is also restored when
# the block raises. Otherwise every later statement on this (cached, reused)
# connection would keep running as +role+.
#
# @param role [String] role to assume while the block runs
# @return the block's value
def with_role role
  original_role = current_user
  execute "set role #{role}"
  begin
    yield
  ensure
    execute "set role #{original_role}"
  end
end
with_search_path(schemas) { || ... } click to toggle source
# File lib/data_task/adapters/postgres.rb, line 241
# Run the block with the session's search_path set to +schemas+, then
# restore the previous search path.
#
# The previous path is restored in an +ensure+, so it is also restored when
# the block raises. Otherwise the cached, reused connection would stay on
# the wrong search path for every later statement.
#
# @param schemas [String, Array<String>] schema name(s); an array is joined
#   with commas
# @return the block's value
def with_search_path schemas
  original_search_path = search_path
  execute "set search_path to #{Array(schemas).join(',')}"
  begin
    yield
  ensure
    execute "set search_path to #{original_search_path.join(',')}"
  end
end

Private Instance Methods

create_tracking_rules(table_name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 349
        # Install one rule on +table_name+ for each operation listed by
        # #operations_supported_by_db_rules. Whenever that operation runs against
        # the table, its rule deletes the table's row from the tracking table and
        # inserts a new one with the operation and clock_timestamp().
        #
        # @param table_name [String] "table" or "schema.table"; a qualified name
        #   uses the tracking table in the same schema
        # @return the list of operations that rules were created for
        def create_tracking_rules table_name
          schema_name, unqualified_table_name = parse_schema_and_table_name(table_name)
          qualified_table_tracker = [schema_name, TABLE_TRACKER_NAME].compact.join('.')

          operations_supported_by_db_rules.each do |operation|
            rule_sql = <<-EOSQL
              create or replace rule "#{rule_name(table_name, operation)}" as
                on #{operation.to_s} to #{table_name} do also (

                  delete from #{qualified_table_tracker} where
                    relation_name = '#{unqualified_table_name}' and
                    relation_type = '#{relation_type_values[:table]}'
                    ;

                  insert into #{qualified_table_tracker} values (
                    '#{unqualified_table_name}',
                    '#{relation_type_values[:table]}',
                    '#{operation_values[operation]}',
                    clock_timestamp()
                  );

                )
            EOSQL
            execute rule_sql
          end
        end
current_user() click to toggle source

@return [String] the name of the current database user

# File lib/data_task/adapters/postgres.rb, line 310
# @return [String] the name of the current database user
def current_user
  rows = execute("select current_user")
  rows[0][0]
end
first_schema_for(table_name) click to toggle source

@return [String, nil] the name of the first schema in the search path that contains table_name, or nil if the table does not exist

# File lib/data_task/adapters/postgres.rb, line 315
        # The first schema on the search path that contains +table_name+.
        #
        # @param table_name [String] table name; any schema prefix is ignored by the
        #   schema lookup itself
        # @return [String, nil] the schema name, or nil when #table_exists? reports
        #   no such table
        def first_schema_for table_name
          return unless table_exists?(table_name)
          _, unqualified_table_name = parse_schema_and_table_name(table_name)

          # Rank each schema by its position on the search path. Schemas not on the
          # path get NaN, which PostgreSQL sorts after every non-NaN value.
          search_path_when_stmts = search_path.each_with_index.map do |schema, i|
            "when table_schema = '#{schema}' then #{i + 1}"
          end

          # Compare with lower() = lower() rather than ILIKE. ILIKE treats "_" and
          # "%" in the name as wildcards, so "foo_bar" would also match e.g. a
          # "fooXbar" in an earlier schema and return the wrong schema.
          rows = execute <<-EOSQL
            select
              table_schema,
              search_order
            from (
              select
                table_schema,
                table_name,
                case
                  #{search_path_when_stmts.join(' ')}
                  else 'NaN'::float
                end as search_order
              from information_schema.tables
              where lower(table_name) = lower('#{unqualified_table_name}')
              ) a
            order by search_order
            limit 1
          EOSQL
          rows.first.first
        end
operations_supported_by_db() click to toggle source
# File lib/data_task/adapters/postgres.rb, line 261
# Operations whose tracking the database performs itself. All of them are
# tracked with rules, so this is exactly #operations_supported_by_db_rules.
#
# @return [Array<Symbol>]
def operations_supported_by_db
  rule_operations = operations_supported_by_db_rules
  rule_operations
end
operations_supported_by_db_rules() click to toggle source
# File lib/data_task/adapters/postgres.rb, line 265
# Operations tracked in the database by rules (see #create_tracking_rules).
#
# @return [Array<Symbol>] a new array on every call
def operations_supported_by_db_rules
  %i[update insert delete]
end
parse_schema_and_table_name(table_name) click to toggle source

Split a table name qualified with a schema name into separate strings for schema and table names.

@return [Array(String, String)] the schema name and the table name of table_name, as separate strings. If table_name is not qualified with a schema name, returns [nil, table_name].

# File lib/data_task/adapters/postgres.rb, line 274
# Split an optionally schema-qualified relation name.
#
# @param table_name [String] "table" or "schema.table"
# @return [Array(String, String)] [schema, table]; schema is nil when
#   +table_name+ contains no dot
# @raise [RuntimeError] when +table_name+ contains more than one dot
def parse_schema_and_table_name table_name
  dots = table_name.count('.')
  raise "Invalid relation reference #{table_name} (only one '.' is allowed)" if dots > 1
  return [nil, table_name] if dots.zero?

  schema, relation = table_name.split('.')
  [schema, relation]
end
relation_exists?(relation_name, relation_type, options = {}) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 409
        # Whether a relation of the given type exists.
        #
        # @param relation_name [String] "name" or "schema.name"; an unqualified name
        #   is looked up in every schema on the search path
        # @param relation_type [Symbol] key into relation_type_values, e.g. :table
        #   or :view
        # @param options [Hash] not used by this method
        # @return [Boolean]
        def relation_exists? relation_name, relation_type, options = {}
          schema_name, unqualified_relation_name = parse_schema_and_table_name(relation_name)

          # Match the schema case-insensitively with lower() = lower() rather than
          # ILIKE, which treats "_" and "%" in the schema name as wildcards and can
          # report a relation that lives in a different schema.
          schema_conditions_sql =
            if schema_name.nil?
              "table_schema in (#{search_path.to_quoted_s})"
            else
              "lower(table_schema) = lower('#{schema_name}')"
            end

          n_matches = Sql.get_single_int(
            execute <<-EOSQL
              select count(*)
              from information_schema.tables
              where
                table_name = '#{unqualified_relation_name}' and
                table_type = '#{relation_type_values[relation_type]}' and
                #{ schema_conditions_sql }
            EOSQL
          )
          n_matches > 0
        end
rule_name(table_name, operation) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 345
# Name of the tracking rule for +operation+ on +table_name+.
#
# @return [String] "<table_name>_<operation>"
def rule_name table_name, operation
  format('%s_%s', table_name, operation)
end
search_path() click to toggle source

@return [Array<String>] the schema names in the search path, in order, as lowercase strings

# File lib/data_task/adapters/postgres.rb, line 286
        # The schemas on the current search path, in order, with the symbolic
        # "$user" entry resolved.
        #
        # PostgreSQL allows "$user" anywhere in search_path (the server default puts
        # it first). It stands for the schema named after the current user. It is
        # replaced by that user name when such a schema exists and dropped
        # otherwise, matching how the server ignores it in that case.
        #
        # @return [Array<String>] lowercased schema names
        def search_path
          current_search_path = execute("show search_path").first.first.split(',').map { |s| s.strip }
          return current_search_path.map(&:downcase) unless current_search_path.include?('"$user"')

          # Only look up the user (and their schema) when "$user" must be resolved.
          username = current_user
          user_schema_rows = execute <<-EOSQL
            select 1
            from information_schema.schemata
            where schema_name = '#{username}'
          EOSQL
          user_schema_exists = !(user_schema_rows.nil? || user_schema_rows.first.nil?)

          resolved = current_search_path.flat_map do |schema|
            next [schema] unless schema == '"$user"'
            user_schema_exists ? [username] : []
          end
          resolved.map(&:downcase)
        end
track_creation(table_name, n_tuples) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 375
        # Record that +table_name+ was created. Any existing tracking row for the
        # table is replaced with a "create" row stamped with clock_timestamp().
        #
        # @param table_name [String] "table" or "schema.table"; a qualified name
        #   uses the tracking table in the same schema
        # @param n_tuples [Integer] not used by this implementation
        def track_creation table_name, n_tuples
          schema_name, unqualified_table_name = parse_schema_and_table_name(table_name)
          qualified_table_tracker = [schema_name, TABLE_TRACKER_NAME].compact.join('.')
          operation = :create

          execute <<-EOSQL
            delete from #{qualified_table_tracker} where
              relation_name = '#{unqualified_table_name}' and
              relation_type = '#{relation_type_values[:table]}'
              ;
            insert into #{qualified_table_tracker} values (
              '#{unqualified_table_name}',
              '#{relation_type_values[:table]}',
              '#{operation_values[operation]}',
              clock_timestamp()
            );
          EOSQL
        end
track_truncate(table_name) click to toggle source
# File lib/data_task/adapters/postgres.rb, line 394
        # Record that +table_name+ was truncated: update its existing tracking row
        # to the "truncate" operation, stamped with clock_timestamp().
        #
        # @param table_name [String] "table" or "schema.table"; a qualified name
        #   uses the tracking table in the same schema
        def track_truncate table_name
          schema_name, unqualified_table_name = parse_schema_and_table_name(table_name)
          qualified_table_tracker =
            if schema_name.nil?
              TABLE_TRACKER_NAME
            else
              "#{schema_name}.#{TABLE_TRACKER_NAME}"
            end
          truncate_op = operation_values[:truncate]
          table_type = relation_type_values[:table]

          execute <<-EOSQL
            update #{qualified_table_tracker}
            set 
              operation = '#{truncate_op}',
              time = clock_timestamp()
            where
              relation_name = '#{unqualified_table_name}' and
              relation_type = '#{table_type}'
          EOSQL
        end