class SimpleMQTTClient

This class implements a simple MQTT client. It wraps the mqtt gem (github.com/njh/ruby-mqtt) and offers a friendlier interface.

Public Class Methods

new( host, client_id=nil )

Creates a new MQTT client. The constructor starts a thread that listens for incoming packets and executes every callback associated with the channel each packet arrives on.

@param [String] host the hostname of the MQTT broker we are connecting to
@param [String] client_id the **unique** client identifier
# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 19
def initialize( host, client_id=nil )
  # list of channels we are subscribed to. associates a set of callbacks to each channel
  @channels = Hash.new 
  @client = client_id.nil? ? MQTT::Client.connect(host: host) : MQTT::Client.connect(host: host, client_id: client_id)
  # thread exists as long as the class is not garbage collected
  @thread = Thread.new('mqtt') do 
    @client.get do |channel, message|
      # Execute all the appropriate callbacks:
      # the ones specific to this channel with a single parameter (message)
      # the ones associated to a wildcard channel, with two parameters (message and channel)
      cbs = get_callbacks(channel)
      begin
        cbs.each { |cb| cb.parameters.length==1 ? cb.call(message) : cb.call(message, channel) }
      rescue ArgumentError
        STDERR.puts "The callback you passed for #{channel} has the #{$!}"
      end
    end
  end
end
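
The dispatch rule used by the listener thread can be tried in isolation. The sketch below is a minimal stand-in, not the class's own code path: `dispatch` is a hypothetical helper, and the channel name and payload are made up for illustration. It inspects each callback's parameter list to decide whether the channel is passed along.

```ruby
# Hypothetical helper mirroring the dispatch step in the listener thread:
# one-parameter callbacks get the message, all others get message and channel.
def dispatch(callbacks, channel, message)
  callbacks.map do |cb|
    cb.parameters.length == 1 ? cb.call(message) : cb.call(message, channel)
  end
end

# A channel-specific callback and a wildcard-style callback (illustrative only)
single   = ->(message) { "got #{message}" }
wildcard = ->(message, channel) { "got #{message} on #{channel}" }

results = dispatch([single, wildcard], 'home/kitchen/temp', '21.5')
results.each { |line| puts line }
```

A lambda declared with zero or more than two parameters raises ArgumentError when called this way, which is the case the rescue clause in the constructor reports. Because that rescue wraps the whole loop, the callbacks after a failing one are skipped for that message.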

Public Instance Methods

disconnect()

Disconnects this simple MQTT client instance from the broker: stops the listener thread, closes the connection, and clears all registered callbacks.

# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 82
def disconnect
  @thread.exit
  @client.disconnect
  @channels.clear
end
get_subscribed_channels()

Returns a hash of all the channels this client is currently subscribed to, each mapped to its callbacks.

@return [Hash] all channels this client is currently subscribed to, each mapped to its list of callbacks
# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 90
def get_subscribed_channels
  @channels
end
is_channel_wildcard?( channel )

Returns true if a channel is a wildcard channel. See the MQTT specification for wildcard channels: docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106

@param [String] channel the channel we are testing for wildcard
@return [Boolean] true if the channel is a wildcard channel

# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 104
def is_channel_wildcard?( channel )
  channel.include?('#') || channel.include?('+')
end
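
As a quick illustration of the check above, the sketch below applies the same test to a few made-up topic names. `wildcard_channel?` is a hypothetical standalone copy of the method, used here only so the example runs without a broker. In MQTT, '+' stands for a single topic level and '#' for all remaining levels.

```ruby
# Hypothetical standalone copy of the wildcard test shown above
def wildcard_channel?(channel)
  channel.include?('#') || channel.include?('+')
end

# Illustrative topic names, not taken from the library
checks = {
  'home/kitchen/temp' => wildcard_channel?('home/kitchen/temp'),
  'home/+/temp'       => wildcard_channel?('home/+/temp'),
  'home/#'            => wildcard_channel?('home/#')
}
checks.each { |topic, is_wildcard| puts "#{topic}: #{is_wildcard}" }
```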
publish( channel, message )

Publishes a message to a channel. This method will prevent publishing to wildcard channels.

@param [String] channel the channel we are publishing to
@param [String] message the message we are publishing
# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 75
def publish( channel, message )
  # Refuse to publish to a wildcard channel: warn and return without sending
  if is_channel_wildcard?(channel)
    STDERR.puts 'Can\'t publish to a wildcard channel!'
    return
  end
  @client.publish(channel, message)
end
subscribe( channel, callback )

Subscribes to a channel and registers a callback

  • Single channel callbacks take only one parameter: the received message.

  • Wildcard callbacks take two parameters: the received message and the channel the message was sent to.

It is possible to register multiple callbacks per channel. All of them will be executed whenever a message on that channel is received. Note that overlaps between channel-specific callbacks and wildcard-filters are allowed.

@param [String] channel the channel or filter we are subscribing to
@param [Proc] callback the callback that gets executed whenever a message is received
# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 48
def subscribe( channel, callback )
  if @channels.include?(channel)
    @channels[channel] << callback
  else
    @channels[channel]=[callback]
    @client.subscribe channel
  end
end
subscriptions()

Returns a hash of all the channels this client is currently subscribed to, each mapped to its callbacks.

@return [Hash] all channels this client is currently subscribed to, each mapped to its list of callbacks
# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 96
def subscriptions
  @channels
end
unsubscribe( channel, callback )

Un-subscribes a specific callback from a channel. Once the last callback for that channel has been removed, the client also unsubscribes from the channel on the broker.

@param [String] channel the channel we are un-subscribing from
@param [Proc] callback the specific callback we want to remove
# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 61
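def unsubscribe( channel, callback )
  # Nothing to do if we never subscribed to this channel
  # (avoids calling empty? on nil below)
  return unless @channels.include? channel
  @channels[channel].delete(callback)
  # Once the last callback is gone, unsubscribe from the broker as well
  if @channels[channel].empty?
    @client.unsubscribe channel
    @channels.delete(channel)
  end
end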
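
The bookkeeping behind subscribe and unsubscribe can be followed without a broker. The sketch below is only an approximation under stated assumptions: `FakeBroker` and `CallbackRegistry` are hypothetical stand-ins (neither is part of this library or of the mqtt gem), and the channel name is made up. It shows that the broker is contacted once for the first callback on a channel and once more after the last callback is removed.

```ruby
# Hypothetical stand-in for the broker connection; it only records calls.
class FakeBroker
  attr_reader :log

  def initialize
    @log = []
  end

  def subscribe(channel)
    @log << [:subscribe, channel]
  end

  def unsubscribe(channel)
    @log << [:unsubscribe, channel]
  end
end

# Hypothetical registry following the same bookkeeping as the methods above.
class CallbackRegistry
  def initialize(broker)
    @broker = broker
    @channels = {}
  end

  def subscribe(channel, callback)
    if @channels.include?(channel)
      @channels[channel] << callback   # already subscribed: just add the callback
    else
      @channels[channel] = [callback]  # first callback: subscribe on the broker
      @broker.subscribe(channel)
    end
  end

  def unsubscribe(channel, callback)
    return unless @channels.include?(channel)
    @channels[channel].delete(callback)
    return unless @channels[channel].empty?
    @broker.unsubscribe(channel)       # last callback removed: leave the channel
    @channels.delete(channel)
  end
end

broker   = FakeBroker.new
registry = CallbackRegistry.new(broker)
log_it   = ->(message) { puts "log: #{message}" }
store_it = ->(message) { puts "store: #{message}" }

registry.subscribe('home/kitchen/temp', log_it)
registry.subscribe('home/kitchen/temp', store_it)
registry.unsubscribe('home/kitchen/temp', log_it)
calls_after_first_removal = broker.log.length
registry.unsubscribe('home/kitchen/temp', store_it)
```

Removing a callback relies on passing the very same Proc object that was registered, so callers need to keep a reference to it.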

Private Instance Methods

build_regex_from_pattern( pattern )

  • Escape '/'.

  • Substitute '+' with '[^"/"]+' (a string of one or more characters that are not '/').

  • Substitute '/#' with '.*' (a string of zero or more characters).

  • Substitute '#' with '.*' (a string of zero or more characters).

# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 156
def build_regex_from_pattern( pattern )
  regex_str = pattern.gsub('/','\\/').gsub('+','[^"\/"]+').gsub('\/#','.*').gsub('#','.*')
  Regexp.new regex_str
end
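
To follow the rewrite one substitution at a time, the sketch below splits the chained gsub calls into named steps. `regex_build_steps` is a hypothetical helper written for this illustration and the filter strings are made up; the substitutions themselves are copied from the method above.

```ruby
# Hypothetical helper: returns the pattern after each substitution,
# in the same order as the chained gsub calls above.
def regex_build_steps(pattern)
  escaped   = pattern.gsub('/', '\\/')          # 1. escape every '/'
  plus_done = escaped.gsub('+', '[^"\/"]+')     # 2. '+' becomes a single-level match
  hash_tail = plus_done.gsub('\/#', '.*')       # 3. escaped '/#' becomes '.*'
  hash_done = hash_tail.gsub('#', '.*')         # 4. any remaining '#' becomes '.*'
  [escaped, plus_done, hash_tail, hash_done]
end

# Illustrative filters
single_level = regex_build_steps('home/+/temp')
multi_level  = regex_build_steps('sensors/#')
catch_all    = regex_build_steps('#')

single_level.each { |step| puts step }
matched = Regexp.new(single_level.last).match('home/kitchen/temp')
```

Two side effects of this translation are worth knowing. The character class also excludes the double-quote character, and other regex metacharacters in a filter (for example '.') are not escaped. The resulting regex is also not anchored, which is why the caller compares the match against the whole string.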
get_callbacks( channel )

Gets all the callbacks associated with a channel. A received message carries the exact name of the channel it was published on. This method first retrieves the callbacks registered for that exact channel name (the channel-specific ones), then finds the wildcard filters that the channel name matches and adds their callbacks to the execution list.

# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 117
def get_callbacks( channel )
  cbs = Array.new
  # First, fetch all the channel-specific callbacks...
  cbs.concat(@channels[channel]) if @channels.has_key? channel
  # ...then fetch the callbacks the channel matches
  cbs.concat(get_wildcard_callbacks(channel))
  cbs
end
get_wildcard_callbacks( channel )

Gets all wildcard callbacks associated with a channel. This method identifies which filters, among the ones we are subscribed to, match the exact channel the message was received on.

# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 129
def get_wildcard_callbacks( channel )
  # First select all filters
  filters = @channels.keys.select { |ch| is_channel_wildcard? ch}
  # Then select filters that match channel
  matching_filters = filters.select { |filter| matches_wildcard_pattern(channel, filter) }
  # Add all callbacks that are associated to matching filters
  cbs = Array.new
  matching_filters.each { |ch| cbs.concat @channels[ch] }
  cbs
end
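
The lookup order described above (exact channel first, then matching filters) can be sketched with a plain hash. Everything named here is hypothetical: `filter_matches?` swaps in a level-by-level comparison instead of the regex approach this class uses, it ignores edge cases such as empty topic levels, and the channel names are made up.

```ruby
# Hypothetical matcher: compares topic levels one by one (not the regex approach).
def filter_matches?(filter, topic)
  filter_levels = filter.split('/')
  topic_levels  = topic.split('/')
  filter_levels.each_with_index do |level, i|
    return true if level == '#'                 # '#' swallows all remaining levels
    return false if i >= topic_levels.length    # topic is shorter than the filter
    return false unless level == '+' || level == topic_levels[i]
  end
  filter_levels.length == topic_levels.length
end

# Hypothetical lookup: exact callbacks first, then callbacks of matching filters.
def callbacks_for(channels, topic)
  exact = channels.fetch(topic, [])
  wildcard = channels.select do |name, _callbacks|
    (name.include?('#') || name.include?('+')) && filter_matches?(name, topic)
  end.values.flatten
  exact + wildcard
end

channels = {
  'home/kitchen/temp' => [->(message) { "exact: #{message}" }],
  'home/#'            => [->(message, channel) { "filter: #{message} on #{channel}" }],
  'garden/+'          => [->(message, channel) { "garden: #{message} on #{channel}" }]
}

found  = callbacks_for(channels, 'home/kitchen/temp')
output = found.map { |cb| cb.parameters.length == 1 ? cb.call('21.5') : cb.call('21.5', 'home/kitchen/temp') }
output.each { |line| puts line }
```

A message on an exact channel that also falls under a subscribed filter triggers both sets of callbacks, which is the overlap the subscribe documentation allows.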
matches_wildcard_pattern(str, pattern)

Returns true if the string matches a pattern. See docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718107 for a formal description of the rules.

# File lib/simple_mqtt_client/simple_mqtt_client.rb, line 143
def matches_wildcard_pattern(str, pattern)
  # First we need to build a regex out of the pattern
  regex = build_regex_from_pattern pattern
  # Then we check if the regex matches the string
  match_data = regex.match str
  # and if the matched data is actually the whole string (avoid errors with channels with the same prefix)
  !match_data.nil? && match_data[0] == str
end
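
The whole-string comparison matters when one channel name is a prefix of another. The sketch below is a minimal illustration under assumptions: `whole_match?` is a hypothetical helper that repeats the check above, and the regex is written by hand to stand for the filter 'home/+' rather than produced by build_regex_from_pattern.

```ruby
# Hypothetical helper repeating the check above: the regex must match,
# and the matched text must be the entire string.
def whole_match?(regex, str)
  match_data = regex.match(str)
  !match_data.nil? && match_data[0] == str
end

# Hand-written regex standing in for the filter 'home/+'
filter_regex = Regexp.new('home\/[^\/]+')

same_depth = whole_match?(filter_regex, 'home/kitchen')
deeper     = whole_match?(filter_regex, 'home/kitchen/temp')
prefix_hit = filter_regex.match('home/kitchen/temp')[0]

puts "home/kitchen matches: #{same_depth}"
puts "home/kitchen/temp matches: #{deeper} (regex alone matched '#{prefix_hit}')"
```

Without the comparison, the second topic would be accepted because the regex finds a match in its first two levels.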