module Nutella::Net

This module implements the pub/sub and request/response APIs at the run level

Public Class Methods

async_request( channel, message=nil, callback ) click to toggle source

Performs an asynchronous request

@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,

nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

@param [Proc] callback the callback that is fired whenever a response is received. It takes one parameter (response).

# File lib/nutella_lib/net.rb, line 64
def self.async_request( channel, message=nil, callback )
  async_request_to(channel, message, callback, Nutella.app_id, Nutella.run_id)
end
callbacks() click to toggle source

Provides access to callbacks

# File lib/nutella_lib/net.rb, line 14
def self.callbacks; @callbacks end
handle_requests( channel, callback ) click to toggle source

Handles requests on a certain channel

@param [String] channel we want to listen for requests on. Can contain wildcard(s). @param [Proc] callback a lambda expression that is fired whenever a message is received.

The passed callback takes the following parameters:
- [String] the received message (payload). Messages that are not JSON are discarded.
- [Hash] the sender's identifiers (run_id, app_id, component_id and optionally resource_id)
- [*returns* Hash] The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.
# File lib/nutella_lib/net.rb, line 78
def self.handle_requests( channel, callback )
  handle_requests_on(channel, callback, Nutella.app_id, Nutella.run_id)
end
listen() click to toggle source

Listens for incoming messages. All this function does is to put the thread to sleep and wait for something to happen over the network to wake up.

# File lib/nutella_lib/net.rb, line 99
def self.listen
  begin
    sleep
  rescue Interrupt
    # Simply returns once interrupted
  end
end
publish( channel, message=nil ) click to toggle source

Publishes a message to a channel

@param [String] channel we want to publish the message to. CANNOT contain wildcard(s)! @param [Object] message the message we are publishing. This can be,

nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
# File lib/nutella_lib/net.rb, line 43
def self.publish( channel, message=nil )
  publish_to( channel, message, Nutella.app_id, Nutella.run_id)
end
start_pinging() click to toggle source

Sends a ping every 5 seconds to pings channel of the proper level

# File lib/nutella_lib/net.rb, line 86
def self.start_pinging
  Nutella.ping_thread = Thread.new do
    loop do
      publish_to('pings', 'ping', Nutella.app_id, Nutella.run_id)
      sleep(5)
    end
  end
end
subscribe( channel, callback ) click to toggle source

Subscribes to a channel or to a set of channels.

@param [String] channel the channel or filter we are subscribing to. Can contain wildcard(s) @param [Proc] callback a lambda expression that is fired whenever a message is received.

The passed callback takes the following parameters:
- [String] message: the received message. Messages that are not JSON are discarded.
- [String] channel: the channel the message was received on (optional, only for wildcard subscriptions)
- [Hash] from: the sender's identifiers (run_id, app_id, component_id and optionally resource_id)
# File lib/nutella_lib/net.rb, line 25
def self.subscribe( channel, callback )
  subscribe_to( channel, callback, Nutella.app_id, Nutella.run_id)
end
subscriptions() click to toggle source

Provides access to the subscriptions

# File lib/nutella_lib/net.rb, line 11
def self.subscriptions; @subscriptions end
sync_request( channel, message=nil ) click to toggle source

Performs a synchronous request.

@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,

nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
# File lib/nutella_lib/net.rb, line 53
def self.sync_request( channel, message=nil )
  sync_request_to(channel, message, Nutella.app_id, Nutella.run_id)
end
unsubscribe( channel ) click to toggle source

Un-subscribes from a channel

@param [String] channel we want to unsubscribe from. Can contain wildcard(s).

# File lib/nutella_lib/net.rb, line 33
def self.unsubscribe( channel )
  unsubscribe_to( channel, Nutella.app_id, Nutella.run_id)
end

Private Class Methods

assemble_from() click to toggle source
# File lib/nutella_lib/net.rb, line 130
def self.assemble_from
  from = Hash.new
  if Nutella.run_id.nil?
    if Nutella.app_id.nil?
      from['type'] = 'framework'
    else
      from['type'] = 'app'
      from['app_id'] = Nutella.app_id
    end
  else
    from['type'] = 'run'
    from['run_id'] = Nutella.run_id
  end
  from['component_id'] = Nutella.component_id
  from['resource_id'] = Nutella.resource_id unless Nutella.resource_id.nil?
  from
end
async_request_to( channel, message=nil, callback, app_id, run_id ) click to toggle source

Performs an asynchronous request

@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,

nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

@param [Proc] callback the callback that is fired whenever a response is received. It takes one parameter (response). @param [String] app_id used to pad channels @param [String] run_id used to pad channels

