class BitBroker::Subscriber
Constants
- SIGNAL_TERM
Public Class Methods
new(config)
click to toggle source
Calls superclass method
BitBroker::Broker::new
# File lib/bitbroker/broker.rb, line 66 def initialize(config) super(config) end
Public Instance Methods
recv_data(&block)
click to toggle source
# File lib/bitbroker/broker.rb, line 70 def recv_data(&block) recv(RKEY_DATA, &block) end
recv_metadata(&block)
click to toggle source
# File lib/bitbroker/broker.rb, line 73 def recv_metadata(&block) recv(RKEY_METADATA, &block) end
recv_p_data(&block)
click to toggle source
# File lib/bitbroker/broker.rb, line 76 def recv_p_data(&block) recv(RKEY_DATA + Mac.addr, &block) end
recv_p_metadata(&block)
click to toggle source
# File lib/bitbroker/broker.rb, line 79 def recv_p_metadata(&block) recv(RKEY_METADATA + Mac.addr, &block) end
Private Instance Methods
decode(encrypted_data)
click to toggle source
# File lib/bitbroker/broker.rb, line 101 def decode(encrypted_data) cipher = OpenSSL::Cipher::Cipher.new(ENCRYPT_ALGORITHM) cipher.decrypt cipher.pkcs5_keyivgen(@passwd) cipher.update(encrypted_data) + cipher.final end
recv(rkey, &block)
click to toggle source
# File lib/bitbroker/broker.rb, line 84 def recv(rkey, &block) queue = @channel.queue('', :exclusive => true) queue.bind(@exchange, :routing_key => rkey) begin queue.subscribe(:block => true) do |info, prop, binary| msg = MessagePack.unpack(decode(binary)) if msg['from'] != Mac.addr block.call(msg['data'], msg['from']) end end rescue OpenSSL::Cipher::CipherError => e Log.warn("[Subscriber] #{e.to_s}") rescue Exception => _ finish end end