class Avromatic::Model::MessageDecoder
This class is used to decode messages encoded using Avro to their corresponding models.
Constants
- MAGIC_BYTE
Attributes
model_map[R]
schema_names_by_id[R]
schema_registry[R]
Public Class Methods
model_key(model)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 34
# Builds the lookup key under which a model is registered.
#
# The key is a two-element array: the fullname of the model's key schema
# (nil when the model has no key schema) followed by the fullname of its
# value schema.
#
# @param model [#key_avro_schema, #value_avro_schema] model to derive the key for
# @return [Array] [key schema fullname or nil, value schema fullname]
def self.model_key(model)
  # Safe navigation reads key_avro_schema once and yields nil for keyless models.
  [model.key_avro_schema&.fullname, model.value_avro_schema.fullname]
end
new(*models, schema_registry: nil, registry_url: nil)
click to toggle source
@param *models [generated models] Models to register for decoding.
@param schema_registry [AvroTurf::ConfluentSchemaRegistry] Optional schema registry client.
@param registry_url [String] Optional URL for schema registry server.
# File lib/avromatic/model/message_decoder.rb, line 45
# Registers the given models and selects the schema registry client.
#
# The registry is the first available of: the schema_registry argument,
# Avromatic.schema_registry, a new AvroTurf::ConfluentSchemaRegistry built
# from registry_url, and finally Avromatic.build_schema_registry.
#
# @param models [Array] models to register for decoding
# @param schema_registry [Object, nil] optional schema registry client
# @param registry_url [String, nil] optional URL for a schema registry server
def initialize(*models, schema_registry: nil, registry_url: nil)
  @model_map = build_model_map(models)
  @schema_names_by_id = {}

  registry = schema_registry || Avromatic.schema_registry
  # registry_url is only consulted when no client was supplied or configured.
  registry ||= AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: Avromatic.logger) if registry_url
  registry ||= Avromatic.build_schema_registry
  @schema_registry = registry
end
Public Instance Methods
decode(*args)
click to toggle source
@return [Avromatic model]
# File lib/avromatic/model/message_decoder.rb, line 55
# Resolves the registered model for the message and delegates decoding to
# that model's avro_message_decode.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Object] the result of avro_message_decode on the matched model
def decode(*args)
  model_class, key, value = extract_decode_args(*args)
  model_class.avro_message_decode(key, value)
end
decode_hash(*args)
click to toggle source
@return [Hash]
# File lib/avromatic/model/message_decoder.rb, line 61
# Resolves the registered model for the message and delegates to that
# model's avro_message_attributes.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Object] the result of avro_message_attributes on the matched model
def decode_hash(*args)
  model_class, key, value = extract_decode_args(*args)
  model_class.avro_message_attributes(key, value)
end
model(*args)
click to toggle source
@return [Avromatic model class]
# File lib/avromatic/model/message_decoder.rb, line 67
# Looks up the registered model matching the message without decoding it.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Object] the model registered for the message's schema names
def model(*args)
  # Only the first element of the [model, key, value] triple is needed.
  matched, = extract_decode_args(*args)
  matched
end
Private Instance Methods
build_model_map(models)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 118
# Indexes the models by their model_key.
#
# Registering the very same object twice is allowed; a different object
# that maps to an already-used key raises DuplicateKeyError.
#
# @param models [Array] models to index
# @return [Hash] model_key => model
def build_model_map(models)
  registered = {}
  models.each do |candidate|
    key = model_key(candidate)
    existing = registered[key]
    # Identity (equal?) rather than equality decides whether this is a repeat.
    if registered.key?(key) && !candidate.equal?(existing)
      raise DuplicateKeyError.new(existing, candidate)
    end

    registered[key] = candidate
  end
  registered
end
extract_decode_args(*args)
click to toggle source
If two arguments are given, the first is interpreted as the message key and the second as the message value. If only one argument is given, it is used as the message value and the message key is nil.
# File lib/avromatic/model/message_decoder.rb, line 88
# Splits the arguments into key and value and finds the model registered
# for their schema names.
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Array] [model, message_key, message_value]
# @raise [UnexpectedKeyError] when no model is registered for the schema names
def extract_decode_args(*args)
  key, value = extract_key_and_value(*args)
  lookup_key = model_key_for_message(key, value)
  matched = model_map.fetch(lookup_key) { raise UnexpectedKeyError.new(*lookup_key) }
  [matched, key, value]
end
extract_key_and_value(*args)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 75
# Normalizes the arguments to a [message_key, message_value] pair.
#
# With two or more arguments the first two are returned; with fewer, the
# key is nil and the value is the first argument (nil when none is given).
#
# @param args [Array] either (message_value) or (message_key, message_value)
# @return [Array] [message_key, message_value]
def extract_key_and_value(*args)
  return args.first(2) if args.size > 1

  [nil, args.first]
end
extract_schema_id(data)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 109
# Reads the schema id stored in positions 1 through 4 of the data,
# immediately after the first element.
#
# @param data [String] encoded message
# @return [Integer, nil] the id unpacked as a 32-bit big-endian unsigned
#   integer ('N'); nil when fewer than four bytes follow the first
def extract_schema_id(data)
  id_bytes = data[1, 4]
  id_bytes.unpack1('N')
end
lookup_schema_name(schema_id)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 102
# Returns the schema fullname for a schema id, caching it in
# schema_names_by_id.
#
# On a cache miss the schema is fetched from schema_registry, parsed with
# Avro::Schema.parse, and its fullname is stored for later lookups.
#
# @param schema_id [Object] id as extracted from the message
# @return [Object] the fullname of the parsed schema
def lookup_schema_name(schema_id)
  return schema_names_by_id[schema_id] if schema_names_by_id.key?(schema_id)

  schema_json = schema_registry.fetch(schema_id)
  schema_names_by_id[schema_id] = Avro::Schema.parse(schema_json).fullname
end
model_key_for_message(message_key, message_value)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 79
# Builds the model_map lookup key for a message from the schema names of
# its key and value.
#
# @param message_key [String, nil] encoded message key, if any
# @param message_value [String] encoded message value
# @return [Array] [key schema name or nil, value schema name]
def model_key_for_message(message_key, message_value)
  # The value is resolved first, so a bad value is reported before a bad key.
  value_name = schema_name_for_data(message_value)
  key_name = message_key ? schema_name_for_data(message_key) : nil
  [key_name, value_name]
end
schema_name_for_data(data)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 96
# Resolves the schema name for encoded data: checks the magic byte, then
# extracts the schema id and looks up its name.
#
# @param data [String] encoded message key or value
# @return [Object] the schema name returned by lookup_schema_name
def schema_name_for_data(data)
  validate_magic_byte!(data)
  lookup_schema_name(extract_schema_id(data))
end
validate_magic_byte!(data)
click to toggle source
# File lib/avromatic/model/message_decoder.rb, line 113
# Ensures that the first element of the data equals MAGIC_BYTE.
#
# @param data [String] encoded message key or value
# @return [nil] when the first element matches
# @raise [MagicByteError] when the first element differs from MAGIC_BYTE
def validate_magic_byte!(data)
  leading = data[0]
  return if leading == MAGIC_BYTE

  raise MagicByteError.new(leading)
end