class Stomp::Client
Typical Stomp
client class. Uses a listener thread to receive frames from the server, any thread can send.
Receives all happen in one thread, so consider not doing much processing in that thread if you have much message volume.
Attributes
Parameters hash
Public Class Methods
Source
# File lib/stomp/client.rb, line 83 def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) parse_hash_params(login) || parse_stomp_url(login) || parse_failover_url(login) || parse_positional_params(login, passcode, host, port, reliable) @logger = @parameters[:logger] ||= Stomp::NullLogger.new @start_timeout = @parameters[:start_timeout] || 0 @parameters[:client_main] = Thread::current ## p [ "CLINDBG", @parameters[:client_main] ] check_arguments!() # p [ "cldbg01", @parameters ] begin Timeout::timeout(@start_timeout) { create_error_handler create_connection(autoflush) start_listeners() } rescue Timeout::Error # p [ "cldbg02" ] ex = Stomp::Error::StartTimeoutException.new(@start_timeout) raise ex end end
A new Client
object can be initialized using three forms:
Hash
(this is the recommended Client
initialization method):
hash = { :hosts => [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ], # These are the default parameters and do not need to be set :reliable => true, # reliable (use failover) :initial_reconnect_delay => 0.01, # initial delay before reconnect (secs) :max_reconnect_delay => 30.0, # max delay before reconnect :use_exponential_back_off => true, # increase delay between reconnect attpempts :back_off_multiplier => 2, # next delay multiplier :max_reconnect_attempts => 0, # retry forever, use # for maximum attempts :randomize => false, # do not radomize hosts hash before reconnect :connect_timeout => 0, # Timeout for TCP/TLS connects, use # for max seconds :connect_headers => {}, # user supplied CONNECT headers (req'd for Stomp 1.1+) :parse_timeout => 5, # IO::select wait time on socket reads :logger => nil, # user suplied callback logger instance :dmh => false, # do not support multihomed IPV4 / IPV6 hosts during failover :closed_check => true, # check first if closed in each protocol method :hbser => false, # raise on heartbeat send exception :stompconn => false, # Use STOMP instead of CONNECT :usecrlf => false, # Use CRLF command and header line ends (1.2+) :max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry :max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry :fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ... # For fast heartbeat senders. 'fast' == YMMV. If not # correct for your environment, expect unnecessary fail overs :connread_timeout => 0, # Timeout during CONNECT for read of CONNECTED/ERROR, secs :tcp_nodelay => true, # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm :start_timeout => 0, # Timeout around Stomp::Client initialization :sslctx_newparm => nil, # Param for SSLContext.new :ssl_post_conn_check => true, # Further verify broker identity :nto_cmd_read => true, # No timeout on COMMAND read } e.g. c = Stomp::Client.new(hash)
Positional parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp
URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port stomp://host.domain.tld:port stomp://login:passcode@host:port stomp://login:passcode@host.domain.tld:port e.g. c = Stomp::Client.new(urlstring)
Source
# File lib/stomp/client.rb, line 139 def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) Client.new(login, passcode, host, port, reliable) end
open is syntactic sugar for ‘Client.new’, see ‘initialize’ for usage.
Public Instance Methods
Source
# File lib/stomp/client.rb, line 155 def abort(name, headers = {}) @connection.abort(name, headers) # replay any ack'd messages in this transaction replay_list = @replay_messages_by_txn[name] if replay_list replay_list.each do |message| find_listener(message) # find_listener also calls the listener end end end
Abort aborts work in a transaction by name.
Source
# File lib/stomp/client.rb, line 204 def ack(message, headers = {}) txn_id = headers[:transaction] if txn_id # lets keep around messages ack'd in this transaction in case we rollback replay_list = @replay_messages_by_txn[txn_id] if replay_list.nil? replay_list = [] @replay_messages_by_txn[txn_id] = replay_list end replay_list << message end if block_given? headers = headers.merge(:receipt => register_receipt_listener(lambda {|r| yield r})) end context = ack_context_for(message, headers) @connection.ack context[:message_id], context[:headers] end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe(“/queue/a”,{:ack => ‘client’}). Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).
Source
# File lib/stomp/client.rb, line 232 def ack_context_for(message, headers) id = case protocol when Stomp::SPL_12 'ack' when Stomp::SPL_11 headers = headers.merge(:subscription => message.headers['subscription']) 'message-id' else 'message-id' end {:message_id => message.headers[id], :headers => headers} end
Source
# File lib/stomp/client.rb, line 358 def autoflush() @connection.autoflush() end
autoflush returns the current connection’s autoflush setting.
Source
# File lib/stomp/client.rb, line 353 def autoflush=(af) @connection.autoflush = af end
autoflush= sets the current connection’s autoflush setting.
Source
# File lib/stomp/client.rb, line 150 def begin(name, headers = {}) @connection.begin(name, headers) end
Begin starts work in a a transaction by name.
Source
# File lib/stomp/client.rb, line 290 def close(headers={}) @listener_thread.exit @connection.disconnect(headers) end
close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.
Source
# File lib/stomp/client.rb, line 279 def closed?() @connection.closed?() end
close? tests if this client connection is closed.
Source
# File lib/stomp/client.rb, line 168 def commit(name, headers = {}) txn_id = headers[:transaction] @replay_messages_by_txn.delete(txn_id) @connection.commit(name, headers) end
Commit commits work in a transaction by name.
Source
# File lib/stomp/client.rb, line 264 def connection_frame() @connection.connection_frame end
Return the broker’s CONNECTED frame to the client. Misnamed.
Source
# File lib/stomp/client.rb, line 110 def create_error_handler client_thread = Thread.current if client_thread.respond_to?(:report_on_exception=) client_thread.report_on_exception=false end @error_listener = lambda do |error| exception = case error.body when /ResourceAllocationException/i Stomp::Error::ProducerFlowControlException.new(error) when /ProtocolException/i Stomp::Error::ProtocolException.new(error) else Stomp::Error::BrokerException.new(error) end @receipt_listeners.delete(error.headers['receipt-id']) if error.headers['receipt-id'] client_thread.raise exception end end
Source
# File lib/stomp/client.rb, line 269 def disconnect_receipt() @connection.disconnect_receipt end
Return any RECEIPT frame received by DISCONNECT.
Source
# File lib/stomp/client.rb, line 342 def hbrecv_count() @connection.hbrecv_count() end
hbrecv_count
returns the current connection’s heartbeat receive count.
Source
# File lib/stomp/client.rb, line 332 def hbrecv_interval() @connection.hbrecv_interval() end
hbrecv_interval
returns the connection’s heartbeat receive interval.
Source
# File lib/stomp/client.rb, line 337 def hbsend_count() @connection.hbsend_count() end
hbsend_count
returns the current connection’s heartbeat send count.
Source
# File lib/stomp/client.rb, line 327 def hbsend_interval() @connection.hbsend_interval() end
hbsend_interval
returns the connection’s heartbeat send interval.
Source
# File lib/stomp/client.rb, line 145 def join(limit = nil) @listener_thread.join(limit) end
join the listener thread for this client, generally used to wait for a quit signal.
Source
# File lib/stomp/client.rb, line 284 def jruby?() @connection.jruby end
jruby? tests if the connection has detcted a JRuby environment
Source
# File lib/stomp/client.rb, line 226 def nack(message, headers = {}) context = ack_context_for(message, headers) @connection.nack context[:message_id], context[:headers] end
Stomp
1.1+ NACK.
Source
# File lib/stomp/client.rb, line 274 def open? @connection.open?() end
open? tests if this client connection is open.
Source
# File lib/stomp/client.rb, line 348 def poll() @connection.poll() end
Poll for asynchronous messages issued by broker. Return nil of no message available, else the message
Source
# File lib/stomp/client.rb, line 307 def protocol() @connection.protocol() end
protocol returns the current client’s protocol level.
Source
# File lib/stomp/client.rb, line 254 def publish(destination, message, headers = {}) headers = headers.symbolize_keys raise Stomp::Error::DestinationRequired unless destination if block_given? headers = headers.merge(:receipt => register_receipt_listener(lambda {|r| yield r})) end @connection.publish(destination, message, headers) end
Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).
Source
# File lib/stomp/client.rb, line 296 def running() @listener_thread && !!@listener_thread.status end
running checks if the thread was created and is not dead.
Source
# File lib/stomp/client.rb, line 301 def set_logger(logger) @logger = logger @connection.set_logger(logger) end
set_logger
identifies a new callback logger.
Source
# File lib/stomp/client.rb, line 317 def sha1(data) @connection.sha1(data) end
sha1 returns a SHA1 sum of a given string.
Source
# File lib/stomp/client.rb, line 177 def subscribe(destination, headers = {}) raise Stomp::Error::NoListenerGiven unless block_given? headers = headers.symbolize_keys raise Stomp::Error::DestinationRequired unless destination # use subscription id to correlate messages to subscription. As described in # the SUBSCRIPTION section of the protocol: http://stomp.github.com/. # If no subscription id is provided, generate one. headers = headers.merge(:id => build_subscription_id(destination, headers)) if @listeners[headers[:id]] raise Stomp::Error::DuplicateSubscription end @listeners[headers[:id]] = lambda {|msg| yield msg} @connection.subscribe(destination, headers) end
Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).
Source
# File lib/stomp/client.rb, line 246 def unreceive(message, options = {}) @connection.unreceive(message, options) end
Unreceive a message, sending it back to its queue or to the DLQ.
Source
# File lib/stomp/client.rb, line 193 def unsubscribe(destination, headers = {}) headers = headers.symbolize_keys raise Stomp::Error::DestinationRequired unless destination headers = headers.merge(:id => build_subscription_id(destination, headers)) @connection.unsubscribe(destination, headers) @listeners[headers[:id]] = nil end
Unsubscribe from a subscription by name.
Source
# File lib/stomp/client.rb, line 322 def uuid() @connection.uuid() end
uuid returns a type 4 UUID.
Source
# File lib/stomp/client.rb, line 312 def valid_utf8?(s) @connection.valid_utf8?(s) end
valid_utf8? validates any given string for UTF8 compliance.
Private Instance Methods
Source
# File lib/client/utils.rb, line 80 def build_subscription_id(destination, headers) return headers[:id] until headers[:id].nil? return headers['id'] until headers['id'].nil? # p [ "DBBSID1", destination, headers ] Digest::SHA1.hexdigest(destination) end
Source
# File lib/client/utils.rb, line 106 def check_arguments!() raise ArgumentError.new("missing :hosts parameter") unless @parameters[:hosts] raise ArgumentError.new("invalid :hosts type") unless @parameters[:hosts].is_a?(Array) @parameters[:hosts].each do |hv| # Validate port requested raise ArgumentError.new("empty :port value in #{hv.inspect}") if hv[:port] == '' unless hv[:port].nil? tpv = hv[:port].to_i raise ArgumentError.new("invalid :port value=#{tpv} from #{hv.inspect}") if tpv < 1 || tpv > 65535 end # Validate host requested (no validation here. if nil or '', localhost will # be used in #Connection.) end raise ArgumentError unless @parameters[:reliable].is_a?(TrueClass) || @parameters[:reliable].is_a?(FalseClass) # if @parameters[:reliable] && @start_timeout > 0 warn "WARN detected :reliable == true and :start_timeout > 0" warn "WARN this may cause incorrect fail-over behavior" warn "WARN use :start_timeout => 0 to correct fail-over behavior" end end
A sanity check of required arguments.
Source
# File lib/stomp/client.rb, line 131 def create_connection(autoflush) # p [ "ccon01", @parameters ] @connection = Connection.new(@parameters) @connection.autoflush = autoflush end
Source
# File lib/client/utils.rb, line 171 def create_listener_maps @listeners = {} @receipt_listeners = {} @replay_messages_by_txn = {} @listener_map = Hash.new do |message| @failure = nil unless @connection.slog(:on_miscerr, @connection.log_params, "Received unknown frame type: '#{message.command}'\n") warn "Received unknown frame type: '#{message.command}'\n" end end @listener_map[Stomp::CMD_MESSAGE] = lambda {|message| find_listener(message) } @listener_map[Stomp::CMD_RECEIPT] = lambda {|message| find_receipt_listener(message) } @listener_map[Stomp::CMD_ERROR] = @error_listener end
Source
# File lib/client/utils.rb, line 129 def filter_options(options) new_options = {} new_options[:initial_reconnect_delay] = (options["initialReconnectDelay"] || 10).to_f / 1000 # In ms new_options[:max_reconnect_delay] = (options["maxReconnectDelay"] || 30000 ).to_f / 1000 # In ms new_options[:use_exponential_back_off] = !(options["useExponentialBackOff"] == "false") # Default: true new_options[:back_off_multiplier] = (options["backOffMultiplier"] || 2 ).to_i new_options[:max_reconnect_attempts] = (options["maxReconnectAttempts"] || 0 ).to_i new_options[:randomize] = options["randomize"] == "true" # Default: false new_options[:connect_timeout] = 0 new_options end
filter_options
returns a new Hash
of filtered options.
Source
# File lib/client/utils.rb, line 143 def find_listener(message) subscription_id = message.headers['subscription'] if subscription_id == nil # For backward compatibility, some messages may already exist with no # subscription id, in which case we can attempt to synthesize one. set_subscription_id_if_missing(message.headers['destination'], message.headers) subscription_id = message.headers[:id] end listener = @listeners[subscription_id] listener.call(message) if listener end
find_listener
returns the listener for a given subscription in a given message.
Source
# File lib/client/utils.rb, line 163 def find_receipt_listener(message) listener = @receipt_listeners[message.headers['receipt-id']] if listener listener.call(message) @receipt_listeners.delete(message.headers['receipt-id']) end end
Source
# File lib/client/utils.rb, line 40 def parse_failover_url(login) rval = nil original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings md = FAILOVER_REGEX.match(login) $VERBOSE = original_verbose if md finhosts = parse_hosts(login) options = {} if md_last = md[-1] parts = md_last.split(/&|=/) raise Stomp::Error::MalformedFailoverOptionsError unless ( parts.size % 2 ) == 0 options = Hash[*parts] end @parameters = {:hosts => finhosts}.merge!(filter_options(options)) @parameters[:reliable] = true rval = true end rval end
e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
Source
# File lib/client/utils.rb, line 11 def parse_hash_params(params) return false unless params.is_a?(Hash) @parameters = params # Do not override user choice of false. @parameters[:reliable] = true unless @parameters[:reliable] == false true end
Source
# File lib/client/utils.rb, line 88 def parse_hosts(url) hosts = [] original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/ url.scan(host_match).each do |match| host = {} host[:ssl] = match[0] == "+ssl" ? true : false host[:login] = match[3] || "" host[:passcode] = match[4] || "" host[:host] = match[5] host[:port] = match[6].to_i hosts << host end $VERBOSE = original_verbose hosts end
Parse a stomp URL.
Source
# File lib/client/utils.rb, line 63 def parse_positional_params(login, passcode, host, port, reliable) @parameters = { :reliable => reliable, :hosts => [ { :login => login, :passcode => passcode, :host => host, :port => port.to_i } ] } true end
Source
# File lib/client/utils.rb, line 21 def parse_stomp_url(login) original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings regexp = /^stomp:\/\/#{URL_REPAT}/ url = regexp.match(login) $VERBOSE = original_verbose return false unless url @login = url[3] || "" @passcode = url[4] || "" @host = url[5] @port = url[6].to_i @parameters = { :reliable => false, :hosts => [ { :login => @login, :passcode => @passcode, :host => @host, :port => @port} ] } true end
Source
# File lib/client/utils.rb, line 157 def register_receipt_listener(listener) id = uuid @receipt_listeners[id] = listener id end
Register a receipt listener.
Source
# File lib/client/utils.rb, line 76 def set_subscription_id_if_missing(destination, headers) headers[:id] = build_subscription_id(destination, headers) end
Set a subscription id in the headers hash if one does not already exist. For simplicities sake, all subscriptions have a subscription ID. setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.github.com/
Source
# File lib/client/utils.rb, line 189 def start_listeners() create_listener_maps @listener_thread = Thread.start do loop do message = @connection.receive # AMQ specific behavior if message.nil? && (!@parameters[:reliable]) raise Stomp::Error::NilMessageError end next unless message # message can be nil on rapid AMQ stop/start sequences @listener_map[message.command].call(message) end end end
Start a single listener thread. Misnamed I think.