class RailwayIpc::Publisher
Attributes
exchange_name[R]
message_store[R]
Public Class Methods
new(opts={})
click to toggle source
# File lib/railway_ipc/publisher.rb, line 7 def initialize(opts={}) @exchange_name = opts.fetch(:exchange_name) @message_store = opts.fetch(:message_store, RailwayIpc::PublishedMessage) end
Public Instance Methods
exchange()
click to toggle source
rubocop:enable Metrics/AbcSize
# File lib/railway_ipc/publisher.rb, line 31 def exchange @exchange ||= channel.exchange(exchange_name, type: :fanout, durable: true, auto_delete: false, arguments: {}) end
publish(message, format='binary_protobuf')
click to toggle source
rubocop:disable Metrics/AbcSize
# File lib/railway_ipc/publisher.rb, line 13 def publish(message, format='binary_protobuf') outgoing_message = OutgoingMessage.new(message, exchange_name, format) stored_message = message_store.store_message(outgoing_message) RailwayIpc.logger.info('Publishing message', log_message_options(message)) exchange.publish(outgoing_message.encoded, headers: { message_format: format }) outgoing_message rescue RailwayIpc::InvalidProtobuf => e RailwayIpc.logger.error('Invalid protobuf', log_message_options(message)) raise e rescue ActiveRecord::RecordInvalid => e RailwayIpc.logger.error('Failed to store outgoing message', log_message_options(message)) raise RailwayIpc::FailedToStoreOutgoingMessage.new(e) rescue StandardError => e stored_message&.destroy raise e end
Private Instance Methods
channel()
click to toggle source
# File lib/railway_ipc/publisher.rb, line 37 def channel RailwayIpc::ConnectionManager.instance.channel end
log_message_options(message)
click to toggle source
# File lib/railway_ipc/publisher.rb, line 41 def log_message_options(message) { feature: 'railway_ipc_publisher', exchange: exchange_name, protobuf: { type: message.class, data: message } } end