class Cloudtasker::Backend::RedisTask

Manage local tasks pushed to Redis

Constants

RETRY_INTERVAL

Attributes

dispatch_deadline[R]
http_request[R]
id[R]
queue[R]
retries[R]
schedule_time[R]

Public Class Methods

all() click to toggle source

Return all tasks stored in Redis.

@return [Array<Cloudtasker::Backend::RedisTask>] All the tasks.

# File lib/cloudtasker/backend/redis_task.rb, line 39
def self.all
  if redis.exists?(key)
    # Use Schedule Set if available
    redis.smembers(key).map { |id| find(id) }.compact
  else
    # Fallback to redis key matching and migrate tasks
    # to use Task Set instead.
    redis.search(key('*')).map do |gid|
      task_id = gid.sub(key(''), '')
      redis.sadd(key, task_id)
      find(task_id)
    end
  end
end
create(payload) click to toggle source

Push a job to the queue.

@param [Hash] payload The Cloud Task payload.

# File lib/cloudtasker/backend/redis_task.rb, line 85
def self.create(payload)
  id = SecureRandom.uuid
  payload = payload.merge(schedule_time: payload[:schedule_time].to_i)

  # Save job
  redis.write(key(id), payload)
  redis.sadd(key, id)
  new(payload.merge(id: id))
end
delete(id) click to toggle source

Delete a task by id.

@param [String] id The task id.

# File lib/cloudtasker/backend/redis_task.rb, line 114
def self.delete(id)
  redis.srem(key, id)
  redis.del(key(id))
end
find(id) click to toggle source

Get a task by id.

@param [String] id The id of the task.

@return [Cloudtasker::Backend::RedisTask, nil] The task.

# File lib/cloudtasker/backend/redis_task.rb, line 102
def self.find(id)
  gid = key(id)
  return nil unless (payload = redis.fetch(gid))

  new(payload.merge(id: id))
end
key(val = nil) click to toggle source

Return a namespaced key.

@param [String, Symbol, nil] val The key to namespace

@return [String] The namespaced key.

# File lib/cloudtasker/backend/redis_task.rb, line 30
def self.key(val = nil)
  [to_s.underscore, val].compact.map(&:to_s).join('/')
end
new(id:, http_request:, schedule_time: nil, retries: 0, queue: nil, dispatch_deadline: nil) click to toggle source

Build a new instance of the class.

@param [String] id The ID of the task. @param [Hash] http_request The HTTP request content. @param [Integer] schedule_time When to run the task (Unix timestamp) @param [Integer] retries The number of times the job failed. @param [Integer] dispatch_deadline The dispatch_deadline in seconds.

# File lib/cloudtasker/backend/redis_task.rb, line 128
def initialize(id:, http_request:, schedule_time: nil, retries: 0, queue: nil, dispatch_deadline: nil)
  @id = id
  @http_request = http_request
  @schedule_time = Time.at(schedule_time || 0)
  @retries = retries || 0
  @queue = queue || Config::DEFAULT_JOB_QUEUE
  @dispatch_deadline = dispatch_deadline || Config::DEFAULT_DISPATCH_DEADLINE
end
pop(queue = nil) click to toggle source

Retrieve and remove a task from the queue.

@param [String] queue The queue to retrieve items from.

@return [Cloudtasker::Backend::RedisTask] A task ready to process.

# File lib/cloudtasker/backend/redis_task.rb, line 74
def self.pop(queue = nil)
  redis.with_lock('cloudtasker/server') do
    ready_to_process(queue).first&.tap(&:destroy)
  end
end
ready_to_process(queue = nil) click to toggle source

Reeturn all tasks ready to process.

@param [String] queue The queue to retrieve items from.

@return [Array<Cloudtasker::Backend::RedisTask>] All the tasks ready to process.

# File lib/cloudtasker/backend/redis_task.rb, line 61
def self.ready_to_process(queue = nil)
  list = all.select { |e| e.schedule_time <= Time.now }
  list = list.select { |e| e.queue == queue } if queue
  list
end
redis() click to toggle source

Return the cloudtasker redis client

