class Cloudtasker::WorkerHandler

Build, serialize and schedule tasks on the processing backend.

Constants

JWT_ALG

Algorithm used to sign the verification token

REDIS_PAYLOAD_NAMESPACE

Sub-namespace to use for redis keys when storing payloads in Redis

Attributes

worker[R]

Public Class Methods

execute_from_payload!(input_payload) click to toggle source

Execute a task worker from a task payload

@param [Hash] input_payload The Cloud Task payload.

@return [Any] The return value of the worker perform method.

# File lib/cloudtasker/worker_handler.rb, line 71
# Run the worker described by a task payload.
#
# Delegates to `with_worker_handling` and calls `execute` on the worker
# it yields.
#
# @param [Hash] input_payload The Cloud Task payload.
#
# @return [Any] Whatever `with_worker_handling` returns for the block.
def self.execute_from_payload!(input_payload)
  with_worker_handling(input_payload) { |worker| worker.execute }
end
extract_payload(input_payload) click to toggle source

Return the argument payload key (if present) along with the actual worker payload.

If the payload was stored in Redis then retrieve it.

@return [Hash] Hash

# File lib/cloudtasker/worker_handler.rb, line 120
# Split an incoming payload into the worker payload and, when present,
# the Redis key under which the job arguments were stored.
#
# The payload is round-tripped through JSON so that the result is a fresh
# hash with symbolized keys. When `:job_args_payload_id` is present the job
# arguments are fetched through `redis.fetch`, otherwise the inline
# `:job_args` value is used.
#
# @param [Hash] input_payload The raw payload.
#
# @return [Hash] `{ args_payload_key: String|nil, payload: Hash }`
def self.extract_payload(input_payload)
  data = JSON.parse(input_payload.to_json, symbolize_names: true)
  reference_id = data.delete(:job_args_payload_id)

  # Only build a storage key when the payload carries a reference
  args_payload_key = nil
  args_payload_key = key([REDIS_PAYLOAD_NAMESPACE, reference_id].join('/')) if reference_id

  # Stored arguments take precedence over inline ones
  job_args =
    if args_payload_key
      redis.fetch(args_payload_key)
    else
      data[:job_args]
    end

  { args_payload_key: args_payload_key, payload: data.merge(job_args: job_args) }
end
key(val) click to toggle source

Return a namespaced key

@param [String, Symbol] val The key to namespace

@return [String] The namespaced key.

# File lib/cloudtasker/worker_handler.rb, line 24
# Build a namespaced key by prefixing the value with the underscored
# string form of the receiver.
#
# @param [String, Symbol] val The key to namespace.
#
# @return [String, nil] The namespaced key, or nil when `val` is nil.
def self.key(val)
  val.nil? ? nil : "#{to_s.underscore}/#{val}"
end
log_execution_error(worker, error) click to toggle source

Log error on execution failure.

@param [Cloudtasker::Worker, nil] worker The worker.

@param [Exception] error The error to log.

@void

# File lib/cloudtasker/worker_handler.rb, line 50
# Log an execution failure.
#
# Nothing is logged for workers whose class name starts with `ActiveJob::`
# (ActiveJob does its own error logging). The class name is matched as a
# string because the ActiveJob wrapper class might not be loaded.
#
# @param [Cloudtasker::Worker, nil] worker The worker, if one was built.
# @param [Exception] error The error to log.
#
# @void
def self.log_execution_error(worker, error)
  return if worker.class.to_s.match?(/^ActiveJob::/)

  # A nil worker (e.g. InvalidWorkerError) falls back to the generic logger
  (worker&.logger || Cloudtasker.logger).error(error)
end
new(worker) click to toggle source

Prepare a new cloud task.

@param [Cloudtasker::Worker] worker The worker instance.

# File lib/cloudtasker/worker_handler.rb, line 141
# Prepare a new cloud task for the given worker.
#
# @param [Cloudtasker::Worker] worker The worker instance.
def initialize(worker)
  # Stored for later use by the instance methods via the `worker` reader
  @worker = worker
end
redis() click to toggle source

Return the cloudtasker redis client

@return [Cloudtasker::RedisClient] The cloudtasker redis client.

# File lib/cloudtasker/worker_handler.rb, line 35
# Return the memoized cloudtasker redis client.
#
# The client library is required lazily, on first use only.
#
# @return [Cloudtasker::RedisClient] The cloudtasker redis client.
def self.redis
  return @redis if @redis

  require 'cloudtasker/redis_client'
  @redis = RedisClient.new
end
with_worker_handling(input_payload) { |worker| ... } click to toggle source

Local middleware used to retrieve the job arg payload from cache if an arg payload reference is present.

@param [Hash] input_payload The full job payload

@yield [Cloudtasker::Worker] The worker built from the payload.

@return [Any] The block result

# File lib/cloudtasker/worker_handler.rb, line 85
# Build the worker described by a payload, yield it, and take care of
# stored-argument cleanup and error reporting around the block.
#
# - On success the stored args payload (if any) is deleted unless the
#   worker reports `job_reenqueued`.
# - On DeadWorkerError the stored args payload (if any) is deleted, the
#   error is logged and the `on_dead` hook is called.
# - On any other StandardError the error is logged and the `on_error`
#   hook is called.
# In both error cases the error is re-raised.
#
# @param [Hash] input_payload The full job payload.
#
# @yield [worker] The worker built from the payload.
#
# @return [Any] The block result.
def self.with_worker_handling(input_payload)
  payload, args_payload_key = extract_payload(input_payload).values_at(:payload, :args_payload_key)

  # `worker` stays nil when no worker can be built, which is what the
  # rescue clauses below pass to the logger and hooks in that case.
  worker = Cloudtasker::Worker.from_hash(payload) || raise(InvalidWorkerError)

  # Yield worker
  result = yield(worker)

  # Stored args are only needed again when the job was re-enqueued
  cleanup_needed = args_payload_key && !worker.job_reenqueued
  redis.del(args_payload_key) if cleanup_needed

  result
