class Aerospike::Cluster
Attributes
aliases[R]
client_policy[R]
cluster_id[R]
cluster_name[R]
connection_queue_size[R]
connection_timeout[R]
features[R]
password[R]
rack_aware[RW]
rack_id[RW]
session_expiration[RW]
session_token[RW]
tls_options[R]
user[R]
Public Class Methods
new(policy, hosts)
click to toggle source
# File lib/aerospike/cluster.rb, line 29
# Builds a cluster handle from a client policy and a list of seed hosts.
# The cluster starts in the closed state with no known nodes.
#
# @param policy the client policy; its connection, tend, TLS, rack and
#   authentication settings are copied into the cluster
# @param hosts seed hosts used to discover the cluster
# @raise [Aerospike::Exceptions::Aerospike] with PARAMETER_ERROR when the
#   policy's minimum connections per node exceeds its maximum
def initialize(policy, hosts)
  # Settings taken from the policy.
  @client_policy = policy
  @cluster_seeds = hosts
  @fail_if_not_connected = policy.fail_if_not_connected
  @connection_queue_size = policy.connection_queue_size
  @connection_timeout = policy.timeout
  @tend_interval = policy.tend_interval
  @cluster_name = policy.cluster_name
  @tls_options = policy.tls
  @rack_aware = policy.rack_aware
  @rack_id = policy.rack_id

  # Runtime state.
  @replica_index = Atomic.new(0)
  @aliases = {}
  @cluster_nodes = []
  @partition_write_map = {}
  @node_index = Atomic.new(0)
  @features = Atomic.new(Set.new)
  @closed = Atomic.new(true)
  @mutex = Mutex.new
  @cluster_config_change_listeners = Atomic.new([])
  @old_node_count = 0

  # Authentication info is only stored when the policy asks for it.
  if policy.requires_authentication
    @user = policy.user
    @password = LoginCommand.hash_password(policy.password)
  end

  initialize_tls_host_names(hosts) if tls_enabled?

  return unless policy.min_connections_per_node > policy.max_connections_per_node

  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR, "Invalid policy configuration: Minimum connections per node cannot be greater than maximum connections per node.")
end
Public Instance Methods
add_alias(host, node)
click to toggle source
# File lib/aerospike/cluster.rb, line 573
# Registers +node+ under +host+ in the alias table while holding the
# cluster mutex. Does nothing when either argument is nil or false.
def add_alias(host, node)
  return unless host && node

  @mutex.synchronize { @aliases[host] = node }
end
add_aliases(node)
click to toggle source
# File lib/aerospike/cluster.rb, line 623
# Points every alias of +node+ at +node+ in the cluster alias table.
# The table is written without taking the mutex; the original comment
# states aliases are only used in the tend thread.
def add_aliases(node)
  node.aliases.each { |host_alias| @aliases[host_alias] = node }
end
add_cluster_config_change_listener(listener)
click to toggle source
# File lib/aerospike/cluster.rb, line 353
# Registers +listener+ to be notified of cluster configuration changes.
#
# The update block returns a new array instead of pushing onto the
# current one, so the block has no side effects if it is run more than
# once, and a list already fetched by a reader is never mutated.
def add_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update { |listeners| listeners + [listener] }
end
add_nodes(nodes_to_add)
click to toggle source
# File lib/aerospike/cluster.rb, line 614
# Registers the aliases of every node in +nodes_to_add+, then appends
# the whole batch to the node list in a single call.
def add_nodes(nodes_to_add)
  nodes_to_add.each { |new_node| add_aliases(new_node) }
  add_nodes_copy(nodes_to_add)
end
add_nodes_copy(nodes_to_add)
click to toggle source
# File lib/aerospike/cluster.rb, line 631
# Appends +nodes_to_add+ to the cluster node list under the mutex.
def add_nodes_copy(nodes_to_add)
  @mutex.synchronize { @cluster_nodes.concat(nodes_to_add) }
