class Aerospike::PartitionTracker

Attributes

deadline[R]
iteration[R]
max_records[R]
node_capacity[R]
node_filter[R]
node_partitions_list[R]
partition_begin[R]
partition_filter[R]
partitions[R]
partitions_capacity[R]
sleep_between_retries[RW]
socket_timeout[R]
total_timeout[R]

Public Class Methods

new(policy, nodes, partition_filter = nil) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 24
# Sets up tracking state for a partition scan/query.
#
# When no partition_filter is given, initialization is delegated to
# init_for_node (exactly one node) or init_for_nodes (any other count).
# Otherwise the filter's range is validated and its partition statuses are
# either created (first use) or reused.
#
# @param policy [#max_records] policy object; also passed to init_partitions / init_timeout
# @param nodes [Array] nodes taking part in the operation
# @param partition_filter [Object, nil] optional filter exposing partition_begin,
#   count, digest and partitions / partitions=
# @raise [Aerospike::Exceptions::Aerospike] PARAMETER_ERROR when the filter's
#   begin, count or resulting range is invalid
def initialize(policy, nodes, partition_filter = nil)
  if partition_filter.nil?
    return init_for_node(policy, nodes[0]) if nodes.length == 1
    return init_for_nodes(policy, nodes)
  end

  # Validate here instead of initial PartitionFilter constructor because total number of
  # cluster partitions may change on the server and PartitionFilter will never have access
  # to Cluster instance.  Use fixed number of partitions for now.
  total_partitions = Aerospike::Node::PARTITIONS
  first = partition_filter.partition_begin
  count = partition_filter.count

  unless first.between?(0, total_partitions - 1)
    raise Aerospike::Exceptions::Aerospike.new(
      Aerospike::ResultCode::PARAMETER_ERROR,
      "Invalid partition begin #{first}. Valid range: 0-#{total_partitions - 1}"
    )
  end

  if count <= 0
    raise Aerospike::Exceptions::Aerospike.new(
      Aerospike::ResultCode::PARAMETER_ERROR,
      "Invalid partition count #{count}"
    )
  end

  if first + count > total_partitions
    raise Aerospike::Exceptions::Aerospike.new(
      Aerospike::ResultCode::PARAMETER_ERROR,
      "Invalid partition range (#{first}, #{first + count})"
    )
  end

  # @partition_begin must be set before init_partitions runs: it reads it.
  @partition_begin = first
  @node_capacity = nodes.length
  @node_filter = nil
  @partitions_capacity = count
  @max_records = policy.max_records
  @iteration = 1

  if partition_filter.partitions.nil?
    partition_filter.partitions = init_partitions(policy, count, partition_filter.digest)
  elsif policy.max_records <= 0
    # Retry all partitions when max_records not specified.
    partition_filter.partitions.each do |ps|
      ps.retry = true
    end
  end

  @partitions = partition_filter.partitions
  @partition_filter = partition_filter
  init_timeout(policy)
end

Public Instance Methods

assign_partitions_to_nodes(cluster, namespace) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 75
# Groups every partition currently flagged for retry under the node that the
# cluster's partition map lists as its master, clears the retry flag, and,
# when @max_records is positive, splits that budget across the resulting
# node groups.
#
# @param cluster [#partitions] cluster whose partition map is consulted
# @param namespace [String] namespace key into the partition map
# @return [Array] the node-partition groups; also stored in @node_partitions_list
# @raise [Aerospike::Exceptions::InvalidNamespace] when the namespace is
#   missing from the partition map
# @raise [Aerospike::Exceptions::Aerospike] INVALID_NAMESPACE when a partition
#   id has no master node
def assign_partitions_to_nodes(cluster, namespace)
  list = []

  pmap = cluster.partitions
  replica_array = pmap[namespace]
  unless replica_array
    raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map")
  end

  # First replica entry is the master table; both levels are wrapped and
  # unwrapped with #get.
  master = replica_array.get[0].get

  @partitions.each do |part|
    next unless part&.retry

    node = master[part.id]

    unless node
      raise Aerospike::Exceptions::Aerospike.new(
        Aerospike::ResultCode::INVALID_NAMESPACE,
        "Invalid Partition Id #{part.id} for namespace `#{namespace}` in Partition Scan"
      )
    end

    part.retry = false

    # Use node name to check for single node equality because
    # partition map may be in transitional state between
    # the old and new node with the same name.
    next if @node_filter && @node_filter.name != node.name

    np = find_node(list, node)

    unless np
      # If the partition map is in a transitional state, multiple
      # node_partitions instances (each with different partitions)
      # may be created for a single node.
      np = NodePartitions.new(node)
      list << np
    end
    np.add_partition(part)
  end

  if @max_records.positive?
    # Distribute max_records across nodes.
    node_size = list.length

    if @max_records < node_size
      # Only include nodes that have at least 1 record requested.
      node_size = @max_records
      list = list[0...node_size]
    end

    max = 0
    max = @max_records / node_size if node_size.positive?
    rem = @max_records - (max * node_size)

    # The first `rem` nodes absorb the remainder, one extra record each.
    list[0...node_size].each_with_index do |np, i|
      np.record_max = (i < rem ? max + 1 : max)
    end
  end

  @node_partitions_list = list
  list
