class Temporal::Workflow::StateManager
Constants
- RELEASE_MARKER
- SIDE_EFFECT_MARKER
Attributes
command_tracker[R]
commands[R]
dispatcher[R]
local_time[R]
marker_ids[R]
releases[R]
side_effects[R]
Public Class Methods
new(dispatcher)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 23 def initialize(dispatcher) @dispatcher = dispatcher @commands = [] @marker_ids = Set.new @releases = {} @side_effects = [] @command_tracker = Hash.new { |hash, key| hash[key] = CommandStateMachine.new } @last_event_id = 0 @local_time = nil @replay = false end
Public Instance Methods
apply(history_window)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 75 def apply(history_window) @replay = history_window.replay? @local_time = history_window.local_time @last_event_id = history_window.last_event_id # handle markers first since their data is needed for processing events history_window.markers.each do |event| apply_event(event) end history_window.events.each do |event| apply_event(event) end end
next_side_effect()
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 71 def next_side_effect side_effects.shift end
release?(release_name)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 65 def release?(release_name) track_release(release_name) unless releases.key?(release_name) releases[release_name] end
replay?()
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 35 def replay? @replay end
schedule(command)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 39 def schedule(command) # Fast-forward event IDs to skip all the markers (version markers can # be removed, so we can't rely on them being scheduled during a replay) command_id = next_event_id while marker_ids.include?(command_id) do command_id = next_event_id end cancelation_id = case command when Command::ScheduleActivity command.activity_id ||= command_id when Command::StartChildWorkflow command.workflow_id ||= command_id when Command::StartTimer command.timer_id ||= command_id end state_machine = command_tracker[command_id] state_machine.requested if state_machine.state == CommandStateMachine::NEW_STATE commands << [command_id, command] return [event_target_from(command_id, command), cancelation_id] end
Private Instance Methods
apply_event(event)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 98 def apply_event(event) state_machine = command_tracker[event.originating_event_id] target = History::EventTarget.from_event(event) case event.type when 'WORKFLOW_EXECUTION_STARTED' state_machine.start dispatch( History::EventTarget.workflow, 'started', from_payloads(event.attributes.input), Metadata.generate(Metadata::WORKFLOW_TYPE, event.attributes) ) when 'WORKFLOW_EXECUTION_COMPLETED' # todo when 'WORKFLOW_EXECUTION_FAILED' # todo when 'WORKFLOW_EXECUTION_TIMED_OUT' # todo when 'WORKFLOW_TASK_SCHEDULED' # todo when 'WORKFLOW_TASK_STARTED' # todo when 'WORKFLOW_TASK_COMPLETED' # todo when 'WORKFLOW_TASK_TIMED_OUT' # todo when 'WORKFLOW_TASK_FAILED' # todo when 'ACTIVITY_TASK_SCHEDULED' state_machine.schedule discard_command(target) when 'ACTIVITY_TASK_STARTED' state_machine.start when 'ACTIVITY_TASK_COMPLETED' state_machine.complete dispatch(target, 'completed', from_result_payloads(event.attributes.result)) when 'ACTIVITY_TASK_FAILED' state_machine.fail dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure, ActivityException)) when 'ACTIVITY_TASK_TIMED_OUT' state_machine.time_out dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) when 'ACTIVITY_TASK_CANCEL_REQUESTED' state_machine.requested discard_command(target) when 'REQUEST_CANCEL_ACTIVITY_TASK_FAILED' state_machine.fail dispatch(target, 'failed', event.attributes.cause, nil) when 'ACTIVITY_TASK_CANCELED' state_machine.cancel dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) when 'TIMER_STARTED' state_machine.start discard_command(target) when 'TIMER_FIRED' state_machine.complete dispatch(target, 'fired') when 'CANCEL_TIMER_FAILED' state_machine.failed dispatch(target, 'failed', event.attributes.cause, nil) when 'TIMER_CANCELED' state_machine.cancel dispatch(target, 'canceled') when 'WORKFLOW_EXECUTION_CANCEL_REQUESTED' # todo when 'WORKFLOW_EXECUTION_CANCELED' # todo when 'REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED' # todo when 'REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED' # todo when 'EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED' # todo when 'MARKER_RECORDED' state_machine.complete handle_marker(event.id, event.attributes.marker_name, from_details_payloads(event.attributes.details['data'])) when 'WORKFLOW_EXECUTION_SIGNALED' dispatch(target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input)) when 'WORKFLOW_EXECUTION_TERMINATED' # todo when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW' # todo when 'START_CHILD_WORKFLOW_EXECUTION_INITIATED' state_machine.schedule discard_command(target) when 'START_CHILD_WORKFLOW_EXECUTION_FAILED' state_machine.fail dispatch(target, 'failed', 'StandardError', from_payloads(event.attributes.cause)) when 'CHILD_WORKFLOW_EXECUTION_STARTED' state_machine.start when 'CHILD_WORKFLOW_EXECUTION_COMPLETED' state_machine.complete dispatch(target, 'completed', from_result_payloads(event.attributes.result)) when 'CHILD_WORKFLOW_EXECUTION_FAILED' state_machine.fail dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_CANCELED' state_machine.cancel dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_TIMED_OUT' state_machine.time_out dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_TERMINATED' # todo when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED' # todo when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED' # todo when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' # todo when 'UPSERT_WORKFLOW_SEARCH_ATTRIBUTES' # todo else raise UnsupportedEvent, event.type end end
discard_command(target)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 284 def discard_command(target) # Pop the first command from the list, it is expected to match existing_command_id, existing_command = commands.shift if !existing_command_id raise NonDeterministicWorkflowError, "A command #{target} was not scheduled upon replay" end existing_target = event_target_from(existing_command_id, existing_command) if target != existing_target raise NonDeterministicWorkflowError, "Unexpected command #{existing_target} (expected #{target})" end end
dispatch(target, name, *attributes)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 280 def dispatch(target, name, *attributes) dispatcher.dispatch(target, name, attributes) end
event_target_from(command_id, command)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 258 def event_target_from(command_id, command) target_type = case command when Command::ScheduleActivity History::EventTarget::ACTIVITY_TYPE when Command::RequestActivityCancellation History::EventTarget::CANCEL_ACTIVITY_REQUEST_TYPE when Command::RecordMarker History::EventTarget::MARKER_TYPE when Command::StartTimer History::EventTarget::TIMER_TYPE when Command::CancelTimer History::EventTarget::CANCEL_TIMER_REQUEST_TYPE when Command::CompleteWorkflow, Command::FailWorkflow History::EventTarget::WORKFLOW_TYPE when Command::StartChildWorkflow History::EventTarget::CHILD_WORKFLOW_TYPE end History::EventTarget.new(command_id, target_type) end
handle_marker(id, type, details)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 298 def handle_marker(id, type, details) marker_ids << id case type when SIDE_EFFECT_MARKER side_effects << [id, details] when RELEASE_MARKER releases[details] = true else raise UnsupportedMarkerType, event.type end end
next_event_id()
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 94 def next_event_id @last_event_id += 1 end
track_release(release_name)
click to toggle source
# File lib/temporal/workflow/state_manager.rb, line 311 def track_release(release_name) # replay does not allow untracked (via marker) releases if replay? releases[release_name] = false else releases[release_name] = true schedule(Command::RecordMarker.new(name: RELEASE_MARKER, details: release_name)) end end