class Resque::Plugins::Batch

Constants

JOB_HEARTBEAT
JOB_HEARTBEAT_TTL
VERSION

Attributes

batch_jobs[R]
id[R]
message_handler[R]

Public Class Methods

new(id: nil, message_handler: Resque::Plugins::Batch::MessageHandler.new) click to toggle source
# File lib/resque/plugins/batch.rb, line 21
def initialize(id: nil, message_handler: Resque::Plugins::Batch::MessageHandler.new)
  @id = id || get_id
  @message_handler = message_handler
  @batch_jobs = []
end

Public Instance Methods

enqueue(klass, *args) click to toggle source
# File lib/resque/plugins/batch.rb, line 27
def enqueue(klass, *args)
  batch_jobs << Resque::Plugins::Batch::BatchJobInfo.new(id, batch_jobs.count, klass, *args)
end
job_count() click to toggle source
# File lib/resque/plugins/batch.rb, line 119
def job_count
  batch_jobs.size
end
perform() click to toggle source
# File lib/resque/plugins/batch.rb, line 31
def perform
  # Make sure the incoming message queue is clear
  if redis.llen(batch_key) > 0
    raise "redis list #{batch_key} is not empty"
  end

  result = redis.multi do
    batch_jobs.each_with_index do |batch_job, job_id|
      klass = batch_job.klass
      queue = Resque.queue_from_class(klass) || batch_queue
      args = batch_job.args
      args = [id, job_id] + args

      if Resque.inline
        begin
          Resque::Job.create(queue, klass, *args)
        rescue StandardError => exception
          # NOTE: We still want to use the normal job messaging
        end
      else
        Resque::Job.create(queue, klass, *args)
      end
    end
  end

  unless Resque.inline
    unless result.last == job_count
      raise "not all jobs were queued"
    end
  end

  message_handler.send_message(self, :init)

  last_heartbeat_check = Time.now
  last_activity_check = Time.now

  while(batch_jobs.any?(&:incomplete?)) do
    msg = redis.lpop(batch_key)

    if msg
      decoded_msg = Resque.decode(msg)

      if decoded_msg["job_id"]
        job_id = decoded_msg["job_id"]
        batch_jobs[job_id].process_job_msg(decoded_msg)
      end

      last_activity_check = Time.now

      message_handler.send_message(self, :job, decoded_msg)
    else
      # Reasons there may be no message
      # No Workers - check worker count
      # Workers are processing another batch - register batches, check status
      # A Job takes a long time - send a heartbeat
      # A Job dies - send a heartbeat

      if Time.now - last_heartbeat_check > JOB_HEARTBEAT
        running_jobs = batch_jobs.select(&:running?)

        if running_jobs.any?(&:heartbeat_running?)
          last_activity_check = Time.now
        end

        last_heartbeat_check = Time.now

        running_jobs.reject(&:heartbeat_running?).each do |batch_job|
          decoded_msg = {"job_id" => batch_job.job_id, "msg" => "arrhythmia"}
          batch_jobs[job_id].process_job_msg(decoded_msg)
          message_handler.send_message(self, :job, decoded_msg)
        end
      end

      idle_duration = Time.now - last_activity_check
      message_handler.send_message(self, :idle, {duration: idle_duration})

      sleep(1)
    end
  end

  message_handler.send_message(self, :exit)

  batch_jobs.all?(&:success?)
ensure
  # Cleanup
  redis.del(batch_key)
end

Private Instance Methods

batch_key() click to toggle source
# File lib/resque/plugins/batch.rb, line 133
def batch_key
  "batch:#{id}"
end
batch_queue() click to toggle source
# File lib/resque/plugins/batch.rb, line 129
def batch_queue
  "batch"
end
get_id() click to toggle source

redis.io/commands/incr An atomic counter Used to identify the response list (batch_key)

# File lib/resque/plugins/batch.rb, line 140
def get_id
  redis.incr("batch:id")
end
redis() click to toggle source
# File lib/resque/plugins/batch.rb, line 125
def redis
  Resque.redis
end