class Kafka::Protocol::Record
Attributes
attributes[R]
bytesize[R]
create_time[RW]
headers[R]
is_control_record[RW]
key[R]
offset[RW]
offset_delta[RW]
timestamp_delta[RW]
value[R]
Public Class Methods
decode(decoder)
click to toggle source
# File lib/kafka/protocol/record.rb, line 59 def self.decode(decoder) record_decoder = Decoder.from_string(decoder.varint_bytes) attributes = record_decoder.int8 timestamp_delta = record_decoder.varint offset_delta = record_decoder.varint key = record_decoder.varint_string value = record_decoder.varint_bytes headers = {} record_decoder.varint_array do header_key = record_decoder.varint_string header_value = record_decoder.varint_bytes headers[header_key] = header_value end new( key: key, value: value, headers: headers, attributes: attributes, offset_delta: offset_delta, timestamp_delta: timestamp_delta ) end
new( key: nil, value:, headers: {}, attributes: 0, offset_delta: 0, offset: 0, timestamp_delta: 0, create_time: Time.now, is_control_record: false )
click to toggle source
# File lib/kafka/protocol/record.rb, line 7 def initialize( key: nil, value:, headers: {}, attributes: 0, offset_delta: 0, offset: 0, timestamp_delta: 0, create_time: Time.now, is_control_record: false ) @key = key @value = value @headers = headers @attributes = attributes @offset_delta = offset_delta @offset = offset @timestamp_delta = timestamp_delta @create_time = create_time @is_control_record = is_control_record @bytesize = @key.to_s.bytesize + @value.to_s.bytesize end
Public Instance Methods
==(other)
click to toggle source
# File lib/kafka/protocol/record.rb, line 52 def ==(other) offset_delta == other.offset_delta && timestamp_delta == other.timestamp_delta && offset == other.offset && is_control_record == other.is_control_record end
encode(encoder)
click to toggle source
# File lib/kafka/protocol/record.rb, line 32 def encode(encoder) record_buffer = StringIO.new record_encoder = Encoder.new(record_buffer) record_encoder.write_int8(@attributes) record_encoder.write_varint(@timestamp_delta) record_encoder.write_varint(@offset_delta) record_encoder.write_varint_string(@key) record_encoder.write_varint_bytes(@value) record_encoder.write_varint_array(@headers.to_a) do |header_key, header_value| record_encoder.write_varint_string(header_key.to_s) record_encoder.write_varint_bytes(header_value.to_s) end encoder.write_varint_bytes(record_buffer.string) end