class Flor::Procedure
Constants
- IF_UNLESS
- RVARS
-
“Returning vars”: variables to pass back upon reply. In ‘receive’ messages, they are carried in a hash under the key ‘rvars’.
- TRUE_ATTS
-
Attributes that when given alone are turned to “true” attributes.
For example, `sequence flank` gets turned into `sequence flank: true`.
The transformation occurs in Flor::Pro::Att (“_att”).
- WRAP_KEYS
Public Class Methods
Source
# File lib/flor/core/procedure.rb, line 25
# Returns the first registered subclass whose names include +name+,
# or nil when no registered subclass declares it.
def [](name)
  @@inherited.find do |klass|
    ns = klass.names
    ns && ns.include?(name)
  end
end
Source
# File lib/flor/core/procedure.rb, line 20
# Records +subclass+ in the shared @@inherited list (created on first use).
# Returns that list.
def inherited(subclass)
  @@inherited ||= []
  @@inherited << subclass
end
Source
# File lib/flor/core/procedure.rb, line 46
# Instantiates the procedure class registered under node['heap'].
# Raises NameError when no class is registered for that heap.
def make(executor, node, message)
  heap = node['heap']
  klass = self[heap]
  return klass.new(executor, node, message) if klass

  fail NameError.new("unknown procedure #{heap.inspect}")
end
Source
# File lib/flor/core/procedure.rb, line 30
# Reads or sets the names under which this procedure is registered.
#
# Called without arguments, returns the current names ([] if never set).
# Called with names (nested arrays are flattened), stores them and sets
# @core to true when some caller frame matches /flor\/pcore/.
def names(*names)
  @names = [] unless defined?(@names)

  given = names.flatten
  if given.any?
    @names = given
    @core = caller.any? { |line| line.match(/flor\/pcore/) }
  end

  @names
end
Public Instance Methods
Source
# File lib/flor/core/procedure.rb, line 103 def debug_msg(msg=message) puts Flor.detail_msg(@executor, msg) end
Source
# File lib/flor/core/procedure.rb, line 95
# Prints the tree found at +nid+ (this node's nid when none is given).
def debug_tree(nid=nil)
  target = nid || self.nid
  puts Flor.tree_to_s(lookup_tree(target), target)
end
Source
# File lib/flor/core/procedure.rb, line 113
# Stores a copy of the tree in the node, clears the node's 'replyto' and
# returns a 'flank' flavoured message addressed to the parent.
def flank
  copy = Flor.dup(tree)
  @node['tree'] = copy
  @node['replyto'] = nil

  reply = { 'nid' => parent, 'flavour' => 'flank' }
  wrap(reply)
end
Source
# File lib/flor/core/procedure.rb, line 58 def pre_execute # empty default implementation end
Source
# File lib/flor/core/procedure.rb, line 63
# Turns the [ criteria, message_or_procedure ] pairs of +on_x+ into a list
# of messages, keeping only pairs whose criteria pass match_on?.
# When +max+ is positive, stops adding once +max+ messages are collected.
def prepare_on_receive_last(on_x, max=-1)
  on_x.each_with_object([]) do |(criteria, mop), msgs|
    next if max > 0 && msgs.size >= max
    next unless match_on?(criteria)

    msg = Flor.dup(@message)

    if Flor.is_message?(mop)
      msgs << Flor.dup(mop).merge!('msg' => msg)
    else # procedure
      args = [ [ 'msg', msg ], [ 'err', msg['error'] ] ]
      msgs.concat(apply(mop, args, mop[2]))
    end
  end
end
Source
# File lib/flor/core/procedure.rb, line 82
# Flags the message as 'on_error', closes the node with the 'on-error'
# flavour and queues at most one matching on_error handler.
# Returns the child cancel messages if there are any, else the result
# of do_receive.
def trigger_on_error
  @message['on_error'] = true
  close_node('on-error')

  handlers = prepare_on_receive_last(@node['on_error'], 1)
  @node['on_receive_last'] = handlers

  cancels = do_wrap_cancel_children
  return cancels if cancels

  do_receive # which should trigger 'on_receive_last'
end
Protected Instance Methods
Source
# File lib/flor/core/procedure.rb, line 974
# Inserts message['trees'] among the children of the tree, at the child
# index derived from message['tnid']. Returns [] (no messages).
# Fails with Flor::FlorError when the message carries 'elements'.
def add
  if message['elements']
    fail Flor::FlorError.new("procedure does not accept add-iteration", self)
  end

  # TODO fail if the procedure changed
  #      could the message contain a SHA for the node as was when the
  #      that message got emitted?
  #      well, could that be applied to other messages? too late?

  current = tree
  index = Flor.child_id(message['tnid'])
  current[1].insert(index, *message['trees'])

  []
