module Temporal

This class is responsible for matching an executable (activity or workflow) name to a class implementing it.

TODO: This class should be responsible for handling executable versions when these are implemented.

Provides context for Temporal::Activity::WorkflowConvenienceMethods

This class implements a very simple ThreadPool with the ability to block until at least one thread becomes available. This allows Pollers to only poll when there's an available thread in the pool.

NOTE: There's a minor race condition that can occur between calling #wait_for_available_threads and #schedule, but it should be rare.

This context class is available in the workflow implementation and provides context and methods for interacting with Temporal

Constants

VERSION

Public Class Methods

await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil) click to toggle source

Long-polls until a workflow completes and returns whatever its execute function returned. If the result does not arrive in time (after 30 seconds unless a shorter timeout is given), this raises Temporal::TimeoutError, not to be confused with Temporal::WorkflowTimedOut, which reports that the workflow itself timed out.

run_id: if nil, awaits completion of the entire workflow; this can span multiple runs in the case where the workflow uses continue-as-new.

timeout: seconds to wait for the result. This cannot be longer than 30 seconds because that is the maximum the server supports.

namespace: if nil, uses the namespace declared on the Workflow, or the global default.

# File lib/temporal.rb, line 96
# Long-polls the workflow's history for its closing event and turns that event
# into either a return value or an exception.
#
# workflow    - the workflow, handed to ExecutionOptions to resolve the namespace.
# workflow_id - id of the workflow execution to wait on.
# run_id      - optional run id, passed through to the history request as-is.
# timeout     - seconds to wait; when nil the server maximum
#               (SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL) is used.
# namespace   - optional namespace override; when nil, ExecutionOptions decides.
#
# Returns the converted result payloads when the workflow completed.
# Raises TimeoutError when the poll exceeds its deadline, and a dedicated error
# for each non-successful close event (timed out, terminated, canceled, failed,
# continued-as-new). Any other event type raises NotImplementedError.
def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil)
  execution_options = ExecutionOptions.new(workflow, namespace ? {namespace: namespace} : {})
  max_timeout = Temporal::Client::GRPCClient::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL

  history_response =
    begin
      client.get_workflow_execution_history(
        namespace: execution_options.namespace,
        workflow_id: workflow_id,
        run_id: run_id,
        wait_for_new_event: true,
        event_type: :close,
        timeout: timeout || max_timeout
      )
    rescue GRPC::DeadlineExceeded
      # Report which limit was hit: the caller's own, or the server maximum.
      if timeout
        raise TimeoutError, "Timed out after your specified limit of timeout: #{timeout} seconds"
      end

      raise TimeoutError, "Timed out after #{max_timeout} seconds, which is the maximum supported amount."
    end

  # Only the first event of the returned history is inspected; the request
  # above asked for the close event.
  close_event = Workflow::History.new(history_response.history.events).events.first

  case close_event.type
  when 'WORKFLOW_EXECUTION_COMPLETED'
    ResultConverter.from_result_payloads(close_event.attributes.result)
  when 'WORKFLOW_EXECUTION_TIMED_OUT' then raise Temporal::WorkflowTimedOut
  when 'WORKFLOW_EXECUTION_TERMINATED' then raise Temporal::WorkflowTerminated
  when 'WORKFLOW_EXECUTION_CANCELED' then raise Temporal::WorkflowCanceled
  when 'WORKFLOW_EXECUTION_FAILED'
    raise Temporal::Workflow::Errors.generate_error(close_event.attributes.failure)
  when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW'
    # The awaited run did not produce a result itself. Surface the follow-up
    # run id so the caller can poll again with it.
    raise Temporal::WorkflowRunContinuedAsNew.new(
      new_run_id: close_event.attributes.new_execution_run_id
    )
  else
    raise NotImplementedError, "Unexpected event type #{close_event.type}."
  end
end
complete_activity(async_token, result = nil) click to toggle source
# File lib/temporal.rb, line 179
# Reports an activity as completed on behalf of the holder of an async token.
#
# async_token - encoded token; decoded with Activity::AsyncToken to recover the
#               namespace, activity id, workflow id and run id.
# result      - value to report as the activity's result (defaults to nil).
#
# Returns whatever the client's respond_activity_task_completed_by_id returns.
def complete_activity(async_token, result = nil)
  token = Activity::AsyncToken.decode(async_token)

  client.respond_activity_task_completed_by_id(
    namespace:   token.namespace,
    activity_id: token.activity_id,
    workflow_id: token.workflow_id,
    run_id:      token.run_id,
    result:      result
  )