end
add_seeds(hosts)
click to toggle source
# File lib/aerospike/cluster.rb, line 102
# Appends +hosts+ to the list of seed hosts under the mutex.
def add_seeds(hosts)
  @mutex.synchronize { @cluster_seeds.concat(hosts) }
end
batch_read_node(partition, replica_policy)
click to toggle source
Returns a node on the cluster for read operations
# File lib/aerospike/cluster.rb, line 121
# Picks the node a batch read for +partition+ should be sent to.
#
# @param partition the partition being read
# @param replica_policy one of the Aerospike::Replica constants;
#   SEQUENCE is treated like MASTER here
# @param seq optional replica sequence counter forwarded to #rack_node.
#   When omitted, a fresh counter starting at -1 is used, which
#   #rack_node treats as a first attempt (its retry check is
#   `seq.value > -1`).
# @raise [Aerospike::Exceptions::InvalidNode] for an unknown replica policy
def batch_read_node(partition, replica_policy, seq = nil)
  case replica_policy
  when Aerospike::Replica::MASTER, Aerospike::Replica::SEQUENCE
    master_node(partition)
  when Aerospike::Replica::MASTER_PROLES
    master_proles_node(partition)
  when Aerospike::Replica::PREFER_RACK
    # The original referenced an undefined local `seq` here (NameError).
    rack_node(partition, seq || Atomic.new(-1))
  when Aerospike::Replica::RANDOM
    random_node
  else
    # `InvalidNode("...")` would be parsed as a method call; raise the class.
    raise Aerospike::Exceptions::InvalidNode, "invalid policy.replica value"
  end
end
change_password(user, password)
click to toggle source
# File lib/aerospike/cluster.rb, line 348
# Replaces the stored password, but only when +user+ is the user this
# cluster stores in @user.
def change_password(user, password)
  return unless @user == user

  @password = password
end
close()
click to toggle source
Closes all cached connections to the cluster nodes and stops the tend thread
# File lib/aerospike/cluster.rb, line 305
# Marks the cluster closed, kills the tend thread and closes every node.
# Calling it on an already closed cluster does nothing.
def close
  unless @closed.value
    @closed.value = true
    @tend_thread.kill
    nodes.each(&:close)
  end
end
connect()
click to toggle source
# File lib/aerospike/cluster.rb, line 67
# Waits for the cluster to stabilize, then starts the background tend thread.
#
# @raise [Aerospike::Exceptions::Aerospike] with SERVER_NOT_AVAILABLE when
#   @fail_if_not_connected is set and the cluster is still not connected
def connect
  wait_till_stablized

  unreachable = @fail_if_not_connected && !connected?
  raise Aerospike::Exceptions::Aerospike, Aerospike::ResultCode::SERVER_NOT_AVAILABLE if unreachable

  launch_tend_thread
  Aerospike.logger.info('New cluster initialized and ready to be used...')
end
connected?()
click to toggle source
# File lib/aerospike/cluster.rb, line 114
# True when at least one node is known and the cluster is not closed.
def connected?
  # Work on the snapshot returned by #nodes, not the live list.
  snapshot = nodes
  !snapshot.empty? && !@closed.value
end
create_connection(host)
click to toggle source
# File lib/aerospike/cluster.rb, line 595
# Delegates to Aerospike::Cluster::CreateConnection for +host+.
def create_connection(host)
  ::Aerospike::Cluster::CreateConnection.call(self, host)
end
create_node(nv)
click to toggle source
# File lib/aerospike/cluster.rb, line 589
# Builds a Node from the node validator +nv+ and fills its connection
# pool up to the policy's minimum connections per node.
#
# @return the new node
def create_node(nv)
  ::Aerospike::Node.new(self, nv).tap do |created|
    created.fill_connection_pool_up_to(@client_policy.min_connections_per_node)
  end
