class RailwayIpc::Client
Attributes
message[R]
rabbit_connection[R]
request_message[RW]
response_message[RW]
Public Class Methods
handle_response(response_type)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 20
# Registers +response_type+ with the shared ClientResponseHandlers instance.
#
# The registry constant is fully qualified so that it resolves the same way
# regardless of how the enclosing class is opened, and so that it matches the
# spelling used by #registered_handlers and #get_message_class.
#
# @param response_type the response type to register
# @return the value returned by ClientResponseHandlers#register
def self.handle_response(response_type)
  RailwayIpc::RPC::ClientResponseHandlers.instance.register(response_type)
end
new(request_message, opts={ automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 24
# Builds the RabbitMQ adapter for this client and remembers the request.
#
# @param request_message the message that will later be published by #request
# @param opts [Hash] options forwarded to the adapter as +options:+
# @param rabbit_adapter the adapter class to instantiate; it is called with
#   +exchange_name:+ (taken from the client class) and +options:+
def initialize(request_message, opts={ automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter)
  exchange = self.class.exchange_name
  @rabbit_connection = rabbit_adapter.new(exchange_name: exchange, options: opts)
  @request_message = request_message
end
request(message)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 16
# Convenience entry point: builds a client for +message+ with the default
# constructor arguments and immediately performs the request.
#
# @param message the request message handed to .new
# @return the value returned by the instance-level #request
def self.request(message)
  client = new(message)
  client.request
end
Public Instance Methods
await_response(timeout)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 68
# Polls the connection for a reply and records the outcome in
# +response_message+. The connection is always disconnected afterwards.
#
# Outcomes:
# * a payload arrives: it is passed to #process_payload and the result stored;
# * the adapter raises its TimeoutError: a failed Response is stored, built
#   from the class-level rpc_error_adapter_class;
# * any other StandardError: the exception is logged via #log_exception and a
#   failed Response wrapping the current +message+ is stored. In the code
#   visible here +message+ is only assigned by #process_payload, so it may be
#   nil when the failure happened before a payload was decoded.
#
# @param timeout forwarded to the adapter as +timeout:+
def await_response(timeout)
  rabbit_connection.check_for_message(timeout: timeout) do |_, _, payload|
    self.response_message = process_payload(payload)
  end
rescue RailwayIpc::Rabbitmq::Adapter::TimeoutError
  # rubocop:disable Style/RedundantSelf
  error = self.class.rpc_error_adapter_class.error_message(TimeoutError.new, self.request_message)
  # rubocop:enable Style/RedundantSelf
  self.response_message = RailwayIpc::Response.new(error, success: false)
rescue StandardError => e
  # Previously the exception was dropped without a trace. The raw payload is
  # not in scope here, so nil is passed and the exception's own message is
  # logged as the payload field.
  log_exception(e, nil)
  self.response_message = RailwayIpc::Response.new(message, success: false)
ensure
  rabbit_connection.disconnect
end
process_payload(response)
click to toggle source
rubocop:disable Metrics/AbcSize
# File lib/railway_ipc/rpc/client/client.rb, line 42
# Decodes a raw reply and turns it into a successful Response when its type
# matches one of the registered handlers.
#
# In both branches the decoded message is stored in @message before the
# method returns or raises.
#
# @param response the raw payload received from the reply queue
# @return [RailwayIpc::Response] a successful response wrapping the decoded message
# @raise [RailwayIpc::UnhandledMessageError] when the payload type matches no
#   registered handler; @message is then the payload decoded as
#   LearnIpc::ErrorMessage
def process_payload(response)
  decoded_payload = decode_payload(response)

  case decoded_payload.type
  when *registered_handlers
    handler_class = get_message_class(decoded_payload)
    @message = handler_class.decode(decoded_payload.message)
    protobuf_details = { type: message.class, data: message }

    RailwayIpc.logger.info(
      'Handling response',
      feature: 'railway_ipc_consumer',
      exchange: self.class.exchange_name,
      protobuf: protobuf_details
    )

    RailwayIpc::Response.new(message, success: true)
  else
    @message = LearnIpc::ErrorMessage.decode(decoded_payload.message)
    raise RailwayIpc::UnhandledMessageError,
          "#{self.class} does not know how to handle #{decoded_payload.type}"
  end
end
registered_handlers()
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 37
# Returns whatever the shared ClientResponseHandlers instance reports as
# registered; #process_payload splats this into a +when+ clause.
def registered_handlers
  handler_registry = RailwayIpc::RPC::ClientResponseHandlers.instance
  handler_registry.registered
end
request(timeout=10)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 29
# Runs one full request/response round trip.
#
# @param timeout forwarded to #await_response; defaults to 10
# @return the +response_message+ recorded by #await_response
def request(timeout=10)
  # Prepare the connection and the reply queue before anything is sent.
  setup_rabbit_connection
  attach_reply_queue_to_message

  # Send the request, then wait for the reply.
  publish_message
  await_response(timeout)

  response_message
end
setup_rabbit_connection()
click to toggle source
rubocop:enable Metrics/AbcSize
# File lib/railway_ipc/rpc/client/client.rb, line 61
# Connects the adapter, then creates the exchange and an auto-deleting,
# exclusive queue on it. Each adapter call is invoked on the return value of
# the previous one.
#
# @return the value returned by the final +create_queue+ call
def setup_rabbit_connection
  connected = rabbit_connection.connect
  with_exchange = connected.create_exchange
  with_exchange.create_queue(auto_delete: true, exclusive: true)
end
Private Instance Methods
attach_reply_queue_to_message()
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 103
# Writes the name of the connection's queue into the request's +reply_to+
# field.
def attach_reply_queue_to_message
  reply_queue = rabbit_connection.queue
  request_message.reply_to = reply_queue.name
end
decode_for_error(exception, payload)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 117
# Chooses what to report as the payload of a logged error.
#
# @param exception the exception being reported
# @param payload the payload associated with the failure, or nil/false
# @return the exception's message when no payload is given; otherwise the
#   result of the class-level rpc_error_adapter_class.error_message for the
#   payload and the current request message
def decode_for_error(exception, payload)
  if payload
    # rubocop:disable Style/RedundantSelf
    self.class.rpc_error_adapter_class.error_message(payload, self.request_message)
    # rubocop:enable Style/RedundantSelf
  else
    exception.message
  end
end
decode_payload(response)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 99
# Decodes a raw reply via RailwayIpc::Rabbitmq::Payload.decode.
#
# @param response the raw payload to decode
# @return the decoded payload object
def decode_payload(response)
  payload_codec = RailwayIpc::Rabbitmq::Payload
  payload_codec.decode(response)
end
get_message_class(decoded_payload)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 95
# Looks up the handler registered for the decoded payload's type.
#
# @param decoded_payload an object responding to +type+
# @return the value returned by ClientResponseHandlers#get for that type
def get_message_class(decoded_payload)
  handler_registry = RailwayIpc::RPC::ClientResponseHandlers.instance
  handler_registry.get(decoded_payload.type)
end
log_exception(exception, payload)
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 85
# Emits an error-level log entry describing +exception+.
#
# @param exception the exception to report; its message is the log message
#   and its class is logged under +error+
# @param payload passed to #decode_for_error to build the +payload+ field
def log_exception(exception, payload)
  error_payload = decode_for_error(exception, payload)

  RailwayIpc.logger.error(
    exception.message,
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    error: exception.class,
    payload: error_payload
  )
end
publish_message()
click to toggle source
# File lib/railway_ipc/rpc/client/client.rb, line 107
# Logs the outgoing request, then publishes it through the connection with an
# empty routing key. The request is encoded with
# RailwayIpc::Rabbitmq::Payload.encode before publishing.
def publish_message
  RailwayIpc.logger.info(
    'Sending request',
    feature: 'railway_ipc_publisher',
    exchange: self.class.exchange_name,
    protobuf: { type: request_message.class, data: request_message }
  )

  encoded_request = RailwayIpc::Rabbitmq::Payload.encode(request_message)
  rabbit_connection.publish(encoded_request, routing_key: '')
end