class OandaAPI::Streaming::Request

An HTTP 1.1 streaming request. Used to create a persistent connection
with the server and continuously download a stream of resource
representations. Resources are emitted as {OandaAPI::ResourceBase}
instances.

@!attribute [rw] client

@return [OandaAPI::Streaming::Client] a streaming client instance.

@!attribute [rw] emit_heartbeats

@return [boolean] true if heartbeats are emitted.

@!attribute [r] uri

@return [URI::HTTPS] a URI instance.

@!attribute [r] request

@return [Net::HTTP::Get] the underlying HTTP GET request instance.

Attributes

client[RW]
emit_heartbeats[RW]
request[R]
uri[R]

Public Class Methods

new(client: nil, uri: nil, query: {}, headers: {})

Creates a `Streaming::Request` instance.

@param [Streaming::Client] client a streaming client instance which can be used to send signals to an instance of this `Streaming::Request` class. When `nil`, the request acts as its own client.

@param [String] uri an absolute URI to the service endpoint.

@param [Hash] query a list of query parameters, unencoded. The list is converted into a query string. See `OandaAPI::Client#query_string_normalizer`.

@param [Hash] headers a list of header values that will be sent with the request.

# File lib/oanda_api/streaming/request.rb, line 30
def initialize(client: nil, uri: nil, query: {}, headers: {})
  self.client = client.nil? ? self : client
  @uri = URI uri
  @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
  @http = Net::HTTP.new @uri.host, 443
  @http.use_ssl = true
  @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
  @request = Net::HTTP::Get.new @uri
  headers.each_pair { |pair| @request.add_field(*pair) }
end
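
The constructor's steps (parse the URI, attach a query string, build a GET request, add headers) can be sketched with the standard library alone. This is an illustration, not the library's code: `build_streaming_get` and the `stream.example.com` host are hypothetical, and `URI.encode_www_form` is only an assumed stand-in for the configured `query_string_normalizer`.

```ruby
require "net/http"
require "uri"

# Hypothetical helper mirroring the constructor's steps with stdlib calls only.
# URI.encode_www_form is an assumed stand-in for the library's normalizer.
def build_streaming_get(uri, query: {}, headers: {})
  parsed = URI(uri)
  parsed.query = URI.encode_www_form(query) unless query.empty?
  request = Net::HTTP::Get.new(parsed)
  # Each header name/value pair becomes a field on the request.
  headers.each_pair { |name, value| request.add_field(name, value) }
  request
end

request = build_streaming_get(
  "https://stream.example.com/v1/prices", # hypothetical endpoint
  query: { accountId: 1234, instruments: "EUR_USD,USD_JPY" },
  headers: { "Authorization" => "Bearer <token>" }
)
puts request.path
puts request["Authorization"]
```

Note that the query values are passed unencoded; encoding happens when the query string is built.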

Public Instance Methods

client=(value)

Sets the client attribute.

@param [OandaAPI::Streaming::Client, OandaAPI::Streaming::Request] value

@return [void]

@raise [ArgumentError] if value is neither an {OandaAPI::Streaming::Client} nor an {OandaAPI::Streaming::Request} instance.

# File lib/oanda_api/streaming/request.rb, line 45
def client=(value)
  fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request)
  @client = value
end
emit_heartbeats?()

@return [boolean] true if heartbeats are emitted.

# File lib/oanda_api/streaming/request.rb, line 51
def emit_heartbeats?
  !!@emit_heartbeats
end
running?()

@return [boolean] true if the instance is connected and is streaming a response.

# File lib/oanda_api/streaming/request.rb, line 68
def running?
  !!@running
end
stop!()

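Signals the streaming request to disconnect and terminate streaming. The signal is checked after each yielded resource and after each processed chunk, so it takes effect the next time data is received.

@return [void]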

# File lib/oanda_api/streaming/request.rb, line 57
def stop!
  @stop_requested = true
end
stop_requested?()

Returns `true` if the request has been signalled to terminate. See {#stop!}.

@return [boolean]

# File lib/oanda_api/streaming/request.rb, line 63
def stop_requested?
  !!@stop_requested
end
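
The `stop!` / `stop_requested?` / `running?` trio is a cooperative stop flag: the caller sets a flag and the loop checks it between items. A minimal sketch of that pattern follows; `StoppableLoop` is a hypothetical class written for illustration and is not part of the library.

```ruby
# Hypothetical class illustrating a cooperative stop flag.
class StoppableLoop
  def initialize
    @stop_requested = false
    @running = false
  end

  def stop!
    @stop_requested = true
  end

  def stop_requested?
    !!@stop_requested
  end

  def running?
    !!@running
  end

  # Yields each item together with the controller, then checks the flag.
  def run(items)
    @stop_requested = false
    @running = true
    items.each do |item|
      yield item, self
      return if stop_requested?
    end
  ensure
    @running = false
  end
end

seen = []
looper = StoppableLoop.new
looper.run(1..10) do |n, control|
  seen << n
  control.stop! if n == 3
end
```

Because the flag is only inspected after an item has been handled, a stop request never interrupts the block that is currently running.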
stream(&block)

Emits a stream of {OandaAPI::ResourceBase} instances. Depending
on the endpoint that the request is servicing, either
{OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction}
instances are emitted. When {#emit_heartbeats?} is `true`,
{OandaAPI::Resource::Heartbeat} instances can also be emitted.

Note that this method runs as an infinite loop and blocks indefinitely
until either the connection is halted or a {#stop!} signal is received.

@yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response
stream is yielded as it is received. The client instance controlling the
streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the request.

@raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by the server.

@raise [OandaAPI::RequestError] if an unexpected resource is returned.

@return [void]

# File lib/oanda_api/streaming/request.rb, line 87
def stream(&block)
  @stop_requested = false
  @running = true
  # @http.set_debug_output $stderr
  @http.request(@request) do |response|
    buffer = ""
    response.read_body do |chunk|
      buffer << chunk
      next unless chunk.match(/\r\n\Z/)
      buffer.gsub!(/\r\n/,"")
      handle_response(buffer).each do |resource|
        block.call(resource, @client)
        return if stop_requested?
      end
      buffer = ""
      return if stop_requested?
      sleep 0.01
    end
  end
ensure
  @running = false
  @http.finish if @http.started?
end
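
The buffering step in `stream` (accumulate chunks until one ends with CRLF, then hand the complete text to the parser) can be sketched in isolation. This sketch makes several assumptions: `each_message` is a hypothetical helper, the chunks are made-up sample payloads, and it splits the buffer on CRLF and parses each line with the standard `json` library, whereas the method above strips the CRLFs and passes the whole buffer to the configured parser adapter.

```ruby
require "json"

# Hypothetical stand-in for the read_body loop in #stream.
def each_message(chunks)
  return enum_for(:each_message, chunks) unless block_given?

  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    # A message may span several chunks, so wait for a CRLF boundary.
    next unless chunk.end_with?("\r\n")

    buffer.split("\r\n").each do |line|
      yield JSON.parse(line, symbolize_names: true)
    end
    buffer = +""
  end
end

# Made-up sample data: the first message is split across two chunks,
# and the last chunk carries two complete messages.
chunks = [
  %({"tick":{"instrument":"EUR_USD",),
  %("bid":1.1}}\r\n),
  %({"heartbeat":{"time":"t1"}}\r\n{"tick":{"instrument":"USD_JPY","bid":120.5}}\r\n)
]
messages = each_message(chunks).to_a
```

Waiting for the CRLF boundary is what keeps a partially received JSON document from reaching the parser.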

Private Instance Methods

handle_response(string)

@private Converts a raw JSON response into {OandaAPI::ResourceBase} instances.

@return [Array<OandaAPI::ResourceBase>] depending on the endpoint
that the request is servicing, an array of either
{OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances.
When {#emit_heartbeats?} is `true`, the array can also contain {OandaAPI::Resource::Heartbeat} instances.

@raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by the server.

@raise [OandaAPI::RequestError] if an unexpected resource is returned.

# File lib/oanda_api/streaming/request.rb, line 121
def handle_response(string)
  parse(string).map do |parsed_response|
    case
    when parsed_response[:heartbeat]
      OandaAPI::Resource::Heartbeat.new parsed_response[:heartbeat] if emit_heartbeats?
    when parsed_response[:tick]
      OandaAPI::Resource::Price.new parsed_response[:tick]
    when parsed_response[:transaction]
      OandaAPI::Resource::Transaction.new parsed_response[:transaction]
    when parsed_response[:disconnect]
      fail OandaAPI::StreamingDisconnect, parsed_response[:disconnect][:message]
    else
      fail OandaAPI::RequestError, "unknown resource: #{parsed_response}"
    end
  end.compact
end
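
The dispatch in `handle_response` can be tried out without the gem by swapping in simple stand-ins. In the sketch below, the `Heartbeat`, `Price` and `Transaction` structs, the two error classes, and the `to_resources` helper are all hypothetical replacements for the library's own classes; only the branching order follows the method above.

```ruby
# Hypothetical stand-ins for the OandaAPI resource and error classes.
Heartbeat = Struct.new(:attrs)
Price = Struct.new(:attrs)
Transaction = Struct.new(:attrs)
class StreamingDisconnect < StandardError; end
class UnknownResource < StandardError; end

# Maps parsed messages (hashes with symbol keys) to resource objects.
def to_resources(parsed_messages, emit_heartbeats: false)
  parsed_messages.map do |message|
    if message[:heartbeat]
      # Evaluates to nil when heartbeats are suppressed; compact drops it.
      Heartbeat.new(message[:heartbeat]) if emit_heartbeats
    elsif message[:tick]
      Price.new(message[:tick])
    elsif message[:transaction]
      Transaction.new(message[:transaction])
    elsif message[:disconnect]
      raise StreamingDisconnect, message[:disconnect][:message]
    else
      raise UnknownResource, "unknown resource: #{message}"
    end
  end.compact
end

# Made-up sample messages.
sample = [{ heartbeat: { time: "t1" } }, { tick: { instrument: "EUR_USD" } }]
quiet = to_resources(sample)
verbose = to_resources(sample, emit_heartbeats: true)
```

Suppressed heartbeats are filtered out by `compact`, so the caller never sees `nil` entries.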
parse(string)

@private Uses the best JSON parser available for optimal performance and stream parsing ability.

# File lib/oanda_api/streaming/request.rb, line 140
def parse(string)
  OandaAPI::Streaming::JsonParser.adapter.parse string
end