class Aerospike::Client

Examples:

# connect to the database
client = Client.new('192.168.0.1')

#=> raises Aerospike::Exceptions::Timeout if a :timeout is specified and :fail_if_not_connected set to true

Attributes

cluster[RW]
default_admin_policy[RW]
default_batch_policy[RW]
default_info_policy[RW]
default_operate_policy[RW]
default_query_policy[RW]
default_read_policy[RW]
default_scan_policy[RW]
default_write_policy[RW]

Public Class Methods

new(hosts = nil, policy: ClientPolicy.new, connect: true) click to toggle source
# File lib/aerospike/client.rb, line 41
# Builds a client and, unless +connect+ is falsy, connects it immediately.
#
# hosts   - host specification handed to Aerospike::Host::Parse. When nil,
#           ENV["AEROSPIKE_HOSTS"] is used, and "localhost" after that.
# policy  - run through create_policy with ClientPolicy; its +policies+ are
#           passed to set_default_policies.
# connect - when truthy, #connect is called before returning.
#
# The client registers itself as a config-change listener on the new Cluster.
def initialize(hosts = nil, policy: ClientPolicy.new, connect: true)
  host_spec = hosts || ENV["AEROSPIKE_HOSTS"] || "localhost"
  seed_hosts = ::Aerospike::Host::Parse.call(host_spec)

  client_policy = create_policy(policy, ClientPolicy)
  set_default_policies(client_policy.policies)

  @cluster = Cluster.new(client_policy, seed_hosts)
  @cluster.add_cluster_config_change_listener(self)

  # The +connect+ keyword shadows the method, hence the explicit receiver.
  self.connect if connect
  self
end

Public Instance Methods

add(key, bins, options = nil) click to toggle source

Examples:

client.add key, {'bin' => -1}, :timeout => 0.001

# File lib/aerospike/client.rb, line 172
# Executes a WriteCommand with the ADD operation against +key+.
#
# key     - record key passed to the command.
# bins    - converted with hash_to_bins before being sent.
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns whatever execute_command returns.
def add(key, bins, options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)
  bin_list = hash_to_bins(bins)
  execute_command(
    WriteCommand.new(@cluster, write_policy, key, bin_list, Aerospike::Operation::ADD)
  )
end
append(key, bins, options = nil) click to toggle source

Examples:

client.append key, {'bin' => 'value to append'}, :timeout => 0.001

# File lib/aerospike/client.rb, line 130
# Executes a WriteCommand with the APPEND operation against +key+.
#
# key     - record key passed to the command.
# bins    - converted with hash_to_bins before being sent.
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns whatever execute_command returns.
def append(key, bins, options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)
  bin_list = hash_to_bins(bins)
  execute_command(
    WriteCommand.new(@cluster, write_policy, key, bin_list, Aerospike::Operation::APPEND)
  )
end
batch_exists(keys, options = nil) click to toggle source

Check if multiple record keys exist in one batch call. The returned boolean array is in positional order with the original key array order. The policy can be used to specify timeouts and protocol type.

# File lib/aerospike/client.rb, line 357
# Runs one BatchIndexExistsCommand per (node, batch) pair yielded by
# execute_batch_index_commands and returns the shared results array.
#
# keys    - keys to check; the results array is pre-sized to keys.length.
# options - nil, a Hash or a BatchPolicy; resolved by create_policy with
#           default_batch_policy as the fallback.
#
# Returns the results array that was handed to every command.
def batch_exists(keys, options = nil)
  batch_policy = create_policy(options, BatchPolicy, default_batch_policy)
  existence = Array.new(keys.length)

  execute_batch_index_commands(batch_policy, keys) { |node, batch|
    BatchIndexExistsCommand.new(node, batch, batch_policy, existence)
  }

  existence
end
batch_get(keys, bin_names = nil, options = nil) click to toggle source

Read multiple record headers and bins for specified keys in one batch call. The returned records are in positional order with the original key array order. If a key is not found, the positional record will be nil. The policy can be used to specify timeouts and protocol type.

# File lib/aerospike/client.rb, line 310
# Runs one BatchIndexCommand per (node, batch) pair yielded by
# execute_batch_index_commands and returns the shared results array.
#
# keys      - keys to read; the results array is pre-sized to keys.length.
# bin_names - :all, nil or [] adds INFO1_GET_ALL to the read flags;
#             :none adds INFO1_NOBINDATA. In both cases nil is passed to the
#             command as the bin list. Any other value is passed through
#             unchanged with only INFO1_READ set.
# options   - nil, a Hash or a BatchPolicy; resolved by create_policy with
#             default_batch_policy as the fallback.
#
# Returns the results array that was handed to every command.
def batch_get(keys, bin_names = nil, options = nil)
  batch_policy = create_policy(options, BatchPolicy, default_batch_policy)
  records = Array.new(keys.length)
  read_flags = INFO1_READ

  extra_flag =
    case bin_names
    when :all, nil, [] then INFO1_GET_ALL
    when :none then INFO1_NOBINDATA
    end

  if extra_flag
    read_flags |= extra_flag
    # The flag alone tells the command which bins to fetch.
    bin_names = nil
  end

  execute_batch_index_commands(batch_policy, keys) { |node, batch|
    BatchIndexCommand.new(node, batch, batch_policy, bin_names, records, read_flags)
  }

  records
end
batch_get_header(keys, options = nil) click to toggle source

Read multiple record header data for specified keys in one batch call. The returned records are in positional order with the original key array order. If a key is not found, the positional record will be nil. The policy can be used to specify timeouts and protocol type.

# File lib/aerospike/client.rb, line 335
# Delegates to batch_get with :none as the bin selection, which makes
# batch_get set the INFO1_NOBINDATA flag.
#
# Returns whatever batch_get returns.
def batch_get_header(keys, options = nil)
  no_bins = :none
  batch_get(keys, no_bins, options)
end
batch_operate(records, options = nil) click to toggle source

Operate on multiple records for specified batch keys in one batch call. This method allows different namespaces/bins for each key in the batch. The returned records are located in the same list.

