class RailwayIpc::ProcessIncomingMessage
Attributes
consumer[R]
incoming_message[R]
logger[R]
Public Class Methods
call(consumer, incoming_message)
click to toggle source
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 64
# Convenience entry point: builds a processor for the given consumer and
# incoming message (the logger falls back to the constructor's default) and
# runs it right away.
#
# Returns whatever the instance-level #call returns.
def self.call(consumer, incoming_message)
  processor = new(consumer, incoming_message)
  processor.call
end
new(consumer, incoming_message, logger: RailwayIpc.logger)
click to toggle source
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 68
# Stores the collaborators used while processing a single message.
#
# consumer::         object queried for a handler, its queue name and its
#                    exchange name
# incoming_message:: the message wrapper to validate and dispatch
# logger::           destination for log output; defaults to RailwayIpc.logger
def initialize(consumer, incoming_message, logger: RailwayIpc.logger)
  @logger = logger
  @incoming_message = incoming_message
  @consumer = consumer
end
Public Instance Methods
call()
click to toggle source
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 74
# Processes the incoming message:
#
# 1. Logs and raises if the message does not pass validation.
# 2. Looks up (or creates) the consumed-message record for it.
# 3. Does nothing further when that record is already marked processed.
# 4. Otherwise hands the job chosen by #classify_message to the record's
#    update_with_lock.
#
# Returns nil when the message was already processed, otherwise the result
# of update_with_lock.
def call
  raise_message_invalid_error unless incoming_message.valid?

  consumed = find_or_create_consumed_message
  consumed.update_with_lock(classify_message) unless consumed.processed?
end
Private Instance Methods
classify_message()
click to toggle source
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 101
# Chooses the job object that matches the incoming message:
#
# * UnknownMessageJob when the decoded payload is a
#   RailwayIpc::Messages::Unknown,
# * NormalMessageJob when the consumer has a handler for the message type,
# * IgnoredMessageJob otherwise.
#
# The handler lookup is only performed once the payload is known not to be
# an Unknown message.
def classify_message
  decoded = incoming_message.decoded
  return UnknownMessageJob.new(incoming_message, logger) if decoded.is_a?(RailwayIpc::Messages::Unknown)

  handler = consumer.get_handler(incoming_message.type)
  return NormalMessageJob.new(incoming_message, handler) if handler

  IgnoredMessageJob.new(incoming_message, logger)
end
find_or_create_consumed_message()
click to toggle source
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 96
# Returns the ConsumedMessage matching this message's uuid on the consumer's
# queue, creating one via create_processing when no such record is found.
#
# NOTE(review): the lookup and the create are two separate calls; whether
# concurrent consumers can both reach the create step depends on constraints
# not visible here -- confirm.
def find_or_create_consumed_message
  existing = RailwayIpc::ConsumedMessage.find_by(uuid: incoming_message.uuid, queue: consumer.queue_name)
  return existing if existing

  RailwayIpc::ConsumedMessage.create_processing(consumer, incoming_message)
end
raise_message_invalid_error()
click to toggle source
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 84
# Logs the validation failure with the consumer's exchange/queue and the
# message payload as context, then raises
# RailwayIpc::IncomingMessage::InvalidMessage carrying the same text.
#
# NOTE(review): `type:` records incoming_message.class (the wrapper's Ruby
# class), not incoming_message.type -- confirm this is the intended value.
def raise_message_invalid_error
  description = "Message is invalid: #{incoming_message.stringify_errors}."
  log_context = {
    feature: 'railway_ipc_consumer',
    exchange: consumer.exchange_name,
    queue: consumer.queue_name,
    protobuf: { type: incoming_message.class, data: incoming_message.decoded }
  }
  logger.error(description, **log_context)
  raise RailwayIpc::IncomingMessage::InvalidMessage, description
end