class MiniMqtt::Client
Attributes
clean_session[RW]
client_id[RW]
host[RW]
password[RW]
port[RW]
user[RW]
Public Class Methods
new(params = {})
click to toggle source
# File lib/mini_mqtt/client.rb, line 7 def initialize params = {} @host = params[:host] || 'localhost' @port = params[:port] || 1883 @user = params[:user] @password = params[:password] @keep_alive = params[:keep_alive] || 10 @client_id = params[:client_id] || generate_client_id @clean_session = params.fetch :clean_session, true end
Public Instance Methods
connect(options = {})
click to toggle source
# File lib/mini_mqtt/client.rb, line 17 def connect options = {} # Create socket and packet handler @socket = TCPSocket.new @host, @port @handler = PacketHandler.new @socket # Send ConnectPacket @handler.write_packet ConnectPacket.new user: @user, password: @password, keep_alive: @keep_alive, client_id: @client_id, clean_session: @clean_session, will_topic: options[:will_topic], will_message: options[:will_message], will_retain: options[:will_retain] # Receive connack packet connack = @handler.get_packet if connack.accepted? @received_messages = Queue.new @last_ping_response = Time.now spawn_read_thread! spawn_keepalive_thread! else raise StandardError.new(connack.error) end end
connected?()
click to toggle source
# File lib/mini_mqtt/client.rb, line 80 def connected? @socket && !@socket.closed? end
disconnect()
click to toggle source
# File lib/mini_mqtt/client.rb, line 62 def disconnect # Send DisconnectPacket, then kill threads and close socket @handler.write_packet DisconnectPacket.new @read_thread.kill @keepalive_thread.kill @socket.close end
get_message()
click to toggle source
# File lib/mini_mqtt/client.rb, line 70 def get_message @received_messages.pop end
get_messages() { |message, topic| ... }
click to toggle source
# File lib/mini_mqtt/client.rb, line 74 def get_messages while message = get_message yield message.message, message.topic end end
publish(topic, message, options = {})
click to toggle source
# File lib/mini_mqtt/client.rb, line 56 def publish topic, message, options = {} packet = PublishPacket.new topic: topic, message: message.to_s, retain: options[:retain], qos: options[:qos] @handler.write_packet packet end
subscribe(*params)
click to toggle source
# File lib/mini_mqtt/client.rb, line 41 def subscribe *params # Each param can be a topic or a topic with its max qos. # Example: subscribe 'topic1', 'topic2' => 1 topics = params.map do |arg| arg.is_a?(Hash) ? arg : { arg => 0 } end topics = topics.inject :merge packet = SubscribePacket.new topics: topics @handler.write_packet packet end
unsubscribe(*topics)
click to toggle source
# File lib/mini_mqtt/client.rb, line 52 def unsubscribe *topics @handler.write_packet UnsubscribePacket.new topics: topics end
Private Instance Methods
generate_client_id()
click to toggle source
# File lib/mini_mqtt/client.rb, line 99 def generate_client_id "client_#{ rand(10000000) }" end
handle_received_packet(packet)
click to toggle source
# File lib/mini_mqtt/client.rb, line 86 def handle_received_packet packet case packet when PingrespPacket @last_ping_response = Time.now when PublishPacket @received_messages << packet if packet.qos > 0 @handler.write_packet PubackPacket.new packet_id: packet.packet_id end when PubackPacket end end
spawn_keepalive_thread!()
click to toggle source
# File lib/mini_mqtt/client.rb, line 112 def spawn_keepalive_thread! @keepalive_thread = Thread.new do while connected? do @handler.write_packet PingreqPacket.new sleep @keep_alive if Time.now - @last_ping_response > 2 * @keep_alive puts "Error: MQTT Server not responding to ping. Disconnecting." @socket.close end end end end
spawn_read_thread!()
click to toggle source
# File lib/mini_mqtt/client.rb, line 103 def spawn_read_thread! @read_thread = Thread.new do while connected? do handle_received_packet @handler.get_packet end @received_messages << nil end end