class Bricolage::PostgresConnection

Public Class Methods

install_signal_handlers() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 10
# Installs a SIGTERM handler. When the signal arrives the handler writes
# 'receive SIGTERM' to standard error and raises Interrupt with the message
# 'SIGTERM'.
def PostgresConnection.install_signal_handlers
  Signal.trap(:TERM) do
    $stderr.puts 'receive SIGTERM'
    raise Interrupt, 'SIGTERM'
  end
end
new(connection, ds, logger) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 38
# Wraps an already-opened connection.
#
# connection:: the underlying connection object that queries are sent to
# ds::         the data source this connection belongs to
# logger::     logger used for query and status messages
def initialize(connection, ds, logger)
  @connection, @ds, @logger = connection, ds, logger
  # A new wrapper starts out open, with no connection failure recorded.
  @closed = @connection_failed = false
end
open_data_source(ds) { |conn| ... } click to toggle source
# File lib/bricolage/postgresconnection.rb, line 17
# Opens a connection for the data source +ds+ (via _open_ds).
#
# Without a block the open connection is returned and the caller is
# responsible for closing it. With a block the connection is yielded, the
# block's value is returned, and the connection is always closed with
# close_force afterwards, even if the block raises.
def PostgresConnection.open_data_source(ds)
  conn = _open_ds(ds)
  return conn unless block_given?

  begin
    yield conn
  ensure
    conn.close_force
  end
end

Private Class Methods

_open_ds(ds) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 30
# Opens a PG connection using the host, port, database, user and password of
# +ds+ and wraps it in a new PostgresConnection that uses ds.logger.
#
# Raises ConnectionError (wrapping the original exception) when the connection
# attempt fails with PG::ConnectionBad or PG::UnableToSend.
def PostgresConnection._open_ds(ds)
  raw = PG::Connection.open(
    host: ds.host,
    port: ds.port,
    dbname: ds.database,
    user: ds.user,
    password: ds.password
  )
  new(raw, ds, ds.logger)
rescue PG::ConnectionBad, PG::UnableToSend => e
  raise ConnectionError.wrap(e)
end

Public Instance Methods

analyze(table) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 257
# Issues an "analyze" statement for +table+ through #execute and returns its
# result. The table name is interpolated into the SQL text as given.
def analyze(table)
  sql = "analyze #{table};"
  execute(sql)
end
cancel() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 69
# Asks the underlying connection to cancel the running query.
#
# @connection.cancel is treated as returning a falsy value on success and an
# error description otherwise. On failure the problem is logged and
# PostgreSQLException is raised.
def cancel
  @logger.info "cancelling PostgreSQL query..."
  failure = @connection.cancel
  return @logger.info("successfully cancelled") unless failure

  message = "could not cancel query: #{failure}"
  @logger.error message
  raise PostgreSQLException, message
end
cancel_force() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 64
# Best-effort variant of #cancel: a PostgreSQLException raised by #cancel is
# swallowed (nil is returned in that case). Any other exception propagates.
def cancel_force
  begin
    cancel
  rescue PostgreSQLException
    nil # deliberately ignored
  end
end
close() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 50
# Closes the underlying connection and marks this wrapper as closed.
# If the underlying close raises, the closed flag is left untouched.
def close
  @connection.close

  # Only reached when the underlying close did not raise.
  @closed = true
end
close_force() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 55
# Best-effort variant of #close: any StandardError raised while closing is
# swallowed and nil is returned instead.
def close_force
  begin
    close
  rescue StandardError
    nil # deliberately ignored
  end
end
closed?() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 60
# Returns the closed flag: false after #initialize, true once #close has
# completed without raising.
def closed?
  @closed
end
drop_table(name) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 241
# Issues "drop table ... cascade" for +name+ through #execute and returns its
# result. The table name is interpolated into the SQL text as given.
def drop_table(name)
  sql = "drop table #{name} cascade;"
  execute(sql)
end
drop_table_force(name) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 245
# Like #drop_table, but uses "if exists" in the statement so a missing table
# is tolerated by the SQL itself.
def drop_table_force(name)
  sql = "drop table if exists #{name} cascade;"
  execute(sql)
