class RailwayIpc::Rabbitmq::Adapter
Attributes
channel[R]
connection[R]
exchange[R]
exchange_name[R]
queue[R]
queue_name[R]
Public Class Methods
new(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {})
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 19
# Builds (but does not start) the Bunny connection for this adapter.
#
# exchange_name: - name stored for later use by #create_exchange.
# amqp_url:      - AMQP URL parsed with AMQ::Settings.parse_amqp_url;
#                  defaults to ENV['RAILWAY_RABBITMQ_CONNECTION_URL'].
# queue_name:    - name stored for later use by #create_queue ('' by default).
# options:       - extra Bunny settings; merged last, so they override the
#                  values derived from the URL and the defaults below.
def initialize(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {})
  @exchange_name = exchange_name
  @queue_name = queue_name

  parsed = AMQ::Settings.parse_amqp_url(amqp_url)
  connection_settings = {
    host: parsed[:host],
    user: parsed[:user],
    pass: parsed[:pass],
    port: parsed[:port],
    # Fall back to the root vhost when the URL does not name one.
    vhost: parsed[:vhost] || '/',
    automatic_recovery: false,
    logger: RailwayIpc.logger
  }

  @connection = Bunny.new(connection_settings.merge(options))
end
Public Instance Methods
bind_queue_to_exchange()
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 90
# Binds the current queue to the current exchange.
#
# Returns self so calls can be chained.
def bind_queue_to_exchange
  tap { queue.bind(exchange) }
end
check_for_message(timeout: 10, time_elapsed: 0, &block)
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 49
# Polls the queue until a message is available or the timeout is reached.
#
# timeout:      - maximum number of polling attempts; attempts are separated
#                 by `sleep 1`, so this is roughly a number of seconds.
# time_elapsed: - number of attempts already counted against the timeout.
# block         - optional handler called with the splatted elements of the
#                 popped result (presumably delivery_info, properties and
#                 payload for a Bunny queue - confirm against Bunny::Queue#pop).
#                 When omitted, the popped elements are returned as an Array.
#
# Returns whatever the handler returns.
# Raises TimeoutError when no message arrives before the timeout.
def check_for_message(timeout: 10, time_elapsed: 0, &block)
  # The default handler must accept every element of the popped result:
  # a single-argument lambda would raise ArgumentError on `call(*result)`.
  block ||= ->(*result) { result }

  elapsed = time_elapsed
  while elapsed < timeout
    result = queue.pop
    # An empty poll yields only nils; anything else is treated as a message.
    return block.call(*result) if result.compact.any?

    sleep 1
    elapsed += 1
  end

  raise TimeoutError
end
connect()
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 61
# Starts the connection and keeps the channel it hands back in @channel.
#
# Returns self so calls can be chained.
def connect
  session = connection
  session.start
  @channel = session.channel
  self
end
create_exchange(strategy: :fanout, options: { durable: true })
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 73
# Creates a Bunny::Exchange named after exchange_name and stores it in
# @exchange.
#
# strategy: - exchange type passed to Bunny::Exchange.new (:fanout by default).
# options:  - exchange options passed through ({ durable: true } by default).
#
# Returns self so calls can be chained.
#
# NOTE(review): the exchange is built on `connection.channel`, not on the
# `channel` reader used by the queue methods; with Bunny this presumably
# opens a separate channel - confirm that this is intended.
def create_exchange(strategy: :fanout, options: { durable: true })
  exchange_channel = connection.channel
  @exchange = Bunny::Exchange.new(exchange_channel, strategy, exchange_name, options)
  self
end
create_queue(options={ durable: true })
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 85
# Asks the current channel for a queue named queue_name and stores it in
# @queue.
#
# options - queue options passed through ({ durable: true } by default).
#
# Returns self so calls can be chained.
def create_queue(options={ durable: true })
  tap { @queue = @channel.queue(queue_name, options) }
end
delete_exchange()
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 78
# Deletes the exchange if one has been created; does nothing otherwise.
#
# Returns self so calls can be chained.
def delete_exchange
  return self unless exchange

  exchange.delete
  self
end
delete_queue()
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 95
# Deletes the queue if one has been created; does nothing otherwise.
#
# Returns self so calls can be chained.
def delete_queue
  return self unless queue

  queue.delete
  self
end
disconnect()
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 67
# Closes the channel first and then the connection.
#
# Returns self so calls can be chained.
def disconnect
  [channel, connection].each(&:close)
  self
end
publish(message, options={})
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 35
# Publishes a message to the exchange.
#
# message - payload handed to the exchange's publish method.
# options - publish options passed through unchanged (empty by default).
#
# Returns the result of the exchange's publish call, or nil (sending
# nothing) when no exchange has been created.
def publish(message, options={})
  return unless exchange

  exchange.publish(message, options)
end
reply(message, from)
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 41
# Publishes a message through the channel's default exchange, using `from`
# as the routing key.
#
# message - payload to publish.
# from    - routing key for the reply (presumably the requester's reply
#           queue name - confirm against callers).
def reply(message, from)
  default_exchange = channel.default_exchange
  default_exchange.publish(message, routing_key: from)
end
subscribe(&block)
click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 45
# Subscribes to the current queue, forwarding the given block as the
# message handler.
#
# Returns whatever the queue's subscribe call returns.
def subscribe(&block)
  subscribed_queue = queue
  subscribed_queue.subscribe(&block)
end