class RailwayIpc::ProcessIncomingMessage

Attributes

consumer[R]
incoming_message[R]
logger[R]

Public Class Methods

call(consumer, incoming_message)
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 64
# Convenience entry point: builds an instance for the given consumer and
# incoming message (using the default logger) and immediately runs #call on it.
#
# Returns whatever the instance-level #call returns.
def self.call(consumer, incoming_message)
  processor = new(consumer, incoming_message)
  processor.call
end
new(consumer, incoming_message, logger: RailwayIpc.logger)
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 68
# Stores the collaborators used while processing a message.
#
# consumer         - object the message arrived on
# incoming_message - the message to process
# logger           - keyword; falls back to RailwayIpc.logger when omitted
def initialize(consumer, incoming_message, logger: RailwayIpc.logger)
  @consumer, @incoming_message, @logger = consumer, incoming_message, logger
end

Public Instance Methods

call()
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 74
# Processes the incoming message.
#
# Invokes raise_message_invalid_error when the message does not report itself
# as valid. Otherwise looks up (or creates) the consumed-message record and,
# unless that record is already marked processed, hands the job chosen by
# classify_message to the record's update_with_lock.
#
# Returns nil when the record was already processed; otherwise the result of
# update_with_lock.
def call
  raise_message_invalid_error unless incoming_message.valid?

  consumed = find_or_create_consumed_message
  consumed.update_with_lock(classify_message) unless consumed.processed?
end

Private Instance Methods

classify_message()
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 101
# Chooses the job object that matches the incoming message.
#
# - decoded payload is a RailwayIpc::Messages::Unknown -> UnknownMessageJob
# - the consumer returns a handler for the message type -> NormalMessageJob
# - otherwise                                           -> IgnoredMessageJob
#
# The handler lookup only happens when the payload is not Unknown.
def classify_message
  decoded = incoming_message.decoded
  if decoded.is_a?(RailwayIpc::Messages::Unknown)
    return UnknownMessageJob.new(incoming_message, logger)
  end

  handler = consumer.get_handler(incoming_message.type)
  return IgnoredMessageJob.new(incoming_message, logger) unless handler

  NormalMessageJob.new(incoming_message, handler)
end
find_or_create_consumed_message()
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 96
# Returns the RailwayIpc::ConsumedMessage found by the incoming message's
# uuid and the consumer's queue name; when the lookup yields nothing, returns
# the result of ConsumedMessage.create_processing for this consumer/message.
def find_or_create_consumed_message
  existing = RailwayIpc::ConsumedMessage.find_by(
    uuid: incoming_message.uuid,
    queue: consumer.queue_name
  )
  return existing if existing

  RailwayIpc::ConsumedMessage.create_processing(consumer, incoming_message)
end
raise_message_invalid_error()
# File lib/railway_ipc/consumer/process_incoming_message.rb, line 84
# Logs an error describing why the incoming message is invalid, then raises
# RailwayIpc::IncomingMessage::InvalidMessage carrying the same text.
#
# The log entry carries the consumer's exchange and queue names plus the
# message's class and decoded payload as keyword context.
def raise_message_invalid_error
  description = "Message is invalid: #{incoming_message.stringify_errors}."
  context = {
    feature: 'railway_ipc_consumer',
    exchange: consumer.exchange_name,
    queue: consumer.queue_name,
    protobuf: { type: incoming_message.class, data: incoming_message.decoded }
  }
  logger.error(description, **context)

  raise RailwayIpc::IncomingMessage::InvalidMessage, description
end