class Temporal::Worker

Attributes

activities[R]
activity_middleware[R]
activity_poller_options[R]
pollers[R]
workflow_poller_options[R]
workflow_task_middleware[R]
workflows[R]

Public Class Methods

new( activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size] ) click to toggle source

activity_thread_pool_size: number of threads that the poller can use to run activities.

can be set to 1 if you want no paralellism in your activities, at the cost of throughput.
# File lib/temporal/worker.rb, line 12
def initialize(
  activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size],
  workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size]
)
  @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
  @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
  @pollers = []
  @workflow_task_middleware = []
  @activity_middleware = []
  @shutting_down = false
  @activity_poller_options = {
    thread_pool_size: activity_thread_pool_size,
  }
  @workflow_poller_options = {
    thread_pool_size: workflow_thread_pool_size,
  }
end

Public Instance Methods

add_activity_middleware(middleware_class, *args) click to toggle source
# File lib/temporal/worker.rb, line 48
def add_activity_middleware(middleware_class, *args)
  @activity_middleware << Middleware::Entry.new(middleware_class, args)
end
add_workflow_task_middleware(middleware_class, *args) click to toggle source
# File lib/temporal/worker.rb, line 44
def add_workflow_task_middleware(middleware_class, *args)
  @workflow_task_middleware << Middleware::Entry.new(middleware_class, args)
end
register_activity(activity_class, options = {}) click to toggle source
# File lib/temporal/worker.rb, line 37
def register_activity(activity_class, options = {})
  execution_options = ExecutionOptions.new(activity_class, options)
  key = [execution_options.namespace, execution_options.task_queue]

  @activities[key].add(execution_options.name, activity_class)
end
register_workflow(workflow_class, options = {}) click to toggle source
# File lib/temporal/worker.rb, line 30
def register_workflow(workflow_class, options = {})
  execution_options = ExecutionOptions.new(workflow_class, options)
  key = [execution_options.namespace, execution_options.task_queue]

  @workflows[key].add(execution_options.name, workflow_class)
end
start() click to toggle source
# File lib/temporal/worker.rb, line 52
def start
  workflows.each_pair do |(namespace, task_queue), lookup|
    pollers << workflow_poller_for(namespace, task_queue, lookup)
  end

  activities.each_pair do |(namespace, task_queue), lookup|
    pollers << activity_poller_for(namespace, task_queue, lookup)
  end

  trap_signals

  pollers.each(&:start)

  # keep the main thread alive
  sleep 1 while !shutting_down?
end
stop() click to toggle source
# File lib/temporal/worker.rb, line 69
def stop
  @shutting_down = true

  Thread.new do
    pollers.each(&:stop_polling)
    # allow workers to drain in-transit tasks.
    # https://github.com/temporalio/temporal/issues/1058
    sleep 1
    pollers.each(&:cancel_pending_requests)
    pollers.each(&:wait)
  end.join
end

Private Instance Methods

activity_poller_for(namespace, task_queue, lookup) click to toggle source
# File lib/temporal/worker.rb, line 96
def activity_poller_for(namespace, task_queue, lookup)
  Activity::Poller.new(namespace, task_queue, lookup.freeze, activity_middleware, activity_poller_options)
end
shutting_down?() click to toggle source
# File lib/temporal/worker.rb, line 88
def shutting_down?
  @shutting_down
end
trap_signals() click to toggle source
# File lib/temporal/worker.rb, line 100
def trap_signals
  %w[TERM INT].each do |signal|
    Signal.trap(signal) { stop }
  end
end
workflow_poller_for(namespace, task_queue, lookup) click to toggle source
# File lib/temporal/worker.rb, line 92
def workflow_poller_for(namespace, task_queue, lookup)
  Workflow::Poller.new(namespace, task_queue, lookup.freeze, workflow_task_middleware, workflow_poller_options)
end