class Workerholic::JobScheduler

Attributes

alive[RW]
scheduler_thread[R]
sorted_set[R]

Public Class Methods

new(opts={}) click to toggle source
# File lib/workerholic/job_scheduler.rb, line 6
def initialize(opts={})
  @sorted_set = opts[:sorted_set] || SortedSet.new
  @alive = true
end

Public Instance Methods

enqueue_due_jobs() click to toggle source
# File lib/workerholic/job_scheduler.rb, line 29
def enqueue_due_jobs
  if job_due?
    while job_due?
      serialized_job, job_execution_time = sorted_set.peek
      job = JobSerializer.deserialize(serialized_job)
      queue = job.queue ? Queue.new(job.queue) : Queue.new

      queue.enqueue(serialized_job)

      sorted_set.remove(job_execution_time)
    end
  else
    sleep(2)
  end
end
job_due?() click to toggle source
# File lib/workerholic/job_scheduler.rb, line 17
def job_due?
  scheduled_job = sorted_set.peek
  return false unless scheduled_job

  job_execution_time = scheduled_job.last
  Time.now.to_f >= job_execution_time
end
join() click to toggle source
# File lib/workerholic/job_scheduler.rb, line 49
def join
  scheduler_thread.join if scheduler_thread
end
kill() click to toggle source
# File lib/workerholic/job_scheduler.rb, line 45
def kill
  self.alive = false
end
schedule(serialized_job, score) click to toggle source
# File lib/workerholic/job_scheduler.rb, line 25
def schedule(serialized_job, score)
  sorted_set.add(serialized_job, score)
end
start() click to toggle source
# File lib/workerholic/job_scheduler.rb, line 11
def start
  @scheduler_thread = Thread.new do
    enqueue_due_jobs while alive
  end
end