class Temporal::Workflow::Context

Attributes

dispatcher[R]
metadata[R]
state_manager[R]
workflow_class[R]

Public Class Methods

new(state_manager, dispatcher, workflow_class, metadata) click to toggle source
# File lib/temporal/workflow/context.rb, line 20
def initialize(state_manager, dispatcher, workflow_class, metadata)
  @state_manager = state_manager
  @dispatcher = dispatcher
  @workflow_class = workflow_class
  @metadata = metadata
  @completed = false
end

Public Instance Methods

cancel(target, cancelation_id) click to toggle source
# File lib/temporal/workflow/context.rb, line 247
def cancel(target, cancelation_id)
  case target.type
  when History::EventTarget::ACTIVITY_TYPE
    cancel_activity(cancelation_id)
  when History::EventTarget::TIMER_TYPE
    cancel_timer(cancelation_id)
  else
    raise "#{target} can not be canceled"
  end
end
cancel_activity(activity_id) click to toggle source
# File lib/temporal/workflow/context.rb, line 241
def cancel_activity(activity_id)
  command = Command::RequestActivityCancellation.new(activity_id: activity_id)

  schedule_command(command)
end
cancel_timer(timer_id) click to toggle source
# File lib/temporal/workflow/context.rb, line 174
def cancel_timer(timer_id)
  command = Command::CancelTimer.new(timer_id: timer_id)
  schedule_command(command)
end
complete(result = nil) click to toggle source

TODO: check if workflow can be completed

# File lib/temporal/workflow/context.rb, line 180
def complete(result = nil)
  command = Command::CompleteWorkflow.new(result: result)
  schedule_command(command)
  completed!
end
completed?() click to toggle source
# File lib/temporal/workflow/context.rb, line 28
def completed?
  @completed
end
continue_as_new(*input, **args) click to toggle source
# File lib/temporal/workflow/context.rb, line 193
def continue_as_new(*input, **args)
  options = args.delete(:options) || {}
  input << args unless args.empty?

  execution_options = ExecutionOptions.new(workflow_class, options)

  command = Command::ContinueAsNew.new(
    workflow_type: execution_options.name,
    task_queue: execution_options.task_queue,
    input: input,
    timeouts: execution_options.timeouts,
    retry_policy: execution_options.retry_policy,
    headers: execution_options.headers
  )
  schedule_command(command)
  completed!
end
execute_activity(activity_class, *input, **args) click to toggle source
# File lib/temporal/workflow/context.rb, line 46
def execute_activity(activity_class, *input, **args)
  options = args.delete(:options) || {}
  input << args unless args.empty?

  execution_options = ExecutionOptions.new(activity_class, options)

  command = Command::ScheduleActivity.new(
    activity_id: options[:activity_id],
    activity_type: execution_options.name,
    input: input,
    namespace: execution_options.namespace,
    task_queue: execution_options.task_queue,
    retry_policy: execution_options.retry_policy,
    timeouts: execution_options.timeouts,
    headers: execution_options.headers
  )

  target, cancelation_id = schedule_command(command)
  future = Future.new(target, self, cancelation_id: cancelation_id)

  dispatcher.register_handler(target, 'completed') do |result|
    future.set(result)
    future.success_callbacks.each { |callback| call_in_fiber(callback, result) }
  end

  dispatcher.register_handler(target, 'failed') do |exception|
    future.fail(exception)
    future.failure_callbacks.each { |callback| call_in_fiber(callback, exception) }
  end

  future
end
execute_activity!(activity_class, *input, **args) click to toggle source
# File lib/temporal/workflow/context.rb, line 79
def execute_activity!(activity_class, *input, **args)
  future = execute_activity(activity_class, *input, **args)
  result = future.get

  raise result if future.failed?

  result
end
execute_local_activity(activity_class, *input, **args) click to toggle source

TODO: how to handle failures?

# File lib/temporal/workflow/context.rb, line 89
def execute_local_activity(activity_class, *input, **args)
  input << args unless args.empty?

  side_effect do
    # TODO: this probably requires a local context implementation
    context = Activity::Context.new(nil, nil)
    activity_class.execute_in_context(context, input)
  end
