class Announcer::Publishers::AsyncResquePublisher

Attributes

resque[R]

Public Class Methods

new(instance=nil, params={})
Calls superclass method
# File lib/announcer/publishers/async_resque_publisher.rb, line 12
def initialize(instance=nil, params={})
  super
  @resque = config.resque? ? config.resque : Resque
end

Public Instance Methods

publish(event)
Calls superclass method
# File lib/announcer/publishers/async_resque_publisher.rb, line 56
def publish(event)
  super
  worker.async.publish(event) unless event.subscriptions.empty?
end

subscription_queue_formatter()

Needs to exist for the ResquePublisher::PublisherJob to succeed.

# File lib/announcer/publishers/async_resque_publisher.rb, line 52
def subscription_queue_formatter
  ResquePublisher.subscription_queue_formatter(config)
end

worker()

The supervisor restarts the PublisherWorker when it dies, but for a short period the actor returned by Celluloid::Actor[worker_id] can still be the dead one. To avoid returning it, we sleep for a millisecond to give the supervisor time to create a new worker thread, then look the actor up again. We try up to three times before giving up.

This makes it unlikely, but not impossible, for a dead worker to be returned. If a dead worker is returned, async calls on it fail silently and normal execution continues. Firing events is therefore best-effort.

# File lib/announcer/publishers/async_resque_publisher.rb, line 31
def worker
  # Retrieve the PublisherWorker or start the supervisor.
  w = Celluloid::Actor[worker_id] || PublisherWorker.supervise(
    args: [self],
    as: worker_id
  ).send(worker_id)

  3.times {
    if w.dead?
      sleep(0.001)
      w = Celluloid::Actor[worker_id]
    else
      break
    end
  }

  w
end
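
The retry behaviour described for worker can be tried in isolation. The following is a minimal sketch, not the library's code: FakeWorker, FlakyRegistry and fetch_worker are hypothetical stand-ins for the Celluloid actor, the Celluloid::Actor registry and the worker method, chosen so the example runs without Celluloid or Resque installed.

```ruby
# Hypothetical stand-in for an actor; only responds to dead?.
FakeWorker = Struct.new(:alive) do
  def dead?
    !alive
  end
end

# Hypothetical stand-in for the actor registry. It hands out dead workers
# for the first `dead_lookups` lookups and live ones afterwards, which
# imitates a supervisor that needs a moment to restart its actor.
class FlakyRegistry
  attr_reader :lookups

  def initialize(dead_lookups)
    @dead_lookups = dead_lookups
    @lookups = 0
  end

  def [](_id)
    @lookups += 1
    FakeWorker.new(@lookups > @dead_lookups)
  end
end

# Look the worker up once, then retry up to `attempts` times while it is
# dead, pausing briefly before each new lookup. Whatever the last lookup
# returned is handed back, dead or not, so callers stay best-effort.
def fetch_worker(registry, id, attempts: 3, pause: 0.001)
  w = registry[id]
  attempts.times do
    break unless w.dead?
    sleep(pause)
    w = registry[id]
  end
  w
end

fetch_worker(FlakyRegistry.new(2), :demo).dead?   # => false (recovered on a retry)
fetch_worker(FlakyRegistry.new(10), :demo).dead?  # => true  (gave up after three retries)
```

In the second call the caller still receives a worker object, only a dead one. This mirrors the best-effort behaviour noted above: nothing is raised, and the event is simply not delivered.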

worker_id()
# File lib/announcer/publishers/async_resque_publisher.rb, line 17
def worker_id
  @__worker_id ||= "announcer_resque_worker_#{object_id}".to_sym
end