class Avromatic::Model::MessageDecoder
This class is used to decode messages encoded using Avro to their corresponding models.
Constants
- MAGIC_BYTE
Attributes
Public Class Methods
Source
# File lib/avromatic/model/message_decoder.rb, line 34 def self.model_key(model) [model.key_avro_schema && model.key_avro_schema.fullname, model.value_avro_schema.fullname] end
Source
# File lib/avromatic/model/message_decoder.rb, line 45 def initialize(*models, schema_registry: nil, registry_url: nil) @model_map = build_model_map(models) @schema_names_by_id = {} @schema_registry = schema_registry || Avromatic.schema_registry || (registry_url && AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: Avromatic.logger)) || Avromatic.build_schema_registry end
@param *models [generated models] Models to register for decoding. @param schema_registry
[AvroTurf::ConfluentSchemaRegistry] Optional schema
registry client.
@param registry_url [String] Optional URL for schema registry server.
Public Instance Methods
Source
# File lib/avromatic/model/message_decoder.rb, line 55 def decode(*args) model, message_key, message_value = extract_decode_args(*args) model.avro_message_decode(message_key, message_value) end
@return [Avromatic model]
Source
# File lib/avromatic/model/message_decoder.rb, line 61 def decode_hash(*args) model, message_key, message_value = extract_decode_args(*args) model.avro_message_attributes(message_key, message_value) end
@return [Hash]
Source
# File lib/avromatic/model/message_decoder.rb, line 67 def model(*args) extract_decode_args(*args).first end
@return [Avromatic model class]
Private Instance Methods
Source
# File lib/avromatic/model/message_decoder.rb, line 118 def build_model_map(models) models.each_with_object(Hash.new) do |model, map| key = model_key(model) raise DuplicateKeyError.new(map[key], model) if map.key?(key) && !model.equal?(map[key]) map[key] = model end end
Source
# File lib/avromatic/model/message_decoder.rb, line 88 def extract_decode_args(*args) message_key, message_value = extract_key_and_value(*args) model_key = model_key_for_message(message_key, message_value) raise UnexpectedKeyError.new(*model_key) unless model_map.key?(model_key) [model_map[model_key], message_key, message_value] end
If two arguments are specified then the first is interpreted as the message key and the second is the message value. If there is only one arg then it is used as the message value.
Source
# File lib/avromatic/model/message_decoder.rb, line 75 def extract_key_and_value(*args) args.size > 1 ? args.take(2) : [nil, args.first] end
Source
# File lib/avromatic/model/message_decoder.rb, line 109 def extract_schema_id(data) data[1..4].unpack1('N') end
Source
# File lib/avromatic/model/message_decoder.rb, line 102 def lookup_schema_name(schema_id) schema_names_by_id.fetch(schema_id) do schema = Avro::Schema.parse(schema_registry.fetch(schema_id)) schema_names_by_id[schema_id] = schema.fullname end end
Source
# File lib/avromatic/model/message_decoder.rb, line 79 def model_key_for_message(message_key, message_value) value_schema_name = schema_name_for_data(message_value) key_schema_name = schema_name_for_data(message_key) if message_key [key_schema_name, value_schema_name] end
Source
# File lib/avromatic/model/message_decoder.rb, line 96 def schema_name_for_data(data) validate_magic_byte!(data) schema_id = extract_schema_id(data) lookup_schema_name(schema_id) end
Source
# File lib/avromatic/model/message_decoder.rb, line 113 def validate_magic_byte!(data) first_byte = data[0] raise MagicByteError.new(first_byte) if first_byte != MAGIC_BYTE end