end
complete?(cluster, policy) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 206
# Decides whether the current scan/query round finished the whole operation.
#
# Sums record and unavailable-partition counts over @node_partitions_list.
# When no partition was unavailable the round is complete: the filter's
# `done` flag is updated and true is returned. Otherwise retry limits and
# the total timeout are checked and state is advanced for another round.
#
# @param cluster [#supports_partition_query] used to pick the server-version branch
# @param policy [#max_retries, #total_timeout] retry and timeout limits
# @return [Boolean] true when no further round is needed, false to retry
# @raise [Aerospike::Exceptions::Aerospike] MAX_RETRIES_EXCEEDED when
#   @iteration exceeds policy.max_retries
# @raise [Aerospike::Exceptions::Timeout] when the deadline has passed
def complete?(cluster, policy)
  record_count = 0
  parts_unavailable = 0

  @node_partitions_list.each do |np|
    record_count += np.record_count
    parts_unavailable += np.parts_unavailable
  end

  if parts_unavailable == 0
    if @max_records <= 0
      @partition_filter&.done = true
    elsif cluster.supports_partition_query.get
      # Server version >= 6.0 will return all records for each node up to
      # that node's max. If node's record count reached max, there still
      # may be records available for that node.
      done = true

      @node_partitions_list.each do |np|
        if np.record_count >= np.record_max
          mark_retry(np)
          done = false
        end
      end

      @partition_filter&.done = done
    else
      # Servers version < 6.0 can return less records than max and still
      # have more records for each node, so the node is only done if no
      # records were retrieved for that node.
      @node_partitions_list.each do |np|
        mark_retry(np) if np.record_count > 0
      end

      @partition_filter&.done = (record_count == 0)
    end
    return true
  end

  return true if @max_records&.positive? && record_count >= @max_records

  # Check if limits have been reached
  if policy.max_retries.positive? && @iteration > policy.max_retries
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::MAX_RETRIES_EXCEEDED, "Max retries exceeded: #{policy.max_retries}")
  end

  if policy.total_timeout > 0
    # Check for total timeout.
    remaining = @deadline - Time.now - @sleep_between_retries

    raise Aerospike::Exceptions::Timeout.new(policy.total_timeout, @iteration) if remaining <= 0

    # Shrink the per-round timeouts so they never outlive the deadline.
    if remaining < @total_timeout
      @total_timeout = remaining
      @socket_timeout = @total_timeout if @socket_timeout > @total_timeout
    end
  end

  # Prepare for next iteration.
  @max_records -= record_count if @max_records > 0
  @iteration += 1
  false
end
find_node(list, node) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 176
# Looks up the node-partitions entry in +list+ whose #node equals +node+.
#
# @param list [Array] entries responding to #node
# @param node [Object] node to look for (compared with ==)
# @return [Object, nil] the first matching entry, or nil when none matches
def find_node(list, node)
  list.find { |candidate| candidate.node == node }
end
init_partitions(policy, partition_count, digest) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 149
# Creates one PartitionStatus per partition, numbered from @partition_begin,
# and copies the policy's retry/timeout settings onto the tracker.
#
# @param policy [#sleep_between_retries, #socket_timeout, #timeout]
# @param partition_count [Integer] number of statuses to create
# @param digest [Object, nil] when given, stored on the first status
# @return [Array<Aerospike::PartitionStatus>] the new statuses
def init_partitions(policy, partition_count, digest)
  statuses = Array.new(partition_count) do |offset|
    Aerospike::PartitionStatus.new(@partition_begin + offset)
  end

  statuses[0].digest = digest if digest

  @sleep_between_retries = policy.sleep_between_retries
  @socket_timeout = policy.socket_timeout
  @total_timeout = policy.timeout

  if @total_timeout.positive?
    @deadline = Time.now + @total_timeout

    # A zero or oversized socket timeout is capped at the total timeout.
    uncapped = @socket_timeout == 0 || @socket_timeout > @total_timeout
    @socket_timeout = @total_timeout if uncapped
  end

  statuses
end
init_timeout(policy) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 137
# Copies the policy's retry/timeout settings onto the tracker and, when a
# positive total timeout is set, records the deadline and caps the socket
# timeout at the total timeout.
#
# A socket timeout of nil or 0 is raised to the total timeout, matching
# the `== 0` check in init_partitions (0 is truthy in Ruby, so a plain
# negation never matches it).
#
# @param policy [#sleep_between_retries, #socket_timeout, #timeout]
def init_timeout(policy)
  @sleep_between_retries = policy.sleep_between_retries
  @socket_timeout = policy.socket_timeout
  @total_timeout = policy.timeout
  return unless @total_timeout.positive?

  @deadline = Time.now + @total_timeout
  if !@socket_timeout || @socket_timeout == 0 || @socket_timeout > @total_timeout
    @socket_timeout = @total_timeout
  end
