class ActionPubsub::ActiveRecord::Subscriber
Attributes
current_event[RW]
resource[RW]
Public Class Methods
disable_all!()
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 33
# Class-level hook with an empty implementation: as shown here it performs
# no work and evaluates to nil.
# NOTE(review): presumably a placeholder for disabling subscribers — confirm
# whether an implementation is expected.
def self.disable_all!
  nil
end
increment_event_failed_count!()
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 45
# Bumps the failed-event counter of this subscriber class by calling
# +increment+ on +event_failed_count+.
#
# @return whatever the counter's +increment+ call returns
def self.increment_event_failed_count!
  failed_counter = event_failed_count
  failed_counter.increment
end
increment_event_processed_count!()
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 49
# Bumps the processed-event counter of this subscriber class by calling
# +increment+ on +event_processed_count+.
#
# @return whatever the counter's +increment+ call returns
def self.increment_event_processed_count!
  processed_counter = event_processed_count
  processed_counter.increment
end
increment_event_triggered_count!()
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 53
# Bumps the triggered-event counter of this subscriber class by calling
# +increment+ on +event_triggered_count+.
#
# @return whatever the counter's +increment+ call returns
def self.increment_event_triggered_count!
  triggered_counter = event_triggered_count
  triggered_counter.increment
end
inherited(subklass)
click to toggle source
The indirection through the dynamically created “Subscription” class exists to keep subscribers immutable and free of stored instance state; i.e. the subscription is the actual actor, which simply instantiates this subscriber class and performs the task it needs to.
# File lib/action_pubsub/active_record/subscriber.rb, line 23
# Inheritance hook that equips every new subscriber subclass with its own
# class-level state.
#
# For the subclass it:
# * defines a nested +Subscription+ constant holding a fresh subclass of
#   ::ActionPubsub::ActiveRecord::Subscription, stores it in
#   +subklass.subscription+ and points its +subscriber+ back at the subclass;
# * resets +reactions+ to an empty Hash and +observed_exchanges+ to an
#   empty Set;
# * creates three separate atomic counters (triggered / failed / processed),
#   each starting at 0.
#
# @param subklass [Class] the class that has just inherited from this one
def self.inherited(subklass)
  # Keep the hook chain intact: other `inherited` callbacks defined further
  # up the ancestor chain must still run for the new subclass.
  super

  subscription_class = ::Class.new(::ActionPubsub::ActiveRecord::Subscription)
  subklass.subscription = subklass.const_set("Subscription", subscription_class)
  subklass.subscription.subscriber = subklass

  subklass.reactions = {}
  subklass.observed_exchanges = ::Set.new

  subklass.event_triggered_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_failed_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_processed_count = ::Concurrent::AtomicFixnum.new(0)
end
new(record, event:nil)
click to toggle source
Instance Methods
# File lib/action_pubsub/active_record/subscriber.rb, line 87
# Builds a subscriber around the given record.
#
# @param record the object stored in @resource
# @param event optional event; @current_event is assigned only when a
#   truthy value is passed, otherwise the instance variable is left unset
def initialize(record, event: nil)
  @resource = record
  return unless event

  @current_event = event
end
on(event_name, **options, &block)
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 36
# Declares the reaction for +event_name+ and registers it immediately.
#
# The reaction is stored in +reactions+ under +event_name+ as a Hash with
# two entries: +:block+ (the given block, possibly nil) and +:conditions+
# (the +:if+ / +:unless+ entries pulled out of +options+). Any earlier
# reaction stored under the same event name is replaced.
#
# @param event_name key under which the reaction is stored
# @param options keyword options; only +:if+ and +:unless+ are used here
# @param block the code to keep as the reaction body
# @return the result of +register_reaction_to_event+
def self.on(event_name, **options, &block)
  conditions = options.extract!(:if, :unless)
  reactions[event_name] = { block: block, conditions: conditions }

  register_reaction_to_event(event_name)
end
react?(event_name, reaction, record)
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 61
# Decides whether +reaction+ should run for +record+.
#
# * No block stored in the reaction -> false.
# * No conditions stored -> true.
# * An +:unless+ condition that returns a truthy value -> false, and the
#   +:if+ condition is not called.
# * Otherwise the value of the +:if+ condition (when present) is returned
#   as-is; with no +:if+ condition the answer is true.
#
# @param event_name not used by this method
# @param reaction [Hash] expects +:block+ and +:conditions+ entries
# @param record the object passed to each condition's +call+
def self.react?(event_name, reaction, record)
  return false if reaction[:block].blank?

  conditions = reaction[:conditions]
  return true if conditions.blank?

  # A firing :unless guard vetoes the reaction before :if is consulted.
  return false if conditions.key?(:unless) && conditions[:unless].call(record)

  conditions.key?(:if) ? conditions[:if].call(record) : true
end
register_reaction_to_event(event_name)
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 70
# Wires this subscriber up to +event_name+ on every exchange currently in
# +observed_exchanges+.
#
# For each exchange prefix it:
# 1. builds the target exchange as "<prefix>/<event_name>";
# 2. registers a queue for that exchange under this class's underscored name;
# 3. for each index in 0...concurrency, spawns a subscription at
#    "<exchange>/<subscriber key>/<index>" unless ::ActionPubsub.subscriptions
#    already holds a truthy entry for that address.
#
# Only exchanges present in +observed_exchanges+ at the moment of the call
# are considered; exchanges added afterwards are not picked up here.
#
# @param event_name the event segment appended to each exchange prefix
def self.register_reaction_to_event(event_name)
  observed_exchanges.each do |prefix|
    exchange = [prefix, event_name].join("/")
    key = name.underscore
    queue_root = [exchange, key].join("/")

    ::ActionPubsub.register_queue(exchange, key)

    concurrency.times do |worker_index|
      address = [queue_root, worker_index].join("/")
      registry = ::ActionPubsub.subscriptions
      # Leave an existing subscription for this address untouched.
      next if registry[address]

      registry[address] = subscription.spawn(address) do
        subscription.bind_subscription(exchange, key)
      end
    end
  end
end
subscribe_to(*exchanges)
click to toggle source
# File lib/action_pubsub/active_record/subscriber.rb, line 57
# Adds each given exchange to +observed_exchanges+ via +<<+.
#
# @param exchanges the exchanges to observe
# @return [Array] the +exchanges+ argument list
def self.subscribe_to(*exchanges)
  exchanges.each do |exchange|
    observed_exchanges << exchange
  end
end