class Temporal::Worker
Attributes
activities[R]
activity_middleware[R]
activity_poller_options[R]
pollers[R]
workflow_poller_options[R]
workflow_task_middleware[R]
workflows[R]
Public Class Methods
new( activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size] )
click to toggle source
activity_thread_pool_size: number of threads that the poller can use to run activities.
Can be set to 1 if you want no parallelism in your activities, at the cost of throughput.
# File lib/temporal/worker.rb, line 12

# Builds a worker with nothing registered yet: empty workflow/activity
# registries, no pollers, no middleware, and the shutdown flag cleared.
#
# @param activity_thread_pool_size [Integer] stored under :thread_pool_size in
#   the options hash that is handed to every activity poller
# @param workflow_thread_pool_size [Integer] stored under :thread_pool_size in
#   the options hash that is handed to every workflow poller
def initialize(
  activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size],
  workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size]
)
  @shutting_down = false
  @pollers = []
  @activity_middleware = []
  @workflow_task_middleware = []

  # Each registry lazily creates a dedicated ExecutableLookup the first time a
  # key is accessed, so callers can add to a key without initialising it first.
  @activities = Hash.new { |registry, queue_key| registry[queue_key] = ExecutableLookup.new }
  @workflows = Hash.new { |registry, queue_key| registry[queue_key] = ExecutableLookup.new }

  @activity_poller_options = { thread_pool_size: activity_thread_pool_size }
  @workflow_poller_options = { thread_pool_size: workflow_thread_pool_size }
end
Public Instance Methods
add_activity_middleware(middleware_class, *args)
click to toggle source
# File lib/temporal/worker.rb, line 48

# Appends a middleware entry to the activity middleware list.
#
# @param middleware_class [Class] middleware class to wrap in a Middleware::Entry
# @param args [Array] extra arguments stored in the entry next to the class
# @return [Array] the activity middleware list, including the new entry
def add_activity_middleware(middleware_class, *args)
  entry = Middleware::Entry.new(middleware_class, args)
  @activity_middleware.push(entry)
end
add_workflow_task_middleware(middleware_class, *args)
click to toggle source
# File lib/temporal/worker.rb, line 44

# Appends a middleware entry to the workflow task middleware list.
#
# @param middleware_class [Class] middleware class to wrap in a Middleware::Entry
# @param args [Array] extra arguments stored in the entry next to the class
# @return [Array] the workflow task middleware list, including the new entry
def add_workflow_task_middleware(middleware_class, *args)
  entry = Middleware::Entry.new(middleware_class, args)
  @workflow_task_middleware.push(entry)
end
register_activity(activity_class, options = {})
click to toggle source
# File lib/temporal/worker.rb, line 37

# Registers an activity class under the namespace and task queue resolved by
# ExecutionOptions from the class and the given options.
#
# @param activity_class [Class] activity implementation to register
# @param options [Hash] passed through to ExecutionOptions.new
def register_activity(activity_class, options = {})
  resolved = ExecutionOptions.new(activity_class, options)
  namespace = resolved.namespace
  task_queue = resolved.task_queue

  # The registry is keyed by the [namespace, task_queue] pair.
  @activities[[namespace, task_queue]].add(resolved.name, activity_class)
end
register_workflow(workflow_class, options = {})
click to toggle source
# File lib/temporal/worker.rb, line 30

# Registers a workflow class under the namespace and task queue resolved by
# ExecutionOptions from the class and the given options.
#
# @param workflow_class [Class] workflow implementation to register
# @param options [Hash] passed through to ExecutionOptions.new
def register_workflow(workflow_class, options = {})
  resolved = ExecutionOptions.new(workflow_class, options)
  namespace = resolved.namespace
  task_queue = resolved.task_queue

  # The registry is keyed by the [namespace, task_queue] pair.
  @workflows[[namespace, task_queue]].add(resolved.name, workflow_class)
end
start()
click to toggle source
# File lib/temporal/worker.rb, line 52

# Creates one poller per registered [namespace, task_queue] pair (workflows
# first, then activities), installs the signal handlers, starts every poller,
# and then blocks the calling thread until the shutdown flag is set.
def start
  workflows.each do |(namespace, task_queue), lookup|
    pollers.push(workflow_poller_for(namespace, task_queue, lookup))
  end

  activities.each do |(namespace, task_queue), lookup|
    pollers.push(activity_poller_for(namespace, task_queue, lookup))
  end

  trap_signals

  pollers.each { |poller| poller.start }

  # keep the main thread alive
  sleep 1 until shutting_down?
end
stop()
click to toggle source
# File lib/temporal/worker.rb, line 69

# Sets the shutdown flag, then stops all pollers from a helper thread and
# waits for that thread to finish.
#
# NOTE(review): the helper thread is presumably needed because this method is
# invoked from a Signal.trap handler, where some operations (e.g. locking a
# Mutex) are not permitted — confirm against the poller implementations.
#
# @return [Thread] the finished helper thread
def stop
  @shutting_down = true

  drain = Thread.new do
    pollers.each { |poller| poller.stop_polling }

    # allow workers to drain in-transit tasks.
    # https://github.com/temporalio/temporal/issues/1058
    sleep 1

    pollers.each { |poller| poller.cancel_pending_requests }
    pollers.each { |poller| poller.wait }
  end

  drain.join
end
Private Instance Methods
activity_poller_for(namespace, task_queue, lookup)
click to toggle source
# File lib/temporal/worker.rb, line 96

# Builds the activity poller for one [namespace, task_queue] pair.
# The lookup is frozen before it is handed over, so nothing can be added to it
# afterwards.
#
# @param namespace passed through to Activity::Poller.new
# @param task_queue passed through to Activity::Poller.new
# @param lookup registry of activities for this pair; frozen by this call
# @return [Activity::Poller]
def activity_poller_for(namespace, task_queue, lookup)
  Activity::Poller.new(
    namespace,
    task_queue,
    lookup.freeze,
    activity_middleware,
    activity_poller_options
  )
end
shutting_down?()
click to toggle source
# File lib/temporal/worker.rb, line 88

# Reports the current value of the shutdown flag.
#
# @return the value of @shutting_down, returned as-is without coercion
def shutting_down?
  @shutting_down
end
trap_signals()
click to toggle source
# File lib/temporal/worker.rb, line 100

# Installs handlers for the TERM and INT signals; each handler calls #stop.
# Any handler previously installed for these signals is replaced.
def trap_signals
  ['TERM', 'INT'].each { |signal_name| Signal.trap(signal_name) { stop } }
end
workflow_poller_for(namespace, task_queue, lookup)
click to toggle source
# File lib/temporal/worker.rb, line 92

# Builds the workflow poller for one [namespace, task_queue] pair.
# The lookup is frozen before it is handed over, so nothing can be added to it
# afterwards.
#
# @param namespace passed through to Workflow::Poller.new
# @param task_queue passed through to Workflow::Poller.new
# @param lookup registry of workflows for this pair; frozen by this call
# @return [Workflow::Poller]
def workflow_poller_for(namespace, task_queue, lookup)
  Workflow::Poller.new(
    namespace,
    task_queue,
    lookup.freeze,
    workflow_task_middleware,
    workflow_poller_options
  )
end