end
credentials_given?()
click to toggle source
# File lib/aerospike/cluster.rb, line 79
# True when a non-empty user name is stored in @user.
def credentials_given?
  !@user.nil? && !@user.empty?
end
find_alias(aliass)
click to toggle source
# File lib/aerospike/cluster.rb, line 314
# Looks up the node registered under +aliass+, holding the mutex.
#
# @return the node, or nil when the alias is unknown
def find_alias(aliass)
  @mutex.synchronize { @aliases[aliass] }
end
find_node_by_name(node_name)
click to toggle source
# File lib/aerospike/cluster.rb, line 699
# Returns the first known node whose name equals +node_name+, or nil.
def find_node_by_name(node_name)
  nodes.find { |candidate| candidate.name == node_name }
end
find_node_in_partition_map(filter)
click to toggle source
# File lib/aerospike/cluster.rb, line 603
# True when +filter+ equals any node stored in any replica of any
# namespace in the partition map.
def find_node_in_partition_map(filter)
  partitions.each_value.any? do |replica_array|
    replica_array.get.any? do |node_array|
      node_array.value.any? { |node| node == filter }
    end
  end
end
find_node_name(list, name)
click to toggle source
Finds a node by name in a list of nodes
# File lib/aerospike/cluster.rb, line 569
# True when some node in +list+ has the given +name+.
def find_node_name(list, name)
  list.any? do |candidate|
    candidate.name == name
  end
end
find_nodes_to_remove(refresh_count)
click to toggle source
# File lib/aerospike/cluster.rb, line 599
# Delegates to FindNodesToRemove with this cluster and +refresh_count+.
def find_nodes_to_remove(refresh_count)
  FindNodesToRemove.call(self, refresh_count)
end
get_node_by_name(node_name)
click to toggle source
Find a node by name and returns an error if not found
# File lib/aerospike/cluster.rb, line 296
# Like #find_node_by_name, but raises instead of returning nil.
#
# @raise [Aerospike::Exceptions::InvalidNode] when no node has that name
def get_node_by_name(node_name)
  find_node_by_name(node_name) || raise(Aerospike::Exceptions::InvalidNode)
end
get_node_for_key(replica_policy, key, is_write: false)
click to toggle source
# File lib/aerospike/cluster.rb, line 239
# Resolves the node responsible for +key+: the master node for writes,
# otherwise the node chosen by #batch_read_node for +replica_policy+.
def get_node_for_key(replica_policy, key, is_write: false)
  partition = Partition.new_by_key(key)
  return master_node(partition) if is_write

  batch_read_node(partition, replica_policy)
end
initialize_tls_host_names(hosts)
click to toggle source
# File lib/aerospike/cluster.rb, line 96
# Gives every host without a TLS name a default one: the cluster id
# when it is set, otherwise the host's own name.
def initialize_tls_host_names(hosts)
  hosts.each do |host|
    next if host.tls_name

    host.tls_name = cluster_id.nil? ? host.name : cluster_id
  end
end
inspect()
click to toggle source
# File lib/aerospike/cluster.rb, line 365 def inspect "#<Aerospike::Cluster @cluster_nodes=#{@cluster_nodes}>" end
launch_tend_thread()
click to toggle source
# File lib/aerospike/cluster.rb, line 369
# Starts the background thread that calls #tend every @tend_interval
# milliseconds and stores it in @tend_thread.
#
# An exception raised by #tend is logged and does not end the loop.
# The sleep sits outside the rescued section so that a failing tend
# still waits for the interval; previously the rescue skipped the sleep
# and a persistent failure re-ran tend in a tight loop.
def launch_tend_thread
  @tend_thread = Thread.new do
    Thread.current.abort_on_exception = false
    loop do
      begin
        tend
      rescue => e
        Aerospike.logger.error("Exception occured during tend: #{e}")
        Aerospike.logger.debug { e.backtrace.join("\n") }
      end
      sleep(@tend_interval / 1000.0)
    end
  end
