class Pakyow::Data::Subscribers::Adapters::Redis

Manages data subscriptions in Redis.

Use this in production.

@api private

Constants

INFINITY
KEY_PART_SEPARATOR
KEY_PREFIX
SCRIPTS

Public Class Methods

generate_subscription_id(subscription_string) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 27
# Derives an identifier for a serialized subscription.
#
# @param subscription_string [String] the serialized subscription
# @return [String] the hexadecimal SHA1 digest of +subscription_string+
def generate_subscription_id(subscription_string)
  sha1 = Digest::SHA1.new
  sha1 << subscription_string
  sha1.hexdigest
end
new(config) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 40
# Prepares the adapter: computes the key prefix, builds the redis connection
# pool, loads the scripts, and starts a recurring task that calls +cleanup+.
#
# @param config [Hash] reads +:key_prefix+, +:pool+ (splatted as keyword
#   arguments into +ConnectionPool.new+) and +:connection+ (handed to
#   +::Redis.new+)
def initialize(config)
  # Every key built by this adapter starts with this prefix.
  @prefix = [config[:key_prefix], KEY_PREFIX].join(KEY_PART_SEPARATOR)

  # The block is what the pool uses to build a redis client.
  @redis = ConnectionPool.new(**config[:pool]) do
    ::Redis.new(config[:connection])
  end

  # Populated by load_scripts (script name => value returned by redis).
  @scripts = {}
  load_scripts

  cleanup_task = Concurrent::TimerTask.new(execution_interval: 300, timeout_interval: 300) { cleanup }
  cleanup_task.execute
end
stringify_subscription(subscription) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 23
# Serializes a subscription into a compact string.
#
# @param subscription [Object] any object accepted by +Marshal.dump+
# @return [String] the deflated Marshal dump of +subscription+
def stringify_subscription(subscription)
  marshaled = Marshal.dump(subscription)
  Zlib::Deflate.deflate(marshaled)
end

Public Instance Methods

cleanup() click to toggle source

FIXME: Refactor this into a Lua script. We'll want to stop using SCAN and instead store known sources in a set. Cleanup should then be based on the set of known sources and should return the number of values that were removed.

# File lib/pakyow/data/subscribers/adapters/redis.rb, line 138
# Walks every key matching the per-source key pattern and removes sorted-set
# members whose score is between 0 and the current unix time, logging before
# and after each removal.
#
# @return whatever +scan_each+ returns for the checked-out connection
def cleanup
  source_key_pattern = key_subscription_ids_by_source("*")

  @redis.with { |connection|
    connection.scan_each(match: source_key_pattern) { |source_key|
      Pakyow.logger.internal do
        "[Pakyow::Data::Subscribers::Adapters::Redis] Cleaning up expired subscriptions for #{source_key}"
      end

      # Scores up to "now" are treated as expired.
      expired_count = connection.zremrangebyscore(source_key, 0, Time.now.to_i)

      Pakyow.logger.internal do
        "[Pakyow::Data::Subscribers::Adapters::Redis] Removed #{expired_count} members for #{source_key}"
      end
    }
  }
end
expire(subscriber, seconds) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 87
# Runs the +:expire+ script for a subscriber.
#
# @param subscriber [#to_s] the subscriber to expire
# @param seconds [Integer] offset added to the current unix time; the sum is
#   passed to the script as its last argument
# @return the result of +evalsha+
def expire(subscriber, seconds)
  @redis.with { |connection|
    script_arguments = [
      @prefix,
      KEY_PART_SEPARATOR,
      subscriber.to_s,
      Time.now.to_i + seconds
    ]

    connection.evalsha(@scripts[:expire], argv: script_arguments)
  }
