class Lambdakiq::Job

Attributes

error[R]
record[R]

Public Class Methods

handler(event) click to toggle source
# File lib/lambdakiq/job.rb, line 8
# Builds one job per record extracted from +event+, performs all of them,
# and then reports the first failure that a job recorded.
#
# Every job is performed before the error check, so a job that records an
# error (rather than raising out of +perform+) does not stop the remaining
# records from being attempted.
#
# @param event [Object] payload handed to Event.records
# @return [nil] when no job ended with a truthy +error+
# @raise [JobError] built from the +error+ of the first job that has one
def handler(event)
  jobs = Event.records(event).map { |raw| new(raw) }
  jobs.each(&:perform)
  failed = jobs.find { |job| job.error }
  raise JobError.new(failed.error) if failed
end
new(record) click to toggle source
# File lib/lambdakiq/job.rb, line 19
# Wraps the raw record in a Record and starts the job with no error.
#
# @param record [Object] raw record data passed straight to Record.new
def initialize(record)
  # +false+ (not nil) is the "no failure recorded" marker read via +error+.
  @error = false
  @record = Record.new(record)
end

Public Instance Methods

active_job() click to toggle source
# File lib/lambdakiq/job.rb, line 31
# The job object deserialized from +job_data+, memoized after the first call.
#
# @return [Object] whatever ActiveJob::Base.deserialize returns for +job_data+
def active_job
  return @active_job if @active_job

  @active_job = ActiveJob::Base.deserialize(job_data)
end
execute() click to toggle source
# File lib/lambdakiq/job.rb, line 51
# Runs the job through ActiveJob and deletes its message on success.
#
# Any exception raised while executing (or while deleting) is routed to
# +perform_error+ after the execution counter has been bumped; nothing is
# re-raised from here.
#
# @return [Object] result of +delete_message+ on success, otherwise the
#   result of +perform_error+
def execute
  ActiveJob::Base.execute(job_data)
  delete_message
rescue Exception => failure
  # Intentionally broad: every failure type goes through the retry handling.
  increment_executions
  perform_error(failure)
end
executions() click to toggle source
# File lib/lambdakiq/job.rb, line 39
# Current execution count, read from the deserialized job.
#
# @return [Object] value of +active_job.executions+
def executions
  job = active_job
  job.executions
end
job_data() click to toggle source
# File lib/lambdakiq/job.rb, line 24
# Serialized job hash parsed from the record body, memoized after first use.
#
# Two keys are overwritten from the record itself:
# * 'provider_job_id' - the record's message id
# * 'executions'      - the record's receive count minus one
#
# @return [Hash] parsed job payload with the two keys above set
def job_data
  return @job_data if @job_data

  payload = JSON.parse(record.body)
  payload['provider_job_id'] = record.message_id
  payload['executions'] = record.receive_count - 1
  @job_data = payload
end
perform() click to toggle source
# File lib/lambdakiq/job.rb, line 43
# Entry point for processing this job.
#
# When the job must be delayed on a FIFO queue, the message visibility is
# pushed out via +fifo_delay+ and FifoDelayError is raised instead of running
# the job. Otherwise the job is executed.
#
# @return [Object] result of +execute+
# @raise [FifoDelayError] carrying the job id when the job was delayed
def perform
  return execute unless fifo_delay?

  fifo_delay
  raise FifoDelayError, active_job.job_id
end
queue() click to toggle source
# File lib/lambdakiq/job.rb, line 35
# Looks up the queue object for this job by the job's queue name.
#
# @return [Object] entry of +Lambdakiq.client.queues+ keyed by
#   +active_job.queue_name+
def queue
  known_queues = Lambdakiq.client.queues
  known_queues[active_job.queue_name]
end

Private Instance Methods

change_message_visibility() click to toggle source
# File lib/lambdakiq/job.rb, line 81
# Pushes out the message's visibility timeout so it is retried later, unless
# the retry budget is already used up.
#
# @return [Boolean] +false+ when +max_receive_count?+ is true and nothing was
#   sent; +true+ after the visibility change request was issued
def change_message_visibility
  if max_receive_count?
    false
  else
    request = client_params.merge(visibility_timeout: record.next_visibility_timeout)
    client.change_message_visibility(request)
    true
  end