end
log_tend_stats(nodes)
click to toggle source
# File lib/aerospike/cluster.rb, line 458
# Logs how many nodes joined or left since the previous call and
# remembers the new node count in @old_node_count.
def log_tend_stats(nodes)
  current = nodes.size
  delta = current - @old_node_count
  magnitude = delta.abs
  subject = magnitude == 1 ? 'node has' : 'nodes have'
  verb = delta > 0 ? 'joined' : 'left'
  action = "#{magnitude} #{subject} #{verb} the cluster."
  Aerospike.logger.info("Tend finished. #{action} Old node count: #{@old_node_count}, New node count: #{current}")
  @old_node_count = current
end
master_node(partition)
click to toggle source
Returns the active master node for the given partition
# File lib/aerospike/cluster.rb, line 155
# Returns the master node (replica index 0) for +partition+.
#
# @raise [Aerospike::Exceptions::InvalidNamespace] when the partition's
#   namespace, or its master replica, is missing from the partition map
# @raise [Aerospike::Exceptions::InvalidNode] when no node is mapped to
#   the partition or the mapped node is not active
def master_node(partition)
  replica_array = partitions[partition.namespace]
  # `InvalidNamespace("...")` would be parsed as a method call and fail
  # with NoMethodError; raise the exception class with a message instead.
  raise Aerospike::Exceptions::InvalidNamespace, "namespace not found in the partition map" unless replica_array

  node_array = replica_array.get[0]
  raise Aerospike::Exceptions::InvalidNamespace, "namespace not found in the partition map" unless node_array

  node = node_array.get[partition.partition_id]
  raise Aerospike::Exceptions::InvalidNode if !node || !node.active?

  node
end
master_proles_node(partition)
click to toggle source
Returns a node on the cluster for read operations
# File lib/aerospike/cluster.rb, line 202
# Returns an active node holding +partition+, rotating over all of its
# replicas. The shared @replica_index counter picks the starting replica,
# so successive calls start from different replicas.
#
# @raise [Aerospike::Exceptions::InvalidNamespace] when the partition's
#   namespace is missing from the partition map
# @raise [Aerospike::Exceptions::InvalidNode] when no replica yields an
#   active node
def master_proles_node(partition)
  replica_array = partitions[partition.namespace]
  # `InvalidNamespace("...")` would be parsed as a method call and fail
  # with NoMethodError; raise the exception class with a message instead.
  raise Aerospike::Exceptions::InvalidNamespace, "namespace not found in the partition map" unless replica_array

  replicas = replica_array.get
  # One attempt per replica.
  replicas.size.times do
    idx = (@replica_index.update { |v| v.succ } % replicas.size).abs
    node = replicas[idx].get[partition.partition_id]
    return node if node && node.active?
  end

  raise Aerospike::Exceptions::InvalidNode
end
node_exists(search, node_list)
click to toggle source
# File lib/aerospike/cluster.rb, line 695
# True when +node_list+ contains an element equal to +search+.
def node_exists(search, node_list)
  node_list.include?(search)
end
node_partitions(node, namespace)
click to toggle source
Returns partitions pertaining to a node
# File lib/aerospike/cluster.rb, line 249
# Lists the partition ids in +namespace+ whose master replica is +node+.
#
# @return [Array<Integer>] partition ids in ascending order
# @raise [Aerospike::Exceptions::InvalidNamespace] when the namespace, or
#   its master replica, is missing from the partition map
def node_partitions(node, namespace)
  replica_array = partitions[namespace]
  # `InvalidNamespace("...")` would be parsed as a method call and fail
  # with NoMethodError; raise the exception class with a message instead.
  raise Aerospike::Exceptions::InvalidNamespace, "namespace not found in the partition map" unless replica_array

  node_array = replica_array.get[0]
  raise Aerospike::Exceptions::InvalidNamespace, "namespace not found in the partition map" unless node_array

  # The position in the master replica is the partition id.
  node_array.get.each_with_index
            .select { |master, _pid| node == master }
            .map { |_master, pid| pid }
