module Cloudtasker::Worker

Adds extra methods for testing purposes.

Cloud Task based workers

Public Class Methods

clear_all()

Clear all jobs.

# File lib/cloudtasker/testing.rb, line 99
def self.clear_all
  Backend::MemoryTask.clear
end

drain_all()

Run all the jobs.

@return [Array<any>] The return values of the workers' perform methods.

# File lib/cloudtasker/testing.rb, line 108
def self.drain_all
  Backend::MemoryTask.drain
end

from_hash(hash)

Return a worker instance from a worker hash description. A worker hash description is typically generated by calling `MyWorker#to_h`.

@param [Hash] hash A worker hash description.

@return [Cloudtasker::Worker, nil] The instantiated worker, or nil if the hash does not name a valid worker class.

# File lib/cloudtasker/worker.rb, line 36
def self.from_hash(hash)
  # Symbolize metadata keys and stringify job arguments
  payload = JSON.parse(hash.to_json, symbolize_names: true)
  payload[:job_args] = JSON.parse(payload[:job_args].to_json)

  # Extract worker parameters
  klass_name = payload&.dig(:worker)
  return nil unless klass_name

  # Check that worker class is a valid worker
  worker_klass = Object.const_get(klass_name)
  return nil unless worker_klass.include?(self)

  # Return instantiated worker
  worker_klass.new(payload.slice(:job_queue, :job_args, :job_id, :job_meta, :job_retries, :task_id))
rescue NameError
  nil
end
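
The two `JSON.parse` calls at the top of `from_hash` normalize the payload: top-level keys become symbols, while keys nested inside the job arguments end up as plain strings. The following is a minimal sketch of that step using only the standard library; `normalize_payload` is a hypothetical helper introduced for illustration and is not part of Cloudtasker.

```ruby
require 'json'

# Hypothetical helper mirroring the first two statements of from_hash.
def normalize_payload(hash)
  # Round-trip through JSON and symbolize every key (nested ones included)
  payload = JSON.parse(hash.to_json, symbolize_names: true)
  # Round-trip job_args once more so that its nested keys become strings
  payload[:job_args] = JSON.parse(payload[:job_args].to_json)
  payload
end

payload = normalize_payload('worker' => 'MyWorker', 'job_args' => [{ user_id: 1 }])
p payload[:worker]
p payload[:job_args]
```

Whatever mix of string and symbol keys the caller supplies, the worker receives its arguments in the shape they would have after being serialized into a task and parsed back.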

from_json(json)

Return a worker instance from a serialized worker. A worker can be serialized by calling `MyWorker#to_json`.

@param [String] json Worker serialized as JSON.

@return [Cloudtasker::Worker, nil] The instantiated worker, or nil if the JSON cannot be parsed.

# File lib/cloudtasker/worker.rb, line 22
def self.from_json(json)
  from_hash(JSON.parse(json))
rescue JSON::ParserError
  nil
end

included(base)

Add class methods and job attribute accessors to the including class.

# File lib/cloudtasker/worker.rb, line 7
def self.included(base)
  base.extend(ClassMethods)
  base.attr_writer :job_queue
  base.attr_accessor :job_args, :job_id, :job_meta, :job_reenqueued, :job_retries,
                     :perform_started_at, :perform_ended_at, :task_id
end

new(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, task_id: nil)

Build a new worker instance.

@param [Array<any>] job_args The list of perform args.

@param [String] job_id A unique ID identifying this job.

# File lib/cloudtasker/worker.rb, line 143
def initialize(job_queue: nil, job_args: nil, job_id: nil, job_meta: {}, job_retries: 0, task_id: nil)
  @job_args = job_args || []
  @job_id = job_id || SecureRandom.uuid
  @job_meta = MetaStore.new(job_meta)
  @job_retries = job_retries || 0
  @job_queue = job_queue
  @task_id = task_id
end

Public Instance Methods

==(other)

Equality operator.

@param [Any] other The object to compare.

@return [Boolean] True if the object is equal.

# File lib/cloudtasker/worker.rb, line 311
def ==(other)
  other.is_a?(self.class) && other.job_id == job_id
end
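
Equality is therefore based on identity of the job, not on its arguments: two instances of the same worker class are equal when they share a `job_id`. A minimal sketch, assuming a hypothetical `SketchJob` class that keeps only the parts needed for the comparison:

```ruby
require 'securerandom'

# Hypothetical stand-in for a worker; not a Cloudtasker class.
class SketchJob
  attr_reader :job_id

  def initialize(job_id: nil)
    # Same default as the worker constructor: a random UUID
    @job_id = job_id || SecureRandom.uuid
  end

  # Same comparison as Cloudtasker::Worker#==
  def ==(other)
    other.is_a?(self.class) && other.job_id == job_id
  end
end

a = SketchJob.new(job_id: 'abc')
b = SketchJob.new(job_id: 'abc')
puts a == b
puts a == SketchJob.new
```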

arguments_missing?()

Return true if the job arguments are missing.

This may happen if a job ran successfully but was retried because the Cloud Tasks dispatch deadline was exceeded. If the arguments were stored in Redis, they may already have been flushed after the successful completion.

If job arguments are missing then the job will simply be declared dead.

@return [Boolean] True if the arguments are missing.

# File lib/cloudtasker/worker.rb, line 363
def arguments_missing?
  job_args.empty? && [0, -1].exclude?(method(:perform).arity)
end
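
The check relies on `Method#arity`: an arity of `0` means `perform` takes no arguments, and `-1` means every argument is optional, so an empty argument list is legitimate in both cases. The sketch below restates the check in plain Ruby (`exclude?` comes from ActiveSupport); the three job classes and the free-standing `arguments_missing?` function are hypothetical and exist only for illustration.

```ruby
# Hypothetical workers used only to show the arity values.
class NoArgJob
  def perform; end             # arity 0
end

class SplatJob
  def perform(*args); end      # arity -1
end

class OneArgJob
  def perform(user_id); end    # arity 1
end

# Plain-Ruby restatement of the check, without ActiveSupport.
def arguments_missing?(worker, job_args)
  job_args.empty? && ![0, -1].include?(worker.method(:perform).arity)
end

puts arguments_missing?(NoArgJob.new, [])
puts arguments_missing?(OneArgJob.new, [])
```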

dispatch_deadline()

Return the dispatch deadline duration. Cloud Tasks will time out the job once this duration has elapsed. The configured value is clamped between `Config::MIN_DISPATCH_DEADLINE` and `Config::MAX_DISPATCH_DEADLINE`.

@return [Integer] The value in seconds.

# File lib/cloudtasker/worker.rb, line 176
def dispatch_deadline
  @dispatch_deadline ||= [
    [
      Config::MIN_DISPATCH_DEADLINE,
      (self.class.cloudtasker_options_hash[:dispatch_deadline] || Cloudtasker.config.dispatch_deadline).to_i
    ].max,
    Config::MAX_DISPATCH_DEADLINE
  ].min
end
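
The nested `max`/`min` calls clamp the configured value into the allowed range. A minimal sketch of the same arithmetic, assuming illustrative bounds of 15 and 1800 seconds (the real bounds are the `Config` constants, whose values are not shown here); `clamp_deadline` is a hypothetical helper:

```ruby
# Hypothetical helper; the default bounds are assumptions for illustration.
def clamp_deadline(value, min: 15, max: 1800)
  # Raise values below the minimum, then cap values above the maximum
  [[min, value.to_i].max, max].min
end

puts clamp_deadline(5)
puts clamp_deadline(600)
puts clamp_deadline(7200)
```

The same result could be obtained with `value.to_i.clamp(min, max)`; the nested form is shown because it matches the source.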

execute()

Execute the worker by calling `perform` with the job arguments.

@return [Any] The result of the perform.

# File lib/cloudtasker/worker.rb, line 200
def execute
  logger.info('Starting job...')

  # Perform job logic
  resp = execute_middleware_chain

  # Log job completion and return result
  logger.info("Job done after #{job_duration}s") { { duration: job_duration } }
  resp
rescue DeadWorkerError => e
  logger.info("Job dead after #{job_duration}s and #{job_retries} retries") { { duration: job_duration } }
  raise(e)
rescue StandardError => e
  logger.info("Job failed after #{job_duration}s") { { duration: job_duration } }
  raise(e)
end

job_class_name()

Return the class name of the worker.

@return [String] The class name.

# File lib/cloudtasker/worker.rb, line 157
def job_class_name
  self.class.to_s
end

job_dead?()

Return true if the job has strictly exceeded its maximum number of retries.

Used as a preemptive filter when running the job.

@return [Boolean] True if the job is dead

# File lib/cloudtasker/worker.rb, line 347
def job_dead?
  job_retries > job_max_retries
end

job_duration()

Return the time taken (in seconds) to perform the job. This duration includes the middlewares and the actual perform method.

@return [Float] The time taken in seconds as a floating point number.

# File lib/cloudtasker/worker.rb, line 373
def job_duration
  return 0.0 unless perform_ended_at && perform_started_at

  (perform_ended_at - perform_started_at).ceil(3)
end
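
Subtracting two `Time` objects yields a `Float` number of seconds, and `ceil(3)` rounds it up to the next millisecond. A minimal sketch of the same computation as a free-standing function, with the two timestamps passed in explicitly (a hypothetical signature chosen so the example does not depend on worker state):

```ruby
# Hypothetical free-standing version of job_duration.
def job_duration(started_at, ended_at)
  # No duration is available until both timestamps are set
  return 0.0 unless ended_at && started_at

  # Time - Time returns a Float; round up to 3 decimal places
  (ended_at - started_at).ceil(3)
end

started = Time.at(100)
puts job_duration(started, started + 1.2341)
puts job_duration(started, nil)
```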

job_max_retries()

Return the max number of retries allowed for this job.

The order of precedence for retry lookup is:

  • Worker `max_retries` method

  • Class `max_retries` option

  • Cloudtasker `max_retries` config option

@return [Integer] The number of retries

# File lib/cloudtasker/worker.rb, line 325
def job_max_retries
  @job_max_retries ||= (try(:max_retries, *job_args) || self.class.max_retries)
end
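
The precedence list above can be sketched as a chain of `||` fallbacks. This is an illustration under stated assumptions, not the library's code: `resolve_max_retries`, `PlainJob` and `PickyJob` are hypothetical, `respond_to?` stands in for ActiveSupport's `try`, and the fallback from the class option to the global config is assumed to happen inside `self.class.max_retries`, which is not shown in this section.

```ruby
# Hypothetical worker without a max_retries method.
class PlainJob; end

# Hypothetical worker whose retry limit depends on the job arguments.
class PickyJob
  def max_retries(priority)
    priority == 'high' ? 10 : nil
  end
end

# Hypothetical lookup following the documented order of precedence.
def resolve_max_retries(worker, job_args, class_option:, config_default:)
  # 1. Worker method, called with the job arguments (nil if undefined)
  from_method = worker.respond_to?(:max_retries) ? worker.max_retries(*job_args) : nil
  # 2. Class option, then 3. global config
  from_method || class_option || config_default
end

puts resolve_max_retries(PickyJob.new, ['high'], class_option: 5, config_default: 25)
puts resolve_max_retries(PickyJob.new, ['low'], class_option: 5, config_default: 25)
puts resolve_max_retries(PlainJob.new, [], class_option: nil, config_default: 25)
```

Because the worker method receives the job arguments, a worker can return `nil` for some inputs and let the lower-priority settings apply.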

job_must_die?()

Return true if the job must be declared dead upon raising an error.

@return [Boolean] True if the job must die on error.

# File lib/cloudtasker/worker.rb, line 335
def job_must_die?
  job_retries >= job_max_retries
end
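
`job_must_die?` and `job_dead?` differ only in the comparison operator (`>=` versus `>`), which matters at the boundary. A minimal sketch with the two predicates rewritten as hypothetical free functions that take the counters as arguments:

```ruby
# Hypothetical free-function versions of the two predicates.
def job_dead?(retries, max_retries)
  retries > max_retries
end

def job_must_die?(retries, max_retries)
  retries >= max_retries
end

max = 3
(2..4).each do |retries|
  puts "retries=#{retries} must_die=#{job_must_die?(retries, max)} dead=#{job_dead?(retries, max)}"
end
```

With a limit of 3, a job on its third retry is still allowed to run but must die if it raises; a job arriving with four retries is already dead and is rejected before `perform` is called.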

job_queue()

Return the queue to use for this worker.

@return [String] The name of the queue.

# File lib/cloudtasker/worker.rb, line 166
def job_queue
  (@job_queue ||= self.class.cloudtasker_options_hash[:queue] || Config::DEFAULT_JOB_QUEUE).to_s
end

logger()

Return the Cloudtasker logger instance.

@return [Logger, any] The cloudtasker logger.

# File lib/cloudtasker/worker.rb, line 191
def logger
  @logger ||= WorkerLogger.new(self)
end

new_instance()

Return a new instance of the worker with the same queue, args and metadata but with a different job ID.

@return [Cloudtasker::Worker] The new worker instance.

# File lib/cloudtasker/worker.rb, line 272
def new_instance
  self.class.new(job_queue: job_queue, job_args: job_args, job_meta: job_meta)
end

reenqueue(interval)

Helper method used to re-enqueue the job. Re-enqueued jobs keep the same job_id.

This helper may be useful when jobs must pause activity due to external factors such as when a third-party API is throttling the rate of API calls.

@param [Integer] interval Delay to wait before processing the job again (in seconds).

@return [Cloudtasker::CloudTask] The Google Task response

# File lib/cloudtasker/worker.rb, line 261
def reenqueue(interval)
  @job_reenqueued = true
  schedule(interval: interval)
end

run_callback(callback, *args)

Run worker callback.

@param [String, Symbol] callback The callback to run.

@param [Array<any>] *args The callback arguments.

@return [any] The callback return value.

# File lib/cloudtasker/worker.rb, line 387
def run_callback(callback, *args)
  try(callback, *args)
end

schedule(**args)

Enqueue a worker, with or without delay.

@param [Integer] interval The delay in seconds.

@param [Time, Integer] time_at The time at which the job should run.

@return [Cloudtasker::CloudTask] The Google Task response

# File lib/cloudtasker/worker.rb, line 240
def schedule(**args)
  # Evaluate when to schedule the job
  time_at = schedule_time(args)

  # Schedule job through client middlewares
  Cloudtasker.config.client_middleware.invoke(self, time_at: time_at) do
    WorkerHandler.new(self).schedule(time_at: time_at)
  end
end

schedule_time(interval: nil, time_at: nil)

Return a Unix timestamp specifying when to run the task. Returns nil when neither argument is given, meaning the task is not delayed.

@param [Integer, nil] interval The time to wait (in seconds).

@param [Integer, nil] time_at The time at which the job should run.

@return [Integer, nil] The Unix timestamp.

# File lib/cloudtasker/worker.rb, line 225
def schedule_time(interval: nil, time_at: nil)
  return nil unless interval || time_at

  # Generate the complete Unix timestamp
  (time_at || Time.now).to_i + interval.to_i
end
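
The two arguments combine: `interval` is added to `time_at` when both are given, and to the current time when only `interval` is given. A minimal sketch of the same arithmetic; the extra `now:` keyword is a hypothetical addition that makes the example deterministic and is not part of the real signature.

```ruby
# Sketch of schedule_time with an injectable clock (`now:` is an assumption).
def schedule_time(interval: nil, time_at: nil, now: Time.now)
  return nil unless interval || time_at

  # Base time (explicit or current) plus the optional delay, as a Unix timestamp
  (time_at || now).to_i + interval.to_i
end

now = Time.at(1_000)
p schedule_time(now: now)
p schedule_time(interval: 30, now: now)
p schedule_time(time_at: Time.at(2_000), interval: 10, now: now)
```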

to_h()

Return a hash description of the worker.

@return [Hash] The worker hash description.

# File lib/cloudtasker/worker.rb, line 281
def to_h
  {
    worker: self.class.to_s,
    job_id: job_id,
    job_args: job_args,
    job_meta: job_meta.to_h,
    job_retries: job_retries,
    job_queue: job_queue,
    task_id: task_id
  }
end

to_json(*args)

Return a json representation of the worker.

@param [Array<any>] *args Arguments passed to to_json.

@return [String] The worker json representation.

# File lib/cloudtasker/worker.rb, line 300
def to_json(*args)
  to_h.to_json(*args)
end

Private Instance Methods

execute_middleware_chain()

Execute the worker perform method through the middleware chain.

@return [Any] The result of the perform method.

# File lib/cloudtasker/worker.rb, line 413
def execute_middleware_chain
  self.perform_started_at = Time.now

  Cloudtasker.config.server_middleware.invoke(self) do
    # Immediately abort the job if it is already dead
    flag_as_dead if job_dead?
    flag_as_dead(MissingWorkerArgumentsError.new('worker arguments are missing')) if arguments_missing?

    begin
      # Perform the job
      perform(*job_args)
    rescue StandardError => e
      run_callback(:on_error, e)
      return raise(e) unless job_must_die?

      # Flag job as dead
      flag_as_dead(e)
    end
  end
ensure
  self.perform_ended_at = Time.now
end

flag_as_dead(error = nil)

Flag the worker as dead by invoking the on_dead hook and raising a DeadWorkerError

@param [Exception, nil] error An optional exception to be passed to the DeadWorkerError.

# File lib/cloudtasker/worker.rb, line 402
def flag_as_dead(error = nil)
  run_callback(:on_dead, error || DeadWorkerError.new)
ensure
  raise(DeadWorkerError, error)
end
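
Placing the `raise` in an `ensure` clause means the dead-worker error is raised even when the `on_dead` callback itself fails, because an exception raised from `ensure` replaces the one in flight. A minimal sketch of that pattern, assuming a hypothetical `DeadJobError` class and a callback passed in as a lambda rather than looked up on the worker:

```ruby
# Hypothetical error class standing in for DeadWorkerError.
class DeadJobError < StandardError; end

# Same shape as flag_as_dead, with the callback injected for illustration.
def flag_as_dead(on_dead, error = nil)
  on_dead.call(error || DeadJobError.new)
ensure
  # Always runs, even if the callback raised
  raise(DeadJobError, error)
end

begin
  flag_as_dead(->(_e) { raise 'callback failed' })
rescue DeadJobError => e
  puts "rescued #{e.class}"
end
```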