class Pebbles::River::River
Constants
- EXCHANGE_OPTIONS
- MAX_BACKOFF_SECONDS
- MAX_RETRY_TIMEOUT
Attributes
channel[R]
environment[R]
exchange[R]
exchange_name[R]
prefetch[R]
session[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/pebbles/river/river.rb, line 13 def initialize(options = {}) options = {environment: options} if options.is_a?(String) # Backwards compatibility @environment = (options[:environment] || ENV['RACK_ENV'] || 'development').dup.freeze @exchange_name = 'pebblebed.river' @exchange_name << ".#{environment}" if @environment != 'production' @last_connect_attempt = nil @prefetch = options[:prefetch] end
Public Instance Methods
connect()
click to toggle source
# File lib/pebbles/river/river.rb, line 30 def connect unless @session and @channel and @exchange disconnect @session = Bunny::Session.new(::Pebbles::River.rabbitmq_options) @session.start @channel = @session.create_channel @channel.prefetch(@prefetch) if @prefetch @exchange = @channel.exchange(@exchange_name, EXCHANGE_OPTIONS.dup) end end
connected?()
click to toggle source
# File lib/pebbles/river/river.rb, line 26 def connected? @session && @session.connected? end
disconnect()
click to toggle source
# File lib/pebbles/river/river.rb, line 44 def disconnect if @channel begin @channel.close rescue Bunny::Exception # Ignore end @channel = nil end if @session begin @session.stop rescue Bunny::Exception # Ignore end @session = nil end @exchange = nil end
publish(options = {})
click to toggle source
# File lib/pebbles/river/river.rb, line 64 def publish(options = {}) connect # Note: Using self.exchange so it can be stubbed in tests self.exchange.publish(options.to_json, persistent: options.fetch(:persistent, true), key: Routing.routing_key_for(options.slice(:event, :uid))) end
queue(options = {})
click to toggle source
# File lib/pebbles/river/river.rb, line 73 def queue(options = {}) options.assert_valid_keys(:name, :ttl, :event, :path, :klass, :dead_letter_routing_key, :routing_key, :no_default_routing_keys) raise ArgumentError.new 'Queue must be named' unless options[:name] queue_args = {} if (ttl = options[:ttl]) queue_args['x-message-ttl'] = ttl end if (dead_letter_routing_key = options[:dead_letter_routing_key]) queue_args['x-dead-letter-exchange'] = @exchange_name queue_args['x-dead-letter-routing-key'] = dead_letter_routing_key end queue_opts = {durable: true, arguments: queue_args} connect queue = @channel.queue(options[:name], queue_opts) if (routing_key = options[:routing_key]) queue.bind(exchange.name, key: routing_key) end unless options[:no_default_routing_keys] Routing.binding_routing_keys_for(options.slice(:event, :class, :path)).each do |key| queue.bind(exchange.name, key: key) end end queue end
Private Instance Methods
backoff(iteration)
click to toggle source
# File lib/pebbles/river/river.rb, line 104 def backoff(iteration) sleep([(1.0 / 2.0 * (2.0 ** [30, iteration].min - 1.0)).ceil, MAX_BACKOFF_SECONDS].min) end