class Cloudtasker::Batch::Job

Handle batch management

Constants

BATCH_MAX_LOCK_WAIT

The maximum number of seconds to wait for a batch state lock to be acquired.

COMPLETION_STATUSES

List of statuses triggering a completion callback

IGNORED_ERRORED_CALLBACKS

These callbacks do not need to raise errors on their own because the jobs will be either retried or dropped

JOBS_NAMESPACE

Key Namespace used for object saved under this class

STATES_NAMESPACE

Attributes

worker[R]

Public Class Methods

find(worker_id) click to toggle source

Find a batch by id.

@param [String] worker_id The id of the worker the batch is attached to (the batch id).

@return [Cloudtasker::Batch::Job, nil] The batch.

# File lib/cloudtasker/batch/job.rb, line 40
# Look up the batch attached to the worker stored under the given id.
#
# @param [String] worker_id The id of the worker owning the batch.
#
# @return [Cloudtasker::Batch::Job, nil] The batch, or nil when no id is
#   given or no worker could be rebuilt from the stored payload.
def self.find(worker_id)
  return nil unless worker_id

  # Rebuild the worker from the payload saved under its namespaced key
  stored = redis.fetch(key("#{JOBS_NAMESPACE}/#{worker_id}"))
  worker = Cloudtasker::Worker.from_hash(stored)

  # Only attach a batch when a worker was actually rebuilt
  worker ? self.for(worker) : nil
end
for(worker) click to toggle source

Attach a batch to a worker

@param [Cloudtasker::Worker] worker The worker on which the batch must be attached.

@return [Cloudtasker::Batch::Job] The attached batch.

# File lib/cloudtasker/batch/job.rb, line 72
# Build a batch for the worker and expose it (and its parent batch) on the
# worker itself.
#
# @param [Cloudtasker::Worker] worker The worker receiving the batch.
#
# @return [Cloudtasker::Batch::Job] The batch now set on the worker.
def self.for(worker)
  # Mix the batch extension into the worker class unless it is already there
  worker_class = worker.class
  worker_class.include(Extension::Worker) unless worker_class <= Extension::Worker

  # Attach the batch, then copy its parent batch onto the worker
  worker.batch = new(worker)
  worker.batch.tap { |batch| worker.parent_batch = batch.parent_batch }
end
key(val) click to toggle source

Return a namespaced key.

@param [String, Symbol] val The key to namespace

@return [String] The namespaced key.

# File lib/cloudtasker/batch/job.rb, line 59
# Prefix a value with the underscored class name.
#
# @param [String, Symbol] val The value to namespace.
#
# @return [String, nil] "<underscored class name>/<val>", or nil when val is nil.
def self.key(val)
  return nil if val.nil?

  namespace = to_s.underscore
  "#{namespace}/#{val}"
end
new(worker) click to toggle source

Build a new instance of the class.

@param [Cloudtasker::Worker] worker The batch worker

# File lib/cloudtasker/batch/job.rb, line 89
# Keep a reference to the worker this batch is built for.
#
# @param [Cloudtasker::Worker] worker The batch worker
def initialize(worker)
  @worker = worker
end
redis() click to toggle source

Return the cloudtasker redis client

@return [Cloudtasker::RedisClient] The cloudtasker redis client.

# File lib/cloudtasker/batch/job.rb, line 29
# Return the redis client shared at class level, creating it on first use.
#
# @return [Cloudtasker::RedisClient] The cloudtasker redis client.
def self.redis
  return @redis if @redis

  @redis = RedisClient.new
end

Public Instance Methods

==(other) click to toggle source

Equality operator.

@param [Any] other The object to compare.

@return [Boolean] True if the object is equal.

# File lib/cloudtasker/batch/job.rb, line 119
# Two batches are equal when the other object is an instance of this class
# (or a subclass) and both report the same batch id.
#
# @param [Any] other The object to compare.
#
# @return [Boolean] True if the object is equal.
def ==(other)
  return false unless other.is_a?(self.class)

  other.batch_id == batch_id
