class Pakyow::Data::Subscribers::Adapters::Memory
Manages data subscriptions in memory.
Great for development, not for use in production!
@api private
Constants
- SERIALIZABLE_IVARS
Public Class Methods
generate_subscription_id(subscription)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 23
# Derives an identifier for a subscription from its marshaled contents.
#
# @param subscription [Object] any value that Marshal.dump accepts
# @return [String] SHA1 hex digest of the marshaled subscription
def generate_subscription_id(subscription)
  serialized = Marshal.dump(subscription)
  Digest::SHA1.hexdigest(serialized)
end
new(*)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 33
# Creates the empty in-memory indexes and starts a background task that
# unsubscribes subscribers whose expiration time has passed.
#
# Any positional arguments are accepted and ignored.
def initialize(*)
  @subscriptions_by_id = Concurrent::Hash.new
  @subscription_ids_by_source = Concurrent::Hash.new
  @subscribers_by_subscription_id = Concurrent::Hash.new
  @subscription_ids_by_subscriber = Concurrent::Hash.new
  @expirations_for_subscriber = Concurrent::Hash.new

  # Sweep for expired subscribers every 10 seconds.
  Concurrent::TimerTask.new(execution_interval: 10, timeout_interval: 10) {
    now = Time.now

    # Snapshot the expired subscribers first so the hash is never modified
    # while it is being iterated.
    expired = @expirations_for_subscriber.select { |_, timeout| timeout < now }.keys

    expired.each do |subscriber|
      unsubscribe(subscriber)

      # Forget the expiration once it has been handled. Without this the
      # entry would stay forever: the hash would grow without bound and the
      # same subscriber would be unsubscribed again on every sweep.
      @expirations_for_subscriber.delete(subscriber)
    end
  }.execute
end
Public Instance Methods
expire(subscriber, seconds)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 71
# Marks a subscriber as expiring a number of seconds from now.
#
# @param subscriber [Object] key identifying the subscriber
# @param seconds [Numeric] offset added to the current time
# @return [Time] the expiration time that was recorded
def expire(subscriber, seconds)
  deadline = Time.now + seconds
  @expirations_for_subscriber[subscriber] = deadline
end
expiring?(subscriber)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 79
# Reports whether an expiration is currently recorded for the subscriber.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Boolean] true when the subscriber has an expiration entry
def expiring?(subscriber)
  @expirations_for_subscriber.include?(subscriber)
end
persist(subscriber)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 75
# Cancels any recorded expiration for the subscriber.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Object, nil] the removed expiration value, or nil if none existed
def persist(subscriber)
  pending = @expirations_for_subscriber
  pending.delete(subscriber)
end
register_subscriptions(subscriptions, subscriber: nil)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 49
# Registers each subscription under a generated id and links that id to the
# subscription's :source and to the given subscriber.
#
# @param subscriptions [Enumerable] subscriptions; each is indexed with [:source]
# @param subscriber [Object, nil] subscriber to associate with every subscription
# @return [Array] the generated subscription ids, in input order
def register_subscriptions(subscriptions, subscriber: nil)
  subscriptions.map do |subscription|
    subscription_id = self.class.generate_subscription_id(subscription)

    register_subscription_with_subscription_id(subscription, subscription_id)
    register_subscription_id_for_source(subscription_id, subscription[:source])
    register_subscriber_for_subscription_id(subscriber, subscription_id)

    subscription_id
  end
end
serialize()
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 94
# Builds a hash of this object's state for the instance variables named in
# SERIALIZABLE_IVARS.
#
# @return [Hash] instance variable name => current value
def serialize
  pairs = SERIALIZABLE_IVARS.map { |ivar| [ivar, instance_variable_get(ivar)] }
  pairs.to_h
end
subscribers_for_subscription_id(subscription_id)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 83
# Looks up the subscribers registered for a subscription id.
#
# @param subscription_id [Object] id of the subscription
# @return [Array] the stored subscriber list, or a new empty array when
#   nothing is stored for the id
def subscribers_for_subscription_id(subscription_id)
  subscribers = @subscribers_by_subscription_id[subscription_id]
  return [] unless subscribers

  subscribers
end
subscriptions_for_source(source)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 59
# Returns the subscriptions registered for a source, resolving each id
# reported by subscription_ids_for_source through subscription_with_id.
#
# @param source [Object] the source key
# @return [Array] one subscription per id, in the same order
def subscriptions_for_source(source)
  ids = subscription_ids_for_source(source)

  ids.map do |id|
    subscription_with_id(id)
  end