# File lib/nutella_lib/net.rb, line 311
def self.async_request_to( channel, message=nil, callback, app_id, run_id )
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 1 parameter (response) when making an asynchronous request!' if callback.parameters.length!=1
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Prepare message
  m, id = prepare_message_for_request message
  # Prepare callback
  mqtt_cb = lambda do |mqtt_message|
    type, _, payload, m_id = extract_fields_from_message mqtt_message
    if m_id==id && type=='response'
      callback.call(payload)
      Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
    end
  end
  # Subscribe
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Publish message
  Nutella.mqtt.publish( padded_channel, m )
end
extract_fields_from_message(message) click to toggle source
# File lib/nutella_lib/net.rb, line 111
def self.extract_fields_from_message(message)
  mh = JSON.parse message
  return mh['type'], mh['from'], mh['payload'], mh['id']
end
handle_requests_on( channel, callback, app_id, run_id ) click to toggle source

Handles requests on a certain channel

@param [String] channel we want to listen for requests on. Can contain wildcard(s). @param [Proc] callback a lambda expression that is fired whenever a message is received.

The passed callback takes the following parameters:
- [String] the received message (payload). Messages that are not JSON are discarded.
- [Hash] the sender's identifiers (run_id, app_id, component_id and optionally resource_id)
- [*returns* Hash] The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.

@param [String] app_id used to pad channels @param [String] run_id used to pad channels

# File lib/nutella_lib/net.rb, line 343
def self.handle_requests_on( channel, callback, app_id, run_id )
  # Check the passed callback has the right number of arguments
  raise 'You need to pass a callback with 2 parameter (request, from) when handling requests!' if callback.parameters.length!=2
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  mqtt_cb = lambda do |request|
    begin
      # Extract nutella fields
      type, from, payload, id = extract_fields_from_message request
      # Only handle requests that have proper id set
      return if type!='request' || id.nil?
      # Execute callback and send response
      m = self.prepare_message_for_response( callback.call( payload, from), id )
      Nutella.mqtt.publish( padded_channel, m )
    rescue JSON::ParserError
      # Make sure that request contains JSON, if not drop the message
      return
    rescue ArgumentError
      # Check that the passed callback has the right number of arguments
      STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'"
    end
  end
  # Subscribe to the channel
  Nutella.mqtt.subscribe(padded_channel, mqtt_cb)
  # Notify subscription
  publish_to('subscriptions', {'type' => 'handle_requests', 'channel' => padded_channel}, app_id, run_id)
end
pad_channel( channel, app_id, run_id ) click to toggle source
# File lib/nutella_lib/net.rb, line 116
def self.pad_channel( channel, app_id, run_id )
  raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?)
  return "/nutella/#{channel}" if (app_id.nil? && run_id.nil?)
  return "/nutella/apps/#{app_id}/#{channel}" if (!app_id.nil? && run_id.nil?)
  "/nutella/apps/#{app_id}/runs/#{run_id}/#{channel}"
end
prepare_message_for_publish( message ) click to toggle source
# File lib/nutella_lib/net.rb, line 148
def self.prepare_message_for_publish( message )
  if message.nil?
    return {type: 'publish', from: assemble_from}.to_json
  end
  {type: 'publish', from: assemble_from, payload: message}.to_json
end
prepare_message_for_request( message ) click to toggle source
# File lib/nutella_lib/net.rb, line 155
def self.prepare_message_for_request( message )
  id = message.hash + Random.rand(100)
  if message.nil?
    return {id: id, type: 'request', from: assemble_from}.to_json, id
  end
  return {id: id, type: 'request', from: assemble_from, payload: message}.to_json, id
end
prepare_message_for_response( message, id ) click to toggle source
# File lib/nutella_lib/net.rb, line 163
def self.prepare_message_for_response( message, id )
  if message.nil?
    return {id: id, type: 'response', from: assemble_from}.to_json
  end
  {id: id, type: 'response', from: assemble_from, payload: message}.to_json
end
publish_to( channel, message=nil, app_id, run_id ) click to toggle source

Publishes a message to a channel

@param [String] channel we want to publish the message to. CANNOT contain wildcard(s)! @param [Object] message the message we are publishing. This can be,

nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

@param [String] app_id used to pad channels @param [String] run_id used to pad channels

# File lib/nutella_lib/net.rb, line 257
def self.publish_to( channel, message=nil, app_id, run_id )
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Throw exception if trying to publish something that is not JSON
  begin
    m = self.prepare_message_for_publish message
    Nutella.mqtt.publish( padded_channel, m )
  rescue
    STDERR.puts 'Error: you are trying to publish something that is not JSON'
    STDERR.puts $!
  end
end
subscribe_to( channel, callback, app_id, run_id ) click to toggle source

Subscribes to a channel or to a set of channels.

