class Fluent::Plugin::HttpInput::Handler

Constants

RES_200_STATUS
RES_403_STATUS

Attributes

content_type[R]

Public Class Methods

new(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 341
def initialize(io, km, callback, body_size_limit, format_name, log,
               cors_allow_origins, cors_allow_credentials, add_query_params)
  @io = io
  @km = km
  @callback = callback
  @body_size_limit = body_size_limit
  @next_close = false
  @format_name = format_name
  @log = log
  @cors_allow_origins = cors_allow_origins
  @cors_allow_credentials = cors_allow_credentials
  @idle = 0
  @add_query_params = add_query_params
  @km.add(self)

  @remote_port, @remote_addr = io.remote_port, io.remote_addr
  @parser = Http::Parser.new(self)
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 584
def close
  @io.close
end
closing?() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 597
def closing?
  @next_close
end
handle_get_request() click to toggle source

Azure App Service sends GET requests for health checking purpose. Respond with ‘200 OK` to accommodate it.

# File lib/fluent/plugin/in_http.rb, line 450
def handle_get_request
  return send_response_and_close(RES_200_STATUS, {}, "")
end
handle_options_request() click to toggle source

Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.

# File lib/fluent/plugin/in_http.rb, line 456
def handle_options_request
  # Is CORS enabled in the first place?
  if @cors_allow_origins.nil?
    return send_response_and_close(RES_403_STATUS, {}, "")
  end

  # in_http does not support HTTP methods except POST
  if @access_control_request_method != 'POST'
    return send_response_and_close(RES_403_STATUS, {}, "")
  end

  header = {
    "Access-Control-Allow-Methods" => "POST",
    "Access-Control-Allow-Headers" => @access_control_request_headers || "",
  }

  # Check the origin and send back a CORS response
  if @cors_allow_origins.include?('*')
    header["Access-Control-Allow-Origin"] = "*"
    send_response_and_close(RES_200_STATUS, header, "")
  elsif include_cors_allow_origin
    header["Access-Control-Allow-Origin"] = @origin
    if @cors_allow_credentials
      header["Access-Control-Allow-Credentials"] = true
    end
    send_response_and_close(RES_200_STATUS, header, "")
  else
    send_response_and_close(RES_403_STATUS, {}, "")
  end
end
include_cors_allow_origin() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 624
def include_cors_allow_origin
  if @origin.nil?
    return false
  end

  if @cors_allow_origins.include?(@origin)
    return true
  end
  filtered_cors_allow_origins = @cors_allow_origins.select {|origin| origin != ""}
  r = filtered_cors_allow_origins.find do |origin|
    (start_str, end_str) = origin.split("*", 2)
    @origin.start_with?(start_str) && @origin.end_with?(end_str)
  end

  !r.nil?
end
on_body(chunk) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 435
def on_body(chunk)
  if @body.bytesize + chunk.bytesize > @body_size_limit
    unless closing?
      send_response_and_close("413 Request Entity Too Large", {}, "Too large")
    end
    return
  end
  @body << chunk
end
on_close() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 364
def on_close
  @km.delete(self)
end
on_headers_complete(headers) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 381
def on_headers_complete(headers)
  expect = nil
  size = nil

  if @parser.http_version == [1, 1]
    @keep_alive = true
  else
    @keep_alive = false
  end
  @env = {}
  @content_type = ""
  @content_encoding = ""
  headers.each_pair {|k,v|
    @env["HTTP_#{k.tr('-','_').upcase}"] = v
    case k
    when /\AExpect\z/i
      expect = v
    when /\AContent-Length\Z/i
      size = v.to_i
    when /\AContent-Type\Z/i
      @content_type = v
    when /\AContent-Encoding\Z/i
      @content_encoding = v
    when /\AConnection\Z/i
      if /close/i.match?(v)
        @keep_alive = false
      elsif /Keep-alive/i.match?(v)
        @keep_alive = true
      end
    when /\AOrigin\Z/i
      @origin  = v
    when /\AX-Forwarded-For\Z/i
      # For multiple X-Forwarded-For headers. Use first header value.
      v = v.first if v.is_a?(Array)
      @remote_addr = v.split(",").first
    when /\AAccess-Control-Request-Method\Z/i
      @access_control_request_method = v
    when /\AAccess-Control-Request-Headers\Z/i
      @access_control_request_headers = v
    end
  }
  if expect
    if expect == '100-continue'.freeze
      if !size || size < @body_size_limit
        send_response_nobody("100 Continue", {})
      else
        send_response_and_close("413 Request Entity Too Large", {}, "Too large")
      end
    else
      send_response_and_close("417 Expectation Failed", {}, "")
    end
  end
end
on_message_begin() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 377
def on_message_begin
  @body = ''
end
on_message_complete() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 487
def on_message_complete
  return if closing?

  if @parser.http_method == 'GET'.freeze
    return handle_get_request()
  end

  if @parser.http_method == 'OPTIONS'.freeze
    return handle_options_request()
  end

  # CORS check
  # ==========
  # For every incoming request, we check if we have some CORS
  # restrictions and allow listed origins through @cors_allow_origins.
  unless @cors_allow_origins.nil?
    unless @cors_allow_origins.include?('*') || include_cors_allow_origin
      send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "")
      return
    end
  end

  # Content Encoding
  # =================
  # Decode payload according to the "Content-Encoding" header.
  # For now, we only support 'gzip' and 'deflate'.
  begin
    if @content_encoding == 'gzip'.freeze
      @body = Zlib::GzipReader.new(StringIO.new(@body)).read
    elsif @content_encoding == 'deflate'.freeze
      @body = Zlib::Inflate.inflate(@body)
    end
  rescue
    @log.warn 'fails to decode payload', error: $!.to_s
    send_response_and_close(RES_400_STATUS, {}, "")
    return
  end

  @env['REMOTE_ADDR'] = @remote_addr if @remote_addr

  uri = URI.parse(@parser.request_url)
  params = WEBrick::HTTPUtils.parse_query(uri.query)

  if @format_name != 'default'
    params[EVENT_RECORD_PARAMETER] = @body
  elsif /^application\/x-www-form-urlencoded/.match?(@content_type)
    params.update WEBrick::HTTPUtils.parse_query(@body)
  elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/
    boundary = WEBrick::HTTPUtils.dequote($1)
    params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary)
  elsif /^application\/json/.match?(@content_type)
    params['json'] = @body
  elsif /^application\/csp-report/.match?(@content_type)
    params['json'] = @body
  elsif /^application\/msgpack/.match?(@content_type)
    params['msgpack'] = @body
  elsif /^application\/x-ndjson/.match?(@content_type)
    params['ndjson'] = @body
  end
  path_info = uri.path

  if (@add_query_params)

    query_params = WEBrick::HTTPUtils.parse_query(uri.query)

    query_params.each_pair {|k,v|
      params["QUERY_#{k.tr('-','_').upcase}"] = v
    }
  end

  params.merge!(@env)

  @env.clear

  code, header, body = @callback.call(path_info, params)
  body = body.to_s
  header = header.dup if header.frozen?

  unless @cors_allow_origins.nil?
    if @cors_allow_origins.include?('*')
      header['Access-Control-Allow-Origin'] = '*'
    elsif include_cors_allow_origin
      header['Access-Control-Allow-Origin'] = @origin
      if @cors_allow_credentials
        header["Access-Control-Allow-Credentials"] = true
      end
    end
  end

  if @keep_alive
    header['Connection'] = 'Keep-Alive'.freeze
    send_response(code, header, body)
  else
    send_response_and_close(code, header, body)
  end
end
on_read(data) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 368
def on_read(data)
  @idle = 0
  @parser << data
rescue
  @log.warn "unexpected error", error: $!.to_s
  @log.warn_backtrace
  @io.close
end
on_write_complete() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 588
def on_write_complete
  @io.close if @next_close
end
send_response(code, header, body) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 601
def send_response(code, header, body)
  header['Content-Length'] ||= body.bytesize
  header['Content-Type'] ||= 'text/plain'.freeze

  data = %[HTTP/1.1 #{code}\r\n]
  header.each_pair {|k,v|
    data << "#{k}: #{v}\r\n"
  }
  data << "\r\n".freeze
  @io.write(data)

  @io.write(body)
end
send_response_and_close(code, header, body) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 592
def send_response_and_close(code, header, body)
  send_response(code, header, body)
  @next_close = true
end
send_response_nobody(code, header) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 615
def send_response_nobody(code, header)
  data = %[HTTP/1.1 #{code}\r\n]
  header.each_pair {|k,v|
    data << "#{k}: #{v}\r\n"
  }
  data << "\r\n".freeze
  @io.write(data)
end
step_idle() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 360
def step_idle
  @idle += 1
end