class Deimos::KafkaMessage
Store Kafka messages into the database.
Public Class Methods
decoded(messages=[])
click to toggle source
Decoded payloads for a list of messages. @param messages [Array<Deimos::KafkaMessage>] @return [Array<Hash>]
# File lib/deimos/kafka_message.rb, line 38 def self.decoded(messages=[]) return [] if messages.empty? decoder = self.decoder(messages.first.topic)&.new messages.map do |m| { key: m.key.present? ? decoder&.decode_key(m.key) || m.key : nil, payload: decoder&.decoder&.decode(m.message) || m.message } end end
decoder(topic)
click to toggle source
Get a decoder to decode a set of messages on the given topic. @param topic [String] @return [Deimos::Consumer]
# File lib/deimos/kafka_message.rb, line 26 def self.decoder(topic) producer = Deimos::Producer.descendants.find { |c| c.topic == topic } return nil unless producer consumer = Class.new(Deimos::Consumer) consumer.config.merge!(producer.config) consumer end
Public Instance Methods
decoded_message()
click to toggle source
Decoded payload for this message. @return [Hash]
# File lib/deimos/kafka_message.rb, line 19 def decoded_message self.class.decoded([self]).first end
message=(mess)
click to toggle source
Ensure it gets turned into a string, e.g. for testing purposes. It should already be a string. @param mess [Object]
# File lib/deimos/kafka_message.rb, line 13 def message=(mess) write_attribute(:message, mess ? mess.to_s : nil) end
phobos_message()
click to toggle source
@return [Hash]
# File lib/deimos/kafka_message.rb, line 51 def phobos_message { payload: self.message, partition_key: self.partition_key, key: self.key, topic: self.topic } end