class HTTP2Adapter::StreamAdapter

Virtualizes adapter over HTTP2 stream

Public Class Methods

new(connection) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 44
def initialize(connection)
  @connection = connection
end

Public Instance Methods

body() click to toggle source
# File lib/polyphony/http/client/http2.rb, line 140
def body
  @waiting_done_fiber = Fiber.current
  suspend
  @buffered_chunks.join
  # body = +''
  # while !@done
  #   p :body_suspend_pre
  #   chunk = suspend
  #   p :body_suspend_post
  #   body << chunk
  # end
  # puts ""
  # body
rescue Exception => e
  p e
  puts e.backtrace.join("\n")
end
each_chunk() { |shift until empty?| ... } click to toggle source
# File lib/polyphony/http/client/http2.rb, line 158
def each_chunk
  yield @buffered_chunks.shift until @buffered_chunks.empty?

  @waiting_chunk_fiber = Fiber.current
  until @done
    chunk = suspend
    yield chunk
  end
end
next_body_chunk() { |shift| ... } click to toggle source
# File lib/polyphony/http/client/http2.rb, line 168
def next_body_chunk
  return yield @buffered_chunks.shift unless @buffered_chunks.empty?

  @waiting_chunk_fuber = Fiber.current
  until @done
    chunk = suspend
    return yield chunk
  end

  nil
end
on_close(_stream) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 93
def on_close(_stream)
  @done = true
  @waiting_done_fiber&.schedule
end
on_data(chunk) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 85
def on_data(chunk)
  if @waiting_chunk_fiber
    @waiting_chunk_fiber&.schedule chunk
  else
    @buffered_chunks << chunk
  end
end
on_headers(headers) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 77
def on_headers(headers)
  if @waiting_headers_fiber
    @waiting_headers_fiber.schedule headers.to_h
  else
    @headers = headers.to_h
  end
end
prepare_headers(ctx) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 128
def prepare_headers(ctx)
  headers = {
    ':method'    => ctx[:method].to_s,
    ':scheme'    => ctx[:uri].scheme,
    ':authority' => [ctx[:uri].host, ctx[:uri].port].join(':'),
    ':path'      => ctx[:uri].request_uri,
    'User-Agent' => 'curl/7.54.0'
  }
  headers.merge!(ctx[:opts][:headers]) if ctx[:opts][:headers]
  headers
end
protocol() click to toggle source
# File lib/polyphony/http/client/http2.rb, line 124
def protocol
  :http2
end
request(ctx) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 48
def request(ctx)
  stream = setup_stream # (ctx, stream)
  send_request(ctx, stream)

  stream.on(:headers, &method(:on_headers))
  stream.on(:data, &method(:on_data))
  stream.on(:close, &method(:on_close))

  # stream.on(:active) { puts "* active" }
  # stream.on(:half_close) { puts "* half_close" }

  wait_for_response(ctx, stream)
rescue Exception => e
  p e
  puts e.backtrace.join("\n")
  # ensure
  # stream.close
end
send_request(ctx, stream) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 67
def send_request(ctx, stream)
  headers = prepare_headers(ctx)
  if ctx[:opts][:payload]
    stream.headers(headers, end_stream: false)
    stream.data(ctx[:opts][:payload], end_stream: true)
  else
    stream.headers(headers, end_stream: true)
  end
end
setup_stream() click to toggle source
# File lib/polyphony/http/client/http2.rb, line 98
def setup_stream
  stream = @connection.allocate_stream

  @headers = nil
  @done = nil
  @buffered_chunks = []

  @waiting_headers_fiber = nil
  @waiting_chunk_fiber = nil
  @waiting_done_fiber = nil

  stream
end
wait_for_headers() click to toggle source
# File lib/polyphony/http/client/http2.rb, line 117
def wait_for_headers
  return @headers if @headers

  @waiting_headers_fiber = Fiber.current
  suspend
end
wait_for_response(_ctx, _stream) click to toggle source
# File lib/polyphony/http/client/http2.rb, line 112
def wait_for_response(_ctx, _stream)
  headers = wait_for_headers
  Response.new(self, headers[':status'].to_i, headers)
end