end
execute(query)
Alias for: execute_update
execute_query(query) { |rs| ... } click to toggle source
# File lib/bricolage/postgresconnection.rb, line 130
# Runs +query+ and yields the raw result set to the given block, returning the
# block's value.
#
# The query text is logged (via log_query) and the elapsed time is logged
# (via log_elapsed_time). The result set is cleared after the block finishes,
# so it must not be used outside the block.
#
# Raises ConnectionError for PG::ConnectionBad / PG::UnableToSend (and records
# the failure in @connection_failed), and PostgreSQLException for any other
# PG::Error.
def execute_query(query, &block)
  log_query(query)
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  yield result
rescue PG::ConnectionBad, PG::UnableToSend => e
  @connection_failed = true
  raise ConnectionError.wrap(e)
rescue PG::Error => e
  raise PostgreSQLException.wrap(e)
ensure
  # result stays nil when the failure happened before a result was obtained.
  result.clear if result
end
Also aliased as: query
execute_update(query) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 89
# Runs +query+ and returns its rows as an array (result.to_a).
#
# The query text is logged (via log_query) and the elapsed time is logged
# (via log_elapsed_time). The underlying result set is cleared before the
# method returns.
#
# Raises ConnectionError for PG::ConnectionBad / PG::UnableToSend (and records
# the failure in @connection_failed), and PostgreSQLException for any other
# PG::Error.
def execute_update(query)
  log_query(query)
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  result.to_a
rescue PG::ConnectionBad, PG::UnableToSend => e
  @connection_failed = true
  raise ConnectionError.wrap(e)
rescue PG::Error => e
  raise PostgreSQLException.wrap(e)
ensure
  # result stays nil when the failure happened before a result was obtained.
  result.clear if result
end
Also aliased as: execute, update
in_transaction?() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 155
# True when the connection's transaction status equals
# PG::Constants::PQTRANS_INTRANS; any other status yields false.
def in_transaction?
  status = @connection.transaction_status
  status == PG::Constants::PQTRANS_INTRANS
end
lock(table) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 261
# Issues a "lock" statement for +table+ through #execute and returns its
# result. The table name is interpolated into the SQL text as given.
def lock(table)
  sql = "lock #{table}"
  execute(sql)
end
log_elapsed_time() { || ... } click to toggle source
# File lib/bricolage/postgresconnection.rb, line 274
# Yields to the block, returns its value, and logs how long the block took
# (formatted as "N.N secs") at the data source's SQL log level. The duration
# is logged even when the block raises.
#
# The duration is taken from the monotonic clock rather than Time.now, so a
# wall-clock adjustment during the block cannot produce a negative or
# inflated figure.
def log_elapsed_time
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  @logger.log(@ds.sql_log_level) { "#{'%.1f' % elapsed} secs" }
end
log_query(query) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 265
# Logs +query+ at the data source's SQL log level, prefixed with the data
# source name in brackets. The text is passed through mask_secrets first, and
# the message is built lazily inside the logger block.
def log_query(query)
  level = @ds.sql_log_level
  @logger.log(level) { "[#{@ds.name}] #{mask_secrets query}" }
end
make_unique_cursor_name() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 217
# Builds a cursor name of the form "cur_bric_<pid>_<thread id in hex>_<seq>",
# where <seq> is a counter kept in Thread.current['bricolage_cursor_seq'] and
# incremented on every call.
#
# If the counter has not been set for the current thread it starts from 0, so
# the first name generated there ends in "_1" (previously this case raised
# NoMethodError because nil was incremented).
def make_unique_cursor_name
  seq = (Thread.current['bricolage_cursor_seq'] || 0) + 1
  Thread.current['bricolage_cursor_seq'] = seq
  "cur_bric_#{Process.pid}_#{'%X' % Thread.current.object_id}_#{seq}"
end
open_cursor(query, name = nil) { |cursor| ... } click to toggle source
# File lib/bricolage/postgresconnection.rb, line 204
# Declares a server-side cursor for +query+ and yields a Cursor object for it,
# returning the block's value.
#
# query:: SQL text the cursor is declared for
# name::  cursor name; when nil a name is generated with
#         make_unique_cursor_name
#
# When no transaction is open, the method opens one with #transaction and
# re-enters itself inside it.
def open_cursor(query, name = nil, &block)
  unless in_transaction?
    transaction {
      # Forward +name+ too; omitting it would silently replace a
      # caller-supplied cursor name with a generated one.
      return open_cursor(query, name, &block)
    }
  end
  name ||= make_unique_cursor_name
  execute "declare #{name} cursor for #{query}"
  yield Cursor.new(name, self, @logger)
