class WEEL::DSLRealization
Attributes
Public Class Methods
Source
# File lib/weel.rb, line 477
# Prepares a fresh realization: state :ready, an "undefined" status, no main
# thread, no recorded or searched positions, the default connection wrapper
# class without arguments, and a new lock. Data and endpoints are only
# created when they are not already present.
def initialize
  @__weel_state = :ready
  @__weel_sim = false
  @__weel_main = nil
  @__weel_lock = Mutex.new
  @__weel_status = Status.new(0, "undefined")
  @__weel_positions = []
  @__weel_search_positions = {}
  @__weel_data ||= {}
  @__weel_endpoints ||= {}
  @__weel_connectionwrapper = ConnectionWrapperBase
  @__weel_connectionwrapper_args = []
end
Public Instance Methods
Source
# File lib/weel.rb, line 1059
# Joins the main thread (and, through __weel_recursive_join, its branches),
# sets the state to :stopped and reports that state to the connection wrapper.
def __weel_finalize
  __weel_recursive_join(@__weel_main)
  @__weel_state = :stopped
  @__weel_connectionwrapper.inform_state_change(@__weel_connectionwrapper_args, @__weel_state)
end
Source
# File lib/weel.rb, line 1065
# State setter. Setting the state it already has is a no-op (the current state
# is returned) unless that state is :ready. Switching to :running resets the
# recorded positions; switching to :stopping or :finishing nudges the status
# and wakes up the main thread and its branches. Every effective change is
# reported to the connection wrapper.
def __weel_state=(newState)
  unchanged = newState == @__weel_state
  return @__weel_state if unchanged && @__weel_state != :ready

  @__weel_positions = [] if newState == :running
  @__weel_state = newState

  if %i[stopping finishing].include?(newState)
    @__weel_status.nudge!
    __weel_recursive_continue(@__weel_main)
  end

  @__weel_connectionwrapper.inform_state_change(@__weel_connectionwrapper_args, @__weel_state)
end
Source
# File lib/weel.rb, line 650
# One branch of a choose construct.
#
# condition - a String (evaluated through __weel_eval_condition) or an
#             already computed truthy/falsy value.
# args      - passed through to __weel_eval_condition.
#
# Does nothing while the instance is stopping/finishing/stopped or the thread
# is no longer necessary, nor when the enclosing choose is :exclusive and an
# earlier alternative already ran. The block is run when the condition holds
# or when search mode is active.
def alternative(condition,args={},&block)
  return if %i[stopping finishing stopped].include?(self.__weel_state) || Thread.current[:nolongernecessary]

  me = Thread.current
  (me[:mutex] ||= Mutex.new).synchronize do
    return if me[:alternative_mode].last == :exclusive && me[:alternative_executed].last == true

    condition = __weel_eval_condition(condition, args) if condition.is_a?(String)
    me[:alternative_executed][-1] = true if condition
  end

  was_searching = __weel_is_in_search_mode
  __weel_protect_yield(&block) if was_searching || condition
  # Search mode switched from true to false inside the block: the branch has
  # been executed, which is as good as the condition evaluating to true.
  me[:alternative_executed][-1] = true if __weel_is_in_search_mode != was_searching
end
Defines a possible choice of a choose-Construct. The block is executed if condition == true or if search mode is active (to find the starting position).
Source
# File lib/weel.rb, line 501
# Atomic call to an external service. Delegates to __weel_activity with type
# :call.
#
# position   - identifier of the activity within the description.
# endpoint   - endpoint of the service.
# parameters - service parameters.
# finalize, update, prepare, salvage - code handed on to __weel_activity.
#              When finalize is nil (or false) the given block is used instead.
def call(position, endpoint, parameters: {}, finalize: nil, update: nil, prepare: nil, salvage: nil, &finalizeblk)
  on_finalize = finalize || finalizeblk
  __weel_activity(position, :call, endpoint, parameters, on_finalize, update, prepare, salvage)
