class Lambdakiq::Job
Attributes
error[R]
record[R]
Public Class Methods
handler(event)
click to toggle source
# File lib/lambdakiq/job.rb, line 8
# Builds one job per record extracted from +event+ via Event.records,
# performs every job in order, then raises a JobError wrapping the error of
# the first job that recorded one.
#
# Returns nil when no job recorded an error.
def handler(event)
  jobs = Event.records(event).map { |record| new(record) }
  jobs.each(&:perform)
  failed = jobs.find(&:error)
  raise JobError.new(failed.error) if failed
end
new(record)
click to toggle source
# File lib/lambdakiq/job.rb, line 19
# Wraps the raw +record+ in a Record and starts with no error recorded
# (@error is false until perform_error stores an exception).
def initialize(record)
  @error = false
  @record = Record.new(record)
end
Public Instance Methods
active_job()
click to toggle source
# File lib/lambdakiq/job.rb, line 31
# Returns the job deserialized from job_data by ActiveJob::Base.deserialize.
# The result is memoized in @active_job.
def active_job
  return @active_job if @active_job

  @active_job = ActiveJob::Base.deserialize(job_data)
end
execute()
click to toggle source
# File lib/lambdakiq/job.rb, line 51
# Runs the job through ActiveJob::Base.execute and then deletes its message.
# If anything raises, the execution count is incremented and the exception
# is handed to perform_error.
#
# NOTE(review): this rescues Exception, not just StandardError — presumably
# deliberate so every failure is routed into retry handling; confirm.
def execute
  begin
    ActiveJob::Base.execute(job_data)
    delete_message
  rescue Exception => failure
    increment_executions
    perform_error(failure)
  end
end
executions()
click to toggle source
# File lib/lambdakiq/job.rb, line 39
# Returns the execution count reported by the deserialized active_job.
def executions
  active_job.executions
end
job_data()
click to toggle source
# File lib/lambdakiq/job.rb, line 24
# Returns the record body parsed as JSON, with two keys overwritten:
# 'provider_job_id' is set to the record's message_id and 'executions' to
# the record's receive_count minus one. The result is memoized in @job_data.
def job_data
  @job_data ||= begin
    data = JSON.parse(record.body)
    data['provider_job_id'] = record.message_id
    data['executions'] = record.receive_count - 1
    data
  end
end
perform()
click to toggle source
# File lib/lambdakiq/job.rb, line 43
# Executes the job unless fifo_delay? is true. In that case the message
# visibility is changed via fifo_delay and a FifoDelayError carrying the
# job id is raised instead of executing.
def perform
  return execute unless fifo_delay?

  fifo_delay
  raise FifoDelayError, active_job.job_id
end
queue()
click to toggle source
# File lib/lambdakiq/job.rb, line 35
# Looks up this job's queue in Lambdakiq.client.queues by the active job's
# queue_name.
def queue
  queues = Lambdakiq.client.queues
  queues[active_job.queue_name]
end
Private Instance Methods
change_message_visibility()
click to toggle source
# File lib/lambdakiq/job.rb, line 81
# Sets the message's visibility timeout to the record's
# next_visibility_timeout, unless max_receive_count? is true.
#
# Returns false when max_receive_count? is true (no request is made),
# true after the visibility change has been requested.
def change_message_visibility
  if max_receive_count?
    false
  else
    request = client_params.merge(visibility_timeout: record.next_visibility_timeout)
    client.change_message_visibility(request)
    true
  end
end
client()
click to toggle source
# File lib/lambdakiq/job.rb, line 88
# Returns the sqs object exposed by Lambdakiq.client; the message-level
# requests in this class are sent through it.
def client
  Lambdakiq.client.sqs
end
client_params()
click to toggle source
# File lib/lambdakiq/job.rb, line 61
# Base parameters identifying this message in client requests: the queue's
# URL and the record's receipt handle.
def client_params
  {
    queue_url: queue.queue_url,
    receipt_handle: record.receipt_handle
  }
end
delete_message()
click to toggle source
# File lib/lambdakiq/job.rb, line 75
# Asks the client to delete this message, using client_params.
#
# Returns the client's response on success. Any exception raised while
# deleting is swallowed and true is returned instead, so a failed delete
# never propagates to the caller.
#
# NOTE(review): the catch-all rescue looks like deliberate best-effort
# cleanup, so it is kept as Exception; confirm whether StandardError would do.
def delete_message
  client.delete_message(client_params)
rescue Exception # no binding: the exception object is intentionally unused
  true
end
fifo_delay()
click to toggle source
# File lib/lambdakiq/job.rb, line 106
# Sets the message's visibility timeout to the record's
# fifo_delay_visibility_timeout and returns the client's response.
def fifo_delay
  request = client_params.merge(visibility_timeout: record.fifo_delay_visibility_timeout)
  client.change_message_visibility(request)
end
fifo_delay?()
click to toggle source
# File lib/lambdakiq/job.rb, line 102
# True only when the queue reports fifo? and the record reports
# fifo_delay_seconds?. The record is not consulted when the queue check
# is falsy.
def fifo_delay?
  queue.fifo? && record.fifo_delay_seconds?
end
increment_executions()
click to toggle source
# File lib/lambdakiq/job.rb, line 111
# Bumps the active job's executions counter by one.
def increment_executions
  active_job.executions += 1
end
instrument(name, error: nil, wait: nil)
click to toggle source
# File lib/lambdakiq/job.rb, line 115
# Forwards an instrumentation event to the active job's own instrument
# method. It is invoked through send, which also reaches non-public methods.
#
# name  - event name passed through unchanged.
# error - optional exception forwarded as the error: keyword.
# wait  - optional value forwarded as the wait: keyword.
def instrument(name, error: nil, wait: nil)
  active_job.send(:instrument, name, error: error, wait: wait)
end
max_receive_count?()
click to toggle source
# File lib/lambdakiq/job.rb, line 92
# True once the job's executions count is strictly greater than retry_limit.
def max_receive_count?
  executions > retry_limit
end
perform_error(e)
click to toggle source
# File lib/lambdakiq/job.rb, line 65
# Handles a failed execution raised as +e+.
#
# If the visibility timeout could be changed, an :enqueue_retry event is
# instrumented and +e+ is stored in @error. Otherwise a :retry_stopped event
# is instrumented and the message is deleted.
def perform_error(e)
  unless change_message_visibility
    instrument(:retry_stopped, error: e)
    return delete_message
  end

  instrument(:enqueue_retry, error: e, wait: record.next_visibility_timeout)
  @error = e
end
retry_limit()
click to toggle source
# File lib/lambdakiq/job.rb, line 96
# Returns the retry limit for this job: the smaller of
#   * the job's lambdakiq_retry, or when that is nil/false the configured
#     Lambdakiq.config.max_retries capped at 12, and
#   * the queue's max_receive_count minus one.
def retry_limit
  configured = [Lambdakiq.config.max_retries, 12].min
  requested = active_job.lambdakiq_retry || configured
  queue_cap = queue.max_receive_count - 1
  [requested, queue_cap].min
end