class Wukong::Load::KafkaLoader
Loads data into Kafka.
Uses the `kafka-rb` gem to create a Kafka::MultiProducer that writes to Kafka.
Allows loading records into a given topic on a given partition. Records can have fields `_topic` and `_partition` which override the given topic and partition on a per-record basis.
The names of these fields within each record (`_topic` and `_partition`) can be customized.
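The per-record override can be pictured with the minimal sketch below. It is not the loader's actual implementation: the class name `RoutingSketch` and the keyword arguments `topic_field` and `partition_field` are hypothetical stand-ins for whatever settings the loader exposes. Only the helper names `topic_for` and `partition_for` come from the source of `load`.

```ruby
# Hypothetical sketch of per-record topic/partition resolution.
# The class and its keyword arguments are illustrative assumptions.
class RoutingSketch
  attr_reader :topic, :partition, :topic_field, :partition_field

  def initialize(topic:, partition: 0, topic_field: '_topic', partition_field: '_partition')
    @topic           = topic
    @partition       = partition
    @topic_field     = topic_field
    @partition_field = partition_field
  end

  # Use the record's own topic field if present, else the configured topic.
  def topic_for(record)
    record.fetch(topic_field, topic)
  end

  # Use the record's own partition field if present, else the configured partition.
  def partition_for(record)
    record.fetch(partition_field, partition)
  end
end

router = RoutingSketch.new(topic: 'events', partition: 0)
router.topic_for('id' => 1)                       # falls back to the configured topic
router.topic_for('_topic' => 'audit', 'id' => 2)  # the record's own field wins

# Customized field names: the default `_topic` key is then ignored.
custom = RoutingSketch.new(topic: 'events', topic_field: 'dest')
custom.topic_for('dest' => 'billing', '_topic' => 'ignored')
```

Looking fields up with `Hash#fetch` and a default keeps the fallback explicit: a record that lacks the field is routed to the configured topic and partition.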
Attributes
producer[RW]
The Kafka producer used to send messages to Kafka.
Public Instance Methods
load(record)
Load a single record into Kafka.
@param [Hash] record
    # File lib/wukong-load/loaders/kafka.rb, line 65
    def load record
      begin
        topic     = topic_for(record)
        partition = partition_for(record)
        bytes     = producer.send(topic, messages_for(record), :partition => partition)
        log.info("Wrote #{bytes} bytes to #{topic}/#{partition}")
      rescue => e
        handle_error(record, e)
      end
    end
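The control flow of `load` (resolve the destination, send, log, and hand any exception to an error handler) can be tried without a Kafka broker using the sketch below. Everything in it is an assumption made for illustration: `FakeProducer` and `LoadSketch` are hypothetical classes, the JSON serialization stands in for `messages_for`, and collecting failures in an array is only a guess at what `handle_error` might do.

```ruby
require 'json'
require 'logger'

# Hypothetical stand-in for the Kafka producer. It records each call and
# returns the number of bytes it was asked to write.
class FakeProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Mirrors the call shape used by `load`. Note that this shadows Object#send.
  def send(topic, messages, options = {})
    raise ArgumentError, 'topic is required' if topic.nil?
    @sent << [topic, messages, options]
    messages.inject(0) { |total, message| total + message.bytesize }
  end
end

# Hypothetical loader that follows the same steps as `load`.
class LoadSketch
  attr_reader :producer, :failed

  def initialize(producer, log = Logger.new(nil))
    @producer = producer
    @log      = log
    @failed   = []
  end

  def load(record)
    topic     = record['_topic']
    partition = record.fetch('_partition', 0)
    bytes     = producer.send(topic, [JSON.generate(record)], :partition => partition)
    @log.info("Wrote #{bytes} bytes to #{topic}/#{partition}")
  rescue => e
    # Assumed error handling: remember the failure and keep going.
    @failed << [record, e.message]
  end
end

loader = LoadSketch.new(FakeProducer.new)
loader.load('_topic' => 'events', 'id' => 1)  # sent to events/0
loader.load('id' => 2)                        # no topic: captured in loader.failed
```

Because the `rescue` wraps each record individually, one bad record does not stop the records that follow it from being loaded.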
setup()
Creates the producer.
    # File lib/wukong-load/loaders/kafka.rb, line 48
    def setup
      begin
        require 'kafka'
      rescue LoadError => e
        raise Error.new("Please ensure that the 'kafka-rb' gem is installed and available (in your Gemfile)")
      end
      log.debug("Connecting to Kafka broker at #{host}:#{port}...")
      begin
        self.producer = Kafka::MultiProducer.new(:host => host, :port => port)
      rescue => e
        raise Error.new(e.message)
      end
    end
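The first half of `setup` defers `require 'kafka'` until the loader is actually used and turns a missing gem into a readable error. A minimal sketch of that pattern follows. The helper `require_or_explain` and the error class `MissingDependencyError` are hypothetical names, not part of wukong-load.

```ruby
# Hypothetical error class standing in for the loader's own Error.
class MissingDependencyError < StandardError; end

# Require `library` on demand. If it cannot be loaded, raise an error that
# names the gem the user needs to install.
def require_or_explain(library, gem_name)
  require library
rescue LoadError
  raise MissingDependencyError,
        "Please ensure that the '#{gem_name}' gem is installed and available (in your Gemfile)"
end

# Succeeds quietly for a library that is available.
require_or_explain('json', 'json')

# A missing library produces the friendlier error instead of a bare LoadError.
begin
  require_or_explain('surely_not_a_real_library_xyz', 'imaginary-gem')
rescue MissingDependencyError => e
  warn e.message
end
```

Requiring the gem inside `setup` rather than at the top of the file means the dependency is only needed by users who actually load into Kafka.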