class Protobuf::Nats::Client

Constants

SubscriptionInbox

Structure to hold subscription and inbox to use within pool

Public Class Methods

new(options) click to toggle source
Calls superclass method
# File lib/protobuf/nats/client.rb, line 33
def initialize(options)
  # Connection setup could also be hooked in here (or at class load time) if
  # it ever needs to happen earlier than the call below.
  super(options)

  # Make sure the shared client NATS connection is started before this
  # instance is used.
  ::Protobuf::Nats.start_client_nats_connection
end
subscription_key_cache() click to toggle source
# File lib/protobuf/nats/client.rb, line 71
# Class-level hash used to memoize subscription keys; created on first access.
def self.subscription_key_cache
  return @subscription_key_cache if @subscription_key_cache

  @subscription_key_cache = {}
end
subscription_pool() click to toggle source
# File lib/protobuf/nats/client.rb, line 17
# Lazily builds the class-wide ConnectionPool. Each pool entry is a
# SubscriptionInbox wrapping a freshly created inbox and a subscription to it.
def self.subscription_pool
  return @subscription_pool if @subscription_pool

  @subscription_pool = ::ConnectionPool.new(:size => subscription_pool_size, :timeout => 0.1) do
    reply_inbox = ::Protobuf::Nats.client_nats_connection.new_inbox
    subscription = ::Protobuf::Nats.client_nats_connection.subscribe(reply_inbox)

    SubscriptionInbox.new(subscription, reply_inbox)
  end
end
subscription_pool_size() click to toggle source
# File lib/protobuf/nats/client.rb, line 25
# Size of the subscription pool, read once from
# PB_NATS_CLIENT_SUBSCRIPTION_POOL_SIZE. An unset variable yields 0
# (nil.to_i), which callers treat as "pooling disabled".
def self.subscription_pool_size
  @subscription_pool_size ||= ::ENV["PB_NATS_CLIENT_SUBSCRIPTION_POOL_SIZE"].to_i
end

Public Instance Methods

ack_timeout() click to toggle source
# File lib/protobuf/nats/client.rb, line 75
# ACK timeout taken from PB_NATS_CLIENT_ACK_TIMEOUT (converted with to_i),
# defaulting to 5 when the variable is unset. Memoized per instance.
def ack_timeout
  @ack_timeout ||= ::ENV.fetch("PB_NATS_CLIENT_ACK_TIMEOUT", 5).to_i
end
cached_subscription_key() click to toggle source
# File lib/protobuf/nats/client.rb, line 179
# Returns the subscription key for this request's service/method pair,
# computing it through ::Protobuf::Nats.subscription_key on first use and
# storing it in the class-level two-level cache (service => method => key).
def cached_subscription_key
  service = @options[:service]
  rpc_method = @options[:method]

  keys_for_service = (self.class.subscription_key_cache[service] ||= {})
  keys_for_service[rpc_method] ||= ::Protobuf::Nats.subscription_key(service, rpc_method)
end
close_connection() click to toggle source
# File lib/protobuf/nats/client.rb, line 67
# Intentionally does nothing (for now): the connection to the server is
# persistent, so there is nothing to tear down per request.
def close_connection
  nil
end
formatted_service_and_method_name() click to toggle source
# File lib/protobuf/nats/client.rb, line 189
# Builds a "Service#method" label from the :service and :method options.
def formatted_service_and_method_name
  format("%s#%s", @options[:service], @options[:method])
end
nack_backoff_intervals() click to toggle source
# File lib/protobuf/nats/client.rb, line 83
# Backoff intervals applied after successive NACKs. Parsed from the
# comma-separated PB_NATS_CLIENT_NACK_BACKOFF_INTERVALS when set, otherwise
# [0, 1, 3, 5, 10]. Memoized per instance.
def nack_backoff_intervals
  return @nack_backoff_intervals if @nack_backoff_intervals

  configured = ::ENV["PB_NATS_CLIENT_NACK_BACKOFF_INTERVALS"]
  @nack_backoff_intervals = configured.nil? ? [0, 1, 3, 5, 10] : configured.split(",").map { |part| part.to_i }
end
nack_backoff_splay() click to toggle source
# File lib/protobuf/nats/client.rb, line 91
# Random offset added to each NACK backoff interval. Chosen once per
# instance: rand(limit) when the splay limit is positive, otherwise 0.
def nack_backoff_splay
  return @nack_backoff_splay if @nack_backoff_splay

  limit = nack_backoff_splay_limit
  @nack_backoff_splay = limit > 0 ? rand(limit) : 0
