module Temporal::Testing::TemporalOverride
Public Instance Methods
complete_activity(async_token, result = nil)
click to toggle source
Calls superclass method
# File lib/temporal/testing/temporal_override.rb, line 48 def complete_activity(async_token, result = nil) return super if Temporal::Testing.disabled? details = Activity::AsyncToken.decode(async_token) execution = executions[[details.workflow_id, details.run_id]] execution.complete_activity(async_token, result) end
fail_activity(async_token, exception)
click to toggle source
Calls superclass method
# File lib/temporal/testing/temporal_override.rb, line 57 def fail_activity(async_token, exception) return super if Temporal::Testing.disabled? details = Activity::AsyncToken.decode(async_token) execution = executions[[details.workflow_id, details.run_id]] execution.fail_activity(async_token, exception) end
fetch_workflow_execution_info(_namespace, workflow_id, run_id)
click to toggle source
Calls superclass method
# File lib/temporal/testing/temporal_override.rb, line 32 def fetch_workflow_execution_info(_namespace, workflow_id, run_id) return super if Temporal::Testing.disabled? execution = executions[[workflow_id, run_id]] Workflow::ExecutionInfo.new( workflow: nil, workflow_id: workflow_id, run_id: run_id, start_time: nil, close_time: nil, status: execution.status, history_length: nil, ).freeze end
schedule_workflow(workflow, cron_schedule, *input, **args)
click to toggle source
We don't support testing the actual cron schedules, but we will defer execution. You can simulate running these deferred with Temporal::Testing.execute_all_scheduled_workflows o Temporal::Testing.execute_scheduled_workflow, or assert against the cron schedule with Temporal::Testing.schedules.
Calls superclass method
# File lib/temporal/testing/temporal_override.rb, line 24 def schedule_workflow(workflow, cron_schedule, *input, **args) return super if Temporal::Testing.disabled? if Temporal::Testing.local? start_locally(workflow, cron_schedule, *input, **args) end end
start_workflow(workflow, *input, **args)
click to toggle source
Calls superclass method
# File lib/temporal/testing/temporal_override.rb, line 11 def start_workflow(workflow, *input, **args) return super if Temporal::Testing.disabled? if Temporal::Testing.local? start_locally(workflow, nil, *input, **args) end end
Private Instance Methods
allowed?(workflow_id, reuse_policy)
click to toggle source
# File lib/temporal/testing/temporal_override.rb, line 117 def allowed?(workflow_id, reuse_policy) disallowed_statuses = disallowed_statuses_for(reuse_policy) # there isn't a single execution in a dissallowed status executions.none? do |(w_id, _), execution| w_id == workflow_id && disallowed_statuses.include?(execution.status) end end
disallowed_statuses_for(reuse_policy)
click to toggle source
# File lib/temporal/testing/temporal_override.rb, line 133 def disallowed_statuses_for(reuse_policy) case reuse_policy when :allow_failed [Workflow::ExecutionInfo::RUNNING_STATUS, Workflow::ExecutionInfo::COMPLETED_STATUS] when :allow [Workflow::ExecutionInfo::RUNNING_STATUS] when :reject [ Workflow::ExecutionInfo::RUNNING_STATUS, Workflow::ExecutionInfo::FAILED_STATUS, Workflow::ExecutionInfo::COMPLETED_STATUS ] end end
executions()
click to toggle source
# File lib/temporal/testing/temporal_override.rb, line 68 def executions @executions ||= {} end
previous_run_id(workflow_id)
click to toggle source
# File lib/temporal/testing/temporal_override.rb, line 126 def previous_run_id(workflow_id) executions.each do |(w_id, run_id), _| return run_id if w_id == workflow_id end nil end
start_locally(workflow, schedule, *input, **args)
click to toggle source
# File lib/temporal/testing/temporal_override.rb, line 72 def start_locally(workflow, schedule, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? reuse_policy = options[:workflow_id_reuse_policy] || :allow_failed workflow_id = options[:workflow_id] || SecureRandom.uuid run_id = SecureRandom.uuid if !allowed?(workflow_id, reuse_policy) raise Temporal::WorkflowExecutionAlreadyStartedFailure.new( "Workflow execution already started for id #{workflow_id}, reuse policy #{reuse_policy}", previous_run_id(workflow_id) ) end execution = WorkflowExecution.new executions[[workflow_id, run_id]] = execution execution_options = ExecutionOptions.new(workflow, options) metadata = Metadata::Workflow.new( name: workflow_id, run_id: run_id, attempt: 1, headers: execution_options.headers ) context = Temporal::Testing::LocalWorkflowContext.new( execution, workflow_id, run_id, workflow.disabled_releases, metadata ) if schedule.nil? execution.run do workflow.execute_in_context(context, input) end else # Defer execution; in testing mode, it'll need to be invoked manually. Temporal::Testing::ScheduledWorkflows::Private::Store.add( workflow_id: workflow_id, cron_schedule: schedule, executor_lambda: lambda do execution.run do workflow.execute_in_context(context, input) end end, ) end run_id end