class Fluent::Plugin::HttpInput
Constants
- EMPTY_GIF_IMAGE
- EVENT_RECORD_PARAMETER
- RESPONSE_200
- RESPONSE_204
- RESPONSE_IMG
- RES_400_STATUS
- RES_500_STATUS
- RES_TEXT_HEADER
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Input::new
# File lib/fluent/plugin/in_http.rb, line 94
# Runs the superclass initializer, then resets every piece of parser- and
# connection-related state held by the plugin to nil. The real values are
# filled in later by #configure and #start.
def initialize
  super

  # Keepalive manager (assigned in #start) and request-format bookkeeping.
  @km = @format_name = @parser_time_key = nil

  # default parsers
  @parser_msgpack = @parser_json = nil
  @default_time_parser = @default_keep_time_key = @float_time_parser = nil

  # <parse> configured parser
  @custom_parser = nil
end
Public Instance Methods
close()
click to toggle source
Calls superclass method
Fluent::Plugin::Base#close
# File lib/fluent/plugin/in_http.rb, line 190
# Closes the plugin: calls server_wait_until_stop first, then continues
# with the superclass #close.
def close
  server_wait_until_stop
  super
end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_http.rb, line 112
# Applies the configuration, validates the CORS settings, creates the
# parser(s) needed for the configured parse type and binds #parse_params on
# this instance to the matching implementation.
#
# conf - the configuration element handed to the plugin.
#
# Raises Fluent::ConfigError when cors_allow_credentials is enabled while
# cors_allow_origins is unset or contains '*'.
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  if @cors_allow_credentials && (@cors_allow_origins.nil? || @cors_allow_origins.include?('*'))
    raise Fluent::ConfigError, "Cannot enable cors_allow_credentials without specific origins"
  end

  parse_impl_name =
    if @parser_configs.first['@type'] == 'in_http'
      # Built-in mode: dedicated msgpack and json parsers whose own time
      # handling is switched off.
      @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
      @parser_msgpack.time_key = nil
      @parser_msgpack.estimate_current_event = false

      @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
      @parser_json.time_key = nil
      @parser_json.estimate_current_event = false

      # This parser is only consulted for its time-related settings, which
      # #convert_time_field uses later.
      default_parser = parser_create(usage: '')
      @format_name = 'default'
      @parser_time_key = default_parser.time_key
      @default_time_parser = default_parser.get_time_parser
      @default_keep_time_key = default_parser.keep_time_key

      :parse_params_default
    else
      # Custom mode: a single parser built from the <parse> section.
      @custom_parser = parser_create
      @format_name = @parser_configs.first['@type']
      @parser_time_key = @custom_parser.time_key

      :parse_params_with_parser
    end

  # Expose the chosen implementation as #parse_params on this instance only.
  parse_impl = method(parse_impl_name)
  singleton_class.module_eval { define_method(:parse_params, parse_impl) }
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 172
# Always returns true.
# NOTE(review): presumably tells the framework that this plugin may run
# under multiple workers -- confirm against the plugin base class.
def multi_workers_ready?
  true
end
on_request(path_info, params)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 202
# Handles one request: derives the tag from the path, parses the request
# parameters into events, emits them and picks the response.
#
# path_info - request path; the first character is dropped and the
#             remaining '/'-separated segments are joined with '.' to form
#             the tag.
# params    - request parameters; 'time' (optional) overrides the event
#             time of every record in the request.
#
# Returns RESPONSE_IMG, RESPONSE_204 or RESPONSE_200 on success. Any error
# while parsing yields a [RES_400_STATUS, RES_TEXT_HEADER, body] triple and
# any error while emitting yields a [RES_500_STATUS, RES_TEXT_HEADER, body]
# triple; the exception text is included in the body.
def on_request(path_info, params)
  begin
    path = path_info[1..-1] # remove /
    tag = path.split('/').join('.')

    stream = Fluent::MultiEventStream.new
    parse_params(params) do |record_time, record|
      if record.nil?
        log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
        next
      end

      add_params_to_record(record, params)

      # Precedence: explicit 'time' parameter, then the time reported by
      # the parser, then the time field inside the record.
      event_time =
        if (requested_time = params['time'])
          seconds = requested_time.to_f
          seconds.zero? ? Fluent::EventTime.now : @float_time_parser.parse(seconds)
        elsif record_time.nil?
          convert_time_field(record)
        else
          record_time
        end

      stream.add(event_time, record)
    end
  rescue => parse_error
    log.error("failed to process request", error: parse_error) if @dump_error_log
    return [RES_400_STATUS, RES_TEXT_HEADER, "400 Bad Request\n#{parse_error}\n"]
  end

  # TODO server error
  begin
    router.emit_stream(tag, stream) unless stream.empty?
  rescue => emit_error
    log.error("failed to emit data", error: emit_error) if @dump_error_log
    return [RES_500_STATUS, RES_TEXT_HEADER, "500 Internal Server Error\n#{emit_error}\n"]
  end

  return RESPONSE_IMG if @respond_with_empty_img

  @use_204_response ? RESPONSE_204 : RESPONSE_200