end
nack_backoff_splay_limit() click to toggle source
# File lib/protobuf/nats/client.rb, line 99
# Upper bound (exclusive, via rand) for the NACK backoff splay. Read from
# PB_NATS_CLIENT_NACK_BACKOFF_SPLAY_LIMIT when set, otherwise 10.
def nack_backoff_splay_limit
  return @nack_backoff_splay_limit if @nack_backoff_splay_limit

  raw_limit = ::ENV["PB_NATS_CLIENT_NACK_BACKOFF_SPLAY_LIMIT"]
  @nack_backoff_splay_limit = raw_limit ? raw_limit.to_i : 10
end
nats_request_with_two_responses(subject, data, opts) click to toggle source

This is a request that expects two responses.

  1. An ACK from the server. We use a shorter timeout.

  2. A PB message from the server. We use a longer timeout.

# File lib/protobuf/nats/client.rb, line 203
# Publishes +data+ to +subject+ with a private reply inbox and waits for two
# replies on it: an ACK (or NACK) and the response payload. The ACK and the
# payload are accepted in either order.
#
# opts[:ack_timeout] - timeout handed to next_message for the first reply
#                      (defaults to 5).
# opts[:timeout]     - timeout handed to next_message for the second reply
#                      (defaults to 60).
#
# Returns the response payload on success, :ack_timeout when no first reply
# arrives or neither reply is an ACK, and :nack when the first reply is a
# NACK.
# Raises ::Protobuf::Nats::Errors::ResponseTimeout when no payload accompanies
# the ACK.
def nats_request_with_two_responses(subject, data, opts)
  # Wait for the ACK from the server
  ack_timeout = opts[:ack_timeout] || 5
  # Wait for the protobuf response
  timeout = opts[:timeout] || 60

  nats = ::Protobuf::Nats.client_nats_connection

  # Publish to server
  with_subscription do |sub_inbox|
    begin
      completed_request = false

      # Replace the subscription if it has been pooled but is no longer valid
      # (maybe a reconnect). The swap is required here in both modes because
      # the inbox is used for the publish below.
      unless sub_inbox.subscription.is_valid
        nats.unsubscribe(sub_inbox.subscription)
        sub_inbox.swap(new_subscription_inbox)
      end

      nats.publish(subject, data, sub_inbox.inbox)

      # Wait for reply
      first_message = nats.next_message(sub_inbox.subscription, ack_timeout)
      return :ack_timeout if first_message.nil?

      first_message_data = first_message.data
      return :nack if first_message_data == ::Protobuf::Nats::Messages::NACK

      second_message = nats.next_message(sub_inbox.subscription, timeout)
      second_message_data = second_message.nil? ? nil : second_message.data

      # Check messages: whichever reply is the ACK, the other one is the
      # response. If neither is an ACK the request is treated as un-ACKed.
      response = case ::Protobuf::Nats::Messages::ACK
                 when first_message_data then second_message_data
                 when second_message_data then first_message_data
                 else return :ack_timeout
                 end

      fail(::Protobuf::Nats::Errors::ResponseTimeout, formatted_service_and_method_name) unless response

      completed_request = true
      response
    ensure
      # Any early return or exception leaves the subscription in an unknown
      # state (replies may still be in flight), so drop it.
      unless completed_request
        nats.unsubscribe(sub_inbox.subscription)
        # Only a pooled sub_inbox is handed to a later request, so only then
        # does it need a fresh subscription. Without pooling the sub_inbox is
        # discarded after this block, and a replacement subscription would
        # never be used or unsubscribed.
        sub_inbox.swap(new_subscription_inbox) if use_subscription_pooling?
      end
    end
  end
end
new_subscription_inbox() click to toggle source
# File lib/protobuf/nats/client.rb, line 41
# Creates a new inbox on the client NATS connection, subscribes to it and
# wraps both in a SubscriptionInbox. Without pooling the subscription is
# created with :max => 2; with pooling no :max is passed.
def new_subscription_inbox
  connection = ::Protobuf::Nats.client_nats_connection
  reply_inbox = connection.new_inbox

  subscription =
    if use_subscription_pooling?
      connection.subscribe(reply_inbox)
    else
      connection.subscribe(reply_inbox, :max => 2)
    end

  SubscriptionInbox.new(subscription, reply_inbox)
end
reconnect_delay() click to toggle source
# File lib/protobuf/nats/client.rb, line 107
# Delay slept after an IOException before retrying. Read from
# PB_NATS_CLIENT_RECONNECT_DELAY when set, otherwise falls back to
# ack_timeout. Memoized per instance.
def reconnect_delay
  @reconnect_delay ||= begin
    override = ::ENV["PB_NATS_CLIENT_RECONNECT_DELAY"]
    override.nil? ? ack_timeout : override.to_i
  end