records can be BatchRead, BatchWrite, BatchDelete or BatchUDF.

Requires server version 6.0+

# File lib/aerospike/client.rb, line 346
# Runs one BatchOperateCommand per (node, batch) pair yielded by
# execute_batch_operate_commands. Every command receives the full +records+
# list.
#
# options - nil, a Hash or a BatchPolicy; resolved by create_policy with
#           default_batch_policy as the fallback.
#
# Returns whatever execute_batch_operate_commands returns.
def batch_operate(records, options = nil)
  batch_policy = create_policy(options, BatchPolicy, default_batch_policy)

  execute_batch_operate_commands(batch_policy, records) { |node, batch|
    BatchOperateCommand.new(node, batch, batch_policy, records)
  }
end
change_password(user, password, options = nil) click to toggle source

Change user’s password. Clear-text password will be hashed using bcrypt before sending to server.

# File lib/aerospike/client.rb, line 798
# Changes +user+'s password on the server and then on the local cluster
# object.
#
# The password is hashed with LoginCommand.hash_password. When +user+ equals
# @cluster.user the AdminCommand#change_password call is used; otherwise
# AdminCommand#set_password is used.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Raises Aerospike::Exceptions::Aerospike when @cluster.user is nil/false or
# the empty string.
# Returns whatever @cluster.change_password returns.
def change_password(user, password, options = nil)
  # NOTE(review): INVALID_USER is referenced without a namespace, unlike the
  # Aerospike::ResultCode::* constants used elsewhere - confirm it resolves.
  unless @cluster.user && @cluster.user != ""
    raise Aerospike::Exceptions::Aerospike.new(INVALID_USER)
  end

  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  hashed = LoginCommand.hash_password(password)
  admin = AdminCommand.new

  if user == @cluster.user
    # Changing our own password.
    admin.change_password(@cluster, admin_policy, user, hashed)
  else
    # Changing somebody else's password as a user admin.
    admin.set_password(@cluster, admin_policy, user, hashed)
  end

  @cluster.change_password(user, hashed)
end
close() click to toggle source

Closes all client connections to database server nodes.

# File lib/aerospike/client.rb, line 62
# Delegates to @cluster.close and returns its result.
def close
  @cluster.close
end
connect() click to toggle source

Connect to the cluster.

# File lib/aerospike/client.rb, line 55
# Delegates to @cluster.connect and returns its result.
def connect
  @cluster.connect
end
connected?() click to toggle source

Determines if there are active connections to the database server cluster. Returns true if connections exist.

# File lib/aerospike/client.rb, line 70
# Delegates to @cluster.connected? and returns its result.
def connected?
  @cluster.connected?
end
create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil, ctx: nil) click to toggle source

Create secondary index. This asynchronous server call will return before command is complete. The user can optionally wait for command completion by using the returned IndexTask instance.

This method is only supported by Aerospike 3 servers. index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later). collection_type should be :list, :mapkeys or :mapvalues. ctx is an optional list of contexts, supported on server v6.1+.

# File lib/aerospike/client.rb, line 560
# Sends a "sindex-create" info command and returns an IndexTask.
#
# namespace, set_name, index_name, bin_name, index_type - interpolated into
#   the command; set_name is omitted when blank, index_type is upcased.
# collection_type - upcased into ";indextype=" when truthy. For
#   convenience, a Hash given here while +options+ is nil is treated as
#   +options+ instead.
# options - nil, a Hash or a Policy; resolved by create_policy with
#   default_info_policy as the fallback.
# ctx - when non-empty, encoded with CDT::Context.base64 into ";context=".
#
# Returns an IndexTask when the upcased response is "OK", or an IndexTask
# built with a trailing +true+ when the response starts with "FAIL:200".
# Raises Aerospike::Exceptions::Aerospike (INDEX_GENERIC) for anything else.
def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil, ctx: nil)
  if options.nil? && collection_type.is_a?(Hash)
    options, collection_type = collection_type, nil
  end
  info_policy = create_policy(options, Policy, default_info_policy)

  parts = ["sindex-create:ns=#{namespace}"]
  parts << "set=#{set_name}" unless set_name.to_s.strip.empty?
  parts << "indexname=#{index_name}" << "numbins=1"
  parts << "context=#{CDT::Context.base64(ctx)}" unless ctx.to_a.empty?
  parts << "indextype=#{collection_type.to_s.upcase}" if collection_type
  parts << "indexdata=#{bin_name},#{index_type.to_s.upcase}"
  parts << "priority=normal"

  # Send index command to one node. That node will distribute the command to other nodes.
  response = send_info_command(info_policy, parts.join(";")).upcase

  # Return task that could optionally be polled for completion.
  return IndexTask.new(@cluster, namespace, index_name) if response == "OK"

  # Index has already been created. Do not need to poll for completion.
  return IndexTask.new(@cluster, namespace, index_name, true) if response.start_with?("FAIL:200")

  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Create index failed: #{response}")
end
create_role(role_name, privileges = [], allowlist = [], read_quota = 0, write_quota = 0, options = nil) click to toggle source

Create a user-defined role. Quotas require server security configuration “enable-quotas” to be set to true. Pass 0 for quota values for no limit.

