class Cloudtasker::Cron::Job

Manage cron jobs

Attributes

worker[R]

Public Class Methods

new(worker) click to toggle source

Build a new instance of the class

@param [Cloudtasker::Worker] worker The cloudtasker worker

# File lib/cloudtasker/cron/job.rb, line 16
def initialize(worker)
  @worker = worker
end

Public Instance Methods

cron_job?() click to toggle source

Return true if the worker is tagged as a cron job.

@return [Boolean] True if the worker relates to a cron schedule.

# File lib/cloudtasker/cron/job.rb, line 78
def cron_job?
  cron_schedule
end
cron_schedule() click to toggle source

Return the cron schedule to use for the job.

@return [Fugit::Cron] The cron schedule.

# File lib/cloudtasker/cron/job.rb, line 114
def cron_schedule
  return nil unless schedule_id

  @cron_schedule ||= Cron::Schedule.find(schedule_id)
end
current_time() click to toggle source

Return the time this cron instance is expected to run at.

@return [Time] The current cron instance time.

# File lib/cloudtasker/cron/job.rb, line 125
def current_time
  @current_time ||=
    begin
      Time.parse(worker.job_meta.get(key(:time_at)).to_s)
    rescue ArgumentError
      Time.try(:current) || Time.now
    end
end
execute() { || ... } click to toggle source

Execute the (cron) job. This method is invoked by the cron middleware.

# File lib/cloudtasker/cron/job.rb, line 184
def execute
  # Execute the job immediately if this worker is not flagged as a cron job.
  return yield unless cron_job?

  # Abort and reject job if this cron instance is not expected.
  return true unless expected_instance?

  # Schedule the next instance of the job
  schedule! unless retry_instance?

  # Flag the cron instance as processing.
  flag(:processing)

  # Execute the cron instance
  yield

  # Flag the cron instance as done
  flag(:done)
end
expected_instance?() click to toggle source

Return true if the cron job is the one we are expecting. This method is used to ensure that jobs related to outdated cron schedules do not get processed.

@return [Boolean] True if the cron job is expected.

# File lib/cloudtasker/cron/job.rb, line 150
def expected_instance?
  retry_instance? || cron_schedule.job_id == job_id
end
flag(state) click to toggle source

Store the cron job instance state.

@param [String, Symbol] state The worker state.

# File lib/cloudtasker/cron/job.rb, line 159
def flag(state)
  state.to_sym == :done ? redis.del(job_gid) : redis.set(job_gid, state.to_s)
end
job_gid() click to toggle source

Return the namespaced worker id.

@return [String] The worker namespaced id.

# File lib/cloudtasker/cron/job.rb, line 60
def job_gid
  key(job_id)
end
job_id() click to toggle source

Return the worker id.

@return [String] The worker id.

# File lib/cloudtasker/cron/job.rb, line 51
def job_id
  worker.job_id
end
key(val) click to toggle source

Return a namespaced key

@param [String, Symbol] val The key to namespace

@return [String] The namespaced key.

# File lib/cloudtasker/cron/job.rb, line 27
def key(val)
  return nil if val.nil?

  [self.class.to_s.underscore, val.to_s].join('/')
end
next_time() click to toggle source

Return the Time when the job should run next.

@return [EtOrbi::EoTime] The time the job should run next.

# File lib/cloudtasker/cron/job.rb, line 139
def next_time
  @next_time ||= cron_schedule&.next_time(current_time)
end
redis() click to toggle source

Return the cloudtasker redis client

@return [Cloudtasker::RedisClient] The cloudtasker redis client..

# File lib/cloudtasker/cron/job.rb, line 105
def redis
  @redis ||= RedisClient.new
end
retry_instance?() click to toggle source

Return true if the worker is currently processing (includes retries).

@return [Boolean] True f the worker is processing.

# File lib/cloudtasker/cron/job.rb, line 87
def retry_instance?
  cron_job? && state
end
schedule!() click to toggle source

Schedule the next cron instance.

The task only gets scheduled the first time a worker runs for a given cron instance (Typically a cron worker failing and retrying will not lead to a new task getting scheduled).

# File lib/cloudtasker/cron/job.rb, line 170
def schedule!
  return false unless cron_schedule

  # Configure next cron worker
  next_worker = worker.new_instance.tap { |e| e.job_meta.set(key(:time_at), next_time.iso8601) }

  # Schedule next worker
  task = next_worker.schedule(time_at: next_time)
  cron_schedule.update(task_id: task.id, job_id: next_worker.job_id)
end
schedule_id() click to toggle source

Return the cron schedule id.

@return [String] The schedule id.

# File lib/cloudtasker/cron/job.rb, line 69
def schedule_id
  @schedule_id ||= worker.job_meta.get(key(:schedule_id))
end
set(schedule_id:) click to toggle source

Add cron metadata to the worker.

@param [String, Symbol] name The name of the cron task. @param [String] cron The cron expression.

@return [Cloudtasker::Cron::Job] self.

# File lib/cloudtasker/cron/job.rb, line 41
def set(schedule_id:)
  worker.job_meta.set(key(:schedule_id), schedule_id.to_s)
  self
end
state() click to toggle source

Return the job processing state.

@return [String, nil] The processing state.

# File lib/cloudtasker/cron/job.rb, line 96
def state
  redis.get(job_gid)&.to_sym
end