class Realm::SNS::Gateway::Worker
Public Class Methods
new(queue_map, event_factory:, event_processing_attempts: 3, logger: nil)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 7 def initialize(queue_map, event_factory:, event_processing_attempts: 3, logger: nil) @queue_map = queue_map @event_factory = event_factory @event_processing_attempts = event_processing_attempts @logger = logger || Logger.new($stdout) @threads = [] end
Public Instance Methods
join(timeout = nil)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 31 def join(timeout = nil) @threads.each { |thread| thread.join(timeout) } end
start(poller_options: {})
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 15 def start(poller_options: {}) @signaler = { exiting: false } @queue_map.each_pair do |queue, listener| @threads << Thread.new { run_poller(queue, listener, @signaler, poller_options) } end self end
stop(timeout: 30)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 23 def stop(timeout: 30) Thread.new { @logger.info("Stopping worker (timeout: #{timeout}s)") }.join # Cannot log from trap context @signaler[:exiting] = true join(timeout) @threads.clear self end
Private Instance Methods
before_request_proc(queue, signaler)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 58 def before_request_proc(queue, signaler) proc { if signaler[:exiting] @logger.info("Stopping polling #{queue.arn}") throw :stop_polling end } end
handle_message(listener, msg)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 76 def handle_message(listener, msg) event = message_to_event(msg) listener.(event) rescue StandardError => e log_error(e, event, msg) # Picks up the message again after visibility_timeout runs out: throw :skip_delete if event && message_receive_count(msg) < @event_processing_attempts end
init_poller(queue, signaler, options = {})
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 46 def init_poller(queue, signaler, options = {}) Aws::SQS::QueuePoller.new( queue.url, max_number_of_messages: 10, visibility_timeout: 60, attribute_names: ['ApproximateReceiveCount'], message_attribute_names: ['event_type'], before_request: before_request_proc(queue, signaler), **options, ) end
log_error(error, event, msg)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 97 def log_error(error, event, msg) return @logger.fatal("Unexpected message in queue: #{msg}; error: #{error.full_message}") unless event attempt = message_receive_count(msg) will_retry = attempt < @event_processing_attempts log_line = [ "Processing of event failed type=#{event.type} id=#{event.head.id} attempt=#{attempt},", "#{will_retry ? 'will retry' : 'final'}):\n#{error.full_message}", ].join(' ') @logger.send(will_retry ? :warn : :error, log_line) end
log_poller_stats(queue, stats)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 67 def log_poller_stats(queue, stats) @logger.info( message: "Poller #{queue.arn} stats", request_count: stats.request_count, message_count: stats.received_message_count, last_message_received_at: stats.last_message_received_at, ) end
message_receive_count(msg)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 93 def message_receive_count(msg) msg.attributes['ApproximateReceiveCount'].to_i end
message_to_event(msg)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 85 def message_to_event(msg) event_type = msg.message_attributes['event_type'].string_value raise 'Message is missing event type' unless event_type payload = JSON.parse(msg.body).deep_symbolize_keys @event_factory.create_event(event_type, payload) end
run_poller(queue, listener, signaler, options)
click to toggle source
# File lib/realm/sns/gateway/worker.rb, line 37 def run_poller(queue, listener, signaler, options) @logger.info("Start polling #{queue.arn}") init_poller(queue, signaler, options).poll do |messages, stats| log_poller_stats(queue, stats) messages.each { |msg| handle_message(listener, msg) } end @logger.info("Polling stopped #{queue.arn}") end