class AwsIotDevice::MqttShadowClient::ShadowActionManager

Attributes

logger[RW]

This the main AWS action manager It enables the AWS IoT actions (get, update, delete) It enables the time control the time out after an action have been start Actions requests are send on the general actions topic and answer is retreived from accepted/refused/delta topics

Public Class Methods

new(shadow_name, mqtt_client, persistent_subscribe=false) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 18
def initialize(shadow_name, mqtt_client, persistent_subscribe=false)
  @shadow_name = shadow_name
  @topic_manager = ShadowTopicManager.new(mqtt_client, shadow_name)
  @payload_parser = JSONPayloadParser.new
  @is_subscribed = {}
  @is_subscribed[:get] = false
  @is_subscribed[:update] = false
  @is_subscribed[:delete] = false
  @token_handler = TokenCreator.new(shadow_name, mqtt_client.client_id)
  @persistent_subscribe = persistent_subscribe
  @last_stable_version = -1 #Mean no currentely stable
  @topic_subscribed_callback = {}
  @topic_subscribed_callback[:get] = nil
  @topic_subscribed_callback[:update] = nil
  @topic_subscribed_callback[:delta] = nil
  @topic_subscribed_task_count = {}
  @topic_subscribed_task_count[:get] = 0
  @topic_subscribed_task_count[:update] = 0
  @topic_subscribed_task_count[:delete] = 0
  @token_pool = {}
  @token_callback = {}
  @task_count_mutex = Mutex.new
  @token_mutex = Mutex.new
  @parser_mutex = Mutex.new
  set_basic_callback
end

Public Instance Methods

logger?() click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 116
def logger?
  !@logger.nil? && @logger.is_a?(Logger)
