class Kronos::Runner::Synchronous

Constants

ID_FUNC
METADATA_COLLECTORS

TODO: Let the consumer configure the metadata collectors.

Public Class Methods

new(tasks, dependencies) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 24
# Builds a runner from a list of task definitions and a dependencies object.
#
# tasks        - collection of task definitions, stored as-is in @tasks.
# dependencies - object that must respond to #storage; stored in @dependencies.
#
# A LockManager is created around dependencies.storage and kept in
# @lock_manager.
def initialize(tasks, dependencies)
  @tasks = tasks
  @dependencies = dependencies
  storage = dependencies.storage
  @lock_manager = LockManager.new(storage)
end

Public Instance Methods

start() click to toggle source
# File lib/kronos/runner/synchronous.rb, line 30
# Runs the scheduler loop: on every iteration it runs the resolved tasks,
# (re)schedules the currently defined tasks and then sleeps.
#
# interval - value handed to Kernel#sleep between iterations (seconds).
#            Defaults to 1, which matches the previous hard-coded pause.
#
# The loop has no exit condition of its own; it only ends when something
# inside it raises (Kernel#loop itself returns on StopIteration).
def start(interval: 1)
  loop do
    run_resolved_tasks
    schedule_current_tasks

    sleep(interval)
  end
end

Private Instance Methods

collect_metadata(&block) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 89
# Runs the given block inside the chain of metadata collectors and returns
# whatever the outermost link of that chain returns.
#
# The block is first wrapped with ID_FUNC. Each entry of METADATA_COLLECTORS
# is then nested around the previous link, iterating in reverse so that the
# first collector in the list ends up outermost. Every collector is invoked
# with the inner link as its only argument.
def collect_metadata(&block)
  collectors = METADATA_COLLECTORS.reverse
  pipeline = collectors.reduce(ID_FUNC[block]) do |inner, collector|
    lambda { collector[inner] }
  end
  pipeline.call
end
find_task(task_id) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 108
# Looks up a task definition by id.
#
# Returns the first element of @tasks whose #id equals task_id, or nil when
# none matches.
def find_task(task_id)
  @tasks.find do |candidate|
    candidate.id == task_id
  end
end
process_task(task_id) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 54
# Handles one resolved task id.
#
# When the id still maps to a task definition, the task is run and its next
# run is scheduled. When no definition is found, the id is removed from the
# schedule instead.
def process_task(task_id)
  definition = find_task(task_id)

  if definition
    run_task(definition)
    schedule_next_run(definition)
  else
    remove_task_from_schedule(task_id)
  end
end
raw_execute_task(task) click to toggle source

rubocop:enable

# File lib/kronos/runner/synchronous.rb, line 77
# Executes the task's block and records a success report.
#
# The block is called through collect_metadata, whose return value is used
# as the report's metadata. Afterwards a success message is logged and a
# Kronos::Report built with success_from is registered in storage.
# Nothing is rescued here, so an error raised by the block propagates to
# the caller before anything is logged or registered.
def raw_execute_task(task)
  metadata = collect_metadata { task.block.call }
  id = task.id

  @dependencies.logger.success("Task `#{id}` ran successfully.")

  storage = @dependencies.storage
  storage.register_report(Kronos::Report.success_from(id, metadata))
end
register_task_failure(task_id, error) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 84
# Records a failed run: logs an error message for the task id and registers
# a Kronos::Report built with failure_from(task_id, error) in storage.
def register_task_failure(task_id, error)
  logger = @dependencies.logger
  logger.error("Task `#{task_id}` failed.")

  storage = @dependencies.storage
  storage.register_report(Kronos::Report.failure_from(task_id, error))
end
remove_task_from_schedule(task_id) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 61
# Drops a task id that no longer has a definition: logs an info message,
# then asks storage to remove the id and to remove its reports.
#
# Returns whatever storage.remove_reports_for returns.
def remove_task_from_schedule(task_id)
  logger = @dependencies.logger
  logger.info("Task `#{task_id}` was removed from definitions. Removing from schedule too.")

  store = @dependencies.storage
  store.remove(task_id)
  store.remove_reports_for(task_id)
end
run_resolved_tasks() click to toggle source
# File lib/kronos/runner/synchronous.rb, line 50
# Passes every entry returned by storage.resolved_tasks to process_task,
# in iteration order.
def run_resolved_tasks
  resolved = @dependencies.storage.resolved_tasks
  resolved.each { |task_id| process_task(task_id) }
end
run_task(task) click to toggle source

rubocop:disable RescueException

# File lib/kronos/runner/synchronous.rb, line 69
# Runs a task through the lock manager and turns any raised exception into
# a registered failure instead of letting it propagate.
#
# NOTE(review): ::Exception is rescued on purpose here (see the rubocop
# directive on this method), so this also catches non-StandardError
# exceptions such as ScriptError, SignalException and SystemExit — confirm
# that swallowing the latter two is really intended.
def run_task(task)
  task_id = nil

  begin
    task_id = task.id
    @lock_manager.lock_and_execute(task_id) { raw_execute_task(task) }
  rescue ::Exception => failure
    register_task_failure(task_id, failure)
  end
end
schedule(task, next_run) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 102
# Stores the next run of a task.
#
# Logs an info message containing the task id and next_run.iso8601, then
# hands a Kronos::ScheduledTask(task id, next_run) to storage.schedule.
def schedule(task, next_run)
  id = task.id

  logger = @dependencies.logger
  logger.info("Scheduling `#{id}` to run `#{next_run.iso8601}`")

  storage = @dependencies.storage
  storage.schedule(Kronos::ScheduledTask.new(id, next_run))
end
schedule_current_tasks() click to toggle source
# File lib/kronos/runner/synchronous.rb, line 42
# Calls schedule_task for every task definition held in @tasks.
def schedule_current_tasks
  @tasks.each { |task| schedule_task(task) }
end
schedule_next_run(task) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 96
# Decides what to do with a task based on task.time.
#
# When that time is earlier than Time.now, the task id is removed from
# storage; otherwise the task is scheduled for that time.
def schedule_next_run(task)
  upcoming = task.time

  if upcoming < Time.now
    @dependencies.storage.remove(task.id)
  else
    schedule(task, upcoming)
  end
end
schedule_task(task) click to toggle source
# File lib/kronos/runner/synchronous.rb, line 46
# Schedules the next run of a task unless storage reports it as pending.
#
# Returns nil when the task is pending, otherwise the result of
# schedule_next_run.
def schedule_task(task)
  return if @dependencies.storage.pending?(task)

  schedule_next_run(task)
end