class Vines::Stream

The base class for the various XMPP streams (c2s, s2s, component, http). It contains behavior common to all streams, such as rate limiting, stanza parsing, and stream error handling.

Constants

ERROR
PAD

Attributes

config[R]
domain[R]
user[RW]

Public Class Methods

new(config)
# File lib/vines/stream.rb, line 16
def initialize(config)
  @config = config
end

Public Instance Methods

advance(state)

Advance the stream's state machine to the new state. XML nodes received by the stream will be passed to this state's node method.

# File lib/vines/stream.rb, line 143
def advance(state)
  @state = state
end
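
The hand-off described above can be illustrated with a minimal sketch. `TinyStream`, `Start`, and `Ready` are hypothetical stand-ins invented for this example and are not part of Vines; only the pattern of a state receiving a node and then advancing the stream is taken from the description.

```ruby
# Hypothetical stand-in for a stream; only the state hand-off is modeled.
class TinyStream
  attr_reader :seen

  def initialize
    @seen = []
    @state = nil
  end

  # Same shape as Stream#advance: remember the new state.
  def advance(state)
    @state = state
  end

  # Deliver a node to whichever state is current.
  def deliver(node)
    @state.node(node)
  end
end

# Hypothetical first state: records the node, then advances the stream.
class Start
  def initialize(stream)
    @stream = stream
  end

  def node(node)
    @stream.seen << [:start, node]
    @stream.advance(Ready.new(@stream))
  end
end

# Hypothetical second state: records the node and stays put.
class Ready
  def initialize(stream)
    @stream = stream
  end

  def node(node)
    @stream.seen << [:ready, node]
  end
end

stream = TinyStream.new
stream.advance(Start.new(stream))
stream.deliver('<stream:stream>') # handled by Start
stream.deliver('<message/>')      # handled by Ready
```

Each node is handled by exactly one state, and the state itself decides when the stream moves on.
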
available_resources(*jid)
# File lib/vines/stream.rb, line 93
def available_resources(*jid)
  router.available_resources(*jid, user.jid)
end
cert_domain_matches?(domain)
# File lib/vines/stream.rb, line 110
def cert_domain_matches?(domain)
  @store.domain?(get_peer_cert, domain)
end
close_connection(after_writing=false)

Advance the state machine into the Closed state so any remaining queued nodes are not processed while we're waiting for EM to actually close the connection.

Calls superclass method
# File lib/vines/stream.rb, line 47
def close_connection(after_writing=false)
  super
  @closed = true
  advance(Client::Closed.new(self))
end
connected_resources(jid)
# File lib/vines/stream.rb, line 89
def connected_resources(jid)
  router.connected_resources(jid, user.jid)
end
create_parser()

Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).

# File lib/vines/stream.rb, line 36
def create_parser
  @parser = Parser.new.tap do |p|
    p.stream_open {|node| @nodes.push(node) }
    p.stream_close { close_connection }
    p.stanza {|node| @nodes.push(node) }
  end
end
encrypt()
# File lib/vines/stream.rb, line 123
def encrypt
  cert, key = @store.files_for_domain(domain)
  start_tls(cert_chain_file: cert, private_key_file: key, verify_peer: true)
end
encrypt?()

Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.

# File lib/vines/stream.rb, line 130
def encrypt?
  !@store.files_for_domain(domain).nil?
end
error(e)

Stream-level errors close the stream, while stanza and SASL errors are written to the client and leave the stream open. Any other exception is logged and reported to the client as an internal server error before the stream is closed. All exceptions should pass through this method for consistent handling.

# File lib/vines/stream.rb, line 150
def error(e)
  case e
  when SaslError, StanzaError
    write(e.to_xml)
  when StreamError
    send_stream_error(e)
    close_stream
  else
    log.error(e)
    send_stream_error(StreamErrors::InternalServerError.new)
    close_stream
  end
end
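
The three-way dispatch above can be sketched in isolation. The error classes below are hypothetical placeholders declared only for this example (the real Vines classes carry XML payloads), and `classify` is an invented helper that returns what would be sent and whether the stream would be closed.

```ruby
# Hypothetical error hierarchy, standing in for the Vines error classes.
class StanzaError < StandardError; end
class SaslError < StandardError; end
class StreamError < StandardError; end

# Classify an exception the way the case statement in Stream#error does.
# Returns [what_to_send, close_stream?].
def classify(e)
  case e
  when SaslError, StanzaError
    [:error_xml, false]             # recoverable: report it, keep the stream open
  when StreamError
    [:stream_error, true]           # fatal: report it, then close the stream
  else
    [:internal_server_error, true]  # unexpected: hide the details, then close
  end
end
```

Routing every exception through one method like this keeps the decision about closing the stream in a single place.
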
interested_resources(*jid)
# File lib/vines/stream.rb, line 97
def interested_resources(*jid)
  router.interested_resources(*jid, user.jid)
end
post_init()
# File lib/vines/stream.rb, line 20
def post_init
  @remote_addr, @local_addr = addresses
  @user, @closed, @stanza_size = nil, false, 0
  @bucket = TokenBucket.new(100, 10)
  @store = Store.new(@config.certs)
  @nodes = EM::Queue.new
  process_node_queue
  create_parser
  log.info { "%s %21s -> %s" %
    ['Stream connected:'.ljust(PAD), @remote_addr, @local_addr] }
end
receive_data(data)
# File lib/vines/stream.rb, line 53
def receive_data(data)
  return if @closed
  @stanza_size += data.bytesize
  if @stanza_size < max_stanza_size
    @parser << data rescue error(StreamErrors::NotWellFormed.new)
  else
    error(StreamErrors::PolicyViolation.new('max stanza size reached'))
  end
end
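
The size accounting in receive_data can be sketched on its own. `SizeGuard` is a hypothetical class written for this example; it assumes, as the process_node source further down suggests, that the byte counter is reset to zero each time a complete node is processed.

```ruby
# Hypothetical helper that mirrors the @stanza_size bookkeeping.
class SizeGuard
  def initialize(max)
    @max = max
    @size = 0
  end

  # Count the chunk's bytes (not characters) and report whether the
  # running total is still strictly below the limit.
  def accept?(data)
    @size += data.bytesize
    @size < @max
  end

  # Called once a complete stanza has been handled.
  def reset
    @size = 0
  end
end

guard = SizeGuard.new(10)
guard.accept?('12345') # 5 bytes so far, still under the limit
guard.accept?('1234')  # 9 bytes so far, still under the limit
guard.accept?('1')     # 10 bytes: the limit is reached, chunk is rejected
```

Because the comparison is strict, a stanza that reaches the limit exactly is already treated as a policy violation.
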
reset()

Reset the connection's XML parser when a new <stream:stream> header is received.

# File lib/vines/stream.rb, line 65
def reset
  create_parser
end
router()
# File lib/vines/stream.rb, line 164
def router
  @config.router
end
ssl_verify_peer(pem)
# File lib/vines/stream.rb, line 101
def ssl_verify_peer(pem)
  # EM is supposed to close the connection when this returns false,
  # but it only does that for inbound connections, not when we
  # make a connection to another server.
  @store.trusted?(pem).tap do |trusted|
    close_connection unless trusted
  end
end
storage(domain=nil)

Returns the storage system for the domain. If no domain is given, the stream's storage mechanism is returned.

# File lib/vines/stream.rb, line 71
def storage(domain=nil)
  @config.storage(domain || self.domain)
end
unbind()
# File lib/vines/stream.rb, line 134
def unbind
  router.delete(self)
  log.info { "%s %21s -> %s" %
    ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] }
  log.info { "Streams connected: #{router.size}" }
end
update_user_streams(user)

Reload the user's information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.

# File lib/vines/stream.rb, line 83
def update_user_streams(user)
  connected_resources(user.jid.bare).each do |stream|
    stream.user.update_from(user)
  end
end
vhost()

Returns the Vines::Config::Host virtual host for the stream's domain.

# File lib/vines/stream.rb, line 76
def vhost
  @config.vhost(domain)
end
write(data)

Send the data over the wire to this client.

# File lib/vines/stream.rb, line 115
def write(data)
  log_node(data, :out)
  if data.respond_to?(:to_xml)
    data = data.to_xml(:indent => 0)
  end
  send_data(data)
end
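
The serialization step in write relies on duck typing: anything that responds to to_xml is rendered, and anything else (such as a raw closing tag string) is sent unchanged. A minimal sketch, where `FakeNode` and `serialize` are hypothetical names used only for illustration:

```ruby
# Hypothetical node type; a real stream would be handed XML node objects.
FakeNode = Struct.new(:name) do
  # Accepts an options hash to match the to_xml(:indent => 0) call shape.
  def to_xml(options = {})
    "<#{name}/>"
  end
end

# Render objects that know how to become XML; pass everything else through.
def serialize(data)
  data.respond_to?(:to_xml) ? data.to_xml(:indent => 0) : data
end
```
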

Private Instance Methods

addresses()

Return the remote and local socket addresses used by this connection.

# File lib/vines/stream.rb, line 171
def addresses
  [get_peername, get_sockname].map do |addr|
    addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown'
  end
end
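
The address formatting can be tried outside EventMachine with Ruby's standard socket library. In this sketch, `format_addr` is a hypothetical helper name, and the packed address is built by hand to stand in for what get_peername or get_sockname would return.

```ruby
require 'socket'

# unpack_sockaddr_in returns [port, host]; reversing puts the host first
# so the result reads host:port. A nil address becomes 'unknown'.
def format_addr(addr)
  addr ? Socket.unpack_sockaddr_in(addr)[0, 2].reverse.join(':') : 'unknown'
end

# Build a packed sockaddr by hand for the example.
packed = Socket.pack_sockaddr_in(5222, '127.0.0.1')
format_addr(packed) # => "127.0.0.1:5222"
format_addr(nil)    # => "unknown"
```
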
close_stream()

Write a closing stream tag to the stream then close the stream. Subclasses can override this method for custom close behavior.

# File lib/vines/stream.rb, line 185
def close_stream
  write('</stream:stream>')
  close_connection_after_writing
end
enforce_rate_limit()
# File lib/vines/stream.rb, line 220
def enforce_rate_limit
  unless @bucket.take(1)
    raise StreamErrors::PolicyViolation.new('rate limit exceeded')
  end
end
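
The rate limit draws one token per node from the bucket created in post_init. The sketch below shows a generic token bucket, not the Vines TokenBucket source; it assumes the two constructor arguments mean capacity and refill rate in tokens per second. `SketchBucket` and its `clock:` parameter are hypothetical and exist so the behavior can be exercised without waiting on real time.

```ruby
# Generic token bucket sketch (hypothetical; not the Vines implementation).
class SketchBucket
  def initialize(capacity, rate,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @capacity = capacity
    @rate = rate
    @clock = clock
    @tokens = capacity.to_f   # the bucket starts full
    @timestamp = @clock.call
  end

  # Remove the requested tokens; return false if too few are available.
  def take(tokens)
    refill
    return false if tokens > @tokens
    @tokens -= tokens
    true
  end

  private

  # Add tokens for the time elapsed since the last call, up to capacity.
  def refill
    now = @clock.call
    @tokens = [@capacity, @tokens + (now - @timestamp) * @rate].min
    @timestamp = now
  end
end
```

Under that assumption, a bucket created with 100 and 10 would absorb a burst of 100 nodes and then sustain 10 nodes per second.
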
error?(node)
# File lib/vines/stream.rb, line 190
def error?(node)
  ns = node.namespace ? node.namespace.href : nil
  node.name == ERROR && ns == NAMESPACES[:stream]
end
log_node(node, direction)
# File lib/vines/stream.rb, line 226
def log_node(node, direction)
  return unless log.debug?
  from, to = @remote_addr, @local_addr
  from, to = to, from if direction == :out
  label = (direction == :out) ? 'Sent' : 'Received'
  log.debug("%s %21s -> %s\n%s\n" %
    ["#{label} stanza:".ljust(PAD), from, to, node])
end
process_node(node)
# File lib/vines/stream.rb, line 207
def process_node(node)
  log_node(node, :in)
  @stanza_size = 0
  enforce_rate_limit
  if error?(node)
    close_stream
  else
    state.node(node)
  end
rescue => e
  error(e)
end
process_node_queue()

Schedule a queue pop on the EM thread to handle the next element. Because the next pop is scheduled only after the current node has been processed, all stanzas received on this stream are handled in order. See tools.ietf.org/html/rfc6120#section-10.1

# File lib/vines/stream.rb, line 198
def process_node_queue
  @nodes.pop do |node|
    Fiber.new do
      process_node(node)
      process_node_queue
    end.resume unless @closed
  end
end
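
The ordering guarantee comes from requesting one item at a time and re-arming only after that item is handled. The sketch below reproduces that pattern without EventMachine or fibers: `CallbackQueue` is a hypothetical synchronous stand-in for a callback-based queue, and `OrderedWorker` is an invented consumer, so reactor scheduling is deliberately not modeled.

```ruby
# Hypothetical synchronous stand-in for a callback-based queue.
class CallbackQueue
  def initialize
    @items = []
    @waiters = []
  end

  def push(item)
    @items << item
    drain
  end

  # Register a one-shot callback for the next available item.
  def pop(&callback)
    @waiters << callback
    drain
  end

  private

  # Hand items to waiting callbacks, one item per callback.
  def drain
    until @items.empty? || @waiters.empty?
      @waiters.shift.call(@items.shift)
    end
  end
end

# Hypothetical consumer using the same pop, process, re-arm loop.
class OrderedWorker
  attr_reader :processed

  def initialize(queue)
    @queue = queue
    @processed = []
    @closed = false
    arm
  end

  def close
    @closed = true
  end

  private

  # Ask for exactly one item; request the next only after handling it.
  def arm
    @queue.pop do |item|
      unless @closed
        @processed << item
        arm
      end
    end
  end
end
```

Since at most one callback is outstanding at any time, a second item cannot be picked up before the first one finishes, and nothing is processed once the worker is closed.
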
send_stream_error(e)

Write the StreamError's XML to the stream. Subclasses can override this method with custom error writing behavior.

# File lib/vines/stream.rb, line 179
def send_stream_error(e)
  write(e.to_xml)
end
state()

Returns the current State of the stream's state machine. Provided as a method so subclasses can override the behavior.

# File lib/vines/stream.rb, line 237
def state
  @state
end
valid_address?(jid)

Return true if this is a valid domain-only JID that can be used in stream initiation stanza headers.

# File lib/vines/stream.rb, line 243
def valid_address?(jid)
  JID.new(jid).domain? rescue false
end
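
A domain-only JID has neither a node part (before an @) nor a resource part (after a /). The following is a deliberately simplified sketch of that idea: `domain_only?` is a hypothetical helper, it does not use the Vines JID class, and it skips the character and length validation a real JID parser performs.

```ruby
# Simplified, hypothetical check: a non-empty address with no node
# separator (@) and no resource separator (/).
def domain_only?(jid)
  s = jid.to_s
  !s.empty? && !s.include?('@') && !s.include?('/')
end

domain_only?('wonderland.lit')       # domain only
domain_only?('alice@wonderland.lit') # has a node part
domain_only?('wonderland.lit/tea')   # has a resource part
```
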