class Fluent::Plugin::MysqlFetchAndEmitOutput
Public Instance Methods
client(database)
click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 183 def client(database) Mysql2::Client.new( host: @host, port: @port, username: @username, password: @password, database: database, sslkey: @sslkey, sslcert: @sslcert, sslca: @sslca, sslcapath: @sslcapath, sslcipher: @sslcipher, sslverify: @sslverify ) end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 84 def configure(conf) super @accessor_for_record_key = record_accessor_create(@record_key) @accessors_for_record_matching = @record_matching_keys.map { |cf| record_accessor_create(cf.fluentd_record_key) } @column_names_for_record_matching = @record_matching_keys.map { |cf| cf.mysql_record_key } end
expand_placeholders(metadata)
click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 199 def expand_placeholders(metadata) database = extract_placeholders(@database, metadata).gsub('.', '_') table = extract_placeholders(@table, metadata).gsub('.', '_') return database, table end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 91 def format(tag, time, record) value = @accessor_for_record_key.call(record) unless @accessors_for_record_matching.empty? keys_for_origin_record = @accessors_for_record_matching.map { |accessor| accessor.call(record) } if keys_for_origin_record.any?(&:nil?) @log.warn("Incoming record is omitted, because values for record matching include nil", record: record) return nil end end case value when String, Integer, Float [tag, time, record].to_msgpack else @log.warn("Incoming record is omitted, Supported value type of `record_key` is String, Integer, Float", record: record) nil end end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 110 def formatted_to_msgpack_binary? true end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 114 def multi_workers_ready? true end
where_column_name()
click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 118 def where_column_name @where_column || @record_key end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_mysql_fetch_and_emit.rb, line 122 def write(chunk) database, table = expand_placeholders(chunk.metadata) @handler = client(database) where_values = [] origin_records = {} chunk.msgpack_each do |tag, time, data| value = @accessor_for_record_key.call(data) case value when String where_values << "'" + Mysql2::Client.escape(value) + "'" if value when Integer, Float where_values << value.to_s if value else next end unless @accessors_for_record_matching.empty? keys_for_origin_record = @accessors_for_record_matching.map { |accessor| accessor.call(data) } parent = keys_for_origin_record[0..-2].inject(origin_records) do |h, v| h[v] ||= {} end parent[keys_for_origin_record.last] = data end end where_condition = "WHERE #{where_column_name} IN (#{where_values.join(',')})" if @additional_condition condition_sql = extract_placeholders(@additional_condition, chunk.metadata) where_condition += " AND #{condition_sql}" end sql = "SELECT #{@column_names.join(", ")} FROM #{table} #{where_condition}" @log.debug(sql) results = @handler.query(sql, cast_booleans: @cast_booleans, stream: @stream) time = Fluent::EventTime.now results.each do |row| unless @column_names_for_record_matching.empty? record = @column_names_for_record_matching.inject(origin_records) do |h, k| if h h[row[k]] end end if record @remove_keys.each do |k| record.delete(k) end if @merge_priority == :mysql row = record.merge!(row) else row = row.merge!(record) end end end @log.debug("emit", tag: @tag, record: row) router.emit(@tag, time, row) end end