class Pakyow::Data::Subscribers::Adapters::Memory

Manages data subscriptions in memory.

Great for development, not for use in production!

@api private

Constants

SERIALIZABLE_IVARS

Public Class Methods

generate_subscription_id(subscription) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 23
# Builds an identifier for a subscription from its contents.
#
# @param subscription [Object] any object accepted by +Marshal.dump+
# @return [String] the SHA1 hex digest of the marshaled subscription
def generate_subscription_id(subscription)
  marshaled = Marshal.dump(subscription)
  Digest::SHA1.hexdigest(marshaled)
end
new(*) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 33
# Creates the empty in-memory indexes and starts a recurring task that
# unsubscribes every subscriber whose expiration time has passed.
#
# Any positional arguments are accepted and ignored.
def initialize(*)
  @subscriptions_by_id = Concurrent::Hash.new
  @subscription_ids_by_source = Concurrent::Hash.new
  @subscribers_by_subscription_id = Concurrent::Hash.new
  @subscription_ids_by_subscriber = Concurrent::Hash.new
  @expirations_for_subscriber = Concurrent::Hash.new

  Concurrent::TimerTask.new(execution_interval: 10, timeout_interval: 10) {
    now = Time.now

    # Collect the expired subscribers up front so that entries are only
    # removed from the hash after iteration over it has finished.
    expired_subscribers = @expirations_for_subscriber.select { |_, timeout|
      timeout < now
    }.keys

    expired_subscribers.each do |subscriber|
      unsubscribe(subscriber)

      # Forget the expiration once it has been handled. Leaving it in place
      # would grow the hash forever, re-run the unsubscribe on every tick,
      # and keep reporting the subscriber as expiring.
      @expirations_for_subscriber.delete(subscriber)
    end
  }.execute
end

Public Instance Methods

expire(subscriber, seconds) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 71
# Marks a subscriber as expiring by recording the time at which it expires.
#
# @param subscriber [Object] key identifying the subscriber
# @param seconds [Numeric] how far from now the subscriber should expire
# @return [Time] the recorded expiration time
def expire(subscriber, seconds)
  @expirations_for_subscriber.store(subscriber, Time.now + seconds)
