class RailwayIpc::Client

Attributes

message[R]
rabbit_connection[R]
request_message[RW]
response_message[RW]

Public Class Methods

handle_response(response_type) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 20
# Registers +response_type+ with the shared ClientResponseHandlers registry.
#
# The registry is referenced by its fully-qualified name, the same way
# #registered_handlers and #get_message_class reference it, so the constant
# lookup does not depend on how the enclosing class is nested.
#
# @param response_type the response type to register
# @return whatever ClientResponseHandlers#register returns
def self.handle_response(response_type)
  RailwayIpc::RPC::ClientResponseHandlers.instance.register(response_type)
end
new(request_message, opts={ automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 24
# Remembers the request and builds the RabbitMQ adapter for this class's
# exchange.
#
# @param request_message the message that #request will publish
# @param opts [Hash] passed to the adapter as its +options:+ argument
# @param rabbit_adapter adapter class to instantiate; it is called with
#   +exchange_name:+ and +options:+ keywords
def initialize(request_message, opts={ automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter)
  @request_message = request_message
  @rabbit_connection = rabbit_adapter.new(
    exchange_name: self.class.exchange_name,
    options: opts
  )
end
request(message) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 16
# Convenience entry point: builds a client for +message+ and runs #request
# on it with the default timeout.
#
# @param message the request message handed to .new
# @return the value returned by the instance-level #request
def self.request(message)
  client = new(message)
  client.request
end

Public Instance Methods

await_response(timeout) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 68
# Waits for a reply on the adapter and stores the outcome in
# +response_message+.
#
# * When a payload arrives, it is run through #process_payload and the result
#   is stored.
# * When the adapter raises its TimeoutError, a failure Response wrapping the
#   error adapter's timeout message is stored.
# * On any other StandardError, the error is logged through #log_exception
#   and a failure Response wrapping the current +message+ is stored.
#
# The adapter is disconnected in every case.
#
# @param timeout forwarded to the adapter's +check_for_message+ as +timeout:+
# @return the value of the last assignment on the path taken
def await_response(timeout)
  rabbit_connection.check_for_message(timeout: timeout) do |_, _, payload|
    self.response_message = process_payload(payload)
  end
rescue RailwayIpc::Rabbitmq::Adapter::TimeoutError
  # rubocop:disable Style/RedundantSelf
  error = self.class.rpc_error_adapter_class.error_message(TimeoutError.new, self.request_message)
  # rubocop:enable Style/RedundantSelf
  self.response_message = RailwayIpc::Response.new(error, success: false)
rescue StandardError => e
  # Record the failure instead of discarding it. The raw payload is not in
  # scope here, so nil is passed and the log falls back to the exception
  # message. Logging happens first so the method's return value is unchanged.
  log_exception(e, nil)
  self.response_message = RailwayIpc::Response.new(message, success: false)
ensure
  rabbit_connection.disconnect
end
process_payload(response) click to toggle source

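Decodes the raw response; when its type is one of the registered handlers, returns a successful RailwayIpc::Response, otherwise raises RailwayIpc::UnhandledMessageError. (The extracted source comment here is "rubocop:disable Metrics/AbcSize", a lint directive rather than a description.)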

# File lib/railway_ipc/rpc/client/client.rb, line 42
# Decodes a raw response and turns it into a Response object.
#
# If the decoded type matches one of the registered handlers, the message is
# decoded with the class registered for that type, stored in @message, logged,
# and wrapped in a successful RailwayIpc::Response. Otherwise the message is
# decoded as a LearnIpc::ErrorMessage, stored in @message, and
# RailwayIpc::UnhandledMessageError is raised.
#
# @param response the raw payload handed to #decode_payload
# @return [RailwayIpc::Response] a successful response for a handled type
# @raise [RailwayIpc::UnhandledMessageError] when the type is not registered
def process_payload(response)
  decoded = decode_payload(response)

  case decoded.type
  when *registered_handlers
    handler_class = get_message_class(decoded)
    @message = handler_class.decode(decoded.message)

    log_fields = {
      feature: 'railway_ipc_consumer',
      exchange: self.class.exchange_name,
      protobuf: { type: message.class, data: message }
    }
    RailwayIpc.logger.info('Handling response', **log_fields)

    RailwayIpc::Response.new(message, success: true)
  else
    # Keep the decoded error around so callers can still read it via #message.
    @message = LearnIpc::ErrorMessage.decode(decoded.message)
    raise RailwayIpc::UnhandledMessageError, "#{self.class} does not know how to handle #{decoded.type}"
  end
end
registered_handlers() click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 37
# Returns whatever the shared ClientResponseHandlers registry reports as
# registered.
def registered_handlers
  registry = RailwayIpc::RPC::ClientResponseHandlers.instance
  registry.registered
end
request(timeout=10) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 29
# Runs one full request/response round trip.
#
# @param timeout passed to #await_response (defaults to 10)
# @return the current +response_message+ after #await_response has run
def request(timeout=10)
  # Prepare the connection and stamp the reply queue on the outgoing message.
  setup_rabbit_connection
  attach_reply_queue_to_message

  publish_message

  # The result of waiting is read back from response_message rather than
  # taken from await_response's own return value.
  await_response(timeout)
  response_message
end
setup_rabbit_connection() click to toggle source

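Connects the adapter, then creates the exchange and a queue with auto_delete and exclusive set. (The extracted source comment here is "rubocop:enable Metrics/AbcSize", a lint directive rather than a description of this method.)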

# File lib/railway_ipc/rpc/client/client.rb, line 61
# Connects the adapter, creates the exchange, then creates a queue with
# +auto_delete+ and +exclusive+ both set to true. Each step is invoked on the
# return value of the previous one.
#
# @return the result of the final +create_queue+ call
def setup_rabbit_connection
  connected = rabbit_connection.connect
  with_exchange = connected.create_exchange
  with_exchange.create_queue(auto_delete: true, exclusive: true)
end

Private Instance Methods

attach_reply_queue_to_message() click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 103
# Copies the name of the adapter's queue into the request's +reply_to+ field.
#
# @return the queue name that was assigned
def attach_reply_queue_to_message
  reply_queue_name = rabbit_connection.queue.name
  request_message.reply_to = reply_queue_name
end
decode_for_error(exception, payload) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 117
# Picks the value to report as the payload of a failure.
#
# @param exception the error being reported
# @param payload the payload associated with the failure, or nil/false
# @return the error adapter's +error_message+ for +payload+ and the current
#   request when a payload is given; otherwise +exception.message+
def decode_for_error(exception, payload)
  if payload
    adapter = self.class.rpc_error_adapter_class
    adapter.error_message(payload, request_message)
  else
    exception.message
  end
end
decode_payload(response) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 99
# Hands the raw response to RailwayIpc::Rabbitmq::Payload.decode.
#
# @param response the raw payload to decode
# @return the decoder's result
def decode_payload(response)
  decoder = RailwayIpc::Rabbitmq::Payload
  decoder.decode(response)
end
get_message_class(decoded_payload) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 95
# Asks the shared ClientResponseHandlers registry for the entry stored under
# the decoded payload's type.
#
# @param decoded_payload an object responding to +type+
# @return the registry's result for that type
def get_message_class(decoded_payload)
  registry = RailwayIpc::RPC::ClientResponseHandlers.instance
  registry.get(decoded_payload.type)
end
log_exception(exception, payload) click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 85
# Writes an error-level log entry describing +exception+.
#
# The entry's text is the exception message. Its fields carry the feature tag,
# this class's exchange name, the exception class, and the payload value
# chosen by #decode_for_error.
#
# @param exception the error to report
# @param payload passed through to #decode_for_error
def log_exception(exception, payload)
  details = {
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    error: exception.class,
    payload: decode_for_error(exception, payload)
  }
  RailwayIpc.logger.error(exception.message, **details)
end
publish_message() click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 107
# Logs the outgoing request, then publishes its encoded form through the
# adapter with an empty routing key.
#
# @return the result of the adapter's +publish+ call
def publish_message
  log_fields = {
    feature: 'railway_ipc_publisher',
    exchange: self.class.exchange_name,
    protobuf: { type: request_message.class, data: request_message }
  }
  RailwayIpc.logger.info('Sending request', **log_fields)

  encoded_request = RailwayIpc::Rabbitmq::Payload.encode(request_message)
  rabbit_connection.publish(encoded_request, routing_key: '')
end