end
add(worker_klass, *args) click to toggle source

Add a worker to the batch

@param [Class] worker_klass The worker class. @param [Array<any>] *args The worker arguments.

@return [Array<Cloudtasker::Worker>] The updated list of jobs.

# File lib/cloudtasker/batch/job.rb, line 200
# Add a worker to the batch on the same queue as the batch worker.
#
# @param [Class] worker_klass The worker class.
# @param [Array<any>] *args The worker arguments.
#
# @return [Array<Cloudtasker::Worker>] The updated list of jobs.
def add(worker_klass, *args)
  batch_queue = worker.job_queue
  add_to_queue(batch_queue, worker_klass, *args)
end
add_to_queue(queue, worker_klass, *args) click to toggle source

Add a worker to the batch using a specific queue.

@param [String, Symbol] queue The name of the queue @param [Class] worker_klass The worker class. @param [Array<any>] *args The worker arguments.

@return [Array<Cloudtasker::Worker>] The updated list of jobs.

# File lib/cloudtasker/batch/job.rb, line 213
# Instantiate a child worker for the given queue and append it to the batch
# jobs. The child metadata records this batch id under the namespaced
# parent_id key.
#
# @param [String, Symbol] queue The name of the queue
# @param [Class] worker_klass The worker class.
# @param [Array<any>] *args The worker arguments.
#
# @return [Array<Cloudtasker::Worker>] The updated list of jobs.
def add_to_queue(queue, worker_klass, *args)
  parent_link = { key(:parent_id) => batch_id }
  child = worker_klass.new(job_args: args, job_meta: parent_link, job_queue: queue)

  jobs.push(child)
end
batch_gid() click to toggle source

Return the namespaced worker id.

@return [String] The worker namespaced id.

# File lib/cloudtasker/batch/job.rb, line 159
# Return the namespaced key built from the jobs namespace and the batch id.
# This is the key the serialized worker is written to and deleted from.
#
# @return [String] The worker namespaced id.
def batch_gid
  scoped_id = "#{JOBS_NAMESPACE}/#{batch_id}"
  key(scoped_id)
end
batch_id() click to toggle source

Return the worker id.

@return [String] The worker id.

# File lib/cloudtasker/batch/job.rb, line 150
# Return the job id of the attached worker, which doubles as the batch id.
#
# @return [String, nil] The worker id, or nil when no worker is set.
def batch_id
  owner = worker
  owner.nil? ? nil : owner.job_id
end
batch_state() click to toggle source

Return the batch state

@return [Hash] The state of each child worker.

# File lib/cloudtasker/batch/job.rb, line 186
# Return the batch state read from the Redis hash, after making sure any
# legacy string-based state has been migrated.
#
# @return [Hash] The state of each child worker.
def batch_state
  # Legacy states must be converted before the hash can be read
  migrate_batch_state_to_redis_hash

  redis.hgetall(batch_state_gid)
end
batch_state_gid() click to toggle source

Return the key under which the batch state is stored.

@return [String] The batch state namespaced id.

# File lib/cloudtasker/batch/job.rb, line 168
# Return the namespaced key built from the states namespace and the batch id.
#
# @return [String] The batch state namespaced id.
def batch_state_gid
  scoped_id = "#{STATES_NAMESPACE}/#{batch_id}"
  key(scoped_id)
end
cleanup() click to toggle source

Remove all batch and sub-batch keys from Redis.

# File lib/cloudtasker/batch/job.rb, line 358
# Delete the Redis entries of this batch after recursively cleaning up every
# child batch listed in the batch state.
def cleanup
  migrate_batch_state_to_redis_hash

  # Walk the children first so that sub-batches are removed recursively.
  # Ids that do not resolve to a batch are skipped.
  child_ids = redis.hkeys(batch_state_gid)
  child_ids.each do |child_id|
    child = self.class.find(child_id)
    child&.cleanup
  end

  # Then drop the serialized worker and the batch state
  redis.del(batch_gid)
  redis.del(batch_state_gid)
end
complete(status = :completed) click to toggle source