end
Source
# File lib/flor/core/procedure.rb, line 797
# Builds the 'execute' message(s) that apply the function +fun+ to +args+.
#
# opts[:anid]    when given and falsy, the function nid is used as is;
#                otherwise a new sub nid is generated
# opts[:payload] when set, replaces the payload of the first message
#
# Fails with ArgumentError when +fun+ carries no tree.
def apply(fun, args, line, opts={})
  meta = fun[1]

  gen_sub = opts.has_key?(:anid) ? opts[:anid] : true
    # true => generate sub_nid

  fni = meta['nid'] # fun nid
  ani = gen_sub ? Flor.sub_nid(fni, counter_next('subs')) : fni
    # the "trap" apply doesn't want a subid generated before it triggers...

  cni = meta['cnid'] # closure nid

  t = meta['tree']
  fail ArgumentError.new("couldn't find function at #{fni}") unless t

  t = t[0] if t[0].is_a?(Array)
  t = t[1][0] if t[0] == '_att'
  t[1][0][0] = '_name' if t[0] == 'define'

  ms = wrap(
    'point' => 'execute',
    'nid' => ani,
    'tree' => [ '_apply', t[1], line ],
    'arguments' => args,
    'cnid' => cni)

  oe = meta['on_error']
  ms.first['on_error'] = oe if oe

  pl = opts[:payload]
  ms.first['payload'] = pl if pl

  ms
end
Source
# File lib/flor/core/procedure.rb, line 1011
# Applies the first on_receive handler of the node to the incoming message
# and records [ nid of the apply message, from ] under 'on_receive_nid'.
# Returns the messages built by apply.
def apply_on_receive
  determine_fcid_and_ncid

  handler = @node['on_receive'][0][1]
  args = [ [ 'msg', @message ], [ 'fcid', @fcid ] ]

  apply(handler, args, tree[2]).tap do |ms|
    @node['on_receive_nid'] = [ ms[0]['nid'], from ]
  end
end
Source
# File lib/flor/core/procedure.rb, line 190
# Returns the value of the first attribute found under one of +keys+,
# tried in the given order. Non-nil keys are compared as strings; a nil
# key matches an attribute stored under nil.
# Returns nil when the node has no 'atts' or when nothing matches.
def att(*keys)
  atts = @node['atts']
  return nil unless atts

  keys.each do |key|
    pair = atts.assoc(key.nil? ? nil : key.to_s)
    return pair.last if pair
  end

  nil
end
Source
# File lib/flor/core/procedure.rb, line 217
# Returns the attribute found under +keys+ as an array.
# When the last key is nil, it is dropped and the value goes through
# Flor.to_a instead of Kernel#Array.
# A value that is a regex tree is returned wrapped in an array.
def att_a(*keys)
  values =
    if keys.last.nil?
      Flor.to_a(att(*keys[0...-1]))
    else
      Array(att(*keys))
    end

  Flor.is_regex_tree?(values) ? [ values ] : values
end
Source
# File lib/flor/core/procedure.rb, line 160
# Returns the children whose head is '_att'.
def att_children
  children.select do |child|
    head = child[0]
    head == '_att'
  end
end
Source
# File lib/flor/core/procedure.rb, line 203
# Returns the values of all the attributes whose key is listed in +keys+,
# in attribute order. Stored keys are compared as strings (a nil key stays
# nil); the given +keys+ are compared as is.
# Returns nil when the node has no 'atts'.
def atts(*keys)
  pairs = @node['atts']
  return nil unless pairs

  pairs.each_with_object([]) do |(k, v), values|
    values << v if keys.include?(k.nil? ? nil : k.to_s)
  end
end
Source
# File lib/flor/core/procedure.rb, line 946
# Closes the node, then returns the first non-nil of: the cancel messages
# for the children, the popped on_receive_last messages, or the
# "cancelled" reply.
def cancel
  close_node

  cancels = do_wrap_cancel_children
  return cancels if cancels

  popped = pop_on_receive_last
  return popped if popped

  wrap_cancelled
