class Protobuf::Nats::Client
Constants
- SubscriptionInbox
Structure to hold subscription and inbox to use within pool
Public Class Methods
new(options)
click to toggle source
Calls superclass method
# File lib/protobuf/nats/client.rb, line 33
# Builds a client: hands the received arguments to the superclass
# initializer, then asks ::Protobuf::Nats to start the client connection.
def initialize(options)
  # Connection setup might need to move here (or to class load time) later.
  super

  # Guarantees the client connection has been started.
  ::Protobuf::Nats.start_client_nats_connection
end
subscription_key_cache()
click to toggle source
# File lib/protobuf/nats/client.rb, line 71
# Class-level hash used as a cache; created lazily on first access and
# reused on every later call.
def self.subscription_key_cache
  return @subscription_key_cache if @subscription_key_cache

  @subscription_key_cache = {}
end
subscription_pool()
click to toggle source
# File lib/protobuf/nats/client.rb, line 17
# Lazily builds the class-wide ConnectionPool of SubscriptionInbox objects.
# The pool is sized by subscription_pool_size and uses a 0.1 timeout; each
# pooled entry pairs a fresh inbox with a subscription on that inbox.
def self.subscription_pool
  return @subscription_pool if @subscription_pool

  @subscription_pool = ::ConnectionPool.new(:size => subscription_pool_size, :timeout => 0.1) do
    inbox_subject = ::Protobuf::Nats.client_nats_connection.new_inbox
    subscription = ::Protobuf::Nats.client_nats_connection.subscribe(inbox_subject)
    SubscriptionInbox.new(subscription, inbox_subject)
  end
end
subscription_pool_size()
click to toggle source
# File lib/protobuf/nats/client.rb, line 25
# Size of the subscription pool, memoized after the first read.
# Taken from PB_NATS_CLIENT_SUBSCRIPTION_POOL_SIZE (converted with to_i)
# when that variable is set; otherwise 0.
def self.subscription_pool_size
  return @subscription_pool_size if @subscription_pool_size

  configured = ::ENV["PB_NATS_CLIENT_SUBSCRIPTION_POOL_SIZE"]
  @subscription_pool_size = configured.nil? ? 0 : configured.to_i
end
Public Instance Methods
ack_timeout()
click to toggle source
# File lib/protobuf/nats/client.rb, line 75
# Timeout used while waiting for the server ACK, memoized per instance.
# Read from PB_NATS_CLIENT_ACK_TIMEOUT (converted with to_i) when set;
# otherwise 5.
def ack_timeout
  return @ack_timeout if @ack_timeout

  configured = ::ENV["PB_NATS_CLIENT_ACK_TIMEOUT"]
  @ack_timeout = configured.nil? ? 5 : configured.to_i
end
cached_subscription_key()
click to toggle source
# File lib/protobuf/nats/client.rb, line 179
# Returns the subscription key for the service/method named in @options,
# computing it through ::Protobuf::Nats.subscription_key at most once per
# (service, method) pair and storing it in the class-level cache.
def cached_subscription_key
  service_class = @options[:service]
  rpc_method = @options[:method]

  # Cache layout: { service => { method => subscription key } }
  per_service = (self.class.subscription_key_cache[service_class] ||= {})
  per_service[rpc_method] ||= ::Protobuf::Nats.subscription_key(service_class, rpc_method)
end
close_connection()
click to toggle source
# File lib/protobuf/nats/client.rb, line 67
# Deliberately does nothing (at least for now): the connection to the
# server is persistent, so there is nothing to close per request.
def close_connection
  nil
end
formatted_service_and_method_name()
click to toggle source
# File lib/protobuf/nats/client.rb, line 189
# Human-readable "Service#method" label built from @options[:service] and
# @options[:method]; used as the message of the timeout errors raised here.
def formatted_service_and_method_name
  "#{@options[:service]}##{@options[:method]}"
end
nack_backoff_intervals()
click to toggle source
# File lib/protobuf/nats/client.rb, line 83
# Backoff intervals applied after successive NACKs, memoized per instance.
# Parsed from the comma-separated PB_NATS_CLIENT_NACK_BACKOFF_INTERVALS
# (each piece converted with to_i) when set; otherwise [0, 1, 3, 5, 10].
def nack_backoff_intervals
  return @nack_backoff_intervals if @nack_backoff_intervals

  raw = ::ENV["PB_NATS_CLIENT_NACK_BACKOFF_INTERVALS"]
  @nack_backoff_intervals =
    if raw.nil?
      [0, 1, 3, 5, 10]
    else
      raw.split(",").map { |piece| piece.to_i }
    end
