class RailwayIpc::Publisher

Attributes

exchange_name[R]
message_store[R]

Public Class Methods

new(opts={}) click to toggle source
# File lib/railway_ipc/publisher.rb, line 7
def initialize(opts={})
  @exchange_name = opts.fetch(:exchange_name)
  @message_store = opts.fetch(:message_store, RailwayIpc::PublishedMessage)
end

Public Instance Methods

exchange() click to toggle source

rubocop:enable Metrics/AbcSize

# File lib/railway_ipc/publisher.rb, line 31
def exchange
  @exchange ||= channel.exchange(exchange_name, type: :fanout, durable: true, auto_delete: false, arguments: {})
end
publish(message, format='binary_protobuf') click to toggle source

rubocop:disable Metrics/AbcSize

# File lib/railway_ipc/publisher.rb, line 13
def publish(message, format='binary_protobuf')
  outgoing_message = OutgoingMessage.new(message, exchange_name, format)
  stored_message = message_store.store_message(outgoing_message)
  RailwayIpc.logger.info('Publishing message', log_message_options(message))
  exchange.publish(outgoing_message.encoded, headers: { message_format: format })
  outgoing_message
rescue RailwayIpc::InvalidProtobuf => e
  RailwayIpc.logger.error('Invalid protobuf', log_message_options(message))
  raise e
rescue ActiveRecord::RecordInvalid => e
  RailwayIpc.logger.error('Failed to store outgoing message', log_message_options(message))
  raise RailwayIpc::FailedToStoreOutgoingMessage.new(e)
rescue StandardError => e
  stored_message&.destroy
  raise e
end

Private Instance Methods

channel() click to toggle source
# File lib/railway_ipc/publisher.rb, line 37
def channel
  RailwayIpc::ConnectionManager.instance.channel
end
log_message_options(message) click to toggle source
# File lib/railway_ipc/publisher.rb, line 41
def log_message_options(message)
  {
    feature: 'railway_ipc_publisher',
    exchange: exchange_name,
    protobuf: {
      type: message.class,
      data: message
    }
  }
end