class Kafka::Connection
A connection to a single Kafka
broker.
Usually you’ll need a separate connection to each broker in a cluster, since most requests must be directed specifically to the broker that is currently leader for the set of topic partitions you want to produce to or consume from.
## Instrumentation
Connections emit a ‘request.connection.kafka` notification on each request. The following keys will be found in the payload:
-
‘:api` — the name of the API being invoked.
-
‘:request_size` — the number of bytes in the request.
-
‘:response_size` — the number of bytes in the response.
The notification also includes the duration of the request.
Constants
- CONNECT_TIMEOUT
- IDLE_TIMEOUT
Time after which an idle connection will be reopened.
- SOCKET_TIMEOUT
Attributes
Public Class Methods
Opens a connection to a Kafka
broker.
@param host [String] the hostname of the broker. @param port [Integer] the port of the broker. @param client_id [String] the client id is a user-specified string sent in each
request to help trace calls and should logically identify the application making the request.
@param logger [Logger] the logger used to log trace messages. @param connect_timeout [Integer] the socket timeout for connecting to the broker.
Default is 10 seconds.
@param socket_timeout [Integer] the socket timeout for reading and writing to the
broker. Default is 10 seconds.
@return [Connection] a new connection.
# File lib/kafka/connection.rb, line 53 def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) @host, @port, @client_id = host, port, client_id @logger = TaggedLogger.new(logger) @instrumenter = instrumenter @connect_timeout = connect_timeout || CONNECT_TIMEOUT @socket_timeout = socket_timeout || SOCKET_TIMEOUT @ssl_context = ssl_context @socket = nil @last_request = nil end
Public Instance Methods
# File lib/kafka/connection.rb, line 74 def close @logger.debug "Closing socket to #{to_s}" @socket.close if @socket end
# File lib/kafka/connection.rb, line 70 def open? !@socket.nil? && !@socket.closed? end
Sends a request over the connection.
@param request [#encode, response_class] the request that should be
encoded and written.
@return [Object] the response.
# File lib/kafka/connection.rb, line 86 def send_request(request) api_name = Protocol.api_name(request.api_key) # Default notification payload. notification = { broker_host: @host, api: api_name, request_size: 0, response_size: 0, } raise IdleConnection if idle? @logger.push_tags(api_name) @instrumenter.instrument("request.connection", notification) do open unless open? @correlation_id += 1 @logger.debug "Sending #{api_name} API request #{@correlation_id} to #{to_s}" write_request(request, notification) response_class = request.response_class response = wait_for_response(response_class, notification) unless response_class.nil? @last_request = Time.now response end rescue SystemCallError, EOFError, IOError => e close raise ConnectionError, "Connection error #{e.class}: #{e}" ensure @logger.pop_tags end
# File lib/kafka/connection.rb, line 66 def to_s "#{@host}:#{@port}" end
Private Instance Methods
# File lib/kafka/connection.rb, line 150 def idle? @last_request && @last_request < Time.now - IDLE_TIMEOUT end
# File lib/kafka/connection.rb, line 126 def open @logger.debug "Opening connection to #{@host}:#{@port} with client id #{@client_id}..." if @ssl_context @socket = SSLSocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout, ssl_context: @ssl_context) else @socket = SocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout) end @encoder = Kafka::Protocol::Encoder.new(@socket) @decoder = Kafka::Protocol::Decoder.new(@socket) # Correlation id is initialized to zero and bumped for each request. @correlation_id = 0 @last_request = nil rescue Errno::ETIMEDOUT => e @logger.error "Timed out while trying to connect to #{self}: #{e}" raise ConnectionError, e rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e @logger.error "Failed to connect to #{self}: #{e}" raise ConnectionError, e end
Reads a response from the connection.
@param response_class [#decode] an object that can decode the response from
a given Decoder.
@return [nil]
# File lib/kafka/connection.rb, line 185 def read_response(response_class, notification) @logger.debug "Waiting for response #{@correlation_id} from #{to_s}" data = @decoder.bytes notification[:response_size] = data.bytesize buffer = StringIO.new(data) response_decoder = Kafka::Protocol::Decoder.new(buffer) correlation_id = response_decoder.int32 response = response_class.decode(response_decoder) @logger.debug "Received response #{correlation_id} from #{to_s}" return correlation_id, response rescue Errno::ETIMEDOUT @logger.error "Timed out while waiting for response #{@correlation_id}" raise end
# File lib/kafka/connection.rb, line 205 def wait_for_response(response_class, notification) loop do correlation_id, response = read_response(response_class, notification) # There may have been a previous request that timed out before the client # was able to read the response. In that case, the response will still be # sitting in the socket waiting to be read. If the response we just read # was to a previous request, we can safely skip it. if correlation_id < @correlation_id @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}" elsif correlation_id > @correlation_id raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}" else return response end end end
Writes a request over the connection.
@param request [#encode] the request that should be encoded and written.
@return [nil]
# File lib/kafka/connection.rb, line 159 def write_request(request, notification) message = Kafka::Protocol::RequestMessage.new( api_key: request.api_key, api_version: request.respond_to?(:api_version) ? request.api_version : 0, correlation_id: @correlation_id, client_id: @client_id, request: request, ) data = Kafka::Protocol::Encoder.encode_with(message) notification[:request_size] = data.bytesize @encoder.write_bytes(data) nil rescue Errno::ETIMEDOUT @logger.error "Timed out while writing request #{@correlation_id}" raise end