end
The core cancel work; overridden by some procedure implementations.
Source
# File lib/flor/core/procedure.rb, line 932 def cancel_when_closed [] # by default, no effect end
Handle an incoming cancel message when the node has closed. Open for override (overridden by “sequence”, “cursor”, and “until”)
Source
# File lib/flor/core/procedure.rb, line 924 def cancel_when_ended [] # node has already emitted reply to parent, ignore any later request end
Handle an incoming cancel message when the node has ended. Open for override.
Source
# File lib/flor/core/procedure.rb, line 145 def close_node(flavour=@message['flavour']) stack_status(flavour, 'closed') end
Source
# File lib/flor/core/procedure.rb, line 125 def counter_next(k) @executor.counter_next(k) end
Source
# File lib/flor/core/procedure.rb, line 463
# Sets @fcid to the child id of the sender when the point is 'receive'
# (nil otherwise) and @ncid to the following child index (0 when @fcid
# is nil).
def determine_fcid_and_ncid
  @fcid = (Flor.child_id(from) if point == 'receive')
  @ncid = @fcid ? @fcid + 1 : 0
end
Source
# File lib/flor/core/procedure.rb, line 966
# Performs add when the node is open; otherwise (closed or ended) the
# add message is discarded and [] is returned.
def do_add
  node_open? ? add : []
end
Source
# File lib/flor/core/procedure.rb, line 877
# Entry point for incoming 'cancel' messages: prepares the node's
# 'on_receive_last', then dispatches to cancel_when_ended,
# cancel_when_closed or cancel depending on the node status.
def do_cancel
  # Ensure the 'cancel' message has a payload.
  # If not, let's use the payload as it was upon reaching this node.
  @message['payload'] ||= Flor.dup(@node['payload'])

  # if the node is cancelled and that is caused by a message upstream
  # that caused the incoming message, simply reply immediately...
  return wrap if node_closed? && status_cause

  orl = @message['on_receive_last']
  ot = @node['on_timeout']
  oc = @node['on_cancel']

  if orl
    # the message on_receive_last is used by the re_apply feature
    @node['on_receive_last'] = orl
  elsif @message['flavour'] == 'timeout' && ot
    @node['on_receive_last'] = prepare_on_receive_last(ot)
  elsif oc
    @node['on_receive_last'] = prepare_on_receive_last(oc)
  end

  if node_ended?
    cancel_when_ended
  elsif node_closed?
    cancel_when_closed
  else
    cancel
  end
end
Called by the executor; in turn calls cancel and the cancel_when_ methods, which may be overridden.
Source
# File lib/flor/core/procedure.rb, line 340
# Runs pre_execute, registers this node's nid in the parent node's
# 'cnodes' list (when there is a parent node and the nid is not listed
# yet), then calls execute.
def do_execute
  pre_execute

  pnode = @execution['nodes'][parent]
  if pnode
    siblings = (pnode['cnodes'] ||= [])
    siblings << nid unless siblings.include?(nid)
  end

  execute
end
Source
# File lib/flor/core/procedure.rb, line 913
# Entry point for incoming 'kill' messages: ended nodes go through
# kill_when_ended, all the others through kill (closed nodes get no
# special treatment).
def do_kill
  node_ended? ? kill_when_ended : kill
end
Called by the executor; in turn calls kill and the kill_when_ methods, which may be overridden.
Source
# File lib/flor/core/procedure.rb, line 359
# Entry point for incoming 'receive' messages. Unless the message is
# 'flank' flavoured, the sender is removed from the child node list; the
# message is then routed according to the 'disable' flag, the node status
# and the presence of on_receive handlers.
def do_receive
  flank = @message['flavour'] == 'flank'

  from_child = nil
  from_child = cnodes.delete(from) if cnodes_any? && !flank

  # the off: and the disable:/disabled: can achieve that
  di = @message['disable']
  return wrap_reply('disabled' => di) if di

  if node_closed?
    return receive_from_child_when_closed if from_child || node_status_flavour
    return receive_when_closed
  end

  return receive_when_ended if node_ended?
  return apply_on_receive if should_apply_on_receive?

  # in order to move on to the next child...
  orn = @node['on_receive_nid']
  @from = orn[1] if orn && orn[0] == from

  receive
end
The executor calls do_receive, while procedure implementations override receive…
Source
# File lib/flor/core/procedure.rb, line 869
# Same as wrap_cancel_children, but returns nil instead of an empty list,
# so that the result can be chained with ||.
def do_wrap_cancel_children(h={})
  ms = wrap_cancel_children(h)
  ms.any? ? ms : nil