end
nack_backoff_splay()
click to toggle source
# File lib/protobuf/nats/client.rb, line 91
# Random offset added to each NACK backoff interval, chosen once per
# instance: rand(limit) when nack_backoff_splay_limit is positive, else 0.
def nack_backoff_splay
  return @nack_backoff_splay if @nack_backoff_splay

  limit_is_positive = nack_backoff_splay_limit > 0
  @nack_backoff_splay = limit_is_positive ? rand(nack_backoff_splay_limit) : 0
end
nack_backoff_splay_limit()
click to toggle source
# File lib/protobuf/nats/client.rb, line 99
# Exclusive upper bound for the random NACK backoff splay, memoized per
# instance. Read from PB_NATS_CLIENT_NACK_BACKOFF_SPLAY_LIMIT (converted
# with to_i) when set; otherwise 10.
def nack_backoff_splay_limit
  return @nack_backoff_splay_limit if @nack_backoff_splay_limit

  configured = ::ENV["PB_NATS_CLIENT_NACK_BACKOFF_SPLAY_LIMIT"]
  @nack_backoff_splay_limit = configured.nil? ? 10 : configured.to_i
end
nats_request_with_two_responses(subject, data, opts)
click to toggle source
This is a request that expects two responses.
- An ACK from the server. We use a shorter timeout.
- A PB message from the server. We use a longer timeout.
# File lib/protobuf/nats/client.rb, line 203
# Publishes +data+ on +subject+ and waits for two replies on the
# subscription's inbox: an ACK and the protobuf response (either order is
# accepted once both have arrived).
#
# opts[:ack_timeout] bounds the wait for the first reply (default 5) and
# opts[:timeout] bounds the wait for the second (default 60).
#
# Returns the response payload, :ack_timeout when no first reply arrives or
# neither reply is an ACK, or :nack when the first reply is a NACK.
# Raises ::Protobuf::Nats::Errors::ResponseTimeout when an ACK was seen but
# no response payload arrived.
def nats_request_with_two_responses(subject, data, opts)
  # How long to wait for the first reply (normally the ACK)
  first_reply_timeout = opts[:ack_timeout] || 5
  # How long to wait for the second reply (normally the protobuf response)
  second_reply_timeout = opts[:timeout] || 60

  connection = ::Protobuf::Nats.client_nats_connection

  with_subscription do |sub_inbox|
    begin
      finished = false

      # A pooled subscription may have gone stale (maybe after a reconnect);
      # drop it and put a fresh one in its place before publishing.
      unless sub_inbox.subscription.is_valid
        connection.unsubscribe(sub_inbox.subscription)
        sub_inbox.swap(new_subscription_inbox) # replaces the sub_inbox in the connection pool if necessary
      end

      connection.publish(subject, data, sub_inbox.inbox)

      first_reply = connection.next_message(sub_inbox.subscription, first_reply_timeout)
      return :ack_timeout if first_reply.nil?

      first_payload = first_reply.data
      return :nack if first_payload == ::Protobuf::Nats::Messages::NACK

      second_reply = connection.next_message(sub_inbox.subscription, second_reply_timeout)
      second_payload = nil
      second_payload = second_reply.data unless second_reply.nil?

      # Whichever reply is the ACK, the other one carries the response.
      response =
        case ::Protobuf::Nats::Messages::ACK
        when first_payload then second_payload
        when second_payload then first_payload
        else return :ack_timeout
        end

      raise(::Protobuf::Nats::Errors::ResponseTimeout, formatted_service_and_method_name) unless response

      finished = true
      response
    ensure
      # Any exit other than a completed request (early return or exception)
      # discards the subscription and swaps in a new one.
      # NOTE(review): with pooling disabled this still creates a replacement
      # subscription through new_subscription_inbox - confirm it is released.
      unless finished
        connection.unsubscribe(sub_inbox.subscription)
        sub_inbox.swap(new_subscription_inbox) # replaces the sub_inbox in the connection pool if necessary
      end
    end
  end
end
new_subscription_inbox()
click to toggle source
# File lib/protobuf/nats/client.rb, line 41
# Creates a new inbox on the client connection, subscribes to it, and wraps
# both in a SubscriptionInbox. Without pooling the subscription is created
# with :max => 2; pooled subscriptions are created without that option.
def new_subscription_inbox
  connection = ::Protobuf::Nats.client_nats_connection
  inbox_subject = connection.new_inbox

  subscription =
    if use_subscription_pooling?
      connection.subscribe(inbox_subject)
    else
      connection.subscribe(inbox_subject, :max => 2)
    end

  SubscriptionInbox.new(subscription, inbox_subject)
end
reconnect_delay()
click to toggle source
# File lib/protobuf/nats/client.rb, line 107
# Seconds to sleep after an IOException before retrying, memoized per
# instance. Read from PB_NATS_CLIENT_RECONNECT_DELAY (converted with to_i)
# when set; otherwise the value of ack_timeout.
def reconnect_delay
  return @reconnect_delay if @reconnect_delay

  configured = ::ENV["PB_NATS_CLIENT_RECONNECT_DELAY"]
  @reconnect_delay = configured.nil? ? ack_timeout : configured.to_i
