class Downstream::Stateless::Subscriber
Attributes
async[R]
callable[R]
notification_subscriber[R]
Public Class Methods
new(callable, async: false)
click to toggle source
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 10
# Builds a subscriber around +callable+.
#
# @param callable [#call, Module] object that receives events; stored as-is
#   in @callable without validation at construction time
# @param async [Boolean, Hash] asynchronous-delivery setting, stored as-is
#   in @async; defaults to +false+
def initialize(callable, async: false)
  @callable = callable
  @async = async
end
Public Instance Methods
async?()
click to toggle source
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 15
# Reports whether this subscriber is configured for asynchronous delivery.
#
# @return [Boolean] +true+ when the +async+ reader returns any truthy value
#   (for example +true+ or a Hash), +false+ when it returns +false+ or +nil+
def async?
  async ? true : false
end
call(_name, event)
click to toggle source
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 19
# Delivers +event+ to the wrapped callable.
#
# When the subscriber is not async, the callable is invoked directly with
# the event and its result is returned. When it is async, the callable is
# validated and SubscriberJob is enqueued via +perform_later+ with the event
# and the callable's name, from inside an +after_commit+ block (defined
# outside this method). If +async_queue_name+ returns a value, the job is
# first configured with +set(queue: ...)+.
#
# @param _name [Object] notification name; unused
# @param event [Object] payload handed to the callable or to the job
# @raise [ArgumentError] in async mode when the callable is a Proc, is a
#   named-capable object whose +name+ is nil (e.g. an anonymous module), or
#   is not a Module/Class
def call(_name, event)
  return callable.call(event) unless async?

  # Only probe +name+ on objects that actually respond to it. A plain
  # instance without a +name+ method must reach the Module check below and
  # get the descriptive ArgumentError instead of a NoMethodError.
  anonymous = callable.is_a?(Proc) || (callable.respond_to?(:name) && callable.name.nil?)
  if anonymous
    raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) cannot be asynchronous"
  end

  raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module)

  after_commit do
    SubscriberJob.then do |job|
      if (queue_name = async_queue_name)
        job.set(queue: queue_name)
      else
        job
      end
    end.perform_later(event, callable.name)
  end
end
subscribe(identifier)
click to toggle source
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 41
# Registers this object with ActiveSupport::Notifications for +identifier+
# and remembers the value returned by the registration in
# @notification_subscriber so it can be passed to +unsubscribe+ later.
#
# @param identifier [Object] passed through unchanged as the first argument
#   of ActiveSupport::Notifications.subscribe
# @return [Object] whatever ActiveSupport::Notifications.subscribe returned
def subscribe(identifier)
  subscription = ActiveSupport::Notifications.subscribe(identifier, self)
  @notification_subscriber = subscription
end
unsubscribe()
click to toggle source
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 48
# Passes the stored notification subscriber (the value read from the
# +notification_subscriber+ reader) to ActiveSupport::Notifications.unsubscribe.
#
# @return [Object] whatever ActiveSupport::Notifications.unsubscribe returns
def unsubscribe
  subscription = notification_subscriber
  ActiveSupport::Notifications.unsubscribe(subscription)
end
Private Instance Methods
async_queue_name()
click to toggle source
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 56
# Resolves the queue name used for asynchronous delivery.
#
# The value is +async[:queue]+ when +async+ is a Hash and that entry is
# truthy; otherwise it is Downstream.config.async_queue. The result is
# memoized in @async_queue_name after the first call, even when it is nil,
# because the cache check tests whether the variable is defined rather than
# whether it is truthy.
#
# @return [Object, nil] the resolved queue name
def async_queue_name
  return @async_queue_name if instance_variable_defined?(:@async_queue_name)

  per_subscriber = async.is_a?(Hash) ? async[:queue] : nil
  @async_queue_name = per_subscriber || Downstream.config.async_queue
end