end
DSL-Constructs for atomic calls to external services (call) and pure context manipulations (manipulate). Calls can also manipulate the context (after invoking the external service). position: a unique identifier within the wf-description (may be used by the search to identify a starting point). endpoint: (only with :call) endpoint of the service. parameters: (only with :call) service parameters.
Source
# File lib/weel.rb, line 629
# Choose construct: opens a decision scope for the alternative/otherwise
# calls made inside the block.
#
# mode - pushed onto the per-thread :alternative_mode stack; alternative
#        checks it against :exclusive (default :inclusive).
#
# Pushes a frame onto the per-thread :alternative_executed and
# :alternative_mode stacks, informs the connection wrapper about split and
# join, and always returns nil. Does nothing while the instance is
# stopping/finishing/stopped or the thread is no longer necessary.
#
# The frame is removed in an ensure clause: the block may leave through a
# non-local exit (e.g. `throw :escape` issued by escape inside a loop), and
# the stacks must not keep a stale frame in that case.
def choose(mode=:inclusive,&block)
  return if self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped || Thread.current[:nolongernecessary]

  me = Thread.current
  me[:alternative_executed] ||= []
  me[:alternative_mode] ||= []
  me[:alternative_executed] << false
  me[:alternative_mode] << mode
  begin
    cw = @__weel_connectionwrapper.new @__weel_connectionwrapper_args
    cw.split_branches me.__id__
    __weel_protect_yield(&block)
    cw.join_branches me.__id__
  ensure
    me[:alternative_executed].pop
    me[:alternative_mode].pop
  end
  nil
end
Choose DSL-Construct Defines a choice in the Workflow path. May contain multiple execution alternatives
Source
# File lib/weel.rb, line 670
# Critical section: runs the block while holding the Mutex registered for id.
# The registry is guarded by its own Mutex; a nil/false id gets a fresh,
# unregistered Mutex on every call. Does nothing while the instance is
# stopping/finishing/stopped or the thread is no longer necessary.
def critical(id,&block)
  return if %i[stopping finishing stopped].include?(self.__weel_state) || Thread.current[:nolongernecessary]

  @__weel_critical ||= Mutex.new
  semaphore = @__weel_critical.synchronize do
    registry = (@__weel_critical_sections ||= {})
    lock = registry[id] || Mutex.new
    registry[id] = lock if id
    lock
  end

  semaphore.synchronize { __weel_protect_yield(&block) }
end
Defines a critical block (=Mutex)
Source
# File lib/weel.rb, line 732
# Throws :escape (caught by the enclosing loop construct). Does nothing in
# search mode, while the instance is stopping/finishing/stopped, or when the
# thread is no longer necessary.
def escape
  return if __weel_is_in_search_mode
  return if %i[stopping finishing stopped].include?(self.__weel_state) || Thread.current[:nolongernecessary]

  throw :escape
end
Source
# File lib/weel.rb, line 685
# Cycle construct.
#
# condition - the pair produced by pre_test/post_test: [String, :pre_test]
#             or [String, :post_test].
# args      - Hash handed to __weel_eval_condition.
#
# Raises RuntimeError when condition/args do not have that shape.
#
# In search mode the body is walked once to look for the start position; if
# search mode is still active afterwards the loop is skipped, otherwise
# execution continues as a :pre_test loop. The body repeats while the
# condition holds and the instance is neither stopping, stopped nor finishing
# and the thread is still necessary; `throw :escape` (see escape) leaves it.
# NOTE: this method shadows Kernel#loop, so the body below iterates with
# `while`.
def loop(condition,args={},&block)
  wellformed = condition[0].is_a?(String) && [:pre_test,:post_test].include?(condition[1]) && args.is_a?(Hash)
  raise "condition must be called pre_test{} or post_test{}" unless wellformed
  return if %i[stopping finishing stopped].include?(self.__weel_state) || Thread.current[:nolongernecessary]

  if __weel_is_in_search_mode
    catch(:escape) { __weel_protect_yield(&block) }
    return if __weel_is_in_search_mode
    # In case it was a :post_test we woke up inside the loop, so the
    # condition has to be checked first thing.
    condition[1] = :pre_test
  end

  iterations = 0
  loop_id = SecureRandom.uuid

  proceed = lambda do
    __weel_eval_condition(condition[0], args) &&
      !%i[stopping stopped finishing].include?(self.__weel_state) &&
      !Thread.current[:nolongernecessary]
  end

  pass = lambda do
    iterations += 1
    __weel_protect_yield(&block)
    sleep 1 if @__weel_connectionwrapper.loop_guard(@__weel_connectionwrapper_args, loop_id, iterations)
  end

  catch :escape do
    pass.call if condition[1] == :post_test
    pass.call while proceed.call
  end