end
nodes()
click to toggle source
Returns a list of all nodes in the cluster
# File lib/aerospike/cluster.rb, line 288
# Returns a shallow copy of the node list, taken under the mutex, so
# callers never iterate the live array.
def nodes
  @mutex.synchronize { @cluster_nodes.dup }
end
notify_cluster_config_changed()
click to toggle source
# File lib/aerospike/cluster.rb, line 507
# Calls +cluster_config_changed+ with this cluster on every registered
# listener. +send+ is used, so the callback may be a private method.
def notify_cluster_config_changed
  @cluster_config_change_listeners.get.each do |subscriber|
    subscriber.send(:cluster_config_changed, self)
  end
end
partitions()
click to toggle source
# File lib/aerospike/cluster.rb, line 520
# Returns the current partition map reference, read under the mutex.
def partitions
  @mutex.synchronize { @partition_write_map }
end
rack_node(partition, seq)
click to toggle source
Returns a node on the cluster
# File lib/aerospike/cluster.rb, line 170
# Returns a node holding +partition+, preferring an active node on this
# cluster's rack (@rack_id).
#
# Replicas are visited in the order given by +seq+, which is advanced
# once per attempt. The last non-nil node seen is kept as a fallback and
# returned when no active same-rack node is found.
#
# @param partition the partition to locate
# @param seq replica sequence counter; a value above -1 on entry marks
#   the call as a retry
# @raise [Aerospike::Exceptions::InvalidNamespace] when the partition's
#   namespace is missing from the partition map
# @raise [Aerospike::Exceptions::InvalidNode] when no replica maps a node
#   to the partition
def rack_node(partition, seq)
  replica_array = partitions[partition.namespace]
  # `InvalidNamespace("...")` would be parsed as a method call and fail
  # with NoMethodError; raise the exception class with a message instead.
  raise Aerospike::Exceptions::InvalidNamespace, "namespace not found in the partition map" unless replica_array

  replicas = replica_array.get
  is_retry = seq.value > -1
  replica_count = replicas.length
  fallback = nil

  (1..replica_count).each do |attempt|
    idx = (seq.update { |v| v.succ } % replicas.size).abs
    node = replicas[idx].get[partition.partition_id]
    next unless node

    fallback = node

    # If fallback exists, do not retry on node where command failed,
    # even if fallback is not on the same rack.
    return fallback if is_retry && attempt == replica_count

    return node if node.active? && node.has_rack(partition.namespace, @rack_id)
  end

  return fallback if fallback

  raise Aerospike::Exceptions::InvalidNode
end
random_node()
click to toggle source
Returns a random node on the cluster
# File lib/aerospike/cluster.rb, line 270
# Returns an active node chosen by advancing the shared @node_index
# counter, trying at most as many times as there are nodes.
#
# @raise [Aerospike::Exceptions::InvalidNode] when no active node is found
def random_node
  # Must copy array reference for copy on write semantics to work.
  candidates = nodes

  candidates.length.times do
    # Must handle concurrency with other non-tending threads, so node_index is consistent.
    position = (@node_index.update { |v| v.succ } % candidates.length).abs
    candidate = candidates[position]
    return candidate if candidate.active?
  end

  raise Aerospike::Exceptions::InvalidNode
