class Freddy::Consumers::RespondToConsumer
Public Class Methods
Source
# File lib/freddy/consumers/respond_to_consumer.rb, line 6
# Convenience entry point: builds a consumer from the given keyword
# attributes and immediately starts consuming with the caller's block.
#
# @param attrs [Hash] keyword arguments forwarded unchanged to +new+
# @yield forwarded unchanged to the instance-level +consume+
# @return whatever the instance-level +consume+ returns
def self.consume(**attrs, &block)
  consumer = new(**attrs)
  consumer.consume(&block)
end
Source
# File lib/freddy/consumers/respond_to_consumer.rb, line 10
# Stores the collaborators used while consuming.
#
# @param thread_pool pool that message processing is posted to
# @param destination name of the queue to consume from
# @param channel channel used to open the queue and acknowledge deliveries
# @param handler_adapter_factory object whose +for(delivery)+ supplies the
#   adapter handed to each message handler
# @param options hash consulted for the :durable and :no_declare settings
def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:, options:)
  # Where to consume from.
  @channel = channel
  @destination = destination

  # How deliveries are processed.
  @consume_thread_pool = thread_pool
  @handler_adapter_factory = handler_adapter_factory

  # Queue declaration settings.
  @options = options
end
Public Instance Methods
Source
# File lib/freddy/consumers/respond_to_consumer.rb, line 18
# Subscribes to the destination and hands every delivery to the caller's
# block together with a MessageHandler built for that delivery.
#
# @yield [payload, msg_handler] the delivery payload and its MessageHandler
# @return [ResponderHandler] wraps the subscription and the thread pool
def consume
  subscription = consume_from_destination do |delivery|
    handler = MessageHandler.new(@handler_adapter_factory.for(delivery), delivery)
    yield(delivery.payload, handler)
  end

  ResponderHandler.new(subscription, @consume_thread_pool)
end
Private Instance Methods
Source
# File lib/freddy/consumers/respond_to_consumer.rb, line 31
# Opens the destination queue on the channel and subscribes to it with
# manual acknowledgements; each delivery is routed through
# +process_message+ along with the supplied block.
#
# @return the result of the queue's +subscribe+ call
def consume_from_destination(&block)
  queue = @channel.queue(@destination, durable: durable?, no_declare: no_declare?)

  queue.subscribe(manual_ack: true) { |delivery| process_message(delivery, &block) }
end
Source
# File lib/freddy/consumers/respond_to_consumer.rb, line 49
# Whether the queue is opened as durable: the :durable option when
# present, otherwise true.
def durable?
  @options.fetch(:durable) { true }
end
Source
# File lib/freddy/consumers/respond_to_consumer.rb, line 53
# Value passed as +no_declare+ when opening the queue: the :no_declare
# option when present, otherwise Freddy::DEFAULT_NO_DECLARE.
def no_declare?
  fallback = Freddy::DEFAULT_NO_DECLARE
  @options.fetch(:no_declare, fallback)
end
Source
# File lib/freddy/consumers/respond_to_consumer.rb, line 39
# Posts handling of one delivery to the thread pool. Inside the posted
# job the caller's block runs within the delivery's span.
#
# @param delivery the delivery to hand to the block
# @yield [delivery] invoked inside +delivery.in_span+
def process_message(delivery)
  @consume_thread_pool.post do
    begin
      delivery.in_span { yield(delivery) }
    ensure
      # Acknowledge this single delivery whether or not the block raised.
      @channel.acknowledge(delivery.tag, false)
    end
  end
end