end
Defines a Cycle (loop/iteration)
Source
# File lib/weel.rb, line 506
# Pure context manipulation. Delegates to __weel_activity with type
# :manipulate and no endpoint.
#
# With a single extra argument and no block, that argument is the script.
# With a block (or with both arguments) the second argument stays the
# parameters; the script is `script`, falling back to the block.
def manipulate(position, parameters=nil, script=nil, &scriptblk)
  lone_argument_is_script = !parameters.nil? && script.nil? && scriptblk.nil?
  script, parameters = parameters, nil if lone_argument_is_script

  __weel_activity(position, :manipulate, nil, parameters || {}, script || scriptblk)
end
When called with two params (and no block), the second param is always the script. When called with a block and two params, the second param stays the parameters.
Source
# File lib/weel.rb, line 664
# Fallback branch of a choose construct: runs the block in search mode or
# when no alternative of the current choose has been executed. Does nothing
# while the instance is stopping/finishing/stopped or the thread is no longer
# necessary. `args` is accepted but not used here.
def otherwise(args={},&block)
  return if %i[stopping finishing stopped].include?(self.__weel_state) || Thread.current[:nolongernecessary]

  eligible = __weel_is_in_search_mode || !Thread.current[:alternative_executed].last
  __weel_protect_yield(&block) if eligible