end
Source
# File lib/flor/core/procedure.rb, line 151 def end_node stack_status(@message['flavour'], 'ended') end
Source
# File lib/flor/core/procedure.rb, line 241
# Builds the 'execute' message for the child at +index+.
# Replies instead (wrap_reply) when +index+ is negative, when the tree
# has no child array or when there is no child at +index+.
# +sub+ set to true requests a fresh sub id; +h+ entries, if any, are
# merged over the message.
def execute_child(index=0, sub=nil, h=nil)
  return wrap_reply if index < 0

  kids = tree[1]
  return wrap_reply unless kids.is_a?(Array) && !kids[index].nil?

  sub = counter_next('subs') if sub == true

  cnid = Flor.child_nid(nid, index, sub)

  msg = {
    'point' => 'execute',
    'nid' => cnid,
    'tree' => kids[index],
    'payload' => payload.current }
  msg.merge!(h) if h

  wrap(msg)
end
Source
# File lib/flor/core/procedure.rb, line 185
# Returns the index of the first child that is not an '_att', or nil.
def first_non_att_child_id
  children.find_index do |child|
    child[0] != '_att'
  end
end
Source
# File lib/flor/core/procedure.rb, line 180
# Returns the index of the first child that is either not an '_att' or
# an '_att' with a single sub-tree (no key), or nil.
def first_unkeyed_child_id
  children.find_index do |child|
    child[0] != '_att' || child[1].size == 1
  end
end
Source
# File lib/flor/core/procedure.rb, line 469
# Truthy when the incoming message comes from a child on the same branch
# and that child is an '_att'.
def from_att?
  child = @fcid && Flor.same_branch?(nid, from) && children[@fcid]
  child && child[0] == '_att'
end
Source
# File lib/flor/core/procedure.rb, line 496 def from_error_handler? message['from_on_error'] == nid end
Source
# File lib/flor/core/procedure.rb, line 476
# Truthy when the incoming message comes from a child on the same branch
# and that child is an '_att' with two sub-trees (key and value).
def from_keyed_att?
  child = @fcid && Flor.same_branch?(nid, from) && children[@fcid]
  child && child[0] == '_att' && child[1].size == 2
end
Source
# File lib/flor/core/procedure.rb, line 490
# Returns, as an integer, the part of +from+ that follows the first '-',
# or false when +from+ has no such part.
def from_sub_nid
  _, sub = from.split('-')
  sub ? sub.to_i : false
end
Source
# File lib/flor/core/procedure.rb, line 483
# Truthy when the incoming message comes from a child on the same branch
# and that child is an '_att' with a single sub-tree (no key).
def from_unkeyed_att?
  child = @fcid && Flor.same_branch?(nid, from) && children[@fcid]
  child && child[0] == '_att' && child[1].size == 1
end
Source
# File lib/flor/core/procedure.rb, line 212
# Returns true when the node holds an attribute stored under key +k+
# (compared as is, no string conversion).
# Returns false when the node has no 'atts' at all, in line with att and
# atts which also tolerate a missing 'atts' entry.
def has_att?(k)
  atts = @node['atts']
  return false unless atts

  atts.any? { |key, _| key == k }
end
Source
# File lib/flor/core/procedure.rb, line 958
# Closes the node and returns the 'kill' flavoured cancel messages for
# the children followed by the "cancelled" reply.
def kill
  close_node

  cancels = wrap_cancel_children('flavour' => 'kill')
  cancels + wrap_cancelled
end
The core kill work. It is open for override, but no procedure actually provides a custom implementation: kill behaves the same for every procedure.
Source
# File lib/flor/core/procedure.rb, line 939 def kill_when_ended [] # no effect end
When the node has ended, incoming kill messages are silenced ([] return).
Source
# File lib/flor/core/procedure.rb, line 501 def last_receive? children[@ncid] == nil end
Source
# File lib/flor/core/procedure.rb, line 693
# Walks up from +node+ and returns the node whose 'vars' should be used
# for key +k+ under +mode+, or nil when none is found.
#
# 'l' returns the first node that has vars,
# ''  returns the first node (or its 'cnid' node) whose vars hold +k+,
# 'g' returns the topmost node, provided it has vars.
def lookup_var_node(node, mode, k=nil)
  vars = node['vars']
  plain = mode == ''

  if vars
    return node if mode == 'l'
    return node if plain && Dense.has_key?(vars, k)
  end

  cnode = plain && @execution['nodes'][node['cnid']]
  return cnode if cnode && Dense.has_key?(cnode['vars'], k)

  par = parent_node(node)

  return node if vars && par.nil? && mode == 'g'

  par ? lookup_var_node(par, mode, k) : nil
