class Avromatic::Model::MessageDecoder

This class is used to decode messages encoded using Avro to their corresponding models.

Constants

MAGIC_BYTE

Attributes

model_map[R]
schema_names_by_id[R]
schema_registry[R]

Public Class Methods

model_key(model) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 34
# Builds the lookup key under which a model is registered.
#
# @param model [#key_avro_schema, #value_avro_schema] model to derive a key for
# @return [Array] two-element array: the key schema's fullname (or the falsy
#   value returned by +key_avro_schema+ when there is no key schema) followed
#   by the value schema's fullname
def self.model_key(model)
  key_schema = model.key_avro_schema
  key_name = key_schema && key_schema.fullname
  value_name = model.value_avro_schema.fullname
  [key_name, value_name]
end
new(*models, schema_registry: nil, registry_url: nil) click to toggle source

@param *models [generated models] Models to register for decoding.

@param schema_registry [AvroTurf::ConfluentSchemaRegistry] Optional schema registry client.

@param registry_url [String] Optional URL for schema registry server.

# File lib/avromatic/model/message_decoder.rb, line 45
# Builds a decoder for the given models and resolves which schema registry
# client to use.
#
# The registry is chosen in this order: the +schema_registry+ argument,
# +Avromatic.schema_registry+, a new +AvroTurf::ConfluentSchemaRegistry+ for
# +registry_url+ (only when a URL is given), and finally
# +Avromatic.build_schema_registry+.
#
# @param models [Array] models to register for decoding
# @param schema_registry [Object, nil] optional schema registry client
# @param registry_url [String, nil] optional URL for a schema registry server
def initialize(*models, schema_registry: nil, registry_url: nil)
  @model_map = build_model_map(models)
  @schema_names_by_id = {}

  registry = schema_registry || Avromatic.schema_registry
  if !registry && registry_url
    registry = AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: Avromatic.logger)
  end
  @schema_registry = registry || Avromatic.build_schema_registry
end

Public Instance Methods

decode(*args) click to toggle source

@return [Avromatic model]

# File lib/avromatic/model/message_decoder.rb, line 55
# Decodes a message into an instance of its registered model.
#
# The arguments are resolved by +extract_decode_args+ into the matching model
# plus the message key and value, which are then handed to the model's
# +avro_message_decode+.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Avromatic model]
def decode(*args)
  model_class, key, value = extract_decode_args(*args)
  model_class.avro_message_decode(key, value)
end
decode_hash(*args) click to toggle source

@return [Hash]

# File lib/avromatic/model/message_decoder.rb, line 61
# Decodes a message into the attributes of its registered model.
#
# Works like +decode+ but calls the model's +avro_message_attributes+ rather
# than +avro_message_decode+.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Hash]
def decode_hash(*args)
  model_class, key, value = extract_decode_args(*args)
  model_class.avro_message_attributes(key, value)
end
model(*args) click to toggle source

@return [Avromatic model class]

# File lib/avromatic/model/message_decoder.rb, line 67
# Finds the registered model that matches a message without decoding it.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Avromatic model class]
def model(*args)
  matched, _key, _value = extract_decode_args(*args)
  matched
end

Private Instance Methods

build_model_map(models) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 118
# Indexes the given models by their +model_key+.
#
# Registering the very same model object more than once is allowed; a
# different object that maps to an already-used key is rejected.
#
# @param models [Array] models to index
# @return [Hash] model key => model
# @raise [DuplicateKeyError] when two distinct models share a key
def build_model_map(models)
  registry = {}
  models.each do |candidate|
    lookup = model_key(candidate)
    existing = registry[lookup]
    # Identity check (equal?), so re-registering the same object is harmless.
    if registry.key?(lookup) && !candidate.equal?(existing)
      raise DuplicateKeyError.new(existing, candidate)
    end

    registry[lookup] = candidate
  end
  registry
end
extract_decode_args(*args) click to toggle source

If two arguments are specified then the first is interpreted as the message key and the second is the message value. If there is only one arg then it is used as the message value.

# File lib/avromatic/model/message_decoder.rb, line 88
# Resolves the raw decode arguments into the matching model and the message
# parts.
#
# With two arguments the first is treated as the message key and the second
# as the message value; with a single argument it is the message value.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Array] [model, message_key, message_value]
# @raise [UnexpectedKeyError] when no registered model matches the message's
#   schema names
def extract_decode_args(*args)
  key_part, value_part = extract_key_and_value(*args)
  lookup = model_key_for_message(key_part, value_part)
  matched = model_map.fetch(lookup) { raise UnexpectedKeyError.new(*lookup) }

  [matched, key_part, value_part]
end
extract_key_and_value(*args) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 75
# Splits the decode arguments into a message key and a message value.
#
# @param args [Array] either (message_value) or (message_key, message_value);
#   anything past the second argument is ignored
# @return [Array] [message_key, message_value]; the key is nil when fewer
#   than two arguments are given
def extract_key_and_value(*args)
  return [nil, args.first] if args.size <= 1

  args.first(2)
end
extract_schema_id(data) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 109
# Reads the schema id that follows the first position of the encoded data.
#
# @param data [String] encoded message data
# @return [Integer, nil] positions 1 through 4 decoded as a 32-bit unsigned
#   big-endian integer ('N'); nil when nothing follows the first position
def extract_schema_id(data)
  id_bytes = data[1, 4]
  id_bytes.unpack1('N')
end
lookup_schema_name(schema_id) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 102
# Returns the full name of the schema with the given id, caching the result.
#
# On a cache miss the schema is fetched from +schema_registry+, parsed with
# +Avro::Schema.parse+, and its fullname is stored in +schema_names_by_id+.
#
# @param schema_id [Integer] id of the schema to look up
# @return [String] the schema's fullname
def lookup_schema_name(schema_id)
  cache = schema_names_by_id
  return cache[schema_id] if cache.key?(schema_id)

  schema_json = schema_registry.fetch(schema_id)
  cache[schema_id] = Avro::Schema.parse(schema_json).fullname
end
model_key_for_message(message_key, message_value) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 79
# Derives the model lookup key for an encoded message.
#
# The value's schema name is resolved first; the key's schema name is only
# resolved when a message key is present.
#
# @param message_key [String, nil] encoded message key, if any
# @param message_value [String] encoded message value
# @return [Array] [key_schema_name_or_nil, value_schema_name]
def model_key_for_message(message_key, message_value)
  value_name = schema_name_for_data(message_value)
  key_name = message_key ? schema_name_for_data(message_key) : nil
  [key_name, value_name]
end
schema_name_for_data(data) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 96
# Resolves the schema name for a piece of encoded data.
#
# The magic byte is validated before the schema id is extracted and looked up.
#
# @param data [String] encoded message key or value
# @return [String] full name of the schema the data was written with
def schema_name_for_data(data)
  validate_magic_byte!(data)
  lookup_schema_name(extract_schema_id(data))
end
validate_magic_byte!(data) click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 113
# Ensures the encoded data starts with the expected magic byte.
#
# @param data [String] encoded message key or value
# @return [nil] when the first element of +data+ equals +MAGIC_BYTE+
# @raise [MagicByteError] when the first element differs from +MAGIC_BYTE+
def validate_magic_byte!(data)
  leading = data[0]
  return if leading == MAGIC_BYTE

  raise MagicByteError.new(leading)
end