class Lambdakiq::Message

Constants

LAMBDAKIQ_ATTRIBUTE

Attributes

job[R]
options[R]
queue[R]

Public Class Methods

new(queue, job, options = {}) click to toggle source
# File lib/lambdakiq/message.rb, line 7
# Stores the collaborators this message is built from.
#
# @param queue   the destination queue object; other methods in this class
#   call +fifo?+ on it
# @param job     the job object; other methods in this class call
#   +serialize+ and +job_id+ on it
# @param options [Hash] extra options, kept as given (defaults to an empty
#   hash); +:delay_seconds+ is the only key read by this class
def initialize(queue, job, options = {})
  @queue, @job = queue, job
  @options = options
end

Public Instance Methods

params() click to toggle source
# File lib/lambdakiq/message.rb, line 13
# Assembles the complete parameter hash for this message.
#
# Starts from +message_params+ and merges +message_options+ on top, so a
# key present in both takes its value from +message_options+.
#
# @return [Hash] the merged parameters
def params
  base_params = message_params
  base_params.merge(message_options)
end

Private Instance Methods

delay_seconds() click to toggle source
# File lib/lambdakiq/message.rb, line 50
# Reads the requested delay from the options hash.
#
# @return the value stored under +:delay_seconds+, or 0 when that value is
#   missing, +nil+ or +false+
def delay_seconds
  requested = options[:delay_seconds]
  requested ? requested : 0
end
delay_seconds?() click to toggle source
# File lib/lambdakiq/message.rb, line 54
# Tells whether a non-zero delay was requested.
#
# @return [Boolean] +false+ when +delay_seconds+ reports +zero?+,
#   +true+ otherwise
def delay_seconds?
  return false if delay_seconds.zero?

  true
end
delay_seconds_attribute() click to toggle source
# File lib/lambdakiq/message.rb, line 58
# Builds the message attribute that carries the delay as a string.
#
# The attribute is only produced when the queue is FIFO and a non-zero delay
# was requested; in every other case an empty hash is returned.
# NOTE(review): presumably the delay travels as an attribute because it
# cannot be passed as a regular send option on FIFO queues — confirm.
#
# @return [Hash] either +{}+ or a single +'delay_seconds'+ entry whose
#   +:string_value+ is the delay converted with +to_s+
def delay_seconds_attribute
  return {} unless queue.fifo? && delay_seconds?

  attribute = { string_value: delay_seconds.to_s, data_type: 'String' }
  { 'delay_seconds' => attribute }
end
message_attributes() click to toggle source
# File lib/lambdakiq/message.rb, line 46
# Combines the constant LAMBDAKIQ_ATTRIBUTE hash with the optional delay
# attribute.
#
# +merge+ returns a new hash, so the constant itself is not modified.
#
# @return [Hash] the message attributes for this message
def message_attributes
  delay_attribute = delay_seconds_attribute
  LAMBDAKIQ_ATTRIBUTE.merge(delay_attribute)
end
message_body() click to toggle source
# File lib/lambdakiq/message.rb, line 33
# Produces the message body: the job's serialized form dumped as JSON.
#
# @return [String] the result of +JSON.dump+ applied to +job.serialize+
def message_body
  serialized_job = job.serialize
  JSON.dump(serialized_job)
end
message_options() click to toggle source
# File lib/lambdakiq/message.rb, line 25
# Selects the options that are merged into the final params.
#
# For a non-FIFO queue the options hash is returned untouched (same object).
# For a FIFO queue a copy without the +:delay_seconds+ key is returned.
#
# @return [Hash] the options to merge into the message params
def message_options
  return options unless queue.fifo?

  options.except(:delay_seconds)
end
message_params() click to toggle source
# File lib/lambdakiq/message.rb, line 19
# Builds the core parameters of the message.
#
# The body and attributes always come first; whatever +message_params_fifo+
# returns is merged on top of them.
#
# @return [Hash] hash with +:message_body+, +:message_attributes+ and any
#   keys supplied by +message_params_fifo+
def message_params
  core_params = { message_body: message_body, message_attributes: message_attributes }
  core_params.merge(message_params_fifo)
end
message_params_fifo() click to toggle source
# File lib/lambdakiq/message.rb, line 37
# Supplies the extra parameters used only for FIFO queues.
#
# When the queue is FIFO, both the group id and the deduplication id are set
# to the job's +job_id+; otherwise an empty hash is returned.
#
# @return [Hash] +{}+ or a hash with +:message_group_id+ and
#   +:message_deduplication_id+
def message_params_fifo
  return {} unless queue.fifo?

  group_id = job.job_id
  deduplication_id = job.job_id
  { message_group_id: group_id, message_deduplication_id: deduplication_id }
end