class Cloudtasker::Backend::GoogleCloudTask

Manage tasks pushed to GCP Cloud Task

Attributes

gcp_task[RW]

Public Class Methods

client() click to toggle source

Return the Google Cloud Task client.

@return [Google::Cloud::Tasks] The Google Cloud Task client.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 45
def self.client
  @client ||= ::Google::Cloud::Tasks.new(version: :v2beta3)
end
config() click to toggle source

Return the cloudtasker configuration. See Cloudtasker#configure.

@return [Cloudtasker::Config] The library configuration.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 54
def self.config
  Cloudtasker.config
end
create(payload) click to toggle source

Create a new task.

@param [Hash] payload The task payload.

@return [Cloudtasker::Backend::GoogleCloudTask, nil] The created task.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 133
def self.create(payload)
  payload = format_task_payload(payload)

  # Extract relative queue name
  relative_queue = payload.delete(:queue)

  # Create task
  resp = with_gax_retries { client.create_task(queue_path(relative_queue), payload) }
  resp ? new(resp) : nil
end
delete(id) click to toggle source

Delete a task by id.

@param [String] id The id of the task.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 149
def self.delete(id)
  with_gax_retries { client.delete_task(id) }
rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound, Google::Gax::PermissionDeniedError
  # The ID does not exist
  nil
end
find(id) click to toggle source

Find a task by id.

@param [String] id The task id.

@return [Cloudtasker::Backend::GoogleCloudTask, nil] The retrieved task.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 118
def self.find(id)
  resp = with_gax_retries { client.get_task(id) }
  resp ? new(resp) : nil
rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound
  # The ID does not exist
  nil
end
format_schedule_time(schedule_time) click to toggle source

Return a protobuf timestamp specifying how to wait before running a task.

@param [Integer, nil] schedule_time A unix timestamp.

@return [Google::Protobuf::Timestamp, nil] The protobuff timestamp

# File lib/cloudtasker/backend/google_cloud_task.rb, line 81
def self.format_schedule_time(schedule_time)
  return nil unless schedule_time

  # Generate protobuf timestamp
  Google::Protobuf::Timestamp.new.tap { |e| e.seconds = schedule_time.to_i }
end
format_task_payload(payload) click to toggle source

Format the job payload sent to Cloud Tasks.

@param [Hash] hash The worker payload.

@return [Hash] The Cloud Task payloadd.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 95
def self.format_task_payload(payload)
  payload = JSON.parse(payload.to_json, symbolize_names: true) # deep dup

  # Format schedule time to Google Protobuf timestamp
  payload[:schedule_time] = format_schedule_time(payload[:schedule_time])

  # Encode job content to support UTF-8. Google Cloud Task
  # expect content to be ASCII-8BIT compatible (binary)
  payload[:http_request][:headers] ||= {}
  payload[:http_request][:headers][Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json'
  payload[:http_request][:headers][Cloudtasker::Config::ENCODING_HEADER] = 'Base64'
  payload[:http_request][:body] = Base64.encode64(payload[:http_request][:body])

  payload
end
new(gcp_task) click to toggle source

Build a new instance of the class.

@param [Google::Cloud::Tasks::V2beta3::Task] resp The GCP Cloud Task response

# File lib/cloudtasker/backend/google_cloud_task.rb, line 170
def initialize(gcp_task)
  @gcp_task = gcp_task
end
queue_path(queue_name) click to toggle source

Return the fully qualified path for the Cloud Task queue.

@param [String] queue_name The relative name of the queue.

@return [String] The queue path.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 65
def self.queue_path(queue_name)
  client.queue_path(
    config.gcp_project_id,
    config.gcp_location_id,
    [config.gcp_queue_prefix, queue_name].join('-')
  )
end
setup_queue(**opts) click to toggle source

Create the queue configured in Cloudtasker if it does not already exist.

@param [String] queue_name The relative name of the queue.

@return [Google::Cloud::Tasks::V2beta3::Queue] The queue

# File lib/cloudtasker/backend/google_cloud_task.rb, line 19
def self.setup_queue(**opts)
  # Build full queue path
  queue_name = opts[:name] || Cloudtasker::Config::DEFAULT_JOB_QUEUE
  full_queue_name = queue_path(queue_name)

  # Try to get existing queue
  client.get_queue(full_queue_name)
rescue Google::Gax::RetryError
  # Extract options
  concurrency = (opts[:concurrency] || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i
  retries = (opts[:retries] || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i

  # Create queue on 'not found' error
  client.create_queue(
    client.location_path(config.gcp_project_id, config.gcp_location_id),
    name: full_queue_name,
    retry_config: { max_attempts: retries },
    rate_limits: { max_concurrent_dispatches: concurrency }
  )
end
with_gax_retries() { || ... } click to toggle source

Helper method encapsulating the retry strategy for GAX calls

# File lib/cloudtasker/backend/google_cloud_task.rb, line 159
def self.with_gax_retries
  Retriable.retriable(on: [Google::Gax::UnavailableError], tries: 3) do
    yield
  end
end

Public Instance Methods

relative_queue() click to toggle source

Return the relative queue (queue name minus prefix) the task is in.

@return [String] The relative queue name

# File lib/cloudtasker/backend/google_cloud_task.rb, line 179
def relative_queue
  gcp_task
    .name
    .match(%r{/queues/([^/]+)})
    &.captures
    &.first
    &.sub("#{self.class.config.gcp_queue_prefix}-", '')
end
to_h() click to toggle source

Return a hash description of the task.

@return [Hash] A hash description of the task.

# File lib/cloudtasker/backend/google_cloud_task.rb, line 193
def to_h
  {
    id: gcp_task.name,
    http_request: gcp_task.to_h[:http_request],
    schedule_time: gcp_task.to_h.dig(:schedule_time, :seconds).to_i,
    retries: gcp_task.to_h[:response_count],
    queue: relative_queue
  }
end