class Catena::Scheduler

Constants

MAX_STEPS

Public Instance Methods

perform(efx, stack) click to toggle source
# File lib/catena/scheduler.rb, line 11
def perform(efx, stack)
  step(efx, stack, 0)
end
step(efx, stack, steps) click to toggle source
# File lib/catena/scheduler.rb, line 15
def step(efx, stack, steps)
  if (steps > MAX_STEPS)
    logger.warn "Exceeded MAX STEPS. Stowing efx #{efx}"
    enqueue(efx, stack)
    steps
  else
    # TODO if not use >= after pass, will have efx["type"] is nil, need error message
    # TODO if has trailing "|" in chain, it'll subsume the evaluator call, and return nil
    # TODO also need error message when argument length mismatches
    logger.debug "EFX: #{efx.inspect}"
    send("step_#{efx["type"]}", efx, stack, steps)
  end
end
step_and_then(efx, stack, steps) click to toggle source
# File lib/catena/scheduler.rb, line 83
def step_and_then(efx, stack, steps)
  new_stack = stack.push(efx)
  logger.info "Processing and_then. Stack.len = #{new_stack.length}"

  raise "step_and_then new_efx is nil" if efx["side_effect"].nil?
  step(efx["side_effect"], new_stack, steps + 1)
end
step_binding(efx, stack, steps) click to toggle source
# File lib/catena/scheduler.rb, line 65
def step_binding(efx, stack, steps)
  # TODO canceling the entire process should happen at binding
  logger.info "Processing binding of #{efx["callback_name"]}"

  # FIXME shouldn't need to know explicitly the tasks are on Deployment class

  callback = find_callback(efx["callback_name"])
  args = efx["callback_args"] + [evaluator(stack)]
  logger.debug "  Calling '#{efx["callback_name"]}' with args: #{args.inspect}"

  # FIXME check the arity and note if we're short?
  # if we're at the end, and we're short on arguments, it'll happyly execute,
  # return a lambda, and silently finish
  callback.call(*args)

  return steps + 1
end
step_failure(efx, stack, steps) click to toggle source
# File lib/catena/scheduler.rb, line 47
def step_failure(efx, stack, steps)
  logger.info "Processing failure: #{efx}. Stack.len = #{stack.length}"
  new_stack = flush("and_then", stack)
  #puts "  Flushed and_then. Stack.len = #{new_stack.length}"

  if new_stack.empty?
    return steps
  else
    on_error_node = new_stack.pop()
    callback_node = on_error_node["binding_callback"]
    logger.debug "  Popped #{callback_node}. Stack.len = #{new_stack.length}"

    new_efx = chain(callback_node, efx["error"])
    raise "step_failure new_efx is nil. failure value: #{efx["error"]}" if new_efx.nil?
    step(new_efx, new_stack, steps + 1)
  end
end
step_on_error(efx, stack, steps) click to toggle source
# File lib/catena/scheduler.rb, line 91
def step_on_error(efx, stack, steps)
  logger.warn "Processing on_error...not implemented"
  return steps
end
step_pmap(efx, stack, steps) click to toggle source
# File lib/catena/scheduler.rb, line 96
def step_pmap(efx, stack, steps)
  logger.warn "Processing parallel map...not implemented"
  return steps
end
step_succeed(efx, stack, steps) click to toggle source
# File lib/catena/scheduler.rb, line 29
def step_succeed(efx, stack, steps)
  logger.debug "Processing succeed: #{efx}. Stack.len = #{stack.length}"
  new_stack = flush("on_error", stack)
  logger.debug "  Flushed on_error. Stack.len = #{new_stack.length}"

  if new_stack.empty?
    return steps
  else
    and_then_node = new_stack.pop()
    callback_node = and_then_node["binding_callback"] # the callback node is type binding
    logger.debug "  Popped #{callback_node}. Stack.len = #{new_stack.length}"

    new_efx = chain(callback_node, efx["value"])
    raise "step_succeed new_efx is nil. succeed value: #{efx["value"]}" if new_efx.nil?
    step(new_efx, new_stack, steps + 1)
  end
end

Private Instance Methods

chain(callback_node, value_or_error) click to toggle source

create another binding with the added efx, and send it through a new step. while we can use funkify to partially apply, it doesn't work here, because we need to serialize it, and so we delay execution by creating another bind step through.

same as calling the func that returns bind in Task TODO should use that so don't have to include Interpreter?

But then still have to solve the problem of knowing about Deployment in step_binding

TODO maybe just destructively update binding_callbcak with new value in args?

# File lib/catena/scheduler.rb, line 139
def chain(callback_node, value_or_error)
  func_name = Lang.callback_to_func_name(callback_node["callback_name"])
  args = callback_node["callback_args"] + [value_or_error]
  return bind(func_name, *args)
end
class_from_name() click to toggle source
# File lib/catena/scheduler.rb, line 114
def class_from_name()
  mod_name.split("::").inject(Object) do |mod, class_name|
    mod.const_get(class_name)
  end
end
enqueue(efx, stack) click to toggle source
# File lib/catena/scheduler.rb, line 120
def enqueue(efx, stack)
  logger.debug "Enqueued bound_efx: #{efx}"
  Scheduler.perform_async(efx, stack)
end
evaluator(stack) click to toggle source
# File lib/catena/scheduler.rb, line 145
def evaluator(stack)
  lambda { |result_efx|
    # FIXME need to check if result_efx is nil and then throw
    # - Do you have a trailing "|"?
    # FIXME need to check if result_efx is actually an efx
    logger.info "Enqueuing result for evaluation"
    logger.debug "  result: #{result_efx}"
    enqueue(result_efx, stack)
  }
end
find_callback(name) click to toggle source

TODO should raise if callback isn't found

# File lib/catena/scheduler.rb, line 106
def find_callback(name)
  mod_with_callback = Catena.config.modules.find do |mod|
    mod = mod.is_a?(String) ? class_from_name(mod) : mod
    mod.respond_to?(name)
  end
  return mod_with_callback.method(name)
end
flush(node_type, stack) click to toggle source
# File lib/catena/scheduler.rb, line 125
def flush(node_type, stack)
  stack.reject { |node| node["type"] == node_type }
end