class Cloudtasker::Cron::Schedule

Manage cron schedules

Attributes

args[RW]
cron[RW]
id[RW]
job_id[RW]
queue[RW]
task_id[RW]
worker[RW]

Public Class Methods

all() click to toggle source

Return all schedules

@return [Array<Cloudtasker::Batch::Schedule>] The list of stored schedules.

# File lib/cloudtasker/cron/schedule.rb, line 37
def self.all
  if redis.exists?(key)
    # Use Schedule Set if available
    redis.smembers(key).map { |id| find(id) }
  else
    # Fallback to redis key matching and migrate schedules
    # to use Schedule Set instead.
    redis.search(key('*')).map do |gid|
      schedule_id = gid.sub(key(''), '')
      redis.sadd(key, schedule_id)
      find(schedule_id)
    end
  end
end
create(**opts) click to toggle source

Create a new cron schedule (or update an existing one).

@param [Hash] **opts Init opts. See initialize

@return [Cloudtasker::Cron::Schedule] The schedule instance.

# File lib/cloudtasker/cron/schedule.rb, line 79
def self.create(**opts)
  redis.with_lock(key(opts[:id])) do
    config = find(opts[:id]).to_h.merge(opts)
    new(config).tap(&:save)
  end
end
delete(id) click to toggle source

Delete a schedule by id.

@param [String] id The schedule id.

# File lib/cloudtasker/cron/schedule.rb, line 104
def self.delete(id)
  redis.with_lock(key(id)) do
    schedule = find(id)
    return false unless schedule

    # Delete task and stored schedule
    CloudTask.delete(schedule.task_id) if schedule.task_id
    redis.srem(key, schedule.id)
    redis.del(schedule.gid)
  end
end
find(id) click to toggle source

Return a saved cron schedule.

@param [String] id The schedule id.

@return [Cloudtasker::Cron::Schedule] The schedule instance.

# File lib/cloudtasker/cron/schedule.rb, line 93
def self.find(id)
  return nil unless (schedule_config = redis.fetch(key(id)))

  new(schedule_config)
end
key(val = nil) click to toggle source

Return a namespaced key.

@param [String, Symbol, nil] val The key to namespace

@return [String] The namespaced key.

# File lib/cloudtasker/cron/schedule.rb, line 28
def self.key(val = nil)
  [to_s.underscore, val].compact.map(&:to_s).join('/')
end
load_from_hash!(hash) click to toggle source

Synchronize list of cron schedules from a Hash. Schedules not listed in this hash will be removed.

@example

Cloudtasker::Cron::Schedule.load_from_hash!(
  my_job: { cron: '0 0 * * *', worker: 'MyWorker' }
  my_other_job: { cron: '0 10 * * *', worker: 'MyOtherWorker' }
)
# File lib/cloudtasker/cron/schedule.rb, line 62
def self.load_from_hash!(hash)
  schedules = hash.map do |id, config|
    schedule_config = JSON.parse(config.to_json, symbolize_names: true).merge(id: id.to_s)
    create(schedule_config)
  end

  # Remove existing schedules which are not part of the list
  all.reject { |e| schedules.include?(e) }.each { |e| delete(e.id) }
end
new(id:, cron:, worker:, **opts) click to toggle source

Build a new instance of the class.

@param [String] id The schedule id. @param [String] cron The cron expression. @param [Class] worker The worker class to run. @param [Array<any>] args The worker arguments. @param [String] queue The queue to use for the cron job. @param [String] task_id The ID of the actual backend task. @param [String] job_id The ID of the Cloudtasker worker.

# File lib/cloudtasker/cron/schedule.rb, line 127
def initialize(id:, cron:, worker:, **opts)
  @id = id
  @cron = cron
  @worker = worker
  @args = opts[:args]
  @queue = opts[:queue]
  @task_id = opts[:task_id]
  @job_id = opts[:job_id]
end
redis() click to toggle source

Return the redis client.

@return [Cloudtasker::RedisClient] The cloudtasker redis client.

# File lib/cloudtasker/cron/schedule.rb, line 17
def self.redis
  @redis ||= RedisClient.new
end

Public Instance Methods

==(other) click to toggle source

Equality operator.

@param [Any] other The object to compare.

@return [Boolean] True if the object is equal.