end
expiring?(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 79
# Reports whether an expiration is currently recorded for the subscriber.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Boolean] true when the subscriber has a recorded expiration
def expiring?(subscriber)
  @expirations_for_subscriber.include?(subscriber)
end
persist(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 75
# Cancels a pending expiration by removing the subscriber's recorded
# expiration time.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Time, nil] the removed expiration time, or nil if none was recorded
def persist(subscriber)
  expirations = @expirations_for_subscriber
  expirations.delete(subscriber)
end
register_subscriptions(subscriptions, subscriber: nil) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 49
# Registers each subscription under a generated id, indexes that id by the
# subscription's +:source+, and attaches the subscriber to it.
#
# @param subscriptions [Enumerable] subscriptions responding to +[:source]+
# @param subscriber [Object, nil] the subscriber to attach to every subscription
# @return [Array] the generated subscription ids, in input order
def register_subscriptions(subscriptions, subscriber: nil)
  subscriptions.map do |subscription|
    subscription_id = self.class.generate_subscription_id(subscription)

    register_subscription_with_subscription_id(subscription, subscription_id)
    register_subscription_id_for_source(subscription_id, subscription[:source])
    register_subscriber_for_subscription_id(subscriber, subscription_id)

    subscription_id
  end
end
serialize() click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 94
# Captures the current value of every instance variable named in
# SERIALIZABLE_IVARS.
#
# @return [Hash] instance variable name => current value
def serialize
  pairs = SERIALIZABLE_IVARS.map { |ivar| [ivar, instance_variable_get(ivar)] }
  pairs.to_h
end
subscribers_for_subscription_id(subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 83
# Looks up the subscribers attached to a subscription id.
#
# @param subscription_id [Object] id of the subscription
# @return [Array] the stored subscriber list, or a new empty array when
#   nothing is stored for the id
def subscribers_for_subscription_id(subscription_id)
  subscribers = @subscribers_by_subscription_id[subscription_id]
  subscribers || []
end
subscriptions_for_source(source) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 59
# Resolves the subscription ids returned by +subscription_ids_for_source+
# into subscriptions via +subscription_with_id+.
#
# @param source [Object] the source the subscriptions were registered for
# @return [Array] one subscription per id, in the same order
def subscriptions_for_source(source)
  subscription_ids = subscription_ids_for_source(source)
  subscription_ids.map do |subscription_id|
    subscription_with_id(subscription_id)
  end
end
unsubscribe(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 65
# Detaches a subscriber from every subscription it is attached to.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Array] the subscription ids the subscriber was detached from
def unsubscribe(subscriber)
  # Work from a copy: detaching removes ids from the live list.
  ids_to_remove = subscription_ids_for_subscriber(subscriber).dup

  ids_to_remove.each do |subscription_id|
    unsubscribe_subscriber_from_subscription_id(subscriber, subscription_id)
  end
end

Protected Instance Methods

register_subscriber_for_subscription_id(subscriber, subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 129
# Records the link between a subscriber and a subscription id in both
# directions, keeping each list free of duplicates.
#
# @param subscriber [Object] key identifying the subscriber
# @param subscription_id [Object] id of the subscription
# @return [Array, nil] result of the final +uniq!+ call: nil when the id
#   was not already listed for the subscriber
def register_subscriber_for_subscription_id(subscriber, subscription_id)
  subscribers = (@subscribers_by_subscription_id[subscription_id] ||= Concurrent::Array.new)
  subscribers << subscriber
  subscribers.uniq!

  subscription_ids = (@subscription_ids_by_subscriber[subscriber] ||= Concurrent::Array.new)
  subscription_ids << subscription_id
  subscription_ids.uniq!
end
register_subscription_id_for_source(subscription_id, source) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 124
# Adds a subscription id to the list kept for a source, without duplicates.
#
# @param subscription_id [Object] id of the subscription
# @param source [Object] the source the subscription belongs to
# @return [Array, nil] result of +uniq!+: nil when the id was not already
#   listed for the source
def register_subscription_id_for_source(subscription_id, source)
  ids_for_source = (@subscription_ids_by_source[source] ||= Concurrent::Array.new)
  ids_for_source << subscription_id
  ids_for_source.uniq!
end
register_subscription_with_subscription_id(subscription, subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 120
# Stores a subscription under its id, replacing any existing entry.
#
# @param subscription [Object] the subscription to store
# @param subscription_id [Object] id to store it under
# @return [Object] the stored subscription
def register_subscription_with_subscription_id(subscription, subscription_id)
  @subscriptions_by_id.store(subscription_id, subscription)
end
subscription_ids_for_source(source) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 102
# Lists the subscription ids for a source that still have at least one
# subscriber that is not expiring.
#
# @param source [Object] the source to look up
# @return [Array] the matching subscription ids; empty when the source is
#   unknown
def subscription_ids_for_source(source)
  known_ids = @subscription_ids_by_source[source] || []

  known_ids.select do |subscription_id|
    # Equivalent to "any subscriber is not expiring"; an id with no
    # subscribers at all is filtered out as well.
    !subscribers_for_subscription_id(subscription_id).all? { |subscriber| expiring?(subscriber) }
  end
end
subscription_ids_for_subscriber(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 116
# Looks up the subscription ids a subscriber is attached to.
#
# @param subscriber [Object] key identifying the subscriber
# @return [Array] the stored id list, or a new empty array when nothing is
#   stored for the subscriber
def subscription_ids_for_subscriber(subscriber)
  subscription_ids = @subscription_ids_by_subscriber[subscriber]
  subscription_ids || []
end
subscription_with_id(subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 110
# Returns a deep copy of the stored subscription with its id set under
# +:id+. The stored subscription itself is left untouched.
#
# @param subscription_id [Object] id of the subscription
# @return [Object] the copied subscription including +:id+
def subscription_with_id(subscription_id)
  @subscriptions_by_id[subscription_id].deep_dup.tap do |copy|
    copy[:id] = subscription_id
  end
end
unsubscribe_subscriber_from_subscription_id(subscriber, subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/memory.rb, line 137
# Detaches a subscriber from one subscription. When that leaves the
# subscription without subscribers, the subscription is removed and its id
# is dropped from every source's id list.
#
# @param subscriber [Object] key identifying the subscriber
# @param subscription_id [Object] id of the subscription
# @return [Hash, nil] the source index when the subscription was removed,
#   nil when other subscribers remain
def unsubscribe_subscriber_from_subscription_id(subscriber, subscription_id)
  subscribers_for_subscription_id(subscription_id).delete(subscriber)
  subscription_ids_for_subscriber(subscriber).delete(subscription_id)

  # Other subscribers still depend on this subscription; keep it.
  return unless subscribers_for_subscription_id(subscription_id).empty?

  @subscriptions_by_id.delete(subscription_id)

  @subscription_ids_by_source.each_value do |subscription_ids|
    subscription_ids.delete(subscription_id)
  end
end