end
read_node(partition, replica_policy, seq)
click to toggle source
Returns a node on the cluster for read operations
# File lib/aerospike/cluster.rb, line 137
# Picks the node a read for +partition+ should be sent to.
#
# @param partition the partition being read
# @param replica_policy one of the Aerospike::Replica constants
# @param seq replica sequence counter, used by the PREFER_RACK and
#   SEQUENCE policies
# @raise [Aerospike::Exceptions::InvalidNode] for an unknown replica policy
def read_node(partition, replica_policy, seq)
  case replica_policy
  when Aerospike::Replica::MASTER
    master_node(partition)
  when Aerospike::Replica::MASTER_PROLES
    master_proles_node(partition)
  when Aerospike::Replica::PREFER_RACK
    rack_node(partition, seq)
  when Aerospike::Replica::SEQUENCE
    sequence_node(partition, seq)
  when Aerospike::Replica::RANDOM
    random_node
  else
    # `InvalidNode("...")` would be parsed as a method call and fail with
    # NoMethodError; raise the exception class with a message instead.
    raise Aerospike::Exceptions::InvalidNode, "invalid policy.replica value"
  end
end
refresh_nodes()
click to toggle source
Refreshes the status of all nodes in the cluster. Adds new nodes and/or removes unhealthy ones
# File lib/aerospike/cluster.rb, line 399
# Runs one refresh pass over the cluster: seeds it when no nodes are
# known, refreshes node info, peers, partitions and racks, then removes
# and adds nodes as the pass dictates.
#
# @return [Boolean] true when the cluster was seeded or any node was
#   added or removed during this pass
def refresh_nodes
  changed = false

  current = nodes
  if current.empty?
    seed_nodes
    changed = true
    current = nodes
  end

  peers = Peers.new

  # Clear node reference count
  current.each(&:refresh_reset)

  peers.use_peers = supports_peers_protocol?

  # refresh all known nodes
  current.each { |node| node.refresh_info(peers) }

  # refresh peers when necessary
  if peers.generation_changed?
    # Refresh peers for all nodes that responded the first time even if only
    # one node's peers changed.
    peers.reset_refresh_count!
    current.each { |node| node.refresh_peers(peers) }
  end

  current.each do |node|
    node.refresh_partitions(peers) if node.partition_generation.changed?
    node.refresh_racks if node.rebalance_generation.changed?
  end

  if peers.generation_changed? || !peers.use_peers?
    stale = find_nodes_to_remove(peers.refresh_count)
    if stale.any?
      remove_nodes(stale)
      changed = true
    end
  end

  # Add any new nodes from peer refresh
  if peers.nodes.any?
    # peers.nodes is a Hash. Pass only values, ie. the array of nodes
    add_nodes(peers.nodes.values)
    changed = true
  end

  changed
end
remove_alias(aliass)
click to toggle source
# File lib/aerospike/cluster.rb, line 581
# Deletes +aliass+ from the alias table under the mutex. Does nothing
# when +aliass+ is nil or false.
#
# @return the node that was registered under the alias, or nil
def remove_alias(aliass)
  return unless aliass

  @mutex.synchronize { @aliases.delete(aliass) }
end
remove_cluster_config_change_listener(listener)
click to toggle source
# File lib/aerospike/cluster.rb, line 359
# Unregisters +listener+ from cluster configuration change notifications.
#
# The update block must return the new listener list. Array#delete
# returns the removed element (or nil), so using it as the block result
# replaced the list with a listener or nil, and the next notification
# then failed. A filtered copy is returned instead.
def remove_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    listeners.reject { |registered| registered == listener }
  end
end
remove_nodes(nodes_to_remove)
click to toggle source
# File lib/aerospike/cluster.rb, line 637
# Releases the resources of every node in +nodes_to_remove+ (its aliases
# and the node itself), then drops them all from the node list in one call.
#
# The partition map is left untouched: per the original comment, removed
# nodes are already inactive, so requests to them fail and another node
# is tried.
def remove_nodes(nodes_to_remove)
  nodes_to_remove.each do |stale_node|
    # Remove node's aliases from cluster alias set.
    stale_node.aliases.each do |stale_alias|
      Aerospike.logger.debug("Removing alias #{stale_alias}")
      remove_alias(stale_alias)
    end

    stale_node.close
  end

  # Remove all nodes at once to avoid copying entire array multiple times.
  remove_nodes_copy(nodes_to_remove)