# File lib/aerospike/client.rb, line 861
# Issues AdminCommand#create_role for +role_name+ with the given privileges,
# allowlist and quotas.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#create_role returns.
def create_role(role_name, privileges = [], allowlist = [], read_quota = 0, write_quota = 0, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.create_role(
    @cluster, admin_policy, role_name, privileges, allowlist, read_quota, write_quota
  )
end
create_user(user, password, roles, options = nil) click to toggle source

Create user with password and roles. Clear-text password will be hashed using bcrypt before sending to server.

# File lib/aerospike/client.rb, line 783
# Issues AdminCommand#create_user for +user+ with +roles+. The password is
# hashed with LoginCommand.hash_password before it is sent.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#create_user returns.
def create_user(user, password, roles, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  hashed = LoginCommand.hash_password(password)
  AdminCommand.new.create_user(@cluster, admin_policy, user, hashed, roles)
end
delete(key, options = nil) click to toggle source

Examples:

existed = client.delete key, :timeout => 0.001

# File lib/aerospike/client.rb, line 194
# Executes a DeleteCommand for +key+.
#
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns the command's +existed+ value after execution.
def delete(key, options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)
  DeleteCommand.new(@cluster, write_policy, key)
               .tap { |cmd| execute_command(cmd) }
               .existed
end
drop_index(namespace, set_name, index_name, options = nil) click to toggle source

Delete secondary index. This method is only supported by Aerospike 3 servers.

# File lib/aerospike/client.rb, line 592
# Sends a "sindex-delete" info command for +index_name+.
#
# set_name - omitted from the command when blank.
# options  - nil, a Hash or a Policy; resolved by create_policy with
#            default_info_policy as the fallback.
#
# Returns nil when the upcased response is "OK" or starts with "FAIL:201"
# (the index did not exist).
# Raises Aerospike::Exceptions::Aerospike (INDEX_GENERIC) for anything else.
def drop_index(namespace, set_name, index_name, options = nil)
  info_policy = create_policy(options, Policy, default_info_policy)

  parts = ["sindex-delete:ns=#{namespace}"]
  parts << "set=#{set_name}" unless set_name.to_s.strip.empty?
  parts << "indexname=#{index_name}"

  # Send index command to one node. That node will distribute the command to other nodes.
  response = send_info_command(info_policy, parts.join(";")).upcase

  # "FAIL:201" means the index did not previously exist; not an error.
  return if response == "OK" || response.start_with?("FAIL:201")

  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Drop index failed: #{response}")
end
drop_role(role_name, options = nil) click to toggle source

Remove a user-defined role.

# File lib/aerospike/client.rb, line 868
# Issues AdminCommand#drop_role for +role_name+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#drop_role returns.
def drop_role(role_name, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.drop_role(@cluster, admin_policy, role_name)
end
drop_user(user, options = nil) click to toggle source

Remove user from cluster.

# File lib/aerospike/client.rb, line 791
# Issues AdminCommand#drop_user for +user+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#drop_user returns.
def drop_user(user, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.drop_user(@cluster, admin_policy, user)
end
execute_udf(key, package_name, function_name, args = [], options = nil) click to toggle source

Execute user defined function on server and return results. The function operates on a single record. The package name is used to locate the udf file location:

udf file = <server udf dir>/<package name>.lua

This method is only supported by Aerospike 3 servers.

# File lib/aerospike/client.rb, line 495
# Executes an ExecuteCommand for the given UDF and interprets the bins of the
# record it produced.
#
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns nil when the command has no record or the record has no bins;
# otherwise the value of the first bin whose name includes "SUCCESS".
# Raises Aerospike::Exceptions::Aerospike (UDF_BAD_RESPONSE) when no such bin
# exists; the message is the value of the first bin whose name includes
# "FAILURE", or "Invalid UDF return value" when there is none.
def execute_udf(key, package_name, function_name, args = [], options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)

  udf_command = ExecuteCommand.new(@cluster, write_policy, key, package_name, function_name, args)
  execute_command(udf_command)

  record = udf_command.record
  return nil unless record && !record.bins.empty?

  bins = record.bins

  # User defined functions don't have to return a value.
  success_name, success_value = bins.find { |name, _| name.include?("SUCCESS") }
  return success_value if success_name

  failure_name, failure_value = bins.find { |name, _| name.include?("FAILURE") }
  message = failure_name ? failure_value.to_s : "Invalid UDF return value"
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::UDF_BAD_RESPONSE, message)
end
execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil) click to toggle source

execute_udf_on_query applies user defined function on records that match the statement filter. Records are not returned to the client. This asynchronous server call will return before command is complete. The user can optionally wait for command completion by using the returned ExecuteTask instance.

This method is only supported by Aerospike 3 servers. If the policy is nil, the default relevant policy will be used.

# File lib/aerospike/client.rb, line 524
# Starts one background thread per cluster node, each executing a
# ServerCommand for a clone of +statement+ configured with the given UDF.
#
# The caller's statement is not modified: set_aggregate_function is applied
# to a clone. The threads are not joined here; each one logs and re-raises
# any error, with abort_on_exception enabled on that thread.
#
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns an ExecuteTask built from the cluster and the cloned statement.
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# cluster has no nodes.
def execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)

  cluster_nodes = @cluster.nodes
  if cluster_nodes.empty?
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.")
  end

  udf_statement = statement.clone
  udf_statement.set_aggregate_function(package_name, function_name, function_args, false)

  # Use a thread per node
  cluster_nodes.each do |node|
    Thread.new do
      Thread.current.abort_on_exception = true
      begin
        execute_command(
          ServerCommand.new(@cluster, node, write_policy, udf_statement, true, udf_statement.task_id)
        )
      rescue => e
        Aerospike.logger.error(e)
        raise e
      end
    end
  end

  ExecuteTask.new(@cluster, udf_statement)
end
exists(key, options = nil) click to toggle source

Determines if a record key exists. The policy can be used to specify timeouts.

# File lib/aerospike/client.rb, line 272
# Executes an ExistsCommand for +key+.
#
# options - nil, a Hash or a Policy; resolved by create_policy with
#           default_read_policy as the fallback.
#
# Returns the command's +exists+ value after execution.
def exists(key, options = nil)
  read_policy = create_policy(options, Policy, default_read_policy)
  ExistsCommand.new(@cluster, read_policy, key)
               .tap { |cmd| execute_command(cmd) }
               .exists
end
get(key, bin_names = nil, options = nil) click to toggle source

Read record header and bins for specified key. The policy can be used to specify timeouts.

# File lib/aerospike/client.rb, line 285
# Executes a ReadCommand for +key+, restricted to +bin_names+ as given.
#
# options - nil, a Hash or a Policy; resolved by create_policy with
#           default_read_policy as the fallback.
#
# Returns the command's +record+ after execution.
def get(key, bin_names = nil, options = nil)
  read_policy = create_policy(options, Policy, default_read_policy)
  ReadCommand.new(@cluster, read_policy, key, bin_names)
             .tap { |cmd| execute_command(cmd) }
             .record
end
get_header(key, options = nil) click to toggle source

Read record generation and expiration only for specified key. Bins are not read. The policy can be used to specify timeouts.

# File lib/aerospike/client.rb, line 295
# Executes a ReadHeaderCommand for +key+.
#
# options - nil, a Hash or a Policy; resolved by create_policy with
#           default_read_policy as the fallback.
#
# Returns the command's +record+ after execution.
def get_header(key, options = nil)
  read_policy = create_policy(options, Policy, default_read_policy)
  ReadHeaderCommand.new(@cluster, read_policy, key)
                   .tap { |cmd| execute_command(cmd) }
                   .record
end
grant_privileges(role_name, privileges, options = nil) click to toggle source

Grant privileges to a user-defined role.

# File lib/aerospike/client.rb, line 875
# Issues AdminCommand#grant_privileges for +role_name+ with +privileges+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#grant_privileges returns.
def grant_privileges(role_name, privileges, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.grant_privileges(@cluster, admin_policy, role_name, privileges)
end
grant_roles(user, roles, options = nil) click to toggle source

Add roles to user’s list of roles.

# File lib/aerospike/client.rb, line 817
# Issues AdminCommand#grant_roles for +user+ with +roles+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#grant_roles returns.
def grant_roles(user, roles, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.grant_roles(@cluster, admin_policy, user, roles)
end
list_udf(options = nil) click to toggle source

ListUDF lists all packages containing user defined functions in the server. This method is only supported by Aerospike 3 servers.

# File lib/aerospike/client.rb, line 457
# Sends the "udf-list" info command and parses the first response into UDF
# objects.
#
# The response is split on ";" into entries, and each entry on "," into
# "key=value" pairs. The "filename", "hash" and "type" keys populate the
# UDF's +filename+, +hash+ and +language+; other keys are ignored.
#
# Blank entries are skipped, so the result never contains nil. (The previous
# implementation compared the result of String#strip! - which is nil when
# nothing was stripped - against "" and used +next+ inside +map+, leaving nil
# placeholders in the returned array.) A missing response yields [].
#
# options - nil, a Hash or a Policy; resolved by create_policy with
#           default_info_policy as the fallback.
#
# Returns an Array of UDF objects.
def list_udf(options = nil)
  policy = create_policy(options, Policy, default_info_policy)

  str_cmd = "udf-list"

  # Send command to one node. That node will distribute it to other nodes.
  response_map = @cluster.request_info(policy, str_cmd)
  _, response = response_map.first

  response.to_s.split(";").each_with_object([]) do |udf_info, udfs|
    entry = udf_info.strip
    next if entry.empty?

    udf = UDF.new
    entry.split(",").each do |pair|
      k, v = pair.split("=", 2)
      case k
      when "filename"
        udf.filename = v
      when "hash"
        udf.hash = v
      when "type"
        udf.language = v
      end
    end
    udfs << udf
  end
end
node_names() click to toggle source

Returns list of active server node names in the cluster.

# File lib/aerospike/client.rb, line 84
# Returns the +name+ of every node in @cluster.nodes, in the same order.
def node_names
  cluster_nodes = @cluster.nodes
  cluster_nodes.map { |node| node.name }
end
nodes() click to toggle source

Returns array of active server nodes in the cluster.

# File lib/aerospike/client.rb, line 77
# Delegates to @cluster.nodes and returns its result.
def nodes
  @cluster.nodes
end
operate(key, operations, options = nil) click to toggle source

Perform multiple read/write operations on a single key in one batch call. An example would be to add an integer value to an existing record and then read the result, all in one database call. Operations are executed in the order they are specified.

# File lib/aerospike/client.rb, line 376
# Bundles +operations+ for +key+ into OperateArgs and executes them through
# an OperateCommand.
#
# options - nil, a Hash or an OperatePolicy; resolved by create_policy with
#           default_operate_policy as the fallback. The default write and
#           operate policies are also handed to OperateArgs.
#
# Returns the command's +record+ after execution.
def operate(key, operations, options = nil)
  operate_policy = create_policy(options, OperatePolicy, default_operate_policy)

  operate_args = OperateArgs.new(
    cluster, operate_policy, default_write_policy, default_operate_policy, key, operations
  )
  OperateCommand.new(@cluster, key, operate_args)
                .tap { |cmd| execute_command(cmd) }
                .record
end
prepend(key, bins, options = nil) click to toggle source

Examples:

client.prepend key, {'bin' => 'value to prepend'}, :timeout => 0.001

# File lib/aerospike/client.rb, line 149
# Executes a WriteCommand with the PREPEND operation against +key+.
#
# key     - record key passed to the command.
# bins    - converted with hash_to_bins before being sent.
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns whatever execute_command returns.
def prepend(key, bins, options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)
  bin_list = hash_to_bins(bins)
  execute_command(
    WriteCommand.new(@cluster, write_policy, key, bin_list, Aerospike::Operation::PREPEND)
  )
end
put(key, bins, options = nil) click to toggle source

Examples:

client.put key, {'bin' => 'value string'}, :timeout => 0.001

# File lib/aerospike/client.rb, line 107
# Executes a WriteCommand with the WRITE operation against +key+.
#
# key     - record key passed to the command.
# bins    - converted with hash_to_bins before being sent.
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns whatever execute_command returns.
def put(key, bins, options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)
  bin_list = hash_to_bins(bins)
  execute_command(
    WriteCommand.new(@cluster, write_policy, key, bin_list, Aerospike::Operation::WRITE)
  )
end
query(statement, options = nil) click to toggle source

Query executes a query and returns a recordset. The query executor puts records on a channel from separate threads. The caller can concurrently pop records off the channel through the record channel.

This method is only supported by Aerospike 3 servers. If the policy is nil, a default policy will be generated.

# File lib/aerospike/client.rb, line 720
# Delegates to query_partitions using Aerospike::PartitionFilter.all as the
# partition filter.
#
# Returns whatever query_partitions returns.
def query(statement, options = nil)
  every_partition = Aerospike::PartitionFilter.all
  query_partitions(every_partition, statement, options)
end
query_execute(statement, operations = [], options = nil) click to toggle source

QueryExecute applies operations on records that match the statement filter. Records are not returned to the client. This asynchronous server call will return before the command is complete. The user can optionally wait for command completion by using the returned ExecuteTask instance.

This method is only supported by Aerospike 3+ servers. If the policy is nil, the default relevant policy will be used.

@param statement [Aerospike::Statement] The query or batch read statement. @param operations [Array<Aerospike::Operation>] An optional list of operations. @param options [Hash] An optional hash of policy options. @return [Aerospike::ExecuteTask] An ExecuteTask instance that can be used to wait for command completion.

@raise [Aerospike::Exceptions::Aerospike] if an error occurs during the operation.

# File lib/aerospike/client.rb, line 743
# Starts one background thread per cluster node, each executing a
# ServerCommand for a clone of +statement+.
#
# The caller's statement is not modified: +operations+, when non-empty, are
# assigned to a clone. The threads are not joined here; each one logs and
# re-raises any error, with abort_on_exception enabled on that thread.
#
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns an ExecuteTask built from the cluster and the cloned statement.
# Raises Aerospike::Exceptions::Aerospike with INVALID_COMMAND when
# +statement+ is nil, or SERVER_NOT_AVAILABLE when the cluster has no nodes.
def query_execute(statement, operations = [], options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)

  if statement.nil?
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INVALID_COMMAND, "Query failed of invalid statement.")
  end

  background_statement = statement.clone
  background_statement.operations = operations unless operations.empty?

  task_id = background_statement.task_id
  cluster_nodes = @cluster.nodes
  if cluster_nodes.empty?
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.")
  end

  # Use a thread per node
  cluster_nodes.each do |node|
    Thread.new do
      Thread.current.abort_on_exception = true
      begin
        execute_command(
          ServerCommand.new(@cluster, node, write_policy, background_statement, true, task_id)
        )
      rescue => e
        Aerospike.logger.error(e)
        raise e
      end
    end
  end

  ExecuteTask.new(@cluster, background_statement)
end
query_partitions(partition_filter, statement, options = nil) click to toggle source

Executes a query for specified partitions and returns a recordset. The query executor puts records on the queue from separate threads. The caller can concurrently pop records off the queue through the recordset.records API.

This method is only supported by Aerospike 4.9+ servers. If the policy is nil, the default relevant policy will be used.

# File lib/aerospike/client.rb, line 694
# Starts a background thread running QueryExecutor.query_partitions and
# returns the Recordset that thread writes to.
#
# A PartitionTracker is built from the policy, the cluster's current nodes
# and +partition_filter+. The thread is not joined here, and
# abort_on_exception is enabled on it.
#
# options - nil, a Hash or a QueryPolicy; resolved by create_policy with
#           default_query_policy as the fallback.
#
# Returns the Recordset (sized from policy.record_queue_size).
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# cluster has no nodes.
def query_partitions(partition_filter, statement, options = nil)
  query_policy = create_policy(options, QueryPolicy, default_query_policy)

  cluster_nodes = @cluster.nodes
  if cluster_nodes.empty?
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.")
  end

  # result recordset
  results = Recordset.new(query_policy.record_queue_size, 1, :query)
  tracker = PartitionTracker.new(query_policy, cluster_nodes, partition_filter)

  Thread.new {
    Thread.current.abort_on_exception = true
    QueryExecutor.query_partitions(@cluster, query_policy, tracker, statement, results)
  }

  results
end
query_role(role, options = nil) click to toggle source

Retrieve privileges for a given role.

# File lib/aerospike/client.rb, line 845
# Issues AdminCommand#query_role for +role+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#query_role returns.
def query_role(role, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.query_role(@cluster, admin_policy, role)
end
query_roles(options = nil) click to toggle source

Retrieve all roles and their privileges.

# File lib/aerospike/client.rb, line 852
# Issues AdminCommand#query_roles.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#query_roles returns.
def query_roles(options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.query_roles(@cluster, admin_policy)
end
query_user(user, options = nil) click to toggle source

Retrieve roles for a given user.

# File lib/aerospike/client.rb, line 831
# Issues AdminCommand#query_user for +user+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#query_user returns.
def query_user(user, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.query_user(@cluster, admin_policy, user)
end
query_users(options = nil) click to toggle source

Retrieve all users and their roles.

# File lib/aerospike/client.rb, line 838
# Issues AdminCommand#query_users.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#query_users returns.
def query_users(options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.query_users(@cluster, admin_policy)
end
register_udf(udf_body, server_path, language, options = nil) click to toggle source

Register package containing user defined functions with server. This asynchronous server call will return before command is complete. The user can optionally wait for command completion by using the returned RegisterTask instance.

This method is only supported by Aerospike 3 servers.

# File lib/aerospike/client.rb, line 406
# Sends a "udf-put" info command carrying +udf_body+ (strict Base64 encoded)
# and returns a UdfRegisterTask for +server_path+.
#
# Every response in the returned map is split on ";" into "key=value" pairs
# that are merged into a single Hash; later pairs overwrite earlier ones.
#
# options - nil, a Hash or a Policy; resolved by create_policy with
#           default_info_policy as the fallback.
#
# Raises Aerospike::Exceptions::CommandRejected when the merged response
# contains an "error" field; the message also quotes the "file", "line" and
# "message" fields.
def register_udf(udf_body, server_path, language, options = nil)
  info_policy = create_policy(options, Policy, default_info_policy)

  encoded = Base64.strict_encode64(udf_body).force_encoding("binary")
  command = "udf-put:filename=#{server_path};content=#{encoded};"
  command << "content-len=#{encoded.length};udf-type=#{language};"

  # Send UDF to one node. That node will distribute the UDF to other nodes.
  response_map = @cluster.request_info(info_policy, command)

  fields = {}
  response_map.each do |_name, response|
    response.to_s.split(";").each do |pair|
      field, value = pair.split("=", 2)
      fields[field] = value
    end
  end

  if fields["error"]
    raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{fields['error']}\nFile: #{fields['file']}\nLine: #{fields['line']}\nMessage: #{fields['message']}")
  end

  UdfRegisterTask.new(@cluster, server_path)
end
register_udf_from_file(client_path, server_path, language, options = nil) click to toggle source

Register package containing user defined functions with server. This asynchronous server call will return before command is complete. The user can optionally wait for command completion by using the returned RegisterTask instance.

This method is only supported by Aerospike 3 servers.

# File lib/aerospike/client.rb, line 395
# Reads the whole file at +client_path+ and passes its contents to
# register_udf together with the remaining arguments.
#
# Returns whatever register_udf returns.
def register_udf_from_file(client_path, server_path, language, options = nil)
  register_udf(File.read(client_path), server_path, language, options)
end
remove_udf(udf_name, options = nil) click to toggle source

RemoveUDF removes a package containing user defined functions in the server. This asynchronous server call will return before command is complete. The user can optionally wait for command completion by using the returned RemoveTask instance.

This method is only supported by Aerospike 3 servers.

# File lib/aerospike/client.rb, line 438
# Sends a "udf-remove" info command for +udf_name+ and inspects the first
# response.
#
# options - nil, a Hash or a Policy; resolved by create_policy with
#           default_info_policy as the fallback.
#
# Returns a UdfRemoveTask when the response is exactly "ok".
# Raises Aerospike::Exceptions::Aerospike (SERVER_ERROR) with the response as
# its message otherwise.
def remove_udf(udf_name, options = nil)
  info_policy = create_policy(options, Policy, default_info_policy)

  # Send command to one node. That node will distribute it to other nodes.
  response_map = @cluster.request_info(info_policy, "udf-remove:filename=#{udf_name};")
  _, response = response_map.first

  unless response == "ok"
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, response)
  end

  UdfRemoveTask.new(@cluster, udf_name)
end
request_info(*commands, policy: nil) click to toggle source
# File lib/aerospike/client.rb, line 609
# Forwards +commands+ to @cluster.request_info.
#
# policy - nil, a Hash or a Policy; resolved by create_policy with
#          default_info_policy as the fallback.
#
# Returns whatever @cluster.request_info returns.
def request_info(*commands, policy: nil)
  info_policy = create_policy(policy, Policy, default_info_policy)
  @cluster.request_info(info_policy, *commands)
end
revoke_privileges(role_name, privileges, options = nil) click to toggle source

Revoke privileges from a user-defined role.

# File lib/aerospike/client.rb, line 882
# Issues AdminCommand#revoke_privileges for +role_name+ with +privileges+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#revoke_privileges returns.
def revoke_privileges(role_name, privileges, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.revoke_privileges(@cluster, admin_policy, role_name, privileges)
end
revoke_roles(user, roles, options = nil) click to toggle source

Remove roles from user’s list of roles.

# File lib/aerospike/client.rb, line 824
# Issues AdminCommand#revoke_roles for +user+ with +roles+.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#revoke_roles returns.
def revoke_roles(user, roles, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.revoke_roles(@cluster, admin_policy, user, roles)
end
scan_all(namespace, set_name, bin_names = nil, options = nil) click to toggle source

Reads all records in specified namespace and set from all nodes. If the policy’s concurrent_nodes is specified, each server node will be read in parallel. Otherwise, server nodes are read sequentially. If the policy is nil, the default relevant policy will be used.

# File lib/aerospike/client.rb, line 673
# Delegates to scan_partitions using Aerospike::PartitionFilter.all as the
# partition filter.
#
# Returns whatever scan_partitions returns.
def scan_all(namespace, set_name, bin_names = nil, options = nil)
  every_partition = Aerospike::PartitionFilter.all
  scan_partitions(every_partition, namespace, set_name, bin_names, options)
end
scan_node(node, namespace, set_name, bin_names = nil, options = nil) click to toggle source

ScanNode reads all records in specified namespace and set, from one node only. The policy can be used to specify timeouts.

# File lib/aerospike/client.rb, line 679
# Delegates to scan_node_partitions with the same arguments and returns its
# result.
def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
  scan_node_partitions(node, namespace, set_name, bin_names, options)
end
scan_node_partitions(node, namespace, set_name, bin_names = nil, options = nil) click to toggle source

Reads all records in specified namespace and set for one node only. If the policy is nil, the default relevant policy will be used.

# File lib/aerospike/client.rb, line 648
# Starts a background thread running ScanExecutor.scan_partitions against a
# single +node+ and returns the Recordset that thread writes to.
#
# A PartitionTracker is built from the policy and a one-element node list.
# The thread is not joined here, and abort_on_exception is enabled on it.
#
# Changes from the previous version: the unused +policy.clone+ has been
# removed, and the error raised for an inactive node now says so instead of
# reporting an empty cluster.
#
# options - nil, a Hash or a ScanPolicy; resolved by create_policy with
#           default_scan_policy as the fallback.
#
# Returns the Recordset (sized from policy.record_queue_size).
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when
# node.active? is falsy.
def scan_node_partitions(node, namespace, set_name, bin_names = nil, options = nil)
  policy = create_policy(options, ScanPolicy, default_scan_policy)

  unless node.active?
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because node is not active.")
  end

  tracker = Aerospike::PartitionTracker.new(policy, [node])
  recordset = Recordset.new(policy.record_queue_size, 1, :scan)
  Thread.new do
    Thread.current.abort_on_exception = true
    ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names)
  end

  recordset
end
scan_partitions(partition_filter, namespace, set_name, bin_names = nil, options = nil) click to toggle source

Reads records in specified namespace and set using partition filter. If the policy’s concurrent_nodes is specified, each server node will be read in parallel. Otherwise, server nodes are read sequentially. If partition_filter is nil, all partitions will be scanned. If the policy is nil, the default relevant policy will be used. This method is only supported by Aerospike 4.9+ servers.

# File lib/aerospike/client.rb, line 624
# Starts a background thread running ScanExecutor.scan_partitions and returns
# the Recordset that thread writes to.
#
# A PartitionTracker is built from the policy, the cluster's current nodes
# and +partition_filter+. The thread is not joined here, and
# abort_on_exception is enabled on it.
#
# Change from the previous version: the unused +policy.clone+ has been
# removed.
#
# options - nil, a Hash or a ScanPolicy; resolved by create_policy with
#           default_scan_policy as the fallback.
#
# Returns the Recordset (sized from policy.record_queue_size).
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# cluster has no nodes.
def scan_partitions(partition_filter, namespace, set_name, bin_names = nil, options = nil)
  policy = create_policy(options, ScanPolicy, default_scan_policy)

  nodes = @cluster.nodes
  if nodes.empty?
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
  end

  tracker = Aerospike::PartitionTracker.new(policy, nodes, partition_filter)
  recordset = Recordset.new(policy.record_queue_size, 1, :scan)
  Thread.new do
    Thread.current.abort_on_exception = true
    ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names)
  end

  recordset
end
set_quotas(role_name, read_quota, write_quota, options = nil) click to toggle source

Set or update quota for a role.

# File lib/aerospike/client.rb, line 889
# Issues AdminCommand#set_quotas for +role_name+ with the given read and
# write quotas.
#
# options - nil, a Hash or an AdminPolicy; resolved by create_policy with
#           default_admin_policy as the fallback.
#
# Returns whatever AdminCommand#set_quotas returns.
def set_quotas(role_name, read_quota, write_quota, options = nil)
  admin_policy = create_policy(options, AdminPolicy, default_admin_policy)
  AdminCommand.new.set_quotas(@cluster, admin_policy, role_name, read_quota, write_quota)
end
supports_feature?(feature) click to toggle source
# File lib/aerospike/client.rb, line 88
# Delegates to @cluster.supports_feature?(feature) and returns its result.
def supports_feature?(feature)
  @cluster.supports_feature?(feature)
end
touch(key, options = nil) click to toggle source

Examples:

client.touch key, :timeout => 0.001

# File lib/aerospike/client.rb, line 259
# Executes a TouchCommand for +key+.
#
# options - nil, a Hash or a WritePolicy; resolved by create_policy with
#           default_write_policy as the fallback.
#
# Returns whatever execute_command returns.
def touch(key, options = nil)
  write_policy = create_policy(options, WritePolicy, default_write_policy)
  execute_command(TouchCommand.new(@cluster, write_policy, key))
end
truncate(namespace, set_name = nil, before_last_update = nil, options = {}) click to toggle source

Removes records in the specified namespace/set efficiently.

This method is orders of magnitude faster than deleting records one at a time. It requires Aerospike Server version 3.12 or later. See www.aerospike.com/docs/reference/info#truncate for further information.

This asynchronous server call may return before the truncate is complete. The user can still write new records after the server call returns because new records will have last update times greater than the truncate cut-off (set at the time of the truncate call.)

If no policy options are provided, +@default_info_policy+ will be used.

# File lib/aerospike/client.rb, line 216
# Sends a truncate info command for +namespace+ (and optionally +set_name+)
# to a random cluster node.
#
# namespace          - namespace to truncate.
# set_name           - when present and non-blank, "truncate:" is used with a
#                      ";set=" clause. Otherwise "truncate-namespace:" is
#                      used if the chosen node supports TRUNCATE_NAMESPACE,
#                      and plain "truncate:" if it does not.
# before_last_update - when given, converted with to_f to nanoseconds and
#                      sent as ";lut=". When omitted, ";lut=now" is sent if
#                      supports_feature?(LUT_NOW) is true.
# options            - nil, a Hash or a Policy. nil and an empty Hash (the
#                      default) both select default_info_policy.
#
# Fix: the default +options = {}+ used to reach create_policy as a Hash, which
# built a fresh Policy and ignored default_info_policy. An empty Hash is now
# treated as "no options".
#
# Returns nil when the upcased response is "OK".
# Raises Aerospike::Exceptions::Aerospike (SERVER_ERROR) otherwise.
def truncate(namespace, set_name = nil, before_last_update = nil, options = {})
  options = nil if options.is_a?(Hash) && options.empty?
  policy = create_policy(options, Policy, default_info_policy)

  node = @cluster.random_node

  str_cmd =
    if set_name && !set_name.to_s.strip.empty?
      "truncate:namespace=#{namespace};set=#{set_name}"
    elsif node.supports_feature?(Aerospike::Features::TRUNCATE_NAMESPACE)
      "truncate-namespace:namespace=#{namespace}"
    else
      "truncate:namespace=#{namespace}"
    end

  if before_last_update
    lut_nanos = (before_last_update.to_f * 1_000_000_000.0).round
    str_cmd << ";lut=#{lut_nanos}"
  elsif supports_feature?(Aerospike::Features::LUT_NOW)
    # Servers >= 4.3.1.4 require lut argument
    # NOTE(review): this check goes through the client-level
    # supports_feature?, not the node chosen above - confirm that is intended.
    str_cmd << ";lut=now"
  end

  response = send_info_command(policy, str_cmd, node).upcase
  return if response == "OK"
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}")
end

Private Instance Methods

cluster_config_changed(cluster) click to toggle source
# File lib/aerospike/client.rb, line 946
# Callback for cluster configuration changes.
#
# Logs the names of the cluster's nodes at debug level, then rebuilds the
# command validators. Returns whatever setup_command_validators returns.
def cluster_config_changed(cluster)
  Aerospike.logger.debug do
    node_names = cluster.nodes.map { |node| node.name }
    "Cluster config change detected; active nodes: #{node_names}"
  end

  setup_command_validators
end
create_policy(policy, policy_klass, default_policy = nil) click to toggle source
# File lib/aerospike/client.rb, line 931
# Normalises a caller-supplied policy argument into a policy object.
#
# policy         - nil, an instance of policy_klass, or a Hash of options.
# policy_klass   - class used to validate or build the policy.
# default_policy - returned for a nil policy; when it is also nil (or
#                  false) a fresh policy_klass instance is built instead.
#
# Raises TypeError for any other kind of policy argument.
def create_policy(policy, policy_klass, default_policy = nil)
  return default_policy || policy_klass.new if policy.nil?
  # An existing policy object is passed through untouched; this is checked
  # before the Hash case, exactly as in the original ordering.
  return policy if policy.is_a?(policy_klass)
  return policy_klass.new(policy) if policy.is_a?(Hash)

  raise TypeError, "policy should be a #{policy_klass.name} instance or a Hash"
end
execute_batch_index_commands(policy, keys) { |node, batch| ... } click to toggle source
# File lib/aerospike/client.rb, line 976
# Runs one batch-index command per batch node, each in its own thread.
#
# The keys are grouped with BatchIndexNode.generate_list (using
# policy.replica). For every resulting batch the given block is called with
# (node, batch) and must return the command to execute. The method waits for
# all threads to finish and returns the array of threads.
#
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# cluster has no nodes.
def execute_batch_index_commands(policy, keys)
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Index command failed because cluster is empty.") if @cluster.nodes.empty?

  workers = BatchIndexNode.generate_list(@cluster, policy.replica, keys).map do |batch|
    Thread.new { execute_command(yield(batch.node, batch)) }
  end

  workers.each(&:join)
end
execute_batch_operate_commands(policy, records) { |node, batch| ... } click to toggle source
# File lib/aerospike/client.rb, line 994
# Runs one batch-operate command per batch node, each in its own thread.
#
# The records are grouped with BatchOperateNode.generate_list (using
# policy.replica). For every resulting batch the given block is called with
# (node, batch) and must return the command to execute. The method waits for
# all threads to finish and returns the array of threads.
#
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# cluster has no nodes.
def execute_batch_operate_commands(policy, records)
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Index command failed because cluster is empty.") if @cluster.nodes.empty?

  workers = BatchOperateNode.generate_list(@cluster, policy.replica, records).map do |batch|
    Thread.new { execute_command(yield(batch.node, batch)) }
  end

  workers.each(&:join)
end
execute_command(command) click to toggle source
# File lib/aerospike/client.rb, line 971
# Validates the command against the current command validators and then
# executes it. Returns the result of command.execute; an error raised during
# validation propagates and the command is not executed.
def execute_command(command)
  command.tap { |cmd| validate_command(cmd) }.execute
end
hash_to_bins(hash) click to toggle source
# File lib/aerospike/client.rb, line 918
# Converts the bins argument of a write-style call into an array of Bin
# objects.
#
# hash - a single Bin (wrapped in an array), an Array (returned as-is and
#        treated as a list of bins), or a Hash-like object mapping String
#        bin names to values.
#
# Raises Aerospike::Exceptions::Parse when a bin name is not a String.
def hash_to_bins(hash)
  case hash
  when Bin
    [hash]
  when Array
    hash # already a list of bins
  else
    hash.map do |name, value|
      unless name.is_a?(String)
        raise Aerospike::Exceptions::Parse.new("bin name `#{name}` is not a string.")
      end

      Bin.new(name, value)
    end
  end
end
send_info_command(policy, command, node = nil) click to toggle source
# File lib/aerospike/client.rb, line 908
# Sends an info command and returns the response value as a String.
#
# policy  - policy forwarded to the cluster request.
# command - the info command string (also logged at debug level).
# node    - optional node; when given the request goes through
#           @cluster.request_node_info, otherwise @cluster.request_info.
#
# Only the value of the first entry of the returned collection is used; a
# missing value yields an empty string.
def send_info_command(policy, command, node = nil)
  Aerospike.logger.debug { "Sending info command: #{command}" }

  info = node ? @cluster.request_node_info(node, policy, command) : @cluster.request_info(policy, command)
  _name, value = info.first
  value.to_s
end
set_default_policies(policies) click to toggle source
# File lib/aerospike/client.rb, line 897
# Initialises every default_*_policy attribute from the given policies
# collection (looked up by symbol key), building each value with
# create_policy and the policy class that belongs to the key.
#
# Returns the operate policy, i.e. the last one assigned.
def set_default_policies(policies)
  # Insertion order matters: the attributes are assigned in this order.
  policy_classes = {
    info: Policy,
    read: Policy,
    admin: AdminPolicy,
    batch: BatchPolicy,
    query: QueryPolicy,
    scan: ScanPolicy,
    write: WritePolicy,
    operate: OperatePolicy,
  }

  policy_classes.each do |kind, policy_klass|
    send("default_#{kind}_policy=", create_policy(policies[kind], policy_klass))
  end

  default_operate_policy
end
setup_command_validators() click to toggle source
# File lib/aerospike/client.rb, line 951
# Rebuilds @command_validators from the features the cluster supports.
#
# Logs the cluster features at debug level. A single
# UnsupportedParticleTypeValidator is installed for the particle types whose
# feature is not supported (DOUBLE needs FLOAT, GEOJSON needs GEO). When
# both are supported the validator list is empty.
#
# Returns the new validator array.
def setup_command_validators
  Aerospike.logger.debug { "Cluster features: #{@cluster.features.get.to_a}" }

  # Pairs of [particle type, feature required to use it].
  feature_requirements = [
    [Aerospike::ParticleType::DOUBLE, Aerospike::Features::FLOAT],
    [Aerospike::ParticleType::GEOJSON, Aerospike::Features::GEO],
  ]

  # guard against unsupported particle types
  rejected_types = feature_requirements
    .reject { |_particle_type, feature| supports_feature?(feature) }
    .map(&:first)

  @command_validators =
    if rejected_types.empty?
      []
    else
      [UnsupportedParticleTypeValidator.new(*rejected_types)]
    end
end
validate_command(command) click to toggle source
# File lib/aerospike/client.rb, line 964
# Passes the command to every configured validator, in order, by invoking
# validator.call(command).
#
# Returns nil when no validators have been set up yet, otherwise the
# validator collection. Anything a validator raises propagates to the caller.
def validate_command(command)
  checks = @command_validators
  return unless checks

  checks.each { |check| check.call(command) }
end