class Maitredee::Adapters::SnsSqsAdapter
Attributes
access_key_id[R]
region[R]
secret_access_key[R]
Public Class Methods
new(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil)
click to toggle source
@param access_key_id [String] if `nil`, falls back to `ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]`
@param secret_access_key [String] if `nil`, falls back to `ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]`
@param region [String] if `nil`, falls back to `ENV["MAITREDEE_AWS_REGION"]`
@param default_shoryuken_options [Hash] default options of the shoryuken job listening to the queues;
defaults to `{ body_parser: :json, auto_delete: true }` plus a `retry_intervals` backoff lambda
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 15
# Builds the adapter and hands its SQS client to Shoryuken.
#
# @param access_key_id [String, nil] falls back to ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]
# @param secret_access_key [String, nil] falls back to ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]
# @param region [String, nil] falls back to ENV["MAITREDEE_AWS_REGION"]
# @param default_shoryuken_options [Hash, nil] stored as given; when nil the
#   #default_shoryuken_options reader supplies the built-in defaults
def initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil)
  @default_shoryuken_options = default_shoryuken_options

  # Explicit arguments win; the environment is consulted only as a fallback.
  @access_key_id, @secret_access_key, @region =
    access_key_id || ENV["MAITREDEE_AWS_ACCESS_KEY_ID"],
    secret_access_key || ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"],
    region || ENV["MAITREDEE_AWS_REGION"]

  # Must run after the credentials are stored: sqs_client builds its client from them.
  Shoryuken.sqs_client = sqs_client
end
Public Instance Methods
add_worker(subscriber_class)
click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 107
# Returns the worker class for a subscriber, creating and registering it as a
# constant named "<SubscriberName>Worker" on this adapter's class the first
# time it is requested.
#
# @param subscriber_class [Class] must respond to +name+ and +queue_resource_name+
# @return [Class] the Worker subclass bound to +subscriber_class+
#
# NOTE(review): a namespaced subscriber name (containing "::") is passed to
# const_set unchanged — confirm whether such subscribers need to be supported.
def add_worker(subscriber_class)
  worker_name = "#{subscriber_class.name}Worker"

  # const_defined? only answers true/false; the already-registered class has
  # to be fetched explicitly so repeat calls return the class, not `true`.
  return self.class.const_get(worker_name) if self.class.const_defined?(worker_name)

  worker_class = Class.new(Worker)
  worker_class.shoryuken_options default_shoryuken_options.merge(
    queue: subscriber_class.queue_resource_name
  )
  worker_class.subscriber_class = subscriber_class
  self.class.const_set worker_name, worker_class
  worker_class
end
configure_broker(config)
click to toggle source
Creates topics from the hash keys and queues from the hash values, then subscribes each queue to its topic.
@param config [Hash{String => Array<String>}] topic resource name => queue resource names
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 45
# Subscribes every listed queue to the topic it is listed under.
#
# @param config [Hash{String => Array<String>}] topic resource name => queue resource names
# @return [Object] +config+ itself (the result of iterating it)
def configure_broker(config)
  config.each do |topic_name, queue_names|
    queue_names.each do |queue_name|
      subscribe(topic_resource_name: topic_name, queue_resource_name: queue_name)
    end
  end
end
default_shoryuken_options()
click to toggle source
@api private
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 122
# Options merged into every generated worker's shoryuken_options. Returns the
# value stored in @default_shoryuken_options, or builds and memoizes the
# defaults when none is set.
#
# @api private
# @return [Hash]
def default_shoryuken_options
  return @default_shoryuken_options if @default_shoryuken_options

  # Polynomial growth plus a random jitter that scales with the attempt number.
  backoff = lambda do |attempts|
    ((attempts - 1) ** 4) + 15 + (rand(30) * attempts)
  end

  @default_shoryuken_options = {
    body_parser: :json,
    auto_delete: true,
    retry_intervals: backoff
  }
end
publish(message)
click to toggle source
Publishes a message to SNS.
@param message [PublisherMessage]
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 26
# Publishes a message to the SNS topic named by message.topic_resource_name.
# The body is sent as JSON; the message metadata travels as SNS message
# attributes, with nil-valued entries left out.
#
# @param message [PublisherMessage]
# @return the response of the SNS client's publish call
def publish(message)
  metadata = {
    message_id: message.message_id,
    topic_name: message.topic_name,
    event_name: message.event_name,
    primary_key: message.primary_key,
    schema_name: message.schema_name,
    maitredee_version: Maitredee::VERSION
  }.compact

  # Looking the topic up may create it on first use (see #topics).
  topic_arn = topics[message.topic_resource_name].arn
  payload = JSON.dump(message.body)

  sns_client.publish(
    topic_arn: topic_arn,
    message: payload,
    message_attributes: sns_message_attributes(metadata)
  )
end
queues()
click to toggle source
@api private
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 67
# Memoized registry of SQS queues keyed by queue name. Reading a missing key
# issues a create_queue call for that name and caches the resulting
# Aws::SQS::Queue resource.
#
# @api private
# @return [Hash{String => Aws::SQS::Queue}]
def queues
  @queues ||= Hash.new do |cache, queue_name|
    created = sqs_client.create_queue(queue_name: queue_name)
    cache[queue_name] = Aws::SQS::Queue.new(created.queue_url, client: sqs_client)
  end
end
reset()
click to toggle source
Deletes all topics, queues, and subscriptions.
@api private
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 132
# Calls +delete+ on every cached topic, queue and subscription (in that
# order) and empties each cache.
#
# @api private
# @return [Array<Hash>] the three (now empty) caches
def reset
  caches = [topics, queues, subscriptions]
  caches.each do |cache|
    cache.values.each { |resource| resource.delete }
    cache.clear
  end
