class ActionPubsub::ActiveRecord::Subscriber

Attributes

current_event[RW]
resource[RW]

Public Class Methods

disable_all!() click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 33
# Class-level entry point whose name suggests it disables every subscriber.
# NOTE(review): the body is empty in this source, so a call does nothing and
# returns nil — confirm whether an implementation is still pending.
def self.disable_all!
end
increment_event_failed_count!() click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 45
# Bumps the failed-event counter held in +event_failed_count+.
#
# @return the value returned by the counter's +increment+ call
def self.increment_event_failed_count!
  event_failed_count.increment
end
increment_event_processed_count!() click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 49
# Bumps the processed-event counter held in +event_processed_count+.
#
# @return the value returned by the counter's +increment+ call
def self.increment_event_processed_count!
  event_processed_count.increment
end
increment_event_triggered_count!() click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 53
# Bumps the triggered-event counter held in +event_triggered_count+.
#
# @return the value returned by the counter's +increment+ call
def self.increment_event_triggered_count!
  event_triggered_count.increment
end
inherited(subklass) click to toggle source

The indirection through the dynamically created “Subscription” class exists to keep subscribers immutable and free of stored instance state: the subscription is the actual actor, which simply instantiates this subscriber class and performs the task it needs to.

# File lib/action_pubsub/active_record/subscriber.rb, line 23
# Inheritance hook: equips every new subscriber subclass with its own
# subscription class, reaction table, exchange set and event counters.
#
# A fresh subclass of ::ActionPubsub::ActiveRecord::Subscription is created
# and stored under the subclass as the constant "Subscription"; that class is
# pointed back at the subscriber through +subscriber=+.
#
# @param subklass [Class] the class that has just inherited from this one
def self.inherited(subklass)
  # Keep the hook chain intact so any other +inherited+ definitions in the
  # ancestor chain still run for this subclass.
  super

  subscription_class = ::Class.new(::ActionPubsub::ActiveRecord::Subscription)
  subklass.subscription = subklass.const_set("Subscription", subscription_class)
  subklass.subscription.subscriber = subklass

  # Per-subclass state: nothing here is shared between subscriber classes.
  subklass.reactions = {}
  subklass.observed_exchanges = ::Set.new
  subklass.event_triggered_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_failed_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_processed_count = ::Concurrent::AtomicFixnum.new(0)
end
new(record, event:nil) click to toggle source

Instance Methods

# File lib/action_pubsub/active_record/subscriber.rb, line 87
# Builds a subscriber around a single record.
#
# @param record the object stored as +@resource+
# @param event optional event; stored as +@current_event+ only when truthy,
#   otherwise that instance variable is left unset
def initialize(record, event: nil)
  @resource = record
  return unless event

  @current_event = event
end
on(event_name, **options, &block) click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 36
# Declares the reaction for +event_name+ and registers it with the exchanges.
#
# The reaction is stored in +reactions+ as a hash with two entries: the given
# block under :block, and the :if / :unless options under :conditions. Any
# other options are not stored.
#
# @param event_name key under which the reaction is stored
# @param options keyword options; only :if and :unless are used
# @param block the handler stored for the event
# @return the result of +register_reaction_to_event+
def self.on(event_name, **options, &block)
  reactions[event_name] = {
    block: block,
    conditions: options.extract!(:if, :unless)
  }

  register_reaction_to_event(event_name)
end
react?(event_name, reaction, record) click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 61
# Decides whether +reaction+ should run for +record+.
#
# A reaction without a block never runs; one without conditions always runs.
# Otherwise an :unless condition that returns a truthy value vetoes the
# reaction (and the :if condition is then not called), and an :if condition,
# when present, supplies the final answer.
#
# @param event_name not used by this method
# @param reaction [Hash] hash with :block and :conditions entries
# @param record object handed to the condition callables
# @return false, true, or the return value of the :if callable
def self.react?(event_name, reaction, record)
  return false if reaction[:block].blank?

  conditions = reaction[:conditions]
  return true if conditions.blank?

  return false if conditions.key?(:unless) && conditions[:unless].call(record)
  return conditions[:if].call(record) if conditions.key?(:if)

  true
end
register_reaction_to_event(event_name) click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 70
# Registers a queue and spawns subscription workers for +event_name+ on every
# observed exchange.
#
# For each entry in +observed_exchanges+ the target exchange is
# "<exchange>/<event_name>" and the subscriber key is the underscored class
# name. One subscription is spawned per concurrency slot under the address
# "<exchange>/<event_name>/<subscriber key>/<slot index>"; an address already
# present in ::ActionPubsub.subscriptions is left untouched.
#
# @param event_name event whose reaction is being registered
# @return the result of iterating +observed_exchanges+ with +each+
def self.register_reaction_to_event(event_name)
  observed_exchanges.each do |exchange|
    exchange_path = [exchange, event_name].join("/")
    # Resolved per exchange so nothing is evaluated when no exchange is observed.
    subscriber_path = name.underscore
    queue_root = [exchange_path, subscriber_path].join("/")

    ::ActionPubsub.register_queue(exchange_path, subscriber_path)

    concurrency.times do |slot|
      address = "#{queue_root}/#{slot}"
      ::ActionPubsub.subscriptions[address] ||= subscription.spawn(address) do
        subscription.bind_subscription(exchange_path, subscriber_path)
      end
    end
  end
end
subscribe_to(*exchanges) click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 57
# Adds each given exchange to +observed_exchanges+.
#
# @param exchanges one or more exchanges to observe
# @return [Array] the exchanges that were passed in
def self.subscribe_to(*exchanges)
  exchanges.each do |exchange_name|
    observed_exchanges << exchange_name
  end
end