class Eventboss::LongPoller

LongPoller fetches messages from SQS using long polling (see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html). It starts one thread per queue (handled by Launcher).

Constants

TIME_WAIT

Attributes

id[R]
listener[R]
queue[R]

Public Class Methods

new(launcher, bus, client, queue, listener) click to toggle source
# File lib/eventboss/long_poller.rb, line 13
# Builds a poller for a single queue. No thread is created here; the
# thread handle starts out as nil and the stop flag as false.
#
# @param launcher [Object] notified via +poller_stopped+ when the poller ends
# @param bus [Object] receives units of work via +<<+
# @param client [Object] SQS client used to receive messages
# @param queue [Object] queue to poll; its +name+ is used to build the poller id
# @param listener [Object] listener passed along with every unit of work
def initialize(launcher, bus, client, queue, listener)
  # Collaborators handed in by the caller.
  @launcher, @bus, @client = launcher, bus, client
  @queue, @listener = queue, listener

  # Identifier derived from the queue name.
  @id = "poller-#{queue.name}"

  # Runtime state.
  @thread = nil
  @stop = false
end

Public Instance Methods

fetch_and_dispatch() click to toggle source
# File lib/eventboss/long_poller.rb, line 44
# Fetches one batch of messages and pushes a UnitOfWork for each of them
# onto the bus.
#
# A ClosedQueueError raised while handling a message is rescued per
# message: it is logged at info level and iteration continues with the
# next message.
#
# @return [Object] whatever +fetch_messages.each+ returns
def fetch_and_dispatch
  fetch_messages.each do |msg|
    begin
      logger.debug(id) { "enqueueing message #{msg.message_id}" }
      unit = UnitOfWork.new(@client, queue, listener, msg)
      @bus << unit
    rescue ClosedQueueError
      logger.info(id) { "skip message #{msg.message_id} enqueuing due to closed queue" }
    end
  end
end
kill(wait = false) click to toggle source
# File lib/eventboss/long_poller.rb, line 34
# Forcefully stops the poller.
#
# Sets the stop flag and raises Eventboss::Shutdown inside the poller
# thread, so a loop that is stuck gets interrupted. The exception is
# raised BEFORE any waiting: waiting first would block until the thread
# ended on its own, which a stuck thread never does.
#
# @param wait [Boolean] when true, block until the poller thread has finished
# @return [Object, nil] the thread's value when +wait+ is true and a thread
#   exists, otherwise nil
def kill(wait = false)
  @stop = true
  return unless @thread

  # Force shutdown of poller, in case the loop is stuck
  @thread.raise Eventboss::Shutdown
  @thread.value if wait
end
run() click to toggle source
# File lib/eventboss/long_poller.rb, line 53
# Poller main loop: calls +fetch_and_dispatch+ repeatedly until the stop
# flag is set, then tells the launcher that this poller has stopped.
#
# Every exit path notifies the launcher through +poller_stopped+:
# * normal stop or Eventboss::Shutdown: plain notification;
# * Aws::SQS::Errors::NonExistentQueue: the error is reported through
#   +handle_exception+, then a plain notification is sent (no restart is
#   requested);
# * any other StandardError: the error is reported, the poller sleeps
#   for TIME_WAIT, and a restart is requested unless a stop was asked for.
def run
  fetch_and_dispatch until @stop
  @launcher.poller_stopped(self)
rescue Eventboss::Shutdown
  @launcher.poller_stopped(self)
rescue Aws::SQS::Errors::NonExistentQueue => exception
  # The exception must be bound here; otherwise the reference below raises
  # NameError and the launcher is never notified.
  handle_exception(exception, poller_id: id)
  @launcher.poller_stopped(self)
rescue StandardError => exception
  handle_exception(exception, poller_id: id)
  # Give a chance for temporary AWS errors to be resolved
  # Sleep guarantees against repeating fast failure errors
  sleep TIME_WAIT
  @launcher.poller_stopped(self, restart: !@stop)
end
start() click to toggle source
# File lib/eventboss/long_poller.rb, line 24
# Starts the poller by handing +run+ to +safe_thread+ under this poller's
# id, and keeps the returned handle in @thread.
#
# @return [Object] whatever +safe_thread+ returns
def start
  runner = method(:run)
  @thread = safe_thread(id, &runner)
end
terminate(wait = false) click to toggle source
# File lib/eventboss/long_poller.rb, line 28
# Requests a graceful stop by setting the stop flag; the poller thread is
# not interrupted.
#
# @param wait [Boolean] when true and a thread exists, block until the
#   thread finishes
# @return [Object, nil] the thread's value when waited on, otherwise nil
def terminate(wait = false)
  @stop = true
  @thread.value if @thread && wait
end

Private Instance Methods

fetch_messages() click to toggle source
# File lib/eventboss/long_poller.rb, line 71
# Asks the client for messages from this poller's queue: at most 10 per
# call, with TIME_WAIT passed as +wait_time_seconds+.
#
# @return [Object] the +messages+ of the client's response
def fetch_messages
  logger.debug(id) { 'fetching messages' }

  response = @client.receive_message(
    queue_url: queue.url,
    max_number_of_messages: 10,
    wait_time_seconds: TIME_WAIT
  )
  response.messages
end