class UDPAcceptor

Public Class Methods

log(msg)
# File bin/amqp-json-udp-acceptor, line 45
# Prints +msg+ to standard output, prefixed with the current local time
# formatted as "[YYYY-MM-DD HH:MM:SS]".
def self.log(msg)
  stamp = Time.now.strftime("%F %T")
  puts "[" + stamp + "] #{msg}"
end
run(opts = {})
# File bin/amqp-json-udp-acceptor, line 19
# Runs the acceptor inside an EventMachine reactor: starts the AMQP
# connection (kept in the global $amqp) and then opens a datagram socket on
# 0.0.0.0 with this object as the connection handler.
#
# Options read from +opts+:
#   :amqp_host   - passed to AMQP.start as :host
#   :amqp_port   - passed to AMQP.start as :port, 5672 when absent
#   :listen_port - port given to EventMachine.open_datagram_socket
def self.run(opts = {})
  EventMachine.run do
    # A single callback serves both AMQP failure hooks: log, then shut down
    # with exit code 1.
    on_amqp_failure = proc { |*_ignored|
      log("[error] AMQP Error")
      shutdown(1)
    }

    amqp_host = opts[:amqp_host]
    amqp_port = opts[:amqp_port] || 5672

    connection_settings = {
      :host    => amqp_host,
      :port    => amqp_port,
      :timeout => 1,
      :on_possible_authentication_failure => on_amqp_failure,
      :on_tcp_connection_failure          => on_amqp_failure
    }

    begin
      $amqp = AMQP.start(connection_settings)
    rescue AMQP::PossibleAuthenticationFailureError, AMQP::TCPConnectionFailed
      # Failures raised directly by AMQP.start take the same exit path as the
      # callbacks above, but without a log line.
      shutdown(1)
    end

    bind_address = "0.0.0.0"
    EventMachine.open_datagram_socket(bind_address, opts[:listen_port], self)
  end
end
shutdown(code = 0)
# File bin/amqp-json-udp-acceptor, line 49
# Asks the EventMachine reactor to stop, handing it a block that exits the
# process with +code+ (default 0). Does nothing when the reactor is not
# running.
def self.shutdown(code = 0)
  return unless EventMachine.reactor_running?

  # NOTE(review): exit(code) only runs if EventMachine.stop invokes the block
  # it is given -- confirm against the EventMachine version in use.
  EventMachine.stop { exit(code) }
end

Public Instance Methods

post_init()
# File bin/amqp-json-udp-acceptor, line 55
# Opens an AMQP channel on the global $amqp connection, remembers that
# channel and its default exchange on the instance, and logs "Listening...".
def post_init
  channel = AMQP::Channel.new($amqp)
  @amqp_channel = channel
  @amqp_exchange = channel.default_exchange
  self.class.log("Listening...")
end
receive_data(chunk)
# File bin/amqp-json-udp-acceptor, line 61
# Handles one received chunk of data. A chunk that begins with '{' is parsed
# as JSON; when the parsed object has a "channel" entry, the raw chunk is
# published to the AMQP exchange with that value as routing key.
#
# Every rejected chunk is logged (with at most its first 61 characters) and
# dropped: chunks that do not start with '{', chunks that fail to parse, and
# JSON objects without a "channel".
#
# chunk - String payload as received.
#
# Returns the result of the publish call, or nil when the chunk is dropped.
def receive_data(chunk)
  unless chunk.start_with?('{')
    self.class.log "[error] not a json object: #{chunk[0..60]}"
    return
  end

  begin
    channel = JSON.parse(chunk)["channel"]
  rescue StandardError => e
    # Same rescue scope as a bare inline rescue, so a bad datagram can never
    # raise out of this handler -- but the failure is now reported instead of
    # being swallowed silently.
    self.class.log "[error] invalid json (#{e.class}): #{chunk[0..60]}"
    return
  end

  unless channel
    self.class.log "[error] no channel given: #{chunk[0..60]}"
    return
  end

  self.class.log "#{channel} => #{chunk}"
  @amqp_exchange.publish chunk, :routing_key => channel
end