end
expiring?(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 108
# Reports whether the subscriber's key currently has a time-to-live.
#
# @param subscriber the subscriber to check
# @return [Boolean] true when the ttl reported by redis is greater than -1
def expiring?(subscriber)
  subscriber_key = key_subscription_ids_by_subscriber(subscriber)

  @redis.with { |connection|
    remaining = connection.ttl(subscriber_key)
    remaining > -1
  }
end
persist(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 98
# Runs the +:persist+ script for a subscriber.
#
# @param subscriber [#to_s] the subscriber to persist
# @return the result of +evalsha+
def persist(subscriber)
  @redis.with { |connection|
    script_arguments = [@prefix, KEY_PART_SEPARATOR, subscriber.to_s]
    connection.evalsha(@scripts[:persist], argv: script_arguments)
  }
end
register_subscriptions(subscriptions, subscriber:) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 55
# Registers each subscription for the subscriber by running the +:register+
# script once per subscription.
#
# @param subscriptions [Array<Hash>] subscriptions to register; +:source+ is
#   read from each one
# @param subscriber [#to_s] the subscriber that owns the subscriptions
# @return [Array<String>] the generated subscription ids, in input order
def register_subscriptions(subscriptions, subscriber:)
  subscriptions.map do |subscription|
    serialized = self.class.stringify_subscription(subscription)
    id = self.class.generate_subscription_id(serialized)
    source_name = subscription[:source]

    @redis.with { |connection|
      script_arguments = [
        @prefix,
        KEY_PART_SEPARATOR,
        subscriber.to_s,
        id,
        serialized,
        source_name.to_s,
        Time.now.to_i
      ]

      connection.evalsha(@scripts[:register], argv: script_arguments)
    }

    id
  end
end
subscribers_for_subscription_id(subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 114
# Looks up the subscribers stored for a subscription id.
#
# Only sorted-set members whose score lies in the INFINITY..INFINITY range
# are returned.
# NOTE(review): presumably those are the subscribers that have not been given
# an expiration — confirm against the lua scripts.
#
# @param subscription_id [String] the subscription id
# @return [Array<Symbol>] the matching subscribers, symbolized
def subscribers_for_subscription_id(subscription_id)
  subscribers_key = key_subscribers_by_subscription_id(subscription_id)

  members = @redis.with { |connection|
    connection.zrangebyscore(subscribers_key, INFINITY, INFINITY)
  }

  members.map(&:to_sym)
end
subscription_ids_for_source(source) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 124
# Looks up the subscription ids stored for a source.
#
# Only sorted-set members whose score lies in the INFINITY..INFINITY range
# are returned.
#
# @param source the source name
# @return the members returned by +zrangebyscore+ for the source's key
def subscription_ids_for_source(source)
  source_key = key_subscription_ids_by_source(source)

  @redis.with { |connection|
    connection.zrangebyscore(source_key, INFINITY, INFINITY)
  }
end
subscriptions_for_source(source) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 79
# Fetches the subscriptions registered for a source, dropping nil entries.
#
# NOTE(review): +compact+ only removes nils; the lookup helper in this file
# yields an empty hash for subscriptions it cannot restore, so those still
# come through — confirm whether callers expect that.
#
# @param source the source name
# @return [Array] the non-nil subscriptions for +source+
def subscriptions_for_source(source)
  ids = subscription_ids_for_source(source)
  subscriptions_for_subscription_ids(ids).compact
end
unsubscribe(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 83
# Unsubscribes a subscriber by expiring it with a zero-second offset.
#
# @param subscriber the subscriber to remove
# @return the result of +expire+
def unsubscribe(subscriber)
  no_delay = 0
  expire(subscriber, no_delay)
end

Private Instance Methods

build_key(*parts) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 175
# Joins the adapter prefix and the given parts into a single key.
#
# @param parts [Array] key segments appended after the prefix
# @return [String] prefix and parts joined with KEY_PART_SEPARATOR
def build_key(*parts)
  segments = [@prefix, *parts]
  segments.join(KEY_PART_SEPARATOR)
end
key_source_for_subscription_id(subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 195
# Key holding the source of a subscription.
#
# @param subscription_id the subscription id
# @return [String] the key built from "subscription:<id>" and "source"
def key_source_for_subscription_id(subscription_id)
  subscription_part = "subscription:#{subscription_id}"
  build_key(subscription_part, "source")
end
key_subscribers_by_subscription_id(subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 183
# Key holding the subscribers of a subscription.
#
# @param subscription_id the subscription id
# @return [String] the key built from "subscription:<id>" and "subscribers"
def key_subscribers_by_subscription_id(subscription_id)
  subscription_part = "subscription:#{subscription_id}"
  build_key(subscription_part, "subscribers")
end
key_subscription_id(subscription_id) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 179
# Key holding the serialized subscription for an id.
#
# @param subscription_id the subscription id
# @return [String] the key built from "subscription:<id>"
def key_subscription_id(subscription_id)
  subscription_part = "subscription:#{subscription_id}"
  build_key(subscription_part)
end
key_subscription_ids_by_source(source) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 191
# Key holding the subscription ids of a source.
#
# @param source the source name (or a pattern such as "*")
# @return [String] the key built from "source:<source>"
def key_subscription_ids_by_source(source)
  source_part = "source:#{source}"
  build_key(source_part)
end
key_subscription_ids_by_subscriber(subscriber) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 187
# Key holding the subscription ids of a subscriber.
#
# @param subscriber the subscriber
# @return [String] the key built from "subscriber:<subscriber>"
def key_subscription_ids_by_subscriber(subscriber)
  subscriber_part = "subscriber:#{subscriber}"
  build_key(subscriber_part)
end
load_scripts() click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 199
# Loads every script named in SCRIPTS into redis and remembers the value
# redis returns for each one in +@scripts+, keyed by script name.
#
# Each script is sent as the contents of +_shared.lua+ followed by the
# contents of +<script>.lua+, both read from the +redis/scripts+ directory
# that sits next to this file.
#
# @return the result of iterating SCRIPTS inside the pool checkout
def load_scripts
  scripts_directory = File.expand_path("../redis/scripts", __FILE__)

  # The shared prelude is identical for every script, so read it once rather
  # than once per script.
  shared_content = File.read(File.join(scripts_directory, "_shared.lua"))

  @redis.with do |redis|
    SCRIPTS.each do |script|
      script_content = shared_content + File.read(
        File.join(scripts_directory, "#{script}.lua")
      )

      @scripts[script] = redis.script(:load, script_content)
    end
  end
end
subscriptions_for_subscription_ids(subscription_ids) click to toggle source
# File lib/pakyow/data/subscribers/adapters/redis.rb, line 156
# Fetches and deserializes the subscriptions stored for the given ids.
#
# Each stored value is inflated and unmarshaled, then tagged with its id under
# +:id+. When that raises a TypeError (for example because nothing was stored
# for the id), an error is logged and an empty hash takes that position.
#
# NOTE(review): values are unmarshaled exactly as stored — this assumes the
# redis instance only contains data written by this adapter.
#
# @param subscription_ids [Array] ids to look up
# @return [Array<Hash>] one entry per id, in the same order
def subscriptions_for_subscription_ids(subscription_ids)
  return [] if subscription_ids.empty?

  @redis.with { |connection|
    keys = subscription_ids.map { |id| key_subscription_id(id) }
    stored_values = connection.mget(keys)

    stored_values.zip(subscription_ids).map do |stored_value, id|
      begin
        restored = Marshal.load(Zlib::Inflate.inflate(stored_value))
        restored[:id] = id
        restored
      rescue TypeError
        Pakyow.logger.error "could not find subscription for #{id}"
        {}
      end
    end
  }
end