class Protobuf::Nats::Server
Constants
- MILLISECOND
Attributes
nats[R]
subscriptions[R]
thread_pool[R]
Public Class Methods
new(options)
click to toggle source
# File lib/protobuf/nats/server.rb, line 17 def initialize(options) @options = options @processing_requests = true @running = true @stopped = false @nats = @options[:client] || ::Protobuf::Nats::NatsClient.new @nats.connect(::Protobuf::Nats.config.connection_options) @thread_pool = ::Protobuf::Nats::ThreadPool.new(@options[:threads], :max_queue => max_queue_size) @subscriptions = [] @server = options.fetch(:server, ::Socket.gethostname) end
Public Instance Methods
detect_and_handle_a_pause()
click to toggle source
# File lib/protobuf/nats/server.rb, line 161 def detect_and_handle_a_pause case # If we are taking requests and detect a pause file, then unsubscribe. when @processing_requests && paused? @processing_requests = false logger.warn("Pausing server!") unsubscribe # If we were paused and the pause file is no longer present, then subscribe again. when !@processing_requests && !paused? logger.warn("Resuming server: resubscribing to all services and restarting slow start!") @processing_requests = true subscribe end end
do_not_subscribe_to_includes?(subscription_key)
click to toggle source
# File lib/protobuf/nats/server.rb, line 85 def do_not_subscribe_to_includes?(subscription_key) return false unless ::Protobuf::Nats.config.server_subscription_key_do_not_subscribe_to_when_includes_any_of.respond_to?(:any?) return false if ::Protobuf::Nats.config.server_subscription_key_do_not_subscribe_to_when_includes_any_of.empty? ::Protobuf::Nats.config.server_subscription_key_do_not_subscribe_to_when_includes_any_of.any? do |key| subscription_key.include?(key) end end
enqueue_request(request_data, reply_id)
click to toggle source
# File lib/protobuf/nats/server.rb, line 48 def enqueue_request(request_data, reply_id) ::ActiveSupport::Notifications.instrument "server.message_received.protobuf-nats" enqueued_at = ::Time.now was_enqueued = thread_pool.push do begin # Instrument the thread pool time-to-execute duration. processed_at = ::Time.now ::ActiveSupport::Notifications.instrument("server.thread_pool_execution_delay.protobuf-nats", (processed_at - enqueued_at) * MILLISECOND) # Process request. response_data = handle_request(request_data, 'server' => @server) # Publish response. nats.publish(reply_id, response_data) rescue => error ::Protobuf::Nats.notify_error_callbacks(error) ensure # Instrument the request duration. completed_at = ::Time.now ::ActiveSupport::Notifications.instrument("server.request_duration.protobuf-nats", (completed_at - enqueued_at) * MILLISECOND) end end # Publish an ACK to signal the server has picked up the work. if was_enqueued nats.publish(reply_id, ::Protobuf::Nats::Messages::ACK) else ::ActiveSupport::Notifications.instrument "server.message_dropped.protobuf-nats" nats.publish(reply_id, ::Protobuf::Nats::Messages::NACK) end was_enqueued end
finish_slow_start()
click to toggle source
Slow start subscriptions by adding X rounds of subz every Y seconds, where X is subscriptions_per_rpc_endpoint
and Y is slow_start_delay.
# File lib/protobuf/nats/server.rb, line 144 def finish_slow_start logger.info "Slow start has started..." completed = 1 # We have (X - 1) here because we always subscribe at least once. (subscriptions_per_rpc_endpoint - 1).times do next unless @running next if paused? completed += 1 sleep slow_start_delay subscribe_to_services_once logger.info "Slow start adding another round of subscriptions (#{completed}/#{subscriptions_per_rpc_endpoint})..." end logger.info "Slow start finished." end
max_queue_size()
click to toggle source
# File lib/protobuf/nats/server.rb, line 32 def max_queue_size ::ENV.fetch("PB_NATS_SERVER_MAX_QUEUE_SIZE", @options[:threads]).to_i end
only_subscribe_to_includes?(subscription_key)
click to toggle source
# File lib/protobuf/nats/server.rb, line 94 def only_subscribe_to_includes?(subscription_key) return true unless ::Protobuf::Nats.config.server_subscription_key_only_subscribe_to_when_includes_any_of.respond_to?(:any?) return true if ::Protobuf::Nats.config.server_subscription_key_only_subscribe_to_when_includes_any_of.empty? ::Protobuf::Nats.config.server_subscription_key_only_subscribe_to_when_includes_any_of.any? do |key| subscription_key.include?(key) end end
pause_file_path()
click to toggle source
# File lib/protobuf/nats/server.rb, line 103 def pause_file_path ::ENV.fetch("PB_NATS_SERVER_PAUSE_FILE_PATH", nil) end
paused?()
click to toggle source
# File lib/protobuf/nats/server.rb, line 177 def paused? !pause_file_path.nil? && ::File.exist?(pause_file_path) end
print_subscription_keys()
click to toggle source
# File lib/protobuf/nats/server.rb, line 107 def print_subscription_keys logger.info "Creating subscriptions:" with_each_subscription_key do |subscription_key| logger.info " - #{subscription_key}" end end
run() { || ... }
click to toggle source
# File lib/protobuf/nats/server.rb, line 181 def run nats.on_reconnect do logger.warn "Server NATS connection was reconnected" end nats.on_disconnect do logger.warn "Server NATS connection was disconnected" end nats.on_error do |error| ::Protobuf::Nats.notify_error_callbacks(error) end nats.on_close do logger.warn "Server NATS connection was closed" end print_subscription_keys if paused? yield if block_given? else subscribe { yield if block_given? } end loop do break unless @running detect_and_handle_a_pause sleep 1 end unsubscribe logger.info "Waiting up to 60 seconds for the thread pool to finish shutting down..." thread_pool.shutdown thread_pool.wait_for_termination(60) ensure @stopped = true end
running?()
click to toggle source
# File lib/protobuf/nats/server.rb, line 220 def running? @stopped end
service_klasses()
click to toggle source
# File lib/protobuf/nats/server.rb, line 44 def service_klasses ::Protobuf::Rpc::Service.implemented_services.map(&:safe_constantize) end
slow_start_delay()
click to toggle source
# File lib/protobuf/nats/server.rb, line 36 def slow_start_delay @slow_start_delay ||= ::ENV.fetch("PB_NATS_SERVER_SLOW_START_DELAY", 10).to_i end
stop()
click to toggle source
# File lib/protobuf/nats/server.rb, line 224 def stop @running = false end
subscribe() { || ... }
click to toggle source
# File lib/protobuf/nats/server.rb, line 228 def subscribe subscribe_to_services_once yield if block_given? finish_slow_start end
subscribe_to_services_once()
click to toggle source
# File lib/protobuf/nats/server.rb, line 115 def subscribe_to_services_once with_each_subscription_key do |subscription_key_and_queue| subscriptions << nats.subscribe(subscription_key_and_queue, :queue => subscription_key_and_queue) do |request_data, reply_id, _subject| unless enqueue_request(request_data, reply_id) logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" } end end end end
subscriptions_per_rpc_endpoint()
click to toggle source
# File lib/protobuf/nats/server.rb, line 40 def subscriptions_per_rpc_endpoint @subscriptions_per_rpc_endpoint ||= ::ENV.fetch("PB_NATS_SERVER_SUBSCRIPTIONS_PER_RPC_ENDPOINT", 10).to_i end
unsubscribe()
click to toggle source
# File lib/protobuf/nats/server.rb, line 234 def unsubscribe logger.info "Unsubscribing from rpc routes..." subscriptions.each do |subscription_id| nats.unsubscribe(subscription_id) end end
with_each_subscription_key() { |subscription_key| ... }
click to toggle source
# File lib/protobuf/nats/server.rb, line 125 def with_each_subscription_key fail ::ArgumentError unless block_given? service_klasses.each do |service_klass| service_klass.rpcs.each do |service_method, _| # Skip services that are not implemented. next unless service_klass.method_defined?(service_method) subscription_key = ::Protobuf::Nats.subscription_key(service_klass, service_method) next if do_not_subscribe_to_includes?(subscription_key) next unless only_subscribe_to_includes?(subscription_key) yield subscription_key end end end