class Fluent::Plugin::ForwardOutput::AckHandler

Constants

ACKWaitingSockInfo
Ack

Public Class Methods

new(timeout:, log:, read_length:) click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 31
# Stores the handler settings and starts with an empty list of pending acks.
#
# timeout:     amount added to the clock reading taken in #enqueue to get an entry's deadline
# log:         logger that receives the handler's diagnostics
# read_length: maximum length requested from a socket when an ack is read
def initialize(timeout:, log:, read_length:)
  @timeout, @log, @read_length = timeout, log, read_length
  # The list of pending acks is only touched while holding this mutex.
  @ack_waitings = []
  @mutex = Mutex.new
  @unpacker = Fluent::MessagePackFactory.msgpack_unpacker
end

Public Instance Methods

collect_response(select_interval) { |nil, nil, nil, ret| ... } click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 40
# Sweeps the list of pending acks once and reports every finished wait to the given block.
#
# Entries for which expired?(now) is true are removed from the list and reported as
# Result::FAILED. The sockets of the remaining entries are passed to IO.select with
# select_interval as the timeout; every socket reported readable is handed to
# read_ack_from_sock and its outcome is reported too.
#
# Yields chunk_id, node, sock, result for each outcome. The first three values are nil
# when the outcome carries no entry. A StandardError raised inside the sweep (including
# one raised by the block) is logged and not re-raised.
def collect_response(select_interval)
  now = Fluent::Clock.now
  sockets = []
  outcomes = []

  begin
    @mutex.synchronize do
      # One pass over the list: expired entries are rejected and reported,
      # the others stay queued and contribute their socket to the select set.
      @ack_waitings = @ack_waitings.reject do |entry|
        timed_out = entry.expired?(now)
        if timed_out
          # There are 2 types of cases when no response has been received from socket:
          # (1) the node does not support sending responses
          # (2) the node does support sending response but responses have not arrived for some reasons.
          @log.warn 'no response from node. regard it as unavailable.', host: entry.node.host, port: entry.node.port
          outcomes << [entry, Result::FAILED]
        else
          sockets << entry.sock
        end
        timed_out
      end
    end

    selected =
      begin
        IO.select(sockets, nil, nil, select_interval)
      rescue IOError
        @log.info "connection closed while waiting for readable sockets"
        nil
      end

    # IO.select gives nil on timeout; otherwise its first element lists the readable sockets.
    readable = selected ? selected.first : []
    readable.each { |sock| outcomes << read_ack_from_sock(sock) }

    outcomes.each do |entry, status|
      chunk_id, node, sock = entry.nil? ? [nil, nil, nil] : [entry.chunk_id, entry.node, entry.sock]
      yield chunk_id, node, sock, status
    end
  rescue => e
    @log.error 'unexpected error while receiving ack', error: e
    @log.error_backtrace
  end
end
create_ack(chunk_id, node) click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 100
# Builds an Ack for the given chunk id and node, passing this handler as its third argument.
def create_ack(chunk_id, node)
  handler = self
  Ack.new(chunk_id, node, handler)
end
enqueue(node, sock, cid) click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 104
# Registers a pending ack for chunk id +cid+ that was sent to +node+ over +sock+.
#
# The entry stores the raw id, its Base64 encoding and a deadline equal to the
# current clock reading plus the configured timeout. It is appended to the list
# of pending acks while holding the mutex.
def enqueue(node, sock, cid)
  encoded_cid = Base64.encode64(cid)
  deadline = Fluent::Clock.now + @timeout
  entry = ACKWaitingSockInfo.new(sock, cid, encoded_cid, node, deadline)

  @mutex.synchronize { @ack_waitings << entry }
end

Private Instance Methods

delete(info) click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 169
# Removes +info+ from the list of pending acks while holding the mutex.
# Returns whatever Array#delete returns: the removed item, or nil when nothing matched.
def delete(info)
  @mutex.synchronize { @ack_waitings.delete(info) }
end
dump_unique_id_hex(unique_id) click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 159
# Returns the result of Fluent::UniqueId.hex for +unique_id+ (used in log output).
def dump_unique_id_hex(unique_id)
  formatter = Fluent::UniqueId
  formatter.hex(unique_id)
end
find(sock) click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 163
# Looks up, while holding the mutex, the first pending entry whose sock equals +sock+.
# Returns that entry, or nil when no entry matches.
def find(sock)
  on_this_socket = ->(entry) { entry.sock == sock }
  @mutex.synchronize { @ack_waitings.find(&on_this_socket) }
end
read_ack_from_sock(sock) click to toggle source
# File lib/fluent/plugin/out_forward/ack_handler.rb, line 113
# Reads one ack response from +sock+ and checks it against the entry waiting on that socket.
#
# Returns a two-element array [info, result]:
#   [nil,  Result::FAILED]            - the socket raised IOError while being read, no entry is
#                                       waiting on the socket, or an unexpected StandardError occurred
#   [info, Result::FAILED]            - the read produced no data (ECONNRESET/EOFError or an empty read)
#   [info, Result::CHUNKID_UNMATCHED] - the response's 'ack' differs from info.chunk_id_base64
#   [info, Result::SUCCESS]           - the response's 'ack' matches
#
# Whatever the outcome, delete is called with the looked-up entry on the way out
# (with nil when the lookup never happened or found nothing).
def read_ack_from_sock(sock)
  begin
    raw_data = sock.instance_of?(Fluent::PluginHelper::Socket::WrappedSocket::TLS) ? sock.readpartial(@read_length) : sock.recv(@read_length)
  rescue Errno::ECONNRESET, EOFError # ECONNRESET for #recv, #EOFError for #readpartial
    raw_data = ''
  rescue IOError
    @log.info "socket closed while receiving ack response"
    return nil, Result::FAILED
  end

  info = find(sock)

  if info.nil?
    # The info can be deleted by another thread between `sock.recv()` and `find()`.
    # This is OK since that thread has finished processing the ack, so we can skip it.
    # Note: exclusion mechanism about `collect_response()` may need to be considered.
    @log.debug "could not find the ack info. this ack may be processed by another thread."
    return nil, Result::FAILED
  elsif raw_data.empty?
    # When connection is closed by remote host, socket is ready to read and #recv returns an empty string that means EOF.
    # If this happens we assume the data wasn't delivered and retry it.
    @log.warn 'destination node closed the connection. regard it as unavailable.', host: info.node.host, port: info.node.port
    # info.node.disable!
    return info, Result::FAILED
  else
    begin
      @unpacker.feed(raw_data)
      res = @unpacker.read
    rescue
      # The unpacker is shared by every socket. Bytes left behind by a truncated or
      # malformed response would be prepended to the next ack and corrupt it, so drop
      # them before handing the error to the generic handler below.
      @unpacker.reset
      raise
    end
    @log.trace 'getting response from destination', host: info.node.host, port: info.node.port, chunk_id: dump_unique_id_hex(info.chunk_id), response: res
    if res['ack'] != info.chunk_id_base64
      # Some errors may have occurred when ack and chunk id is different, so send the chunk again.
      @log.warn 'ack in response and chunk id in sent data are different', chunk_id: dump_unique_id_hex(info.chunk_id), ack: res['ack']
      return info, Result::CHUNKID_UNMATCHED
    else
      @log.trace 'got a correct ack response', chunk_id: dump_unique_id_hex(info.chunk_id)
    end

    return info, Result::SUCCESS
  end
rescue => e
  @log.error 'unexpected error while receiving ack message', error: e
  @log.error_backtrace
  [nil, Result::FAILED]
ensure
  # Runs on every exit path, including the early returns above.
  delete(info)
end