Post-perform logic. The parent batch is notified if the job is complete.

# File lib/cloudtasker/batch/job.rb, line 409
# Post-perform logic. Nothing is propagated while the worker is re-enqueued
# or while the batch still holds jobs.
#
# @param [Symbol] status The completion status to propagate.
#
# @return [true, Object, nil] true when skipped; otherwise the result of
#   notifying the parent batch, or nil when there is no parent.
def complete(status = :completed)
  return true if reenqueued?
  return true if jobs.any?

  # Run the batch completion logic once every child is done
  on_complete(status) if complete?

  # Let the parent know that a node of the batch tree has completed
  parent = parent_batch
  parent.on_batch_node_complete(self, status) unless parent.nil?
end
complete?() click to toggle source

Return true if all the child workers have completed.

@return [Boolean] True if the batch is complete.

# File lib/cloudtasker/batch/job.rb, line 274
# Tell whether every status stored in the batch state is one of the
# completion statuses. An empty state counts as complete.
#
# @return [Boolean] True if the batch is complete.
def complete?
  migrate_batch_state_to_redis_hash

  child_statuses = redis.hvals(batch_state_gid)
  child_statuses.all? { |child_status| COMPLETION_STATUSES.include?(child_status) }
end
execute() { || ... } click to toggle source

Execute the batch.

# File lib/cloudtasker/batch/job.rb, line 422
# Run the job given as a block inside the batch lifecycle: flag the batch as
# processing on the parent, perform the job, persist any newly added jobs,
# then complete the batch. On failure the batch is completed with :dead or
# :errored and the error is re-raised.
def execute
  # Tell the parent batch that this batch is being processed
  parent_batch&.update_state(batch_id, :processing)

  # Run the actual job
  yield

  # Persist and enqueue child jobs added during the run
  setup if jobs.any?

  # Persist the parent batch too if it was expanded
  parent = parent_batch
  parent.setup if parent&.jobs&.any?

  complete(:completed)
rescue DeadWorkerError => e
  complete(:dead)
  raise e
rescue StandardError => e
  complete(:errored)
  raise e
end
jobs() click to toggle source

The list of jobs in the batch

@return [Array<Cloudtasker::Worker>] The jobs to enqueue at the end of the batch.

# File lib/cloudtasker/batch/job.rb, line 177
# Return the memoized list of jobs attached to the batch, starting out empty.
#
# @return [Array<Cloudtasker::Worker>] The jobs to enqueue at the end of the batch.
def jobs
  return @jobs if @jobs

  @jobs = []
end
key(val) click to toggle source

Return a namespaced key.

@param [String, Symbol] val The key to namespace

@return [String] The namespaced key.

# File lib/cloudtasker/batch/job.rb, line 130
# Instance-level shortcut that delegates to the class-level key helper.
#
# @param [String, Symbol] val The key to namespace
#
# @return [String, nil] The namespaced key, as returned by the class method.
def key(val)
  self.class.key(val)
end
migrate_batch_state_to_redis_hash() click to toggle source

This method migrates the batch state to be a Redis hash instead of a hash stored in a string key.

# File lib/cloudtasker/batch/job.rb, line 225
# Convert a legacy batch state stored under a string key into a Redis hash.
#
# Nothing happens unless Redis reports the state key as a 'string'. The type
# is checked once before taking the lock and again once the lock is held, so
# the conversion only runs if the key is still a string at that point.
def migrate_batch_state_to_redis_hash
  return unless redis.type(batch_state_gid) == 'string'

  # Migrate batch state to Redis hash if it is still using a legacy string key
  # We acquire a lock then check again
  redis.with_lock(batch_state_gid, max_wait: BATCH_MAX_LOCK_WAIT) do
    if redis.type(batch_state_gid) == 'string'
      state = redis.fetch(batch_state_gid)
      redis.del(batch_state_gid)

      # NOTE(review): fetch presumably returns nil when the stored payload
      # cannot be decoded - confirm against RedisClient#fetch. The legacy key
      # is already deleted at this point, so a nil state must be skipped
      # rather than raise NoMethodError on `any?`.
      redis.hset(batch_state_gid, state) if state&.any?
    end
  end
