class Fluent::Plugin::AvroFormatter

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/formatter_avro.rb, line 17
def configure(conf)
  super
  if not ((@schema_json.nil? ? 0 : 1) + (@schema_file.nil? ? 0 : 1) + (@schema_url.nil? ? 0 : 1) == 1) then
    raise Fluent::ConfigError, 'schema_json, schema_file, or schema_url is required, but not multiple!'
  end
  if (@schema_json.nil? && !@schema_file.nil?) then
    @schema_json = File.read(@schema_file)
  end
  if (@schema_json.nil? && !@schema_url.nil?) then
    @schema_json = fetch_schema(@schema_url,@schema_url_key)
  end
  @schema = Avro::Schema.parse(@schema_json)
  @writer = Avro::IO::DatumWriter.new(@schema)
end
fetch_schema(url,schema_key) click to toggle source
# File lib/fluent/plugin/formatter_avro.rb, line 64
def fetch_schema(url,schema_key)
  response_body = fetch_url(url)
  if schema_key.nil? then
    return response_body
  else
    return JSON.parse(response_body)[schema_key]
  end
end
fetch_url(url) click to toggle source
# File lib/fluent/plugin/formatter_avro.rb, line 58
def fetch_url(url)
  uri = URI.parse(url)
  response = Net::HTTP.get_response uri
  response.body
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/formatter_avro.rb, line 32
def format(tag, time, record)
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  begin
    @writer.write(record, encoder)
  rescue => e
    raise e if schema_url.nil?
    schema_changed = false
    begin
      new_schema_json = fetch_schema(@schema_url,@schema_url_key)
      new_schema = Avro::Schema.parse(new_schema_json)
      schema_changed = (new_schema_json == @schema_json)
      @schema_json = new_schema_json
      @schema = new_schema
    rescue
    end
    if schema_changed then
      @writer = Avro::IO::DatumWriter.new(@schema)
      @writer.write(record, encoder)
    else
      raise e
    end
  end
  buffer.string
end