end
Source
# File lib/flor/core/procedure.rb, line 401
# Returns the first entry of the message's 'cause' list whose 'nid' is
# this node's nid, or nil.
def message_cause
  causes = @message['cause'] || []
  causes.find do |cause|
    cause['nid'] == nid
  end
end
From the incoming message, return the most recent cause for this node
Source
# File lib/flor/core/procedure.rb, line 165
# Returns the children whose head is not '_att'.
def non_att_children
  children.reject do |child|
    child[0] == '_att'
  end
end
Source
# File lib/flor/core/procedure.rb, line 170 def non_att_count non_att_children.size end
Source
# File lib/flor/core/procedure.rb, line 148 def open_node stack_status(@message['flavour'], nil) end
Source
# File lib/flor/core/procedure.rb, line 418
# Empties the node's 'on_receive_last' list and returns its former
# messages, readied for emission, or nil when the list is nil or empty.
#
# The node is reopened first, unless it is in 'on-error' flavour or the
# message cause is a cancel/timeout for which the node has a handler.
def pop_on_receive_last
  orl = @node['on_receive_last']
  return nil if orl.nil? || orl.empty?

  c = message_cause

  stay_closed =
    (node_status_flavour == 'on-error') || # TODO use the cause ???
    (c && c['cause'] == 'cancel' && @node['on_cancel']) ||
    (c && c['cause'] == 'timeout' && @node['on_timeout'])
  open_node unless stay_closed

  @node['on_receive_last'] = []
  @node['mtime'] = Flor.tstamp

  orl.each do |m|
    m['from'] = @node['parent'] if m['from'] == 'parent'
    m['payload'] = m.delete('force_payload') || message['payload']
  end
end
Source
# File lib/flor/core/procedure.rb, line 506
# Default receive: dispatches to receive_first (no sending child),
# receive_att (sender is an '_att' child) or receive_non_att.
def receive
  determine_fcid_and_ncid

  if @fcid.nil?
    receive_first
  elsif from_att?
    receive_att
  else
    receive_non_att
  end
end
Source
# File lib/flor/core/procedure.rb, line 521
# After an '_att' child replied: executes the next child if it is an
# '_att' too, else hands over to receive_last_att.
def receive_att
  nxt = children[@ncid]

  if nxt && nxt[0] == '_att'
    execute_child(@ncid)
  else
    receive_last_att
  end
end
Source
# File lib/flor/core/procedure.rb, line 515
# First receive (no child has replied yet): hands over to
# receive_last_att when the first child exists and is not an '_att',
# else executes the child at @ncid.
def receive_first
  first = children[0]

  if first && first[0] != '_att'
    receive_last_att
  else
    execute_child(@ncid)
  end
end
Source
# File lib/flor/core/procedure.rb, line 448
# When no child node remains, returns the popped on_receive_last
# messages if any; in every other case replies (wrap_reply).
def receive_from_child_when_closed
  popped = cnodes_empty? && pop_on_receive_last
  popped || wrap_reply
end
Source
# File lib/flor/core/procedure.rb, line 529
# Calls receive_last when there is no child at @ncid, else executes
# that child.
def receive_last_att
  children[@ncid].nil? ? receive_last : execute_child(@ncid)
end
Source
# File lib/flor/core/procedure.rb, line 542
# After a non-att child replied: stores receive_payload_ret in the
# node's 'ret' (if the node has that key) and appends it to 'rets' (if
# set), then moves on to the next child or calls receive_last.
def receive_non_att
  if @node.has_key?('ret')
    @node['ret'] = receive_payload_ret
    @node['mtime'] = Flor.tstamp
  end

  rets = @node['rets']
  if rets
    rets << receive_payload_ret
    @node['mtime'] = Flor.tstamp
  end

  children[@ncid].nil? ? receive_last : execute_child(@ncid)
end
Source
Source
# File lib/flor/core/procedure.rb, line 596
# Treats payload['ret'] as tag name(s): when it is a string or a non-empty
# list of strings, adds them to the node's 'tags' and returns an 'entered'
# message for them. Returns [] when the node already has tags or when the
# value does not qualify.
def receive_unkeyed_tag_att
  # "tag:" encountered, walk away
  return [] if @node['tags']

  tags = Array(@message['payload']['ret']).flatten
  return [] unless tags.any? && tags.all? { |t| t.is_a?(String) }

  (@node['tags'] ||= []).concat(tags)

  wrap('point' => 'entered', 'nid' => nid, 'tags' => tags)
end
Used by ‘cursor’ (and ‘loop’), where

    cursor 'main'
      # is equivalent to:
    cursor tag: 'main'
