class Output::KafkaPlugin

Public Class Methods

new(options) click to toggle source

Create a kafka output plugin instance

@param [Hash] options @option options [Array] :brokers (['localhost:9092']) @option options [String] :client_id ('fileminer') @option options [String] :topic ('fileminer') @option options [Symbol] :mode (:sync) :sync or :async @option options [Symbol] :auto_delivery (:disabled) :disabled or :enabled @option options [Hash] :delivery_conf

# File lib/fileminer/output/kafka.rb, line 19
def initialize(options)
  brokers = options[:brokers] || ['localhost:9092']
  client_id = options[:client_id] || 'fileminer'
  @topic = options[:topic] || 'fileminer'
  @kafka = Kafka.new(brokers, client_id: client_id)
  case @mode = options[:mode]
  when :sync
    @producer = @kafka.producer
  when :async
    case @auto_delivery = options[:auto_delivery]
    when :disabled
      @producer = @kafka.async_producer
    when :enabled
      @producer = @kafka.async_producer options[:delivery_conf]
    else
      raise "invalid value #@auto_delivery of auto_delivery"
    end
  else
    raise "unsupported mode #@mode"
  end
end

Public Instance Methods

close() click to toggle source

close the kafka producer

# File lib/fileminer/output/kafka.rb, line 55
def close
  @producer.shutdown
  @kafka.close
end
send_all(lines, &listener) click to toggle source

Send all lines to kafka using producer API

@param [Array] lines @yield a listener to be called after all lines just be delivered

# File lib/fileminer/output/kafka.rb, line 45
def send_all(lines, &listener)
  lines.each do |line|
    message = line.to_json
    @producer.produce(message, topic: @topic)
  end
  @producer.deliver_messages unless @mode == :async and @auto_delivery == :enabled
  listener.call
end