class Cloudtasker::UniqueJob::Job

Wrapper class for Cloudtasker::Worker delegating to lock and conflict strategies

Constants

DEFAULT_LOCK

The default lock strategy to use. Defaults to “no lock”.

Attributes

call_opts[R]
worker[R]

Public Class Methods

new(worker, **kwargs) click to toggle source

Build a new instance of the class.

@param [Cloudtasker::Worker] worker The worker at hand

# File lib/cloudtasker/unique_job/job.rb, line 18
def initialize(worker, **kwargs)
  @worker = worker
  @call_opts = kwargs
end

Public Instance Methods

digest_hash() click to toggle source

Return a unique description of the job in hash format.

@return [Hash] Representation of the unique job in hash format.

# File lib/cloudtasker/unique_job/job.rb, line 100
def digest_hash
  @digest_hash ||= {
    class: worker.class.to_s,
    unique_args: unique_args
  }
end
id() click to toggle source

Return the worker job ID.

@return [String] The worker job ID.

# File lib/cloudtasker/unique_job/job.rb, line 112
def id
  worker.job_id
end
lock!() click to toggle source

Acquire a new unique job lock or check that the lock is currently allocated to this job.

Raise a `Cloudtasker::UniqueJob::LockError` if the lock if taken by another job.

# File lib/cloudtasker/unique_job/job.rb, line 151
def lock!
  lock_acquired = redis.set(unique_gid, id, nx: true, ex: lock_ttl)
  lock_already_acquired = !lock_acquired && redis.get(unique_gid) == id

  raise(LockError) unless lock_acquired || lock_already_acquired
end
lock_instance() click to toggle source

Return the instantiated lock.

@return [Any] The instantiated lock

# File lib/cloudtasker/unique_job/job.rb, line 74
def lock_instance
  @lock_instance ||=
    begin
      # Infer lock class and get instance
      lock_name = options[:lock]
      lock_klass = Lock.const_get(lock_name.to_s.split('_').collect(&:capitalize).join)
      lock_klass.new(self)
    rescue NameError
      DEFAULT_LOCK.new(self)
    end
end
lock_ttl() click to toggle source

Return the Time To Live (TTL) that should be set in Redis for the lock key. Having a TTL on lock keys ensures that jobs do not end up stuck due to a dead lock situation.

The TTL is calculated using schedule time + expected max job duration.

The expected max job duration is set to 10 minutes by default. This value was chosen because it's twice the default request timeout value in Cloud Run. This leaves enough room for queue lag (5 minutes) + job processing (5 minutes).

Queue lag is certainly the most unpredictable factor here. Job processing time is less of a factor. Jobs running for more than 5 minutes should be split into sub-jobs to limit invocation time over HTTP. Cloudtasker batch jobs can help achieve that if you need to make one big job split into sub-jobs “atomic”.

The default lock key expiration of “time_at + 10 minutes” may look aggressive but it is still a better choice than potentially having real-time jobs stuck for X hours.

The expected max job duration can be configured via the `lock_ttl` option on the job itself.

@return [Integer] The TTL in seconds

# File lib/cloudtasker/unique_job/job.rb, line 58
def lock_ttl
  now = Time.now.to_i

  # Get scheduled at and lock duration
  scheduled_at = [call_opts[:time_at].to_i, now].compact.max
  lock_duration = (options[:lock_ttl] || Cloudtasker::UniqueJob.lock_ttl).to_i

  # Return TTL
  scheduled_at + lock_duration - now
end
options() click to toggle source

Return the worker configuration options.

@return [Hash] The worker configuration options.

# File lib/cloudtasker/unique_job/job.rb, line 28
def options
  worker.class.cloudtasker_options_hash
end
redis() click to toggle source

Return the Cloudtasker redis client.

@return [Cloudtasker::RedisClient] The cloudtasker redis client.

# File lib/cloudtasker/unique_job/job.rb, line 140
def redis
  @redis ||= Cloudtasker::RedisClient.new
end
unique_args() click to toggle source

Return the list of arguments used for job uniqueness.

@return [Array<any>] The list of unique arguments

# File lib/cloudtasker/unique_job/job.rb, line 91
def unique_args
  worker.try(:unique_args, worker.job_args) || worker.job_args
end
unique_gid() click to toggle source

Return the Global ID of the unique job. The gid includes the UniqueJob namespace.

@return [String] The global ID of the job

# File lib/cloudtasker/unique_job/job.rb, line 131
def unique_gid
  [self.class.to_s.underscore, unique_id].join('/')
end
unique_id() click to toggle source

Return the ID of the unique job.

@return [String] The ID of the job.

# File lib/cloudtasker/unique_job/job.rb, line 121
def unique_id
  Digest::SHA256.hexdigest(digest_hash.to_json)
end
unlock!() click to toggle source

Delete the job lock.

# File lib/cloudtasker/unique_job/job.rb, line 161
def unlock!
  locked_id = redis.get(unique_gid)
  redis.del(unique_gid) if locked_id == id
end