class Kafka::ConsumerGroup::Assignor

A consumer group partition assignor

Constants

Partition

Public Class Methods

new(cluster:, strategy:) click to toggle source

@param cluster [Kafka::Cluster] @param strategy [Object] an object that implements protocol_type,

#user_data, and #assign.
# File lib/kafka/consumer_group/assignor.rb, line 15
def initialize(cluster:, strategy:)
  @cluster = cluster
  @strategy = strategy
end

Public Instance Methods

assign(members:, topics:) click to toggle source

Assign the topic partitions to the group members.

@param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash

mapping member ids to metadata.

@param topics [Array<String>] topics @return [Hash<String, Kafka::Protocol::MemberAssignment>] a hash mapping member

ids to assignments.
# File lib/kafka/consumer_group/assignor.rb, line 35
def assign(members:, topics:)
  topic_partitions = topics.flat_map do |topic|
    begin
      partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end
    partition_ids.map {|partition_id| Partition.new(topic, partition_id) }
  end

  group_assignment = {}

  members.each_key do |member_id|
    group_assignment[member_id] = Protocol::MemberAssignment.new
  end
  @strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
    Array(partitions).each do |partition|
      group_assignment[member_id].assign(partition.topic, [partition.partition_id])
    end
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  sleep 1
  retry
end
protocol_name() click to toggle source
# File lib/kafka/consumer_group/assignor.rb, line 20
def protocol_name
  @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
end
user_data() click to toggle source
# File lib/kafka/consumer_group/assignor.rb, line 24
def user_data
  @strategy.user_data if @strategy.respond_to?(:user_data)
end