end
Source
# Parallel construct (code below is kept exactly as extracted).
#
# type - optional Hash. type[:wait] (a positive Integer) becomes the
#        :branch_wait_count, otherwise the number of branches is used;
#        type[:cancel] == :first selects the :first cancel condition,
#        anything else selects :last.
#
# Steps, in order:
#   1. Return immediately while stopping/finishing/stopped or when the thread
#      is no longer necessary.
#   2. Reset the per-thread bookkeeping (:branches, :branch_traces,
#      :branch_traces_ids, :branch_finished_count, :branch_event, :mutex).
#   3. Run the block; parallel_branch calls inside it append their threads to
#      Thread.current[:branches].
#   4. Wait on :branch_event once per branch (each branch signals it after
#      its own setup), then tell a new connection wrapper about the split.
#   5. Propagate a finished search (:branch_search == false) to every branch
#      and release each branch through its :start_event.
#   6. Wait on :branch_event again unless stopping/finishing/stopped or there
#      are no branches, then inform the connection wrapper about the join.
#   7. Unless stopping/finishing/stopped: flag every branch as
#      :nolongernecessary, wake it up and join it.
# File lib/weel.rb, line 516 def parallel(type=nil,&block)# {{{ return if self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped || Thread.current[:nolongernecessary] Thread.current[:branches] = [] Thread.current[:branch_traces] = {} Thread.current[:branch_traces_ids] = 0 Thread.current[:branch_finished_count] = 0 Thread.current[:branch_event] = Continue.new Thread.current[:mutex] = Mutex.new __weel_protect_yield(&block) Thread.current[:branch_wait_count] = (type.is_a?(Hash) && type[:wait] != nil && (type[:wait].is_a?(Integer) && type[:wait] > 0) ? type[:wait] : Thread.current[:branches].size) Thread.current[:branch_wait_count_cancel] = 0 Thread.current[:branch_wait_count_cancel_condition] = (type.is_a?(Hash) && type[:cancel] != nil && type[:cancel] == :first ) ? :first : :last 1.upto Thread.current[:branches].size do Thread.current[:branch_event].wait end cw = @__weel_connectionwrapper.new @__weel_connectionwrapper_args cw.split_branches Thread.current.__id__, Thread.current[:branch_traces] Thread.current[:branches].each do |thread| # decide after executing block in parallel cause for coopis # it goes out of search mode while dynamically counting branches if Thread.current[:branch_search] == false thread[:branch_search] = false end thread[:start_event]&.continue # sometimes start event might not even exist yet (i.e.
race condition) end Thread.current[:branch_event].wait unless self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped || Thread.current[:branches].length == 0 cw.join_branches Thread.current.__id__, Thread.current[:branch_traces] unless self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped # first set all to no_longer_neccessary, just in case, but this should not be necessary Thread.current[:branches].each do |thread| thread[:nolongernecessary] = true __weel_recursive_continue(thread) end # wait for all, they should not even exist at this point Thread.current[:branches].each do |thread| __weel_recursive_join(thread) end end end
Parallel DSL-Construct. Defines Workflow paths that can be executed in parallel. May contain multiple branches (parallel_branch).
Source
# One branch of a parallel construct (code below is kept exactly as
# extracted).
#
# data - passed to Thread.new and handed to the block as its local
#        arguments (defaults to @__weel_data).
#
# Returns immediately while stopping/finishing/stopped or when the calling
# thread is no longer necessary. Otherwise a new thread is appended to the
# parent's :branches. Inside that thread:
#   1. Thread-local bookkeeping is set up (:branch_search, :branch_parent,
#      :start_event, :local, :branch_wait_count_cancel_active) and, under the
#      parent's mutex, a :branch_traces_id is taken from the parent's counter.
#   2. The last :alternative_executed / :alternative_mode entry of the parent
#      is inherited when the parent has one.
#   3. The parent's :branch_event is signalled, then the branch waits on its
#      own :start_event (unless stopping/stopped/finishing).
#   4. The block runs through __weel_protect_yield unless the instance is
#      stopping/finishing/stopped or the branch is no longer necessary.
#   5. Under the parent's mutex the finished counter is incremented; with the
#      :last cancel condition, reaching :branch_wait_count flags the other
#      live branches as :nolongernecessary and wakes them; when all branches
#      have finished the parent's :branch_event is signalled.
#   6. Unless stopping/stopped/finishing, the branch's last position is
#      removed from @__weel_positions and reported as :unmark (errors from
#      that report are swallowed by `rescue nil`).
# File lib/weel.rb, line 565 def parallel_branch(data=@__weel_data,&block)# {{{ return if self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped || Thread.current[:nolongernecessary] branch_parent = Thread.current branch_parent[:branches] << Thread.new(data) do |*local| Thread.current.abort_on_exception = true Thread.current[:branch_search] = @__weel_search_positions.any? Thread.current[:branch_parent] = branch_parent Thread.current[:start_event] = Continue.new Thread.current[:local] = local Thread.current[:branch_wait_count_cancel_active] = false branch_parent[:mutex].synchronize do Thread.current[:branch_traces_id] = branch_parent[:branch_traces_ids] branch_parent[:branch_traces_ids] += 1 end # parallel_branch could be possibly around an alternative. Thus thread has to inherit the alternative_executed # after branching, update it in the parent (TODO) if branch_parent[:alternative_executed] && branch_parent[:alternative_executed].length > 0 Thread.current[:alternative_executed] = [branch_parent[:alternative_executed].last] Thread.current[:alternative_mode] = [branch_parent[:alternative_mode].last] end branch_parent[:branch_event].continue Thread.current[:start_event].wait unless self.__weel_state == :stopping || self.__weel_state == :stopped || self.__weel_state == :finishing unless self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped || Thread.current[:nolongernecessary] __weel_protect_yield(*local, &block) end branch_parent[:mutex].synchronize do branch_parent[:branch_finished_count] += 1 if branch_parent[:branch_wait_count_cancel_condition] == :last if branch_parent[:branch_finished_count] == branch_parent[:branch_wait_count] && self.__weel_state != :stopping && self.__weel_state != :finishing branch_parent[:branches].each do |thread| if thread.alive?
&& thread[:branch_wait_count_cancel_active] == false thread[:nolongernecessary] = true __weel_recursive_continue(thread) end end end end if branch_parent[:branch_finished_count] == branch_parent[:branches].length && self.__weel_state != :stopping && self.__weel_state != :finishing branch_parent[:branch_event].continue end end unless self.__weel_state == :stopping || self.__weel_state == :stopped || self.__weel_state == :finishing if Thread.current[:branch_position] @__weel_positions.delete Thread.current[:branch_position] begin ipc = {} ipc[:unmark] = [Thread.current[:branch_position]] @__weel_connectionwrapper::inform_position_change(@__weel_connectionwrapper_args,ipc) end rescue nil Thread.current[:branch_position] = nil end end end end
Defines a branch of a parallel-Construct
Source
# File lib/weel.rb, line 728
# Builds the condition pair for a loop whose condition is checked after the
# body: [code-or-block, :post_test]. The block is used only when code is nil
# (or false).
def post_test(code=nil, &blk)
  tester = code || blk
  [tester, :post_test]
end
Source
# File lib/weel.rb, line 725
# Builds the condition pair for a loop whose condition is checked before the
# body: [code-or-block, :pre_test]. The block is used only when code is nil
# (or false).
def pre_test(code=nil, &blk)
  tester = code || blk
  [tester, :pre_test]