end
subscribe(queue_resource_name:, topic_resource_name:)
click to toggle source
Subscribes a queue to a topic.
@param queue_resource_name [String]
@param topic_resource_name [String]
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 82
# Subscribes a queue to a topic with raw message delivery, records the
# subscription in #subscriptions, and sets the queue's "Policy" attribute to
# the document produced by #sqs_policy for this queue/topic pair.
#
# @param queue_resource_name [String]
# @param topic_resource_name [String]
# @return the response of the queue's set_attributes call
#
# NOTE(review): the policy written here names only this call's topic. If one
# queue is subscribed to several topics, presumably each call replaces the
# previous policy — verify earlier topics keep their access.
def subscribe(queue_resource_name:, topic_resource_name:)
  sns_topic = topics[topic_resource_name]
  sqs_queue = queues[queue_resource_name]
  queue_arn = sqs_queue.attributes["QueueArn"]
  topic_arn = sns_topic.arn

  response = sns_client.subscribe(
    topic_arn: topic_arn,
    protocol: "sqs",
    endpoint: queue_arn,
    attributes: { "RawMessageDelivery" => "true" }
  )

  subscription_arn = response.subscription_arn
  subscriptions[subscription_arn] = Aws::SNS::Subscription.new(subscription_arn, client: sns_client)

  policy = sqs_policy(queue_arn: queue_arn, topic_arn: topic_arn)
  sqs_queue.set_attributes(attributes: { "Policy" => policy })
end
subscriptions()
click to toggle source
@api private
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 75
# Memoized registry of subscriptions; #subscribe stores entries keyed by
# subscription ARN.
#
# @api private
# @return [Hash]
def subscriptions
  @subscriptions = {} unless @subscriptions
  @subscriptions
end
topics()
click to toggle source
@api private
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 57
# Memoized registry of SNS topics keyed by topic name. Reading a missing key
# issues a create_topic call for that name and caches the resulting
# Aws::SNS::Topic resource.
#
# @api private
# @return [Hash{String => Aws::SNS::Topic}]
def topics
  @topics ||= Hash.new do |cache, topic_name|
    created = sns_client.create_topic(name: topic_name)
    cache[topic_name] = Aws::SNS::Topic.new(created.topic_arn, client: sns_client)
  end
end
Private Instance Methods
new_client(klass)
click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 149
# Instantiates an AWS client class with this adapter's settings. Credentials
# are passed only when both the key id and the secret are present; region is
# passed only when set. Anything omitted is left for +klass+ to resolve.
#
# @param klass [Class] client class accepting an options hash in +new+
# @return [Object] a new +klass+ instance
def new_client(klass)
  credentials =
    if access_key_id && secret_access_key
      { access_key_id: access_key_id, secret_access_key: secret_access_key }
    else
      {}
    end

  client_options = region ? credentials.merge(region: region) : credentials
  klass.new(client_options)
end
sns_client()
click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 141
# Memoized SNS client built through #new_client.
#
# @return [Aws::SNS::Client]
def sns_client
  return @sns_client if @sns_client

  @sns_client = new_client(Aws::SNS::Client)
end
sns_message_attributes(hash)
click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 164
# Converts a flat hash into the SNS message-attribute structure: string keys
# mapping to { data_type: "String", string_value: ... }. Entries whose value
# is nil are dropped.
#
# @param hash [Hash] attribute name => value
# @return [Hash{String => Hash}]
def sns_message_attributes(hash)
  hash.compact.each_with_object({}) do |(key, val), new_hash|
    # The attribute is declared as "String", so coerce the value to match;
    # a non-String value (e.g. an Integer primary key) would otherwise be
    # handed to the client under a String data type.
    new_hash[key.to_s] = {
      data_type: "String",
      string_value: val.to_s
    }
  end
end
sqs_client()
click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 145
# Memoized SQS client built through #new_client.
#
# @return [Aws::SQS::Client]
def sqs_client
  return @sqs_client if @sqs_client

  @sqs_client = new_client(Aws::SQS::Client)
end
sqs_policy(queue_arn:, topic_arn:)
click to toggle source
# File lib/maitredee/adapters/sns_sqs_adapter.rb, line 173
# Builds the queue access policy that lets one SNS topic deliver to one SQS
# queue.
#
# The document is generated with JSON (already used by #publish) rather than
# by interpolating into a string, so the ARNs are always escaped and the
# result is always valid JSON. The grant is limited to SQS:SendMessage, the
# action the AWS SNS-to-SQS guide grants for topic delivery, instead of every
# SQS action.
#
# @param queue_arn [String] ARN of the queue receiving messages
# @param topic_arn [String] ARN of the topic allowed to send
# @return [String] the policy as a JSON document
def sqs_policy(queue_arn:, topic_arn:)
  JSON.pretty_generate(
    "Version" => "2008-10-17",
    "Id" => "#{queue_arn}/SQSDefaultPolicy",
    "Statement" => [
      {
        "Sid" => "#{queue_arn}-Sid",
        "Effect" => "Allow",
        "Principal" => { "AWS" => "*" },
        "Action" => "SQS:SendMessage",
        "Resource" => queue_arn.to_s,
        # Only requests made on behalf of this topic match the statement.
        "Condition" => {
          "StringEquals" => { "aws:SourceArn" => topic_arn.to_s }
        }
      }
    ]
  )
end