class Protobuf::Nats::JNats

Attributes

connection[R]
options[R]

Public Class Methods

new() click to toggle source
# File lib/protobuf/nats/jnats.rb, line 23
def initialize
  @on_error_cb = lambda {|error|}
  @on_reconnect_cb = lambda {}
  @on_disconnect_cb = lambda {}
  @on_close_cb = lambda {}
  @options = nil
  @subz_cbs = {}
  @subz_mutex = ::Mutex.new
end

Public Instance Methods

close() click to toggle source

Do not depend on close for a graceful disconnect.

# File lib/protobuf/nats/jnats.rb, line 77
def close
  @connection.close rescue nil
  @connection = nil
  @supervisor.kill rescue nil
  @supervisor = nil
  @consumer.kill rescue nil
  @supervisor = nil
end
connect(options = {}) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 33
def connect(options = {})
  @options ||= options

  servers = options[:servers] || ["nats://localhost:4222"]
  servers = [servers].flatten.map { |uri_string| java.net.URI.new(uri_string) }
  connection_factory = ::Java::IoNatsClient::ConnectionFactory.new
  connection_factory.setServers(servers)
  connection_factory.setMaxReconnect(options[:max_reconnect_attempts])

  # Shrink the pending buffer to always raise an error and let the caller retry.
  if options[:disable_reconnect_buffer]
    connection_factory.setReconnectBufSize(1)
  end

  # Setup callbacks
  connection_factory.setDisconnectedCallback { |event| @on_disconnect_cb.call }
  connection_factory.setReconnectedCallback { |_event| @on_reconnect_cb.call }
  connection_factory.setClosedCallback { |_event| @on_close_cb.call }
  connection_factory.setExceptionHandler { |error| @on_error_cb.call(error) }

  # Setup ssl context if we're using tls
  if options[:uses_tls]
    ssl_context = create_ssl_context(options)
    connection_factory.setSecure(true)
    connection_factory.setSSLContext(ssl_context)
  end

  @connection = connection_factory.createConnection

  # We're going to spawn a consumer and supervisor
  @work_queue = @connection.createMsgChannel
  spwan_supervisor_and_consumer

  @connection
end
flush(timeout_sec = 0.5) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 86
def flush(timeout_sec = 0.5)
  connection.flush(timeout_sec * 1000)
end
new_inbox() click to toggle source
# File lib/protobuf/nats/jnats.rb, line 138
def new_inbox
  "_INBOX.#{::SecureRandom.hex(13)}"
end
next_message(sub, timeout_sec) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 90
def next_message(sub, timeout_sec)
  nats_message = sub.nextMessage(timeout_sec * 1000)
  return nil unless nats_message
  Message.new(nats_message)
end
on_close(&cb) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 154
def on_close(&cb)
  @on_close_cb = cb
end
on_disconnect(&cb) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 146
def on_disconnect(&cb)
  @on_disconnect_cb = cb
end
on_error(&cb) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 150
def on_error(&cb)
  @on_error_cb = cb
end
on_reconnect(&cb) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 142
def on_reconnect(&cb)
  @on_reconnect_cb = cb
end
publish(subject, data, mailbox = nil) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 96
def publish(subject, data, mailbox = nil)
  # The "true" here is to force flush. May not need this.
  connection.publish(subject, mailbox, data.to_java_bytes, true)
end
subscribe(subject, options = {}, &block) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 101
def subscribe(subject, options = {}, &block)
  queue = options[:queue]
  max = options[:max]
  work_queue = nil
  # We pass our work queue for processing async work because java nats
  # uses a cahced thread pool: 1 thread per async subscription.
  # Sync subs need their own queue so work is not processed async.
  work_queue = block.nil? ? connection.createMsgChannel : @work_queue
  sub = connection.subscribe(subject, queue, nil, work_queue)

  # Register the block callback. We only lock to save the callback.
  if block
    @subz_mutex.synchronize do
      @subz_cbs[sub.getSid] = block
    end
  end

  # Auto unsub if max message option was provided.
  sub.autoUnsubscribe(max) if max

  sub