Source
# File lib/flor/core/procedure.rb, line 453 def receive_when_closed [] end
Source
# File lib/flor/core/procedure.rb, line 458 def receive_when_ended [] end
Source
# File lib/flor/core/procedure.rb, line 293
# Replaces, in the node's tree, the first child that is a '_ref' or a
# single ref tree with a '_rep' tree. Does nothing (returns nil) when
# there is no such child.
def rep_first_child
  head, kids, line = tree

  ri = kids.index { |kid| kid[0] == '_ref' || Flor.is_single_ref_tree?(kid) }
  return unless ri

  ref = kids[ri]
  rep =
    if ref[0] == '_ref'
      [ '_rep', ref[1], ref[2] ]
    else
      s, _, l = ref
      [ '_rep', [ [ '_sqs', s, l ] ], l ]
    end

  new_kids = kids.dup
  new_kids[ri] = rep

  @node['tree'] = [ head, new_kids, line ]
end
Source
# File lib/flor/core/procedure.rb, line 749
# Sets +v+ at +path+ in payload.copy.
# Re-raises any IndexError with a "couldn't set field" message.
def set_field(path, v)
  begin
    Dense.set(payload.copy, path, v)
  rescue IndexError
    fail IndexError.new(
      "couldn't set field #{Flor.path_to_s(path)}")
  end
end
Source
# File lib/flor/core/procedure.rb, line 759
# Sets +value+ at +path+ (a String path is first parsed by Dense).
# A path of two or more steps whose first step is f/fld/field targets a
# field; a first step of v/var/variable, optionally prefixed by l, g or d,
# targets a variable in that mode. Anything else is a plain variable.
def set_value(path, value)
  path = Dense::Path.make(path).to_a if path.is_a?(String)

  return set_var('', path, value) if path.length < 2

  rest = path[1..-1]

  case path.first
  when /\Af(?:ld|ield)?\z/ then set_field(rest, value)
  when /\A([lgd]?)v(?:ar|ariable)?\z/ then set_var($1, rest, value)
  else set_var('', path, value)
  end