end
configuration() click to toggle source
# File lib/temporal.rb, line 207
# Returns the shared Configuration instance, building it on first access and
# handing back the same object on every later call.
def configuration
  @configuration = Configuration.new unless @configuration
  @configuration
end
configure() { |configuration| ... } click to toggle source
# File lib/temporal.rb, line 203
# Yields the shared configuration object (as returned by #configuration) to the
# caller's block so settings can be adjusted in place.
#
# Returns whatever the block returns. The body yields unconditionally, so
# calling this without a block raises LocalJumpError.
#
# The block is consumed via `yield`; no explicit block parameter is declared
# because nothing needs to hold on to the block.
def configure
  yield configuration
end
fail_activity(async_token, exception) click to toggle source
# File lib/temporal.rb, line 191
# Reports an activity as failed on behalf of the holder of an async token.
#
# async_token - encoded token; decoded with Activity::AsyncToken to recover the
#               namespace, activity id, workflow id and run id.
# exception   - the failure to report, forwarded to the client untouched.
#
# Returns whatever the client's respond_activity_task_failed_by_id returns.
def fail_activity(async_token, exception)
  token = Activity::AsyncToken.decode(async_token)

  client.respond_activity_task_failed_by_id(
    namespace:   token.namespace,
    activity_id: token.activity_id,
    workflow_id: token.workflow_id,
    run_id:      token.run_id,
    exception:   exception
  )
end
fetch_workflow_execution_info(namespace, workflow_id, run_id) click to toggle source
# File lib/temporal.rb, line 169
# Describes a workflow execution and wraps the answer in an execution-info
# object.
#
# The namespace, workflow id and run id are forwarded to the client's
# describe_workflow_execution call; the response's workflow_execution_info is
# then passed to Workflow::ExecutionInfo.generate_from, whose result is returned.
def fetch_workflow_execution_info(namespace, workflow_id, run_id)
  description = client.describe_workflow_execution(namespace: namespace, workflow_id: workflow_id, run_id: run_id)
  Workflow::ExecutionInfo.generate_from(description.workflow_execution_info)
end
logger() click to toggle source
# File lib/temporal.rb, line 211
# Convenience accessor for the logger held by the current configuration.
def logger
  settings = configuration
  settings.logger
end
metrics() click to toggle source
# File lib/temporal.rb, line 215
# Returns the shared Metrics instance. It is built on first access from the
# configuration's metrics adapter and reused on every later call.
def metrics
  @metrics = Metrics.new(configuration.metrics_adapter) unless @metrics
  @metrics
end
register_namespace(name, description = nil) click to toggle source
# File lib/temporal.rb, line 71
# Registers a namespace through the client.
#
# name        - name of the namespace to register.
# description - optional description (defaults to nil).
#
# Returns whatever the client's register_namespace returns.
def register_namespace(name, description = nil)
  client.register_namespace(
    name: name,
    description: description
  )
end
reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason: 'manual reset') click to toggle source
# File lib/temporal.rb, line 142
# Resets a workflow execution to a given workflow task and returns the run id
# reported in the reset response.
#
# namespace, workflow_id, run_id - identify the execution to reset.
# workflow_task_id - event id to reset to; when not given, the last completed
#                    workflow task of the run is looked up.
# reason           - free-form reason forwarded with the reset request.
#
# Raises Error when no workflow task id was supplied and none could be found.
def reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason: 'manual reset')
  event_id = workflow_task_id || get_last_completed_workflow_task_id(namespace, workflow_id, run_id)

  unless event_id
    raise Error, 'Could not find a completed workflow task event'
  end

  client.reset_workflow_execution(
    namespace: namespace,
    workflow_id: workflow_id,
    run_id: run_id,
    reason: reason,
    workflow_task_event_id: event_id
  ).run_id
