class Pebbles::River::River

Constants

EXCHANGE_OPTIONS
MAX_BACKOFF_SECONDS
MAX_RETRY_TIMEOUT

Attributes

channel[R]
environment[R]
exchange[R]
exchange_name[R]
prefetch[R]
session[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/pebbles/river/river.rb, line 13
# Sets up a river for one environment. No connection is opened here.
#
# options - Hash of settings, or (for backwards compatibility) a String that
#           is treated as the environment name.
#           :environment - environment name; falls back to ENV['RACK_ENV'],
#                          then to 'development'.
#           :prefetch    - stored as-is and applied to the channel by
#                          #connect when set.
def initialize(options = {})
  options = {environment: options} if options.is_a?(String)  # Backwards compatibility

  # Keep a private frozen copy so later changes to the caller's string (or to
  # the ENV value) cannot alter this instance.
  @environment = (options[:environment] || ENV['RACK_ENV'] || 'development').dup.freeze

  # Production uses the bare exchange name; every other environment gets its
  # name as a suffix. The name is built in a single expression rather than by
  # appending to a string literal, which fails when string literals are frozen.
  @exchange_name =
    if @environment == 'production'
      'pebblebed.river'
    else
      "pebblebed.river.#{@environment}"
    end

  @last_connect_attempt = nil

  @prefetch = options[:prefetch]
end

Public Instance Methods

connect() click to toggle source
# File lib/pebbles/river/river.rb, line 30
# Opens the session, channel and exchange unless all three already exist.
#
# Returns nil when nothing had to be done, otherwise the newly declared
# exchange.
def connect
  return if @session && @channel && @exchange

  # Start from a clean slate so a half-built connection is torn down first.
  disconnect

  connection_options = ::Pebbles::River.rabbitmq_options
  @session = Bunny::Session.new(connection_options)
  @session.start

  @channel = @session.create_channel
  @channel.prefetch(@prefetch) if @prefetch

  # Pass a copy so the shared options constant is never handed out directly.
  exchange_options = EXCHANGE_OPTIONS.dup
  @exchange = @channel.exchange(@exchange_name, exchange_options)
end
connected?() click to toggle source
# File lib/pebbles/river/river.rb, line 26
# Reports whether a session exists and says it is connected.
#
# Returns the (falsy) session value when there is no session, otherwise
# whatever the session's own connected? returns.
def connected?
  current_session = @session
  return current_session unless current_session

  current_session.connected?
end
disconnect() click to toggle source
# File lib/pebbles/river/river.rb, line 44
# Closes the channel and stops the session, if present, then forgets the
# channel, session and exchange. Bunny errors raised during teardown are
# ignored. Returns nil.
def disconnect
  begin
    @channel.close if @channel
  rescue Bunny::Exception
    # Ignore
  end
  @channel = nil

  begin
    @session.stop if @session
  rescue Bunny::Exception
    # Ignore
  end
  @session = nil

  @exchange = nil
end
publish(options = {}) click to toggle source
# File lib/pebbles/river/river.rb, line 64
# Publishes an event to the exchange, connecting first if needed.
#
# options - Hash describing the event. The whole hash is serialized to JSON
#           as the message body. :event and :uid are handed to
#           Routing.routing_key_for to derive the routing key, and
#           :persistent (default true) is forwarded to the exchange.
#
# Returns whatever the exchange's publish call returns.
def publish(options = {})
  connect

  payload = options.to_json
  persistent = options.fetch(:persistent, true)
  routing_key = Routing.routing_key_for(options.slice(:event, :uid))

  # Note: Going through the self.exchange reader so it can be stubbed in tests
  self.exchange.publish(payload, persistent: persistent, key: routing_key)
end
queue(options = {}) click to toggle source
# File lib/pebbles/river/river.rb, line 73
# Declares a durable queue on the channel and binds it to the exchange.
#
# options - Hash of settings:
#           :name                    - queue name (required).
#           :ttl                     - set as the queue's 'x-message-ttl'
#                                      argument when given.
#           :dead_letter_routing_key - when given, dead-letters back to this
#                                      river's exchange under this key.
#           :routing_key             - extra routing key to bind, if given.
#           :event, :path, :class    - filters handed to
#                                      Routing.binding_routing_keys_for to
#                                      compute the default bindings.
#           :klass                   - alias for :class, used when :class is
#                                      not given.
#           :no_default_routing_keys - when truthy, skip the default bindings.
#
# Returns the declared queue.
# Raises ArgumentError if :name is missing (unknown keys are rejected by
# assert_valid_keys).
def queue(options = {})
  # Both spellings of the class filter are accepted. Previously only :klass
  # passed validation while only :class was forwarded to Routing, so the
  # filter could never take effect.
  options.assert_valid_keys(:name, :ttl, :event, :path, :klass, :class,
    :dead_letter_routing_key, :routing_key, :no_default_routing_keys)

  raise ArgumentError, 'Queue must be named' unless options[:name]

  queue_args = {}
  if (ttl = options[:ttl])
    queue_args['x-message-ttl'] = ttl
  end
  if (dead_letter_routing_key = options[:dead_letter_routing_key])
    queue_args['x-dead-letter-exchange'] = @exchange_name
    queue_args['x-dead-letter-routing-key'] = dead_letter_routing_key
  end
  queue_opts = {durable: true, arguments: queue_args}

  connect
  queue = @channel.queue(options[:name], queue_opts)
  if (routing_key = options[:routing_key])
    queue.bind(exchange.name, key: routing_key)
  end
  unless options[:no_default_routing_keys]
    routing_filter = options.slice(:event, :class, :path)
    # NOTE(review): Routing is assumed to read the filter under :class, as the
    # original slice implied; confirm against Routing.binding_routing_keys_for.
    class_filter = options[:class] || options[:klass]
    routing_filter[:class] = class_filter if class_filter

    Routing.binding_routing_keys_for(routing_filter).each do |key|
      queue.bind(exchange.name, key: key)
    end
  end
  queue
end

Private Instance Methods

backoff(iteration) click to toggle source
# File lib/pebbles/river/river.rb, line 104
# Sleeps for a delay that grows exponentially with the iteration number.
#
# iteration - attempt counter; values above 30 are treated as 30.
#
# The delay is half of (2**iteration - 1) seconds, rounded up, and never
# more than MAX_BACKOFF_SECONDS. Returns whatever sleep returns.
def backoff(iteration)
  exponent = [30, iteration].min
  uncapped_delay = (1.0 / 2.0 * (2.0 ** exponent - 1.0)).ceil
  delay = [uncapped_delay, MAX_BACKOFF_SECONDS].min
  sleep(delay)
end