# File lib/cloudtasker/cron/schedule.rb, line 171
def ==(other)
  other.is_a?(self.class) && other.id == id
end
assign_attributes(**opts) click to toggle source

Buld edit the object attributes.

@param [Hash] **opts The attributes to edit.

# File lib/cloudtasker/cron/schedule.rb, line 256
def assign_attributes(**opts)
  opts
    .select { |k, _| instance_variables.include?("@#{k}".to_sym) }
    .each { |k, v| instance_variable_set("@#{k}", v) }
end
changed?() click to toggle source

RReturn true if the instance attributes were changed compared to the schedule saved in Redis.

@return [Boolean] True if the schedule was modified.

# File lib/cloudtasker/cron/schedule.rb, line 191
def changed?
  to_h != self.class.find(id).to_h
end
config_changed?() click to toggle source

Return true if the configuration of the schedule was changed (cron expression or worker).

@return [Boolean] True if the schedule config was changed.

# File lib/cloudtasker/cron/schedule.rb, line 181
def config_changed?
  self.class.find(id)&.to_config != to_config
end
cron_schedule() click to toggle source

Return the cron schedule to use for the job.

@return [Fugit::Cron] The cron schedule.

# File lib/cloudtasker/cron/schedule.rb, line 227
def cron_schedule
  @cron_schedule ||= Fugit::Cron.parse(cron)
end
gid() click to toggle source

Return the namespaced schedule id.

@return [String] The namespaced schedule id.

# File lib/cloudtasker/cron/schedule.rb, line 151
def gid
  self.class.key(id)
end
next_time(*args) click to toggle source

Return the next time a job should run.

@param [Time] time An optional reference in time (instead of Time.now)

@return [EtOrbi::EoTime] The time the schedule job should run next.

# File lib/cloudtasker/cron/schedule.rb, line 247
def next_time(*args)
  cron_schedule.next_time(*args)
end
redis() click to toggle source

Return the redis client.

@return [Cloudtasker::RedisClient] The cloudtasker redis client.

# File lib/cloudtasker/cron/schedule.rb, line 142
def redis
  self.class.redis
end
save(update_task: true) click to toggle source

Save the object in Redis. If the configuration was changed then any existing cloud task is removed and a task is recreated.

# File lib/cloudtasker/cron/schedule.rb, line 276
def save(update_task: true)
  return false unless valid?

  # Save schedule
  config_was_changed = config_changed?
  redis.sadd(self.class.key, id)
  redis.write(gid, to_h)

  # Stop there if backend does not need update
  return true unless update_task && (config_was_changed || !task_id || !CloudTask.find(task_id))

  # Update backend
  persist_cloud_task
end
to_config() click to toggle source

Return a hash describing the configuration of this schedule.

@return [Hash] The config description hash.

# File lib/cloudtasker/cron/schedule.rb, line 200
def to_config
  {
    id: id,
    cron: cron,
    worker: worker,
    args: args,
    queue: queue
  }
end
to_h() click to toggle source

Return a hash with all the schedule attributes.

@return [Hash] The attributes hash.

# File lib/cloudtasker/cron/schedule.rb, line 215
def to_h
  to_config.merge(
    task_id: task_id,
    job_id: job_id
  )
end
update(**opts) click to toggle source

Edit the object attributes and save the object in Redis.

@param [Hash] **opts The attributes to edit.

# File lib/cloudtasker/cron/schedule.rb, line 267
def update(**opts)
  assign_attributes(opts)
  save
end
valid?() click to toggle source

Return true if the schedule is valid.

@return [Boolean] True if the schedule is valid.

# File lib/cloudtasker/cron/schedule.rb, line 160
def valid?
  id && cron_schedule && worker
end
worker_instance() click to toggle source

Return an instance of the underlying worker.

@return [Cloudtasker::WorkerWrapper] The worker instance

# File lib/cloudtasker/cron/schedule.rb, line 236
def worker_instance
  WorkerWrapper.new(worker_name: worker, job_args: args, job_queue: queue)
end

Private Instance Methods

persist_cloud_task() click to toggle source

Update the task in backend.

# File lib/cloudtasker/cron/schedule.rb, line 296
def persist_cloud_task
  # Delete previous instance
  CloudTask.delete(task_id) if task_id

  # Schedule worker
  Job.new(worker_instance).set(schedule_id: id).schedule!
end