class UDPAcceptor
Public Class Methods
log(msg)
click to toggle source
# File bin/amqp-json-udp-acceptor, line 45 def self.log(msg) puts "[#{Time.now.strftime("%F %T")}] #{msg}" end
run(opts = {})
click to toggle source
# File bin/amqp-json-udp-acceptor, line 19 def self.run(opts = {}) EventMachine.run do error_handler = proc do |*args| log("[error] AMQP Error") shutdown(1) end amqp_opts = { :host => opts[:amqp_host], :port => (opts[:amqp_port] || 5672), :timeout => 1, :on_possible_authentication_failure => error_handler, :on_tcp_connection_failure => error_handler } begin $amqp = AMQP.start(amqp_opts) rescue AMQP::PossibleAuthenticationFailureError, AMQP::TCPConnectionFailed shutdown(1) end EventMachine.open_datagram_socket( "0.0.0.0", opts[:listen_port], self) end end
shutdown(code = 0)
click to toggle source
# File bin/amqp-json-udp-acceptor, line 49 def self.shutdown(code = 0) if EventMachine.reactor_running? EventMachine.stop{ exit(code) } end end
Public Instance Methods
post_init()
click to toggle source
# File bin/amqp-json-udp-acceptor, line 55 def post_init @amqp_channel = AMQP::Channel.new($amqp) @amqp_exchange = @amqp_channel.default_exchange self.class.log("Listening...") end
receive_data(chunk)
click to toggle source
# File bin/amqp-json-udp-acceptor, line 61 def receive_data(chunk) if chunk[0] == '{' channel = JSON.parse(chunk)["channel"] rescue nil return unless channel self.class.log "#{channel} => #{chunk}" @amqp_exchange.publish chunk, :routing_key => channel else self.class.log "[error] not a json object: #{chunk[0..60]}" return end end