class RailwayIpc::Rabbitmq::Adapter

Attributes

channel[R]
connection[R]
exchange[R]
exchange_name[R]
queue[R]
queue_name[R]

Public Class Methods

new(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {}) click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 19
def initialize(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {})
  @queue_name = queue_name
  @exchange_name = exchange_name
  settings = AMQ::Settings.parse_amqp_url(amqp_url)
  vhost = settings[:vhost] || '/'
  @connection = Bunny.new({
    host: settings[:host],
    user: settings[:user],
    pass: settings[:pass],
    port: settings[:port],
    vhost: vhost,
    automatic_recovery: false,
    logger: RailwayIpc.logger
  }.merge(options))
end

Public Instance Methods

bind_queue_to_exchange() click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 90
def bind_queue_to_exchange
  queue.bind(exchange)
  self
end
check_for_message(timeout: 10, time_elapsed: 0, &block) click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 49
def check_for_message(timeout: 10, time_elapsed: 0, &block)
  raise TimeoutError.new if time_elapsed >= timeout

  block ||= ->(result) { result }

  result = queue.pop
  return block.call(*result) if result.compact.any?

  sleep 1
  check_for_message(timeout: timeout, time_elapsed: time_elapsed + 1, &block)
end
connect() click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 61
def connect
  connection.start
  @channel = connection.channel
  self
end
create_exchange(strategy: :fanout, options: { durable: true }) click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 73
def create_exchange(strategy: :fanout, options: { durable: true })
  @exchange = Bunny::Exchange.new(connection.channel, strategy, exchange_name, options)
  self
end
create_queue(options={ durable: true }) click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 85
def create_queue(options={ durable: true })
  @queue = @channel.queue(queue_name, options)
  self
end
delete_exchange() click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 78
def delete_exchange
  # rubocop:disable Style/SafeNavigation
  exchange.delete if exchange
  # rubocop:enable Style/SafeNavigation
  self
end
delete_queue() click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 95
def delete_queue
  # rubocop:disable Style/SafeNavigation
  queue.delete if queue
  # rubocop:enable Style/SafeNavigation
  self
end
disconnect() click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 67
def disconnect
  channel.close
  connection.close
  self
end
publish(message, options={}) click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 35
def publish(message, options={})
  # rubocop:disable Style/SafeNavigation
  exchange.publish(message, options) if exchange
  # rubocop:enable Style/SafeNavigation
end
reply(message, from) click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 41
def reply(message, from)
  channel.default_exchange.publish(message, routing_key: from)
end
subscribe(&block) click to toggle source
# File lib/railway_ipc/rabbitmq/adapter.rb, line 45
def subscribe(&block)
  queue.subscribe(&block)
end