end
register_delete_callback(callback, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 86
def register_delete_callback(callback, &block)
  register_action_callback(:delete, callback, &block)
end
register_get_callback(callback, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 78
def register_get_callback(callback, &block)
  register_action_callback(:get, callback, &block)
end
register_shadow_delta_callback(callback, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 90
def register_shadow_delta_callback(callback, &block)
  if callback.is_a?(Proc)
    @topic_subscribed_callback[:delta] = callback
  elsif block_given?
    @topic_subscribed_callback[:delta] = block
  end
  @topic_manager.shadow_topic_subscribe("delta", @default_callback)
end
register_update_callback(callback, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 82
def register_update_callback(callback, &block)
  register_action_callback(:update, callback, &block)
end
remove_delete_callback() click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 107
def remove_delete_callback
  remove_action_callback(:delete)
end
remove_get_callback() click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 99
def remove_get_callback
  remove_action_callback(:get)
end
remove_shadow_delta_callback() click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 111
def remove_shadow_delta_callback
  @topic_subscribe_callback.delete[:delta]
  @topic_manager.shadow_topic_unsubscribe("delta")
end
remove_update_callback() click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 103
def remove_update_callback
  remove_action_callback(:update)
end
shadow_delete(timeout=5, callback=nil, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 74
def shadow_delete(timeout=5, callback=nil, &block)
  shadow_action(:delete, "", timeout, callback, &block)
end
shadow_get(timeout=5, callback=nil, &block) click to toggle source

Send and publish packet with an empty payload contains in a valid JSON format. A unique token is generate and send in the packet in order to trace the action. Subscribe to the two get/accepted and get/rejected of the coresponding shadow. If the request is accpeted, the answer would be send on the get/accepted topic. It contains all the details of the shadow state in JSON document. A specific callback in Proc could be send parameter. Before exit, the function start a timer count down in the separate thread. If the time ran out, the timer_handler function is called and the get action is cancelled using the token.

Parameter:

> callback: the Proc to execute when the answer to th get request would be received.
             It should accept three different paramter:
               - payload : the answer content
               - response_status : among ['accepted', 'refused', 'delta']
               - token : the token assoicate to the get request

> timeout: the period after which the request should be canceled and timer_handler should be call

Returns :

> the token associate to the current action (which also store in @token_pool)
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 66
def shadow_get(timeout=5, callback=nil, &block)
  shadow_action(:get, "", timeout, callback, &block)
end
shadow_update(payload, timeout=5, callback=nil, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 70
def shadow_update(payload, timeout=5, callback=nil, &block)
  shadow_action(:update, payload, timeout, callback, &block)
end

Private Instance Methods

accepted_tasks(message, action, token) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 260
def accepted_tasks(message, action, token)
  @topic_subscribed_callback[action].call(message)  unless @topic_subscribed_callback[action].nil?
  @token_callback[token].call(message) if @token_callback.has_key?(token)
  @token_callback.delete(token)
end
decresase_task_count(action) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 204
def decresase_task_count(action)
  @topic_subscribed_task_count[action] -= 1
  if @topic_subscribed_task_count[action] <= 0
    @topic_subscribed_task_count[action] = 0
    unless @persistent_subscribe
      @topic_manager.shadow_topic_unsubscribe(action.to_s)
      @is_subscribed[action] = false
    end
  end
end
do_accepted(message, action, token, type, new_version) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 248
def do_accepted(message, action, token, type, new_version)
  if new_version && new_version >= @last_stable_version
    @logger.info("The #{action} action with the token #{token} have been accepted.") if logger?
    type.eql?("delete") ? @last_stable_version = -1 : @last_stable_version = new_version
    Thread.new do
      accepted_tasks(message, action, token)
    end
  else
    @logger.warn("CATCH AN ACCEPTED #{action} BUT OUTDATED/INVALID VERSION (= #{new_version})\n") if logger?
  end
end
do_delta(message, new_version) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 275
def do_delta(message, new_version)
  if new_version && new_version >= @last_stable_version
    @logger.info("A delta action have been accepted.") if logger?
    @last_stable_version = new_version
    Thread.new { @topic_subscribed_callback[:delta].call(message) } unless @topic_subscribed_callback[:delta].nil?
  else
    @logger.warn("CATCH A DELTA BUT OUTDATED/INVALID VERSION (= #{new_version})\n") if logger?
  end
end
do_message_callback(message) click to toggle source

The default callback that is called by every actions It acknowledge the accepted status if action success Call a specific callback for each actions if it defined have been register previously

# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 218
def do_message_callback(message)
  topic = message.topic
  action = parse_action(topic)
  type = parse_type(topic)
  payload = message.payload
  token = nil
  new_version = -1
  @parser_mutex.synchronize() {
    @payload_parser.set_message(payload)
    new_version = @payload_parser.get_attribute_value("version")
    token = @payload_parser.get_attribute_value("clientToken")
  }
  if %w(get update delete).include?(action)
    if @token_pool.has_key?(token)
      @token_pool[token].cancel
      @token_pool.delete(token)
      if type.eql?("accepted")
        do_accepted(message, action.to_sym, token, type, new_version)
      else
        do_rejected(token, action, new_version)
      end
      @task_count_mutex.synchronize {
        decresase_task_count(action.to_sym)
      }
    end
  elsif %w(delta).include?(action)
    do_delta(message, new_version)
  end
end
do_rejected(token, action, new_version) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 266
def do_rejected(token, action, new_version)
  if new_version && new_version >= @last_stable_version
    @logger.info("The #{action} action with the token #{token} have been rejected.") if logger?
    @token_callback.delete(token)
  else
    @logger.warn("CATCH AN REJECTED #{action} BUT OUTDATED/INVALID VERSION (= #{new_version})\n") if logger?
  end
end
handle_subscription(action, timeout) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 285
def handle_subscription(action, timeout)
  @topic_manager.shadow_topic_subscribe(action.to_s, @default_callback)
  if @topic_manager.paho_client?
    ref = Time.now + timeout
    while !@is_subscribed[action] && handle_timeout(ref) do
      sleep 0.0001
    end
  else
    sleep 2
  end
end
handle_timeout(ref) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 297
def handle_timeout(ref)
  Time.now <= ref
end
parse_action(topic) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 305
def parse_action(topic)
  if topic.split('/')[5] == "delta"
    topic.split('/')[5]
  else
    topic.split('/')[4]
  end
end
parse_shadow_name(topic) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 301
def parse_shadow_name(topic)
  topic.split('/')[2]
end
parse_type(topic) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 313
def parse_type(topic)
  topic.split('/')[5]
end
register_action_callback(action, callback, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 192
def register_action_callback(action, callback, &block)
  if callback.is_a?(Proc)
    @topic_subscribed_callback[action] = callback
  elsif block_given?
    @topic_subscribed_callback[action] = block
  end
end
register_token_callback(token, callback, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 180
def register_token_callback(token, callback, &block)
  if callback.is_a?(Proc)
    @token_callback[token] = callback
  elsif block_given?
    @token_callback[token] = block
  end
end
remove_action_callback(action) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 200
def remove_action_callback(action)
  @topic_subscribed_callback[action] = nil
end
remove_token_callback(token) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 188
def remove_token_callback(token)
  @token_callback.delete(token)
end
set_basic_callback() click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 166
def set_basic_callback
  @default_callback = proc { |message| do_message_callback(message) }

  @topic_manager.on_suback = lambda do |topics|
    action = @topic_manager.retrieve_action(topics[0])
    @is_subscribed[action] ||= true unless action.nil?
  end

  @topic_manager.on_unsuback = lambda do |topics|
    action = @topic_manager.retrieve_action(topics[0])
    @is_subscribed[action] = false if action.nil?
  end
end
shadow_action(action, payload="", timeout=5, callback=nil, &block) click to toggle source
# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 122
def shadow_action(action, payload="", timeout=5, callback=nil, &block)
  current_token = Symbol
  timer = Timers::Group.new
  json_payload = ""
  @token_mutex.synchronize(){
    current_token = @token_handler.create_next_token
  }
  timer.after(timeout){ timeout_manager(action, current_token) }
  @parser_mutex.synchronize {
    @payload_parser.set_message(payload) unless payload == ""
    @payload_parser.set_attribute_value("clientToken", current_token)
    json_payload = @payload_parser.get_json
  }
  handle_subscription(action, timeout) unless @is_subscribed[action]
  @topic_manager.shadow_topic_publish(action.to_s, json_payload)
  @task_count_mutex.synchronize {
    @topic_subscribed_task_count[action] += 1
  }
  @token_pool[current_token] = timer
  register_token_callback(current_token, callback, &block)
  Thread.new{ timer.wait }
  current_token
end
timeout_manager(action_name, token) click to toggle source

Should cancel the token after a preset time interval

# File lib/aws_iot_device/mqtt_shadow_client/shadow_action_manager.rb, line 147
def timeout_manager(action_name, token)
  if @token_pool.has_key?(token)
    action = action_name.to_sym
    @token_pool.delete(token)
    @token_callback.delete(token)
    @logger.warn("The #{action_name} request with the token #{token} has timed out!\n") if logger?
    @task_count_mutex.synchronize {
      @topic_subscribed_task_count[action] -= 1
      unless @topic_subscribed_task_count[action] <= 0
        @topic_subscribed_task_count[action] = 0
        unless @persistent_subscribe
          @topic_manager.shadow_topic_unsubscribe(action)
          @is_subscribed[action.to_sym] = false
        end
      end
    }
  end
end