class Nylas::Streaming::DeltaStream

A live stream of deltas that stays open until explicitely closed

Attributes

api[RW]
connect_timeout[RW]
cursor[RW]
exclude_types[RW]
expanded[RW]
inactivity_timeout[RW]
include_types[RW]

Public Class Methods

new(cursor:, api:, exclude_types: [], include_types: [], expanded: false, connect_timeout: 0, inactivity_timeout: 0) click to toggle source

@param cursor [String] Cursor to start listening for changes on @param api [Nylas::API] @param expanded [Boolean] Expands threads and messages @param exclude_types [Array<String>] List of Object types not to include in the stream @param include_types [Array<String>] List of Object types to exclusively include in the stream @param connect_timeout [Integer] How long to wait before timing out on attempted connection @param inactivity_timeout [Integer] How long to wait before timing out on inactivity

# File lib/nylas-streaming.rb, line 28
def initialize(cursor:, api:, exclude_types: [], include_types: [], expanded: false, connect_timeout: 0,
               inactivity_timeout: 0)
  self.cursor = cursor
  self.api = api
  self.exclude_types = exclude_types
  self.include_types = include_types
  self.expanded = expanded
  self.connect_timeout = connect_timeout
  self.inactivity_timeout = inactivity_timeout
end

Public Instance Methods

http_error_handler(client) click to toggle source
# File lib/nylas-streaming.rb, line 69
def http_error_handler(client)
  raise Nylas::Error, client.error
end
listener() click to toggle source
# File lib/nylas-streaming.rb, line 64
def listener
  @listener ||= EventMachine::HttpRequest.new(url, connect_timeout: connect_timeout,
                                                   inactivity_timeout: inactivity_timeout).get
end
parser() click to toggle source
# File lib/nylas-streaming.rb, line 73
def parser
  return @parser if @parser

  @parser = Yajl::Parser.new(symbolize_keys: true)
  @parser
end
query() click to toggle source
# File lib/nylas-streaming.rb, line 56
def query
  query = ["cursor=#{cursor}"]
  query << "view=expanded" if expanded
  query << "include_types=#{include_types.join(',')}" unless include_types.empty?
  query << "exclude_types=#{exclude_types.join(',')}" unless exclude_types.empty?
  query.join("&")
end
stream() { |registry.cast(merge(api: api))| ... } click to toggle source
# File lib/nylas-streaming.rb, line 39
def stream
  parser.on_parse_complete = lambda do |data|
    begin
      yield(Types.registry[:delta].cast(data.merge(api: api)))
    rescue Nylas::Error => e
      Nylas::Logging.logger.error(e)
      raise e
    end
  end

  listener.stream { |chunk| parser << chunk }
end
url() click to toggle source
# File lib/nylas-streaming.rb, line 52
def url
  "#{api.client.url_for_path('/delta/streaming')}?#{query}"
end