class Rufus::Scheduler::Job

Constants

EoTime

Attributes

callable[R]

Anything with a call(job[, time]) method; this is what actually gets triggered.

count[R]
handler[R]

a reference to the instance whose call method is the @callable

id[R]
job_id[R]
last_time[R]
last_work_time[R]
locals[R]
mean_work_time[R]
name[RW]
next_time[RW]

next trigger time

opts[R]
original[R]
previous_time[RW]

previous “next trigger time”

scheduled_at[R]
tags[R]
unscheduled_at[R]

Public Class Methods

new(scheduler, original, opts, block) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 46
# Sets up a job owned by +scheduler+.
#
# scheduler:: the scheduler this job reports to
# original::  the original schedule specification, stored untouched
# opts::      options hash; reads :discard_past, :locals/:l, :name/:n,
#             :tag/:tags, :allow_overlap/:allow_overlapping and :mutex.
#             The hash is normalized in place at the end of this method.
# block::     something with an #arity (used as is), something with a
#             #call (its bound method is used), or a Class (instantiated,
#             then its #call method is used)
#
# Fails with an ArgumentError when no callable could be derived.
def initialize(scheduler, original, opts, block)

  @scheduler = scheduler
  @original = original
  @opts = opts

  @handler = block
  @callable = nil

  if block.respond_to?(:arity)
    @callable = block
  elsif block.respond_to?(:call)
    @callable = block.method(:call)
  elsif block.is_a?(Class)
    @handler = block.new
    @callable =
      begin
        @handler.method(:call)
      rescue StandardError
        nil # instances without a #call leave the job without a callable
      end
  end

  @scheduled_at = EoTime.now
  @unscheduled_at = nil
  @last_time = nil

  @discard_past = opts[:discard_past]

  @locals = opts[:locals] || opts[:l] || {}
  @local_mutex = Mutex.new

  @id = determine_id
  @name = opts[:name] || opts[:n]

  unless @callable
    fail(ArgumentError, 'missing block or callable to schedule', caller[2..-1])
  end

  @tags = Array(opts[:tag] || opts[:tags]).map(&:to_s)

  @count = 0
  @last_work_time = 0.0
  @mean_work_time = 0.0

  # tidy up options

  overlap_denied =
    [ :allow_overlap, :allow_overlapping ].any? { |k| @opts[k] == false }
  @opts[:overlap] = false if overlap_denied

  mutexes = @opts[:mutex]
  @opts[:mutex] = Array(mutexes) if mutexes
end

Public Instance Methods

[](key) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 172
# Reads the job-local value stored under +key+ while holding the
# locals mutex.
def [](key)
  @local_mutex.synchronize do
    @locals[key]
  end
end
[]=(key, value) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 167
# Stores +value+ under +key+ in the job-local hash while holding the
# locals mutex.
def []=(key, value)
  @local_mutex.synchronize do
    @locals[key] = value
  end
end
call(do_rescue=false) click to toggle source

Calls the callable (usually a block) wrapped in this Job instance.

Warning: rescuing errors is the responsibility of the caller.

# File lib/rufus/scheduler/jobs_core.rb, line 205
# Runs the wrapped callable right away, using the current EoTime as the
# trigger time.
#
# do_rescue:: handed over to #do_call as is (false by default)
def call(do_rescue=false)
  now = EoTime.now
  do_call(now, do_rescue)
end
check_frequency() click to toggle source

Will fail with an ArgumentError if the job frequency is higher than the scheduler frequency.

# File lib/rufus/scheduler/jobs_core.rb, line 117
# Frequency check hook. This parent implementation has nothing to
# verify, so it never fails and simply returns nil.
def check_frequency
  nil
end
entries() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 185
# Returns the entries of the job-local hash, read under the locals mutex.
def entries
  @local_mutex.synchronize do
    @locals.entries
  end
end
has_key?(key) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 177
# Tells whether the job-local hash holds +key+, checked under the
# locals mutex.
def has_key?(key)
  @local_mutex.synchronize do
    @locals.has_key?(key)
  end
end
Also aliased as: key?
key?(key)
Alias for: has_key?
keys() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 183
# Returns the keys of the job-local hash, read under the locals mutex.
def keys
  @local_mutex.synchronize do
    @locals.keys
  end
end
kill() click to toggle source

Kills all the threads this Job currently has going on.

# File lib/rufus/scheduler/jobs_core.rb, line 152
# Raises KillSignal in each thread returned by #threads.
def kill
  threads.each do |thread|
    thread.raise(KillSignal)
  end
end
location()
Alias for: source_location
next_times(count) click to toggle source

def hash
  self.object_id
end

def eql?(o)
  o.class == self.class && o.hash == self.hash
end

The hash / eql? sketch above might be necessary at some point.
# File lib/rufus/scheduler/jobs_core.rb, line 196
# Returns the upcoming trigger times as an array: the single next_time
# when there is one, an empty array otherwise. +count+ is not used by
# this implementation.
def next_times(count)
  upcoming = next_time
  upcoming ? [ upcoming ] : []
