class Downstream::Stateless::Subscriber

Attributes

async[R]
callable[R]
notification_subscriber[R]

Public Class Methods

new(callable, async: false)
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 10
# Stores the handler and its async setting for later dispatch.
#
# @param callable [Object] handler; invoked via +call+ in sync mode, or
#   referenced by +name+ when dispatched asynchronously
# @param async [Boolean, Hash] falsy for inline execution; a Hash may carry
#   a +:queue+ entry that +async_queue_name+ reads
def initialize(callable, async: false)
  @async = async
  @callable = callable
end

Public Instance Methods

async?()
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 15
# Reports whether this subscriber was configured with a truthy +async+ value.
#
# @return [Boolean] strictly +true+ or +false+, whatever +async+ holds
def async?
  async ? true : false
end
call(_name, event)
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 19
# Handles a notification for this subscriber.
#
# In sync mode the callable is invoked inline with +event+. In async mode the
# callable is validated and a +SubscriberJob+ is enqueued from inside an
# +after_commit+ block (NOTE(review): +after_commit+ is defined outside this
# method; presumably it defers until the surrounding transaction commits).
#
# @param _name [Object] notification name; unused
# @param event [Object] payload handed to the callable or to the job
# @return [Object] result of +callable.call(event)+ in sync mode, otherwise
#   whatever +after_commit+ returns
# @raise [ArgumentError] if an async subscriber is a Proc, an anonymous
#   module, or anything that is not a module/class
def call(_name, event)
  return callable.call(event) unless async?

  # Only modules are asked for their name. An arbitrary instance usually has
  # no +name+ method, so calling it would raise NoMethodError before the
  # dedicated "not instance" ArgumentError below could be reported.
  if callable.is_a?(Proc) || (callable.is_a?(Module) && callable.name.nil?)
    raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) cannot be asynchronous"
  end

  raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module)

  after_commit do
    # The job receives the callable's name rather than the callable itself.
    queue_name = async_queue_name
    job = queue_name ? SubscriberJob.set(queue: queue_name) : SubscriberJob
    job.perform_later(event, callable.name)
  end
end
subscribe(identifier)
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 41
# Registers this object with ActiveSupport::Notifications under +identifier+
# and remembers the returned handle in +@notification_subscriber+.
#
# @param identifier [Object] passed through unchanged as the first argument
#   of +ActiveSupport::Notifications.subscribe+
# @return [Object] the handle returned by +ActiveSupport::Notifications.subscribe+
def subscribe(identifier)
  handle = ActiveSupport::Notifications.subscribe(identifier, self)
  @notification_subscriber = handle
end
unsubscribe()
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 48
# Passes the current +notification_subscriber+ value to
# +ActiveSupport::Notifications.unsubscribe+.
#
# @return [Object] whatever +ActiveSupport::Notifications.unsubscribe+ returns
def unsubscribe
  handle = notification_subscriber
  ActiveSupport::Notifications.unsubscribe(handle)
end

Private Instance Methods

async_queue_name()
# File lib/downstream/pubsub_adapters/stateless/subscriber.rb, line 56
# Resolves and memoizes the queue name for async dispatch.
#
# A truthy +:queue+ entry in a Hash-valued +async+ wins; otherwise the value
# of +Downstream.config.async_queue+ is used.
#
# @return [Object, nil] the resolved queue name, possibly nil
def async_queue_name
  # defined? (rather than ||=) so that a nil/false result is cached as well.
  return @async_queue_name if defined?(@async_queue_name)

  per_subscriber = async.is_a?(Hash) ? async[:queue] : nil
  @async_queue_name = per_subscriber || Downstream.config.async_queue
end