class Rufus::Scheduler::Job
Constants
- EoTime
Attributes
anything with a call(job[, time]) method,
this is what actually gets triggered
a reference to the instance whose call method is the @callable
next trigger time
previous “next trigger time”
Public Class Methods
# File lib/rufus/scheduler/jobs_core.rb, line 46
#
# Sets up the job state shared by all job kinds.
#
# scheduler:: stored in @scheduler
# original::  stored untouched in @original
# opts::      option hash, stored in @opts (and tidied in place, see below);
#             keys read here: :discard_past, :locals / :l, :name / :n,
#             :tag / :tags, :allow_overlap / :allow_overlapping, :mutex
# block::     something with an arity (used as is), something responding
#             to #call (its #call method is used), or a Class (an instance
#             is created and its #call method is used)
#
# Raises ArgumentError when no callable could be derived from +block+.
def initialize(scheduler, original, opts, block)
  @scheduler = scheduler
  @original = original
  @opts = opts

  @handler = block

  @callable =
    if block.respond_to?(:arity)
      block
    elsif block.respond_to?(:call)
      block.method(:call)
    elsif block.is_a?(Class)
      @handler = block.new
      begin
        @handler.method(:call)
      rescue StandardError
        # the instance has no usable #call, the check below will complain
        nil
      end
    end

  @scheduled_at = EoTime.now
  @unscheduled_at = nil
  @last_time = nil

  @discard_past = opts[:discard_past]

  @locals = opts[:locals] || opts[:l] || {}
  @local_mutex = Mutex.new

  @id = determine_id
  @name = opts[:name] || opts[:n]

  unless @callable
    raise(
      ArgumentError,
      'missing block or callable to schedule',
      caller[2..-1])
  end

  @tags = Array(opts[:tag] || opts[:tags]).map(&:to_s)

  @count = 0
  @last_work_time = 0.0
  @mean_work_time = 0.0

  # tidy up options

  overlap_refused =
    [ :allow_overlap, :allow_overlapping ].any? { |k| @opts[k] == false }
  @opts[:overlap] = false if overlap_refused

  mutexes = @opts[:mutex]
  @opts[:mutex] = Array(mutexes) if mutexes
end
Public Instance Methods
# File lib/rufus/scheduler/jobs_core.rb, line 172
#
# Reads the job-local value stored under +key+, while holding @local_mutex.
def [](key)
  @local_mutex.synchronize do
    @locals[key]
  end
end
# File lib/rufus/scheduler/jobs_core.rb, line 167
#
# Stores +value+ under +key+ in the job-local hash, while holding
# @local_mutex.
def []=(key, value)
  @local_mutex.synchronize do
    @locals[key] = value
  end
end
Calls the callable (usually a block) wrapped in this Job
instance.
Warning: error rescuing is the responsibility of the caller.
# File lib/rufus/scheduler/jobs_core.rb, line 205
#
# Runs the wrapped callable right now by delegating to #do_call with the
# current EoTime.
#
# do_rescue:: passed through to #do_call (defaults to false)
def call(do_rescue=false)
  now = EoTime.now
  do_call(now, do_rescue)
end
Will fail with an ArgumentError if the job frequency is higher than the scheduler frequency.
# File lib/rufus/scheduler/jobs_core.rb, line 117
#
# Frequency check hook. This parent implementation never fails and
# returns nil.
def check_frequency
  nil
end
# File lib/rufus/scheduler/jobs_core.rb, line 185
#
# Returns the entries of the job-local hash, read while holding
# @local_mutex.
def entries
  @local_mutex.synchronize do
    @locals.entries
  end
end
# File lib/rufus/scheduler/jobs_core.rb, line 177
#
# Tells whether the job-local hash holds +key+, checked while holding
# @local_mutex.
def has_key?(key)
  @local_mutex.synchronize do
    @locals.has_key?(key)
  end
end
# File lib/rufus/scheduler/jobs_core.rb, line 183
#
# Returns the keys of the job-local hash, read while holding @local_mutex.
def keys
  @local_mutex.synchronize do
    @locals.keys
  end
end
Kills all the threads this Job
currently has going on.
# File lib/rufus/scheduler/jobs_core.rb, line 152
#
# Raises KillSignal in each of the threads returned by #threads.
def kill
  threads.each do |thread|
    thread.raise(KillSignal)
  end
end
# The hash of a job is its object_id.
def hash
  object_id
end

# Two jobs are eql? when they are of the very same class and have the
# same hash.
def eql?(o)
  return false unless o.class == self.class

  o.hash == hash
end
might be necessary at some point
# File lib/rufus/scheduler/jobs_core.rb, line 196
#
# Returns an array holding next_time, or an empty array when there is no
# next_time. +count+ is not used by this implementation.
def next_times(count)
  return [] unless next_time

  [ next_time ]
