class WendelinClient

class representing a Wendelin client

Public Class Methods

new(streamtool_uri, credentials, log, ssl_timeout, open_timeout, read_timeout, keep_alive_timeout) click to toggle source

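`streamtool_uri` - URI pointing to the portal_input_data_stream “mountpoint”; `credentials` - hash of the form {'user' => _, 'password' => _} (TODO: change to certificate); `log` - logger to use; `ssl_timeout`, `open_timeout`, `read_timeout`, `keep_alive_timeout` - timeouts handed to Net::HTTP when a connection is opened.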

# File lib/fluent/plugin/wendelin_client.rb, line 27
# Remember the connection settings; no network activity happens here.
#
# streamtool_uri     - base URI; "/ingest?reference=..." is appended to it
#                      by the ingest methods.
# credentials        - hash; when it has a 'user' key, 'user'/'password'
#                      are used for HTTP basic auth.
# log                - logger used for trace/debug/warn output.
# ssl_timeout, open_timeout, read_timeout, keep_alive_timeout -
#                      timeouts passed to Net::HTTP when connecting.
def initialize(streamtool_uri, credentials, log,
               ssl_timeout, open_timeout, read_timeout, keep_alive_timeout)
  # Target, authentication and logging.
  @streamtool_uri, @credentials, @log = streamtool_uri, credentials, log

  # Network timeouts.
  @ssl_timeout, @open_timeout, @read_timeout = ssl_timeout, open_timeout, read_timeout
  @keep_alive_timeout = keep_alive_timeout
end

Public Instance Methods

ingest(reference, data_chunk) click to toggle source
# File lib/fluent/plugin/wendelin_client.rb, line 120
 # Ingest `data_chunk` into the stream referenced as `reference`, opening a
 # fresh HTTP(S) connection for this single request.
 #
 # reference  - stream reference. It is URL-encoded before being placed in
 #              the query string, so characters such as ' ', '&', '#' or '='
 #              reach the server as part of the reference instead of
 #              breaking or truncating the URL.
 # data_chunk - request body, posted as 'application/octet-stream'.
 #
 # Returns nil on a 2XX response. Errors raised while connecting or sending
 # are logged and re-raised; a non-2XX response is logged and then raised
 # through Net::HTTPResponse#value.
 def ingest(reference, data_chunk)
   query = "reference=#{URI.encode_www_form_component(reference)}"
   uri = URI("#{@streamtool_uri}/ingest?#{query}")
   req = Net::HTTP::Post.new(uri)
   req.basic_auth @credentials['user'], @credentials['password'] if @credentials.key?('user')

   # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
   # and it is far too slow. Such POST is legit:
   # https://stackoverflow.com/a/14710450
   req.body = data_chunk
   req.content_type = 'application/octet-stream'

   @log.on_trace do
     @log.trace '>>> REQUEST'
     @log.trace "method\t=> #{req.method}"
     @log.trace "path\t=> #{req.path}"
     @log.trace "uri\t=> #{req.uri}"
     @log.trace "body\t=> #{req.body}"
     @log.trace "body_stream\t=> #{req.body_stream}"
     req.each { |h| @log.trace "#{h}:\t#{req[h]}" }
     @log.trace
   end

   begin
     # TODO keep connection open (so that every new ingest does not do
     # full connect again)
     res = Net::HTTP.start(uri.hostname, uri.port,
                           :use_ssl      => (uri.scheme == 'https'),
                           # NOTE = "do not check server cert"
                           # TODO move this out to conf parameters
                           :verify_mode  => OpenSSL::SSL::VERIFY_NONE,

                           # Net::HTTP default open timeout is infinity, which
                           # results in thread hang forever if other side does
                           # not fully establish connection. Default
                           # read_timeout is 60 seconds. We go safe way and
                           # make sure all timeouts are defined.
                           :ssl_timeout  => @ssl_timeout,
                           :open_timeout => @open_timeout,
                           :read_timeout => @read_timeout) do |http|
       http.request(req)
     end
   rescue
     # some http/ssl/other connection error
     @log.warn "HTTP ERROR:"
     raise
   end

   @log.on_trace do
     @log.trace '>>> RESPONSE'
     res.each { |h| @log.trace "#{h}:\t#{res[h]}" }
     @log.trace "code\t=> #{res.code}"
     @log.trace "msg\t=> #{res.message}"
     @log.trace "class\t=> #{res.class}"
     @log.trace "body:", res.body
   end

   # res.code is 2XX: nothing more to do.
   return if res.is_a?(Net::HTTPSuccess)

   @log.warn "FAIL:"
   res.value # raises the HTTP error matching the response class
 end
ingest_with_keep_alive(reference, data_chunk) click to toggle source

ingest `data_chunk` to a stream referenced as `reference`

# File lib/fluent/plugin/wendelin_client.rb, line 59
# Ingest `data_chunk` into the stream referenced as `reference`, reusing the
# connection stored in @http (opened via start_connection when it is missing
# or no longer started).
#
# reference  - stream reference. It is URL-encoded before being placed in
#              the query string, so characters such as ' ', '&', '#' or '='
#              reach the server as part of the reference instead of
#              breaking or truncating the URL.
# data_chunk - request body, posted as 'application/octet-stream'.
#
# Returns nil on a 2XX response. Any error raised while preparing,
# connecting or sending is logged and re-raised; a non-2XX response is
# logged and then raised through Net::HTTPResponse#value.
def ingest_with_keep_alive(reference, data_chunk)
  query = "reference=#{URI.encode_www_form_component(reference)}"
  uri = URI("#{@streamtool_uri}/ingest?#{query}")

  # (Re)connect when there is no connection yet or it is not started.
  start_connection(uri) unless defined?(@http) && @http.started?

  @request = Net::HTTP::Post.new(uri)

  # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
  # and it is far too slow. Such POST is legit:
  # https://stackoverflow.com/a/14710450
  @request.body = data_chunk
  @request.content_type = 'application/octet-stream'

  @request.basic_auth @credentials['user'], @credentials['password'] if @credentials.key?('user')

  @log.on_trace do
    @log.trace '>>> REQUEST'
    @log.trace "method\t=> #{@request.method}"
    @log.trace "path\t=> #{@request.path}"
    @log.trace "uri\t=> #{@request.uri}"
    @log.trace "body\t=> #{@request.body}"
    @log.trace "body_stream\t=> #{@request.body_stream}"
    @request.each { |h| @log.trace "#{h}:\t#{@request[h]}" }
    @log.trace
  end

  res = @http.request(@request) # Net::HTTPResponse object
rescue
  # Method-level rescue: covers everything above, including failures in
  # start_connection (some http/ssl/other connection error).
  @log.warn "HTTP ERROR:"
  raise
else
  # Runs only when the request completed without raising; errors raised
  # here are not caught by the rescue above.
  @log.on_trace do
    @log.trace '>>> RESPONSE'
    res.each { |h| @log.trace "#{h}:\t#{res[h]}" }
    @log.trace "code\t=> #{res.code}"
    @log.trace "msg\t=> #{res.message}"
    @log.trace "class\t=> #{res.class}"
    @log.trace "body:", res.body
  end

  unless res.is_a?(Net::HTTPSuccess) # anything but 2XX
    @log.warn "FAIL:"
    res.value # raises the HTTP error matching the response class
  end
end
start_connection(uri) click to toggle source

Start the HTTP connection in a separate method and store it in `@http`, so that it stays open and can be reused by later requests.

# File lib/fluent/plugin/wendelin_client.rb, line 40
# Open an HTTP(S) session to the host and port of `uri` and keep it in @http
# so later requests can reuse it.
#
# uri - URI whose hostname, port and scheme select the endpoint; SSL is used
#       when the scheme is 'https'.
#
# Returns the value of Net::HTTP.start (also stored in @http).
def start_connection(uri)
  @log.debug "start new connection"

  host = uri.hostname
  port = uri.port

  # Net::HTTP default open timeout is infinity, which results
  # in thread hang forever if other side does not fully
  # establish connection. Default read_timeout is 60 seconds.
  # We go safe way and make sure all timeouts are defined.
  connection_options = {
    use_ssl: uri.scheme == 'https',
    # The server certificate is not checked.
    verify_mode: OpenSSL::SSL::VERIFY_NONE,
    ssl_timeout: @ssl_timeout,
    open_timeout: @open_timeout,
    read_timeout: @read_timeout,
    keep_alive_timeout: @keep_alive_timeout
  }

  @http = Net::HTTP.start(host, port, **connection_options)
end