class Fluent::Plugin::BigQueryLoadOutput

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 37
def configure(conf)
  super

  placeholder_params = "project=#{@project}/dataset=#{@dataset}/table=#{@tablelist.join(",")}/fetch_schema_table=#{@fetch_schema_table}"
  placeholder_validate!(:bigquery_load, placeholder_params)
end
format(tag, time, record) click to toggle source

for Fluent::Plugin::Output#implement? method

# File lib/fluent/plugin/out_bigquery_load.rb, line 60
def format(tag, time, record)
  super
end
prefer_delayed_commit() click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 55
def prefer_delayed_commit
  @use_delayed_commit
end
start() click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 44
def start
  super

  if prefer_delayed_commit
    @polling_targets = []
    @polling_mutex = Mutex.new
    log.debug("start load job polling")
    timer_execute(:polling_bigquery_load_job, @wait_job_interval, &method(:poll))
  end
end
try_write(chunk) click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 98
def try_write(chunk)
  job_reference = do_write(chunk)
  @polling_mutex.synchronize do
    @polling_targets << job_reference
  end
rescue Fluent::BigQuery::Error => e
  raise if e.retryable?

  @retry_mutex.synchronize do
    if @secondary
      # TODO: find better way
      @retry = retry_state_create(
        :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
        forever: false, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
        max_interval: @buffer_config.retry_max_interval,
        secondary: true, secondary_threshold: Float::EPSILON,
        randomize: @buffer_config.retry_randomize
      )
    else
      @retry = retry_state_create(
        :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
        forever: false, max_steps: 0, backoff_base: @buffer_config.retry_exponential_backoff_base,
        max_interval: @buffer_config.retry_max_interval,
        randomize: @buffer_config.retry_randomize
      )
    end
  end

  raise
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 64
def write(chunk)
  job_reference = do_write(chunk)

  until response = writer.fetch_load_job(job_reference)
    sleep @wait_job_interval
  end

  writer.commit_load_job(job_reference.chunk_id_hex, response)
rescue Fluent::BigQuery::Error => e
  raise if e.retryable?

  @retry_mutex.synchronize do
    if @secondary
      # TODO: find better way
      @retry = retry_state_create(
        :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
        forever: false, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
        max_interval: @buffer_config.retry_max_interval,
        secondary: true, secondary_threshold: Float::EPSILON,
        randomize: @buffer_config.retry_randomize
      )
    else
      @retry = retry_state_create(
        :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
        forever: false, max_steps: 0, backoff_base: @buffer_config.retry_exponential_backoff_base,
        max_interval: @buffer_config.retry_max_interval,
        randomize: @buffer_config.retry_randomize
      )
    end
  end

  raise
end

Private Instance Methods

create_upload_source(chunk) { |file| ... } click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 203
def create_upload_source(chunk)
  chunk_is_file = @buffer_config["@type"] == 'file'
  if chunk_is_file
    File.open(chunk.path) do |file|
      yield file
    end
  else
    Tempfile.open("chunk-tmp") do |file|
      file.binmode
      chunk.write_to(file)
      file.sync
      file.rewind
      yield file
    end
  end
end
do_write(chunk) click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 131
def do_write(chunk)
  table_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end

  metadata = chunk.metadata
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = extract_placeholders(table_format, metadata)
  schema = get_schema(project, dataset, metadata)

  create_upload_source(chunk) do |upload_source|
    writer.create_load_job(chunk.unique_id, dump_unique_id_hex(chunk.unique_id), project, dataset, table_id, upload_source, schema)
  end
end
poll() click to toggle source
# File lib/fluent/plugin/out_bigquery_load.rb, line 149
def poll
  job_reference = @polling_mutex.synchronize do
    @polling_targets.shift
  end
  return unless job_reference

  begin
    response = writer.fetch_load_job(job_reference)
    if response
      writer.commit_load_job(job_reference.chunk_id_hex, response)
      commit_write(job_reference.chunk_id)
      log.debug("commit chunk", chunk: job_reference.chunk_id_hex, **job_reference.as_hash(:job_id, :project_id, :dataset_id, :table_id))
    else
      @polling_mutex.synchronize do
        @polling_targets << job_reference
      end
    end
  rescue Fluent::BigQuery::Error => e
    # RetryableError comes from only `commit_load_job`
    # if error is retryable, takeback chunk and do next `try_flush`
    # if error is not retryable, create custom retry_state and takeback chunk do next `try_flush`
    if e.retryable?
      log.warn("failed to poll load job", error: e, chunk: job_reference.chunk_id_hex, **job_reference.as_hash(:job_id, :project_id, :dataset_id, :table_id))
    else
      log.error("failed to poll load job", error: e, chunk: job_reference.chunk_id_hex, **job_reference.as_hash(:job_id, :project_id, :dataset_id, :table_id))
      @retry_mutex.synchronize do
        if @secondary
          # TODO: find better way
          @retry = retry_state_create(
            :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
            forever: false, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
            max_interval: @buffer_config.retry_max_interval,
            secondary: true, secondary_threshold: Float::EPSILON,
            randomize: @buffer_config.retry_randomize
          )
        else
          @retry = retry_state_create(
            :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
            forever: false, max_steps: 0, backoff_base: @buffer_config.retry_exponential_backoff_base,
            max_interval: @buffer_config.retry_max_interval,
            randomize: @buffer_config.retry_randomize
          )
        end
      end
    end

    rollback_write(job_reference.chunk_id)
  rescue => e
    log.error("unexpected error while polling", error: e)
    log.error_backtrace
    rollback_write(job_reference.chunk_id)
  end
end