end
unsubscribe(subscriber)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 65
# Removes the subscriber from every subscription it is registered for.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Array] the subscription ids that were processed
def unsubscribe(subscriber)
  # Work on a copy: the per-id removal deletes from the live id list.
  ids = subscription_ids_for_subscriber(subscriber).dup

  ids.each { |id| unsubscribe_subscriber_from_subscription_id(subscriber, id) }
end
Protected Instance Methods
register_subscriber_for_subscription_id(subscriber, subscription_id)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 129
# Links a subscriber and a subscription id in both directions, keeping each
# list free of duplicates.
#
# @param subscriber [Object] key identifying the subscriber
# @param subscription_id [Object] id of the subscription
# @return [Array, nil] result of the final uniq! (nil when no duplicate was removed)
def register_subscriber_for_subscription_id(subscriber, subscription_id)
  subscribers = (@subscribers_by_subscription_id[subscription_id] ||= Concurrent::Array.new)
  subscribers << subscriber
  subscribers.uniq!

  subscription_ids = (@subscription_ids_by_subscriber[subscriber] ||= Concurrent::Array.new)
  subscription_ids << subscription_id
  subscription_ids.uniq!
end
register_subscription_id_for_source(subscription_id, source)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 124
# Adds a subscription id to the list kept for a source, without duplicates.
#
# @param subscription_id [Object] id of the subscription
# @param source [Object] the source key
# @return [Array, nil] result of uniq! (nil when no duplicate was removed)
def register_subscription_id_for_source(subscription_id, source)
  ids = (@subscription_ids_by_source[source] ||= Concurrent::Array.new)
  ids << subscription_id
  ids.uniq!
end
register_subscription_with_subscription_id(subscription, subscription_id)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 120
# Stores a subscription under its id, replacing any previous entry.
#
# @param subscription [Object] the subscription to store
# @param subscription_id [Object] id to store it under
# @return [Object] the stored subscription
def register_subscription_with_subscription_id(subscription, subscription_id)
  @subscriptions_by_id.store(subscription_id, subscription)
end
subscription_ids_for_source(source)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 102
# Returns the subscription ids for a source that have at least one subscriber
# not marked as expiring.
#
# @param source [Object] the source key
# @return [Array] matching subscription ids; empty when the source is unknown
def subscription_ids_for_source(source)
  candidate_ids = @subscription_ids_by_source[source] || []

  # Drop ids whose subscribers are all expiring; this also drops ids that
  # have no subscribers at all.
  candidate_ids.reject do |subscription_id|
    subscribers_for_subscription_id(subscription_id).all? { |subscriber| expiring?(subscriber) }
  end
end
subscription_ids_for_subscriber(subscriber)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 116
# Looks up the subscription ids registered for a subscriber.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Array] the stored id list, or a new empty array when nothing is
#   stored for the subscriber
def subscription_ids_for_subscriber(subscriber)
  ids = @subscription_ids_by_subscriber[subscriber]
  return [] unless ids

  ids
end
subscription_with_id(subscription_id)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 110
# Returns a deep_dup copy of the stored subscription with its id added under :id.
#
# @param subscription_id [Object] id of the subscription
# @return [Object] the copy, with [:id] set to subscription_id
def subscription_with_id(subscription_id)
  @subscriptions_by_id[subscription_id].deep_dup.tap do |copy|
    copy[:id] = subscription_id
  end
end
unsubscribe_subscriber_from_subscription_id(subscriber, subscription_id)
click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 137
# Unlinks a subscriber from a subscription id in both directions. When the
# subscription has no subscribers left, the subscription itself is removed
# and its id is deleted from every source's id list.
#
# @param subscriber [Object] key identifying the subscriber
# @param subscription_id [Object] id of the subscription
# @return [Hash, nil] the source index when the subscription was removed,
#   nil otherwise
def unsubscribe_subscriber_from_subscription_id(subscriber, subscription_id)
  remaining = subscribers_for_subscription_id(subscription_id)
  remaining.delete(subscriber)
  subscription_ids_for_subscriber(subscriber).delete(subscription_id)

  # Other subscribers still use this subscription; keep it.
  return unless remaining.empty?

  @subscriptions_by_id.delete(subscription_id)
  @subscription_ids_by_source.each_value { |ids| ids.delete(subscription_id) }
end