module SkyRunner::Job
Attributes
skyrunner_job_id[RW]
skyrunner_job_is_solo[RW]
Public Class Methods
included(base)
click to toggle source
# File lib/skyrunner/job.rb, line 5
#
# Module hook: Ruby calls this when the module is included into +base+.
# It extends +base+ with ClassMethods (defined elsewhere in this file), so
# that module's methods become singleton methods of the including class.
def self.included(base)
  base.extend ClassMethods
end
Public Instance Methods
consume!(job_args, task_args)
click to toggle source
# File lib/skyrunner/job.rb, line 115
#
# Runs one task on this job instance and records the outcome.
#
# job_args  - the owning job's arguments; passed to the completion/failure
#             handlers.
# task_args - indexable pair: task_args[0] names the task method to invoke,
#             task_args[1] is a hash of its arguments (keys are symbolized
#             before the call).
#
# Returns whatever handle_task_completed! returns.
# Any exception (from the task or from the completion handler) is re-raised
# after a best-effort call to handle_task_failed!.
def consume!(job_args, task_args)
  task_name = task_args[0].to_sym
  task_options = task_args[1].symbolize_keys

  send(task_name, task_options)
  handle_task_completed!(job_args)
rescue Exception
  # Failure bookkeeping must not mask the original error, so ordinary errors
  # raised by the failure handler are discarded before re-raising.
  handle_task_failed!(job_args) rescue nil
  raise
end
execute!(job_args = {})
click to toggle source
# File lib/skyrunner/job.rb, line 35
#
# Fans this job out into tasks and enqueues them on SkyRunner.sqs_queue.
#
# When SkyRunner.run_locally is truthy the work is delegated to
# execute_local! instead. Otherwise +run+ is called with +job_args+; every
# set of arguments it yields becomes one queue message. Messages are sent in
# batches once SkyRunner::SQS_MAX_BATCH_SIZE arguments are pending, and once
# more after +run+ returns.
#
# Coordination:
# * If the first flush carries more than one message, a job record is created
#   in SkyRunner.dynamo_db_table and its total_tasks counter is increased by
#   the number of messages actually accepted by the queue.
# * Otherwise the messages are flagged +is_solo+ and no record is created.
#
# job_args - hash of job arguments (default: {}); serialized into the record
#            and into every message.
#
# Returns the result of handle_task_completed! when a job record exists,
# nil otherwise.
def execute!(job_args = {})
  return execute_local!(job_args) if SkyRunner.run_locally

  job_id = SecureRandom.hex
  self.skyrunner_job_id = job_id

  record = nil
  queue = SkyRunner.sqs_queue
  pending_args = []

  # Sends everything currently pending as one batch. Arguments of messages
  # the queue rejected are put back into pending_args for a later attempt.
  flush = lambda do
    messages = pending_args.map do |task_args|
      {
        job_id: job_id,
        task_id: SecureRandom.hex,
        job_args: job_args,
        task_args: task_args,
        job_class: self.class.name
      }
    end

    if record.nil?
      if messages.size > 1
        SkyRunner.retry_dynamo_db do
          table = SkyRunner.dynamo_db_table
          # total_tasks starts at 1 -- presumably to account for the
          # handle_task_completed! call this method makes after dispatching.
          record = table.items.put(id: job_id, created_at: Time.now.to_s, task_id: job_id, class: self.class.name, args: job_args.to_json, total_tasks: 1, completed_tasks: 0, done: 0, failed: 0)
        end
      else
        # Run an un-coordinated solo job if only one message.
        messages.each { |message| message[:is_solo] = true }
      end
    end

    messages = messages.map(&:to_json)
    dropped_message_count = 0
    pending_args.clear

    begin
      queue.batch_send(messages)
    rescue AWS::SQS::Errors::BatchSendError => e
      dropped_message_count = e.errors.size

      # Re-add dropped args so the next flush retries them.
      e.errors.each do |error|
        pending_args << JSON.parse(error[:message_body])["task_args"]
      end
    end

    if record
      SkyRunner.retry_dynamo_db do
        record.attributes.add({ total_tasks: messages.size - dropped_message_count })
      end
    end
  end

  run(job_args) do |*task_args|
    pending_args << task_args

    if pending_args.size >= SkyRunner::SQS_MAX_BATCH_SIZE
      # Up to five attempts, pausing between attempts while messages remain.
      1.upto(5) do
        flush.call
        sleep 5 unless pending_args.empty?
        break if pending_args.empty?
      end
    end
  end

  # Final drain of whatever is left after +run+ has finished yielding.
  # NOTE(review): arguments still pending after five attempts are dropped
  # silently -- confirm whether that is intended.
  1.upto(5) do
    flush.call unless pending_args.empty?
    sleep 5 unless pending_args.empty?
    break if pending_args.empty?
  end

  # Only a coordinated job has a record to count against. Checking the
  # record (rather than a "fired solo" flag) avoids touching a record that
  # was never created when +run+ yields no tasks, and still balances
  # total_tasks when a record was created after an earlier failed solo flush.
  handle_task_completed!(job_args) if record