@param [String] channel the channel or filter we are subscribing to. Can contain wildcard(s) @param [Proc] callback a lambda expression that is fired whenever a message is received.

The passed callback takes the following parameters:
- [String] message: the received message. Messages that are not JSON are discarded.
- [String] channel: the channel the message was received on (optional, only for wildcard subscriptions)
- [Hash] from: the sender's identifiers (run_id, app_id, component_id and optionally resource_id)

@param [String] app_id used to pad channels @param [String] run_id used to pad channels

# File lib/nutella_lib/net.rb, line 181
def self.subscribe_to( channel, callback, app_id, run_id )
  # Check the passed callback has the right number of arguments
  if Nutella.mqtt.is_channel_wildcard?(channel)
    raise 'You need to pass a callback with 3 parameters (message, channel, from) when subscribing to a wildcard channel!' if callback.parameters.length!=3
  else
    raise 'You need to pass a callback with 2 parameters (message, from) when subscribing to a channel!' if callback.parameters.length!=2
  end
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Maintain unique subscriptions
  raise 'You can`t subscribe twice to the same channel`' if @subscriptions.include? padded_channel
  # Depending on what type of channel we are subscribing to (wildcard or simple)
  # register a different kind of callback
  if Nutella.mqtt.is_channel_wildcard?(padded_channel)
    mqtt_cb = lambda do |mqtt_message, mqtt_channel|
      begin
        type, from, payload, _ = extract_fields_from_message mqtt_message
        callback.call(payload, un_pad_channel(mqtt_channel, app_id, run_id), from) if type=='publish'
      rescue JSON::ParserError
        # Make sure the message is JSON, if not drop the message
        return
      rescue ArgumentError
        # Check the passed callback has the right number of arguments
        STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload', 'channel' and 'from'"
      end
    end
  else
    mqtt_cb = lambda do |message|
      begin
        type, from, payload, _ = extract_fields_from_message message
        callback.call(payload, from)  if type=='publish'
      rescue JSON::ParserError
        # Make sure the message is JSON, if not drop the message
        return
      rescue ArgumentError
        # Check the passed callback has the right number of arguments
        STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'"
      end
    end
  end
  # Add to subscriptions, save mqtt callback and subscribe
  @subscriptions.push padded_channel
  @callbacks.push mqtt_cb
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Notify subscription
  publish_to('subscriptions', {'type' => 'subscribe', 'channel' => padded_channel}, app_id, run_id)
end
sync_request_to( channel, message=nil, app_id, run_id ) click to toggle source

Performs a synchronous request.

@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,

nil/empty (default), a string, a hash and, in general, anything with a .to_json method.

@param [String] app_id used to pad channels @param [String] run_id used to pad channels

# File lib/nutella_lib/net.rb, line 278
def self.sync_request_to( channel, message=nil, app_id, run_id )
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  # Prepare message
  m, id = prepare_message_for_request message
  # Initialize response
  response = nil
  # Prepare callback
  mqtt_cb = lambda do |mqtt_message|
    type, _, payload, m_id = extract_fields_from_message mqtt_message
    if m_id==id && type=='response'
      response = payload
      Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
    end
  end
  # Subscribe
  Nutella.mqtt.subscribe( padded_channel, mqtt_cb )
  # Publish message
  Nutella.mqtt.publish( padded_channel, m )
  # Wait for the response to come back
  sleep(0.1) while response.nil?
  response
end
un_pad_channel( channel, app_id, run_id ) click to toggle source
# File lib/nutella_lib/net.rb, line 123
def self.un_pad_channel( channel, app_id, run_id )
  raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?)
  return channel.gsub('/nutella/', '') if (app_id.nil? && run_id.nil?)
  return channel.gsub("/nutella/apps/#{app_id}/", '') if (!app_id.nil? && run_id.nil?)
  channel.gsub("/nutella/apps/#{app_id}/runs/#{run_id}/", '')
end
unsubscribe_to( channel, app_id, run_id ) click to toggle source

Un-subscribes from a channel

@param [String] channel we want to unsubscribe from. Can contain wildcard(s). @param [String] app_id used to pad channels @param [String] run_id used to pad channels

# File lib/nutella_lib/net.rb, line 235
def self.unsubscribe_to( channel, app_id, run_id )
  # Pad channel
  padded_channel = pad_channel(channel, app_id, run_id)
  idx = @subscriptions.index padded_channel
  # If we are not subscribed to this channel, return (no error is given)
  return if idx.nil?
  # Fetch the mqtt_callback associated with this channel/subscription
  mqtt_cb = @callbacks[idx]
  # Remove from subscriptions, callbacks and unsubscribe
  @subscriptions.delete_at idx
  @callbacks.delete_at idx
  Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb )
end