class Temporal::Client::GRPCClient
Constants
- HISTORY_EVENT_FILTER
- SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL
- WORKFLOW_ID_REUSE_POLICY
Attributes
identity[R]
poll_mutex[R]
poll_request[R]
url[R]
Public Class Methods
new(host, port, identity)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 26 def initialize(host, port, identity) @url = "#{host}:#{port}" @identity = identity @poll = true @poll_mutex = Mutex.new @poll_request = nil end
Public Instance Methods
cancel_polling_request()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 396 def cancel_polling_request poll_mutex.synchronize do @poll = false poll_request&.cancel end end
count_workflow_executions()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 353 def count_workflow_executions raise NotImplementedError end
deprecate_namespace(name:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 68 def deprecate_namespace(name:) request = Temporal::Api::WorkflowService::V1::DeprecateNamespaceRequest.new(namespace: name) client.deprecate_namespace(request) end
describe_namespace(name:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 48 def describe_namespace(name:) request = Temporal::Api::WorkflowService::V1::DescribeNamespaceRequest.new(namespace: name) client.describe_namespace(request) end
describe_task_queue(namespace:, task_queue:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 384 def describe_task_queue(namespace:, task_queue:) request = Temporal::Api::WorkflowService::V1::DescribeTaskQueueRequest.new( namespace: namespace, task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), task_queue_type: Temporal::Api::Enums::V1::TaskQueueType::Workflow, include_task_queue_status: true ) client.describe_task_queue(request) end
describe_workflow_execution(namespace:, workflow_id:, run_id:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 373 def describe_workflow_execution(namespace:, workflow_id:, run_id:) request = Temporal::Api::WorkflowService::V1::DescribeWorkflowExecutionRequest.new( namespace: namespace, execution: Temporal::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, run_id: run_id ) ) client.describe_workflow_execution(request) end
get_search_attributes()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 357 def get_search_attributes raise NotImplementedError end
get_workflow_execution_history( namespace:, workflow_id:, run_id:, next_page_token: nil, wait_for_new_event: false, event_type: :all, timeout: nil )
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 123 def get_workflow_execution_history( namespace:, workflow_id:, run_id:, next_page_token: nil, wait_for_new_event: false, event_type: :all, timeout: nil ) if wait_for_new_event if timeout.nil? # This is an internal error. Wrappers should enforce this. raise "You must specify a timeout when wait_for_new_event = true." elsif timeout > SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL raise ClientError.new( "You may not specify a timeout of more than #{SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL} seconds, got: #{timeout}." ) end end request = Temporal::Api::WorkflowService::V1::GetWorkflowExecutionHistoryRequest.new( namespace: namespace, execution: Temporal::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, run_id: run_id ), next_page_token: next_page_token, wait_new_event: wait_for_new_event, history_event_filter_type: HISTORY_EVENT_FILTER[event_type] ) deadline = timeout ? Time.now + timeout : nil client.get_workflow_execution_history(request, deadline: deadline) end
list_archived_workflow_executions()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 345 def list_archived_workflow_executions raise NotImplementedError end
list_closed_workflow_executions()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 337 def list_closed_workflow_executions raise NotImplementedError end
list_namespaces(page_size:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 53 def list_namespaces(page_size:) request = Temporal::Api::WorkflowService::V1::ListNamespacesRequest.new(pageSize: page_size) client.list_namespaces(request) end
list_open_workflow_executions()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 333 def list_open_workflow_executions raise NotImplementedError end
list_workflow_executions()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 341 def list_workflow_executions raise NotImplementedError end
poll_activity_task_queue(namespace:, task_queue:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 192 def poll_activity_task_queue(namespace:, task_queue:) request = Temporal::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new( identity: identity, namespace: namespace, task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ) ) poll_mutex.synchronize do return unless can_poll? @poll_request = client.poll_activity_task_queue(request, return_op: true) end poll_request.execute end
poll_workflow_task_queue(namespace:, task_queue:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 156 def poll_workflow_task_queue(namespace:, task_queue:) request = Temporal::Api::WorkflowService::V1::PollWorkflowTaskQueueRequest.new( identity: identity, namespace: namespace, task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ) ) poll_mutex.synchronize do return unless can_poll? @poll_request = client.poll_workflow_task_queue(request, return_op: true) end poll_request.execute end
query_workflow()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 369 def query_workflow raise NotImplementedError end
record_activity_task_heartbeat(task_token:, details: nil)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 209 def record_activity_task_heartbeat(task_token:, details: nil) request = Temporal::Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new( task_token: task_token, details: to_details_payloads(details), identity: identity ) client.record_activity_task_heartbeat(request) end
record_activity_task_heartbeat_by_id()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 218 def record_activity_task_heartbeat_by_id raise NotImplementedError end
register_namespace(name:, description: nil, global: false, retention_period: 10)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 34 def register_namespace(name:, description: nil, global: false, retention_period: 10) request = Temporal::Api::WorkflowService::V1::RegisterNamespaceRequest.new( namespace: name, description: description, is_global_namespace: global, workflow_execution_retention_period: Google::Protobuf::Duration.new( seconds: retention_period * 24 * 60 * 60 ) ) client.register_namespace(request) rescue GRPC::AlreadyExists => e raise Temporal::NamespaceAlreadyExistsFailure, e.details end
request_cancel_workflow_execution()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 277 def request_cancel_workflow_execution raise NotImplementedError end
reset_sticky_task_queue()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 365 def reset_sticky_task_queue raise NotImplementedError end
reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 299 def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:) request = Temporal::Api::WorkflowService::V1::ResetWorkflowExecutionRequest.new( namespace: namespace, workflow_execution: Temporal::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, run_id: run_id, ), reason: reason, workflow_task_finish_event_id: workflow_task_event_id ) client.reset_workflow_execution(request) end
respond_activity_task_canceled(task_token:, details: nil)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 264 def respond_activity_task_canceled(task_token:, details: nil) request = Temporal::Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new( task_token: task_token, details: to_details_payloads(details), identity: identity ) client.respond_activity_task_canceled(request) end
respond_activity_task_canceled_by_id()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 273 def respond_activity_task_canceled_by_id raise NotImplementedError end
respond_activity_task_completed(task_token:, result:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 222 def respond_activity_task_completed(task_token:, result:) request = Temporal::Api::WorkflowService::V1::RespondActivityTaskCompletedRequest.new( identity: identity, task_token: task_token, result: to_result_payloads(result), ) client.respond_activity_task_completed(request) end
respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id:, run_id:, result:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 231 def respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id:, run_id:, result:) request = Temporal::Api::WorkflowService::V1::RespondActivityTaskCompletedByIdRequest.new( identity: identity, namespace: namespace, workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, result: to_result_payloads(result) ) client.respond_activity_task_completed_by_id(request) end
respond_activity_task_failed(task_token:, exception:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 243 def respond_activity_task_failed(task_token:, exception:) request = Temporal::Api::WorkflowService::V1::RespondActivityTaskFailedRequest.new( identity: identity, task_token: task_token, failure: Serializer::Failure.new(exception).to_proto ) client.respond_activity_task_failed(request) end
respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, run_id:, exception:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 252 def respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, run_id:, exception:) request = Temporal::Api::WorkflowService::V1::RespondActivityTaskFailedByIdRequest.new( identity: identity, namespace: namespace, workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, failure: Serializer::Failure.new(exception).to_proto ) client.respond_activity_task_failed_by_id(request) end
respond_query_task_completed()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 361 def respond_query_task_completed raise NotImplementedError end
respond_workflow_task_completed(task_token:, commands:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 173 def respond_workflow_task_completed(task_token:, commands:) request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new( identity: identity, task_token: task_token, commands: Array(commands).map { |(_, command)| Serializer.serialize(command) } ) client.respond_workflow_task_completed(request) end
respond_workflow_task_failed(task_token:, cause:, exception: nil)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 182 def respond_workflow_task_failed(task_token:, cause:, exception: nil) request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskFailedRequest.new( identity: identity, task_token: task_token, cause: cause, failure: Serializer::Failure.new(exception).to_proto ) client.respond_workflow_task_failed(request) end
scan_workflow_executions()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 349 def scan_workflow_executions raise NotImplementedError end
signal_with_start_workflow_execution()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 295 def signal_with_start_workflow_execution raise NotImplementedError end
signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: nil)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 281 def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: nil) request = Temporal::Api::WorkflowService::V1::SignalWorkflowExecutionRequest.new( namespace: namespace, workflow_execution: Temporal::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, run_id: run_id ), signal_name: signal, input: to_signal_payloads(input), identity: identity ) client.signal_workflow_execution(request) end
start_workflow_execution( namespace:, workflow_id:, workflow_name:, task_queue:, input: nil, execution_timeout:, run_timeout:, task_timeout:, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil )
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 73 def start_workflow_execution( namespace:, workflow_id:, workflow_name:, task_queue:, input: nil, execution_timeout:, run_timeout:, task_timeout:, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil ) request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new( identity: identity, namespace: namespace, workflow_type: Temporal::Api::Common::V1::WorkflowType.new( name: workflow_name ), workflow_id: workflow_id, task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), input: to_payloads(input), workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporal::Api::Common::V1::Header.new( fields: headers ), cron_schedule: cron_schedule ) if workflow_id_reuse_policy policy = WORKFLOW_ID_REUSE_POLICY[workflow_id_reuse_policy] raise Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified' unless policy request.workflow_id_reuse_policy = policy end client.start_workflow_execution(request) rescue GRPC::AlreadyExists => e # Feel like there should be cleaner way to do this... run_id = e.details[/RunId: ([a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+)/, 1] raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(e.details, run_id) end
terminate_workflow_execution( namespace:, workflow_id:, run_id:, reason: nil, details: nil )
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 312 def terminate_workflow_execution( namespace:, workflow_id:, run_id:, reason: nil, details: nil ) request = Temporal::Api::WorkflowService::V1::TerminateWorkflowExecutionRequest.new( identity: identity, namespace: namespace, workflow_execution: Temporal::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, run_id: run_id, ), reason: reason, details: to_details_payloads(details) ) client.terminate_workflow_execution(request) end
update_namespace(name:, description:)
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 58 def update_namespace(name:, description:) request = Temporal::Api::WorkflowService::V1::UpdateNamespaceRequest.new( namespace: name, update_info: Temporal::Api::WorkflowService::V1::UpdateNamespaceInfo.new( description: description ) ) client.update_namespace(request) end
Private Instance Methods
can_poll?()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 415 def can_poll? @poll end
client()
click to toggle source
# File lib/temporal/client/grpc_client.rb, line 407 def client @client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new( url, :this_channel_is_insecure, timeout: 60 ) end