class Maitredee::Adapters::SnsSqsAdapter

Attributes

access_key_id[R]
region[R]
secret_access_key[R]

Public Class Methods

new(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil) click to toggle source

@param access_key_id [String] if `nil`, falls back to `ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]`
@param secret_access_key [String] if `nil`, falls back to `ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]`
@param region [String] if `nil`, falls back to `ENV["MAITREDEE_AWS_REGION"]`
@param default_shoryuken_options [Hash] default options of the shoryuken job listening to the queues

defaults to `{ body_parser: :json, auto_delete: true, retry_intervals: <backoff lambda> }` (see `default_shoryuken_options`)
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 15
# Stores the AWS settings for this adapter and hands the adapter's SQS client
# to Shoryuken.
#
# Each AWS setting falls back to its MAITREDEE_AWS_* environment variable when
# the argument is nil (or false).
#
# @param access_key_id [String, nil]
# @param secret_access_key [String, nil]
# @param region [String, nil]
# @param default_shoryuken_options [Hash, nil] stored as given; when nil the
#   built-in defaults are used lazily by #default_shoryuken_options
def initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil)
  @default_shoryuken_options = default_shoryuken_options
  @region = region || ENV.fetch("MAITREDEE_AWS_REGION", nil)
  @secret_access_key = secret_access_key || ENV.fetch("MAITREDEE_AWS_SECRET_ACCESS_KEY", nil)
  @access_key_id = access_key_id || ENV.fetch("MAITREDEE_AWS_ACCESS_KEY_ID", nil)

  # Global side effect: Shoryuken is pointed at this adapter's SQS client.
  Shoryuken.sqs_client = sqs_client
end

Public Instance Methods

add_worker(subscriber_class) click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 107
# Returns the worker class for +subscriber_class+, creating and registering it
# as a constant on the adapter class the first time it is requested.
#
# The constant is named "<subscriber class name>Worker". Repeated calls for the
# same subscriber return the already-registered class (previously the second
# call returned the boolean result of +const_defined?+).
#
# NOTE(review): +const_set+ rejects names containing "::", so a namespaced
# subscriber class would raise NameError here — confirm callers only pass
# classes whose #name is a plain constant name.
#
# @param subscriber_class [Class] must respond to +name+ and +queue_resource_name+
# @return [Class] a subclass of +Worker+ bound to +subscriber_class+
def add_worker(subscriber_class)
  worker_name = "#{subscriber_class.name}Worker"
  return self.class.const_get(worker_name) if self.class.const_defined?(worker_name)

  worker_class = Class.new(Worker)
  worker_class.shoryuken_options default_shoryuken_options.merge(
    queue: subscriber_class.queue_resource_name
  )
  worker_class.subscriber_class = subscriber_class
  self.class.const_set worker_name, worker_class
  worker_class
end
configure_broker(config) click to toggle source

Creates topics from the keys and queues from the values, and subscribes each queue to its topic.
@param config [Hash{String => Array<String>}] topic resource name => queue resource names

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 45
# Subscribes every listed queue to its topic.
#
# @param config [Hash{String => Array<String>}] topic resource name mapped to
#   the queue resource names that should receive its messages
# @return [Hash] the +config+ object that was passed in
def configure_broker(config)
  config.each do |topic, queue_list|
    queue_list.each do |queue|
      subscribe(topic_resource_name: topic, queue_resource_name: queue)
    end
  end
end
default_shoryuken_options() click to toggle source

@api private

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 122
# Shoryuken options applied to every generated worker.
#
# Returns the options given to the constructor when present; otherwise builds
# (once) and memoizes the built-in defaults.
#
# @api private
# @return [Hash]
def default_shoryuken_options
  return @default_shoryuken_options if @default_shoryuken_options

  # Grows with the fourth power of the attempt number, with a floor of 15 and
  # random jitter scaled by the attempt count.
  # NOTE(review): presumably interpreted by Shoryuken as a retry delay — confirm units.
  backoff = lambda do |attempts|
    ((attempts - 1) ** 4) + 15 + (rand(30) * attempts)
  end

  @default_shoryuken_options = {
    body_parser: :json,
    auto_delete: true,
    retry_intervals: backoff
  }
end
publish(message) click to toggle source

Publishes the message to SNS.
@param message [PublisherMessage]

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 26
# Publishes +message+ to the SNS topic registered under
# +message.topic_resource_name+.
#
# The body is serialized with JSON.dump; the message's identifying fields and
# the Maitredee version travel as SNS message attributes, with nil fields
# left out.
#
# @param message [PublisherMessage]
# @return whatever the SNS client's +publish+ call returns
def publish(message)
  metadata = {
    message_id: message.message_id,
    topic_name: message.topic_name,
    event_name: message.event_name,
    primary_key: message.primary_key,
    schema_name: message.schema_name,
    maitredee_version: Maitredee::VERSION
  }.reject { |_name, value| value.nil? }

  client = sns_client
  destination = topics[message.topic_resource_name]

  client.publish(
    topic_arn: destination.arn,
    message: JSON.dump(message.body),
    message_attributes: sns_message_attributes(metadata)
  )
end
queues() click to toggle source

@api private

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 67
# Registry of SQS queues keyed by queue name.
#
# Looking up a name that is not yet cached calls +create_queue+ on the SQS
# client and stores an Aws::SQS::Queue built from the returned URL.
#
# @api private
# @return [Hash{String => Aws::SQS::Queue}]
def queues
  return @queues if @queues

  @queues = Hash.new do |cache, name|
    created = sqs_client.create_queue(queue_name: name)
    cache[name] = Aws::SQS::Queue.new(created.queue_url, client: sqs_client)
  end
