class Kafka::RoundRobinAssignmentStrategy

A round-robin assignment strategy inspired by the original Java client's round-robin assignor. It can handle both identical and differing topic subscriptions across members of the same consumer group.

Public Instance Methods

call(cluster:, members:, partitions:)

Assign the topic partitions to the group members.

@param cluster [Kafka::Cluster]

@param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash mapping member ids to metadata

@param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of partitions the consumer group processes

@return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash mapping member ids to partitions.
# File lib/kafka/round_robin_assignment_strategy.rb, line 21
def call(cluster:, members:, partitions:)
  partitions_per_member = Hash.new {|h, k| h[k] = [] }
  relevant_partitions = valid_sorted_partitions(members, partitions)
  members_ids = members.keys
  iterator = (0...members.size).cycle
  idx = iterator.next

  relevant_partitions.each do |partition|
    topic = partition.topic

    while !members[members_ids[idx]].topics.include?(topic)
      idx = iterator.next
    end

    partitions_per_member[members_ids[idx]] << partition
    idx = iterator.next
  end

  partitions_per_member
end
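The loop above can be tried in isolation. The following is a minimal sketch, not the library's API: `Member`, `Partition`, and `round_robin` are hypothetical stand-ins that mimic only the `topics` and `topic` readers the method relies on, and the input is assumed to be already filtered and sorted by topic.

```ruby
# Hypothetical stand-ins for the library's metadata and partition objects.
Member = Struct.new(:topics)
Partition = Struct.new(:topic, :partition_id)

# Standalone restatement of the assignment loop (illustrative only).
def round_robin(members, partitions)
  assignment = Hash.new { |h, k| h[k] = [] }
  ids = members.keys
  cursor = (0...ids.size).cycle
  idx = cursor.next

  partitions.each do |partition|
    # Advance past members that are not subscribed to this topic.
    idx = cursor.next until members[ids[idx]].topics.include?(partition.topic)
    assignment[ids[idx]] << partition
    idx = cursor.next
  end

  assignment
end

members = {
  "m1" => Member.new(["a", "b"]),
  "m2" => Member.new(["a"]),
}
partitions = [
  Partition.new("a", 0), Partition.new("a", 1),
  Partition.new("b", 0), Partition.new("b", 1),
]

result = round_robin(members, partitions)
result.each do |id, assigned|
  puts "#{id}: #{assigned.map { |p| "#{p.topic}-#{p.partition_id}" }.join(', ')}"
end
```

With differing subscriptions the split is not necessarily even: here `m2` is skipped for every partition of topic `b`, so `m1` ends up with three partitions and `m2` with one.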
protocol_name()
# File lib/kafka/round_robin_assignment_strategy.rb, line 8
def protocol_name
  "roundrobin"
end
valid_sorted_partitions(members, partitions)
# File lib/kafka/round_robin_assignment_strategy.rb, line 42
def valid_sorted_partitions(members, partitions)
  subscribed_topics = members.map do |id, metadata|
    metadata && metadata.topics
  end.flatten.compact

  partitions
    .select { |partition| subscribed_topics.include?(partition.topic) }
    .sort_by { |partition| partition.topic }
end
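To illustrate the filtering step, here is a small sketch under the same assumptions as the method above: `Subscriber`, `TopicPartition`, and `subscribed_sorted` are hypothetical names, not part of the library. It shows that partitions of topics nobody subscribes to are dropped, that members with `nil` metadata are tolerated, and that the remainder is ordered by topic name.

```ruby
# Hypothetical stand-ins for the library's metadata and partition objects.
Subscriber = Struct.new(:topics)
TopicPartition = Struct.new(:topic, :partition_id)

# Keep only partitions of subscribed topics, ordered by topic name.
def subscribed_sorted(members, partitions)
  # compact drops members whose metadata is nil before collecting topics.
  topics = members.values.compact.flat_map(&:topics)
  partitions.select { |p| topics.include?(p.topic) }.sort_by(&:topic)
end

members = {
  "m1" => Subscriber.new(["logs"]),
  "m2" => Subscriber.new(["events"]),
  "m3" => nil,
}
partitions = [
  TopicPartition.new("logs", 0),
  TopicPartition.new("orphan", 0),
  TopicPartition.new("events", 0),
]

sorted_topics = subscribed_sorted(members, partitions).map(&:topic)
puts sorted_topics.inspect
```

Ruby's `sort_by` is not guaranteed to be stable, so the relative order of partitions within a single topic should not be relied on.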