class GRPC::RpcServer

RpcServer hosts a number of services and makes them available on the network.

Constants

DEFAULT_MAX_WAITING_REQUESTS

Deprecated due to internal changes to the thread pool

DEFAULT_POLL_PERIOD

Default poll period is 1s

DEFAULT_POOL_SIZE

Default thread pool size is 30

SIGNAL_CHECK_PERIOD

Signal check period is 0.25s

Private Class Methods

new(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE, connect_md_proc: nil, server_args: {}, interceptors: [])

Creates a new RpcServer.

The server is configured entirely through keyword arguments:

  • pool_size: the size of the thread pool the server uses to run its threads. The server cannot service more concurrent requests than the pool has threads.

  • max_waiting_requests: Deprecated due to internal changes to the thread pool. This is still accepted for compatibility but is ignored.

  • poll_period: The amount of time in seconds to wait for currently-serviced RPCs to finish before cancelling them when shutting down the server.

  • pool_keep_alive: The amount of time in seconds to wait for currently busy thread-pool threads to finish before forcing an abrupt exit to each thread.

  • connect_md_proc: when non-nil, a proc for determining metadata to send back to the client on receiving an invocation request. The proc signature is:

{key: val, ...} func(method_name, {key: val, ...})

  • server_args: A server arguments hash to be passed down to the underlying core server.

  • interceptors: An array of GRPC::ServerInterceptor objects that will be used for intercepting server handlers to provide extra functionality. Interceptors are an EXPERIMENTAL API.
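As an illustration of the connect_md_proc signature described above, the following sketch defines a proc that takes the method name and the metadata received from the client and returns a hash of metadata to send back. Only the two-argument, hash-returning shape comes from the description; the key names 'handled-method' and 'request-id' and the method path are hypothetical.

```ruby
# Hypothetical connect_md_proc: receives the RPC method name and the
# metadata hash sent by the client, and returns a hash of metadata to
# send back. The key names are assumptions made for this example.
connect_md_proc = proc do |method_name, metadata|
  {
    'handled-method' => method_name.to_s,
    # Echo a client-supplied id when present.
    'request-id' => metadata.fetch('request-id', 'unknown')
  }
end

# Call it directly, the way the server would on an incoming request.
reply_md = connect_md_proc.call('/Divider/Div', { 'request-id' => 'abc123' })
puts reply_md.inspect
```

Such a proc would then be handed to the constructor as GRPC::RpcServer.new(connect_md_proc: connect_md_proc). Anything that is not a Proc is rejected by setup_connect_md_proc with a TypeError.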

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 217
def initialize(pool_size: DEFAULT_POOL_SIZE,
               max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
               poll_period: DEFAULT_POLL_PERIOD,
               pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
               connect_md_proc: nil,
               server_args: {},
               interceptors: [])
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new
  # running_state can take 4 values: :not_started, :running, :stopping, and
  # :stopped. State transitions can only proceed in that order.
  @running_state = :not_started
  @server = Core::Server.new(server_args)
  @interceptors = InterceptorRegistry.new(interceptors)
end
setup_connect_md_proc(a_proc)

setup_connect_md_proc is used by initialize to validate the connect_md_proc.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 176
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end

Private Instance Methods

add_rpc_descs_for(service)

This should be called while holding @run_mutex

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 534
def add_rpc_descs_for(service)
  cls = service.is_a?(Class) ? service : service.class
  specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
  cls.rpc_descs.each_pair do |name, spec|
    route = "/#{cls.service_name}/#{name}".to_sym
    fail "already registered: rpc #{route} from #{spec}" if specs.key? route
    specs[route] = spec
    rpc_name = GenericService.underscore(name.to_s).to_sym
    if service.is_a?(Class)
      handlers[route] = cls.new.method(rpc_name)
    else
      handlers[route] = service.method(rpc_name)
    end
    GRPC.logger.info("handling #{route} with #{handlers[route]}")
  end
end
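The route keys and handler names derived in add_rpc_descs_for can be reproduced outside the server. The sketch below is self-contained and makes some assumptions: the underscore helper is a simplified stand-in written for this example (not GRPC's GenericService.underscore), and the service name 'math.Math' and rpc name :DivMany are hypothetical.

```ruby
# Simplified stand-in for GenericService.underscore (an assumption for
# this sketch): converts CamelCase to snake_case.
def underscore(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Builds the same kind of route key and handler-method name that
# add_rpc_descs_for derives for each rpc description.
def route_and_handler(service_name, rpc_name)
  route = "/#{service_name}/#{rpc_name}".to_sym
  handler = underscore(rpc_name.to_s).to_sym
  [route, handler]
end

route, handler = route_and_handler('math.Math', :DivMany)
puts route.inspect
puts handler.inspect
```

Because the route is the hash key, registering two services that expose the same service name and rpc name collides on that key, which is what triggers the "already registered" failure.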
assert_valid_service_class(cls)
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 525
def assert_valid_service_class(cls)
  unless cls.include?(GenericService)
    fail "#{cls} must 'include GenericService'"
  end
  fail "#{cls} should specify some rpc descriptions" if
    cls.rpc_descs.size.zero?
end
available?(an_rpc)

Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 418
def available?(an_rpc)
  return an_rpc if @pool.ready_for_work?
  GRPC.logger.warn('no free worker threads currently')
  noop = proc { |x| x }

  # Create a new active call that knows that metadata hasn't been
  # sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
                'No free threads in thread pool')
  nil
end
handle(service)

Registers a service with the server.

service is either a class that includes GRPC::GenericService and whose new method can be called without arguments, or any instance of such a class.

E.g., after

  class Divider
    include GRPC::GenericService
    rpc :div, DivArgs, DivReply    # single request, single response
    def initialize(optional_arg='default option') # no required args
      ...
    end
  end

  srv = GRPC::RpcServer.new(...)

  # Either of these works

  srv.handle(Divider)

  # or

  srv.handle(Divider.new('replace optional arg'))

It raises RuntimeError:

  • if service is not a valid service class or object

  • if its handler methods are already registered

  • if the server has already been started

@param service [Object|Class] a service class or object as described above
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 333
def handle(service)
  @run_mutex.synchronize do
    unless @running_state == :not_started
      fail 'cannot add services if the server has been started'
    end
    cls = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(cls)
    add_rpc_descs_for(service)
  end
end
implemented?(an_rpc)

Sends UNIMPLEMENTED if the method is not implemented by this server

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 433
def implemented?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
  noop = proc { |x| x }

  # Create a new active call that knows that
  # metadata hasn't been sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
  nil
end
loop_handle_server_calls()

Handles incoming calls for as long as the server is in the :running state, then closes the server.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 448
def loop_handle_server_calls
  fail 'not started' if running_state == :not_started
  while running_state == :running
    begin
      an_rpc = @server.request_call
      break if (!an_rpc.nil?) && an_rpc.call.nil?
      active_call = new_active_server_call(an_rpc)
      unless active_call.nil?
        @pool.schedule(active_call) do |ac|
          c, mth = ac
          begin
            rpc_descs[mth].run_server_method(
              c,
              rpc_handlers[mth],
              @interceptors.build_context
            )
          rescue StandardError
            c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                          'Server handler failed')
          end
        end
      end
    rescue Core::CallError, RuntimeError => e
      # these might happen for various reasons.  The correct behavior of
      # the server is to log them and continue, if it's not shutting down.
      if running_state == :running
        GRPC.logger.warn("server call failed: #{e}")
      end
      next
    end
  end
  # @running_state should be :stopping here
  @run_mutex.synchronize do
    transition_running_state(:stopped)
    GRPC.logger.info("stopped: #{self}")
    @server.close
  end
end
new_active_server_call(an_rpc)
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 487
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end

  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  c = ActiveCall.new(an_rpc.call,
                     rpc_desc.marshal_proc,
                     rpc_desc.unmarshal_proc(:input),
                     an_rpc.deadline,
                     metadata_received: true,
                     started: false,
                     metadata_to_send: connect_md)
  c.attach_peer_cert(an_rpc.call.peer_cert)
  mth = an_rpc.method.to_sym
  [c, mth]
end
rpc_descs()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 517
def rpc_descs
  @rpc_descs ||= {}
end
rpc_handlers()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 521
def rpc_handlers
  @rpc_handlers ||= {}
end
run()

Runs the server.

  • if no rpc_descs are registered, this raises a RuntimeError immediately; otherwise it blocks, handling calls, and does not return until the server is stopped.

  • running? returns true after this is called, until stop causes the server to stop.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 351
def run
  @run_mutex.synchronize do
    fail 'cannot run without registering services' if rpc_descs.size.zero?
    @pool.start
    @server.start
    transition_running_state(:running)
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
Also aliased as: run_till_terminated
run_till_terminated()
Alias for: run
run_till_terminated_or_interrupted(signals, wait_interval = 60)

Runs the server with signal handlers.

@param signals List of String, Integer or both representing signals that the user would like to send to the server for graceful shutdown

@param wait_interval (optional) Integer seconds that the user would like stop_server_thread to poll stop_server
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 371
def run_till_terminated_or_interrupted(signals, wait_interval = 60)
  @stop_server = false
  @stop_server_mu = Mutex.new
  @stop_server_cv = ConditionVariable.new

  @stop_server_thread = Thread.new do
    loop do
      break if @stop_server
      @stop_server_mu.synchronize do
        @stop_server_cv.wait(@stop_server_mu, wait_interval)
      end
    end

    # stop is surrounded by mutex, should handle multiple calls to stop
    #   correctly
    stop
  end

  valid_signals = Signal.list

  # register signal handlers
  signals.each do |sig|
    # input validation
    target_sig = if sig.class == String
                   # cut out the SIG prefix to see if valid signal
                   sig.upcase.start_with?('SIG') ? sig.upcase[3..-1] : sig.upcase
                 else
                   sig
                 end

    # register signal traps for all valid signals
    if valid_signals.value?(target_sig) || valid_signals.key?(target_sig)
      Signal.trap(target_sig) do
        @stop_server = true
        @stop_server_cv.broadcast
      end
    else
      fail "#{target_sig} not a valid signal"
    end
  end

  run

  @stop_server_thread.join
end
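The input validation in the listing above accepts signal names with or without a SIG prefix, in any letter case, as well as signal numbers. The following stand-alone sketch isolates that normalization so it can be tried without a server. The method name normalize_signal is hypothetical, and the sketch assumes a platform where Signal.list includes TERM and INT.

```ruby
# Mirrors the input validation in run_till_terminated_or_interrupted:
# String names may carry a 'SIG' prefix in any case; other values
# (such as Integer signal numbers) pass through unchanged.
def normalize_signal(sig)
  target = if sig.is_a?(String)
             up = sig.upcase
             up.start_with?('SIG') ? up[3..-1] : up
           else
             sig
           end
  valid = Signal.list # maps names such as "TERM" to their numbers
  unless valid.key?(target) || valid.value?(target)
    raise "#{target} not a valid signal"
  end
  target
end

puts normalize_signal('sigterm')
puts normalize_signal('INT')
puts normalize_signal(Signal.list['INT'])
```

The normalized value is what the server passes to Signal.trap. An unknown name such as 'SIGNOPE' raises a RuntimeError before any handler is installed for it.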
running?()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 280
def running?
  running_state == :running
end
running_state()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 260
def running_state
  @run_mutex.synchronize do
    return @running_state
  end
end
stop()

Stops a running server.

The call has no effect if the server is already stopped; otherwise the server's current call loop is its last.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 242
def stop
  # if called via run_till_terminated_or_interrupted,
  #   signal stop_server_thread and don't do anything
  if @stop_server.nil? == false && @stop_server == false
    @stop_server = true
    @stop_server_cv.broadcast
    return
  end
  @run_mutex.synchronize do
    fail 'Cannot stop before starting' if @running_state == :not_started
    return if @running_state != :running
    transition_running_state(:stopping)
    deadline = from_relative_time(@poll_period)
    @server.shutdown_and_notify(deadline)
  end
  @pool.stop
end
stopped?()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 284
def stopped?
  running_state == :stopped
end
transition_running_state(target_state)

Can only be called while holding @run_mutex

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 267
def transition_running_state(target_state)
  state_transitions = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }
  if state_transitions[@running_state] == target_state
    @running_state = target_state
  else
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
end
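The one-way lifecycle enforced above (:not_started, :running, :stopping, :stopped) can be modelled in isolation. This is a minimal sketch, not the server's own code: the class name Lifecycle is hypothetical, and locking is omitted, whereas the real method must be called while holding @run_mutex.

```ruby
# Stand-alone model of the server's one-way state machine.
class Lifecycle
  # Each state has exactly one legal successor.
  TRANSITIONS = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }.freeze

  attr_reader :state

  def initialize
    @state = :not_started
  end

  # Moves to target only if it is the successor of the current state.
  def transition(target)
    unless TRANSITIONS[@state] == target
      raise "Bad server state transition: #{@state}->#{target}"
    end
    @state = target
  end
end

lc = Lifecycle.new
lc.transition(:running)
lc.transition(:stopping)
begin
  lc.transition(:running) # going backwards is rejected
rescue RuntimeError => e
  puts e.message
end
```

Since :stopped has no successor in the table, a stopped server can never be moved to another state. Skipping a state, for example going from :not_started straight to :stopping, is rejected in the same way.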
wait_till_running(timeout = nil)

Is called from other threads to wait for run to start up the server.

If the server has not been started yet, this blocks until run starts it or timeout seconds elapse (indefinitely when timeout is nil). Otherwise it returns immediately.

@param timeout [Numeric] number of seconds to wait

@return [true, false] true if the server is running, false otherwise

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 294
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
    return @running_state == :running
  end
end
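The handshake between run (which broadcasts on @run_cond) and wait_till_running (which waits on it) can be sketched without a server. The class Starter and its start method are hypothetical stand-ins for this example; only the mutex, condition variable and state check mirror the listings above.

```ruby
# Stand-alone model of the run / wait_till_running handshake.
class Starter
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @state = :not_started
  end

  # Plays the role of run: flips the state and wakes all waiters.
  def start
    @mutex.synchronize do
      @state = :running
      @cond.broadcast
    end
  end

  # Plays the role of wait_till_running: waits only while not started,
  # then reports whether the state is :running.
  def wait_till_running(timeout = nil)
    @mutex.synchronize do
      @cond.wait(@mutex, timeout) if @state == :not_started
      @state == :running
    end
  end
end

starter = Starter.new
# Nothing has started it yet, so this gives up after the timeout.
puts starter.wait_till_running(0.05)

# Start it from another thread, as a server thread calling run would.
runner = Thread.new { sleep 0.05; starter.start }
puts starter.wait_till_running(2)
runner.join
```

A typical use of the real API follows the same shape: one thread calls run, and the main thread calls wait_till_running before issuing requests.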