module SkyRunner
Constants
- SQS_MAX_BATCH_SIZE
- stop_consuming_flag
Public Class Methods
consume!(&block)
click to toggle source
# File lib/skyrunner.rb, line 65 def self.consume!(&block) raise "Queue #{SkyRunner::sqs_queue_name} not found. Try running 'skyrunner init'" unless sqs_queue raise "DynamoDB table #{SkyRunner::dynamo_db_table_name} not found. Try running 'skyrunner init'" unless dynamo_db_table && dynamo_db_table.exists? local_queue = Queue.new threads = [] 1.upto(SkyRunner::consumer_threads) do threads << Thread.new do table = SkyRunner::dynamo_db_table loop do begin if local_queue.empty? break if SkyRunner::stop_consuming? sleep 1 next end klass, job_id, task_id, job_args, task_args, is_solo, message = local_queue.pop if klass begin unless is_solo # Avoid running the same task twice, enter record and raise error if exists already. SkyRunner::retry_dynamo_db do table.items.put({ id: "#{job_id}-tasks", task_id: task_id }, unless_exists: ["id", "task_id"]) end end SkyRunner::log :info, "Run Task: #{task_args} Job: #{job_id} Message: #{message.id}" job = klass.new job.skyrunner_job_id = job_id job.skyrunner_job_is_solo = is_solo begin job.consume!(job_args, task_args) message.delete rescue Exception => e message.delete rescue nil block.call(e) if block_given? SkyRunner::log :error, "Task Failed: #{task_args} Job: #{job_id} #{e.message} #{e.backtrace.join("\n")}" end rescue AWS::DynamoDB::Errors::ConditionalCheckFailedException => e message.delete rescue nil end end rescue Exception => e puts e.message puts e.backtrace.join("\n") raise e end end end end 1.upto((SkyRunner::consumer_threads.to_f / SQS_MAX_BATCH_SIZE).ceil + 1) do threads << Thread.new do begin loop do table = SkyRunner::dynamo_db_table queue = sqs_queue break if SkyRunner::stop_consuming? sleep 1 while local_queue.size >= SkyRunner::consumer_threads received_messages = [] queue.receive_messages(limit: SQS_MAX_BATCH_SIZE, wait_time_seconds: 5) do |message| received_messages << [message, JSON.parse(message.body)] end next unless received_messages.size > 0 job_ids = received_messages.select { |m| !m[1]["is_solo"] }.map { |m| [m[1]["job_id"], m[1]["job_id"]] }.uniq job_records = {} if job_ids.uniq.size > 0 SkyRunner::retry_dynamo_db do # Read DynamoDB records into job and task lookup tables. table.batch_get(["id", "task_id", "failed"], job_ids.uniq, consistent_read: true) do |record| job_records[record["id"]] = record end end end received_messages.each do |received_message| message, message_data = received_message job_id = message_data["job_id"] task_id = message_data["task_id"] is_solo = message_data["is_solo"] job_args = message_data["job_args"] task_args = message_data["task_args"] job_record = job_records[job_id] if is_solo || (job_record && job_record["failed"] == 0) begin klass = Kernel.const_get(message_data["job_class"]) local_queue.push([klass, job_id, task_id, job_args, task_args, is_solo, message]) rescue NameError => e block.call(e) if block_given? message.delete rescue nil log :error, "Task Failed: No such class #{message_data["job_class"]} #{e.message}" end else message.delete rescue nil end end end rescue Exception => e puts e.message puts e.backtrace.join("\n") raise e end end end log :info, "Consumer started." threads.each(&:join) true end
dynamo_db_table()
click to toggle source
# File lib/skyrunner.rb, line 196 def self.dynamo_db_table table = Thread.current.thread_variable_get(:skyrunner_dyn_table) return table if table dynamo_db.tables[SkyRunner::dynamo_db_table_name].tap do |table| table.load_schema if table && table.exists? Thread.current.thread_variable_set(:skyrunner_dyn_table, table) end end
init!(params = {})
click to toggle source
# File lib/skyrunner.rb, line 19 def self.init!(params = {}) table = self.dynamo_db_table if !table.exists? || params[:purge] table_name = SkyRunner::dynamo_db_table_name if table.exists? && params[:purge] SkyRunner.log :warn, "Purging DynamoDB table #{table_name}." table.delete sleep 1 while table.exists? end SkyRunner.log :info, "Creating DynamoDB table #{table_name}." table = dynamo_db.tables.create(table_name, SkyRunner::dynamo_db_read_capacity, SkyRunner::dynamo_db_write_capacity, hash_key: { id: :string }, range_key: { task_id: :string }) sleep 1 while table.status == :creating end queue = self.sqs_queue if !queue || params[:purge] queue_name = SkyRunner::sqs_queue_name if queue && params[:purge] SkyRunner.log :warn, "Purging SQS queue #{queue_name}. Waiting 65 seconds to re-create." queue.delete sleep 65 end SkyRunner.log :info, "Creating SQS queue #{queue_name}." queue = sqs.queues.create(queue_name, visibility_timeout: SkyRunner::sqs_visibility_timeout, message_retention_period: SkyRunner::sqs_message_retention_period) end true end
log(type, message)
click to toggle source
# File lib/skyrunner.rb, line 214 def self.log(type, message) SkyRunner::logger.send(type, "[SkyRunner] #{message}") end
retry_dynamo_db(&block)
click to toggle source
# File lib/skyrunner.rb, line 265 def self.retry_dynamo_db(&block) handler = Proc.new do |exception, num, delay| if exception SkyRunner.log :warn, "Having to retry DynamoDB requests. #{exception.message}" end end with_retries(handler: handler, max_tries: 100, rescue: AWS::DynamoDB::Errors::ProvisionedThroughputExceededException, base_sleep_seconds: 2, max_sleep_seconds: 60) do block.call end end
setup() { |self| ... }
click to toggle source
# File lib/skyrunner.rb, line 15 def self.setup yield self end
sqs_queue()
click to toggle source
# File lib/skyrunner.rb, line 206 def self.sqs_queue begin sqs.queues.named(SkyRunner::sqs_queue_name) rescue AWS::SQS::Errors::NonExistentQueue => e return nil end end
stop_consuming!(its_a_trap=false)
click to toggle source
# File lib/skyrunner.rb, line 255 def self.stop_consuming!(its_a_trap=false) if its_a_trap SkyRunner::stop_consuming_flag = true else @@stop_consuming_mutex.synchronize do SkyRunner::stop_consuming_flag = true end end end
stop_consuming?()
click to toggle source
# File lib/skyrunner.rb, line 249 def self.stop_consuming? @@stop_consuming_mutex.synchronize do SkyRunner::stop_consuming_flag end end
Private Class Methods
dynamo_db()
click to toggle source
# File lib/skyrunner.rb, line 279 def self.dynamo_db @dynamo_db ||= AWS::DynamoDB.new end
sqs()
click to toggle source
# File lib/skyrunner.rb, line 283 def self.sqs @sqs ||= AWS::SQS.new end