class Kafka::Protocol::MetadataResponse

A response to a {MetadataRequest}.

The response contains information on the brokers, topics, and partitions in the cluster.

## API Specification

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32

  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16

  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]

Attributes

brokers[R]

@return [Array<Kafka::BrokerInfo>] the list of brokers in the cluster.

controller_id[R]

@return [Integer] The broker id of the controller broker.

topics[R]

@return [Array<TopicMetadata>] the list of topics in the cluster.

Public Class Methods

decode(decoder) click to toggle source

Decodes a MetadataResponse from a {Decoder} containing response data.

@param decoder [Decoder] @return [MetadataResponse] the metadata response.

# File lib/kafka/protocol/metadata_response.rb, line 143
def self.decode(decoder)
  brokers = decoder.array do
    node_id = decoder.int32
    host = decoder.string
    port = decoder.int32
    _rack = decoder.string

    BrokerInfo.new(
      node_id: node_id,
      host: host,
      port: port
    )
  end

  controller_id = decoder.int32

  topics = decoder.array do
    topic_error_code = decoder.int16
    topic_name = decoder.string
    _is_internal = decoder.boolean

    partitions = decoder.array do
      PartitionMetadata.new(
        partition_error_code: decoder.int16,
        partition_id: decoder.int32,
        leader: decoder.int32,
        replicas: decoder.array { decoder.int32 },
        isr: decoder.array { decoder.int32 },
      )
    end

    TopicMetadata.new(
      topic_error_code: topic_error_code,
      topic_name: topic_name,
      partitions: partitions,
    )
  end

  new(brokers: brokers, controller_id: controller_id, topics: topics)
end
new(brokers:, controller_id:, topics:) click to toggle source
# File lib/kafka/protocol/metadata_response.rb, line 75
def initialize(brokers:, controller_id:, topics:)
  @brokers = brokers
  @controller_id = controller_id
  @topics = topics
end

Public Instance Methods

controller_broker() click to toggle source
# File lib/kafka/protocol/metadata_response.rb, line 123
def controller_broker
  find_broker(controller_id)
end
find_broker(node_id) click to toggle source

Finds the broker info for the given node id.

@param node_id [Integer] the node id of the broker. @return [Kafka::BrokerInfo] information about the broker.

# File lib/kafka/protocol/metadata_response.rb, line 115
def find_broker(node_id)
  broker = @brokers.find {|b| b.node_id == node_id }

  raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil?

  broker
end
find_leader_id(topic, partition) click to toggle source

Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.

@param topic [String] the name of the topic. @param partition [Integer] the partition number. @return [Integer] the node id of the leader.

# File lib/kafka/protocol/metadata_response.rb, line 87
def find_leader_id(topic, partition)
  topic_info = @topics.find {|t| t.topic_name == topic }

  if topic_info.nil?
    raise UnknownTopicOrPartition, "no topic #{topic}"
  end

  Protocol.handle_error(topic_info.topic_error_code)

  partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

  if partition_info.nil?
    raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
  end

  begin
    Protocol.handle_error(partition_info.partition_error_code)
  rescue ReplicaNotAvailable
    # This error can be safely ignored per the protocol specification.
  end

  partition_info.leader
end
partitions_for(topic_name) click to toggle source
# File lib/kafka/protocol/metadata_response.rb, line 127
def partitions_for(topic_name)
  topic = @topics.find {|t| t.topic_name == topic_name }

  if topic.nil?
    raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
  end

  Protocol.handle_error(topic.topic_error_code)

  topic.partitions
end