end
Source
# File lib/flor/core/procedure.rb, line 730
# Sets variable +path+ to +v+ in the node selected by lookup_var_node for
# +mode+; in '' mode, falls back to the first node that has vars.
# Raises IndexError for the 'd' (domain) mode, when no node is found or
# when setting fails with an IndexError.
def set_var(mode, path, v)
  fail IndexError.new('cannot set domain variables') if mode == 'd'

  begin
    target = lookup_var_node(@node, mode, path)
    target = lookup_var_node(@node, 'l', path) if target.nil? && mode == ''

    return Dense.set(target['vars'], path, v) if target
  rescue IndexError
    # falls through to the generic failure below
  end

  fail IndexError.new(
    "couldn't set var #{Flor.path_to_s([ "#{mode}v" ] + path)}")
end
Source
# File lib/flor/core/procedure.rb, line 995
# True when the node has on_receive handlers and the incoming message
# neither stems from an on_receive (from_on == 'receive') nor comes from
# the nid recorded under 'on_receive_nid'.
def should_apply_on_receive?
  # no, since the message comes from an on_receive...
  # how about nested on_receives?
  return false if @message['from_on'] == 'receive'

  handlers = @node['on_receive']
  return false if handlers.nil? || handlers.empty?

  orn = @node['on_receive_nid']
  !(orn && orn[0] == from)
end
Source
# File lib/flor/core/procedure.rb, line 778
# Destructures +value+ over +paths+, calling set_value for each target.
#
# A path whose last step matches Flor::SPLAT_REGEX is a splat: it takes
# either a fixed count of elements or, with '_', whatever is left once
# the remaining paths are served. A splat with an empty name consumes
# elements without setting anything.
#
# Neither +paths+ nor +value+ is modified. When there are fewer values
# than paths, a '_' splat receives an empty list instead of raising.
#
# Returns nil.
def splat_value(paths, value)
  val = value.dup
  todo = paths.dup # iterate over a copy, leave the caller's array alone

  while (pa = todo.shift)

    pa = Dense::Path.make(pa).to_a if pa.is_a?(String)

    if (m = pa.last.match(Flor::SPLAT_REGEX))
      k, u = m[1, 2]
      # Array#take/#drop raise on negative counts, hence the clamping
      l = (u == '_') ? [ val.length - todo.length, 0 ].max : u.to_i
      set_value(pa[0..-2] + [ k ], val.take(l)) if k.length > 0
      val = val.drop(l)
    else
      set_value(pa, val.shift)
    end
  end

  nil
end
Source
# File lib/flor/core/procedure.rb, line 130
# Pushes a new status entry (point, status, ctime, plus flavour and the
# message's 'm' and 'from' when present) onto the node's status stack.
# The current top entry is dropped first when it stems from the same
# message and is not 'ended'.
def stack_status(flavour, status)
  entry = {
    'point' => @message['point'], 'status' => status, 'ctime' => Flor.tstamp }
  entry['flavour'] = flavour if flavour

  mm = @message['m']
  entry['m'] = mm if mm

  mf = @message['from']
  entry['from'] = mf if mf

  # only keep the latest effect of a message (probably "ended")
  top = node_status
  @node['status'].pop if top['m'] == entry['m'] && top['status'] != 'ended'

  @node['status'] << entry
end
Source
# File lib/flor/core/procedure.rb, line 411
# Returns the first entry of the message's 'cause' list whose 'm' equals
# the 'm' of the current node status, or nil.
def status_cause
  status_m = node_status['m']
  causes = @message['cause'] || []

  causes.find { |cause| cause['m'] == status_m }
end
Given the current node status and the incoming message, returns the upstream cause that led to the status. Returns nil if the incoming message is not related to the current status.
Source
# File lib/flor/core/procedure.rb, line 260
# True when the '_att' tree +t+ holds a single childless sub-tree whose
# head is listed in TRUE_ATTS.
def stays_an_att?(t)
  kids = t[1]
  return false unless kids.length == 1

  only = kids[0]
  only.is_a?(Array) && only[1] == [] && TRUE_ATTS.include?(only[0])
end
Source
# File lib/flor/core/procedure.rb, line 568
# Registers +prc+ (payload['ret'] by default) as an "on_<key>" handler,
# paired with +criteria+, in the closest ancestor node whose heap is not
# listed in IF_UNLESS. Does nothing when +prc+ is not a function tree or
# when there is no such ancestor.
#
# The pair is inserted before an existing entry with the same criteria
# when there is one.
def store_on(key, prc=payload['ret'], criteria=[ '*' ])
  return unless Flor.is_func_tree?(prc)

  target = parent_node(@node)
  target = parent_node(target) \
    while target && IF_UNLESS.include?(target['heap'])
  return unless target

  flavour = "on_#{key}"
  prc[1][flavour] = true

  handlers = (target[flavour] ||= [])
  at = handlers.index { |e| e[0] == criteria } || -1

  handlers.insert(at, [ criteria, prc ])
end
Grabs the on_<key> handler (for example the on_error proc) from the incoming payload and stores it into the parent node.
Has no effect if there is no parent node.
Source
# File lib/flor/core/procedure.rb, line 320
# Rewrites the non-att child at +non_att_index+ (an index among the
# children that are not '_att') into a '_sqs' string tree, provided it is
# a childless tree with a String head. The node's tree is replaced by a
# copy carrying the change.
#
# The child is located by position, not by value, so that an earlier
# child equal to it (same head, same line) is never rewritten by mistake.
#
# Returns nil when there is no such child or when it does not qualify.
def stringify_child(non_att_index)
  kids = children

  positions = kids.each_index.select { |i| kids[i][0] != '_att' }
  ci = positions[non_att_index]
  return unless ci

  c = kids[ci]
  return unless c[1] == [] && c[0].is_a?(String)

  cn = Flor.dup(kids)
  cn[ci] = [ '_sqs', c[0], c[2] ]

  t = tree

  @node['tree'] = [ t[0], cn, t[2] ]
end
Source
# File lib/flor/core/procedure.rb, line 335 def stringify_first_child stringify_child(0) end
Source
# File lib/flor/core/procedure.rb, line 315 def unatt_first_unkeyed_child unatt_unkeyed_children(true) end
Source
# File lib/flor/core/procedure.rb, line 268
# Unwraps unkeyed '_att' children (single sub-tree, and not a "true"
# attribute as per stays_an_att?) into plain children. The resulting
# order is: remaining '_att' children, unwrapped children, non-att
# children. With +first_only+, only the first unkeyed '_att' is unwrapped.
# The node's tree is only replaced when the children actually change.
def unatt_unkeyed_children(first_only=false)
  found = false

  unkeyed, keyed =
    att_children.partition do |c|
      next false if found || stays_an_att?(c)

      is_unkeyed = c[1].size == 1
      found = true if is_unkeyed && first_only
      is_unkeyed
    end

  unwrapped = unkeyed.map { |c| c[1].first }

  cn = keyed + unwrapped + non_att_children

  @node['tree'] = [ tree[0], cn, tree[2] ] if cn != children
end
Source
# File lib/flor/core/procedure.rb, line 175
# Returns the children that are either not an '_att' or an '_att' with a
# single sub-tree (no key).
def unkeyed_children
  children.select do |child|
    child[0] != '_att' || child[1].size == 1
  end
end
Source
# File lib/flor/core/procedure.rb, line 614
# Builds a message and returns it wrapped in an array.
#
# By default the message is a 'receive' from this node, addressed to the
# parent (or to the node's 'replyto', or to nil when the node is flagged
# 'noreply'). It carries a copy of the current payload. The entries of
# +h+ are merged over those defaults.
#
# payload['ret'] is set from the node's 'aret' when present, else from a
# 'ret' entry in +h+ (which is then not merged as a message key).
#
# +h+ itself is never modified, the method works on a copy.
def wrap(h={})
  h = h.dup # so that the 'ret' removal below doesn't reach the caller

  m = {}
  m['point'] = 'receive'
  m['exid'] = exid
  m['from'] = nid

  m['nid'] = # what node should receive this reply?
    if @node['noreply'] # for backward compatibility 2018-10-29
      nil
    elsif @node.has_key?('replyto')
      @node['replyto'] # nil or something like '0_1' or '0_2-4'
    else
      parent
    end

  m['sm'] = @message['m']

  m['cause'] = @message['cause'] if @message.has_key?('cause')

  ret =
    if @node.has_key?('aret') # from the 'ret' common attribute
      @node['aret']
    elsif h.has_key?('ret')
      h.delete('ret')
    else
      :no
    end

  m['payload'] = payload.dup_current

  m.merge!(h)

  m['payload']['ret'] = ret if ret != :no

  if vs = @node['vars']
    (RVARS & vs.keys).each { |k| (m['rvars'] ||= {})[k] = vs[k] }
  end
    #
    # initially for "cmap"
    #
    # was considering passing the whole vars back (as 'varz'), but
    # it got in the way... and it might be heavy

  WRAP_KEYS
    .each { |k|
      co = @node["child_on_#{k}"]
      next unless co
      kri = [ '*' ]
      m["on_#{k}_handler"] = [ kri, co ] }

  [ m ]
end
Source
# File lib/flor/core/procedure.rb, line 675
# Wraps a message that defaults to a 'cancel' addressed to this node;
# 'point' and 'nid' given in +h+ take precedence.
# +h+ itself is not modified, the defaults are set on a copy.
def wrap_cancel(h)
  msg = h.dup
  msg['point'] ||= 'cancel'
  msg['nid'] ||= nid
  wrap(msg)
end
Source
# File lib/flor/core/procedure.rb, line 864 def wrap_cancel_children(h={}) wrap_cancel_nodes(cnodes, h) end
Source
# File lib/flor/core/procedure.rb, line 844
# Returns the cancel messages for the nodes listed in +nids+ (nil counts
# as no nodes), each built from +h+ plus the target 'nid' and this node
# as 'from'. Nids without a node, and nodes whose latest status is
# already 'closed' because of a 'cancel', are skipped.
def wrap_cancel_nodes(nids, h)
  msgs =
    (nids || []).flat_map do |i|
      n = @execution['nodes'][i]
      s = n && n['status'].last

      if n == nil || (s['status'] == 'closed' && s['point'] == 'cancel')
        nil
      else
        wrap_cancel(h.merge('nid' => i, 'from' => nid))
      end
    end

  msgs.compact
end
Source
# File lib/flor/core/procedure.rb, line 859
# Wraps a reply carrying the payload of the incoming message or, failing
# that, the payload stored in the node.
def wrap_cancelled
  pl = @message['payload'] || @node['payload']
  wrap('payload' => pl)
end
Source
# File lib/flor/core/procedure.rb, line 670 def wrap_error(o) wrap('point' => 'failed', 'error' => Flor.to_error(o)) end
Source
# File lib/flor/core/procedure.rb, line 684
# Wraps a message that defaults to a 'schedule' addressed to this node,
# with an empty payload; 'point', 'payload' and 'nid' given in +h+ take
# precedence.
# +h+ itself is not modified, the defaults are set on a copy.
def wrap_schedule(h)
  msg = h.dup
  msg['point'] ||= 'schedule'
  msg['payload'] ||= {}
  msg['nid'] ||= nid
  wrap(msg)
end