class BigShift::BigQuery::Table

Constants

DEFAULT_POLL_INTERVAL

Public Class Methods

new(big_query_service, table_data, options={}) click to toggle source
# File lib/bigshift/big_query/table.rb, line 4
# Creates a table wrapper.
#
# @param big_query_service the service object later used to insert and poll jobs
# @param table_data the table description; its `table_reference` is read when loading
# @param options [Hash] optional collaborators:
#   * `:logger` - receives `info`/`debug` messages (defaults to `NullLogger::INSTANCE`)
#   * `:thread` - object whose `sleep` is called between polls (defaults to `Kernel`)
def initialize(big_query_service, table_data, options={})
  @big_query_service, @table_data = big_query_service, table_data
  @logger = options[:logger]
  @logger ||= NullLogger::INSTANCE
  @thread = options[:thread]
  @thread ||= Kernel
end

Public Instance Methods

load(uri, options={}) click to toggle source
# File lib/bigshift/big_query/table.rb, line 11
# Starts a BigQuery load job that imports the CSV data at `uri` into this
# table, then polls the job until it reaches the DONE state.
#
# @param uri [String] source URI handed to BigQuery as the only entry of `source_uris`
# @param options [Hash]
#   * `:poll_interval` - value passed to `sleep` between polls
#     (defaults to `DEFAULT_POLL_INTERVAL`)
#   * `:allow_overwrite` - when truthy the write disposition is
#     'WRITE_TRUNCATE', otherwise 'WRITE_EMPTY'
#   * `:schema` - forwarded as the load job's schema when given
#   * `:max_bad_records` - forwarded to the load job when given
# @return [nil]
# @raise [RuntimeError] with the message of the job's `error_result` when the
#   finished job reports one
def load(uri, options={})
  poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL
  table_reference = @table_data.table_reference
  load_configuration = {}
  load_configuration[:source_uris] = [uri]
  load_configuration[:write_disposition] = options[:allow_overwrite] ? 'WRITE_TRUNCATE' : 'WRITE_EMPTY'
  load_configuration[:create_disposition] = 'CREATE_IF_NEEDED'
  load_configuration[:schema] = options[:schema] if options[:schema]
  load_configuration[:source_format] = 'CSV'
  # Single quotes on purpose: this sends the two characters backslash + t,
  # which BigQuery documents as an accepted escape sequence for a tab delimiter.
  load_configuration[:field_delimiter] = '\t'
  load_configuration[:quote] = '"'
  load_configuration[:allow_quoted_newlines] = true
  load_configuration[:destination_table] = table_reference
  load_configuration[:max_bad_records] = options[:max_bad_records] if options[:max_bad_records]
  # The hash is splatted so that it is passed as keyword arguments; a
  # positional Hash is no longer converted to keywords on Ruby 3.
  job = Google::Apis::BigqueryV2::Job.new(
    configuration: Google::Apis::BigqueryV2::JobConfiguration.new(
      load: Google::Apis::BigqueryV2::JobConfigurationLoad.new(**load_configuration)
    )
  )
  job = @big_query_service.insert_job(table_reference.project_id, job)
  @logger.info(sprintf('Loading rows from %s to the table %s.%s', uri, table_reference.dataset_id, table_reference.table_id))
  started = false
  loop do
    job = @big_query_service.get_job(table_reference.project_id, job.job_reference.job_id)
    status = job.status
    state = status && status.state
    if state == 'DONE'
      # Individual errors are logged whether or not the job failed: per the
      # BigQuery API docs `errors` may be populated for a job that succeeded
      # (e.g. bad rows tolerated by max_bad_records), so only `error_result`
      # is treated as failure.
      (status.errors || []).each do |error|
        message = "Load error: \"#{error.message}\""
        message = "#{message} in #{error.location}" if error.location
        @logger.debug(message)
      end
      raise status.error_result.message if status.error_result
      break
    elsif state == 'RUNNING' && !started
      # Announce the transition to RUNNING only once.
      @logger.info('Loading started')
      started = true
    else
      @logger.debug(sprintf('Waiting for job %s (status: %s)', job.job_reference.job_id.inspect, state ? state.inspect : 'unknown'))
    end
    @thread.sleep(poll_interval)
  end
  report_complete(job)
  nil
end

Private Instance Methods

report_complete(job) click to toggle source
# File lib/bigshift/big_query/table.rb, line 64
# Logs a one-line summary of a finished load job at info level.
#
# Byte counts taken from the job's load statistics are divided by 2**30 and
# printed as GiB with two decimals, alongside the number of input files and
# the number of rows created.
#
# @param job the finished job; `job.statistics.load` is read for the figures
def report_complete(job)
  load_stats = job.statistics.load
  bytes_per_gib = 2**30
  gib_read = load_stats.input_file_bytes.to_f / bytes_per_gib
  gib_stored = load_stats.output_bytes.to_f / bytes_per_gib
  summary = format(
    'Loading complete, %.2f GiB loaded from %s files, %s rows created, table size %.2f GiB',
    gib_read,
    load_stats.input_files,
    load_stats.output_rows,
    gib_stored
  )
  @logger.info(summary)
end