class MagicPipe::Transports::Sqs

Public Class Methods

new(config, metrics) click to toggle source
Calls superclass method MagicPipe::Transports::Base::new
# File lib/magic_pipe/transports/sqs.rb, line 11
# Sets up the SQS transport.
#
# config and metrics are handed to the superclass initializer untouched;
# afterwards the SQS-specific options are read from @config and a fresh
# AWS SQS client is instantiated with its default configuration.
def initialize(config, metrics)
  # Bare `super` forwards exactly the arguments this method received.
  super
  @options = @config.sqs_transport_options
  @client = Aws::SQS::Client.new
end

Public Instance Methods

submit!(payload, metadata) click to toggle source

The AWS SQS client will raise an error if it can't submit the message.

# File lib/magic_pipe/transports/sqs.rb, line 21
# Submits one message to the queue by delegating to send_message.
#
# No error handling happens here: per the accompanying documentation, the
# AWS SQS client raises when it cannot submit the message, and that error
# propagates to the caller.
#
# Returns whatever send_message returns.
def submit!(payload, metadata)
  response = send_message(payload, metadata)
  response
end

Private Instance Methods

meta_attributes(metadata) click to toggle source
# File lib/magic_pipe/transports/sqs.rb, line 49
# Builds the message_attributes hash sent alongside the message body.
#
# Reads :topic, :producer, :time and :mime from metadata. Each becomes an
# entry with a :string_value and a :data_type; "sent_at" is the only one
# declared as "Number", and its value is metadata[:time] converted with to_s.
#
# NOTE(review): assumes metadata[:time] stringifies to a numeric value
# (e.g. an epoch integer) - confirm against the callers.
def meta_attributes(metadata)
  # data_type is required on every attribute.
  string_attribute = ->(value) { { string_value: value, data_type: "String" } }

  {
    "topic" => string_attribute.call(metadata[:topic]),
    "producer" => string_attribute.call(metadata[:producer]),
    "sent_at" => { string_value: metadata[:time].to_s, data_type: "Number" },
    "mime" => string_attribute.call(metadata[:mime]),
  }
end
queue_name() click to toggle source
# File lib/magic_pipe/transports/sqs.rb, line 29
# Name of the destination queue, taken from the :queue entry of the
# transport options. The lookup uses fetch with no fallback value.
def queue_name
  transport_options = @options
  transport_options.fetch(:queue)
end
queue_url() click to toggle source
# File lib/magic_pipe/transports/sqs.rb, line 34
# URL of the destination queue.
#
# The first call asks the SQS client to resolve queue_name into a URL and
# caches the answer in @queue_url; later calls return the cached value
# without contacting the client again (as long as the cached value is truthy).
def queue_url
  return @queue_url if @queue_url

  lookup = @client.get_queue_url(queue_name: queue_name)
  @queue_url = lookup.queue_url
end
send_message(payload, metadata) click to toggle source
# File lib/magic_pipe/transports/sqs.rb, line 39
# Sends payload to the queue through the SQS client.
#
# The request carries the resolved queue URL, the payload as the message
# body, a zero-second delivery delay, and the attributes derived from
# metadata by meta_attributes.
#
# Returns the client's response to send_message.
def send_message(payload, metadata)
  request = {
    queue_url: queue_url, # required
    message_body: payload, # required
    delay_seconds: 0,
    message_attributes: meta_attributes(metadata),
  }
  # Passed as a single positional hash, as the client call expects here.
  @client.send_message(request)
end