end
client() click to toggle source
# File lib/lambdakiq/job.rb, line 88
# Shortcut to the SQS client held by the shared Lambdakiq client.
#
# @return [Object] value of +Lambdakiq.client.sqs+
def client
  shared = Lambdakiq.client
  shared.sqs
end
client_params() click to toggle source
# File lib/lambdakiq/job.rb, line 61
# Base parameters identifying this record's message in client calls.
#
# @return [Hash] +:queue_url+ from the job's queue and +:receipt_handle+
#   from the record
def client_params
  url = queue.queue_url
  handle = record.receipt_handle
  { queue_url: url, receipt_handle: handle }
end
delete_message() click to toggle source
# File lib/lambdakiq/job.rb, line 75
# Best-effort removal of this record's message from its queue.
#
# Ordinary failures raised by the client call are swallowed and reported as
# +true+. Only StandardError is rescued: process-level exceptions such as
# SystemExit, Interrupt or NoMemoryError are not delete failures and keep
# propagating to the caller.
#
# @return [Object] the client's response on success, +true+ when the delete
#   raised a StandardError
def delete_message
  client.delete_message(client_params)
rescue StandardError
  true
end
fifo_delay() click to toggle source
# File lib/lambdakiq/job.rb, line 106
# Sets the message's visibility timeout to the record's FIFO delay timeout.
#
# @return [Object] response of the client's +change_message_visibility+ call
def fifo_delay
  request = client_params.merge(visibility_timeout: record.fifo_delay_visibility_timeout)
  client.change_message_visibility(request)
end
fifo_delay?() click to toggle source
# File lib/lambdakiq/job.rb, line 102
# Whether this job should be delayed instead of run: the queue must be FIFO
# and the record must report FIFO delay seconds.
#
# The record is only consulted when the queue is FIFO.
#
# @return [Object] the queue's falsy +fifo?+ answer, otherwise the record's
#   +fifo_delay_seconds?+ answer
def fifo_delay?
  on_fifo_queue = queue.fifo?
  return on_fifo_queue unless on_fifo_queue

  record.fifo_delay_seconds?
end
increment_executions() click to toggle source
# File lib/lambdakiq/job.rb, line 111
# Bumps the deserialized job's execution counter by one.
#
# @return [Object] the new execution count
def increment_executions
  active_job.executions += 1
end
instrument(name, error: nil, wait: nil) click to toggle source
# File lib/lambdakiq/job.rb, line 115
# Forwards an instrumentation event to the job's own +instrument+ method.
#
# +send+ is used so the call works even when that method is not public.
#
# @param name [Symbol] event name
# @param error [Object, nil] error associated with the event
# @param wait [Object, nil] wait value associated with the event
# @return [Object] whatever the job's +instrument+ returns
def instrument(name, error: nil, wait: nil)
  active_job.send(:instrument, name, error: error, wait: wait)
end
max_receive_count?() click to toggle source
# File lib/lambdakiq/job.rb, line 92
# Whether the job has already run more times than its retry limit allows.
#
# @return [Boolean] true when +executions+ is strictly greater than
#   +retry_limit+
def max_receive_count?
  attempts = executions
  attempts > retry_limit
end
perform_error(e) click to toggle source
# File lib/lambdakiq/job.rb, line 65
# Handles a failed execution: either schedules a retry or gives up.
#
# If the visibility timeout could be changed, an :enqueue_retry event is
# instrumented and the error is stored on the job. Otherwise a :retry_stopped
# event is instrumented and the message is deleted.
#
# @param e [Exception] the failure raised while executing the job
# @return [Object] +e+ when a retry was scheduled, otherwise the result of
#   +delete_message+
def perform_error(e)
  retry_scheduled = change_message_visibility
  unless retry_scheduled
    instrument(:retry_stopped, error: e)
    return delete_message
  end

  instrument(:enqueue_retry, error: e, wait: record.next_visibility_timeout)
  @error = e
end
retry_limit() click to toggle source
# File lib/lambdakiq/job.rb, line 96
# Maximum number of retries allowed for this job.
#
# The job's own +lambdakiq_retry+ is used when set; otherwise the configured
# +max_retries+, capped at 12. Either way the result never exceeds the
# queue's +max_receive_count+ minus one.
#
# @return [Integer] effective retry limit
def retry_limit
  configured = [Lambdakiq.config.max_retries, 12].min
  preferred = active_job.lambdakiq_retry || configured
  queue_ceiling = queue.max_receive_count - 1
  [preferred, queue_ceiling].min
end