end
fire_post_event_method(event_type, job_args)
click to toggle source
# File lib/skyrunner/job.rb, line 125
#
# Invokes every callback registered for +event_type+ on this instance.
#
# event_type - key into self.class.job_event_methods (the callers in this
#              file pass :completed or :failed).
# job_args   - hash of job arguments; passed with symbolized keys to any
#              callback that declares parameters.
#
# Callbacks that take no parameters at all are called without arguments.
# Returns the list of callback names that was iterated.
def fire_post_event_method(event_type, job_args)
  callback_names = self.class.job_event_methods[event_type] || []

  callback_names.each do |callback_name|
    callback = method(callback_name)
    takes_nothing = callback.arity.zero? && callback.parameters.empty?

    if takes_nothing
      send(callback_name)
    else
      send(callback_name, job_args.symbolize_keys)
    end
  end
end
is_solo?()
click to toggle source
# File lib/skyrunner/job.rb, line 31
#
# Returns the current value of the skyrunner_job_is_solo attribute.
# Other methods in this file use it to skip the DynamoDB bookkeeping.
def is_solo?
  skyrunner_job_is_solo
end
Private Instance Methods
delete_task_records!()
click to toggle source
# File lib/skyrunner/job.rb, line 229
#
# Deletes the per-task records of this job from SkyRunner.dynamo_db_table.
#
# Records are found by querying hash value "<skyrunner_job_id>-tasks",
# grouped into batches of at most 25 keys and handed to a small pool of
# worker threads (a quarter of SkyRunner.consumer_threads, at least one)
# that call batch_delete on each batch.
#
# Returns nil immediately for solo jobs; otherwise the list of (joined)
# worker threads.
def delete_task_records!
  return if is_solo?

  delete_batch_queue = Queue.new
  mutex = Mutex.new
  delete_items_queued = false

  worker_count = [1, (SkyRunner.consumer_threads / 4.0).floor].max
  threads = Array.new(worker_count) do
    Thread.new do
      db_table = SkyRunner.dynamo_db_table

      loop do
        # Stop once there is nothing left to delete and either all batches
        # have been queued or SkyRunner reports stop_consuming?.
        should_break = mutex.synchronize do
          (SkyRunner.stop_consuming? || delete_items_queued) && delete_batch_queue.empty?
        end
        break if should_break

        # Non-blocking pop: with several workers, checking the size and then
        # doing a blocking pop lets one worker wait forever on a batch that
        # another worker already took, which would hang the final join.
        batch = begin
          delete_batch_queue.pop(true)
        rescue ThreadError
          nil
        end

        if batch
          SkyRunner.retry_dynamo_db do
            db_table.batch_delete(batch)
          end
        else
          sleep 1
        end
      end
    end
  end

  begin
    items_to_delete = []
    table = SkyRunner.dynamo_db_table

    table.items.query(hash_value: "#{skyrunner_job_id}-tasks", select: [:id, :task_id]) do |task_item|
      items_to_delete << [task_item.attributes["id"], task_item.attributes["task_id"]]

      # 25 keys per batch -- presumably the DynamoDB batch-write limit.
      if items_to_delete.size >= 25
        delete_batch_queue << items_to_delete
        items_to_delete = []
      end
    end

    delete_batch_queue << items_to_delete unless items_to_delete.empty?
  ensure
    # Always release the workers, even if the query raised; otherwise they
    # would keep polling forever.
    mutex.synchronize do
      delete_items_queued = true
    end
  end

  threads.each(&:join)
end
dynamo_db_record()
click to toggle source
# File lib/skyrunner/job.rb, line 166
#
# Looks up this job's item in SkyRunner.dynamo_db_table. The job id is used
# for both key components of the lookup.
def dynamo_db_record
  items = SkyRunner.dynamo_db_table.items
  items[skyrunner_job_id, skyrunner_job_id]
