class MQTT::Proxy
Class for implementing a proxy to filter/mangle MQTT
packets.
Attributes
client_filter[W]
A filter Proc for packets coming from the client (to the server).
local_host[R]
Address to bind listening socket to
local_port[R]
Port to bind listening socket to
logger[R]
Ruby Logger object to send informational messages to
select_timeout[R]
Time in seconds before disconnecting an idle connection
server_filter[W]
A filter Proc for packets coming from the server (to the client).
server_host[R]
Address of upstream server to send packets upstream to
server_port[R]
Port of upstream server to send packets upstream to.
Public Class Methods
new(args = {})
click to toggle source
Create a new MQTT
Proxy
instance.
Possible argument keys:
:local_host Address to bind listening socket to. :local_port Port to bind listening socket to. :server_host Address of upstream server to send packets upstream to. :server_port Port of upstream server to send packets upstream to. :select_timeout Time in seconds before disconnecting a connection. :logger Ruby Logger object to send informational messages to.
NOTE: be careful not to connect to yourself!
# File lib/qubitro-mqtt/proxy.rb, line 40 def initialize(args = {}) @local_host = args[:local_host] || '0.0.0.0' @local_port = args[:local_port] || MQTT::DEFAULT_PORT @server_host = args[:server_host] @server_port = args[:server_port] || 18_830 @select_timeout = args[:select_timeout] || 60 # Setup a logger @logger = args[:logger] if @logger.nil? @logger = Logger.new(STDOUT) @logger.level = Logger::INFO end # Default is not to have any filters @client_filter = nil @server_filter = nil # Create TCP server socket @server = TCPServer.open(@local_host, @local_port) @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}" end
Public Instance Methods
run()
click to toggle source
Start accepting connections and processing packets.
# File lib/qubitro-mqtt/proxy.rb, line 64 def run loop do # Wait for a client to connect and then create a thread for it Thread.new(@server.accept) do |client_socket| logger.info "Accepted client: #{client_socket.peeraddr.join(':')}" server_socket = TCPSocket.new(@server_host, @server_port) begin process_packets(client_socket, server_socket) rescue Exception => exp logger.error exp.to_s end logger.info "Disconnected: #{client_socket.peeraddr.join(':')}" server_socket.close client_socket.close end end end
Private Instance Methods
process_packets(client_socket, server_socket)
click to toggle source
# File lib/qubitro-mqtt/proxy.rb, line 84 def process_packets(client_socket, server_socket) loop do # Wait for some data on either socket selected = IO.select([client_socket, server_socket], nil, nil, @select_timeout) # Timeout raise 'Timeout in select' if selected.nil? # Iterate through each of the sockets with data to read if selected[0].include?(client_socket) packet = MQTT::Packet.read(client_socket) logger.debug "client -> <#{packet.type_name}>" packet = @client_filter.call(packet) unless @client_filter.nil? unless packet.nil? server_socket.write(packet) logger.debug "<#{packet.type_name}> -> server" end elsif selected[0].include?(server_socket) packet = MQTT::Packet.read(server_socket) logger.debug "server -> <#{packet.type_name}>" packet = @server_filter.call(packet) unless @server_filter.nil? unless packet.nil? client_socket.write(packet) logger.debug "<#{packet.type_name}> -> client" end else logger.error 'Problem with select: socket is neither server or client' end end end