class Tengine::Job::Runtime::Signal
Attributes
callback[R]
start.job.job.tengineイベントによって ジョブは :ready -> :starting -> :running に遷移するが、 一度のroot_jobnet.update_with_lock では :starting が保存されないので、 2回のroot_jobnet.update_with_lock に分けることができるようにするための 処理を記憶しておく属性です
callbacks[R]
data[RW]
@attribute 受け渡しのためにデータを一時的に保持する属性。 現時点ではジョブのrunからackを返す際にPIDを保持するために使用します。
event[R]
paths[R]
reservations[R]
Public Class Methods
new(event)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 22 def initialize(event) @event = event reset end
Public Instance Methods
cache(*args)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 70 def cache(*args) case args.length when 1 then obj = args.first return nil if obj.nil? if obj.is_a?(Array) obj.map{|o| cache(o)} else cache(*cache_key(obj)) || remember(obj) end when 2 then @cache[args] else raise ArgumentError, "#{self.class.name}#cache requires 1 or 2 arguments" end end
cache_key(obj)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 92 def cache_key(obj) return [obj.class.name, obj.id.to_s] end
cache_list()
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 96 def cache_list Tengine.logger.debug "-" * 100 Tengine.logger.debug "#{__FILE__}##{__LINE__}" Tengine.logger.debug "object_id: #{object_id}" @cache.each do |key, obj| Tengine.logger.debug "#{obj.object_id} #{key.inspect} #{obj.inspect}" << (obj.changed? ? " CHANGED" : "") end Tengine.logger.debug "-" * 100 end
call_later(&block)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 41 def call_later(&block) @callbacks.push(block) end
callback=(value)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 36 def callback=(value) Tengine.logger.warn("Tengine::Job::Runtime::Signal#callback= is deprecated. Use call_later instead of it.\n " << caller.join("\n ")) @callback = value end
changed_vertecs()
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 106 def changed_vertecs @cache.values.select(&:changed?). map{|obj| obj.is_a?(Tengine::Job::Runtime::Edge) ? obj.owner : obj}. uniq end
execution()
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 112 def execution @execution ||= Tengine::Job::Runtime::Execution.find(event[:execution_id]) end
fire(source, event_type_name, properties = {}, options = {})
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 163 def fire(source, event_type_name, properties = {}, options = {}) case source when Tengine::Job::Runtime::Execution then properties[:execution_id] ||= source.id.to_s properties[:root_jobnet_id] ||= source.root_jobnet.id.to_s properties[:root_jobnet_name_path] ||= source.root_jobnet.name_path properties[:target_jobnet_id] ||= source.root_jobnet.id.to_s properties[:target_jobnet_name_path] ||= source.root_jobnet.name_path else properties[:execution_id] ||= self.execution.id.to_s properties[:root_jobnet_id] ||= source.root.id.to_s properties[:root_jobnet_name_path] ||= source.root.name_path end # デバッグ用 # properties[:target_jobnet_name] = source.root.vertex(properties[:target_jobnet_id]).name_path options ||= {} options[:properties] = properties properties.each do |key, value| if value.is_a?(Moped::BSON::ObjectId) properties[key] = value.to_s end end @reservations << Reservation.new(source, event_type_name, options) end
leave(obj, action = :transmit)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 116 def leave(obj, action = :transmit) @paths << obj begin if obj.is_a?(Tengine::Job::Runtime::Edge) if obj.destination.is_a?(Tengine::Job::Runtime::NamedVertex) self.call_later do cache(obj.destination).send(action, self) end else cache(obj.destination).send(action, self) end elsif obj.is_a?(Tengine::Job::Runtime::Vertex) obj.next_edges.each do |edge| # cache_list with_paths_backup{ cache(edge).send(action, self) } end else raise Tengine::Job::Runtime::Signal::Error, "leaving unsupported object: #{obj.inspect}" end rescue Tengine::Job::Runtime::Signal::Error => e puts "[#{e.class.name}] #{e.message}\nsignal.paths: #{@paths.inspect}" raise e end end
process_callbacks()
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 45 def process_callbacks until self.callbacks.empty? Tengine.logger.debug("-" * 20) callbacks.shift.call end while self.callback block, @callback = @callback, nil block.call end end
remember(obj)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 57 def remember(obj) if obj.is_a?(Array) obj.each{|o| remember(o)} else return nil if obj.nil? key = cache_key(obj) cached = cache(*key) return cached if cached @cache[key] = obj end obj end
remember_all(vertex)
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 87 def remember_all(vertex) v = Tengine::Job::Structure::Visitor::AllWithEdge.new{|obj| remember(obj) } vertex.accept_visitor(v) end
reset()
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 27 def reset @cache = {} @paths = [] @reservations = [] @data = nil @callback = nil @callbacks = [] end
with_paths_backup() { || ... }
click to toggle source
# File lib/tengine/job/runtime/signal.rb, line 141 def with_paths_backup paths_backup = @paths.dup begin yield if block_given? ensure @paths = paths_backup end end