end
on_batch_node_complete(child_batch, status = :completed) click to toggle source

Callback invoked when any batch in the tree gets completed.

@param [Cloudtasker::Batch::Job] child_batch The completed child batch.

# File lib/cloudtasker/batch/job.rb, line 345
# Callback invoked when any batch in the tree completes. Only the :completed
# status is handled; any other status returns false without side effects.
#
# @param [Cloudtasker::Batch::Job] child_batch The completed child batch.
# @param [Symbol] status The completion status of the child batch.
#
# @return [false, Object, nil] false when ignored; otherwise the result of
#   the parent notification, or nil when there is no parent batch.
def on_batch_node_complete(child_batch, status = :completed)
  return false if status != :completed

  # Run the node completion callback on this batch's worker
  run_worker_callback(:on_batch_node_complete, child_batch.worker)

  # Bubble the event up the batch tree
  parent = parent_batch
  parent.on_batch_node_complete(child_batch) unless parent.nil?
end
on_child_complete(child_batch, status = :completed) click to toggle source

Callback invoked when a direct child batch is complete.

@param [Cloudtasker::Batch::Job] child_batch The completed child batch.

# File lib/cloudtasker/batch/job.rb, line 322
# Callback invoked when a direct child batch is done. Records the child
# status, runs the worker callback matching that status (if any) and
# completes this batch when the status is not :errored and every child is
# complete.
#
# @param [Cloudtasker::Batch::Job] child_batch The completed child batch.
# @param [Symbol] status The status reported by the child batch.
def on_child_complete(child_batch, status = :completed)
  # Record the child status in the batch state
  update_state(child_batch.batch_id, status)

  # Pick the worker callback for this status; other statuses have none
  callback = {
    completed: :on_child_complete,
    errored: :on_child_error,
    dead: :on_child_dead
  }[status]
  run_worker_callback(callback, child_batch.worker) if callback

  # An errored child never completes the batch
  return if status == :errored

  on_complete if complete?
end
on_complete(status = :completed) click to toggle source

Callback invoked when the batch is complete

# File lib/cloudtasker/batch/job.rb, line 306
# Callback invoked when the batch is complete. The worker callback only runs
# for the :completed status; the parent batch is notified and the batch is
# cleaned up whatever the status.
#
# @param [Symbol] status The completion status of the batch.
def on_complete(status = :completed)
  # Only a successful completion triggers the worker callback
  run_worker_callback(:on_batch_complete) if status == :completed

  # Forward the event to the parent batch, if there is one
  parent = parent_batch
  parent.on_child_complete(self, status) unless parent.nil?

  # The batch tree is complete. Cleanup the downstream tree.
  cleanup
end
parent_batch() click to toggle source

Return the parent batch, if any.

@return [Cloudtasker::Batch::Job, nil] The parent batch.

# File lib/cloudtasker/batch/job.rb, line 139
# Return the parent batch referenced in the worker metadata, if any. The
# lookup result is memoized once a batch has been found.
#
# @return [Cloudtasker::Batch::Job, nil] The parent batch.
def parent_batch
  # The parent id lives in the worker metadata under the namespaced key
  parent_id = worker.job_meta.get(key(:parent_id))
  return nil unless parent_id

  @parent_batch ||= self.class.find(parent_id)
end
progress(depth: 0) click to toggle source

Calculate the progress of the batch.

@return [Cloudtasker::Batch::BatchProgress] The batch progress.

# File lib/cloudtasker/batch/job.rb, line 374
# Calculate the progress of the batch, optionally adding the progress of
# sub-batches down to the given depth.
#
# @param [Integer] depth How many levels of sub-batches to include.
#
# @return [Cloudtasker::Batch::BatchProgress] The batch progress.
def progress(depth: 0)
  levels = depth.to_i

  # Progress of the current batch alone
  state = batch_state
  total = BatchProgress.new(state)
  return total if levels <= 0

  # Add the progress of each child. A child that cannot be found as a batch
  # contributes only its own status.
  state.to_h.each do |child_id, child_status|
    child_progress = self.class.find(child_id)&.progress(depth: levels - 1)
    child_progress ||= BatchProgress.new(child_id => child_status)
    total += child_progress
  end

  total
