class Aerospike::ExecuteTask

ExecuteTask is used to poll for long running server execute job completion.

Public Class Methods

new(cluster, statement) click to toggle source

NewExecuteTask initializes task with fields needed to query server nodes.

Calls superclass method
# File lib/aerospike/task/execute_task.rb, line 22
def initialize(cluster, statement)
  super(cluster, false)

  @task_id = statement.task_id
  @scan = statement.is_scan?

  self
end

Public Instance Methods

all_nodes_done?() click to toggle source

queries all nodes for task completion status.

# File lib/aerospike/task/execute_task.rb, line 32
def all_nodes_done?
  modul = @scan ? "scan" : "query"
  cmd1 = "query-show:trid=#{@task_id}"
  cmd2 = modul + "-show:trid=#{@task_id}"
  cmd3 = "jobs:module=" + modul + ";cmd=get-job;trid=#{@task_id}"

  nodes = @cluster.nodes
  done = false

  nodes.each do |node|
    command = cmd3
    if node.supports_feature?(Aerospike::Features::PARTITION_QUERY)
      command = cmd1
    elsif node.supports_feature?(Aerospike::Features::QUERY_SHOW)
      command = cmd2
    end

    conn = node.get_connection(0)
    responseMap, = Info.request(conn, command)
    node.put_connection(conn)

    response = responseMap[command]
    find = "job_id=#{@task_id}:"
    index = response.index(find)

    unless index
      # don't return on first check
      done = true
      next
    end

    b = index + find.length
    response = response[b, response.length]
    find = "job_status="
    index = response.index(find)

    next unless index

    b = index + find.length
    response = response[b, response.length]
    e = response.index(":")
    status = response[0, e]

    case status
    when "ABORTED"
      raise Aerospike::Exceptions::QueryTerminated
    when "IN PROGRESS"
      return false
    when "DONE"
      done = true
    end
  end

  done
end