class Temporal::Workflow::TaskProcessor

Constants

MAX_FAILED_ATTEMPTS

Attributes

client[R]
metadata[R]
middleware_chain[R]
namespace[R]
task[R]
task_token[R]
workflow_class[R]
workflow_name[R]

Public Class Methods

new(task, namespace, workflow_lookup, client, middleware_chain)
# File lib/temporal/workflow/task_processor.rb, line 12
def initialize(task, namespace, workflow_lookup, client, middleware_chain)
  @task = task
  @namespace = namespace
  @metadata = Metadata.generate(Metadata::WORKFLOW_TASK_TYPE, task, namespace)
  @task_token = task.task_token
  @workflow_name = task.workflow_type.name
  @workflow_class = workflow_lookup.find(workflow_name)
  @client = client
  @middleware_chain = middleware_chain
end
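
The worker normally constructs and drives this class itself. Below is a minimal sketch of that wiring, assuming `task` is a workflow task poll response already received from the server, `lookup` responds to find(workflow_name) with the registered workflow class (or nil), `connection` is the low-level client used for the respond_* and history calls, and `chain` is the middleware chain to run around the executor. All of these names are illustrative, not part of the documented API.

# Hypothetical wiring; in practice the worker builds these collaborators internally.
processor = Temporal::Workflow::TaskProcessor.new(task, 'my-namespace', lookup, connection, chain)
processor.process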

Public Instance Methods

process()
# File lib/temporal/workflow/task_processor.rb, line 23
def process
  start_time = Time.now

  Temporal.logger.debug("Processing Workflow task", metadata.to_h)
  Temporal.metrics.timing('workflow_task.queue_time', queue_time_ms, workflow: workflow_name)

  if !workflow_class
    raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker'
  end

  history = fetch_full_history
  # TODO: For sticky workflows we need to cache the Executor instance
  executor = Workflow::Executor.new(workflow_class, history)

  commands = middleware_chain.invoke(metadata) do
    executor.run
  end

  complete_task(commands)
rescue StandardError => error
  Temporal::ErrorHandler.handle(error, metadata: metadata)

  fail_task(error)
ensure
  time_diff_ms = ((Time.now - start_time) * 1000).round
  Temporal.metrics.timing('workflow_task.latency', time_diff_ms, workflow: workflow_name)
  Temporal.logger.debug("Workflow task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end
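
Command generation happens inside middleware_chain.invoke, so every entry in the chain can observe or wrap workflow task processing. Below is a sketch of such an entry, under the assumption (not shown in the source above) that the chain calls each entry's call(metadata) and expects it to yield to the next step.

# A sketch of a middleware entry; the call(metadata) + yield interface is assumed.
class TimingMiddleware
  def call(metadata)
    started_at = Time.now
    yield # runs the next entry in the chain, and ultimately the executor
  ensure
    elapsed_ms = ((Time.now - started_at) * 1000).round
    Temporal.logger.debug("Workflow task middleware timing", metadata.to_h.merge(elapsed_ms: elapsed_ms))
  end
end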

Private Instance Methods

complete_task(commands)
# File lib/temporal/workflow/task_processor.rb, line 81
def complete_task(commands)
  Temporal.logger.info("Workflow task completed", metadata.to_h)

  client.respond_workflow_task_completed(task_token: task_token, commands: commands)
end

fail_task(error)
# File lib/temporal/workflow/task_processor.rb, line 87
def fail_task(error)
  Temporal.logger.error("Workflow task failed", metadata.to_h.merge(error: error.inspect))
  Temporal.logger.debug(error.backtrace.join("\n"))

  # Only fail the workflow task on the first attempt. Subsequent failures of the same workflow task
  # should time out. This is to avoid spinning on the failed workflow task, as the service doesn't
  # yet back off exponentially on retries.
  return if task.attempt > MAX_FAILED_ATTEMPTS

  client.respond_workflow_task_failed(
    task_token: task_token,
    cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND,
    exception: error
  )
rescue StandardError => error
  Temporal.logger.error("Unable to fail Workflow task", metadata.to_h.merge(error: error.inspect))

  Temporal::ErrorHandler.handle(error, metadata: metadata)
end

fetch_full_history()
# File lib/temporal/workflow/task_processor.rb, line 62
def fetch_full_history
  events = task.history.events.to_a
  next_page_token = task.next_page_token

  while !next_page_token.empty? do
    response = client.get_workflow_execution_history(
      namespace: namespace,
      workflow_id: task.workflow_execution.workflow_id,
      run_id: task.workflow_execution.run_id,
      next_page_token: next_page_token
    )

    events += response.history.events.to_a
    next_page_token = response.next_page_token
  end

  Workflow::History.new(events)
end
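
The same pagination pattern can be reused when a complete history is needed outside this class. The sketch below assumes client.get_workflow_execution_history keeps the signature and response shape used above (history.events plus next_page_token) and that an empty token requests the first page.

# A sketch reusing the pagination pattern from fetch_full_history; the empty
# initial token and response shape are assumptions based on the code above.
def collect_all_history_events(client, namespace, workflow_id, run_id)
  events = []
  next_page_token = ''

  loop do
    response = client.get_workflow_execution_history(
      namespace: namespace,
      workflow_id: workflow_id,
      run_id: run_id,
      next_page_token: next_page_token
    )

    events += response.history.events.to_a
    next_page_token = response.next_page_token
    break if next_page_token.empty?
  end

  events
end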

queue_time_ms()
# File lib/temporal/workflow/task_processor.rb, line 56
def queue_time_ms
  scheduled = task.scheduled_time.to_f
  started = task.started_time.to_f
  ((started - scheduled) * 1_000).round
end