end
Returns true if the job is scheduled in the past. Used for OneTimeJob when discard_past == true
# File lib/rufus/scheduler/jobs_core.rb, line 109
#
# Always false in this default implementation.
def past?
  false
end
Default, core, implementation has no effect. Repeat jobs do override it.
# File lib/rufus/scheduler/jobs_core.rb, line 44
#
# No-op setter: +v+ is ignored by this implementation.
def resume_discard_past=(v)
  # intentionally empty
end
# File lib/rufus/scheduler/jobs_core.rb, line 157
#
# True when #threads returns at least one thread.
def running?
  active = threads
  active.any?
end
# File lib/rufus/scheduler/jobs_core.rb, line 162
#
# Asks the scheduler whether this job is scheduled and returns its answer.
def scheduled?
  @scheduler.scheduled?(self)
end
# File lib/rufus/scheduler/jobs_core.rb, line 100
#
# Returns the source_location reported by the job's callable.
def source_location
  @callable.source_location
end
# File lib/rufus/scheduler/jobs_core.rb, line 145
#
# Lists the threads whose :rufus_scheduler_job thread-local is this job.
def threads
  Thread.list.select do |thread|
    thread[:rufus_scheduler_job] == self
  end
end
# File lib/rufus/scheduler/jobs_core.rb, line 122
#
# Remembers the current @next_time as @previous_time, lets #set_next_time
# compute the new one, then hands over to #do_trigger.
def trigger(time)
  @previous_time = @next_time

  set_next_time(time)

  do_trigger(time)
end
Trigger the job right now, off of its schedule.
Done in collaboration with Piavka in github.com/jmettraux/rufus-scheduler/issues/214
# File lib/rufus/scheduler/jobs_core.rb, line 135
#
# Hands +time+ (EoTime.now by default) straight to #do_trigger; unlike
# #trigger, @previous_time and the next time are left untouched.
def trigger_off_schedule(time=EoTime.now)
  do_trigger(time)
end
# File lib/rufus/scheduler/jobs_core.rb, line 140
#
# Flags the job as unscheduled by recording the current EoTime in
# @unscheduled_at.
def unschedule
  @unscheduled_at = EoTime.now
end
# File lib/rufus/scheduler/jobs_core.rb, line 184
#
# Returns the values of the job-local hash, read while holding
# @local_mutex.
def values
  @local_mutex.synchronize do
    @locals.values
  end
end
Protected Instance Methods
# File lib/rufus/scheduler/jobs_core.rb, line 212
#
# Invokes the scheduler hook named +meth+, if the scheduler has one.
#
# The hook receives as many of (job, time) as its arity asks for; a
# negative arity (optional or splat parameters) gets both.
#
# Returns true when the scheduler does not respond to +meth+, else
# whatever the hook returns.
def callback(meth, time)
  return true unless @scheduler.respond_to?(meth)

  wanted = @scheduler.method(meth).arity
  wanted = 2 if wanted < 0

  @scheduler.send(meth, *[ self, time ].first(wanted))
end
# File lib/rufus/scheduler/jobs_core.rb, line 222
#
# Returns the :timeout option run through Rufus::Scheduler.parse, or nil
# when no (truthy) :timeout option is set.
def compute_timeout
  timeout = @opts[:timeout]
  return nil unless timeout

  Rufus::Scheduler.parse(timeout)
end
Precedence, lowest to highest: scheduler-level discard_past < job-level
discard_past < the discard_past given to this resume() call
# File lib/rufus/scheduler/jobs_core.rb, line 379
#
# Resolves the effective discard_past setting. A non-nil
# @resume_discard_past wins over a non-nil @discard_past, which wins over
# the scheduler's discard_past.
def discard_past?
  scheduler_level = @scheduler.discard_past

  return @resume_discard_past unless @resume_discard_past.nil?
  return @discard_past unless @discard_past.nil?

  scheduler_level
end
# File lib/rufus/scheduler/jobs_core.rb, line 236
#
# Calls the callable inside the scheduler's around_trigger hook.
#
# The callable receives as many of (job, time) as its arity asks for.
# A negative arity (optional or splat parameters) gets both, the same
# rule #callback applies to scheduler hooks. Slicing with the raw
# negative arity yielded nil, so such callables were invoked without
# any argument.
#
# time::      the time handed to the callable
# do_rescue:: when false, StandardError instances are re-raised; when
#             true, KillSignal is discarded and any other StandardError
#             is passed to the scheduler's on_error
#
# Exceptions above StandardError always pass through.
def do_call(time, do_rescue)
  arity = @callable.arity
  args = [ self, time ].first(arity < 0 ? 2 : arity)

  @scheduler.around_trigger(self) do
    @callable.call(*args)
  end

rescue StandardError => se

  raise se unless do_rescue

  return if se.is_a?(KillSignal) # discard

  @scheduler.on_error(self, se)
