class Kronos::Runner::Synchronous
Constants
- ID_FUNC
- METADATA_COLLECTORS
TODO: Let consumer configure it
Public Class Methods
new(tasks, dependencies)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 24 def initialize(tasks, dependencies) @tasks = tasks @dependencies = dependencies @lock_manager = LockManager.new(dependencies.storage) end
Public Instance Methods
start()
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 30 def start loop do run_resolved_tasks schedule_current_tasks # TODO: Configurable sleep between runs sleep(1) end end
Private Instance Methods
collect_metadata(&block)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 89 def collect_metadata(&block) METADATA_COLLECTORS .reverse .reduce(ID_FUNC[block]) { |chain, collector| -> () { collector[chain] } } .call end
find_task(task_id)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 108 def find_task(task_id) @tasks.find { |task| task.id == task_id } end
process_task(task_id)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 54 def process_task(task_id) task = find_task(task_id) return remove_task_from_schedule(task_id) unless task run_task(task) schedule_next_run(task) end
raw_execute_task(task)
click to toggle source
rubocop:enable
# File lib/kronos/runner/synchronous.rb, line 77 def raw_execute_task(task) metadata = collect_metadata { task.block.call } task_id = task.id @dependencies.logger.success("Task `#{task_id}` ran successfully.") @dependencies.storage.register_report(Kronos::Report.success_from(task_id, metadata)) end
register_task_failure(task_id, error)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 84 def register_task_failure(task_id, error) @dependencies.logger.error("Task `#{task_id}` failed.") @dependencies.storage.register_report(Kronos::Report.failure_from(task_id, error)) end
remove_task_from_schedule(task_id)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 61 def remove_task_from_schedule(task_id) @dependencies.logger.info("Task `#{task_id}` was removed from definitions. Removing from schedule too.") storage = @dependencies.storage storage.remove(task_id) storage.remove_reports_for(task_id) end
run_resolved_tasks()
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 50 def run_resolved_tasks @dependencies.storage.resolved_tasks.each(&method(:process_task)) end
run_task(task)
click to toggle source
rubocop:disable RescueException
# File lib/kronos/runner/synchronous.rb, line 69 def run_task(task) task_id = task.id @lock_manager.lock_and_execute(task_id) { raw_execute_task(task) } rescue ::Exception => error register_task_failure(task_id, error) end
schedule(task, next_run)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 102 def schedule(task, next_run) task_id = task.id @dependencies.logger.info("Scheduling `#{task_id}` to run `#{next_run.iso8601}`") @dependencies.storage.schedule(Kronos::ScheduledTask.new(task_id, next_run)) end
schedule_current_tasks()
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 42 def schedule_current_tasks @tasks.each(&method(:schedule_task)) end
schedule_next_run(task)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 96 def schedule_next_run(task) next_run = task.time return @dependencies.storage.remove(task.id) if next_run < Time.now schedule(task, next_run) end
schedule_task(task)
click to toggle source
# File lib/kronos/runner/synchronous.rb, line 46 def schedule_task(task) schedule_next_run(task) unless @dependencies.storage.pending?(task) end