end
past?() click to toggle source

Returns true if the job is scheduled in the past. Used for OneTimeJob when discard_past == true

# File lib/rufus/scheduler/jobs_core.rb, line 109
# Whether the job is scheduled in the past; this default implementation
# always answers false.
def past?
  false
end
resume_discard_past=(v) click to toggle source

Default, core, implementation has no effect. Repeat jobs do override it.

# File lib/rufus/scheduler/jobs_core.rb, line 44
# Setter placeholder: +v+ is ignored and nothing is stored by this
# core implementation.
def resume_discard_past=(v)
  # intentionally empty
end
running?() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 157
# True when #threads reports at least one thread for this job.
def running?
  active = threads
  active.any?
end
scheduled?() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 162
# Asks the owning scheduler whether this job is scheduled and returns
# its answer.
def scheduled?
  owner = @scheduler
  owner.scheduled?(self)
end
source_location() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 100
# Returns whatever the wrapped callable reports as its source_location.
def source_location
  target = @callable
  target.source_location
end
Also aliased as: location
threads() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 145
# Lists the threads whose :rufus_scheduler_job thread-local equals
# this job.
def threads
  Thread.list.select do |thread|
    thread[:rufus_scheduler_job] == self
  end
end
trigger(time) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 122
# Regular, on-schedule trigger: remembers the current @next_time as
# @previous_time, lets #set_next_time move on from +time+, then hands
# over to #do_trigger and returns its result.
def trigger(time)
  @previous_time = @next_time
  set_next_time(time)
  do_trigger(time)
end
trigger_off_schedule(time=EoTime.now) click to toggle source

Trigger the job right now, off of its schedule.

Done in collaboration with Piavka in github.com/jmettraux/rufus-scheduler/issues/214

# File lib/rufus/scheduler/jobs_core.rb, line 135
# Off-schedule trigger: goes straight to #do_trigger, without touching
# @previous_time or calling #set_next_time.
#
# time:: trigger time, the current EoTime when omitted
def trigger_off_schedule(time=EoTime.now)
  do_trigger(time)
end
unschedule() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 140
# Stamps the job with the current EoTime as its @unscheduled_at and
# returns that time.
def unschedule
  now = EoTime.now
  @unscheduled_at = now
end
values() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 184
# Returns the values of the job-local hash, read under the locals mutex.
def values
  @local_mutex.synchronize do
    @locals.values
  end
end

Protected Instance Methods

callback(meth, time) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 212
# Invokes the scheduler hook named +meth+, when the scheduler has one.
#
# The hook receives as many of [ job, time ] as its arity asks for;
# a negative arity gets both. Returns true when the scheduler does not
# respond to +meth+, else the hook's own return value.
def callback(meth, time)
  if @scheduler.respond_to?(meth)
    declared = @scheduler.method(meth).arity
    wanted = declared < 0 ? 2 : declared

    @scheduler.send(meth, *[ self, time ].first(wanted))
  else
    true
  end
end
compute_timeout() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 222
# Returns the :timeout option run through Rufus::Scheduler.parse,
# or nil when the option is not set.
def compute_timeout
  raw = @opts[:timeout]
  raw ? Rufus::Scheduler.parse(raw) : nil
end
discard_past?() click to toggle source

Precedence, lowest to highest: scheduler level < job level < this resume()'s level

# File lib/rufus/scheduler/jobs_core.rb, line 379
# Resolves the effective discard_past setting. A non-nil
# @resume_discard_past wins over a non-nil @discard_past, which wins
# over the scheduler's own discard_past.
def discard_past?
  scheduler_level = @scheduler.discard_past

  if !@resume_discard_past.nil?
    @resume_discard_past
  elsif !@discard_past.nil?
    @discard_past
  else
    scheduler_level
  end
end
do_call(time, do_rescue) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 236
# Calls the wrapped callable inside the scheduler's around_trigger hook.
#
# time::      the trigger time, passed to the callable when it asks for it
# do_rescue:: when false, a StandardError raised by the callable is
#             re-raised; when true, it is handed to the scheduler's
#             on_error (a KillSignal is silently discarded instead)
#
# The callable receives as many of [ job, time ] as it requires. A
# negative arity (optional or splat parameters) used to produce a nil
# slice, so such callables were called with no argument at all, even
# when they had required parameters; they now get their required ones.
#
# Returns what around_trigger returns, or the on_error result when an
# error got rescued.
def do_call(time, do_rescue)

  arity = @callable.arity
  # for a negative arity, -arity - 1 is the count of required parameters
  wanted = arity < 0 ? -arity - 1 : arity
  args = [ self, time ].first(wanted)

  @scheduler.around_trigger(self) do
    @callable.call(*args)
  end

rescue StandardError => se

  fail se unless do_rescue

  return if se.is_a?(KillSignal) # discard

  @scheduler.on_error(self, se)