end
reset() click to toggle source

Deletes all topics, queues, and subscriptions.
@api private

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 132
# Calls +delete+ on every cached topic, queue and subscription (in that
# order) and empties each registry.
#
# @api private
# @return [Array<Hash>] the three (now empty) registries
def reset
  registries = [topics, queues, subscriptions]
  registries.each do |registry|
    registry.each_value(&:delete)
    registry.clear
  end
end
subscribe(queue_resource_name:, topic_resource_name:) click to toggle source

Subscribes a queue to a topic.
@param queue_resource_name [String]
@param topic_resource_name [String]

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 82
# Subscribes the named queue to the named topic.
#
# Steps: look up (or lazily create) the topic and queue, create an SNS "sqs"
# subscription with raw message delivery enabled, remember the subscription,
# then set the queue's access policy for the topic.
#
# NOTE(review): the policy is replaced on every call, so a queue subscribed to
# several topics appears to end up with a policy naming only the last topic —
# confirm whether multi-topic queues are expected.
#
# @param queue_resource_name [String]
# @param topic_resource_name [String]
# @return whatever the queue's +set_attributes+ call returns
def subscribe(queue_resource_name:, topic_resource_name:)
  sns_topic = topics[topic_resource_name]
  sqs_queue = queues[queue_resource_name]
  queue_arn = sqs_queue.attributes["QueueArn"]
  topic_arn = sns_topic.arn

  subscription_arn = sns_client.subscribe(
    topic_arn: topic_arn,
    protocol: "sqs",
    endpoint: queue_arn,
    attributes: { "RawMessageDelivery" => "true" }
  ).subscription_arn

  subscriptions[subscription_arn] =
    Aws::SNS::Subscription.new(subscription_arn, client: sns_client)

  policy = sqs_policy(queue_arn: queue_arn, topic_arn: topic_arn)
  sqs_queue.set_attributes(attributes: { "Policy" => policy })
end
subscriptions() click to toggle source

@api private

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 75
# Registry of subscriptions created through this adapter, keyed by
# subscription ARN (entries are added by #subscribe).
#
# @api private
# @return [Hash]
def subscriptions
  @subscriptions = {} unless @subscriptions
  @subscriptions
end
topics() click to toggle source

@api private

# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 57
# Registry of SNS topics keyed by topic name.
#
# Looking up a name that is not yet cached calls +create_topic+ on the SNS
# client and stores an Aws::SNS::Topic built from the returned ARN.
#
# @api private
# @return [Hash{String => Aws::SNS::Topic}]
def topics
  return @topics if @topics

  @topics = Hash.new do |cache, name|
    created = sns_client.create_topic(name: name)
    cache[name] = Aws::SNS::Topic.new(created.topic_arn, client: sns_client)
  end
end

Private Instance Methods

new_client(klass) click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 149
# Instantiates an AWS client class with this adapter's settings.
#
# Credentials are passed only when both the access key id and the secret are
# present; the region is passed only when set. Anything omitted is left for
# +klass+ to resolve on its own.
#
# @param klass [Class] client class accepting an options hash
# @return [Object] the new client instance
def new_client(klass)
  client_options =
    if access_key_id && secret_access_key
      { access_key_id: access_key_id, secret_access_key: secret_access_key }
    else
      {}
    end

  client_options[:region] = region if region

  klass.new(client_options)
end
sns_client() click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 141
# Memoized SNS client built from this adapter's settings.
#
# @return [Aws::SNS::Client]
def sns_client
  return @sns_client if @sns_client

  @sns_client = new_client(Aws::SNS::Client)
end
sns_message_attributes(hash) click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 164
# Converts a flat hash into the SNS message-attribute structure.
#
# Nil values are dropped and keys are stringified. Every attribute is declared
# with data type "String", so values are coerced with +to_s+; this keeps a
# non-String value (an Integer id, a Symbol) from being sent under a "String"
# data type. String values pass through unchanged.
#
# @param hash [Hash] attribute name => value
# @return [Hash{String => Hash}] name => { data_type:, string_value: }
def sns_message_attributes(hash)
  hash.compact.each_with_object({}) do |(key, val), new_hash|
    new_hash[key.to_s] = {
      data_type: "String",
      string_value: val.to_s
    }
  end
end
sqs_client() click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 145
# Memoized SQS client built from this adapter's settings.
#
# @return [Aws::SQS::Client]
def sqs_client
  return @sqs_client if @sqs_client

  @sqs_client = new_client(Aws::SQS::Client)
end
sqs_policy(queue_arn:, topic_arn:) click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 173
      # Renders the JSON access policy that is attached to a queue so that the
      # given topic may act on it.
      #
      # The policy allows any principal every SQS action on the queue, but
      # only when the request's source ARN equals +topic_arn+.
      #
      # NOTE(review): "SQS:*" is broader than delivery alone seems to need —
      # consider whether a narrower action would suffice.
      #
      # @param queue_arn [String]
      # @param topic_arn [String]
      # @return [String] policy document as JSON text, ending with a newline
      def sqs_policy(queue_arn:, topic_arn:)
        template = <<~POLICY
          {
            "Version": "2008-10-17",
            "Id": "%<queue>s/SQSDefaultPolicy",
            "Statement": [
              {
                "Sid": "%<queue>s-Sid",
                "Effect": "Allow",
                "Principal": {
                  "AWS": "*"
                },
                "Action": "SQS:*",
                "Resource": "%<queue>s",
                "Condition": {
                  "StringEquals": {
                    "aws:SourceArn": "%<topic>s"
                  }
                }
              }
            ]
          }
        POLICY

        format(template, queue: queue_arn, topic: topic_arn)
      end