@return [Cloudtasker::RedisClient] The cloudtasker redis client..

# File lib/cloudtasker/backend/redis_task.rb, line 19
def self.redis
  @redis ||= RedisClient.new
end

Public Instance Methods

==(other) click to toggle source

Equality operator.

@param [Any] other The object to compare.

@return [Boolean] True if the object is equal.

# File lib/cloudtasker/backend/redis_task.rb, line 230
def ==(other)
  other.is_a?(self.class) && other.id == id
end
deliver() click to toggle source

Deliver the task to the processing endpoint.

# File lib/cloudtasker/backend/redis_task.rb, line 198
def deliver
  Cloudtasker.logger.info(format_log_message('Processing task...'))

  # Send request
  resp = http_client.request(request_content)

  # Delete task if successful
  if resp.code.to_s =~ /20\d/
    destroy
    Cloudtasker.logger.info(format_log_message('Task handled successfully'))
  else
    retry_later(RETRY_INTERVAL)
    Cloudtasker.logger.info(format_log_message("Task failure - Retry in #{RETRY_INTERVAL} seconds..."))
  end

  resp
rescue Net::ReadTimeout
  retry_later(RETRY_INTERVAL)
  Cloudtasker.logger.info(
    format_log_message(
      "Task deadline exceeded (#{dispatch_deadline}s) - Retry in #{RETRY_INTERVAL} seconds..."
    )
  )
end
destroy() click to toggle source

Remove the task from the queue.

# File lib/cloudtasker/backend/redis_task.rb, line 191
def destroy
  self.class.delete(id)
end
gid() click to toggle source

Return the namespaced task id

@return [<Type>] The namespaced task id

# File lib/cloudtasker/backend/redis_task.rb, line 167
def gid
  self.class.key(id)
end
redis() click to toggle source

Return the redis client.

@return [Class] The RedisClient.

# File lib/cloudtasker/backend/redis_task.rb, line 142
def redis
  self.class.redis
end
retry_later(interval, is_error: true) click to toggle source

Retry the task later.

@param [Integer] interval The delay in seconds before retrying the task

# File lib/cloudtasker/backend/redis_task.rb, line 176
def retry_later(interval, is_error: true)
  redis.write(
    gid,
    retries: is_error ? retries + 1 : retries,
    http_request: http_request,
    schedule_time: (Time.now + interval).to_i,
    queue: queue,
    dispatch_deadline: dispatch_deadline
  )
  redis.sadd(self.class.key, id)
end
to_h() click to toggle source

Return a hash description of the task.

@return [Hash] A hash description of the task.

# File lib/cloudtasker/backend/redis_task.rb, line 151
def to_h
  {
    id: id,
    http_request: http_request,
    schedule_time: schedule_time.to_i,
    retries: retries,
    queue: queue,
    dispatch_deadline: dispatch_deadline
  }
end

Private Instance Methods

format_log_message(msg) click to toggle source

Format a log message

@param [String] msg The message to log.

@return [String] The formatted message

# File lib/cloudtasker/backend/redis_task.rb, line 243
def format_log_message(msg)
  "[Cloudtasker/Server][#{id}] #{msg}"
end
http_client() click to toggle source

Return the HTTP client.

@return [Net::HTTP] The http_client.

# File lib/cloudtasker/backend/redis_task.rb, line 252
def http_client
  @http_client ||=
    begin
      uri = URI(http_request[:url])
      Net::HTTP.new(uri.host, uri.port).tap { |e| e.read_timeout = dispatch_deadline }
    end
end
request_content() click to toggle source

Return the HTTP request to send

@return [Net::HTTP::Post] The http request

# File lib/cloudtasker/backend/redis_task.rb, line 265
def request_content
  @request_content ||= begin
    uri = URI(http_request[:url])
    req = Net::HTTP::Post.new(uri.path, http_request[:headers])

    # Add task headers
    req[Cloudtasker::Config::TASK_ID_HEADER] = id
    req[Cloudtasker::Config::RETRY_HEADER] = retries

    # Set job payload
    req.body = http_request[:body]
    req
  end
end