end
# File lib/rufus/scheduler/jobs_core.rb, line 255
#
# Decides whether the job may run at +time+ and, if so, runs it.
#
# Nothing happens when the :overlap option is false and the job is
# already running, or when the :confirm_lock / :on_pre_trigger callbacks
# yield an explicit false (a nil does not veto). Otherwise @count is
# incremented and the job is triggered in the current thread (:blocking
# option) or queued.
def do_trigger(time)
  return if opts[:overlap] == false && running?

  verdict =
    callback(:confirm_lock, time) &&
    callback(:on_pre_trigger, time)
  return if verdict == false

  @count += 1

  return trigger_now(time) if opts[:blocking]

  trigger_queue(time)
end
# File lib/rufus/scheduler/jobs_core.rb, line 231
#
# Turns +m+ into a Mutex: a Mutex is returned as is, anything else is
# used (via to_s) as a key into the scheduler's mutexes, where a new
# Mutex is created on first use.
def mutex(m)
  return m if m.is_a?(Mutex)

  @scheduler.mutexes[m.to_s] ||= Mutex.new
end
# File lib/rufus/scheduler/jobs_core.rb, line 304
#
# Runs after the job did its work: calls #set_next_time with a second
# argument of true, then fires the scheduler's :on_post_trigger callback
# and returns its result.
def post_trigger(time)
  # except IntervalJob instances, jobs will ignore this call
  set_next_time(time, true)

  callback(:on_post_trigger, time)
end
# File lib/rufus/scheduler/jobs_core.rb, line 312
#
# Spawns a work thread and returns it.
#
# The thread pops [ job, time ] pairs from the scheduler's work_queue and
# triggers each job, wrapped in the mutexes listed in the job's :mutex
# option (the last listed mutex is locked first). It leaves its loop when
# the scheduler's started_at is nil or when :shutdown is popped. Jobs
# that have an unscheduled_at are skipped. A KillSignal raised while
# triggering is swallowed and the loop goes on.
def start_work_thread
  worker =
    Thread.new do
      current = Thread.current

      # indicates that the thread is going to be assigned immediately
      current[:rufus_scheduler_job] = true

      current[@scheduler.thread_key] = true
      current[:rufus_scheduler_work_thread] = true

      loop do
        break if @scheduler.started_at.nil?

        job, time = @scheduler.work_queue.pop

        break if job == :shutdown
        break if @scheduler.started_at.nil?

        next if job.unscheduled_at

        begin
          guarded =
            (job.opts[:mutex] || [])
              .inject(-> { job.trigger_now(time) }) do |inner, m|
                -> { mutex(m).synchronize { inner.call } }
              end
          guarded.call
        rescue KillSignal
          # simply go on looping
        end
      end
    end

  # Same flags as at the top of the thread block, set here as well since
  # it has to be done as quickly as possible: whoever runs first
  # (this thread or the work thread) sets this information.
  worker[@scheduler.thread_key] = true
  worker[:rufus_scheduler_work_thread] = true

  worker
end
# File lib/rufus/scheduler/jobs_core.rb, line 275
#
# Runs the job in the current thread.
#
# The thread-locals :rufus_scheduler_job, :rufus_scheduler_time and
# :rufus_scheduler_timeout are set for the duration of the call and
# cleared afterwards. @last_time, @last_work_time and @mean_work_time are
# updated and #post_trigger is called, even when the call raised.
#
# Returns what #do_call returns.
def trigger_now(time)
  thread = Thread.current

  # if there are mutexes, started might be really bigger than time
  started = EoTime.now

  thread[:rufus_scheduler_job] = self
  thread[:rufus_scheduler_time] = started
  thread[:rufus_scheduler_timeout] = compute_timeout

  @last_time = started

  do_call(time, true)

ensure

  @last_work_time = EoTime.now - thread[:rufus_scheduler_time]

  # running mean over the @count runs so far
  @mean_work_time =
    ((@count - 1) * @mean_work_time + @last_work_time) / @count

  post_trigger(time)

  thread[:rufus_scheduler_job] = nil
  thread[:rufus_scheduler_time] = nil
  thread[:rufus_scheduler_timeout] = nil
end
# File lib/rufus/scheduler/jobs_core.rb, line 362
#
# Pushes [ job, time ] onto the scheduler's work_queue.
#
# Beforehand, a new work thread is started when the work threads with no
# job assigned do not outnumber the entries already queued and the
# scheduler's max_work_threads has not been reached.
#
# Returns the work queue.
def trigger_queue(time)
  workers = @scheduler.work_threads

  idle = workers.count { |t| t[:rufus_scheduler_job].nil? }
  backlog = @scheduler.work_queue.size
  current = workers.size
  limit = @scheduler.max_work_threads

  start_work_thread if idle - backlog < 1 && current < limit

  @scheduler.work_queue << [ self, time ]
end