class Temporal::Activity::Poller
Constants
- DEFAULT_OPTIONS
Attributes
activity_lookup[R]
middleware[R]
namespace[R]
options[R]
task_queue[R]
thread[R]
Public Class Methods
new(namespace, task_queue, activity_lookup, middleware = [], options = {})
click to toggle source
# File lib/temporal/activity/poller.rb, line 14
# Builds a poller for one namespace / task queue pair.
#
# namespace       - stored as-is and exposed through the +namespace+ reader.
# task_queue      - stored as-is and exposed through the +task_queue+ reader.
# activity_lookup - stored as-is; handed to the task processor for each task.
# middleware      - list of middleware entries (defaults to an empty list).
# options         - overrides layered on top of DEFAULT_OPTIONS.
def initialize(namespace, task_queue, activity_lookup, middleware = [], options = {})
  # Caller-supplied options win over the defaults.
  @options = DEFAULT_OPTIONS.merge(options)
  @shutting_down = false

  @namespace = namespace
  @task_queue = task_queue
  @activity_lookup = activity_lookup
  @middleware = middleware
end
Public Instance Methods
cancel_pending_requests()
click to toggle source
# File lib/temporal/activity/poller.rb, line 33
# Delegates to the client's +cancel_polling_request+ and returns whatever
# that call returns.
def cancel_pending_requests
  client.cancel_polling_request
end
start()
click to toggle source
# File lib/temporal/activity/poller.rb, line 23
# Clears the shutdown flag and launches the polling loop on a new thread.
#
# Returns the Thread that runs the loop (also available via +thread+).
def start
  @shutting_down = false
  @thread = Thread.new { poll_loop }
end
stop_polling()
click to toggle source
# File lib/temporal/activity/poller.rb, line 28
# Raises the shutdown flag so the polling loop returns the next time it
# checks the flag, then logs that the poller is going down.
def stop_polling
  @shutting_down = true

  log_context = { namespace: namespace, task_queue: task_queue }
  Temporal.logger.info('Shutting down activity poller', log_context)
end
wait()
click to toggle source
# File lib/temporal/activity/poller.rb, line 37
# Blocks until the polling thread has finished, then shuts the thread pool
# down.
#
# Safe to call on a poller that was never started: in that case there is no
# polling thread to join and only the thread pool is shut down.
#
# Returns the result of the thread pool's +shutdown+.
def wait
  # +thread+ is only assigned by #start, so it is nil until then.
  thread&.join
  thread_pool.shutdown
end
Private Instance Methods
client()
click to toggle source
# File lib/temporal/activity/poller.rb, line 46
# Returns the client used by this poller, generating it on first use and
# reusing the same instance afterwards.
def client
  return @client if @client

  @client = Temporal::Client.generate
end
poll_for_task()
click to toggle source
# File lib/temporal/activity/poller.rb, line 75
# Polls the activity task queue once.
#
# Returns whatever the client's +poll_activity_task_queue+ returns, or nil
# when the poll raised a StandardError. Failures are logged and passed to
# Temporal::ErrorHandler; they are never re-raised to the caller.
#
# After a failed poll the method optionally pauses for
# +options[:poll_retry_seconds]+ seconds so that a persistent failure does
# not make the caller re-poll in a tight loop. The option is opt-in: when it
# is absent, nil or not positive no pause happens. The pause is skipped
# while shutting down so shutdown is not delayed.
def poll_for_task
  client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
rescue StandardError => error
  Temporal.logger.error("Unable to poll activity task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect })

  Temporal::ErrorHandler.handle(error)

  # nil.to_f is 0.0, so a missing option means "no back-off".
  retry_delay = options[:poll_retry_seconds].to_f
  sleep(retry_delay) if retry_delay.positive? && !shutting_down?

  nil
end
poll_loop()
click to toggle source
# File lib/temporal/activity/poller.rb, line 54
# Body of the polling thread: repeatedly polls for a task and hands every
# task that has an activity type to the thread pool for processing.
#
# Each iteration:
#   1. waits until the thread pool reports available threads,
#   2. returns if the poller is shutting down,
#   3. reports the time elapsed since the previous poll finished as the
#      'activity_poller.time_since_last_poll' timing (in milliseconds),
#   4. polls once and schedules the task when it has an activity type.
#
# Elapsed time is measured with the monotonic clock rather than Time.now,
# so wall-clock adjustments cannot produce negative or inflated timings.
#
# Returns nil once shutdown has been requested.
def poll_loop
  last_poll_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  metrics_tags = { namespace: namespace, task_queue: task_queue }.freeze

  loop do
    thread_pool.wait_for_available_threads

    # Checked after the wait so a shutdown requested while blocked is seen
    # before another poll is issued.
    return if shutting_down?

    elapsed_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - last_poll_time
    time_diff_ms = (elapsed_seconds * 1000).round
    Temporal.metrics.timing('activity_poller.time_since_last_poll', time_diff_ms, metrics_tags)
    Temporal.logger.debug("Polling activity task queue", { namespace: namespace, task_queue: task_queue })

    task = poll_for_task
    last_poll_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # poll_for_task may return nil, and a task may carry no activity type;
    # neither is scheduled.
    next unless task&.activity_type

    thread_pool.schedule { process(task) }
  end
end
process(task)
click to toggle source
# File lib/temporal/activity/poller.rb, line 85
# Processes a single polled task.
#
# Wraps the configured middleware in a fresh Middleware::Chain, builds a
# TaskProcessor for the task and returns the result of its +process+ call.
def process(task)
  chain = Middleware::Chain.new(middleware)
  processor = TaskProcessor.new(task, namespace, activity_lookup, client, chain)

  processor.process
end
shutting_down?()
click to toggle source
# File lib/temporal/activity/poller.rb, line 50
# Reports the current value of the shutdown flag: false after #initialize
# or #start, true once #stop_polling has been called.
def shutting_down?
  @shutting_down
end
thread_pool()
click to toggle source
# File lib/temporal/activity/poller.rb, line 91
# Returns the thread pool used to run tasks, creating it on first use with
# the size given by +options[:thread_pool_size]+ and reusing it afterwards.
def thread_pool
  return @thread_pool if @thread_pool

  @thread_pool = ThreadPool.new(options[:thread_pool_size])
end