class GlassOctopus::RubyKafkaAdapter

Connection adapter that uses the {github.com/zendesk/ruby-kafka ruby-kafka} gem to talk to Kafka 0.9+.

@example

adapter = GlassOctopus::RubyKafkaAdapter.new do |kafka_config|
  kafka_config.broker_list = %w[localhost:9092]
  kafka_config.topic       = "mytopic"
  kafka_config.group_id    = "mygroup"
end

adapter.connect.fetch_message do |message|
  p message
end

Attributes

options[R]

A hash that holds the configuration set up in the initializer block. @return [Hash]

Public Class Methods

new(logger=nil) { |config| ... } click to toggle source

@yield configure ruby-kafka in the yielded block.

The following configuration values are required:

* +broker_list+: list of Kafka broker addresses
* +topic+: name of the topic to subscribe to
* +group_id+: name of the consumer group
* +client_id+: the identifier for this application (note: not checked by the option validation; it is passed to the Kafka client only when set)

Optional configuration:

* +client+: a hash passed on to Kafka.new
* +consumer+: a hash passed on to kafka.consumer
* +subscription+: a hash passed on to consumer.subscribe

Check the ruby-kafka documentation for driver specific configurations.

@raise [OptionsInvalid]

# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 44
# Build the adapter configuration from the block and validate it.
#
# The block receives an OpenStruct; whatever it sets becomes the options
# hash. +group+ is accepted as a fallback for +group_id+, and the +logger+
# argument is used only when the block did not set a logger itself.
#
# @param logger [Object, nil] fallback logger stored under +:logger+
# @yield [config] the OpenStruct to fill in with configuration values
# @raise [OptionsInvalid] raised by validate_options for missing keys
def initialize(logger=nil)
  @options = OpenStruct.new.tap { |settings| yield settings }.to_h

  # Fall back to the alternative key / argument only when nothing truthy is set.
  @options[:group_id] = @options[:group] unless @options[:group_id]
  @options[:logger] = logger unless @options[:logger]

  validate_options

  # Both are populated later by #connect.
  @kafka = nil
  @consumer = nil
end

Public Instance Methods

close() click to toggle source

@api private

# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 88
# Stop the consumer and close the Kafka client.
#
# Each step is skipped when its object has not been created yet, so calling
# this on an adapter that never connected does nothing instead of raising
# NoMethodError on nil.
#
# @api private
def close
  @consumer.stop if @consumer
  @kafka.close if @kafka
end
connect() click to toggle source

Connect to Kafka and join the consumer group. @return [self] the adapter itself, so that calls can be chained

# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 59
# Create the Kafka client and the consumer, then subscribe the consumer to
# the configured topic.
#
# The optional +:subscription+ hash from the options is passed to
# +subscribe+ as keyword arguments.
#
# @return [self] the adapter, so further calls can be chained
# @raise [KeyError] when the +:topic+ option is missing
def connect
  @kafka = connect_to_kafka
  @consumer = create_consumer(@kafka)

  topic = options.fetch(:topic)
  subscription_options = options.fetch(:subscription, {})
  @consumer.subscribe(topic, **subscription_options)

  self
end
connect_to_kafka() click to toggle source

@api private

# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 94
# Build the Kafka client from the configured options.
#
# The +:client+ hash is copied (never mutated) and the adapter-level
# +:logger+ is always written on top of it; +:client_id+ is added only when
# that key is present in the options.
#
# @api private
# @raise [KeyError] when the +:broker_list+ option is missing
def connect_to_kafka
  overrides = { logger: @options[:logger] }
  overrides[:client_id] = @options[:client_id] if @options.key?(:client_id)
  client_options = options.fetch(:client, {}).merge(overrides)

  Kafka.new(seed_brokers: options.fetch(:broker_list), **client_options)
end
create_consumer(kafka) click to toggle source

@api private

# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 102
# Ask the given Kafka client for a consumer in the configured group.
#
# The optional +:consumer+ hash from the options is passed along as extra
# keyword arguments.
#
# @api private
# @param kafka [Object] client object responding to +consumer+
# @raise [KeyError] when the +:group_id+ option is missing
def create_consumer(kafka)
  consumer_options = options.fetch(:consumer, {})
  kafka.consumer(group_id: options.fetch(:group_id), **consumer_options)
end
fetch_message() { |message| ... } click to toggle source

Fetch messages from Kafka in a loop. @yield messages read from Kafka @yieldparam message [Message] a Kafka message

# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 73
# Iterate over the consumer's messages, wrapping each one in a Message built
# from its topic, partition, offset, key and value, and yield it to the block.
#
# @yieldparam message [Message] the wrapped message
def fetch_message
  @consumer.each_message do |raw|
    yield Message.new(raw.topic, raw.partition, raw.offset, raw.key, raw.value)
  end
end
validate_options() click to toggle source

@api private

# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 110
# Check that +broker_list+, +group_id+ and +topic+ are all set to a truthy
# value in the options.
#
# @api private
# @return [nil] when every required key is present
# @raise [OptionsInvalid] carrying one "Missing key: ..." entry per absent key
def validate_options
  missing = %i[broker_list group_id topic].reject { |key| options[key] }
  return if missing.empty?

  raise OptionsInvalid.new(missing.map { |key| "Missing key: #{key}" })
end