end
response_timeout() click to toggle source
# File lib/protobuf/nats/client.rb, line 115
# Response timeout taken from PB_NATS_CLIENT_RESPONSE_TIMEOUT (converted with
# to_i), defaulting to 60 when the variable is unset. Memoized per instance.
def response_timeout
  @response_timeout ||= ::ENV.fetch("PB_NATS_CLIENT_RESPONSE_TIMEOUT", 60).to_i
end
send_request() click to toggle source
# File lib/protobuf/nats/client.rb, line 128
# Entry point for sending the request: ensures the client connection is
# started, reports the pool's @available length when pooling is enabled, and
# runs the actual send inside a request-duration instrumentation block.
def send_request
  # Starting is repeated here so the client is guaranteed to be running.
  ::Protobuf::Nats.start_client_nats_connection

  if use_subscription_pooling?
    # Reads the pool's internal @available collection directly.
    idle_entries = self.class.subscription_pool.instance_variable_get("@available")
    ::ActiveSupport::Notifications.instrument("client.subscription_pool_available_size.protobuf-nats", idle_entries.length)
  end

  ::ActiveSupport::Notifications.instrument("client.request_duration.protobuf-nats") { send_request_through_nats }
end
send_request_through_nats() click to toggle source
# File lib/protobuf/nats/client.rb, line 142
# Sends @request_data over NATS, retrying on missing ACKs, NACKs and
# IOExceptions, then hands off to parse_response.
#
# Raises ::Protobuf::Nats::Errors::RequestTimeout when the ACK retries run out
# or when nack_backoff_intervals has no interval left for another NACK.
# Re-raises ::Protobuf::Nats::Errors::IOException once the retry budget is
# spent.
def send_request_through_nats
  # `||=` rather than `=`: the `retry` in the rescue clause re-runs this method
  # body, and both counters must keep their current values when it does.
  retries ||= 3
  nack_retry ||= 0

  loop do
    # NOTE(review): setup_connection is not defined in this file's visible
    # code; presumably inherited from the superclass.
    setup_connection
    request_options = {:timeout => response_timeout, :ack_timeout => ack_timeout}
    @response_data = nats_request_with_two_responses(cached_subscription_key, @request_data, request_options)
    case @response_data
    when :ack_timeout
      ::ActiveSupport::Notifications.instrument "client.request_timeout.protobuf-nats"
      # Try again while budget remains; `retries` is shared with the
      # IOException handler below.
      next if (retries -= 1) > 0
      raise ::Protobuf::Nats::Errors::RequestTimeout, formatted_service_and_method_name
    when :nack
      ::ActiveSupport::Notifications.instrument "client.request_nack.protobuf-nats"
      interval = nack_backoff_intervals[nack_retry]
      nack_retry += 1
      # Indexing past the end of the intervals list yields nil: give up.
      raise ::Protobuf::Nats::Errors::RequestTimeout, formatted_service_and_method_name if interval.nil?
      # Interval and splay are divided by 1000.0 before sleeping, i.e. they are
      # treated as milliseconds.
      sleep((interval + nack_backoff_splay)/1000.0)
      next
    end

    # Any other value is the response payload; stop looping.
    break
  end

  parse_response
rescue ::Protobuf::Nats::Errors::IOException => error
  ::Protobuf::Nats.log_error(error)

  # The sleep happens before the budget check, so it also runs on the final
  # attempt that ends in a re-raise.
  delay = reconnect_delay
  logger.warn "An IOException was raised. We are going to sleep for #{delay} seconds."
  sleep delay

  retry if (retries -= 1) > 0
  raise
end
use_subscription_pooling?() click to toggle source
# File lib/protobuf/nats/client.rb, line 123
# True when the class-level subscription pool size is greater than zero.
# The answer is computed once per instance; a nil check (rather than ||=) is
# used so a cached `false` is honoured.
def use_subscription_pooling?
  if @use_subscription_pooling.nil?
    @use_subscription_pooling = self.class.subscription_pool_size > 0
  end

  @use_subscription_pooling
end
with_subscription() { |sub_inbox| ... } click to toggle source
# File lib/protobuf/nats/client.rb, line 53
# Yields a SubscriptionInbox to the block and returns the block's value.
# With pooling the inbox is checked out of the class-level pool for the
# duration of the block; otherwise a brand new one is created for this call.
def with_subscription
  return yield(new_subscription_inbox) unless use_subscription_pooling?

  block_result = nil
  self.class.subscription_pool.with { |pooled_inbox| block_result = yield(pooled_inbox) }
  block_result
end