class GlassOctopus::RubyKafkaAdapter
Connection adapter that uses the {github.com/zendesk/ruby-kafka ruby-kafka} gem to talk to Kafka 0.9+.
@example
adapter = GlassOctopus::RubyKafkaAdapter.new do |kafka_config| kafka_config.broker_list = %w[localhost:9092] kafka_config.topic = "mytopic" kafka_config.group_id = "mygroup" end adapter.connect.fetch_message do |message| p message end
Attributes
options[R]
A hash that hold the configuration set up in the initializer block. @return [Hash]
Public Class Methods
new(logger=nil) { |config| ... }
click to toggle source
@yield configure ruby-kafka in the yielded block.
The following configuration values are required: * +broker_list+: list of Kafka broker addresses * +topic+: name of the topic to subscribe to * +group_id+: name of the consumer group * +client_id+: the identifier for this application Optional configuration: * +client+: a hash passed on to Kafka.new * +consumer+: a hash passed on to kafka.consumer * +subscription+: a hash passed on to consumer.subscribe Check the ruby-kafka documentation for driver specific configurations.
@raise [OptionsInvalid]
# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 44 def initialize(logger=nil) config = OpenStruct.new yield config @options = config.to_h @options[:group_id] ||= @options[:group] @options[:logger] ||= logger validate_options @kafka = nil @consumer = nil end
Public Instance Methods
close()
click to toggle source
@api private
# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 88 def close @consumer.stop @kafka.close end
connect()
click to toggle source
Connect to Kafka and join the consumer group. @return [void]
# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 59 def connect @kafka = connect_to_kafka @consumer = create_consumer(@kafka) @consumer.subscribe( options.fetch(:topic), **options.fetch(:subscription, {}) ) self end
connect_to_kafka()
click to toggle source
@api private
# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 94 def connect_to_kafka client_options = options.fetch(:client, {}).merge(logger: @options[:logger]) client_options.merge!(client_id: @options[:client_id]) if @options.key?(:client_id) Kafka.new(seed_brokers: options.fetch(:broker_list), **client_options) end
create_consumer(kafka)
click to toggle source
@api private
# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 102 def create_consumer(kafka) kafka.consumer( group_id: options.fetch(:group_id), **options.fetch(:consumer, {}) ) end
fetch_message() { |message| ... }
click to toggle source
Fetch messages from kafka in a loop. @yield messages read from Kafka @yieldparam message [Message] a Kafka message
# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 73 def fetch_message @consumer.each_message do |fetched_message| message = Message.new( fetched_message.topic, fetched_message.partition, fetched_message.offset, fetched_message.key, fetched_message.value ) yield message end end
validate_options()
click to toggle source
@api private
# File lib/glass_octopus/connection/ruby_kafka_adapter.rb, line 110 def validate_options errors = [] [:broker_list, :group_id, :topic].each do |key| errors << "Missing key: #{key}" unless options[key] end raise OptionsInvalid.new(errors) if errors.any? end