end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Input#start
# File lib/fluent/plugin/in_http.rb, line 176
# Starts the input: runs the superclass start, logs the listen address,
# prepares the float time parser and the keepalive manager, and finally
# opens the listening connection whose callbacks are wired up by
# #on_server_connect.
def start
  @_event_loop_run_timeout = @blocking_timeout

  super

  log.debug "listening http", bind: @bind, port: @port

  # #on_request calls @float_time_parser.parse for every request carrying a
  # 'time' parameter, so the parser has to exist before the listener does.
  # It used to be assigned after server_create_connection, which left a
  # window in which such a request would hit a nil parser.
  # NOTE(review): whether a connection can really be served before #start
  # returns depends on the server/event_loop helpers, which are not visible
  # here -- creating the parser first is safe either way.
  @float_time_parser = Fluent::NumericTimeParser.new(:float)

  @km = KeepaliveManager.new(@keepalive_timeout)
  event_loop_attach(@km)

  server_create_connection(:in_http, @port, bind: @bind, backlog: @backlog, &method(:on_server_connect))
end
Private Instance Methods
add_params_to_record(record, params)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 304
# Copies request metadata from params into record, according to the plugin
# flags:
#
#   @add_http_headers - copy every entry whose key starts with "HTTP_"
#   @add_query_params - copy every entry whose key starts with "QUERY_"
#   @add_remote_addr  - copy the 'REMOTE_ADDR' entry
#
# record - the event record, modified in place.
# params - the request parameters to copy from.
def add_params_to_record(record, params)
  # Header entries are copied before query entries, each in params order.
  [[@add_http_headers, 'HTTP_'], [@add_query_params, 'QUERY_']].each do |enabled, prefix|
    next unless enabled

    params.each_pair do |name, value|
      record[name] = value if name.start_with?(prefix)
    end
  end

  record['REMOTE_ADDR'] = params['REMOTE_ADDR'] if @add_remote_addr
end
convert_time_field(record)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 326
# Derives the event time from the record's time field (@parser_time_key).
#
# The field is left in the record when @default_keep_time_key is set and
# removed from it otherwise. A missing (nil/false) value falls back to the
# current time. A present value is parsed with @default_time_parser when one
# is configured, and is otherwise passed to Time.at.
#
# record - the event record; may be modified in place (see above).
def convert_time_field(record)
  raw_time =
    if @default_keep_time_key
      record[@parser_time_key]
    else
      record.delete(@parser_time_key)
    end

  return Fluent::EventTime.now unless raw_time
  return @default_time_parser.parse(raw_time) if @default_time_parser

  Fluent::EventTime.from_time(Time.at(raw_time))
end
on_server_connect(conn)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 255
# Connection callback passed to server_create_connection in #start. Builds
# a Handler for the connection and forwards the connection's :data,
# :write_complete and :close events to it.
#
# conn - the accepted connection object.
def on_server_connect(conn)
  request_handler = Handler.new(
    conn, @km, method(:on_request), @body_size_limit, @format_name, log,
    @cors_allow_origins, @cors_allow_credentials, @add_query_params
  )

  conn.on(:data) { |chunk| request_handler.on_read(chunk) }
  conn.on(:write_complete) { |_| request_handler.on_write_complete }
  conn.on(:close) { |_| request_handler.on_close }
end
parse_params_default(params) { |nil, record| ... }
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 274
# Parses the request payload in the built-in mode. Exactly one source is
# used, checked in this order: params['msgpack'], params['json'],
# params['ndjson'] (split on LF / CRLF, each piece parsed as JSON).
#
# Yields nil and the record for every parsed record; the time reported by
# the underlying parser is discarded.
#
# Raises RuntimeError when none of the three parameters is present.
def parse_params_default(params)
  packed = params['msgpack']
  return @parser_msgpack.parse(packed) { |_time, record| yield nil, record } if packed

  single = params['json']
  return @parser_json.parse(single) { |_time, record| yield nil, record } if single

  batch = params['ndjson']
  raise "'json', 'ndjson' or 'msgpack' parameter is required" unless batch

  batch.split(/\r?\n/).each do |line|
    @parser_json.parse(line) { |_time, record| yield nil, record }
  end
end
parse_params_with_parser(params) { |time, record| ... }
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 294
# Parses the request payload with the parser configured via <parse>. The
# payload is read from params[EVENT_RECORD_PARAMETER].
#
# Yields the time and record exactly as produced by @custom_parser.
#
# Raises RuntimeError when the payload parameter is missing.
def parse_params_with_parser(params)
  payload = params[EVENT_RECORD_PARAMETER]
  raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" unless payload

  @custom_parser.parse(payload) { |time, record| yield time, record }
end