class Temporal::Activity::Poller

Constants

DEFAULT_OPTIONS

Attributes

activity_lookup[R]
middleware[R]
namespace[R]
options[R]
task_queue[R]
thread[R]

Public Class Methods

new(namespace, task_queue, activity_lookup, middleware = [], options = {}) click to toggle source
# File lib/temporal/activity/poller.rb, line 14
# Sets up a poller for one namespace/task-queue pair. Nothing is polled and
# no thread is created until #start is called.
#
# @param namespace namespace sent with every poll request
# @param task_queue task queue sent with every poll request
# @param activity_lookup lookup object handed to each TaskProcessor
# @param middleware entries wrapped in a Middleware::Chain for every task
# @param options overrides merged on top of DEFAULT_OPTIONS
def initialize(namespace, task_queue, activity_lookup, middleware = [], options = {})
  # Configuration and lifecycle state.
  @options = DEFAULT_OPTIONS.merge(options)
  @shutting_down = false

  # What to poll.
  @namespace = namespace
  @task_queue = task_queue

  # How polled tasks get executed.
  @activity_lookup = activity_lookup
  @middleware = middleware
end

Public Instance Methods

cancel_pending_requests() click to toggle source
# File lib/temporal/activity/poller.rb, line 33
# Forwards to the memoized client's +cancel_polling_request+.
# NOTE(review): presumably this aborts a poll request that is currently
# blocked so the poll loop can observe the shutdown flag -- confirm against
# the client implementation.
def cancel_pending_requests
  client.cancel_polling_request
end
start() click to toggle source
# File lib/temporal/activity/poller.rb, line 23
# Clears the shutdown flag and runs #poll_loop on a freshly spawned thread.
#
# @return [Thread] the polling thread (also exposed through #thread)
def start
  @shutting_down = false
  @thread = Thread.new { poll_loop }
end
stop_polling() click to toggle source
# File lib/temporal/activity/poller.rb, line 28
# Raises the shutdown flag (checked by #poll_loop before every poll) and logs
# the shutdown together with the namespace and task queue.
def stop_polling
  @shutting_down = true

  log_context = { namespace: namespace, task_queue: task_queue }
  Temporal.logger.info('Shutting down activity poller', log_context)
end
wait() click to toggle source
# File lib/temporal/activity/poller.rb, line 37
# Blocks until the polling thread has finished, then shuts the thread pool
# down.
#
# Safe to call on a poller that was never started: in that case there is no
# thread to join (previously this raised NoMethodError on nil) and only the
# thread pool shutdown is performed.
#
# @return the result of the thread pool's +shutdown+
def wait
  # @thread is only assigned by #start, so it may still be nil here.
  thread&.join
  thread_pool.shutdown
end

Private Instance Methods

client() click to toggle source
# File lib/temporal/activity/poller.rb, line 46
# Lazily builds the client through Temporal::Client.generate and reuses the
# same instance on every later call.
def client
  return @client if @client

  @client = Temporal::Client.generate
end
poll_for_task() click to toggle source
# File lib/temporal/activity/poller.rb, line 75
# Issues one poll request for this poller's namespace and task queue.
#
# Any StandardError raised by the client is logged, passed on to
# Temporal::ErrorHandler and then swallowed, so the caller sees +nil+
# instead of an exception.
#
# @return the client's poll response, or nil when polling failed
def poll_for_task
  client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
rescue StandardError => e
  failure_context = { namespace: namespace, task_queue: task_queue, error: e.inspect }
  Temporal.logger.error('Unable to poll activity task queue', failure_context)

  Temporal::ErrorHandler.handle(e)

  nil
end
poll_loop() click to toggle source
# File lib/temporal/activity/poller.rb, line 54
# Body of the polling thread: repeatedly polls the task queue and hands every
# task that carries an activity type to the thread pool.
#
# Each iteration
# 1. waits until the thread pool reports available threads,
# 2. returns if the poller is shutting down,
# 3. reports the time elapsed since the previous poll finished as the
#    'activity_poller.time_since_last_poll' timing (in milliseconds),
# 4. polls, and schedules #process for the task unless the poll returned nil
#    or a task without an activity type.
#
# The elapsed time is measured with the monotonic clock rather than Time.now,
# so wall-clock adjustments cannot produce negative or inflated timings.
def poll_loop
  last_poll_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  metrics_tags = { namespace: namespace, task_queue: task_queue }.freeze

  loop do
    thread_pool.wait_for_available_threads

    # Checked after the wait so a shutdown requested while waiting is honoured
    # before another poll request is issued.
    return if shutting_down?

    elapsed_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - last_poll_time
    time_diff_ms = (elapsed_seconds * 1000).round
    Temporal.metrics.timing('activity_poller.time_since_last_poll', time_diff_ms, metrics_tags)
    Temporal.logger.debug("Polling activity task queue", { namespace: namespace, task_queue: task_queue })

    task = poll_for_task
    last_poll_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # poll_for_task returns nil on failure; a response without an activity
    # type is skipped as well.
    next unless task&.activity_type

    thread_pool.schedule { process(task) }
  end
end
process(task) click to toggle source
# File lib/temporal/activity/poller.rb, line 85
# Runs a single polled task: wraps the configured middleware in a new
# Middleware::Chain, builds a TaskProcessor for the task and invokes it.
#
# @param task the task returned by the poll request
# @return whatever TaskProcessor#process returns
def process(task)
  chain = Middleware::Chain.new(middleware)
  processor = TaskProcessor.new(task, namespace, activity_lookup, client, chain)

  processor.process
end
shutting_down?() click to toggle source
# File lib/temporal/activity/poller.rb, line 50
# Reports the shutdown flag: set to false by #initialize and #start, and to
# true by #stop_polling. The value is returned as stored, without coercion.
def shutting_down?
  @shutting_down
end
thread_pool() click to toggle source
# File lib/temporal/activity/poller.rb, line 91
# Lazily creates the ThreadPool, sized by options[:thread_pool_size], and
# returns the same pool on every later call.
def thread_pool
  return @thread_pool if @thread_pool

  pool_size = options[:thread_pool_size]
  @thread_pool = ThreadPool.new(pool_size)
end