class Thrift::AMQPServer
Constants
- DEFAULT_TIMEOUT
Public Class Methods
new(processor, iprot_factory, oprot_factory = nil, opts = {})
click to toggle source
# File lib/thrift/amqp/server.rb, line 13 def initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) @processor = processor @iprot_factory = iprot_factory @oprot_factory = oprot_factory || iprot_factory @queue_name = opts[:queue_name] @amqp_uri = opts[:amqp_uri] @routing_key = opts[:routing_key] @exchange_name = opts[:exchange_name] @prefetch = (ENV['QOS_SIZE'] || opts[:prefetch]).to_i @timeout = opts[:timeout] ? opts[:timeout] * 1000 : DEFAULT_TIMEOUT @consumer_tag = opts[:consumer_tag] @fetching_disabled = ENV['RABBITMQ_QOS'] == '0' @queue_declare_args = opts[:queue_declare_args] || { durable: true } end
Public Instance Methods
handle(delivery_info, properties, payload)
click to toggle source
# File lib/thrift/amqp/server.rb, line 29 def handle(delivery_info, properties, payload) input = StringIO.new payload out = StringIO.new transport = IOStreamTransport.new input, out protocol = @iprot_factory.get_protocol transport begin @processor.process protocol, protocol if out.length > 0 out.rewind @channel.default_exchange.publish( out.read(out.length), routing_key: properties.reply_to ) end rescue => e LOGGER.error("Processor failure #{e}") end end
serve()
click to toggle source
# File lib/thrift/amqp/server.rb, line 50 def serve @conn = Bunny.new(@amqp_uri, continuation_timeout: @timeout * 1000) @conn.start @channel = @conn.create_channel(nil, @prefetch == 0 ? 1 : @prefetch) exchange = @channel.direct(@exchange_name) queue = @channel.queue(@queue_name, @queue_declare_args) queue.bind exchange, routing_key: @routing_key @consumer_tag ||= @channel.generate_consumer_tag @channel.prefetch @prefetch loop do if @fetching_disabled LOGGER.info("Fetching disabled") sleep @timeout next end LOGGER.info("Fetching message from #{@queue_name}") queue.subscribe( manual_ack: true, block: true, consumer_tag: @consumer_tag ) do |delivery_info, properties, payload| begin if @timeout begin Timeout.timeout(@timeout) do handle(delivery_info, properties, payload) end rescue Timeout::Error LOGGER.info("Timeout raised") end else handle delivery_info, properties, payload end rescue => e LOGGER.info("Error happened: #{e}") end @channel.acknowledge(delivery_info.delivery_tag, false) end end rescue Bunny::TCPConnectionFailedForAllHosts, Bunny::ConnectionClosedError LOGGER.error("Can't establish the connection") sleep 5 retry end