class RailwayIpc::Server

Attributes

message[R]
rabbit_connection[R]

rubocop:enable Metrics/AbcSize rubocop:enable Metrics/MethodLength

responder[R]

Public Class Methods

new(_queue, _pool, opts={ automatic_recovery: true }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 17
def initialize(_queue, _pool, opts={ automatic_recovery: true }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter)
  @rabbit_connection = rabbit_adapter.new(
    queue_name: self.class.queue_name,
    exchange_name: self.class.exchange_name,
    options: opts
  )
end
respond_to(message_type, with:) click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 13
def self.respond_to(message_type, with:)
  RailwayIpc::RPC::ServerResponseHandlers.instance.register(handler: with, message: message_type)
end

Public Instance Methods

handle_request(payload) click to toggle source

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/MethodLength

# File lib/railway_ipc/rpc/server/server.rb, line 67
def handle_request(payload)
  response = work(payload)
rescue StandardError => e
  RailwayIpc.logger.error(
    'Error responding to message.',
    exception: e,
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    queue: self.class.queue_name,
    protobuf: { type: message.class, data: message }
  )
  response = self.class.rpc_error_adapter_class.error_message(e, message)
ensure
  if response
    rabbit_connection.reply(
      RailwayIpc::Rabbitmq::Payload.encode(response), message.reply_to
    )
  end
end
run() click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 25
def run
  rabbit_connection
    .connect
    .create_exchange
    .create_queue(durable: true)
    .bind_queue_to_exchange
  subscribe_to_queue
end
stop() click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 34
def stop
  rabbit_connection.disconnect
end
work(payload) click to toggle source

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/MethodLength

# File lib/railway_ipc/rpc/server/server.rb, line 40
def work(payload)
  decoded_payload = RailwayIpc::Rabbitmq::Payload.decode(payload)
  case decoded_payload.type
  when *registered_handlers
    responder = get_responder(decoded_payload)
    @message = get_message_class(decoded_payload).decode(decoded_payload.message)
    responder.respond(message)
  else
    @message = LearnIpc::ErrorMessage.decode(decoded_payload.message)
    raise RailwayIpc::UnhandledMessageError.new("#{self.class} does not know how to handle #{decoded_payload.type}")
  end
rescue StandardError => e
  RailwayIpc.logger.error(
    e.message,
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    queue: self.class.queue_name,
    error: e.class,
    payload: payload
  )
  raise e
end

Private Instance Methods

get_message_class(decoded_payload) click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 93
def get_message_class(decoded_payload)
  RailwayIpc::RPC::ServerResponseHandlers.instance.get(decoded_payload.type).message
end
get_responder(decoded_payload) click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 97
def get_responder(decoded_payload)
  RailwayIpc::RPC::ServerResponseHandlers.instance.get(decoded_payload.type).handler.new
end
registered_handlers() click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 101
def registered_handlers
  RailwayIpc::RPC::ServerResponseHandlers.instance.registered
end
subscribe_to_queue() click to toggle source
# File lib/railway_ipc/rpc/server/server.rb, line 105
def subscribe_to_queue
  rabbit_connection.subscribe do |_delivery_info, _metadata, payload|
    handle_request(payload)
  end
end