end
schedule_workflow(workflow, cron_schedule, *input, **args) click to toggle source
# File lib/temporal.rb, line 44
# Starts a workflow execution that carries a cron schedule and returns the run
# id reported in the start response.
#
# workflow      - the workflow, handed to ExecutionOptions together with options.
# cron_schedule - schedule forwarded as-is to the start request.
# input         - positional workflow input.
# args          - keyword arguments; the :options entry configures the execution,
#                 anything left over is appended to the input as a single hash.
#
# A random UUID is used as the workflow id when options carry none.
def schedule_workflow(workflow, cron_schedule, *input, **args)
  options = args.delete(:options) || {}
  # Whatever keywords remain are workflow input, passed as one trailing hash.
  input.push(args) unless args.empty?

  execution_options = ExecutionOptions.new(workflow, options)
  workflow_id = options[:workflow_id] || SecureRandom.uuid
  timeouts = execution_options.timeouts

  # The execution timeout spans all scheduled jobs, while the run timeout
  # applies to one individual run. Falling back to the execution timeout is
  # kept for backward compatibility; the run timeout should certainly not be
  # higher than the execution timeout.
  run_timeout = timeouts[:run] || timeouts[:execution]

  client.start_workflow_execution(
    namespace: execution_options.namespace,
    workflow_id: workflow_id,
    workflow_name: execution_options.name,
    task_queue: execution_options.task_queue,
    input: input,
    execution_timeout: timeouts[:execution],
    run_timeout: run_timeout,
    task_timeout: timeouts[:task],
    workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
    headers: execution_options.headers,
    cron_schedule: cron_schedule
  ).run_id
end
signal_workflow(workflow, signal, workflow_id, run_id, input = nil) click to toggle source
# File lib/temporal.rb, line 75
# Sends a signal to a workflow execution.
#
# workflow    - the workflow; used only to resolve the namespace through
#               ExecutionOptions.
# signal      - the signal to send.
# workflow_id - id of the target workflow execution.
# run_id      - run id of the target execution.
# input       - optional signal input (defaults to nil).
#
# Returns whatever the client's signal_workflow_execution returns.
def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
  # TODO: allow passing namespace instead of deriving it from the workflow
  target_namespace = ExecutionOptions.new(workflow).namespace

  client.signal_workflow_execution(
    namespace: target_namespace,
    workflow_id: workflow_id,
    run_id: run_id,
    signal: signal,
    input: input
  )
end
start_workflow(workflow, *input, **args) click to toggle source
# File lib/temporal.rb, line 20
# Starts a workflow execution and returns the run id reported in the start
# response.
#
# workflow - the workflow, handed to ExecutionOptions together with options.
# input    - positional workflow input.
# args     - keyword arguments; the :options entry configures the execution,
#            anything left over is appended to the input as a single hash.
#
# A random UUID is used as the workflow id when options carry none.
def start_workflow(workflow, *input, **args)
  options = args.delete(:options) || {}
  # Whatever keywords remain are workflow input, passed as one trailing hash.
  input.push(args) unless args.empty?

  execution_options = ExecutionOptions.new(workflow, options)
  workflow_id = options[:workflow_id] || SecureRandom.uuid
  timeouts = execution_options.timeouts

  # If unspecified, an individual run gets the full time allowed for the
  # execution (which includes retries).
  run_timeout = timeouts[:run] || timeouts[:execution]

  client.start_workflow_execution(
    namespace: execution_options.namespace,
    workflow_id: workflow_id,
    workflow_name: execution_options.name,
    task_queue: execution_options.task_queue,
    input: input,
    execution_timeout: timeouts[:execution],
    run_timeout: run_timeout,
    task_timeout: timeouts[:task],
    workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
    headers: execution_options.headers
  ).run_id
end
terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil) click to toggle source
# File lib/temporal.rb, line 157
# Terminates a workflow execution.
#
# workflow_id - id of the workflow execution to terminate.
# namespace   - namespace of the execution; when not given, the namespace from
#               Temporal.configuration is used.
# run_id, reason, details - forwarded to the terminate request unchanged.
#
# Returns whatever the client's terminate_workflow_execution returns.
def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil)
  target_namespace = namespace || Temporal.configuration.namespace

  client.terminate_workflow_execution(
    namespace: target_namespace,
    workflow_id: workflow_id,
    run_id: run_id,
    reason: reason,
    details: details
  )
end

Private Class Methods

client() click to toggle source
# File lib/temporal.rb, line 226
# Returns the shared client, generating it through Temporal::Client on first
# access and reusing the same object afterwards.
def client
  @client = Temporal::Client.generate unless @client
  @client
end
get_last_completed_workflow_task_id(namespace, workflow_id, run_id) click to toggle source
# File lib/temporal.rb, line 230
# Looks up the id of the last completed workflow task of a run.
#
# Fetches the execution history for the given namespace, workflow id and run
# id, wraps its events in Workflow::History and asks it for the last completed
# workflow task.
#
# Returns that event's id, or nil when the history reports no such task.
def get_last_completed_workflow_task_id(namespace, workflow_id, run_id)
  response = client.get_workflow_execution_history(namespace: namespace, workflow_id: workflow_id, run_id: run_id)
  last_task = Workflow::History.new(response.history.events).get_last_completed_workflow_task
  last_task.id unless last_task.nil?
end