end
execute_workflow(workflow_class, *input, **args) click to toggle source
# File lib/temporal/workflow/context.rb, line 99
def execute_workflow(workflow_class, *input, **args)
  options = args.delete(:options) || {}
  input << args unless args.empty?

  execution_options = ExecutionOptions.new(workflow_class, options)

  command = Command::StartChildWorkflow.new(
    workflow_id: options[:workflow_id] || SecureRandom.uuid,
    workflow_type: execution_options.name,
    input: input,
    namespace: execution_options.namespace,
    task_queue: execution_options.task_queue,
    retry_policy: execution_options.retry_policy,
    timeouts: execution_options.timeouts,
    headers: execution_options.headers
  )

  target, cancelation_id = schedule_command(command)
  future = Future.new(target, self, cancelation_id: cancelation_id)

  dispatcher.register_handler(target, 'completed') do |result|
    future.set(result)
    future.success_callbacks.each { |callback| call_in_fiber(callback, result) }
  end

  dispatcher.register_handler(target, 'failed') do |exception|
    future.fail(exception)
    future.failure_callbacks.each { |callback| call_in_fiber(callback, exception) }
  end

  future
end
execute_workflow!(workflow_class, *input, **args) click to toggle source
# File lib/temporal/workflow/context.rb, line 132
def execute_workflow!(workflow_class, *input, **args)
  future = execute_workflow(workflow_class, *input, **args)
  result = future.get

  raise result if future.failed?

  result
end
fail(exception) click to toggle source

TODO: check if workflow can be failed

# File lib/temporal/workflow/context.rb, line 187
def fail(exception)
  command = Command::FailWorkflow.new(exception: exception)
  schedule_command(command)
  completed!
end
has_release?(release_name) click to toggle source
# File lib/temporal/workflow/context.rb, line 42
def has_release?(release_name)
  state_manager.release?(release_name.to_s)
end
headers() click to toggle source
# File lib/temporal/workflow/context.rb, line 38
def headers
  metadata.headers
end
logger() click to toggle source
# File lib/temporal/workflow/context.rb, line 32
def logger
  @logger ||= ReplayAwareLogger.new(Temporal.logger)
  @logger.replay = state_manager.replay?
  @logger
end
now() click to toggle source
# File lib/temporal/workflow/context.rb, line 229
def now
  state_manager.local_time
end
on_signal(&block) click to toggle source
# File lib/temporal/workflow/context.rb, line 233
def on_signal(&block)
  target = History::EventTarget.workflow

  dispatcher.register_handler(target, 'signaled') do |signal, input|
    call_in_fiber(block, signal, input)
  end
end
side_effect(&block) click to toggle source
# File lib/temporal/workflow/context.rb, line 141
def side_effect(&block)
  marker = state_manager.next_side_effect
  return marker.last if marker

  result = block.call
  command = Command::RecordMarker.new(name: StateManager::SIDE_EFFECT_MARKER, details: result)
  schedule_command(command)

  result
end
sleep(timeout) click to toggle source
# File lib/temporal/workflow/context.rb, line 152
def sleep(timeout)
  start_timer(timeout).wait
end
start_timer(timeout, timer_id = nil) click to toggle source
# File lib/temporal/workflow/context.rb, line 156
def start_timer(timeout, timer_id = nil)
  command = Command::StartTimer.new(timeout: timeout, timer_id: timer_id)
  target, cancelation_id = schedule_command(command)
  future = Future.new(target, self, cancelation_id: cancelation_id)

  dispatcher.register_handler(target, 'fired') do |result|
    future.set(result)
    future.success_callbacks.each { |callback| call_in_fiber(callback, result) }
  end

  dispatcher.register_handler(target, 'canceled') do |exception|
    future.fail(exception)
    future.failure_callbacks.each { |callback| call_in_fiber(callback, exception) }
  end

  future
end
wait_for(future) click to toggle source
# File lib/temporal/workflow/context.rb, line 217
def wait_for(future)
  fiber = Fiber.current

  dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
    fiber.resume if future.finished?
  end

  Fiber.yield

  return
end
wait_for_all(*futures) click to toggle source
# File lib/temporal/workflow/context.rb, line 211
def wait_for_all(*futures)
  futures.each(&:wait)

  return
end

Private Instance Methods

call_in_fiber(block, *args) click to toggle source
# File lib/temporal/workflow/context.rb, line 270
def call_in_fiber(block, *args)
  Fiber.new do
    Temporal::ThreadLocalContext.set(self)
    block.call(*args)
  end.resume
end
completed!() click to toggle source
# File lib/temporal/workflow/context.rb, line 262
def completed!
  @completed = true
end
schedule_command(command) click to toggle source
# File lib/temporal/workflow/context.rb, line 266
def schedule_command(command)
  state_manager.schedule(command)
end