end
mark_retry(node_partitions) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 294
# Flags every partition status held by +node_partitions+ (both its
# parts_full and parts_partial collections) for retry.
#
# @param node_partitions [#parts_full, #parts_partial]
def mark_retry(node_partitions)
  node_partitions.parts_full.each { |status| status.retry = true }
  node_partitions.parts_partial.each { |status| status.retry = true }
end
partition_unavailable(node_partitions, partition_id) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 184
# Flags the given partition for retry and bumps the unavailable counter on
# +node_partitions+.
#
# @param node_partitions [#parts_unavailable, #parts_unavailable=]
# @param partition_id [Integer] absolute partition id; converted to an index
#   into @partitions by subtracting @partition_begin
def partition_unavailable(node_partitions, partition_id)
  slot = partition_id - @partition_begin
  @partitions[slot].retry = true
  node_partitions.parts_unavailable += 1
end
set_digest(node_partitions, key) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 189
# Stores the key's digest on the status of the partition the key belongs to
# and counts one more record for +node_partitions+.
#
# @param node_partitions [#record_count, #record_count=]
# @param key [#partition_id, #digest]
def set_digest(node_partitions, key)
  slot = key.partition_id - @partition_begin
  @partitions[slot].digest = key.digest
  node_partitions.record_count += 1
end
set_last(node_partitions, key, bval) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 195
# Records the last key seen for its partition (digest and bval) and counts
# one more record for +node_partitions+.
#
# @param node_partitions [#record_count, #record_count=]
# @param key [#partition_id, #digest]
# @param bval [Object] value stored on the partition status alongside the digest
# @raise [RuntimeError] when the key's partition id lies before @partition_begin
def set_last(node_partitions, key, bval)
  partition_id = key.partition_id
  slot = partition_id - @partition_begin
  # A negative index would silently wrap to the end of the array.
  raise "key.partition_id: #{partition_id}, partition_begin: #{@partition_begin}" if slot < 0

  ps = @partitions[slot]
  ps.digest = key.digest
  ps.bval = bval
  node_partitions.record_count += 1
end
should_retry(node_partitions, err) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 277
# Tells whether +err+ is a transient failure after which the partitions of
# +node_partitions+ should be tried again. For retryable errors all of the
# node's partitions are flagged for retry and counted as unavailable.
#
# @param node_partitions [#parts_full, #parts_partial, #parts_unavailable=]
# @param err [Exception] the error raised while processing the node
# @return [Boolean] true for a retryable Aerospike error, false otherwise
def should_retry(node_partitions, err)
  return false unless err.is_a?(Aerospike::Exceptions::Aerospike)

  case err.result_code
  when Aerospike::ResultCode::TIMEOUT,
       Aerospike::ResultCode::NETWORK_ERROR,
       Aerospike::ResultCode::SERVER_NOT_AVAILABLE,
       Aerospike::ResultCode::INDEX_NOTFOUND
    mark_retry(node_partitions)
    node_partitions.parts_unavailable = node_partitions.parts_full.length + node_partitions.parts_partial.length
    true
  else
    # Always answer with a real boolean, never nil.
    false
  end
end
to_s() click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 304
# Renders all partition statuses as text, sixteen per line: entries are
# tab-terminated except every sixteenth, which ends with a newline.
#
# @return [String]
def to_s
  buffer = StringIO.new
  @partitions.each.with_index(1) do |status, position|
    terminator = (position % 16).zero? ? "\n" : "\t"
    buffer << status.to_s << terminator
  end
  buffer.string
end

Private Instance Methods

init_for_node(policy, node) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 334
# Initializes the tracker for an operation restricted to a single node:
# all Aerospike::Node::PARTITIONS partitions are tracked and +node+ is
# kept as the node filter.
#
# @param policy [#max_records] also forwarded to init_partitions / init_timeout
# @param node [Object] the only node to target
def init_for_node(policy, node)
  all_partitions = Aerospike::Node::PARTITIONS

  @node_filter = node
  @node_capacity = 1
  @partitions_capacity = all_partitions
  @max_records = policy.max_records
  @iteration = 1

  # init_partitions numbers statuses from @partition_begin, so set it first.
  @partition_begin = 0
  @partitions = init_partitions(policy, all_partitions, nil)
  init_timeout(policy)
end
init_for_nodes(policy, nodes) click to toggle source
# File lib/aerospike/query/partition_tracker.rb, line 319
# Initializes the tracker for an operation spanning several nodes: all
# Aerospike::Node::PARTITIONS partitions are tracked and no node filter is
# set. The per-node partition capacity is the even share plus a quarter.
#
# @param policy [#max_records] also forwarded to init_partitions / init_timeout
# @param nodes [Array] participating nodes; only its length is used here
def init_for_nodes(policy, nodes)
  all_partitions = Aerospike::Node::PARTITIONS
  node_count = nodes.length
  even_share = all_partitions / node_count

  @node_filter = nil
  @node_capacity = node_count
  @partitions_capacity = even_share + (even_share / 4)
  @max_records = policy.max_records
  @iteration = 1

  # init_partitions numbers statuses from @partition_begin, so set it first.
  @partition_begin = 0
  @partitions = init_partitions(policy, all_partitions, nil)
  init_timeout(policy)
end