class Pakyow::Data::Subscribers

@api private

Constants

QUALIFIABLE_TYPES

Attributes

adapter[RW]
lookup[RW]

Public Class Methods

new(app, adapter = :memory, adapter_config = {}) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 19
# Builds the subscribers registry for +app+.
#
# The adapter implementation is loaded lazily from
# "pakyow/data/subscribers/adapters/<adapter>" and looked up as a constant
# under Pakyow::Data::Subscribers::Adapters using the capitalized adapter
# name. It is instantiated with +adapter_config+ merged with the adapter
# settings from the app config; on key conflicts the app config wins because
# it is the argument to +merge+.
#
# A thread pool executor is also created; #did_mutate posts its work to it.
#
# @param app the application; must expose config.data.subscriptions
# @param adapter [Symbol, String] name of the adapter to load
# @param adapter_config [#to_h] extra settings passed to the adapter
# @raise [UnknownSubscriberAdapter] when a LoadError or NameError is raised
#   anywhere in this method (the rescue covers the whole body)
def initialize(app, adapter = :memory, adapter_config = {})
  @app = app

  # Deliberate lazy load: only the selected adapter is required.
  require "pakyow/data/subscribers/adapters/#{adapter}"

  adapter_class = Pakyow::Data::Subscribers::Adapters.const_get(adapter.to_s.capitalize)
  app_settings = app.config.data.subscriptions.adapter_settings.to_h
  @adapter = adapter_class.new(adapter_config.to_h.merge(app_settings))

  pool_options = {
    auto_terminate: false,
    min_threads: 1,
    max_threads: 10,
    max_queue: 0
  }
  @executor = Concurrent::ThreadPoolExecutor.new(**pool_options)
rescue LoadError, NameError => error
  raise UnknownSubscriberAdapter.build(error, adapter: adapter)
end

Public Instance Methods

did_mutate(source_name, changed_values = nil, result_source = nil) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 56
# Posts a task to the executor that notifies subscriptions of a mutation on
# +source_name+.
#
# Inside the task, the subscriptions for the source are fetched from the
# adapter, de-duplicated by their payload id (falling back to the
# subscription itself when there is no payload id), filtered through
# #process?, and then each one whose :version matches the app's configured
# subscriptions version is handed to #process.
#
# An error raised while processing one subscription is logged and does not
# prevent the remaining subscriptions from being processed. Errors raised
# while fetching or filtering are not rescued here.
#
# @param source_name name of the mutated source
# @param changed_values values that changed, if any
# @param result_source the mutated source, if any
# @return whatever @executor.post returns
def did_mutate(source_name, changed_values = nil, result_source = nil)
  @executor.post(source_name, changed_values, result_source, Pakyow.logger.target) do |mutated_name, values, source, logger|
    logger.internal { "[Pakyow::Data::Subscribers] did mutate #{mutated_name}" }

    fetched = @adapter.subscriptions_for_source(mutated_name)

    logger.internal { "[Pakyow::Data::Subscribers] fetched #{fetched.count} subscriptions" }

    distinct = fetched.uniq { |candidate| candidate.dig(:payload, :id) || candidate }

    # Filtering is done for every candidate before any processing starts.
    qualifying = distinct.select { |candidate| process?(candidate, values, source) }

    qualifying.each do |candidate|
      next unless candidate[:version] == @app.config.data.subscriptions.version

      begin
        logger.internal { "[Pakyow::Data::Subscribers] processing subscription #{candidate[:id]}" }

        process(candidate, source)

        logger.internal { "[Pakyow::Data::Subscribers] finished processing subscription #{candidate[:id]}" }
      rescue => error
        logger.error { "[Pakyow::Data::Subscribers] did_mutate failed: #{error}" }
      end
    end

    logger.internal { "[Pakyow::Data::Subscribers] finished mutate for #{mutated_name}" }
  end
end
expire(subscriber, seconds) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 102
# Delegates to the adapter's +expire+ with the given subscriber and seconds.
#
# NOTE(review): presumably schedules the subscriber's subscriptions to expire
# after +seconds+ — the exact semantics depend on the adapter; confirm there.
#
# @return whatever the adapter returns
def expire(subscriber, seconds)
  @adapter.expire(subscriber, seconds)
end
persist(subscriber) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 106
# Delegates to the adapter's +persist+ for the given subscriber.
#
# NOTE(review): presumably the counterpart of #expire (cancels a pending
# expiration) — the exact semantics depend on the adapter; confirm there.
#
# @return whatever the adapter returns
def persist(subscriber)
  @adapter.persist(subscriber)
end
register_subscriptions(subscriptions, subscriber: nil) { |ids| ... } click to toggle source
# File lib/pakyow/data/subscribers.rb, line 46
# Stamps each subscription with the app's configured subscriptions version
# and registers the whole list with the adapter.
#
# The passed subscription hashes are modified in place (their :version key
# is set). When a block is given it is called with the value returned by the
# adapter.
#
# @param subscriptions [Array<Hash>] subscriptions to register
# @param subscriber the subscriber the subscriptions belong to, if any
# @yield [ids] the adapter's return value
# @return the adapter's return value
def register_subscriptions(subscriptions, subscriber: nil, &block)
  subscriptions.each { |entry| entry[:version] = @app.config.data.subscriptions.version }

  registered = @adapter.register_subscriptions(subscriptions, subscriber: subscriber)
  block.call(registered) if block
  registered