end
Source
# File lib/weel.rb, line 742
# Stop point: records `position` as passed (detail :after via
# __weel_progress with skip = true) and switches the instance to :stopping.
# Does nothing in search mode, while already stopping/finishing/stopped, or
# when the thread is no longer necessary.
def stop(position)
  return if __weel_is_in_search_mode(position)
  return if %i[stopping finishing stopped].include?(self.__weel_state) || Thread.current[:nolongernecessary]

  me = Thread.current
  # gather traces in threads to point to join
  if me[:branch_parent] && me[:branch_traces_id]
    traces = me[:branch_parent][:branch_traces]
    traces[me[:branch_traces_id]] ||= []
    traces[me[:branch_traces_id]] << position
  end

  __weel_progress(position, 0, true)
  self.__weel_state = :stopping
end
Source
# File lib/weel.rb, line 737
# Switches the instance to :finishing. Does nothing in search mode, while
# already stopping/finishing/stopped, or when the thread is no longer
# necessary.
def terminate
  return if __weel_is_in_search_mode
  return if %i[stopping finishing stopped].include?(self.__weel_state) || Thread.current[:nolongernecessary]

  self.__weel_state = :finishing
end
Source
# File lib/weel.rb, line 722
# Returns the condition to test: `code` when given, otherwise the block
# (nil when neither is supplied).
def test(code=nil, &blk)
  return code if code

  blk
end
Source
# File lib/weel.rb, line 494
# Wraps `code` in a ProcString.
def 🠊(code)
  ProcString.new(code)