end
query(query, &block)
Alias for: execute_query
query_batch(query, batch_size = 5000, &block) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 149
# Runs +query+ through a cursor (see #open_cursor) and hands the results to
# the given block via Cursor#each_result_set, using +batch_size+ (default
# 5000) as that method's size argument.
def query_batch(query, batch_size = 5000, &block)
  open_cursor(query) do |cursor|
    cursor.each_result_set(batch_size, &block)
  end
end
query_row(query) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 122
# Runs +query+ and returns the first row of the result, or nil when the result
# has no rows.
def query_row(query)
  execute_query(query) do |result|
    rows = result.to_a
    rows.first
  end
end
query_rows(query) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 126
# Runs +query+ and returns all rows of the result as an array.
def query_rows(query)
  execute_query(query, &:to_a)
end
query_value(query) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 113
# Runs +query+ and returns the first column value of the first row, or nil
# when the query yields no row.
def query_value(query)
  row = query_row(query)
  return nil unless row

  row.values.first
end
query_values(query) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 118
# Runs +query+ and returns every column value of every row as one flat array,
# in row order.
def query_values(query)
  rows = execute_query(query, &:to_a)
  rows.flat_map(&:values)
end
select(table, &block) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 109
# Runs "select * from <table>" through #execute_query, passing the given block
# along, and returns what #execute_query returns. The table name is
# interpolated into the SQL text as given.
def select(table, &block)
  sql = "select * from #{table}"
  execute_query(sql, &block)
end
source() click to toggle source
# File lib/bricolage/postgresconnection.rb, line 46
# Returns the underlying connection object this wrapper was constructed with.
def source
  @connection
end
transaction() { |txn| ... } click to toggle source
# File lib/bricolage/postgresconnection.rb, line 159
# Runs the given block inside a database transaction.
#
# Issues 'begin transaction', wraps this connection in a Transaction object
# and yields it. On the normal path the block's value is returned.
#
# If the block raises a StandardError, the transaction is aborted (unless it
# already reports committed or the connection has been marked as failed) and
# the original exception is re-raised. An error raised by the abort itself is
# logged and ignored.
def transaction
  execute 'begin transaction'
  txn = Transaction.new(self)
  begin
    yield txn
  rescue
    begin
      # Skip the abort when the connection itself is known to be broken.
      if not txn.committed? and not @connection_failed
        txn.abort
      end
    rescue => ex
      @logger.error "SQL error on transaction abort: #{ex.message} (ignored)"
    end
    raise
  ensure
    # Runs on every exit path (normal return, exception, non-local return from
    # the block): commit unless the transaction already reports committed.
    # NOTE(review): after an abort this presumably relies on Transaction#abort
    # making committed? true; Transaction is not visible here, so confirm.
    # NOTE(review): exceptions outside StandardError (e.g. Interrupt) bypass
    # the rescue above and reach this commit directly; confirm that is intended.
    txn.commit unless txn.committed?
  end
end
update(query)
Alias for: execute_update
vacuum(table) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 249
# Issues a "vacuum" statement for +table+ through #execute and returns its
# result. The table name is interpolated into the SQL text as given.
def vacuum(table)
  sql = "vacuum #{table};"
  execute(sql)
end
vacuum_sort_only(table) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 253
# Issues a "vacuum sort only" statement for +table+ through #execute and
# returns its result. The table name is interpolated into the SQL text as
# given.
def vacuum_sort_only(table)
  sql = "vacuum sort only #{table};"
  execute(sql)
end

Private Instance Methods

mask_secrets(msg) click to toggle source
# File lib/bricolage/postgresconnection.rb, line 269
# Returns a copy of +msg+ in which every "credentials '<anything>'" clause
# (matched case-insensitively, possibly spanning lines) is replaced with
# "credentials '****'", so secrets do not end up in the log.
def mask_secrets(msg)
  credentials_clause = /\bcredentials\s+'.*?'/mi
  msg.gsub(credentials_clause, "credentials '****'")
end
querying() { || ... } click to toggle source
# File lib/bricolage/postgresconnection.rb, line 80
# Yields to the block and returns its value. If the block is interrupted
# (Interrupt is raised), the interruption is logged, the running query is
# cancelled on a best-effort basis via cancel_force, and the Interrupt is
# re-raised.
def querying
  begin
    yield
  rescue Interrupt
    @logger.info "query interrupted"
    cancel_force
    raise
  end
end