class RailwayIpc::Consumer
Public Class Methods
handle(message_type, with:)
click to toggle source
Registers the handler given as `with:` for `message_type` in this consumer class's handler store.
# File lib/railway_ipc/consumer/consumer.rb, line 39
# Registers a handler for a message type in this consumer class's handler
# store.
#
# @param message_type the value forwarded to the store as `message:`
# @param with the value forwarded to the store as `handler:`
# @return whatever `handlers.register` returns
def self.handle(message_type, with:)
  registration = { message: message_type, handler: with }
  handlers.register(**registration)
end
inherited(base)
click to toggle source
Calls superclass method
# File lib/railway_ipc/consumer/consumer.rb, line 7
# Subclass hook. After deferring to the superclass hook, defines a
# class-level `handlers` reader on the new subclass. The reader memoizes a
# RailwayIpc::HandlerStore in the `@handlers` instance variable of whichever
# class object it is called on.
#
# @param base [Class] the class that has just inherited from this one
def self.inherited(base)
  super

  # Equivalent to `def handlers` inside `base.instance_eval`: the block runs
  # with the receiving class as `self`.
  base.define_singleton_method(:handlers) do
    @handlers ||= RailwayIpc::HandlerStore.new
  end
end
listen_to(queue:, exchange:, options: {})
click to toggle source
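Configures the queue this consumer reads from via `from_queue`, using RailwayIpc's defaults (durable, fanout exchange, dead-lettering to 'ipc:errors', the shared Bunny connection). Any keys in `options` override those defaults, and an informational message is logged when overrides are supplied.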
# File lib/railway_ipc/consumer/consumer.rb, line 18
# Declares the queue and exchange for this consumer by calling `from_queue`
# with RailwayIpc's default settings merged with any caller overrides.
#
# @param queue the queue name, passed as the first argument to `from_queue`
# @param exchange the value used for the `:exchange` setting
# @param options [Hash] settings merged over the defaults; keys given here
#   replace the default value wholesale (the merge is shallow)
# @return whatever `from_queue` returns
def self.listen_to(queue:, exchange:, options: {})
  # Leave a trace whenever a caller deviates from the stock configuration.
  unless options.empty?
    RailwayIpc.logger.info(
      "Overriding configuration for #{queue} with new options",
      feature: 'railway_ipc_consumer',
      options: options
    )
  end

  dead_letter_arguments = { 'x-dead-letter-exchange' => 'ipc:errors' }
  defaults = {
    exchange: exchange,
    durable: true,
    exchange_type: :fanout,
    arguments: dead_letter_arguments,
    connection: RailwayIpc.bunny_connection
  }

  from_queue(queue, defaults.merge(options))
end
Public Instance Methods
exchange_name()
click to toggle source
# File lib/railway_ipc/consumer/consumer.rb, line 55
# Reads the `:exchange` entry from this consumer's queue options.
#
# @return the value stored under `:exchange`, or nil when the key is absent
def exchange_name
  queue_options = queue.opts
  queue_options[:exchange]
end
get_handler(type)
click to toggle source
# File lib/railway_ipc/consumer/consumer.rb, line 81
# Looks up the handler registered for `type` and instantiates it.
#
# @param type the key passed to `handlers.get`
# @return a new instance of the registered handler, or nil when the lookup
#   yields nothing
def get_handler(type)
  entry = handlers.get(type)
  return nil unless entry

  entry.handler.new
end
handlers()
click to toggle source
# File lib/railway_ipc/consumer/consumer.rb, line 11
# Returns the handler store held in `@handlers`, creating a
# RailwayIpc::HandlerStore on first use.
#
# @return [RailwayIpc::HandlerStore] the memoized store
def handlers
  return @handlers if @handlers

  @handlers = RailwayIpc::HandlerStore.new
end
queue_name()
click to toggle source
# File lib/railway_ipc/consumer/consumer.rb, line 51
# Name of the queue this consumer is attached to.
#
# @return whatever `queue.name` returns
def queue_name
  current_queue = queue
  current_queue.name
end
registered_handlers()
click to toggle source
# File lib/railway_ipc/consumer/consumer.rb, line 47
# Exposes the handler store's `registered` collection.
#
# @return whatever `handlers.registered` returns
def registered_handlers
  store = handlers
  store.registered
end
work_with_params(payload, _delivery_info, metadata)
click to toggle source
REFACTOR: Long term we should think about not leaking Sneakers methods as part of Railway's public API since clients can (and do) override them. -BN
# File lib/railway_ipc/consumer/consumer.rb, line 62
# Processes one delivered message: wraps the payload in a
# RailwayIpc::IncomingMessage, hands it to RailwayIpc::ProcessIncomingMessage,
# then calls `ack!`. Any StandardError raised along the way is logged and the
# method calls `reject!` instead.
#
# @param payload the raw message body
# @param _delivery_info unused
# @param metadata object responding to `headers`; a nil `headers` is treated
#   as an empty hash
# @return the result of `ack!` on success, or of `reject!` after an error
def work_with_params(payload, _delivery_info, metadata)
  # Messages without a 'message_format' header are treated as binary protobuf.
  message_format = (metadata.headers || {}).fetch('message_format', 'binary_protobuf')
  incoming = RailwayIpc::IncomingMessage.new(payload, message_format: message_format)
  RailwayIpc::ProcessIncomingMessage.call(self, incoming)
  ack!
rescue StandardError => error
  RailwayIpc.logger.error(
    error.message,
    feature: 'railway_ipc_consumer',
    exchange: exchange_name,
    queue: queue_name,
    error: error.class,
    payload: payload
  )
  reject!
end