end
DSL-Construct for translating expressions into static parameters
Private Instance Methods
Source
# Executes one activity (code below is kept exactly as extracted).
#
# position   - activity identifier; validated through __weel_position_test.
# type       - :manipulate or :call.
# endpoint   - endpoint handed to the connection wrapper's prepare (:call).
# parameters - activity parameters.
# finalize, update, prepare, salvage - code; only String values are run
#              through connectionwrapper.manipulate here.
#
# Flow:
#   * Returns at once when search mode is still active for this position,
#     and (inside the begin) while stopping/finishing/stopped or when the
#     thread is no longer necessary.
#   * Creates a Continue and a connection wrapper for the position, records
#     the position in the parent's :branch_traces when running in a branch,
#     and registers progress through __weel_progress.
#   * :manipulate - raises Signal::Stop unless vote_sync_before passes and
#     Signal::Skip when stopping/finishing; a String `finalize` is run via
#     connectionwrapper.manipulate, changes are reported and the position is
#     marked :after.
#   * :call - prepares parameters, votes, hands the call to the wrapper and
#     waits on Thread.current[:continue]. Depending on the wait result the
#     update, salvage or finalize code is chosen (a Salvage result without
#     salvage code raises). The inner loop repeats while the result is
#     Signal::UpdateAgain; the outer loop repeats while Signal::Again is
#     thrown.
#   * Ends the normal path by raising Signal::Proceed, handled below.
#
# Rescue clauses:
#   Signal::SkipManipulate, Signal::Proceed - unless stopping/finishing, a
#     failing vote_sync_after switches to :stopping and unmarks the position.
#   Signal::NoLongerNecessary - stops and cancels the activity, removes and
#     unmarks the position.
#   Signal::StopSkipManipulate, Signal::Stop - switch to :stopping.
#   Signal::Skip - nothing.
#   SyntaxError - inform_activity_failed, then :stopping.
#   anything else (StandardError) - inform_connectionwrapper_error, then
#     :stopping.
#
# Ensure: runs mem_guard, applies the :first cancel condition of the parent
# parallel (flagging other live branches as :nolongernecessary once enough
# branches became active) and clears the Continue.
# File lib/weel.rb, line 833 def __weel_activity(position, type, endpoint, parameters, finalize=nil, update=nil, prepare=nil, salvage=nil)# {{{ position = __weel_position_test position searchmode = __weel_is_in_search_mode(position) return if searchmode == true begin return if self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped || Thread.current[:nolongernecessary] Thread.current[:continue] = Continue.new connectionwrapper = @__weel_connectionwrapper.new @__weel_connectionwrapper_args, position, Thread.current[:continue] # gather traces in threads to point to join if Thread.current[:branch_parent] && Thread.current[:branch_traces_id] Thread.current[:branch_parent][:branch_traces][Thread.current[:branch_traces_id]] ||= [] Thread.current[:branch_parent][:branch_traces][Thread.current[:branch_traces_id]] << position end wp = __weel_progress position, connectionwrapper.activity_uuid case type when :manipulate raise Signal::Stop unless connectionwrapper.vote_sync_before raise Signal::Skip if self.__weel_state == :stopping || self.__weel_state == :finishing if finalize.is_a?(String) connectionwrapper.activity_manipulate_handle(parameters) connectionwrapper.inform_activity_manipulate struct = connectionwrapper.manipulate(false,@__weel_lock,@__weel_data,@__weel_endpoints,@__weel_status,Thread.current[:local],connectionwrapper.additional,finalize,'Activity ' + position.to_s) connectionwrapper.inform_manipulate_change( ((struct && struct.changed_status) ? @__weel_status : nil), ((struct && struct.changed_data.any?) ? struct.changed_data.uniq : nil), ((struct && struct.changed_endpoints.any?) ?
struct.changed_endpoints.uniq : nil), @__weel_data, @__weel_endpoints ) connectionwrapper.inform_activity_done wp.detail = :after @__weel_connectionwrapper::inform_position_change @__weel_connectionwrapper_args, :after => [wp] end when :call begin again = catch Signal::Again do # Will be nil if we do not throw (using default connectionwrapper) connectionwrapper.mem_guard params = connectionwrapper.prepare(@__weel_lock,@__weel_data,@__weel_endpoints,@__weel_status,Thread.current[:local],connectionwrapper.additional,prepare,endpoint,parameters) raise Signal::Stop unless connectionwrapper.vote_sync_before(params) raise Signal::Skip if self.__weel_state == :stopping || self.__weel_state == :finishing connectionwrapper.activity_handle wp.passthrough, params wp.passthrough = connectionwrapper.activity_passthrough_value unless wp.passthrough.nil? @__weel_connectionwrapper::inform_position_change @__weel_connectionwrapper_args, :wait => [wp] end begin # cleanup after callback updates connectionwrapper.mem_guard # with loop if catching Signal::Again # handshake call and wait until it finished waitingresult = nil waitingresult = Thread.current[:continue].wait unless Thread.current[:nolongernecessary] || self.__weel_state == :stopping || self.__weel_state == :finishing || self.__weel_state == :stopped if Thread.current[:nolongernecessary] raise Signal::NoLongerNecessary end if self.__weel_state == :stopping || self.__weel_state == :finishing connectionwrapper.activity_stop wp.passthrough = connectionwrapper.activity_passthrough_value raise Signal::Proceed if wp.passthrough # if stop, but no passthrough, let manipulate happen and then stop end next if waitingresult == WEEL::Signal::UpdateAgain && (connectionwrapper.activity_result_value.nil? || connectionwrapper.activity_result_value&.length == 0) code, cmess = if waitingresult == WEEL::Signal::UpdateAgain [update, 'update'] elsif waitingresult == WEEL::Signal::Salvage if salvage [salvage, 'salvage'] else raise('HTTP Error.
The service return status was not between 200 and 300.') end else [finalize, 'finalize'] end if code.is_a?(String) connectionwrapper.inform_activity_manipulate struct = nil # when you throw without parameters, ma contains nil, so we return Signal::Proceed to give ma a meaningful value in other cases ma = catch Signal::Again do struct = connectionwrapper.manipulate(false,@__weel_lock,@__weel_data,@__weel_endpoints,@__weel_status,Thread.current[:local],connectionwrapper.additional,code,'Activity ' + position.to_s + ' ' + cmess,connectionwrapper.activity_result_value,connectionwrapper.activity_result_options) Signal::Proceed end connectionwrapper.inform_manipulate_change( ((struct && struct.changed_status) ? @__weel_status : nil), ((struct && struct.changed_data.any?) ? struct.changed_data.uniq : nil), ((struct && struct.changed_endpoints.any?) ? struct.changed_endpoints.uniq : nil), @__weel_data, @__weel_endpoints ) throw(Signal::Again, Signal::Again) if ma.nil? || ma == Signal::Again # this signal again loops "there is a catch" because rescue signal throw that throughly restarts the task else end end while waitingresult == Signal::UpdateAgain # this signal again loops because async update, proposal: rename to UpdateAgain if connectionwrapper.activity_passthrough_value.nil?
connectionwrapper.inform_activity_done wp.passthrough = nil wp.detail = :after @__weel_connectionwrapper::inform_position_change @__weel_connectionwrapper_args, :after => [wp] end end end while again == Signal::Again # there is a catch end raise Signal::Proceed rescue Signal::SkipManipulate, Signal::Proceed if self.__weel_state != :stopping && self.__weel_state != :finishing && !connectionwrapper.vote_sync_after self.__weel_state = :stopping wp.detail = :unmark end rescue Signal::NoLongerNecessary connectionwrapper.activity_stop connectionwrapper.inform_activity_cancelled connectionwrapper.inform_activity_done @__weel_positions.delete wp Thread.current[:branch_position] = nil wp.passthrough = nil wp.detail = :unmark @__weel_connectionwrapper::inform_position_change @__weel_connectionwrapper_args, :unmark => [wp] rescue Signal::StopSkipManipulate, Signal::Stop self.__weel_state = :stopping rescue Signal::Skip nil rescue SyntaxError => se connectionwrapper.inform_activity_failed se self.__weel_state = :stopping rescue => err @__weel_connectionwrapper::inform_connectionwrapper_error @__weel_connectionwrapper_args, err self.__weel_state = :stopping ensure connectionwrapper.mem_guard unless connectionwrapper.nil? if Thread.current[:branch_parent] Thread.current[:branch_parent][:mutex].synchronize do if Thread.current[:branch_parent][:branch_wait_count_cancel_condition] == :first if !Thread.current[:branch_wait_count_cancel_active] && Thread.current[:branch_parent][:branch_wait_count_cancel] < Thread.current[:branch_parent][:branch_wait_count] Thread.current[:branch_wait_count_cancel_active] = true Thread.current[:branch_parent][:branch_wait_count_cancel] += 1 end if Thread.current[:branch_parent][:branch_wait_count_cancel] == Thread.current[:branch_parent][:branch_wait_count] && self.__weel_state != :stopping && self.__weel_state != :finishing Thread.current[:branch_parent][:branches].each do |thread| if thread.alive?
&& thread[:branch_wait_count_cancel_active] == false thread[:nolongernecessary] = true __weel_recursive_continue(thread) end end end end end end Thread.current[:continue].clear if Thread.current[:continue] && Thread.current[:continue].is_a?(Continue) end end
Source
# File lib/weel.rb, line 777
# Evaluates a loop/alternative condition through a fresh connection wrapper
# and returns the result of its test_condition.
#
# On a NameError, a WEEL::Signal::Error or any StandardError the instance is
# switched to :stopping, the problem is reported through inform_syntax_error
# and nil is returned.
def __weel_eval_condition(condition,args={})
  wrapper = @__weel_connectionwrapper.new(@__weel_connectionwrapper_args)
  wrapper.test_condition(@__weel_data, @__weel_endpoints, Thread.current[:local], wrapper.additional, condition, args)
