class Deimos::KafkaMessage

Stores Kafka messages in the database.

Public Class Methods

decoded(messages=[])

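Decoded payloads for a list of messages. The decoder is looked up from the topic of the first message, so all messages in the list are assumed to share a topic. If no decoder is found, the raw key and message are returned.
@param messages [Array<Deimos::KafkaMessage>]
@return [Array<Hash>]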

# File lib/deimos/kafka_message.rb, line 38
def self.decoded(messages=[])
  return [] if messages.empty?

  decoder = self.decoder(messages.first.topic)&.new
  messages.map do |m|
    {
      key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil,
      payload: decoder&.decoder&.decode(m.message) || m.message
    }
  end
end
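
The fallback behaviour above can be sketched in plain Ruby. This is only an illustration under stated assumptions: `Message`, `StubDecoder`, and `decode_all` are hypothetical stand-ins rather than Deimos APIs, the stub collapses the `decoder.decoder.decode` chain into a single `decode` call, and an explicit nil/empty check replaces ActiveSupport's `present?`.

```ruby
# Hypothetical stand-in for a stored message; not part of Deimos.
Message = Struct.new(:key, :message)

# Hypothetical decoder that mimics only the two calls the sketch needs.
class StubDecoder
  def decode_key(key)
    key.to_s.upcase
  end

  def decode(payload)
    { 'raw' => payload }
  end
end

# Decode each message, falling back to the raw values when no decoder
# is available (decoder is nil) or when decoding returns nil.
def decode_all(messages, decoder)
  messages.map do |m|
    key_present = !(m.key.nil? || m.key.to_s.empty?)
    {
      key: key_present ? (decoder&.decode_key(m.key) || m.key) : nil,
      payload: decoder&.decode(m.message) || m.message
    }
  end
end

messages = [Message.new('abc', 'hello'), Message.new(nil, 'world')]
decoded = decode_all(messages, StubDecoder.new)
raw = decode_all(messages, nil)
```

With the stub decoder the first key is transformed and the payload is wrapped in a hash; with a nil decoder both come back unchanged. A missing key yields nil in either case.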

decoder(topic)

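Get a decoder to decode a set of messages on the given topic. The decoder is an anonymous Deimos::Consumer subclass that copies the config of the producer registered for the topic.
@param topic [String]
@return [Class<Deimos::Consumer>, nil] nil if no producer is registered for the topic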

# File lib/deimos/kafka_message.rb, line 26
def self.decoder(topic)
  producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
  return nil unless producer

  consumer = Class.new(Deimos::Consumer)
  consumer.config.merge!(producer.config)
  consumer
end
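
The lookup pattern used here (find the producer class for a topic, then build a throwaway consumer class that copies its config) can be sketched without Deimos. Everything below is a hypothetical stand-in: `BaseProducer`, `BaseConsumer`, `OrdersProducer`, and `consumer_class_for` are invented for illustration, and a hand-rolled `REGISTRY` replaces ActiveSupport's `descendants`.

```ruby
# Hypothetical stand-in for Deimos::Producer.
class BaseProducer
  REGISTRY = []

  class << self
    attr_accessor :topic
  end

  # Per-class config hash, created lazily.
  def self.config
    @config ||= {}
  end

  # Record every subclass so it can be looked up by topic later.
  def self.inherited(subclass)
    super
    REGISTRY << subclass
  end
end

# Hypothetical stand-in for Deimos::Consumer.
class BaseConsumer
  def self.config
    @config ||= {}
  end
end

class OrdersProducer < BaseProducer
  self.topic = 'orders'
  config.merge!(schema: 'Order', namespace: 'com.example')
end

# Build an anonymous consumer class configured like the producer for
# the topic, or return nil when no producer publishes to it.
def consumer_class_for(topic)
  producer = BaseProducer::REGISTRY.find { |c| c.topic == topic }
  return nil unless producer

  consumer = Class.new(BaseConsumer)
  consumer.config.merge!(producer.config)
  consumer
end
```

Because the config is merged into a fresh anonymous class, the base consumer's own config stays untouched, and callers have to handle the nil case for unknown topics.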

Public Instance Methods

decoded_message()

Decoded payload for this message.
@return [Hash]

# File lib/deimos/kafka_message.rb, line 19
def decoded_message
  self.class.decoded([self]).first
end

message=(mess)

Ensure it gets turned into a string, e.g. for testing purposes. It should already be a string.
@param mess [Object]

# File lib/deimos/kafka_message.rb, line 13
def message=(mess)
  write_attribute(:message, mess ? mess.to_s : nil)
end
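
The coercion in this setter can be tried out without a database. The sketch below assumes a hypothetical `FakeKafkaMessage` class that stores the value in an instance variable instead of calling ActiveRecord's `write_attribute`; only the `mess ? mess.to_s : nil` rule is taken from the method above.

```ruby
# Hypothetical plain-Ruby stand-in for the ActiveRecord model.
class FakeKafkaMessage
  attr_reader :message

  # Store any truthy value as a string; nil and false become nil.
  def message=(mess)
    @message = mess ? mess.to_s : nil
  end
end

record = FakeKafkaMessage.new
record.message = 123
record.message # the string "123", not the integer
```

Note that an empty string is truthy in Ruby, so it is stored as an empty string rather than nil.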

phobos_message()

This message as a hash with payload, partition_key, key, and topic entries.
@return [Hash]

# File lib/deimos/kafka_message.rb, line 51
def phobos_message
  {
    payload: self.message,
    partition_key: self.partition_key,
    key: self.key,
    topic: self.topic
  }
end