class Freddy::Consumers::TapIntoConsumer
Public Class Methods
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 6
#
# Convenience entry point: builds a consumer from the given keyword
# attributes and immediately starts consuming with the supplied block.
#
# @param attrs [Hash] keyword arguments forwarded unchanged to `new`
# @yield forwarded unchanged to the instance-level `consume`
# @return whatever the instance-level `consume` returns
def self.consume(**attrs, &block)
  new(**attrs).consume(&block)
end
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 10
#
# Stores the collaborators needed to tap into messages.
#
# @param thread_pool object whose `post` method is used to schedule the
#   handling of each delivery
# @param patterns collection iterated with `each`; every element is bound
#   to the queue as a routing key
# @param channel object used to declare the topic exchange and the queue,
#   and to acknowledge or reject deliveries
# @param options hash read with `fetch`; the keys consulted by this class
#   are :group, :durable, :no_declare, :on_exception and :exchange_name
def initialize(thread_pool:, patterns:, channel:, options:)
  @options = options
  @channel = channel
  @patterns = patterns
  @consume_thread_pool = thread_pool
end
Public Instance Methods
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 17
#
# Creates the queue, subscribes to it with manual acknowledgements and
# hands every delivery to `process_message` together with the caller's
# block.
#
# @yield passed on to `process_message` for each delivery
# @return [ResponderHandler] wraps the subscription and the thread pool
def consume(&block)
  bound_queue = create_queue
  subscription = bound_queue.subscribe(manual_ack: true) do |message|
    process_message(bound_queue, message, &block)
  end

  ResponderHandler.new(subscription, @consume_thread_pool)
end
Private Instance Methods
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 29
#
# Declares the queue and binds it to the topic exchange once per pattern.
#
# With a :group option the queue is named "groups.<group>" and declared
# with the configured durable / no_declare flags; without one an
# exclusive queue with an empty name is requested instead.
#
# @return the queue object returned by the channel
def create_queue
  exchange = @channel.topic(exchange_name)
  group_name = group

  queue =
    if group_name
      @channel.queue("groups.#{group_name}", durable: durable?, no_declare: no_declare?)
    else
      @channel.queue('', exclusive: true)
    end

  @patterns.each { |routing_key| queue.bind(exchange, routing_key: routing_key) }
  queue
end
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 70
#
# Value of the :durable option, passed as the `durable:` flag when a
# group queue is declared. Defaults to true when the option is absent.
def durable?
  @options.fetch(:durable) { true }
end
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 82
#
# Name of the topic exchange to bind to: the :exchange_name option when
# given, otherwise Freddy::FREDDY_TOPIC_EXCHANGE_NAME.
def exchange_name
  fallback = Freddy::FREDDY_TOPIC_EXCHANGE_NAME
  @options.fetch(:exchange_name, fallback)
end
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 66
#
# Value of the :group option, or nil when no group was configured.
def group
  @options.fetch(:group) { nil }
end
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 74
#
# Value of the :no_declare option, passed as the `no_declare:` flag when
# a group queue is requested. Falls back to Freddy::DEFAULT_NO_DECLARE.
def no_declare?
  fallback = Freddy::DEFAULT_NO_DECLARE
  @options.fetch(:no_declare, fallback)
end
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 78
#
# Value of the :on_exception option, which selects what happens to a
# delivery whose handler raised. Defaults to :ack.
def on_exception
  @options.fetch(:on_exception) { :ack }
end
Source
# File lib/freddy/consumers/tap_into_consumer.rb, line 46
#
# Schedules the handling of one delivery on the thread pool.
#
# Inside the delivery's span the caller's block receives the payload,
# routing key and timestamp; once it returns, the delivery is
# acknowledged. If a StandardError is raised, the delivery is settled
# according to `on_exception` and the error is re-raised:
#   :reject  - rejected
#   :requeue - rejected with `true` as the second argument
#   other    - acknowledged
#
# @param _queue unused
# @param delivery object providing payload, routing_key, timestamp, tag
#   and in_span
# @yield [payload, routing_key, timestamp]
def process_message(_queue, delivery)
  @consume_thread_pool.post do
    begin
      delivery.in_span do
        yield delivery.payload, delivery.routing_key, delivery.timestamp
        @channel.acknowledge(delivery.tag)
      end
    rescue StandardError
      # Settle the delivery before propagating so it is not left unacknowledged.
      case on_exception
      when :reject then @channel.reject(delivery.tag)
      when :requeue then @channel.reject(delivery.tag, true)
      else @channel.acknowledge(delivery.tag)
      end
      raise
    end
  end
end