class ActiveJob::QueueAdapters::LambdakiqAdapter

Public Instance Methods

enqueue(job, options = {}) click to toggle source
# File lib/lambdakiq/adapter.rb, line 5
# Enqueues +job+, taking the asynchronous path when the job answers true to
# +lambdakiq_async?+ and the synchronous path otherwise.
#
# @param job [Object] job responding to +lambdakiq_async?+
# @param options [Hash] handed unchanged to the chosen enqueue helper
# @return [Object] whatever the chosen helper returns
def enqueue(job, options = {})
  return _enqueue_async(job, options) if job.lambdakiq_async?

  _enqueue(job, options)
end
enqueue_at(job, timestamp) click to toggle source
# File lib/lambdakiq/adapter.rb, line 9
# Enqueues +job+ with a +:delay_seconds+ option derived from +timestamp+.
#
# @param job [Object] the job handed on to #enqueue
# @param timestamp [Object] target time, in whatever form #delay_seconds accepts
# @return [Object] whatever #enqueue returns
def enqueue_at(job, timestamp)
  delay = delay_seconds(timestamp)
  enqueue(job, delay_seconds: delay)
end

Private Instance Methods

_enqueue(job, options = {}) click to toggle source
# File lib/lambdakiq/adapter.rb, line 20
# Looks up the queue keyed by +job.queue_name+ on the Lambdakiq client and
# sends +job+ to it together with +options+.
#
# @param job [Object] job responding to +queue_name+
# @param options [Hash] passed as the second argument to +send_message+
# @return [Object] whatever the queue's +send_message+ returns
def _enqueue(job, options = {})
  Lambdakiq.client.queues[job.queue_name].send_message(job, options)
end
_enqueue_async(job, options = {}) click to toggle source
# File lib/lambdakiq/adapter.rb, line 25
# Runs #_enqueue inside a Concurrent::Promise and attaches
# #async_enqueue_error as the promise's error callback.
#
# @param job [Object] job forwarded to #_enqueue
# @param options [Hash] options forwarded to #_enqueue
# @return [Object] the value returned by +on_error+
def _enqueue_async(job, options = {})
  promise = Concurrent::Promise.execute { _enqueue(job, options) }
  promise.on_error { |error| async_enqueue_error(error) }
end
async_enqueue_error(e) click to toggle source
# File lib/lambdakiq/adapter.rb, line 31
# Logs a failed asynchronous enqueue through +Rails.logger+ at error level.
#
# The previous body interpolated +job+ without having it in scope, so the
# handler raised NameError instead of logging. +job+ is now an optional
# argument: existing one-argument callers keep working, and callers that
# have the job at hand can pass it to get it included in the message.
#
# @param e [Exception, #to_s] the failure reason
# @param job [Object, nil] the job that failed to enqueue, when known
# @return [Object] whatever +Rails.logger.error+ returns
def async_enqueue_error(e, job = nil)
  subject = job.nil? ? 'job' : "job #{job}"
  Rails.logger.error("[Lambdakiq] Failed to queue #{subject}. Reason: #{e}")
end
delay_seconds(timestamp) click to toggle source
# File lib/lambdakiq/adapter.rb, line 15
# Converts +timestamp+ into a whole-second delay measured from
# +Time.current+, bounded to the range 0..900.
#
# The lower bound is new: a timestamp that is already in the past used to
# yield a negative delay. The 900 ceiling is kept from the original;
# presumably it mirrors the SQS DelaySeconds maximum (15 minutes), and SQS
# documents 0 as the minimum.
#
# @param timestamp [Numeric] target time as seconds since the epoch
# @return [Integer] delay in seconds, between 0 and 900 inclusive
def delay_seconds(timestamp)
  (timestamp - Time.current.to_i).to_i.clamp(0, 900)
end