rescue DeadWorkerError => e
  # A dead job never runs again: drop its stored args
  redis.del(args_payload_key) if args_payload_key
  log_execution_error(worker, e)
  Cloudtasker.config.on_dead.call(e, worker)
  raise(e)
rescue StandardError => e
  log_execution_error(worker, e)
  Cloudtasker.config.on_error.call(e, worker)
  raise(e)
end

Public Instance Methods

schedule(time_at: nil) click to toggle source

Schedule the task on GCP Cloud Task.

@param [Integer, nil] time_at A unix timestamp specifying when to run the job.

Leave to `nil` to run now.

@return [Cloudtasker::CloudTask] The Google Task response

# File lib/cloudtasker/worker_handler.rb, line 236
# Schedule the task by passing the task payload to `CloudTask.create`.
#
# `schedule_time` is merged into the task payload and nil-valued entries
# are removed, so a nil `time_at` sends no schedule time at all.
#
# @param [Integer, nil] time_at A unix timestamp specifying when to run the
#   job. Leave to `nil` to run now.
#
# @return [Cloudtasker::CloudTask] The value returned by `CloudTask.create`.
def schedule(time_at: nil)
  CloudTask.create(task_payload.merge(schedule_time: time_at).compact)
end
store_payload_in_redis?() click to toggle source

Return true if the worker args must be stored in Redis.

@return [Boolean] True if the payload must be stored in redis.

# File lib/cloudtasker/worker_handler.rb, line 171
# Tell whether the worker args must be stored in Redis.
#
# The args are stored when a storage threshold is configured and the JSON
# form of the job args is larger, in bytes, than `threshold * 1024`.
#
# @return [Boolean, nil] Truthy if the payload must be stored in redis;
#   the (falsy) threshold value itself when no threshold is configured.
def store_payload_in_redis?
  threshold = Cloudtasker.config.redis_payload_storage_threshold

  threshold && worker.job_args.to_json.bytesize > (threshold * 1024)
end
task_payload() click to toggle source

Return the full task configuration sent to Cloud Task

@return [Hash] The task body

# File lib/cloudtasker/worker_handler.rb, line 150
# Build the full task configuration sent to Cloud Task.
#
# The HTTP request is a JSON POST to the configured processor URL, carrying
# a bearer verification token and the JSON-serialized worker payload.
#
# @return [Hash] The task body.
def task_payload
  request = { http_method: 'POST', url: Cloudtasker.config.processor_url }
  request[:headers] = {
    Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
    Cloudtasker::Config::AUTHORIZATION_HEADER => "Bearer #{Authenticator.verification_token}"
  }
  request[:body] = worker_payload.to_json

  {
    http_request: request,
    dispatch_deadline: worker.dispatch_deadline.to_i,
    queue: worker.job_queue
  }
end
worker_args_payload() click to toggle source

Return the payload to use for job arguments. This payload is merged inside the worker_payload.

If the argument payload must be stored in Redis then returns: `{ job_args_payload_id: <worker_id> }`

If the argument payload must be natively handled by the backend then returns: `{ job_args: […] }`

@return [Hash] The worker args payload.

# File lib/cloudtasker/worker_handler.rb, line 189
# Return the (memoized) payload describing the job arguments. It is merged
# into the worker payload.
#
# When `store_payload_in_redis?` is truthy the arguments are written to
# Redis under a namespaced key derived from the job id and only a reference
# is returned: `{ job_args_payload_id: <job_id> }`.
#
# Otherwise the arguments are returned inline: `{ job_args: [...] }`.
#
# @return [Hash] The worker args payload.
def worker_args_payload
  return @worker_args_payload if @worker_args_payload

  @worker_args_payload =
    if store_payload_in_redis?
      # Persist the arguments and hand back a reference to them
      store = self.class.redis
      storage_key = self.class.key([REDIS_PAYLOAD_NAMESPACE, worker.job_id].join('/'))
      store.write(storage_key, worker.job_args)

      { job_args_payload_id: worker.job_id }
    else
      { job_args: worker.job_args }
    end
end
worker_payload() click to toggle source

Return the task payload that Google Task will eventually send to the job processor.

The payload includes the worker name and the arguments to pass to the worker.

The worker arguments should use primitive types as much as possible as all arguments will be serialized to JSON.

@return [Hash] The job payload

# File lib/cloudtasker/worker_handler.rb, line 219
# Return the (memoized) job payload: the worker class name, queue, job id
# and job metadata, merged with the worker args payload.
#
# The payload is later serialized to JSON, so worker arguments should use
# primitive types as much as possible.
#
# @return [Hash] The job payload.
def worker_payload
  return @worker_payload if @worker_payload

  job_description = {
    worker: worker.job_class_name,
    job_queue: worker.job_queue,
    job_id: worker.job_id,
    job_meta: worker.job_meta.to_h
  }

  @worker_payload = job_description.merge(worker_args_payload)
end