end
execute_local!(job_args = {})
click to toggle source
# File lib/skyrunner/job.rb, line 137
#
# Runs the whole job in-process, without queue or database.
#
# All task arguments yielded by +run+ are collected (deep-copied), shuffled,
# and then executed one after another, each on a fresh instance of this
# class that shares the generated job id.
#
# job_args - hash of job arguments (default: {}); passed to +run+ and to the
#            :completed / :failed callbacks.
#
# The :completed callbacks fire on the instance that ran the last task. If a
# task raises, the :failed callbacks fire on that task's instance and the
# exception is re-raised, so the remaining tasks do not run.
#
# Returns the (shuffled) list of task arguments.
def execute_local!(job_args = {})
  job_id = SecureRandom.hex
  self.skyrunner_job_id = job_id

  task_arg_list = []
  run(job_args) do |*task_args|
    task_arg_list << task_args.deep_dup
  end

  task_arg_list.shuffle!
  last_index = task_arg_list.size - 1

  task_arg_list.each_with_index do |task_args, task_index|
    task = self.class.new
    task.skyrunner_job_id = job_id

    begin
      task.send(task_args[0].to_sym, task_args[1].symbolize_keys)
    rescue Exception => e
      task.fire_post_event_method(:failed, job_args)
      raise e
    end

    task.fire_post_event_method(:completed, job_args) if task_index == last_index
  end
end
handle_task_completed!(job_args)
click to toggle source
# File lib/skyrunner/job.rb, line 191
#
# Records that one task of this job has finished and, when it was the last
# one, finalizes the job.
#
# job_args - hash of job arguments, forwarded to the :completed callbacks.
#
# For coordinated (non-solo) jobs the record's completed_tasks counter is
# incremented. Once it equals total_tasks the record is conditionally marked
# done (only if done is still 0), the :completed callbacks fire and the task
# records are deleted on a best-effort basis. Solo jobs skip all record
# access and fire the callbacks directly.
#
# Returns false when no job id is set, true otherwise.
def handle_task_completed!(job_args)
  return false unless skyrunner_job_id

  record = nil
  new_attributes = nil

  unless is_solo?
    record = dynamo_db_record
    SkyRunner.retry_dynamo_db do
      new_attributes = record.attributes.add({ completed_tasks: 1 }, return: :all_new)
    end
  end

  # Nothing more to do until the last task has reported in.
  return true unless is_solo? || new_attributes["total_tasks"] == new_attributes["completed_tasks"]

  begin
    unless is_solo?
      if_condition = { completed_tasks: new_attributes["total_tasks"], done: 0 }

      SkyRunner.retry_dynamo_db do
        record.attributes.update(if: if_condition) do |u|
          u.add(done: 1)
          u.set(completed_at: Time.now.to_s)
        end
      end
    end

    fire_post_event_method(:completed, job_args)

    unless is_solo?
      begin
        delete_task_records!
      rescue StandardError
        nil
      end
    end
  rescue AWS::DynamoDB::Errors::ConditionalCheckFailedException
    # This is OK: another finisher already marked the job done, so this one
    # backs off without firing the callbacks again.
  end

  true
end
handle_task_failed!(job_args)
click to toggle source
# File lib/skyrunner/job.rb, line 170
#
# Records that a task of this job has failed (best effort).
#
# job_args - hash of job arguments, forwarded to the :failed callbacks.
#
# For coordinated (non-solo) jobs the record's failed counter is
# incremented and, after the :failed callbacks have fired, the task records
# are deleted. Solo jobs only fire the callbacks.
#
# Ordinary errors (StandardError) raised along the way are swallowed so that
# failure bookkeeping never masks the error that caused it. Non-standard
# exceptions such as signals or SystemExit are deliberately NOT swallowed.
#
# Returns false when no job id is set; otherwise nil, or the result of
# delete_task_records! for non-solo jobs.
def handle_task_failed!(job_args)
  return false unless skyrunner_job_id

  begin
    unless is_solo?
      record = dynamo_db_record

      SkyRunner.retry_dynamo_db do
        record.attributes.add({ failed: 1 })
      end
    end

    fire_post_event_method(:failed, job_args)

    unless is_solo?
      delete_task_records! rescue nil
    end
  rescue StandardError
    nil
  end
end