end
shutdown() click to toggle source
# File lib/pakyow/data/subscribers.rb, line 41
# Asks the executor to shut down, then waits up to 30 seconds for it to
# terminate.
#
# NOTE(review): nothing here forces termination if the wait times out —
# confirm whether callers need to handle that case.
#
# @return whatever @executor.wait_for_termination returns
def shutdown
  @executor.shutdown
  @executor.wait_for_termination(30)
end
unsubscribe(subscriber) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 98
# Delegates to the adapter's +unsubscribe+ for the given subscriber.
#
# @return whatever the adapter returns
def unsubscribe(subscriber)
  @adapter.unsubscribe(subscriber)
end

Private Instance Methods

process(subscription, mutated_source) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 116
# Instantiates the subscription's handler with the app and calls it with the
# subscription payload.
#
# Only the keyword arguments that the handler's +call+ method declares are
# passed:
#
# * +id+           - the subscription id
# * +result+       - +mutated_source+ for ephemeral subscriptions, otherwise
#                    the subscription's :proxy
# * +subscription+ - the subscription hash itself
#
# @param subscription [Hash] the subscription to process
# @param mutated_source the source that was mutated
# @return whatever the handler's +call+ returns
def process(subscription, mutated_source)
  handler = subscription[:handler].new(@app)
  call_method = handler.method(:call)
  arguments = {}

  arguments[:id] = subscription[:id] if call_method.keyword_argument?(:id)

  if call_method.keyword_argument?(:result)
    arguments[:result] = subscription[:ephemeral] ? mutated_source : subscription[:proxy]
  end

  arguments[:subscription] = subscription if call_method.keyword_argument?(:subscription)

  handler.call(subscription[:payload], **arguments)
end
process?(subscription, changed_values, result_source) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 112
# Returns a truthy value when +subscription+ has a :handler and
# #qualified_subscription? accepts it for the given changed values and
# result source. Subscriptions without a handler are never qualified-checked.
def process?(subscription, changed_values, result_source)
  subscription[:handler] && qualified_subscription?(subscription, changed_values, result_source)
end
qualified?(qualifications, changed_values, changed_results, original_results) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 159
# Returns true when every qualification is satisfied.
#
# A qualification (key/value pair) is satisfied when either:
#
# * the class of +changed_values+ is one of QUALIFIABLE_TYPES and its value
#   for the key equals the qualification value (or is included in it, when
#   the qualification value is an Array); or
# * #qualified_result? finds a matching result among the changed or
#   original results.
#
# An empty list of qualifications is always qualified.
#
# @param qualifications [Array<Array>] key/value pairs to satisfy
# @param changed_values the values that changed
# @param changed_results [Array] results after the mutation
# @param original_results [Array] results before the mutation
def qualified?(qualifications, changed_values, changed_results, original_results)
  qualifications.all? do |key, expected|
    if QUALIFIABLE_TYPES.include?(changed_values.class)
      changed = changed_values.to_h[key]
      next true if (expected.is_a?(Array) && expected.include?(changed)) || changed == expected
    end

    # Changed values did not satisfy this qualification; fall back to results.
    qualified_result?(key, expected, changed_results, original_results)
  end
end
qualified_result?(key, value, changed_results, original_results) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 171
# Returns true when any original or changed result satisfies a qualification.
#
# A result matches when its value at +key+ equals +value+ or, when +value+ is
# an Array, is included in it. Original results are checked before changed
# results.
#
# Neither collection is modified. This method is called once per
# qualification, and the original results may be the very array owned by the
# result source, so appending to it here would make it grow on every call.
#
# @param key the attribute to look up on each result
# @param value the expected value, or an Array of acceptable values
# @param changed_results [Enumerable] results after the mutation
# @param original_results [Enumerable] results before the mutation
# @return [Boolean]
def qualified_result?(key, value, changed_results, original_results)
  matches = lambda do |result|
    (value.is_a?(Array) && value.include?(result[key])) || result[key] == value
  end

  original_results.any?(&matches) || changed_results.any?(&matches)
end
qualified_subscription?(subscription, changed_values, result_source) click to toggle source
# File lib/pakyow/data/subscribers.rb, line 139
# Decides whether +subscription+ is affected by a mutation.
#
# Ephemeral subscriptions qualify when the result source's qualifications
# equal the subscription's :qualifications.
#
# For all other subscriptions the decision is delegated to #qualified?, using
# the result source's original results (or an empty list when there is no
# result source). Note that this path removes the :qualifications key from
# the subscription hash.
#
# @param subscription [Hash] the subscription being considered
# @param changed_values the values that changed
# @param result_source the mutated source, or nil
def qualified_subscription?(subscription, changed_values, result_source)
  return result_source.qualifications == subscription[:qualifications] if subscription[:ephemeral]

  original_results = result_source ? result_source.original_results : []

  # Destructive on purpose: the key is taken out of the subscription.
  qualifications = subscription.delete(:qualifications).to_a

  qualified?(qualifications, changed_values, result_source.to_a, original_results.to_a)
end