class Temporal::Client::GRPCClient

Constants

HISTORY_EVENT_FILTER
SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL
WORKFLOW_ID_REUSE_POLICY

Attributes

identity[R]
poll_mutex[R]
poll_request[R]
url[R]

Public Class Methods

new(host, port, identity) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 26
def initialize(host, port, identity)
  @url = "#{host}:#{port}"
  @identity = identity
  @poll = true
  @poll_mutex = Mutex.new
  @poll_request = nil
end

Public Instance Methods

cancel_polling_request() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 396
def cancel_polling_request
  poll_mutex.synchronize do
    @poll = false
    poll_request&.cancel
  end
end
count_workflow_executions() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 353
def count_workflow_executions
  raise NotImplementedError
end
deprecate_namespace(name:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 68
def deprecate_namespace(name:)
  request = Temporal::Api::WorkflowService::V1::DeprecateNamespaceRequest.new(namespace: name)
  client.deprecate_namespace(request)
end
describe_namespace(name:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 48
def describe_namespace(name:)
  request = Temporal::Api::WorkflowService::V1::DescribeNamespaceRequest.new(namespace: name)
  client.describe_namespace(request)
end
describe_task_queue(namespace:, task_queue:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 384
def describe_task_queue(namespace:, task_queue:)
  request = Temporal::Api::WorkflowService::V1::DescribeTaskQueueRequest.new(
    namespace: namespace,
    task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
      name: task_queue
    ),
    task_queue_type: Temporal::Api::Enums::V1::TaskQueueType::Workflow,
    include_task_queue_status: true
  )
  client.describe_task_queue(request)
end
describe_workflow_execution(namespace:, workflow_id:, run_id:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 373
def describe_workflow_execution(namespace:, workflow_id:, run_id:)
  request = Temporal::Api::WorkflowService::V1::DescribeWorkflowExecutionRequest.new(
    namespace: namespace,
    execution: Temporal::Api::Common::V1::WorkflowExecution.new(
      workflow_id: workflow_id,
      run_id: run_id
    )
  )
  client.describe_workflow_execution(request)
end
get_search_attributes() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 357
def get_search_attributes
  raise NotImplementedError
end
get_workflow_execution_history( namespace:, workflow_id:, run_id:, next_page_token: nil, wait_for_new_event: false, event_type: :all, timeout: nil ) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 123
def get_workflow_execution_history(
  namespace:,
  workflow_id:,
  run_id:,
  next_page_token: nil,
  wait_for_new_event: false,
  event_type: :all,
  timeout: nil
)
  if wait_for_new_event 
    if timeout.nil?
      # This is an internal error.  Wrappers should enforce this.
      raise "You must specify a timeout when wait_for_new_event = true."
    elsif timeout > SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL
      raise ClientError.new(
        "You may not specify a timeout of more than #{SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL} seconds, got: #{timeout}."
      )
    end
  end
  request = Temporal::Api::WorkflowService::V1::GetWorkflowExecutionHistoryRequest.new(
    namespace: namespace,
    execution: Temporal::Api::Common::V1::WorkflowExecution.new(
      workflow_id: workflow_id,
      run_id: run_id
    ),
    next_page_token: next_page_token,
    wait_new_event: wait_for_new_event,
    history_event_filter_type: HISTORY_EVENT_FILTER[event_type]
  )
  deadline = timeout ? Time.now + timeout : nil
  client.get_workflow_execution_history(request, deadline: deadline)
end
list_archived_workflow_executions() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 345
def list_archived_workflow_executions
  raise NotImplementedError
end
list_closed_workflow_executions() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 337
def list_closed_workflow_executions
  raise NotImplementedError
end
list_namespaces(page_size:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 53
def list_namespaces(page_size:)
  request = Temporal::Api::WorkflowService::V1::ListNamespacesRequest.new(pageSize: page_size)
  client.list_namespaces(request)
end
list_open_workflow_executions() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 333
def list_open_workflow_executions
  raise NotImplementedError
end
list_workflow_executions() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 341
def list_workflow_executions
  raise NotImplementedError
end
poll_activity_task_queue(namespace:, task_queue:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 192
def poll_activity_task_queue(namespace:, task_queue:)
  request = Temporal::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new(
    identity: identity,
    namespace: namespace,
    task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
      name: task_queue
    )
  )

  poll_mutex.synchronize do
    return unless can_poll?
    @poll_request = client.poll_activity_task_queue(request, return_op: true)
  end

  poll_request.execute
end
poll_workflow_task_queue(namespace:, task_queue:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 156
def poll_workflow_task_queue(namespace:, task_queue:)
  request = Temporal::Api::WorkflowService::V1::PollWorkflowTaskQueueRequest.new(
    identity: identity,
    namespace: namespace,
    task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
      name: task_queue
    )
  )

  poll_mutex.synchronize do
    return unless can_poll?
    @poll_request = client.poll_workflow_task_queue(request, return_op: true)
  end

  poll_request.execute
end
query_workflow() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 369
def query_workflow
  raise NotImplementedError
end
record_activity_task_heartbeat(task_token:, details: nil) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 209
def record_activity_task_heartbeat(task_token:, details: nil)
  request = Temporal::Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new(
    task_token: task_token,
    details: to_details_payloads(details),
    identity: identity
  )
  client.record_activity_task_heartbeat(request)
end
record_activity_task_heartbeat_by_id() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 218
def record_activity_task_heartbeat_by_id
  raise NotImplementedError
end
register_namespace(name:, description: nil, global: false, retention_period: 10) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 34
def register_namespace(name:, description: nil, global: false, retention_period: 10)
  request = Temporal::Api::WorkflowService::V1::RegisterNamespaceRequest.new(
    namespace: name,
    description: description,
    is_global_namespace: global,
    workflow_execution_retention_period: Google::Protobuf::Duration.new(
      seconds: retention_period * 24 * 60 * 60
    )
  )
  client.register_namespace(request)
rescue GRPC::AlreadyExists => e
  raise Temporal::NamespaceAlreadyExistsFailure, e.details
end
request_cancel_workflow_execution() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 277
def request_cancel_workflow_execution
  raise NotImplementedError
end
reset_sticky_task_queue() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 365
def reset_sticky_task_queue
  raise NotImplementedError
end
reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 299
def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:)
  request = Temporal::Api::WorkflowService::V1::ResetWorkflowExecutionRequest.new(
    namespace: namespace,
    workflow_execution: Temporal::Api::Common::V1::WorkflowExecution.new(
      workflow_id: workflow_id,
      run_id: run_id,
    ),
    reason: reason,
    workflow_task_finish_event_id: workflow_task_event_id
  )
  client.reset_workflow_execution(request)
end
respond_activity_task_canceled(task_token:, details: nil) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 264
def respond_activity_task_canceled(task_token:, details: nil)
  request = Temporal::Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new(
    task_token: task_token,
    details: to_details_payloads(details),
    identity: identity
  )
  client.respond_activity_task_canceled(request)
end
respond_activity_task_canceled_by_id() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 273
def respond_activity_task_canceled_by_id
  raise NotImplementedError
end
respond_activity_task_completed(task_token:, result:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 222
def respond_activity_task_completed(task_token:, result:)
  request = Temporal::Api::WorkflowService::V1::RespondActivityTaskCompletedRequest.new(
    identity: identity,
    task_token: task_token,
    result: to_result_payloads(result),
  )
  client.respond_activity_task_completed(request)
end
respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id:, run_id:, result:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 231
def respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id:, run_id:, result:)
  request = Temporal::Api::WorkflowService::V1::RespondActivityTaskCompletedByIdRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_id: workflow_id,
    run_id: run_id,
    activity_id: activity_id,
    result: to_result_payloads(result)
  )
  client.respond_activity_task_completed_by_id(request)
end
respond_activity_task_failed(task_token:, exception:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 243
def respond_activity_task_failed(task_token:, exception:)
  request = Temporal::Api::WorkflowService::V1::RespondActivityTaskFailedRequest.new(
    identity: identity,
    task_token: task_token,
    failure: Serializer::Failure.new(exception).to_proto
  )
  client.respond_activity_task_failed(request)
end
respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, run_id:, exception:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 252
def respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, run_id:, exception:)
  request = Temporal::Api::WorkflowService::V1::RespondActivityTaskFailedByIdRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_id: workflow_id,
    run_id: run_id,
    activity_id: activity_id,
    failure: Serializer::Failure.new(exception).to_proto
  )
  client.respond_activity_task_failed_by_id(request)
end
respond_query_task_completed() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 361
def respond_query_task_completed
  raise NotImplementedError
end
respond_workflow_task_completed(task_token:, commands:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 173
def respond_workflow_task_completed(task_token:, commands:)
  request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new(
    identity: identity,
    task_token: task_token,
    commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }
  )
  client.respond_workflow_task_completed(request)
end
respond_workflow_task_failed(task_token:, cause:, exception: nil) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 182
def respond_workflow_task_failed(task_token:, cause:, exception: nil)
  request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskFailedRequest.new(
    identity: identity,
    task_token: task_token,
    cause: cause,
    failure: Serializer::Failure.new(exception).to_proto
  )
  client.respond_workflow_task_failed(request)
end
scan_workflow_executions() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 349
def scan_workflow_executions
  raise NotImplementedError
end
signal_with_start_workflow_execution() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 295
def signal_with_start_workflow_execution
  raise NotImplementedError
end
signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: nil) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 281
def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: nil)
  request = Temporal::Api::WorkflowService::V1::SignalWorkflowExecutionRequest.new(
    namespace: namespace,
    workflow_execution: Temporal::Api::Common::V1::WorkflowExecution.new(
      workflow_id: workflow_id,
      run_id: run_id
    ),
    signal_name: signal,
    input: to_signal_payloads(input),
    identity: identity
  )
  client.signal_workflow_execution(request)