end
remove_nodes_copy(nodes_to_remove)
click to toggle source
# File lib/aerospike/cluster.rb, line 665
# Replaces the node list with a copy that omits every node found in
# +nodes_to_remove+, logging each removed node.
#
# The previous "sanity check" compared the kept-node counter with the
# length of the array it had just filled; the two were always equal, so
# that branch could never run and has been dropped.
def remove_nodes_copy(nodes_to_remove)
  removed, kept = nodes.partition { |node| node_exists(node, nodes_to_remove) }

  removed.each { |node| Aerospike.logger.info("Removed node `#{node}`") }

  set_nodes(kept)
end
request_info(policy, *commands)
click to toggle source
# File lib/aerospike/cluster.rb, line 325
# Sends the info +commands+ to a node chosen by #random_node and returns
# the response. The connection goes back to the node's pool once the
# request has returned.
def request_info(policy, *commands)
  node = random_node
  conn = node.get_connection(policy.timeout)
  response = Info.request(conn, *commands)
  node.put_connection(conn)
  response
end
request_node_info(node, policy, *commands)
click to toggle source
# File lib/aerospike/cluster.rb, line 333
# Sends the info +commands+ to the given +node+ and returns the response.
# The connection goes back to the node's pool once the request has returned.
def request_node_info(node, policy, *commands)
  conn = node.get_connection(policy.timeout)
  response = Info.request(conn, *commands)
  node.put_connection(conn)
  response
end
reset_session_info()
click to toggle source
# File lib/aerospike/cluster.rb, line 87
# Clears the stored session token and its expiration.
def reset_session_info
  @session_token = @session_expiration = nil
end
seed_nodes()
click to toggle source
# File lib/aerospike/cluster.rb, line 529 def seed_nodes seed_array = seeds Aerospike.logger.info("Seeding the cluster. Seeds count: #{seed_array.length}") list = [] seed_array.each do |seed| begin seed_node_validator = NodeValidator.new(self, seed, @connection_timeout, @cluster_name, tls_options) rescue => e Aerospike.logger.error("Seed #{seed} failed: #{e}\n#{e.backtrace.join("\n")}") next end nv = nil # Seed host may have multiple aliases in the case of round-robin dns configurations. seed_node_validator.aliases.each do |aliass| if aliass == seed nv = seed_node_validator else begin nv = NodeValidator.new(self, aliass, @connection_timeout, @cluster_name, tls_options) rescue => e Aerospike.logger.error("Seed #{seed} failed: #{e}") next end end next if find_node_name(list, nv.name) node = create_node(nv) add_aliases(node) list << node end end add_nodes_copy(list) if list.length > 0 end
seeds()
click to toggle source
# File lib/aerospike/cluster.rb, line 108
# Returns a shallow copy of the seed host list, taken under the mutex.
def seeds
  @mutex.synchronize { @cluster_seeds.dup }
end
sequence_node(partition, seq)
click to toggle source
Returns an active node for the partition, trying its replicas in sequence
# File lib/aerospike/cluster.rb, line 221
# Returns an active node holding +partition+, visiting its replicas in
# the order given by +seq+, which is advanced once per attempt.
#
# @param partition the partition to locate
# @param seq replica sequence counter
# @raise [Aerospike::Exceptions::InvalidNamespace] when the partition's
#   namespace is missing from the partition map
# @raise [Aerospike::Exceptions::InvalidNode] when no replica yields an
#   active node
def sequence_node(partition, seq)
  replica_array = partitions[partition.namespace]
  # `InvalidNamespace("...")` would be parsed as a method call and fail
  # with NoMethodError; raise the exception class with a message instead.
  raise Aerospike::Exceptions::InvalidNamespace, "namespace not found in the partition map" unless replica_array

  replicas = replica_array.get
  # One attempt per replica.
  replicas.size.times do
    idx = (seq.update { |v| v.succ } % replicas.size).abs
    node = replicas[idx].get[partition.partition_id]
    return node if node && node.active?
  end

  raise Aerospike::Exceptions::InvalidNode