end
response_timeout()
click to toggle source
# File lib/protobuf/nats/client.rb, line 115
# Timeout used while waiting for the protobuf response, memoized per
# instance. Read from PB_NATS_CLIENT_RESPONSE_TIMEOUT (converted with to_i)
# when set; otherwise 60.
def response_timeout
  return @response_timeout if @response_timeout

  configured = ::ENV["PB_NATS_CLIENT_RESPONSE_TIMEOUT"]
  @response_timeout = configured.nil? ? 60 : configured.to_i
end
send_request()
click to toggle source
# File lib/protobuf/nats/client.rb, line 128
# Entry point for sending the request: makes sure the client connection is
# started, reports the number of available pooled subscriptions (only when
# pooling is enabled), and runs send_request_through_nats inside a
# "client.request_duration.protobuf-nats" instrumentation block.
#
# Returns whatever the instrumented send_request_through_nats call returns.
def send_request
  # This will ensure the client is started.
  ::Protobuf::Nats.start_client_nats_connection

  if use_subscription_pooling?
    pool = self.class.subscription_pool
    # Prefer the pool's public #available reader; reach into the internal
    # @available stack only for pool versions that do not expose it.
    available_size =
      if pool.respond_to?(:available)
        pool.available
      else
        pool.instance_variable_get("@available").length
      end
    ::ActiveSupport::Notifications.instrument "client.subscription_pool_available_size.protobuf-nats", available_size
  end

  ::ActiveSupport::Notifications.instrument "client.request_duration.protobuf-nats" do
    send_request_through_nats
  end
end
send_request_through_nats()
click to toggle source
# File lib/protobuf/nats/client.rb, line 142
# Sends the request and parses the response, retrying on failure:
# * :ack_timeout - retried until the shared attempt budget (3) is used up,
#   then raises RequestTimeout.
# * :nack - sleeps for the next backoff interval plus splay (milliseconds)
#   and retries; raises RequestTimeout once the intervals run out.
# * IOException - logged, followed by a sleep of reconnect_delay seconds and
#   a retry of the whole method while attempts remain; otherwise re-raised.
def send_request_through_nats
  # `||=` keeps both counters intact when `retry` re-enters the method body.
  attempts_left ||= 3
  nack_count ||= 0

  loop do
    setup_connection
    request_options = { :timeout => response_timeout, :ack_timeout => ack_timeout }
    @response_data = nats_request_with_two_responses(cached_subscription_key, @request_data, request_options)

    case @response_data
    when :ack_timeout
      ::ActiveSupport::Notifications.instrument "client.request_timeout.protobuf-nats"
      attempts_left -= 1
      raise ::Protobuf::Nats::Errors::RequestTimeout, formatted_service_and_method_name unless attempts_left > 0
      next
    when :nack
      ::ActiveSupport::Notifications.instrument "client.request_nack.protobuf-nats"
      interval = nack_backoff_intervals[nack_count]
      nack_count += 1
      raise ::Protobuf::Nats::Errors::RequestTimeout, formatted_service_and_method_name if interval.nil?
      sleep((interval + nack_backoff_splay) / 1000.0)
      next
    else
      # Anything else is the response payload: stop looping.
      break
    end
  end

  parse_response
rescue ::Protobuf::Nats::Errors::IOException => error
  ::Protobuf::Nats.log_error(error)

  delay = reconnect_delay
  logger.warn "An IOException was raised. We are going to sleep for #{delay} seconds."
  sleep delay

  attempts_left -= 1
  retry if attempts_left > 0
  raise
end
use_subscription_pooling?()
click to toggle source
# File lib/protobuf/nats/client.rb, line 123
# True when the class-level subscription pool size is greater than zero.
# The answer is memoized per instance; the nil check (rather than ||=)
# ensures a false answer is cached as well.
def use_subscription_pooling?
  if @use_subscription_pooling.nil?
    @use_subscription_pooling = self.class.subscription_pool_size > 0
  end

  @use_subscription_pooling
end
with_subscription() { |sub_inbox| ... }
click to toggle source
# File lib/protobuf/nats/client.rb, line 53
# Yields a SubscriptionInbox to the block and returns the block's result.
# With pooling enabled the inbox is checked out of the class-level pool for
# the duration of the block; otherwise a new one is created for this call.
def with_subscription
  return yield(new_subscription_inbox) unless use_subscription_pooling?

  block_result = nil
  self.class.subscription_pool.with { |sub_inbox| block_result = yield(sub_inbox) }
  block_result
end