class Workerholic::JobScheduler
Attributes
alive[RW]
scheduler_thread[R]
sorted_set[R]
Public Class Methods
new(opts={})
click to toggle source
# File lib/workerholic/job_scheduler.rb, line 6
# Builds a scheduler that is marked alive from the start.
#
# opts - Hash of options. opts[:sorted_set] is used as the backing sorted set
#        when it is truthy; otherwise a fresh SortedSet is created.
def initialize(opts = {})
  injected_set = opts[:sorted_set]
  @sorted_set = injected_set || SortedSet.new
  @alive = true
end
Public Instance Methods
enqueue_due_jobs()
click to toggle source
# File lib/workerholic/job_scheduler.rb, line 29
# Moves every job that job_due? reports as due from the sorted set onto its
# queue. When nothing could be enqueued during this call, sleeps for two
# seconds before returning.
#
# The head entry is read again with peek after job_due? has answered, so it
# may already be gone at that point; the loop then stops instead of handing
# nil to JobSerializer.deserialize.
def enqueue_due_jobs
  enqueued_any = false

  while job_due?
    serialized_job, job_execution_time = sorted_set.peek
    # The entry seen by job_due? can have disappeared before this peek.
    break unless serialized_job

    job = JobSerializer.deserialize(serialized_job)
    # A job without a queue name goes to Queue's default.
    queue = job.queue ? Queue.new(job.queue) : Queue.new
    queue.enqueue(serialized_job)

    # NOTE(review): removal is keyed by the execution time, not the payload.
    # Confirm that entries sharing a score are not dropped without being
    # enqueued.
    sorted_set.remove(job_execution_time)
    enqueued_any = true
  end

  # Nothing was moved: pause before the caller polls again.
  sleep(2) unless enqueued_any
end
job_due?()
click to toggle source
# File lib/workerholic/job_scheduler.rb, line 17
# Tells whether the entry returned by sorted_set.peek is ready to run.
#
# Returns false when peek yields nothing. Otherwise returns whether the
# current time (as a Float) has reached the entry's last element, which is
# presumably the job's execution timestamp.
def job_due?
  head_entry = sorted_set.peek

  if head_entry
    due_at = head_entry.last
    Time.now.to_f >= due_at
  else
    false
  end
end
join()
click to toggle source
# File lib/workerholic/job_scheduler.rb, line 49
# Blocks until the scheduler thread finishes. Does nothing and returns nil
# when no scheduler thread is set.
def join
  return unless scheduler_thread

  scheduler_thread.join
end
kill()
click to toggle source
# File lib/workerholic/job_scheduler.rb, line 45
# Marks the scheduler as no longer alive by assigning false through the
# alive writer. It does not touch the scheduler thread directly.
def kill
  self.alive = false
end
schedule(serialized_job, score)
click to toggle source
# File lib/workerholic/job_scheduler.rb, line 25
# Adds a job to the sorted set.
#
# serialized_job - the job payload, passed through to sorted_set.add as is.
# score          - the value the sorted set stores the payload under
#                  (presumably the execution timestamp; verify with callers).
#
# Returns whatever sorted_set.add returns.
def schedule(serialized_job, score)
  sorted_set.add(
    serialized_job,
    score
  )
end
start()
click to toggle source
# File lib/workerholic/job_scheduler.rb, line 11
# Spawns the scheduler thread and stores it in @scheduler_thread. The thread
# keeps calling enqueue_due_jobs for as long as alive is truthy, checking
# alive before every call.
#
# Returns the newly created Thread.
def start
  @scheduler_thread = Thread.new do
    while alive
      enqueue_due_jobs
    end
  end
end