end
unsubscribe(sub) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 124
def unsubscribe(sub)
  return if sub.nil?

  # Cleanup our async callback
  if @subz_cbs[sub.getSid]
    @subz_mutex.synchronize do
      @subz_cbs.delete(sub.getSid)
    end
  end

  # The "true" here is to ignore and invalid conn.
  sub.unsubscribe(true)
end

Private Instance Methods

create_ssl_context(options) click to toggle source
# File lib/protobuf/nats/jnats.rb, line 217
def create_ssl_context(options)
  # Create our certs and key converters to go from bouncycastle to java.
  cert_converter = org.bouncycastle.cert.jcajce.JcaX509CertificateConverter.new
  key_converter = org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter.new

  # Load the certs and keys.
  tls_ca_cert = cert_converter.getCertificate(read_pem_object_from_file(options[:tls_ca_cert]))
  tls_client_cert = cert_converter.getCertificate(read_pem_object_from_file(options[:tls_client_cert]))
  tls_client_key = key_converter.getKeyPair(read_pem_object_from_file(options[:tls_client_key]))

  # Setup the CA cert.
  ca_key_store = java.security.KeyStore.getInstance(java.security.KeyStore.getDefaultType)
  ca_key_store.load(nil, nil)
  ca_key_store.setCertificateEntry("ca-certificate", tls_ca_cert)
  trust_manager = javax.net.ssl.TrustManagerFactory.getInstance(javax.net.ssl.TrustManagerFactory.getDefaultAlgorithm)
  trust_manager.init(ca_key_store)

  # Setup the cert / key pair.
  client_key_store = java.security.KeyStore.getInstance(java.security.KeyStore.getDefaultType)
  client_key_store.load(nil, nil)
  client_key_store.setCertificateEntry("certificate", tls_client_cert)
  certificate_java_array = [tls_client_cert].to_java(java.security.cert.Certificate)
  empty_password = [].to_java(:char)
  client_key_store.setKeyEntry("private-key", tls_client_key.getPrivate, empty_password, certificate_java_array)
  key_manager = javax.net.ssl.KeyManagerFactory.getInstance(javax.net.ssl.KeyManagerFactory.getDefaultAlgorithm)
  key_manager.init(client_key_store, empty_password)

  # Create ssl context.
  context = javax.net.ssl.SSLContext.getInstance("TLSv1.2")
  context.init(key_manager.getKeyManagers, trust_manager.getTrustManagers, nil)
  context
end
read_pem_object_from_file(path) click to toggle source

Jruby-openssl depends on bouncycastle so our lives don't suck super bad

# File lib/protobuf/nats/jnats.rb, line 207
def read_pem_object_from_file(path)
  fail ::ArgumentError, "Tried to read a PEM key or cert with path nil" if path.nil?

  file_reader = java.io.FileReader.new(path)
  pem_parser = org.bouncycastle.openssl.PEMParser.new(file_reader)
  object = pem_parser.readObject
  pem_parser.close
  object
end
spawn_consumer() click to toggle source
# File lib/protobuf/nats/jnats.rb, line 177
def spawn_consumer
  @consumer = ::Thread.new do
    loop do
      begin
        message = @work_queue.take
        next unless message
        sub = message.getSubscription

        # We have to update the subscription stats so we're not considered a slow consumer.
        begin
          sub.lock
          sub.incrPMsgs(-1)
          sub.incrPBytes(-message.getData.length) if message.getData
          sub.incrDelivered(1) unless sub.isClosed
        ensure
          sub.unlock
        end

        # We don't need t
        callback = @subz_cbs[sub.getSid]
        next unless callback
        callback.call(message.getData.to_s, message.getReplyTo, message.getSubject)
      rescue => error
        @on_error_cb.call(error)
      end
    end
  end
end
spwan_supervisor_and_consumer() click to toggle source
# File lib/protobuf/nats/jnats.rb, line 160
def spwan_supervisor_and_consumer
  spawn_consumer
  @supervisor = ::Thread.new do
    loop do
      begin
        sleep 1
        next if @consumer && @consumer.alive?
        # We need to recreate the consumer thread
        @consumer.kill if @consumer
        spawn_consumer
      rescue => error
        @on_error_cb.call(error)
      end
    end
  end
end