class Fluent::VerticaJsonOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_verticajson.rb, line 14
def initialize
  super

  require 'vertica'
  require 'json'
end

Public Instance Methods

format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_verticajson.rb, line 21
def format(tag, time, record)
  record_altered = Hash[
      record.map{ |k, v|
        if v.is_a?(Hash) or v.is_a?(Array)
          [k, "#{v.to_json}"]
        else
          [k, v]
        end
      }
  ]

  $log.info "New data received to the buffer for the table #{@schema}.#{@table}"
  record_altered.to_json
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_verticajson.rb, line 36
    def write(chunk)
      perm_table = "\"#{@schema}\".\"#{@table}\""

      chunk.open do |file|

        file_contents = file.read

        vertica.copy(<<-SQL) { |handle| handle.write(file_contents) }
          COPY #{perm_table}
          FROM STDIN
          PARSER fjsonparser(flatten_maps=false)
          ENFORCELENGTH DIRECT
          REJECTED DATA AS TABLE #{@table}_rejected
        SQL

        vertica.close
        @vertica = nil
      end
      $log.info "Data successfully loaded to vertica table #{perm_table}."
    end

Private Instance Methods

vertica() click to toggle source
# File lib/fluent/plugin/out_verticajson.rb, line 59
def vertica
  @vertica ||= Vertica.connect({
    :host     => @host,
    :user     => @username,
    :password => @password,
    :ssl      => @ssl,
    :port     => @port,
    :database => @database
  })
end