class MiniMqtt::Client

Attributes

clean_session[RW]
client_id[RW]
host[RW]
password[RW]
port[RW]
user[RW]

Public Class Methods

new(params = {}) click to toggle source
# File lib/mini_mqtt/client.rb, line 7
def initialize params = {}
  @host = params[:host] || 'localhost'
  @port = params[:port] || 1883
  @user = params[:user]
  @password = params[:password]
  @keep_alive = params[:keep_alive] || 10
  @client_id = params[:client_id] || generate_client_id
  @clean_session = params.fetch :clean_session, true
end

Public Instance Methods

connect(options = {}) click to toggle source
# File lib/mini_mqtt/client.rb, line 17
def connect options = {}
  # Create socket and packet handler
  @socket = TCPSocket.new @host, @port
  @handler = PacketHandler.new @socket

  # Send ConnectPacket
  @handler.write_packet ConnectPacket.new user: @user,
    password: @password, keep_alive: @keep_alive, client_id: @client_id,
    clean_session: @clean_session, will_topic: options[:will_topic],
    will_message: options[:will_message], will_retain: options[:will_retain]

  # Receive connack packet
  connack = @handler.get_packet

  if connack.accepted?
    @received_messages = Queue.new
    @last_ping_response = Time.now
    spawn_read_thread!
    spawn_keepalive_thread!
  else
    raise StandardError.new(connack.error)
  end
end
connected?() click to toggle source
# File lib/mini_mqtt/client.rb, line 80
def connected?
  @socket && !@socket.closed?
end
disconnect() click to toggle source
# File lib/mini_mqtt/client.rb, line 62
def disconnect
  # Send DisconnectPacket, then kill threads and close socket
  @handler.write_packet DisconnectPacket.new
  @read_thread.kill
  @keepalive_thread.kill
  @socket.close
end
get_message() click to toggle source
# File lib/mini_mqtt/client.rb, line 70
def get_message
  @received_messages.pop
end
get_messages() { |message, topic| ... } click to toggle source
# File lib/mini_mqtt/client.rb, line 74
def get_messages
  while message = get_message
    yield message.message, message.topic
  end
end
publish(topic, message, options = {}) click to toggle source
# File lib/mini_mqtt/client.rb, line 56
def publish topic, message, options = {}
  packet = PublishPacket.new topic: topic, message: message.to_s,
    retain: options[:retain], qos: options[:qos]
  @handler.write_packet packet
end
subscribe(*params) click to toggle source
# File lib/mini_mqtt/client.rb, line 41
def subscribe *params
  # Each param can be a topic or a topic with its max qos.
  # Example: subscribe 'topic1', 'topic2' => 1
  topics = params.map do |arg|
    arg.is_a?(Hash) ? arg : { arg => 0 }
  end
  topics = topics.inject :merge
  packet = SubscribePacket.new topics: topics
  @handler.write_packet packet
end
unsubscribe(*topics) click to toggle source
# File lib/mini_mqtt/client.rb, line 52
def unsubscribe *topics
  @handler.write_packet UnsubscribePacket.new topics: topics
end

Private Instance Methods

generate_client_id() click to toggle source
# File lib/mini_mqtt/client.rb, line 99
def generate_client_id
  "client_#{ rand(10000000) }"
end
handle_received_packet(packet) click to toggle source
# File lib/mini_mqtt/client.rb, line 86
def handle_received_packet packet
  case packet
  when PingrespPacket
    @last_ping_response = Time.now
  when PublishPacket
    @received_messages << packet
    if packet.qos > 0
      @handler.write_packet PubackPacket.new packet_id: packet.packet_id
    end
  when PubackPacket
  end
end
spawn_keepalive_thread!() click to toggle source
# File lib/mini_mqtt/client.rb, line 112
def spawn_keepalive_thread!
  @keepalive_thread = Thread.new do
    while connected? do
      @handler.write_packet PingreqPacket.new
      sleep @keep_alive
      if Time.now - @last_ping_response > 2 * @keep_alive
        puts "Error: MQTT Server not responding to ping. Disconnecting."
        @socket.close
      end
    end
  end
end
spawn_read_thread!() click to toggle source
# File lib/mini_mqtt/client.rb, line 103
def spawn_read_thread!
  @read_thread = Thread.new do
    while connected? do
      handle_received_packet @handler.get_packet
    end
    @received_messages << nil
  end
end