end
redis() click to toggle source

Return the cloudtasker redis client

@return [Cloudtasker::RedisClient] The cloudtasker redis client.

# File lib/cloudtasker/batch/job.rb, line 108
# Instance-level shortcut that delegates to the class-level redis client.
#
# @return [Cloudtasker::RedisClient] The cloudtasker redis client.
def redis
  self.class.redis
end
reenqueued?() click to toggle source

Return true if the worker has been re-enqueued. Post-process logic should be skipped for re-enqueued jobs.

@return [Boolean] Return true if the job was re-enqueued.

# File lib/cloudtasker/batch/job.rb, line 99
# Return the job_reenqueued value of the attached worker.
#
# @return [Boolean] Return true if the job was re-enqueued.
def reenqueued?
  worker.job_reenqueued
end
run_worker_callback(callback, *args) click to toggle source

Run worker callback. The error and dead callbacks get silenced should they raise an error.

@param [String, Symbol] callback The callback to run. @param [Array<any>] *args The callback arguments.

@return [any] The callback return value

# File lib/cloudtasker/batch/job.rb, line 290
# Run a callback on the worker. Errors raised by callbacks listed in
# IGNORED_ERRORED_CALLBACKS are logged instead of re-raised.
#
# @param [String, Symbol] callback The callback to run.
# @param [Array<any>] *args The callback arguments.
#
# @return [any] The callback return value
def run_worker_callback(callback, *args)
  worker.try(callback, *args)
rescue StandardError => e
  # There is no point in retrying jobs because a failure callback failed.
  # Only completion callbacks trigger a re-run of the job because these do
  # matter for batch completion.
  if IGNORED_ERRORED_CALLBACKS.include?(callback)
    # Log error instead
    worker.logger.error(e)
    worker.logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
  else
    raise e
  end
end
save() click to toggle source

Save the batch.

# File lib/cloudtasker/batch/job.rb, line 242
# Save the batch: the serialized worker is always written, and the child
# jobs (if any) are recorded in the batch state as 'scheduled'.
def save
  # The serialized worker is what allows callback methods to be invoked in
  # the context of an instantiated worker when child workers complete
  # (success or failure).
  redis.write(batch_gid, worker.to_h)

  # Nothing else to persist when the batch has no child jobs
  return if jobs.empty?

  # Register every child job id with its initial status
  redis.hset(
    batch_state_gid,
    jobs.each_with_object({}) { |job, state| state[job.job_id] = 'scheduled' }
  )
end
setup() click to toggle source

Save the batch and enqueue all child workers attached to it.

@return [Array<Cloudtasker::CloudTask>] The Google Task responses

# File lib/cloudtasker/batch/job.rb, line 396
# Save the batch and schedule every child job attached to it.
#
# @return [true, Array<Cloudtasker::CloudTask>] true when there is nothing to
#   enqueue; otherwise the result of scheduling each job.
def setup
  return true if jobs.empty?

  # Persist the batch before any child gets enqueued
  save

  # Schedule the children and collect the responses
  jobs.map { |job| job.schedule }
end
update_state(batch_id, status) click to toggle source

Update the batch state.

@param [String] batch_id The batch id. @param [String] status The status of the sub-batch.

# File lib/cloudtasker/batch/job.rb, line 262
# Update the status recorded for a child batch. Ids that are not already
# present in the batch state are left untouched.
#
# @param [String] batch_id The child batch id.
# @param [String] status The status of the sub-batch.
def update_state(batch_id, status)
  migrate_batch_state_to_redis_hash

  # Only entries already registered in the batch state get updated
  return unless redis.hexists(batch_state_gid, batch_id)

  redis.hset(batch_state_gid, batch_id, status)
end