module ActionPubsub

dead letter routing not working ATM

Constants

VERSION

Attributes

config[RW]
configuration[RW]

Public Class Methods

channel?(channel_path) click to toggle source
# File lib/action_pubsub.rb, line 40
def self.channel?(channel_path)
  channels.key?(channel_path)
end
channels() click to toggle source
# File lib/action_pubsub.rb, line 36
def self.channels
  @channels ||= ::ActionPubsub::Channels.new
end
configure(&block) click to toggle source
# File lib/action_pubsub.rb, line 32
def self.configure(&block)
  block.call(config)
end
deserialize_event(event) click to toggle source
# File lib/action_pubsub.rb, line 121
def self.deserialize_event(event)
  event
end
destination_tuple_from_path(path_string) click to toggle source
# File lib/action_pubsub.rb, line 61
def self.destination_tuple_from_path(path_string)
  segs = path_string.split("/")
  worker_index = segs.pop
  action = segs.pop

  [segs.join("/"), action, worker_index]
end
destination_tuple_from_sender_path(path_string) click to toggle source
# File lib/action_pubsub.rb, line 69
def self.destination_tuple_from_sender_path(path_string)
  segs = path_string.split("/")
  action = segs.pop
  [segs.join("/"), action]
end
disable_all!() click to toggle source
# File lib/action_pubsub.rb, line 44
def self.disable_all!
  configure do |config|
    config.disabled = true
  end

  subscriptions.all.map{ |_subscription| _subscription << :terminate! }
  self
end
event_count() click to toggle source
# File lib/action_pubsub.rb, line 53
def self.event_count
  @event_count ||= ::Concurrent::Agent.new(0)
end
exchanges() click to toggle source
# File lib/action_pubsub.rb, line 57
def self.exchanges
  @exchanges ||= ::ActionPubsub::Exchanges.new
end
on(*paths, as:nil, &block) click to toggle source
# File lib/action_pubsub.rb, line 75
def self.on(*paths, as:nil, &block)
  paths.map do |path|
    target_channel = ::ActionPubsub.channels[path]
    subscription_path = "#{path}:#{as || SecureRandom.uuid}"

    ::ActionPubsub.subscriptions[subscription_path] ||= ::Concurrent::Actor::Utils::AdHoc.spawn(subscription_path) do
      target_channel << :subscribe
      -> message {
        block.call(message)
      }
    end
  end
end
publish(path, message) click to toggle source
# File lib/action_pubsub.rb, line 93
def self.publish(path, message)
  self[path] << message
end
publish_event(routing_key, event) click to toggle source
# File lib/action_pubsub.rb, line 97
def self.publish_event(routing_key, event)
  #need to loop through exchanges and publish to them
  #maybe there is a better way to do this?
  exchanges[routing_key].keys.each do |queue_name|
    exchanges[routing_key][queue_name] << serialize_event(event)
  end
end
serialize_event(event) click to toggle source
# File lib/action_pubsub.rb, line 105
def self.serialize_event(event)
  event
end
silent_dead_letter_handler() click to toggle source
# File lib/action_pubsub.rb, line 113
def self.silent_dead_letter_handler
  @silent_dead_letter_handler ||= ::ActionPubsub::Actors::SilentDeadLetterHandler.spawn('action_pubsub/silent_dead_letter_handler')
end
subscription?(path) click to toggle source
# File lib/action_pubsub.rb, line 117
def self.subscription?(path)
  subscriptions.key?(path)
end
subscriptions() click to toggle source
# File lib/action_pubsub.rb, line 109
def self.subscriptions
  @subscriptions ||= ::ActionPubsub::Subscriptions.new
end
symbolize_routing_key(routing_key) click to toggle source
# File lib/action_pubsub.rb, line 89
def self.symbolize_routing_key(routing_key)
  :"#{routing_key.split('.').join('_')}"
end