end
start_workflow_execution( namespace:, workflow_id:, workflow_name:, task_queue:, input: nil, execution_timeout:, run_timeout:, task_timeout:, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil ) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 73
def start_workflow_execution(
  namespace:,
  workflow_id:,
  workflow_name:,
  task_queue:,
  input: nil,
  execution_timeout:,
  run_timeout:,
  task_timeout:,
  workflow_id_reuse_policy: nil,
  headers: nil,
  cron_schedule: nil
)
  request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_type: Temporal::Api::Common::V1::WorkflowType.new(
      name: workflow_name
    ),
    workflow_id: workflow_id,
    task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
      name: task_queue
    ),
    input: to_payloads(input),
    workflow_execution_timeout: execution_timeout,
    workflow_run_timeout: run_timeout,
    workflow_task_timeout: task_timeout,
    request_id: SecureRandom.uuid,
    header: Temporal::Api::Common::V1::Header.new(
      fields: headers
    ),
    cron_schedule: cron_schedule
  )

  if workflow_id_reuse_policy
    policy = WORKFLOW_ID_REUSE_POLICY[workflow_id_reuse_policy]
    raise Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified' unless policy

    request.workflow_id_reuse_policy = policy
  end

  client.start_workflow_execution(request)
rescue GRPC::AlreadyExists => e
  # Feel like there should be cleaner way to do this...
  run_id = e.details[/RunId: ([a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+)/, 1]
  raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(e.details, run_id)
end
terminate_workflow_execution( namespace:, workflow_id:, run_id:, reason: nil, details: nil ) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 312
def terminate_workflow_execution(
  namespace:,
  workflow_id:,
  run_id:,
  reason: nil,
  details: nil
)
  request = Temporal::Api::WorkflowService::V1::TerminateWorkflowExecutionRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_execution: Temporal::Api::Common::V1::WorkflowExecution.new(
      workflow_id: workflow_id,
      run_id: run_id,
    ),
    reason: reason,
    details: to_details_payloads(details)
  )

  client.terminate_workflow_execution(request)
end
update_namespace(name:, description:) click to toggle source
# File lib/temporal/client/grpc_client.rb, line 58
def update_namespace(name:, description:)
  request = Temporal::Api::WorkflowService::V1::UpdateNamespaceRequest.new(
    namespace: name,
    update_info: Temporal::Api::WorkflowService::V1::UpdateNamespaceInfo.new(
      description: description
    )
  )
  client.update_namespace(request)
end

Private Instance Methods

can_poll?() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 415
def can_poll?
  @poll
end
client() click to toggle source
# File lib/temporal/client/grpc_client.rb, line 407
def client
  @client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new(
    url,
    :this_channel_is_insecure,
    timeout: 60
  )
end