class Embulk::Output::Bigquery

Constants

FILE_WRITER_KEY

Public Class Methods

add_file_writer(file_writer)
# File lib/embulk/output/bigquery.rb, line 448
def self.add_file_writer(file_writer)
  @file_writers_mutex.synchronize do
    @file_writers << file_writer
  end
end
auto_create(task, bigquery)
# File lib/embulk/output/bigquery.rb, line 292
def self.auto_create(task, bigquery)
  if task['auto_create_dataset']
    bigquery.create_dataset(task['dataset'])
  else
    bigquery.get_dataset(task['dataset']) # raises NotFoundError
  end

  if task['mode'] == 'replace_backup' and task['dataset_old'] != task['dataset']
    if task['auto_create_dataset']
      bigquery.create_dataset(task['dataset_old'], reference: task['dataset'])
    else
      bigquery.get_dataset(task['dataset_old']) # raises NotFoundError
    end
  end

  temp_table_expiration = task['temporary_table_expiration']
  temp_options = {'expiration_time' => temp_table_expiration}

  case task['mode']
  when 'delete_in_advance'
    bigquery.delete_table_or_partition(task['table'])
    bigquery.create_table_if_not_exists(task['table'])
  when 'replace'
    bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
    bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
  when 'append'
    bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
    bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
  when 'replace_backup'
    bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
    bigquery.create_table_if_not_exists(task['table'])

    bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old']) # needed when task['table_old'] is a partition
  else # append_direct
    if task['auto_create_table']
      bigquery.create_table_if_not_exists(task['table'])
    else
      bigquery.get_table(task['table']) # raises NotFoundError
    end
  end
end
bigquery()
# File lib/embulk/output/bigquery.rb, line 255
def self.bigquery
  @bigquery
end
configure(config, schema, task_count)
# File lib/embulk/output/bigquery.rb, line 33
def self.configure(config, schema, task_count)
  task = {
    'mode'                           => config.param('mode',                           :string,  :default => 'append'),
    'auth_method'                    => config.param('auth_method',                    :string,  :default => 'application_default'),
    'json_keyfile'                   => config.param('json_keyfile',                  LocalFile, :default => nil),
    'project'                        => config.param('project',                        :string,  :default => nil),
    'destination_project'            => config.param('destination_project',            :string,  :default => nil),
    'dataset'                        => config.param('dataset',                        :string),
    'location'                       => config.param('location',                       :string,  :default => nil),
    'table'                          => config.param('table',                          :string),
    'dataset_old'                    => config.param('dataset_old',                    :string,  :default => nil),
    'table_old'                      => config.param('table_old',                      :string,  :default => nil),
    'table_name_old'                 => config.param('table_name_old',                 :string,  :default => nil), # lower version compatibility
    'auto_create_dataset'            => config.param('auto_create_dataset',            :bool,    :default => false),
    'auto_create_table'              => config.param('auto_create_table',              :bool,    :default => true),
    'schema_file'                    => config.param('schema_file',                    :string,  :default => nil),
    'template_table'                 => config.param('template_table',                 :string,  :default => nil),

    'delete_from_local_when_job_end' => config.param('delete_from_local_when_job_end', :bool,    :default => true),
    'job_status_max_polling_time'    => config.param('job_status_max_polling_time',    :integer, :default => 3600),
    'job_status_polling_interval'    => config.param('job_status_polling_interval',    :integer, :default => 10),
    'is_skip_job_result_check'       => config.param('is_skip_job_result_check',       :bool,    :default => false),
    'with_rehearsal'                 => config.param('with_rehearsal',                 :bool,    :default => false),
    'rehearsal_counts'               => config.param('rehearsal_counts',               :integer, :default => 1000),
    'abort_on_error'                 => config.param('abort_on_error',                 :bool,    :default => nil),
    'progress_log_interval'          => config.param('progress_log_interval',          :float,   :default => nil),

    'column_options'                 => config.param('column_options',                 :array,   :default => []),
    'default_timezone'               => config.param('default_timezone',               :string,  :default => ValueConverterFactory::DEFAULT_TIMEZONE),
    'default_timestamp_format'       => config.param('default_timestamp_format',       :string,  :default => ValueConverterFactory::DEFAULT_TIMESTAMP_FORMAT),
    'payload_column'                 => config.param('payload_column',                 :string,  :default => nil),
    'payload_column_index'           => config.param('payload_column_index',           :integer, :default => nil),

    'description'                    => config.param('description',                    :string,  :default => nil),

    'open_timeout_sec'               => config.param('open_timeout_sec',               :integer, :default => nil),
    'timeout_sec'                    => config.param('timeout_sec',                    :integer, :default => nil), # google-api-ruby-client < v0.11.0
    'send_timeout_sec'               => config.param('send_timeout_sec',               :integer, :default => nil), # google-api-ruby-client >= v0.11.0
    'read_timeout_sec'               => config.param('read_timeout_sec',               :integer, :default => nil), # google-api-ruby-client >= v0.11.0
    'retries'                        => config.param('retries',                        :integer, :default => 5),
    'application_name'               => config.param('application_name',               :string,  :default => 'Embulk BigQuery plugin'),
    'sdk_log_level'                  => config.param('sdk_log_level',                  :string,  :default => nil),

    'path_prefix'                    => config.param('path_prefix',                    :string,  :default => nil),
    'sequence_format'                => config.param('sequence_format',                :string,  :default => '.%d.%d'),
    'file_ext'                       => config.param('file_ext',                       :string,  :default => nil),
    'skip_file_generation'           => config.param('skip_file_generation',           :bool,    :default => false),
    'compression'                    => config.param('compression',                    :string,  :default => 'NONE'),

    'gcs_bucket'                     => config.param('gcs_bucket',                     :string,  :default => nil),
    'auto_create_gcs_bucket'         => config.param('auto_create_gcs_bucket',         :bool,    :default => false),

    'source_format'                  => config.param('source_format',                  :string,  :default => 'CSV'),
    'max_bad_records'                => config.param('max_bad_records',                :integer, :default => 0),
    'field_delimiter'                => config.param('field_delimiter',                :string,  :default => ','),
    'encoding'                       => config.param('encoding',                       :string,  :default => 'UTF-8'),
    'ignore_unknown_values'          => config.param('ignore_unknown_values',          :bool,    :default => false),
    'allow_quoted_newlines'          => config.param('allow_quoted_newlines',          :bool,    :default => false),
    'time_partitioning'              => config.param('time_partitioning',              :hash,    :default => nil),
    'clustering'                     => config.param('clustering',                     :hash,    :default => nil), # google-api-ruby-client >= v0.21.0
    'schema_update_options'          => config.param('schema_update_options',          :array,   :default => nil),

    'temporary_table_expiration'     => config.param('temporary_table_expiration',     :integer, :default => nil),

    # for debug
    'skip_load'                      => config.param('skip_load',                      :bool,    :default => false),
    'temp_table'                     => config.param('temp_table',                     :string,  :default => nil),
    'rehearsal_table'                => config.param('rehearsal_table',                :string,  :default => nil),
  }

  now = Time.now

  task['mode'] = task['mode'].downcase
  unless %w[append append_direct replace delete_in_advance replace_backup].include?(task['mode'])
    raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
  end

  if %w[append replace delete_in_advance replace_backup].include?(task['mode']) and !task['auto_create_table']
    raise ConfigError.new "`mode: #{task['mode']}` requires `auto_create_table: true`"
  end

  if task['mode'] == 'replace_backup'
    task['table_old'] ||= task['table_name_old'] # for lower version compatibility
    if task['dataset_old'].nil? and task['table_old'].nil?
      raise ConfigError.new "`mode: replace_backup` requires either of `dataset_old` or `table_old`"
    end
    task['dataset_old'] ||= task['dataset']
    task['table_old']   ||= task['table']
  end

  if task['table_old']
    task['table_old'] = now.strftime(task['table_old'])
  end
  if task['table']
    task['table'] = now.strftime(task['table'])
  end

  task['auth_method'] = task['auth_method'].downcase
  unless %w[json_key service_account authorized_user compute_engine application_default].include?(task['auth_method'])
    raise ConfigError.new "`auth_method` must be one of service_account (or json_key), authorized_user, compute_engine, application_default"
  end
  if (task['auth_method'] == 'service_account' or task['auth_method'] == 'json_key') and task['json_keyfile'].nil?
    raise ConfigError.new "`json_keyfile` is required for auth_method: service_account (or json_key)"
  end

  if task['json_keyfile']
    begin
      json_key = JSON.parse(task['json_keyfile'])
      task['project'] ||= json_key['project_id']
    rescue => e
      raise ConfigError.new "Parsing 'json_keyfile' failed with error: #{e.class} #{e.message}"
    end
  end
  if task['project'].nil?
    raise ConfigError.new "Required field \"project\" is not set"
  end
  task['destination_project'] ||= task['project']

  if (task['payload_column'] or task['payload_column_index']) and task['auto_create_table']
    if task['schema_file'].nil? and task['template_table'].nil?
      raise ConfigError.new "Cannot guess table schema from Embulk schema with `payload_column` or `payload_column_index`. Either of `schema_file` or `template_table` is required for auto_create_table true"
    end
  end

  if task['payload_column_index']
    if task['payload_column_index'] < 0 || schema.size <= task['payload_column_index']
      raise ConfigError.new "payload_column_index #{task['payload_column_index']} is out of schema size"
    end
  elsif task['payload_column']
    task['payload_column_index'] = schema.find_index {|c| c[:name] == task['payload_column'] }
    if task['payload_column_index'].nil?
      raise ConfigError.new "payload_column #{task['payload_column']} does not exist in schema"
    end
  end

  if task['schema_file']
    unless File.exist?(task['schema_file'])
      raise ConfigError.new "schema_file #{task['schema_file']} is not found"
    end
    begin
      JSON.parse(File.read(task['schema_file']))
    rescue => e
      raise ConfigError.new "Parsing 'schema_file' #{task['schema_file']} failed with error: #{e.class} #{e.message}"
    end
  end

  if task['path_prefix'].nil?
    task['path_prefix'] = Tempfile.create('embulk_output_bigquery_') {|fp| fp.path }
  end

  task['source_format'] = task['source_format'].upcase
  if task['source_format'] == 'JSONL'
    task['source_format'] = 'NEWLINE_DELIMITED_JSON'
  end
  unless %w[CSV NEWLINE_DELIMITED_JSON].include?(task['source_format'])
    raise ConfigError.new "`source_format` must be CSV or NEWLINE_DELIMITED_JSON (JSONL)"
  end

  task['compression'] = task['compression'].upcase
  unless %w[GZIP NONE].include?(task['compression'])
    raise ConfigError.new "`compression` must be GZIP or NONE"
  end

  if task['file_ext'].nil?
    case task['source_format']
    when 'CSV'
      file_ext = '.csv'
    else # newline_delimited_json
      file_ext = '.jsonl'
    end
    case task['compression']
    when 'GZIP'
      file_ext << '.gz'
    end
    task['file_ext'] = file_ext
  end

  unique_name = SecureRandom.uuid.gsub('-', '_')

  if %w[replace replace_backup append].include?(task['mode'])
    task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
  else
    task['temp_table'] = nil
  end

  if task['with_rehearsal']
    task['rehearsal_table'] ||= "LOAD_REHEARSAL_#{unique_name}_#{task['table']}"
  end

  if task['sdk_log_level']
    Google::Apis.logger.level = eval("::Logger::#{task['sdk_log_level'].upcase}")
  end

  if task['abort_on_error'].nil?
    task['abort_on_error'] = (task['max_bad_records'] == 0)
  end

  if task['time_partitioning']
    unless task['time_partitioning']['type']
      raise ConfigError.new "`time_partitioning` must have `type` key"
    end
  elsif Helper.has_partition_decorator?(task['table'])
    task['time_partitioning'] = {'type' => 'DAY'}
  end

  if task['clustering']
    unless task['clustering']['fields']
      raise ConfigError.new "`clustering` must have `fields` key"
    end
  end

  if task['schema_update_options']
    task['schema_update_options'].each do |schema_update_option|
      unless %w[ALLOW_FIELD_ADDITION ALLOW_FIELD_RELAXATION].include?(schema_update_option)
        raise ConfigError.new "`schema_update_options` must contain either of ALLOW_FIELD_ADDITION or ALLOW_FIELD_RELAXATION or both"
      end
    end
  end

  task
end
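
Note that table (and table_old) may contain strftime placeholders; configure expands them against the current time before the partition-decorator check above. A minimal sketch (the table name below is a made-up example):

now   = Time.now
table = 'events_%Y%m%d'   # hypothetical table name with a date placeholder
now.strftime(table)       # => "events_20240115", for example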
converters()
# File lib/embulk/output/bigquery.rb, line 259
def self.converters
  @converters
end
file_writers()
# File lib/embulk/output/bigquery.rb, line 444
def self.file_writers
  @file_writers
end
new(task, schema, index)

An instance is created for each task.

Calls superclass method
# File lib/embulk/output/bigquery.rb, line 472
def initialize(task, schema, index)
  super

  if task['with_rehearsal'] and @index == 0
    @rehearsaled = false
  end
end
rehearsal_thread()
# File lib/embulk/output/bigquery.rb, line 263
def self.rehearsal_thread
  @rehearsal_thread
end
rehearsal_thread=(rehearsal_thread)
# File lib/embulk/output/bigquery.rb, line 267
def self.rehearsal_thread=(rehearsal_thread)
  @rehearsal_thread = rehearsal_thread
end
reset_file_writers()
# File lib/embulk/output/bigquery.rb, line 440
def self.reset_file_writers
  @file_writers = Array.new
end
transaction(config, schema, task_count) { |task| ... }
# File lib/embulk/output/bigquery.rb, line 334
def self.transaction(config, schema, task_count, &control)
  task = self.configure(config, schema, task_count)

  @task = task
  @schema = schema
  @bigquery = BigqueryClient.new(task, schema)
  @converters = ValueConverterFactory.create_converters(task, schema)

  self.auto_create(@task, @bigquery)

  begin
    paths = []
    if task['skip_file_generation']
      yield(task) # the control block does nothing here, but it still has to be called
      path_pattern = "#{task['path_prefix']}*#{task['file_ext']}"
      Embulk.logger.info { "embulk-output-bigquery: Skip file generation. Get paths from `#{path_pattern}`" }
      paths = Dir.glob(path_pattern)
    else
      task_reports = yield(task) # generates local files

      ios = file_writers.map(&:io)
      paths = ios.map(&:path)
      ios.each do |io|
        Embulk.logger.debug { "close #{io.path}" }
        io.close rescue nil
      end
    end

    if rehearsal_thread
      rehearsal_thread.join
    end

    if task['skip_load'] # only for debug
      Embulk.logger.info { "embulk-output-bigquery: Skip load" }
    else
      if !paths.empty?
        target_table = task['temp_table'] ? task['temp_table'] : task['table']
        if bucket = task['gcs_bucket']
          gcs = GcsClient.new(task)
          gcs.insert_temporary_bucket(bucket) if task['auto_create_gcs_bucket']
          objects = paths.size.times.map { SecureRandom.uuid.to_s }
          gcs.insert_objects(paths, objects: objects, bucket: bucket)
          object_uris = objects.map {|object| URI.join("gs://#{bucket}", object).to_s }
          responses = bigquery.load_from_gcs(object_uris, target_table)
          objects.each {|object| gcs.delete_object(object, bucket: bucket) }
        else
          responses = bigquery.load_in_parallel(paths, target_table)
        end
      else
        responses = []
      end
      transaction_report = self.transaction_report(task, responses)
      Embulk.logger.info { "embulk-output-bigquery: transaction_report: #{transaction_report.to_json}" }

      if task['abort_on_error'] && !task['is_skip_job_result_check']
        if transaction_report['num_input_rows'] != transaction_report['num_output_rows']
          raise Error, "ABORT: `num_input_rows (#{transaction_report['num_input_rows']})` and " \
            "`num_output_rows (#{transaction_report['num_output_rows']})` does not match"
        end
      end

      if task['mode'] == 'replace_backup'
        begin
          bigquery.get_table_or_partition(task['table'])
          bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
        rescue NotFoundError
        end
      end

      if task['temp_table']
        if task['mode'] == 'append'
          bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_APPEND')
        else # replace or replace_backup
          bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_TRUNCATE')
        end
      end
    end
  ensure
    begin
      if task['temp_table'] # append or replace or replace_backup
        bigquery.delete_table(task['temp_table'])
      end
    ensure
      if task['delete_from_local_when_job_end']
        paths.each do |path|
          Embulk.logger.info { "embulk-output-bigquery: delete #{path}" }
          File.unlink(path) rescue nil
        end
      else
        paths.each do |path|
          if File.exist?(path)
            Embulk.logger.info { "embulk-output-bigquery: keep #{path}" }
          end
        end
      end
    end
  end

  # this is for the -c next_config option; add parameters here for the next execution if needed
  next_config_diff = {}
  return next_config_diff
end
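
The empty next_config_diff is what Embulk merges into the configuration for the next run when invoked with the -c option. A hypothetical illustration of carrying a parameter over (the key name below is invented purely for illustration):

# hypothetical: expose the resolved table name to the next execution
next_config_diff = {'last_loaded_table' => task['table']}
return next_config_diff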
transaction_report(task, responses)
# File lib/embulk/output/bigquery.rb, line 271
def self.transaction_report(task, responses)
  num_input_rows = file_writers.empty? ? 0 : file_writers.map(&:num_rows).inject(:+)
  return {'num_input_rows' => num_input_rows} if task['is_skip_job_result_check']

  num_response_rows = responses.inject(0) do |sum, response|
    sum + (response ? response.statistics.load.output_rows.to_i : 0)
  end
  if task['temp_table']
    num_output_rows = bigquery.get_table_or_partition(task['temp_table']).num_rows.to_i
  else
    num_output_rows = num_response_rows
  end
  num_rejected_rows = num_input_rows - num_output_rows
  transaction_report = {
    'num_input_rows' => num_input_rows,
    'num_response_rows' => num_response_rows,
    'num_output_rows' => num_output_rows,
    'num_rejected_rows' => num_rejected_rows,
  }
end
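
The returned hash has the following shape (the counts below are purely illustrative):

{
  'num_input_rows'    => 1000, # rows written by all FileWriters
  'num_response_rows' => 1000, # sum of output_rows over the load job responses
  'num_output_rows'   => 1000, # rows in the temp table, or num_response_rows if no temp table is used
  'num_rejected_rows' => 0,    # num_input_rows - num_output_rows
}

With is_skip_job_result_check enabled, only num_input_rows is reported.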

Public Instance Methods

abort()
# File lib/embulk/output/bigquery.rb, line 524
def abort
end
add(page)

called for each page in each task

# File lib/embulk/output/bigquery.rb, line 485
def add(page)
  return if task['skip_file_generation']
  num_rows = file_writer.add(page)

  if task['with_rehearsal'] and @index == 0 and !@rehearsaled
    if num_rows >= task['rehearsal_counts']
      load_rehearsal
      @rehearsaled = true
    end
  end
end
close()

called once at the end of each task

# File lib/embulk/output/bigquery.rb, line 481
def close
end
commit()

called after processing all pages in each task, returns a task_report

# File lib/embulk/output/bigquery.rb, line 528
def commit
  {}
end
file_writer()

Creates one FileWriter object per output thread, shared among tasks. These shared objects are closed in transaction. This is mainly to limit (or control via -X max_threads) the number of generated files, which equals the parallel load concurrency, when the number of input tasks is large.

file_writer must be called only from add, because the other methods run on different threads (they are called from non-output threads). Note also that the add method of the same task instance may be called from different output threads.

# File lib/embulk/output/bigquery.rb, line 464
def file_writer
  return Thread.current[FILE_WRITER_KEY] if Thread.current[FILE_WRITER_KEY]
  file_writer = FileWriter.new(@task, @schema, @index, self.class.converters)
  self.class.add_file_writer(file_writer)
  Thread.current[FILE_WRITER_KEY] = file_writer
end
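
The per-thread caching uses Ruby's thread-local storage keyed by FILE_WRITER_KEY. A standalone sketch of the same pattern, with a hypothetical Writer class:

WRITER_KEY = :example_file_writer_key   # stands in for FILE_WRITER_KEY

def writer_for_current_thread
  # create at most one writer per output thread and reuse it on later calls
  Thread.current[WRITER_KEY] ||= Writer.new
end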
finish()
# File lib/embulk/output/bigquery.rb, line 521
def finish
end
load_rehearsal()
# File lib/embulk/output/bigquery.rb, line 497
def load_rehearsal
  bigquery = self.class.bigquery
  Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" }

  io = file_writer.close # need to close once for gzip
  rehearsal_path = "#{io.path}.rehearsal"
  Embulk.logger.debug { "embulk_output_bigquery: cp #{io.path} #{rehearsal_path}" }
  FileUtils.cp(io.path, rehearsal_path)
  file_writer.reopen

  self.class.rehearsal_thread = Thread.new do
    begin
      bigquery.create_table_if_not_exists(task['rehearsal_table'])
      response = bigquery.load(rehearsal_path, task['rehearsal_table'])
      num_output_rows = response ? response.statistics.load.output_rows.to_i : 0
      Embulk.logger.info { "embulk-output-bigquery: Loaded rehearsal #{num_output_rows}" }
    ensure
      Embulk.logger.debug { "embulk_output_bigquery: delete #{rehearsal_path}" }
      File.unlink(rehearsal_path) rescue nil
      bigquery.delete_table(task['rehearsal_table'])
    end
  end
end
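
Rehearsal is controlled by the with_rehearsal and rehearsal_counts options (see configure above): on the first task (@index == 0), once rehearsal_counts rows have been written, add calls load_rehearsal, which copies the partially written file and loads the copy into a temporary rehearsal table in a background thread. A hypothetical config fragment, expressed as the resulting task keys:

{
  'with_rehearsal'   => true, # rehearse using data from the first task
  'rehearsal_counts' => 1000, # start the rehearsal after this many rows
}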