# exceptions above StandardError do pass through
end
do_trigger(time) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 255
# Decides whether the job may trigger at +time+ and, if so, runs it.
#
# Gives up (returning nil) when :overlap is false and the job is
# already running, or when the :confirm_lock / :on_pre_trigger
# callbacks evaluate to exactly false. Otherwise bumps @count and
# triggers in the current thread (:blocking) or via the work queue.
def do_trigger(time)

  return if opts[:overlap] == false && running?

  go_ahead =
    callback(:confirm_lock, time) && callback(:on_pre_trigger, time)
  return if go_ahead == false # a nil answer does not veto the trigger

  @count += 1

  opts[:blocking] ? trigger_now(time) : trigger_queue(time)
end
mutex(m) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 231
# Resolves +m+ to a Mutex: a Mutex instance is returned as is, anything
# else is looked up by its to_s in the scheduler's mutexes, creating
# and storing a new Mutex on first use.
def mutex(m)
  return m if m.is_a?(Mutex)

  @scheduler.mutexes[m.to_s] ||= Mutex.new
end
post_trigger(time) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 304
# Runs after a trigger: calls #set_next_time with its second argument
# set to true, then fires the :on_post_trigger callback and returns
# its result.
def post_trigger(time)
  # except IntervalJob instances, jobs will ignore this call
  set_next_time(time, true)

  callback(:on_post_trigger, time)
end
start_work_thread() click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 312
# Starts a new work thread and returns it.
#
# The thread loops, popping [ job, time ] pairs from the scheduler's
# work_queue and calling job.trigger_now(time) for each of them. It
# leaves the loop when the scheduler's started_at is nil or when it
# pops :shutdown. Jobs that have an unscheduled_at are skipped.
def start_work_thread

  thread =
    Thread.new do

      ct = Thread.current

      ct[:rufus_scheduler_job] = true
        # indicates that the thread is going to be assigned immediately
        # (trigger_queue counts threads with a nil value here as vacant)

      ct[@scheduler.thread_key] = true
      ct[:rufus_scheduler_work_thread] = true

      loop do

        break if @scheduler.started_at == nil

        job, time = @scheduler.work_queue.pop

        break if job == :shutdown
        break if @scheduler.started_at == nil
          # checked a second time, after the pop returned

        next if job.unscheduled_at

        begin

          # wraps the trigger call in one lambda per entry of the job's
          # :mutex option; each step wraps the previous one, so the last
          # mutex listed is the outermost one, hence acquired first.
          # Note that mutex(m) is resolved on the job that started this
          # thread, not on the popped job.
          (job.opts[:mutex] || []).reduce(
            lambda { job.trigger_now(time) }
          ) do |b, m|
            lambda { mutex(m).synchronize { b.call } }
          end.call

        rescue KillSignal

          # simply go on looping
        end
      end
    end

  thread[@scheduler.thread_key] = true
  thread[:rufus_scheduler_work_thread] = true
    #
    # same as above (in the thread block),
    # but since it has to be done as quickly as possible.
    # So, whoever is running first (scheduler thread vs job thread)
    # sets this information

  thread
end
trigger_now(time) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 275
# Runs the job in the current thread.
#
# Tags the thread with the job, its start time and its timeout, calls
# #do_call with rescuing enabled, then (always, via ensure) updates
# @last_work_time and @mean_work_time, calls #post_trigger and clears
# the three thread-locals again. Returns the #do_call result.
def trigger_now(time)

  worker = Thread.current

  started = EoTime.now
    # if there are mutexes, started might be really bigger than time

  worker[:rufus_scheduler_job] = self
  worker[:rufus_scheduler_time] = started
  worker[:rufus_scheduler_timeout] = compute_timeout

  @last_time = started

  do_call(time, true)

ensure

  elapsed = EoTime.now - worker[:rufus_scheduler_time]

  @last_work_time = elapsed
  @mean_work_time = ((@count - 1) * @mean_work_time + elapsed) / @count
    # running mean over the @count triggers so far

  post_trigger(time)

  worker[:rufus_scheduler_job] = nil
  worker[:rufus_scheduler_time] = nil
  worker[:rufus_scheduler_timeout] = nil
end
trigger_queue(time) click to toggle source
# File lib/rufus/scheduler/jobs_core.rb, line 362
# Queues the job for execution by a work thread.
#
# A new work thread is started first when the vacant threads (those
# whose :rufus_scheduler_job is nil) do not outnumber the entries
# already waiting in the queue, provided the scheduler's
# max_work_threads is not reached yet. Returns the work queue.
def trigger_queue(time)

  workers = @scheduler.work_threads

  idle = workers.count { |w| w[:rufus_scheduler_job].nil? }
  pending = @scheduler.work_queue.size

  running = workers.size
  limit = @scheduler.max_work_threads

  start_work_thread if idle - pending < 1 && running < limit

  @scheduler.work_queue << [ self, time ]
end