rescue NameError => err # don't look into it, or it will explode
  self.__weel_state = :stopping
  complaint = Exception.new("protect_yield: `#{err.name}` is not a thing that can be used. Maybe it is meant to be a string and you forgot quotes?")
  @__weel_connectionwrapper.inform_syntax_error(@__weel_connectionwrapper_args, complaint, nil)
  nil
rescue WEEL::Signal::Error, StandardError => err
  self.__weel_state = :stopping
  @__weel_connectionwrapper.inform_syntax_error(@__weel_connectionwrapper_args, err, nil)
  nil
end
Source
# File lib/weel.rb, line 1039
# Tells whether the current thread is still searching for its start position.
#
# Returns false when the thread is not in search mode, true when it is and
# `position` is not one of the searched positions. When `position` matches,
# search mode is switched off for this thread and every ancestor branch
# (siblings spawned later by those ancestors start in search mode again) and
# the result is whether the stored position's detail is :after.
def __weel_is_in_search_mode(position = nil)
  branch = Thread.current
  return false unless branch[:branch_search]

  found = position && branch[:branch_search] && @__weel_search_positions.include?(position)
  return true unless found

  # This thread and all of its ancestors execute activities from here on;
  # :branch_search_now records that the switch happened just now.
  lineage = [branch]
  lineage << lineage.last[:branch_parent] while lineage.last.key?(:branch_parent)
  lineage.each do |member|
    member[:branch_search] = false
    member[:branch_search_now] = true
  end

  @__weel_search_positions[position].detail == :after
end
Source
# File lib/weel.rb, line 1030
# Validates an activity position.
#
# position - must be a Symbol consisting of a letter followed by letters,
#            digits or underscores (e.g. :a1, :call_me).
#
# Returns the position when valid. Otherwise switches the instance to
# :stopping and returns the result of reporting a syntax error to the
# connection wrapper.
#
# The pattern is anchored with \A and \z so the WHOLE symbol has to match;
# unanchored, any symbol containing a single letter (:"1a", :"a-1") passed.
def __weel_position_test(position)
  if position.is_a?(Symbol) && position.to_s =~ /\A[a-zA-Z][a-zA-Z0-9_]*\z/
    position
  else
    self.__weel_state = :stopping
    @__weel_connectionwrapper::inform_syntax_error(@__weel_connectionwrapper_args,Exception.new("position (#{position}) not valid"),nil)
  end
