class Fluent::Plugin::MysqlFetchAndEmitOutput

Public Instance Methods

client(database) click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 183
# Opens a new MySQL connection to +database+ using the plugin's configured
# host, credentials and SSL settings.
#
# @param database [String] name of the database to connect to
# @return [Mysql2::Client] a freshly created client
def client(database)
  connection_options = {
    host: @host,
    port: @port,
    username: @username,
    password: @password,
    database: database
  }
  # SSL-related settings are forwarded as-is, whatever their configured value.
  ssl_options = {
    sslkey: @sslkey,
    sslcert: @sslcert,
    sslca: @sslca,
    sslcapath: @sslcapath,
    sslcipher: @sslcipher,
    sslverify: @sslverify
  }
  Mysql2::Client.new(**connection_options.merge(ssl_options))
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 84
# Runs the superclass configuration, then prepares the record accessors
# used by #format and #write.
#
# Sets:
# * @accessor_for_record_key          - accessor for @record_key
# * @accessors_for_record_matching    - one accessor per entry of
#   @record_matching_keys (built from its +fluentd_record_key+)
# * @column_names_for_record_matching - the +mysql_record_key+ of each entry
#
# @param conf the plugin configuration, passed through to +super+
def configure(conf)
  super
  @accessor_for_record_key = record_accessor_create(@record_key)

  fluentd_keys = @record_matching_keys.map(&:fluentd_record_key)
  @accessors_for_record_matching = fluentd_keys.map { |key| record_accessor_create(key) }
  @column_names_for_record_matching = @record_matching_keys.map(&:mysql_record_key)
end
expand_placeholders(metadata) click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 199
# Resolves the placeholders in the configured database and table names
# against +metadata+, replacing every "." in the result with "_".
#
# @param metadata chunk metadata handed to +extract_placeholders+
# @return [Array(String, String)] the expanded [database, table] pair
def expand_placeholders(metadata)
  [@database, @table].map do |template|
    extract_placeholders(template, metadata).tr('.', '_')
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 91
# Serializes an incoming event for buffering, or drops it.
#
# An event is dropped (with a warning, returning nil) when:
# * any of the record-matching accessors yields nil for the record, or
# * the value found under the record key is not a String, Integer or Float.
#
# @param tag [String] event tag
# @param time event time
# @param record [Hash] event record
# @return [String, nil] +[tag, time, record].to_msgpack+, or nil when dropped
def format(tag, time, record)
  lookup_value = @accessor_for_record_key.call(record)

  # With no matching accessors configured this list is empty and the
  # nil check below never fires.
  matching_values = @accessors_for_record_matching.map { |accessor| accessor.call(record) }
  if matching_values.any?(&:nil?)
    @log.warn("Incoming record is omitted, because values for record matching include nil", record: record)
    return nil
  end

  supported = lookup_value.is_a?(String) || lookup_value.is_a?(Integer) || lookup_value.is_a?(Float)
  unless supported
    @log.warn("Incoming record is omitted, Supported value type of `record_key` is String, Integer, Float", record: record)
    return nil
  end

  [tag, time, record].to_msgpack
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 110
# Always returns true. #format in this class produces its output with
# +to_msgpack+, and #write reads the chunk back with +msgpack_each+.
# NOTE(review): presumably queried by Fluentd's output base class to learn
# that formatted chunks are msgpack binary - confirm against the plugin API.
#
# @return [true]
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 114
# Always returns true.
# NOTE(review): presumably tells Fluentd that this plugin may run under
# multiple worker processes - confirm against the plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
where_column_name() click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 118
# Name of the column used in the WHERE ... IN (...) clause built by #write.
#
# @return the configured @where_column when it is set (truthy),
#   otherwise @record_key
def where_column_name
  return @where_column if @where_column

  @record_key
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 122
# Flushes one buffer chunk: collects the record-key value of every event in
# the chunk, fetches the matching rows from MySQL with a single
# SELECT ... WHERE <column> IN (...) query, and emits each row under @tag.
#
# When record-matching keys are configured, each fetched row is merged with
# the incoming record that has the same matching-key values (after dropping
# @remove_keys from that record); @merge_priority decides which side wins
# on conflicting keys.
#
# Differences from a naive implementation, and why:
# * the connection opened for the chunk is always closed, even on error;
# * nothing is queried when the chunk yields no usable lookup value
#   (an empty IN () list is a SQL syntax error);
# * non-finite floats are skipped because "NaN"/"Infinity" are not numeric
#   SQL literals;
# * every emitted record is a fresh Hash, so two rows matching the same
#   incoming record never share (and overwrite) one object.
#
# @param chunk buffer chunk; must respond to +metadata+ and +msgpack_each+
#   (yielding tag, time, record)
# @return [void]
def write(chunk)
  database, table = expand_placeholders(chunk.metadata)
  where_values = []
  origin_records = {}

  chunk.msgpack_each do |_tag, _time, data|
    value = @accessor_for_record_key.call(data)
    case value
    when String
      where_values << "'#{Mysql2::Client.escape(value)}'"
    when Integer
      where_values << value.to_s
    when Float
      next unless value.finite?

      where_values << value.to_s
    else
      next
    end

    next if @accessors_for_record_matching.empty?

    # Index the incoming record in a nested hash keyed by its matching
    # values, e.g. origin_records[key1][key2] = record.
    keys_for_origin_record = @accessors_for_record_matching.map { |accessor| accessor.call(data) }
    parent = keys_for_origin_record[0..-2].inject(origin_records) do |h, v|
      h[v] ||= {}
    end
    parent[keys_for_origin_record.last] = data
  end

  # Nothing to look up: avoid opening a connection and sending "IN ()".
  return if where_values.empty?

  where_condition = "WHERE #{where_column_name} IN (#{where_values.join(',')})"

  if @additional_condition
    condition_sql = extract_placeholders(@additional_condition, chunk.metadata)
    where_condition += " AND #{condition_sql}"
  end

  sql = "SELECT #{@column_names.join(", ")} FROM #{table} #{where_condition}"
  @log.debug(sql)

  # Work on a local handle so the connection closed in `ensure` is the one
  # opened here. @handler is still assigned in case code outside this
  # method reads it (not visible from here).
  handler = @handler = client(database)
  results = handler.query(sql, cast_booleans: @cast_booleans, stream: @stream)

  time = Fluent::EventTime.now
  results.each do |row|
    unless @column_names_for_record_matching.empty?
      # Walk the nested index using this row's column values; nil as soon
      # as one level has no entry.
      record = @column_names_for_record_matching.inject(origin_records) do |h, k|
        h[row[k]] if h
      end

      if record
        # Build new hashes instead of mutating the indexed record, which may
        # be matched again by a later row.
        origin = record.reject { |key, _| @remove_keys.include?(key) }
        row = @merge_priority == :mysql ? origin.merge(row) : row.merge(origin)
      end
    end
    @log.debug("emit", tag: @tag, record: row)
    router.emit(@tag, time, row)
  end
ensure
  handler.close if handler
end