end
session_valid?()
click to toggle source
# File lib/aerospike/cluster.rb, line 83
# Truthy when a session token is stored and its expiration is still in
# the future.
#
# The comparison used to be `expiration < now`, which reported a session
# as valid only after it had expired.
#
# NOTE(review): assumes @session_expiration converts with #to_i to epoch
# seconds (e.g. a Time) -- confirm against the code that sets it.
def session_valid?
  @session_token && @session_expiration && @session_expiration.to_i > Time.now.to_i
end
set_nodes(nodes)
click to toggle source
# File lib/aerospike/cluster.rb, line 658
# Swaps the whole node list for +nodes+ under the mutex.
def set_nodes(nodes)
  @mutex.synchronize { @cluster_nodes = nodes }
end
set_partitions(part_map)
click to toggle source
# File lib/aerospike/cluster.rb, line 514
# Swaps the whole partition map for +part_map+ under the mutex.
def set_partitions(part_map)
  @mutex.synchronize { @partition_write_map = part_map }
end
supports_feature?(feature)
click to toggle source
# File lib/aerospike/cluster.rb, line 340
# True when the cluster-wide feature set contains +feature+ (compared by
# its string form).
def supports_feature?(feature)
  supported = @features.get
  supported.include?(feature.to_s)
end
supports_peers_protocol?()
click to toggle source
# File lib/aerospike/cluster.rb, line 344
# True when every known node supports the PEERS feature (vacuously true
# when there are no nodes).
def supports_peers_protocol?
  nodes.all? do |member|
    member.supports_feature?(Aerospike::Features::PEERS)
  end
end
tend()
click to toggle source
Check health of all nodes in cluster
# File lib/aerospike/cluster.rb, line 385
# Runs one refresh pass. Only when that pass changed the cluster are the
# feature set recomputed, the listeners notified and the stats logged,
# which keeps the log quiet on tend intervals where nothing changed.
def tend
  if refresh_nodes
    update_cluster_features
    notify_cluster_config_changed
    log_tend_stats(nodes)
  end
end
tls_enabled?()
click to toggle source
# File lib/aerospike/cluster.rb, line 92
# True when TLS options are present and their :enable entry is not
# explicitly false.
def tls_enabled?
  options = tls_options
  return false if options.nil?

  options[:enable] != false
end
update_cluster_features()
click to toggle source
# File lib/aerospike/cluster.rb, line 499
# Recomputes the cluster feature set as the intersection of the feature
# sets of all nodes; with no nodes it becomes an empty Set.
def update_cluster_features
  @features.update do
    nodes.map(&:features).reduce(&:intersection) || Set.new
  end
end
update_partitions(parser)
click to toggle source
# File lib/aerospike/cluster.rb, line 320
# Asks +parser+ for an updated partition map built from the current one
# and installs it when the parser returns one.
def update_partitions(parser)
  updated_map = parser.update_partitions(partitions)
  return unless updated_map

  set_partitions(updated_map)
end
wait_till_stablized()
click to toggle source
# File lib/aerospike/cluster.rb, line 465 def wait_till_stablized count = -1 done = false # will run until the cluster is stabilized thr = Thread.new do loop do tend # Check to see if cluster has changed since the last Tend. # If not, assume cluster has stabilized and return. break if count == nodes.length # Break if timed out break if done sleep(0.001) # sleep for a millisecond count = nodes.length end end # wait for the thread to finish or timeout # This will give the client up to 10 times the timeout duration to find # a host and connect successfully eventually, in case the DNS # returns multiple IPs and some of them are not reachable. thr.join(@connection_timeout * 10) done = true sleep(0.001) thr.kill if thr.alive? @closed.value = false if @cluster_nodes.length > 0 end