module Nutella::Net
This module implements the pub/sub and request/response APIs at the run level
Public Class Methods
Performs an asynchronous request
@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,
nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
@param [Proc] callback the callback that is fired whenever a response is received. It takes one parameter (response).
# File lib/nutella_lib/net.rb, line 64 def self.async_request( channel, message=nil, callback ) async_request_to(channel, message, callback, Nutella.app_id, Nutella.run_id) end
Provides access to callbacks
# File lib/nutella_lib/net.rb, line 14 def self.callbacks; @callbacks end
Handles requests on a certain channel
@param [String] channel we want to listen for requests on. Can contain wildcard(s). @param [Proc] callback a lambda expression that is fired whenever a message is received.
The passed callback takes the following parameters: - [String] the received message (payload). Messages that are not JSON are discarded. - [Hash] the sender's identifiers (run_id, app_id, component_id and optionally resource_id) - [*returns* Hash] The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.
# File lib/nutella_lib/net.rb, line 78 def self.handle_requests( channel, callback ) handle_requests_on(channel, callback, Nutella.app_id, Nutella.run_id) end
Listens for incoming messages. All this function does is to put the thread to sleep and wait for something to happen over the network to wake up.
# File lib/nutella_lib/net.rb, line 99 def self.listen begin sleep rescue Interrupt # Simply returns once interrupted end end
Publishes a message to a channel
@param [String] channel we want to publish the message to. CANNOT contain wildcard(s)! @param [Object] message the message we are publishing. This can be,
nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
# File lib/nutella_lib/net.rb, line 43 def self.publish( channel, message=nil ) publish_to( channel, message, Nutella.app_id, Nutella.run_id) end
Sends a ping every 5 seconds to pings channel of the proper level
# File lib/nutella_lib/net.rb, line 86 def self.start_pinging Nutella.ping_thread = Thread.new do loop do publish_to('pings', 'ping', Nutella.app_id, Nutella.run_id) sleep(5) end end end
Subscribes to a channel or to a set of channels.
@param [String] channel the channel or filter we are subscribing to. Can contain wildcard(s) @param [Proc] callback a lambda expression that is fired whenever a message is received.
The passed callback takes the following parameters: - [String] message: the received message. Messages that are not JSON are discarded. - [String] channel: the channel the message was received on (optional, only for wildcard subscriptions) - [Hash] from: the sender's identifiers (run_id, app_id, component_id and optionally resource_id)
# File lib/nutella_lib/net.rb, line 25 def self.subscribe( channel, callback ) subscribe_to( channel, callback, Nutella.app_id, Nutella.run_id) end
Provides access to the subscriptions
# File lib/nutella_lib/net.rb, line 11 def self.subscriptions; @subscriptions end
Performs a synchronous request.
@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,
nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
# File lib/nutella_lib/net.rb, line 53 def self.sync_request( channel, message=nil ) sync_request_to(channel, message, Nutella.app_id, Nutella.run_id) end
Un-subscribes from a channel
@param [String] channel we want to unsubscribe from. Can contain wildcard(s).
# File lib/nutella_lib/net.rb, line 33 def self.unsubscribe( channel ) unsubscribe_to( channel, Nutella.app_id, Nutella.run_id) end
Private Class Methods
# File lib/nutella_lib/net.rb, line 130 def self.assemble_from from = Hash.new if Nutella.run_id.nil? if Nutella.app_id.nil? from['type'] = 'framework' else from['type'] = 'app' from['app_id'] = Nutella.app_id end else from['type'] = 'run' from['run_id'] = Nutella.run_id end from['component_id'] = Nutella.component_id from['resource_id'] = Nutella.resource_id unless Nutella.resource_id.nil? from end
Performs an asynchronous request
@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,
nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
@param [Proc] callback the callback that is fired whenever a response is received. It takes one parameter (response). @param [String] app_id used to pad channels @param [String] run_id used to pad channels
# File lib/nutella_lib/net.rb, line 311 def self.async_request_to( channel, message=nil, callback, app_id, run_id ) # Check the passed callback has the right number of arguments raise 'You need to pass a callback with 1 parameter (response) when making an asynchronous request!' if callback.parameters.length!=1 # Pad channel padded_channel = pad_channel(channel, app_id, run_id) # Prepare message m, id = prepare_message_for_request message # Prepare callback mqtt_cb = lambda do |mqtt_message| type, _, payload, m_id = extract_fields_from_message mqtt_message if m_id==id && type=='response' callback.call(payload) Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb ) end end # Subscribe Nutella.mqtt.subscribe( padded_channel, mqtt_cb ) # Publish message Nutella.mqtt.publish( padded_channel, m ) end
# File lib/nutella_lib/net.rb, line 111 def self.extract_fields_from_message(message) mh = JSON.parse message return mh['type'], mh['from'], mh['payload'], mh['id'] end
Handles requests on a certain channel
@param [String] channel we want to listen for requests on. Can contain wildcard(s). @param [Proc] callback a lambda expression that is fired whenever a message is received.
The passed callback takes the following parameters: - [String] the received message (payload). Messages that are not JSON are discarded. - [Hash] the sender's identifiers (run_id, app_id, component_id and optionally resource_id) - [*returns* Hash] The response sent back to the client that performed the request. Whatever is returned by the callback is marshaled into a JSON string and sent via MQTT.
@param [String] app_id used to pad channels @param [String] run_id used to pad channels
# File lib/nutella_lib/net.rb, line 343 def self.handle_requests_on( channel, callback, app_id, run_id ) # Check the passed callback has the right number of arguments raise 'You need to pass a callback with 2 parameter (request, from) when handling requests!' if callback.parameters.length!=2 # Pad channel padded_channel = pad_channel(channel, app_id, run_id) mqtt_cb = lambda do |request| begin # Extract nutella fields type, from, payload, id = extract_fields_from_message request # Only handle requests that have proper id set return if type!='request' || id.nil? # Execute callback and send response m = self.prepare_message_for_response( callback.call( payload, from), id ) Nutella.mqtt.publish( padded_channel, m ) rescue JSON::ParserError # Make sure that request contains JSON, if not drop the message return rescue ArgumentError # Check that the passed callback has the right number of arguments STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'" end end # Subscribe to the channel Nutella.mqtt.subscribe(padded_channel, mqtt_cb) # Notify subscription publish_to('subscriptions', {'type' => 'handle_requests', 'channel' => padded_channel}, app_id, run_id) end
# File lib/nutella_lib/net.rb, line 116 def self.pad_channel( channel, app_id, run_id ) raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?) return "/nutella/#{channel}" if (app_id.nil? && run_id.nil?) return "/nutella/apps/#{app_id}/#{channel}" if (!app_id.nil? && run_id.nil?) "/nutella/apps/#{app_id}/runs/#{run_id}/#{channel}" end
# File lib/nutella_lib/net.rb, line 148 def self.prepare_message_for_publish( message ) if message.nil? return {type: 'publish', from: assemble_from}.to_json end {type: 'publish', from: assemble_from, payload: message}.to_json end
# File lib/nutella_lib/net.rb, line 155 def self.prepare_message_for_request( message ) id = message.hash + Random.rand(100) if message.nil? return {id: id, type: 'request', from: assemble_from}.to_json, id end return {id: id, type: 'request', from: assemble_from, payload: message}.to_json, id end
# File lib/nutella_lib/net.rb, line 163 def self.prepare_message_for_response( message, id ) if message.nil? return {id: id, type: 'response', from: assemble_from}.to_json end {id: id, type: 'response', from: assemble_from, payload: message}.to_json end
Publishes a message to a channel
@param [String] channel we want to publish the message to. CANNOT contain wildcard(s)! @param [Object] message the message we are publishing. This can be,
nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
@param [String] app_id used to pad channels @param [String] run_id used to pad channels
# File lib/nutella_lib/net.rb, line 257 def self.publish_to( channel, message=nil, app_id, run_id ) # Pad channel padded_channel = pad_channel(channel, app_id, run_id) # Throw exception if trying to publish something that is not JSON begin m = self.prepare_message_for_publish message Nutella.mqtt.publish( padded_channel, m ) rescue STDERR.puts 'Error: you are trying to publish something that is not JSON' STDERR.puts $! end end
Subscribes to a channel or to a set of channels.
@param [String] channel the channel or filter we are subscribing to. Can contain wildcard(s) @param [Proc] callback a lambda expression that is fired whenever a message is received.
The passed callback takes the following parameters: - [String] message: the received message. Messages that are not JSON are discarded. - [String] channel: the channel the message was received on (optional, only for wildcard subscriptions) - [Hash] from: the sender's identifiers (run_id, app_id, component_id and optionally resource_id)
@param [String] app_id used to pad channels @param [String] run_id used to pad channels
# File lib/nutella_lib/net.rb, line 181 def self.subscribe_to( channel, callback, app_id, run_id ) # Check the passed callback has the right number of arguments if Nutella.mqtt.is_channel_wildcard?(channel) raise 'You need to pass a callback with 3 parameters (message, channel, from) when subscribing to a wildcard channel!' if callback.parameters.length!=3 else raise 'You need to pass a callback with 2 parameters (message, from) when subscribing to a channel!' if callback.parameters.length!=2 end # Pad channel padded_channel = pad_channel(channel, app_id, run_id) # Maintain unique subscriptions raise 'You can`t subscribe twice to the same channel`' if @subscriptions.include? padded_channel # Depending on what type of channel we are subscribing to (wildcard or simple) # register a different kind of callback if Nutella.mqtt.is_channel_wildcard?(padded_channel) mqtt_cb = lambda do |mqtt_message, mqtt_channel| begin type, from, payload, _ = extract_fields_from_message mqtt_message callback.call(payload, un_pad_channel(mqtt_channel, app_id, run_id), from) if type=='publish' rescue JSON::ParserError # Make sure the message is JSON, if not drop the message return rescue ArgumentError # Check the passed callback has the right number of arguments STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload', 'channel' and 'from'" end end else mqtt_cb = lambda do |message| begin type, from, payload, _ = extract_fields_from_message message callback.call(payload, from) if type=='publish' rescue JSON::ParserError # Make sure the message is JSON, if not drop the message return rescue ArgumentError # Check the passed callback has the right number of arguments STDERR.puts "The callback you passed to subscribe has the #{$!}: it needs 'payload' and 'from'" end end end # Add to subscriptions, save mqtt callback and subscribe @subscriptions.push padded_channel @callbacks.push mqtt_cb Nutella.mqtt.subscribe( padded_channel, mqtt_cb ) # Notify subscription publish_to('subscriptions', {'type' => 'subscribe', 'channel' => padded_channel}, app_id, run_id) end
Performs a synchronous request.
@param [String] channel we want to make the request to. CANNOT contain wildcard(s)! @param [Object] message the body of request. This can be,
nil/empty (default), a string, a hash and, in general, anything with a .to_json method.
@param [String] app_id used to pad channels @param [String] run_id used to pad channels
# File lib/nutella_lib/net.rb, line 278 def self.sync_request_to( channel, message=nil, app_id, run_id ) # Pad channel padded_channel = pad_channel(channel, app_id, run_id) # Prepare message m, id = prepare_message_for_request message # Initialize response response = nil # Prepare callback mqtt_cb = lambda do |mqtt_message| type, _, payload, m_id = extract_fields_from_message mqtt_message if m_id==id && type=='response' response = payload Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb ) end end # Subscribe Nutella.mqtt.subscribe( padded_channel, mqtt_cb ) # Publish message Nutella.mqtt.publish( padded_channel, m ) # Wait for the response to come back sleep(0.1) while response.nil? response end
# File lib/nutella_lib/net.rb, line 123 def self.un_pad_channel( channel, app_id, run_id ) raise 'If the run_id is specified, app_id needs to also be specified' if (!run_id.nil? && app_id.nil?) return channel.gsub('/nutella/', '') if (app_id.nil? && run_id.nil?) return channel.gsub("/nutella/apps/#{app_id}/", '') if (!app_id.nil? && run_id.nil?) channel.gsub("/nutella/apps/#{app_id}/runs/#{run_id}/", '') end
Un-subscribes from a channel
@param [String] channel we want to unsubscribe from. Can contain wildcard(s). @param [String] app_id used to pad channels @param [String] run_id used to pad channels
# File lib/nutella_lib/net.rb, line 235 def self.unsubscribe_to( channel, app_id, run_id ) # Pad channel padded_channel = pad_channel(channel, app_id, run_id) idx = @subscriptions.index padded_channel # If we are not subscribed to this channel, return (no error is given) return if idx.nil? # Fetch the mqtt_callback associated with this channel/subscription mqtt_cb = @callbacks[idx] # Remove from subscriptions, callbacks and unsubscribe @subscriptions.delete_at idx @callbacks.delete_at idx Nutella.mqtt.unsubscribe( padded_channel, mqtt_cb ) end