end
Source
# File lib/weel.rb, line 796
# Records that the current thread reached `position`.
#
# position - activity identifier.
# uuid     - activity uuid stored in the new WEEL::Position.
# skip     - when true the position is recorded with detail :after,
#            otherwise :at.
#
# Unmarks the previous position of the parent branch and of this thread,
# creates the new WEEL::Position (carrying over the passthrough of the
# searched position when search mode was left just now), drops the position
# from the search list, unmarks every remaining searched position, and sends
# the collected changes to the connection wrapper. Returns the new position.
def __weel_progress(position, uuid, skip=false)
  ipc = {}
  me = Thread.current
  detail = skip ? :after : :at

  if me[:branch_parent] && me[:branch_parent][:branch_position]
    @__weel_positions.delete me[:branch_parent][:branch_position]
    ipc[:unmark] ||= []
    ipc[:unmark] << me[:branch_parent][:branch_position] rescue nil
    me[:branch_parent][:branch_position] = nil
  end

  if me[:branch_position]
    @__weel_positions.delete me[:branch_position]
    ipc[:unmark] ||= []
    ipc[:unmark] << me[:branch_position] rescue nil
  end

  if me[:branch_search_now] == true
    me[:branch_search_now] = false
    wp = WEEL::Position.new(position, uuid, detail, @__weel_search_positions[position]&.passthrough)
  else
    wp = WEEL::Position.new(position, uuid, detail)
  end
  ipc[detail] = [wp]

  @__weel_search_positions.delete(position)
  # some may still be in active search but lets unmark them for good measure
  @__weel_search_positions.each do |_key, leftover|
    ipc[:unmark] ||= []
    ipc[:unmark] << leftover
    true
  end
  ipc[:unmark].uniq! if ipc[:unmark]

  @__weel_positions << wp
  me[:branch_position] = wp
  @__weel_connectionwrapper.inform_position_change(@__weel_connectionwrapper_args, ipc)
  wp
end
Source
# File lib/weel.rb, line 759
# Yields `local` to the given block and returns the block's value (nil
# without a block).
#
# On a NameError, a WEEL::Signal::Error or any StandardError the instance is
# switched to :stopping, the problem is reported through inform_syntax_error
# and nil is returned.
def __weel_protect_yield(*local)
  return unless block_given?

  yield(*local)
rescue NameError => err # don't look into it, or it will explode
  self.__weel_state = :stopping
  complaint = Exception.new("protect_yield: `#{err.name}` is not a thing that can be used. Maybe it is meant to be a string and you forgot quotes?")
  @__weel_connectionwrapper.inform_syntax_error(@__weel_connectionwrapper_args, complaint, nil)
  nil
rescue WEEL::Signal::Error, StandardError => err
  self.__weel_state = :stopping
  @__weel_connectionwrapper.inform_syntax_error(@__weel_connectionwrapper_args, err, nil)
  nil
end
Source
# File lib/weel.rb, line 1002
# Wakes up `thread` and, recursively, all of its branches: signals the
# thread's :continue and (under the thread's :mutex) its :branch_event, each
# only while the thread is alive. A nil thread is ignored.
def __weel_recursive_continue(thread)
  return unless thread

  thread[:continue].continue if thread.alive? && thread[:continue]

  if thread.alive? && thread[:branch_event]
    thread[:mutex].synchronize do
      event = thread[:branch_event]
      event.continue unless event.nil?
    end
  end

  thread[:branches]&.each { |b| __weel_recursive_continue(b) }
end
Source
# File lib/weel.rb, line 1018
# Joins `thread` (when alive and not the calling thread) and then,
# recursively, every thread listed in its :branches. A nil thread is ignored.
def __weel_recursive_join(thread)
  return unless thread

  thread.join if thread.alive? && thread != Thread.current
  thread[:branches]&.each { |b| __weel_recursive_join(b) }
end
Source
# File lib/weel.rb, line 995
# Walks the branch tree below `thread`, growing `indent` by one space per
# level. As extracted here it only recurses; nothing is printed.
def __weel_recursive_print(thread,indent='')
  children = thread[:branches]
  return unless children

  deeper = indent + ' '
  children.each { |b| __weel_recursive_print(b, deeper) }
end