class Stomp::Connection
Low level connection which maps commands and supports synchronous receives
Attributes
Autoflush forces a flush on each transmit. This may be changed dynamically by calling code.
The CONNECTED frame from the broker.
Any disconnect RECEIPT frame if requested.
Heartbeat receive has been on time.
Heartbeat send has been successful.
Currently-connected host and port
JRuby detected
Currently-connected host and port
The Stomp
Protocol version.
A unique session ID, assigned by the broker.
Public Class Methods
Source
# File lib/stomp/connection.rb, line 44 def self.default_port(ssl) ssl ? 61612 : 61613 end
default_port
returns the default port used by the gem for TCP or SSL.
Source
# File lib/stomp/connection.rb, line 112 def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) @protocol = Stomp::SPL_10 # Assumed at first @hb_received = true # Assumed at first @hb_sent = true # Assumed at first @hbs = @hbr = false # Sending/Receiving heartbeats. Assume no for now. @jruby = false # Assumed at first # Initialize some variables @closed, @socket, @hhas10, @rt, @st = true, nil, false, nil, nil if defined?(RUBY_ENGINE) && RUBY_ENGINE =~ /jruby/ @jruby = true end ct = Thread.current if ct.respond_to?(:report_on_exception=) ct.report_on_exception=false end if login.is_a?(Hash) hashed_initialize(login) else @host = host @port = port @login = login @passcode = passcode @reliable = reliable @reconnect_delay = reconnect_delay @connect_headers = connect_headers @ssl = false @parameters = nil @parse_timeout = 5 # To override, use hashed parameters @connect_timeout = 0 # To override, use hashed parameters @logger = Stomp::NullLogger.new # To override, use hashed parameters @autoflush = false # To override, use hashed parameters or setter @closed_check = true # Run closed check in each protocol method @hbser = false # Raise if heartbeat send exception @stompconn = false # If true, use STOMP rather than CONNECT @usecrlf = false # If true, use \r\n as line ends (1.2 only) @max_hbread_fails = 0 # 0 means never retry for HB read failures @max_hbrlck_fails = 0 # 0 means never retry for HB read lock failures @fast_hbs_adjust = 0.0 # Fast heartbeat senders sleep adjustment @connread_timeout = 0 # Connect read CONNECTED/ERROR timeout @tcp_nodelay = true # Disable Nagle @start_timeout = 0 # Client only, startup timeout @sslctx_newparm = nil # SSLContext.new paramater @ssl_post_conn_check = true # Additional broker verification @nto_cmd_read = true # No timeout on COMMAND read warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\// end # Use Mutexes: only one lock per each thread. # Reverted to original implementation attempt using Mutex. @transmit_semaphore = Mutex.new @read_semaphore = Mutex.new @socket_semaphore = Mutex.new @gets_semaphore = Mutex.new @subscriptions = {} @failure = nil @connection_attempts = 0 socket end
A new Connection
object can be initialized using two forms:
Hash
(this is the recommended Connection
initialization method):
hash = { :hosts => [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ], # These are the default parameters and do not need to be set :reliable => true, # reliable (use failover) :initial_reconnect_delay => 0.01, # initial delay before reconnect (secs) :max_reconnect_delay => 30.0, # max delay before reconnect :use_exponential_back_off => true, # increase delay between reconnect attpempts :back_off_multiplier => 2, # next delay multiplier :max_reconnect_attempts => 0, # retry forever, use # for maximum attempts :randomize => false, # do not radomize hosts hash before reconnect :connect_timeout => 0, # Timeout for TCP/TLS connects, use # for max seconds :connect_headers => {}, # user supplied CONNECT headers (req'd for Stomp 1.1+) :parse_timeout => 5, # IO::select wait time on socket reads :logger => nil, # user suplied callback logger instance :dmh => false, # do not support multihomed IPV4 / IPV6 hosts during failover :closed_check => true, # check first if closed in each protocol method :hbser => false, # raise on heartbeat send exception :stompconn => false, # Use STOMP instead of CONNECT :usecrlf => false, # Use CRLF command and header line ends (1.2+) :max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry :max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry :fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ... # For fast heartbeat senders. 'fast' == YMMV. If not # correct for your environment, expect unnecessary fail overs :connread_timeout => 0, # Timeout during CONNECT for read of CONNECTED/ERROR, secs :tcp_nodelay => true, # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm :start_timeout => 0, # Timeout around Stomp::Client initialization :sslctx_newparm => nil, # Param for SSLContext.new :ssl_post_conn_check => true, # Further verify broker identity :nto_cmd_read => true, # No timeout on COMMAND read } e.g. c = Stomp::Connection.new(hash)
Positional parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) reconnect_delay (Integer, default : 5) e.g. c = Stomp::Connection.new("username", "password", "localhost", 61613, true)
Source
# File lib/stomp/connection.rb, line 210 def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers) end
open is syntactic sugar for ‘Connection.new’, see ‘initialize’ for usage.
Source
# File lib/stomp/connection.rb, line 49 def self.ssl_v2xoptions() require 'openssl' unless defined?(OpenSSL) # Mimic code in later versions of Ruby 2.x (and backported to later # versions of 1.9.3). opts = OpenSSL::SSL::OP_ALL opts &= ~OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS if defined?(OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS) opts |= OpenSSL::SSL::OP_NO_COMPRESSION if defined?(OpenSSL::SSL::OP_NO_COMPRESSION) opts |= OpenSSL::SSL::OP_NO_SSLv2 if defined?(OpenSSL::SSL::OP_NO_SSLv2) opts |= OpenSSL::SSL::OP_NO_SSLv3 if defined?(OpenSSL::SSL::OP_NO_SSLv3) end
SSL Helper
Public Instance Methods
Source
# File lib/connection/netio.rb, line 11 def _interruptible_gets(read_socket) # The gets thread may be interrupted by the heartbeat thread. Ensure that # if so interrupted, a new gets cannot start until after the heartbeat # thread finishes its work. This is PURELY to avoid a segfault bug # involving OpenSSL::Buffer. @gets_semaphore.synchronize { @getst = Thread.current } read_socket.gets ensure @gets_semaphore.synchronize { @getst = nil } end
Source
# File lib/stomp/connection.rb, line 334 def abort(name, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") headers = headers.symbolize_keys headers[:transaction] = name _headerCheck(headers) slog(:on_abort, log_params, headers) transmit(Stomp::CMD_ABORT, headers) end
Abort aborts a transaction by name.
Source
# File lib/stomp/connection.rb, line 259 def ack(message_or_ack_id, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") raise Stomp::Error::MessageIDRequiredError if message_or_ack_id.nil? || message_or_ack_id == "" headers = headers.symbolize_keys case @protocol when Stomp::SPL_12 # The ACK frame MUST include an "id" header matching the "ack" header # of the MESSAGE being acknowledged. headers[:id] = message_or_ack_id when Stomp::SPL_11 # ACK has two REQUIRED headers: "message-id", which MUST contain a value # matching the message-id header of the MESSAGE being acknowledged and # "subscription", which MUST be set to match the value of SUBSCRIBE's # id header. headers[:'message-id'] = message_or_ack_id raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription] else # Stomp::SPL_10 # ACK has one required header, "message-id", which must contain a value # matching the message-id for the MESSAGE being acknowledged. headers[:'message-id'] = message_or_ack_id end _headerCheck(headers) slog(:on_ack, log_params, headers) transmit(Stomp::CMD_ACK, headers) end
Acknowledge a message, used when a subscription has specified client acknowledgement e.g.:
connection.subscribe("/queue/a", :ack => 'client') connection.subscribe("/queue/a", :ack => 'client-individual')
as appropriate for the protocol level.
Accepts an optional transaction header ( :transaction => ‘some_transaction_id’ ).
When the connection protocol level is 1.0 or 1.1 the message_or_ack_id parameter should match the message-id header of the MESSAGE being acknowledged e.g.:
connection.ack(message.headers['message-id'])
When the connection protocol level is 1.2 the message_or_ack_id parameter should match the ack header of the MESSAGE being acknowledged e.g.:
connection.ack(message.headers['ack'])
In summary, the behavior is protocol level dependent, see the specifications and comments in the code.
Source
# File lib/stomp/connection.rb, line 225 def begin(name, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") headers = headers.symbolize_keys headers[:transaction] = name _headerCheck(headers) slog(:on_begin, log_params, headers) transmit(Stomp::CMD_BEGIN, headers) end
Begin starts a transaction, and requires a name for the transaction
Source
# File lib/stomp/connection.rb, line 452 def client_ack?(message) headers = @subscriptions[message.headers[:destination]] !headers.nil? && headers[:ack] == "client" end
client_ack? determines if headers contain :ack => “client”.
Source
# File lib/stomp/connection.rb, line 220 def closed? @closed end
closed? tests if this connection is closed.
Source
# File lib/stomp/connection.rb, line 322 def commit(name, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") headers = headers.symbolize_keys headers[:transaction] = name _headerCheck(headers) slog(:on_commit, log_params, headers) transmit(Stomp::CMD_COMMIT, headers) end
Commit commits a transaction by name.
Source
# File lib/stomp/connection.rb, line 459 def disconnect(headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") headers = headers.symbolize_keys _headerCheck(headers) if @protocol >= Stomp::SPL_11 @st.kill if @st # Kill ticker thread if any @rt.kill if @rt # Kill ticker thread if any end transmit(Stomp::CMD_DISCONNECT, headers) @disconnect_receipt = receive if headers[:receipt] slog(:on_disconnect, log_params) close_socket end
disconnect closes this connection. If requested, a disconnect RECEIPT will be received.
Source
# File lib/stomp/connection.rb, line 177 def hashed_initialize(params) lp = _hdup(params) @parameters = refine_params(lp) @reliable = @parameters[:reliable] @reconnect_delay = @parameters[:initial_reconnect_delay] @connect_headers = @parameters[:connect_headers] @parse_timeout = @parameters[:parse_timeout] @connect_timeout = @parameters[:connect_timeout] @logger = @parameters[:logger] || Stomp::NullLogger.new @autoflush = @parameters[:autoflush] @closed_check = @parameters[:closed_check] @hbser = @parameters[:hbser] @stompconn = @parameters[:stompconn] @usecrlf = @parameters[:usecrlf] @max_hbread_fails = @parameters[:max_hbread_fails] @max_hbrlck_fails = @parameters[:max_hbrlck_fails] @fast_hbs_adjust = @parameters[:fast_hbs_adjust] @connread_timeout = @parameters[:connread_timeout] @sslctx_newparm = @parameters[:sslctx_newparm] @ssl_post_conn_check = @parameters[:ssl_post_conn_check] @nto_cmd_read = @parameters[:nto_cmd_read] # # Try to support Ruby 1.9.x and 2.x ssl. unless defined?(RSpec) @parameters[:hosts].each do |ah| ah[:ssl] = Stomp::SSLParams.new if ah[:ssl] == true end end #sets the first host to connect change_host end
hashed_initialize
prepares a new connection with a Hash
of initialization parameters.
Source
# File lib/stomp/connection.rb, line 573 def hbrecv_count() return 0 unless @hbrecv_count @hbrecv_count end
hbrecv_count
returns the current connection’s heartbeat receive count.
Source
# File lib/stomp/connection.rb, line 561 def hbrecv_interval() return 0 unless @hbrecv_interval @hbrecv_interval / 1000.0 # ms end
hbrecv_interval
returns the connection’s heartbeat receive interval.
Source
# File lib/stomp/connection.rb, line 567 def hbsend_count() return 0 unless @hbsend_count @hbsend_count end
hbsend_count
returns the current connection’s heartbeat send count.
Source
# File lib/stomp/connection.rb, line 555 def hbsend_interval() return 0 unless @hbsend_interval @hbsend_interval / 1000.0 # ms end
hbsend_interval
returns the connection’s heartbeat send interval.
Source
# File lib/stomp/connection.rb, line 297 def nack(message_or_ack_id, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10 raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::MessageIDRequiredError if message_or_ack_id.nil? || message_or_ack_id == "" headers = headers.symbolize_keys case @protocol when Stomp::SPL_12 # The NACK frame MUST include an id header matching the ack header # of the MESSAGE being acknowledged. headers[:id] = message_or_ack_id else # Stomp::SPL_11 only # NACK has two REQUIRED headers: message-id, which MUST contain a value # matching the message-id for the MESSAGE being acknowledged and # subscription, which MUST be set to match the value of the subscription's # id header. headers[:'message-id'] = message_or_ack_id raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription] end _headerCheck(headers) slog(:on_nack, log_params, headers) transmit(Stomp::CMD_NACK, headers) end
STOMP 1.1+ NACK.
When the connection protocol level is 1.1 the message_or_ack_id parameter should match the message-id header of the MESSAGE being acknowledged.
When the connection protocol level is 1.2 the message_or_ack_id parameter should match the ack header of the MESSAGE being acknowledged.
Behavior is protocol level dependent, see the specifications and comments below.
Source
# File lib/stomp/connection.rb, line 215 def open? !@closed end
open? tests if this connection is open.
Source
# File lib/stomp/connection.rb, line 477 def poll() raise Stomp::Error::NoCurrentConnection if @closed_check && closed? # No need for a read lock here. The receive method eventually fulfills # that requirement. return nil if @socket.nil? || !@socket.ready? receive() end
poll returns a pending message if one is available, otherwise returns nil.
Source
# File lib/stomp/connection.rb, line 398 def publish(destination, message, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") headers = headers.symbolize_keys raise Stomp::Error::DestinationRequired unless destination headers[:destination] = destination _headerCheck(headers) slog(:on_publish, log_params, message, headers) transmit(Stomp::CMD_SEND, headers, message) end
Publish message to destination. To disable content length header use header ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).
Source
# File lib/stomp/connection.rb, line 489 def receive() raise Stomp::Error::NoCurrentConnection if @closed_check && closed? super_result = __old_receive() if super_result.nil? && @reliable && !closed? errstr = "connection.receive returning EOF as nil - resetting connection.\n" @failure = nil unless slog(:on_miscerr, log_params, "es_recv: " + errstr) $stderr.print errstr end # !!! This initiates a re-connect !!! # The call to __old_receive() will in turn call socket(). Before # that we should change the target host, otherwise the host that # just failed may be attempted first. _reconn_prep() # super_result = __old_receive() end # if super_result.nil? && !@reliable @st.kill if @st # Kill ticker thread if any @rt.kill if @rt # Kill ticker thread if any close_socket() @closed = true warn 'warning: broker sent EOF, and connection not reliable' unless defined?(Test) end slog(:on_receive, log_params, super_result) return super_result end
receive returns the next Message
off of the wire. this can return nil in cases where:
-
the broker has closed the connection
-
the connection is not reliable
Source
# File lib/stomp/connection.rb, line 520 def set_logger(logger) @logger = logger end
set_logger
selects a new callback logger instance.
Source
# File lib/stomp/connection.rb, line 536 def sha1(data) Digest::SHA1.hexdigest(data) end
sha1 returns a SHA1 digest for arbitrary string data.
Source
# File lib/stomp/connection.rb, line 579 def slog(name, *parms) return false unless @logger @logger.send(name, *parms) if @logger.respond_to?(:"#{name}") @logger.respond_to?(:"#{name}") end
log call router
Source
# File lib/stomp/connection.rb, line 347 def subscribe(destination, headers = {}, subId = nil) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") headers = headers.symbolize_keys raise Stomp::Error::DestinationRequired unless destination headers[:destination] = destination if @protocol >= Stomp::SPL_11 raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?) headers[:id] = subId if headers[:id].nil? end _headerCheck(headers) slog(:on_subscribe, log_params, headers) ## p [ "subId", subId ] ## p [ "subscriptions", @subscriptions ] # Store the subscription so that we can replay if we reconnect. if @reliable subId = destination if subId.nil? raise Stomp::Error::DuplicateSubscription if @subscriptions[subId] @subscriptions[subId] = headers end transmit(Stomp::CMD_SUBSCRIBE, headers) end
Subscribe subscribes to a destination. A subscription name is required. For Stomp
1.1+ a session unique subscription ID is also required.
Source
# File lib/stomp/connection.rb, line 414 def unreceive(message, options = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options) # Lets make sure all keys are symbols message.headers = message.headers.symbolize_keys retry_count = message.headers[:retry_count].to_i || 0 message.headers[:retry_count] = retry_count + 1 transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}" message_id = message.headers.delete(:'message-id') # Prevent duplicate 'subscription' headers on subsequent receives message.headers.delete(:subscription) if message.headers[:subscription] begin self.begin transaction_id if client_ack?(message) || options[:force_client_ack] self.ack(message_id, :transaction => transaction_id) end if message.headers[:retry_count] <= options[:max_redeliveries] self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id)) else # Poison ack, sending the message to the DLQ self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true)) end self.commit transaction_id rescue Exception => exception self.abort transaction_id raise exception end end
Send a message back to the source or to the dead letter queue. Accepts a dead letter queue option ( :dead_letter_queue => “/queue/DLQ” ). Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ). Accepts a force client acknowledgement option (:force_client_ack => true).
Source
# File lib/stomp/connection.rb, line 375 def unsubscribe(destination, headers = {}, subId = nil) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::ProtocolErrorEmptyHeaderKey if headers.has_key?("") raise Stomp::Error::ProtocolErrorEmptyHeaderValue if @protocol == Stomp::SPL_10 && headers.has_value?("") headers = headers.symbolize_keys raise Stomp::Error::DestinationRequired unless destination headers[:destination] = destination if @protocol >= Stomp::SPL_11 raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?) headers[:id] = subId unless headers[:id] end _headerCheck(headers) slog(:on_unsubscribe, log_params, headers) transmit(Stomp::CMD_UNSUBSCRIBE, headers) if @reliable subId = destination if subId.nil? @subscriptions.delete(subId) end end
Unsubscribe from a destination. A subscription name is required. For Stomp
1.1+ a session unique subscription ID is also required.
Source
# File lib/stomp/connection.rb, line 541 def uuid() b = [] 0.upto(15) do |i| b << rand(255) end b[6] = (b[6] & 0x0F) | 0x40 b[8] = (b[8] & 0xbf) | 0x80 # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x", b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]) rs end
uuid returns a type 4 UUID.
Source
# File lib/stomp/connection.rb, line 525 def valid_utf8?(s) case RUBY_VERSION when /1\.8/ rv = _valid_utf8?(s) else rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding? end rv end
valid_utf8? returns an indicator if the given string is a valid UTF8 string.
Private Instance Methods
Source
# File lib/connection/utils.rb, line 244 def __old_receive() # The receive may fail so we may need to retry. while true begin used_socket = socket() connread = false noiosel = (@ssl || @jruby) ? true : false return _receive(used_socket, connread, noiosel) rescue Stomp::Error::MaxReconnectAttempts @failure = $! unless slog(:on_miscerr, log_params, "Reached MaxReconnectAttempts") $stderr.print "Reached MaxReconnectAttempts\n" end raise rescue @failure = $! raise unless @reliable errstr = "receive failed: #{$!}" unless slog(:on_miscerr, log_params, "es1_oldrecv: " + errstr) $stderr.print "\non_miscerr\n" $stderr.print log_params.inspect $stderr.print "\n" $stderr.print "es2_oldrecv: " + errstr $stderr.print "\n" end # !!! This initiates a re-connect !!! _reconn_prep() end end end
__old_receive
receives a frame, blocks until the frame is received.
Source
# File lib/connection/utf8.rb, line 265 def _decodeHeaders(h) dh = {} h.each_pair do |k,v| # Keys here are NOT! symbolized if v.is_a?(Array) kdec = Stomp::HeaderCodec::decode(k) dh[kdec] = [] v.each do |e| dh[kdec] << Stomp::HeaderCodec::decode(e) end else vs = v.to_s dh[Stomp::HeaderCodec::decode(k)] = Stomp::HeaderCodec::decode(vs) end end dh end
decode returns a Hash
of decoded headers per the Stomp
1.1 specification.
Source
# File lib/connection/netio.rb, line 606 def _dump_callstack() i = 0 caller.each do |c| p [ "csn", i, c ] i += 1 end end
used for debugging
Source
# File lib/connection/netio.rb, line 600 def _dump_ctx(ctx) p [ "dc01", ctx.inspect ] p [ "dc02ciphers", ctx.ciphers ] end
Used for debugging
Source
# File lib/connection/netio.rb, line 615 def _dump_threads() tl = Thread::list tl.each do |at| p [ "THDMPN", at ] end p [ "THDMPMain", @parameters[:client_main] ] end
used for debugging
Source
# File lib/connection/utf8.rb, line 245 def _encodeHeaders(h) eh = {} h.each_pair do |k,v| # Keys are symbolized ks = k.to_s if v.is_a?(Array) kenc = Stomp::HeaderCodec::encode(ks) eh[kenc] = [] v.each do |e| eh[kenc] << Stomp::HeaderCodec::encode(e) end else vs = v.to_s eh[Stomp::HeaderCodec::encode(ks).to_sym] = Stomp::HeaderCodec::encode(vs) end end eh end
encode returns a Hash
of encoded headers per the Stomp
1.1 specification.
Source
# File lib/connection/utils.rb, line 14 def _expand_hosts(hash) new_hash = hash.clone new_hash[:hosts_cloned] = hash[:hosts].clone new_hash[:hosts] = [] # hash[:hosts].each do |host_parms| ai = Socket.getaddrinfo(host_parms[:host], nil, nil, Socket::SOCK_STREAM) next if ai.nil? || ai.size == 0 info6 = ai.detect {|info| info[4] == Socket::AF_INET6} info4 = ai.detect {|info| info[4] == Socket::AF_INET} if info6 new_hostp = host_parms.clone new_hostp[:host] = info6[3] new_hash[:hosts] << new_hostp end if info4 new_hostp = host_parms.clone new_hostp[:host] = info4[3] new_hash[:hosts] << new_hostp end end return new_hash end
Support multi-homed servers.
Source
# File lib/connection/utils.rb, line 216 def _hdup(h) ldup = {} ldup.merge!(h) ldup[:hosts] = [] hvals = h[:hosts].nil? ? h["hosts"] : h[:hosts] hvals.each do |hv| ldup[:hosts] << hv.dup end ldup end
Duplicate parameters hash
Source
# File lib/connection/utf8.rb, line 221 def _headerCheck(h) return if @protocol == Stomp::SPL_10 # Do nothing for this environment # h.each_pair do |k,v| # Keys here are symbolized ks = k.to_s ks.force_encoding(Stomp::UTF8) if ks.respond_to?(:force_encoding) raise Stomp::Error::UTF8ValidationError unless valid_utf8?(ks) # if v.is_a?(Array) v.each do |e| e.force_encoding(Stomp::UTF8) if e.respond_to?(:force_encoding) raise Stomp::Error::UTF8ValidationError unless valid_utf8?(e) end else vs = v.to_s + "" # Values are usually Strings, but could be TrueClass or Symbol # The + "" above forces an 'unfreeze' if necessary vs.force_encoding(Stomp::UTF8) if vs.respond_to?(:force_encoding) raise Stomp::Error::UTF8ValidationError unless valid_utf8?(vs) end end end
Stomp
1.1+ header check for UTF8 validity. Raises Stomp::Error::UTF8ValidationError
if header data is not valid UTF8.
Source
# File lib/connection/heartbeats.rb, line 21 def _init_heartbeats() return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats. OK. # Init. # @cx = @cy = @sx = @sy = 0 # Variable names as in spec # @hbsend_interval = @hbrecv_interval = 0.0 # Send/Receive ticker interval. # @hbsend_count = @hbrecv_count = 0 # Send/Receive ticker counts. # @ls = @lr = -1.0 # Last send/receive time (from Time.now.to_f) # @st = @rt = nil # Send/receive ticker thread # Handle current client / server capabilities. # cfh = @connection_frame.headers.symbolize_keys return if cfh[:"heart-beat"] == "0,0" # Server does not want heartbeats # Conect header parts. parts = @connect_headers[:"heart-beat"].split(",") @cx = parts[0].to_i @cy = parts[1].to_i # Connected frame header parts. parts = cfh[:"heart-beat"].split(",") @sx = parts[0].to_i @sy = parts[1].to_i # Catch odd situations like server has used => heart-beat:000,00000 return if (@cx == 0 && @cy == 0) || (@sx == 0 && @sy == 0) # See if we are doing anything at all. @hbs = @hbr = true # Sending/Receiving heartbeats. Assume yes at first. # Check if sending is possible. @hbs = false if @cx == 0 || @sy == 0 # Reset if neither side wants # Check if receiving is possible. @hbr = false if @sx == 0 || @cy == 0 # Reset if neither side wants # Check if we should not do heartbeats at all return if (!@hbs && !@hbr) # If sending if @hbs sm = @cx >= @sy ? @cx : @sy # ticker interval, ms @hbsend_interval = 1000.0 * sm # ticker interval, μs @ls = Time.now.to_f # best guess at start _start_send_ticker() end # If receiving if @hbr rm = @sx >= @cy ? @sx : @cy # ticker interval, ms @hbrecv_interval = 1000.0 * rm # ticker interval, μs @lr = Time.now.to_f # best guess at start _start_receive_ticker() end end
Source
# File lib/connection/netio.rb, line 564 def _init_line_read(read_socket) line = '' if @protocol == Stomp::SPL_10 || (@protocol >= Stomp::SPL_11 && !@hbr) if @jruby # Handle JRuby specific behavior. #p [ "ilrjr00", _is_ready?(read_socket), RUBY_VERSION ] if RUBY_VERSION < "2" while true #p [ "ilrjr01A1", _is_ready?(read_socket) ] line = _interruptible_gets(read_socket) # Data from wire break unless line == "\n" line = '' end else # RUBY_VERSION >= "2" while _is_ready?(read_socket) #p [ "ilrjr01B2", _is_ready?(read_socket) ] line = _interruptible_gets(read_socket) # Data from wire break unless line == "\n" line = '' end end else line = _interruptible_gets(read_socket) # The old way end else # We are >= 1.1 *AND* receiving heartbeats. while true line = _interruptible_gets(read_socket) # Data from wire break unless line == "\n" line = '' @lr = Time.now.to_f end end line end
Source
# File lib/connection/netio.rb, line 171 def _is_ready?(s) rdy = s.ready? #p [ "isr?", rdy ] return rdy unless @jruby #p [ "jrdychk", rdy.class ] if rdy.class == NilClass # rdy = true rdy = false # A test else rdy = (rdy.class == Integer || rdy.class == TrueClass) ? true : false end #p [ "isr?_last", rdy ] rdy end
This is a total hack, to try and guess how JRuby will behave today.
Source
# File lib/connection/netio.rb, line 190 def _normalize_line_end(line) return line unless @usecrlf # p [ "nleln", line ] line_len = line.respond_to?(:bytesize) ? line.bytesize : line.length last2 = line[line_len-2...line_len] # p [ "nlel2", last2 ] return line unless last2 == "\r\n" return line[0...line_len-2] + "\n" end
Normalize line ends because 1.2+ brokers can send ‘mixed mode’ headers, i.e.:
-
Some headers end with ‘n’
-
Other headers end with ‘rn’
Source
# File lib/connection/utils.rb, line 85 def _post_connect() return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0 if @connection_frame.command == Stomp::CMD_ERROR @connection_frame.headers = _decodeHeaders(@connection_frame.headers) return end # We are CONNECTed cfh = @connection_frame.headers.symbolize_keys @protocol = cfh[:version] if @protocol # Should not happen, but check anyway raise Stomp::Error::UnsupportedProtocolError unless Stomp::SUPPORTED.index(@protocol) else # CONNECTed to a 1.0 server that does not return *any* 1.1 type headers @protocol = Stomp::SPL_10 # reset return end # Heartbeats return unless @connect_headers[:"heart-beat"] _init_heartbeats() end
_post_connect
handles low level logic just after a physical connect.
Source
# File lib/connection/utils.rb, line 62 def _pre_connect() @connect_headers = @connect_headers.symbolize_keys raise Stomp::Error::ProtocolErrorConnect if (@connect_headers[:"accept-version"] && !@connect_headers[:host]) raise Stomp::Error::ProtocolErrorConnect if (!@connect_headers[:"accept-version"] && @connect_headers[:host]) return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0 # Try 1.1 or greater @hhas10 = false okvers = [] avers = @connect_headers[:"accept-version"].split(",") avers.each do |nver| if Stomp::SUPPORTED.index(nver) okvers << nver @hhas10 = true if nver == Stomp::SPL_10 end end raise Stomp::Error::UnsupportedProtocolError if okvers == [] @connect_headers[:"accept-version"] = okvers.join(",") # This goes to server # Heartbeats - pre connect return unless @connect_headers[:"heart-beat"] _validate_hbheader() end
_pre_connect
handles low level logic just prior to a physical connect.
Source
# File lib/connection/netio.rb, line 25 def _receive(read_socket, connread = false, noiosel = false) # p [ "ioscheck", @iosto, connread, noiosel, @nto_cmd_read ] # _dump_callstack() drdbg = ENV['DRDBG'] ? true : false @read_semaphore.synchronize do p [ "_receive_lock", Thread::current() ] if drdbg line = nil # ===== # Read COMMAND (frame name) # ===== if connread begin Timeout::timeout(@connread_timeout, Stomp::Error::ConnectReadTimeout) do line = _init_line_read(read_socket) end rescue Stomp::Error::ConnectReadTimeout => ex if @reliable _reconn_prep() end raise ex end else p [ "_receive_COMMAND" ] if drdbg _dump_callstack() if drdbg line = _init_line_read(read_socket) end # p [ "_receive_nilcheck", line.nil? ] if drdbg return nil if line.nil? #An extra \n at the beginning of the frame, possibly not caught by is_ready? line = '' if line == "\n" if line == HAND_SHAKE_DATA raise Stomp::Error::HandShakeDetectedError end p [ "_receive_norm_lend", line, Time.now ] if drdbg line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12 # ===== # Read Headers (if any) # ===== # Reads the headers until it runs into a empty line p [ "_receive_start_headers", line, Time.now ] if drdbg message_header = '' begin message_header += line unless connread || noiosel || @nto_cmd_read raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto) end p [ "_receive_next_header", line, Time.now ] if drdbg line = _interruptible_gets(read_socket) p [ "_receive_normle_header", line ] if drdbg raise Stomp::Error::StompServerError if line.nil? line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12 end until line =~ /^\s?\n$/ p [ "_receive_end_headers" ] if drdbg # Checks if it includes content_length header content_length = message_header.match(/content-length\s?:\s?(\d+)\s?\n/) message_body = '' # ===== # Read message body (if any) # ===== p [ "_receive_start_body", content_length ] if drdbg # If content_length is present, read the specified amount of bytes if content_length unless connread || noiosel raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto) end p [ "_receive_have_content_length" ] if drdbg message_body = read_socket.read content_length[1].to_i unless connread || noiosel raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto) end raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0" # Else read the rest of the message until the first \0 else unless connread || noiosel raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto) end p [ "no_content_length" ] if drdbg message_body = read_socket.readline("\0") message_body.chop! end # ===== # If the buffer isn't empty, reads/drains trailing new lines. # # Note: experiments with JRuby seem to show that socket.ready? never # returns true. It appears that in cases where Ruby returns true # that JRuby returns an Integer. We attempt to adjust for this # in the _is_ready? method. # # Note 2: the draining of new lines must be done _after_ a message # is read. Do _not_ leave them on the wire and attempt to drain them # at the start of the next read. Attempting to do that breaks the # asynchronous nature of the 'poll' method. # ===== p [ "_receive_start_drain_loop", "isr", _is_ready?(read_socket) ] if drdbg while _is_ready?(read_socket) unless connread || noiosel raise Stomp::Error::ReceiveTimeout unless IO.select([read_socket], nil, nil, @iosto) end p [ "_receive_next_drain" ] if drdbg last_char = read_socket.getc break unless last_char if parse_char(last_char) != "\n" read_socket.ungetc(last_char) break end end # ===== # Complete receive processing # ===== p [ "_receive_hb_update" ] if drdbg if @protocol >= Stomp::SPL_11 @lr = Time.now.to_f if @hbr end # Adds the excluded \n and \0 and tries to create a new message with it p [ "_receive_new_message" ] if drdbg msg = Message.new(message_header + "\n" + message_body + "\0", @protocol >= Stomp::SPL_11) p [ "_receive_decode_headers", msg.command, msg.headers ] if drdbg # Check for a valid frame name from the server. p [ "_receive_frame_name_check", msg.command ] if drdbg unless SERVER_FRAMES[msg.command] sfex = Stomp::Error::ServerFrameNameError.new(msg.command) raise sfex end # # Always decode headers, even for 1.0. Issue #160. if msg.command != Stomp::CMD_CONNECTED msg.headers = _decodeHeaders(msg.headers) end p [ "_receive_ends", msg.command, msg.headers ] if drdbg p [ "_receive_UNlock", Thread::current() ] if drdbg msg end end
Really read from the wire.
Source
# File lib/connection/utils.rb, line 278 def _reconn_prep() close_socket() if @parameters change_host() end @st.kill if @st # Kill ticker thread if any @rt.kill if @rt # Kill ticker thread if any @socket = nil end
_reconn_prep
prepares for a reconnect retry
Source
# File lib/connection/heartbeats.rb, line 252 def _reconn_prep_hb() if @parameters change_host() end @socket = nil used_socket = socket() return used_socket end
_reconn_prep_hb
prepares for a reconnect retry
Source
# File lib/connection/heartbeats.rb, line 148 def _start_receive_ticker() sleeptime = @hbrecv_interval / 1000000.0 # Sleep time secs read_fail_count = 0 lock_fail_count = 0 fail_hard = false @rt = Thread.new { # while true do sleep sleeptime next unless @socket # nil under some circumstances ?? rdrdy = _is_ready?(@socket) curt = Time.now.to_f slog(:on_hbfire, log_params, "receive_fire", :curt => curt) # begin delta = curt - @lr if delta > sleeptime slog(:on_hbfire, log_params, "receive_heartbeat", {:delta => delta}) # Client code could be off doing something else (that is, no reading of # the socket has been requested by the caller). Try to handle that case. lock = @read_semaphore.try_lock if lock lock_fail_count = 0 # clear rdrdy = _is_ready?(@socket) if rdrdy read_fail_count = 0 # clear last_char = @socket.getc if last_char.nil? # EOF from broker? fail_hard = true else @lr = Time.now.to_f plc = parse_char(last_char) if plc == "\n" # Server Heartbeat @hbrecv_count += 1 @hb_received = true # Reset if necessary else @socket.ungetc(last_char) end end @read_semaphore.unlock # Release read lock else # Socket is not ready @read_semaphore.unlock # Release read lock @hb_received = false read_fail_count += 1 slog(:on_hbread_fail, log_params, {"ticker_interval" => sleeptime, "read_fail_count" => read_fail_count, "lock_fail" => false, "lock_fail_count" => lock_fail_count, "fail_point" => "not_ready"}) end else # try_lock failed # Shrug. Could not get lock. Client must be actually be reading. @hb_received = false # But notify caller if possible lock_fail_count += 1 slog(:on_hbread_fail, log_params, {"ticker_interval" => sleeptime, "read_fail_count" => read_fail_count, "lock_fail" => true, "lock_fail_count" => lock_fail_count, "fail_point" => "try_lock_fail"}) end # of the try_lock else # delta <= sleeptime @hb_received = true # Reset if necessary read_fail_count = 0 # reset lock_fail_count = 0 # reset end # of the if delta > sleeptime rescue Exception => recvex slog(:on_hbread_fail, log_params, {"ticker_interval" => sleeptime, "exception" => recvex, "read_fail_count" => read_fail_count, "lock_fail_count" => lock_fail_count, "fail_point" => "receive_exception"}) fail_hard = true end # Do we want to attempt a retry? if @reliable # Retry on hard fail or max read fails if fail_hard || (@max_hbread_fails > 0 && read_fail_count >= @max_hbread_fails) # This is an attempt at a connection retry. @st.kill if @st # Kill the sender thread if one exists _reconn_prep_hb() # Drive reconnection logic Thread.exit # This receiver thread is done end # Retry on max lock fails. Different logic in order to avoid a deadlock. if (@max_hbrlck_fails > 0 && lock_fail_count >= @max_hbrlck_fails) # This is an attempt at a connection retry. @gets_semaphore.synchronize do @getst.raise(Errno::EBADF.new) if @getst # kill the socket reading thread if exists @socket.close rescue nil # Attempt a forced close end @st.kill if @st # Kill the sender thread if one exists Thread.exit # This receiver thread is done end end Thread.pass # Prior to next receive loop # end # of the "while true" } # end of the Thread.new end
_start_receive_ticker
starts a thread that receives heartbeats when required.
Source
# File lib/connection/heartbeats.rb, line 90 def _start_send_ticker() sleeptime = @hbsend_interval / 1000000.0 # Sleep time secs reconn = false adjust = 0.0 @st = Thread.new { first_time = true while true do # slt = sleeptime - adjust - @fast_hbs_adjust sleep(slt) next unless @socket # nil under some circumstances ?? curt = Time.now.to_f slog(:on_hbfire, log_params, "send_fire", :curt => curt, :last_sleep => slt) delta = curt - @ls # Be tolerant (minus), and always do this the first time through. # Reintroduce logic removed in d922fa. compval = (@hbsend_interval - (@hbsend_interval/5.0)) / 1000000.0 if delta > compval || first_time first_time = false slog(:on_hbfire, log_params, "send_heartbeat", {:last_sleep => slt, :curt => curt, :last_send => @ls, :delta => delta, :compval => compval}) # Send a heartbeat @transmit_semaphore.synchronize do begin @socket.puts @socket.flush # Do not buffer heartbeats @ls = Time.now.to_f # Update last send @hb_sent = true # Reset if necessary @hbsend_count += 1 rescue Exception => sendex @hb_sent = false # Set the warning flag slog(:on_hbwrite_fail, log_params, {"ticker_interval" => sleeptime, "exception" => sendex}) if @hbser raise # Re-raise if user requested this, otherwise ignore end if @reliable reconn = true break # exit the synchronize do end end end # of the synchronize if reconn # Attempt a fail over reconnect. This is 'safe' here because # this thread no longer holds the @transmit_semaphore lock. @rt.kill if @rt # Kill the receiver thread if one exists _reconn_prep_hb() # Drive reconnection logic Thread.exit # This sender thread is done end end adjust = Time.now.to_f - curt Thread.pass end } end
_start_send_ticker
starts a thread to send heartbeats when required.
Source
# File lib/connection/netio.rb, line 226 def _transmit(used_socket, command, headers = {}, body = '') dtrdbg = ENV['DTRDBG'] ? true : false # p [ "wirewrite" ] # _dump_callstack() p [ "_transmit_headers_in1", headers ] if dtrdbg if @protocol >= Stomp::SPL_11 && command != Stomp::CMD_CONNECT headers = _encodeHeaders(headers) p [ "_transmit_headers_in2", headers ] if dtrdbg end @transmit_semaphore.synchronize do p [ "_transmit_lock", Thread::current() ] if dtrdbg # Handle nil body body = '' if body.nil? # The content-length should be expressed in bytes. # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters # With Unicode strings, # of bytes != # of characters. So, use String#bytesize when available. body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length # ActiveMQ interprets every message as a BinaryMessage # if content_length header is included. # Using :suppress_content_length => true will suppress this behaviour # and ActiveMQ will interpret the message as a TextMessage. # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/ # Lets send this header in the message, so it can maintain state when using unreceive headers[:'content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length] headers[:'content-type'] = "text/plain; charset=UTF-8" unless headers[:'content-type'] || headers[:suppress_content_type] p [ "_transmit_command", command ] if dtrdbg _wire_write(used_socket,command) p [ "_transmit_headers", headers ] if dtrdbg headers.each do |k,v| if v.is_a?(Array) v.each do |e| _wire_write(used_socket,"#{k}:#{e}") end else _wire_write(used_socket,"#{k}:#{v}") end end p [ "_transmit_headers done" ] if dtrdbg _wire_write(used_socket,"") if body != '' p [ "_transmit_body", body ] if dtrdbg if headers[:suppress_content_length] if tz = body.index("\00") used_socket.write body[0..tz-1] else used_socket.write body end else used_socket.write body end end used_socket.write "\0" used_socket.flush if @autoflush # used_socket.flush if @protocol >= Stomp::SPL_11 @ls = Time.now.to_f if @hbs end p [ "_transmit_UNlock", Thread::current() ] if dtrdbg end end
_transmit is the real wire write logic.
Source
# File lib/connection/utf8.rb, line 19 def _valid_utf8?(string) case RUBY_VERSION when /1\.8\.[56]/ bytes = [] 0.upto(string.length-1) {|i| bytes << string[i] } else bytes = string.bytes end # valid = true index = -1 state = "start" # bytes.each do |next_byte| index += 1 case state # State: 'start' # The 'start' state: # * handles all occurrences of valid single byte characters i.e., the ASCII character set # * provides state transition logic for start bytes of valid characters with 2-4 bytes # * signals a validation failure for all other single bytes # when "start" # puts "state: start" if DEBUG case next_byte # ASCII # * Input = 0x00-0x7F : change state to START when (0x00..0x7f) # puts "state: start 1" if DEBUG state = "start" # Start byte of two byte characters # * Input = 0xC2-0xDF: change state to A when (0xc2..0xdf) # puts "state: start 2" if DEBUG state = "a" # Start byte of some three byte characters # * Input = 0xE1-0xEC, 0xEE-0xEF: change state to B when (0xe1..0xec) # puts "state: start 3" if DEBUG state = "b" when (0xee..0xef) # puts "state: start 4" if DEBUG state = "b" # Start byte of special three byte characters # * Input = 0xE0: change state to C when 0xe0 # puts "state: start 5" if DEBUG state = "c" # Start byte of the remaining three byte characters # * Input = 0xED: change state to D when 0xed # puts "state: start 6" if DEBUG state = "d" # Start byte of some four byte characters # * Input = 0xF1-0xF3:change state to E when (0xf1..0xf3) # puts "state: start 7" if DEBUG state = "e" # Start byte of special four byte characters # * Input = 0xF0: change state to F when 0xf0 # puts "state: start 8" if DEBUG state = "f" # Start byte of very special four byte characters # * Input = 0xF4: change state to G when 0xf4 # puts "state: start 9" if DEBUG state = "g" # All other single characters are invalid # * Input = Others (0x80-0xBF,0xC0-0xC1, 0xF5-0xFF): ERROR else valid = false break end # of the inner case, the 'start' state # The last continuation byte of a 2, 3, or 4 byte character # State: 'a' # o Input = 0x80-0xBF: change state to START # o Others: ERROR when "a" # puts "state: a" if DEBUG if (0x80..0xbf) === next_byte state = "start" else valid = false break end # The first continuation byte for most 3 byte characters # (those with start bytes in: 0xe1-0xec or 0xee-0xef) # State: 'b' # o Input = 0x80-0xBF: change state to A # o Others: ERROR when "b" # puts "state: b" if DEBUG if (0x80..0xbf) === next_byte state = "a" else valid = false break end # The first continuation byte for some special 3 byte characters # (those with start byte 0xe0) # State: 'c' # o Input = 0xA0-0xBF: change state to A # o Others: ERROR when "c" # puts "state: c" if DEBUG if (0xa0..0xbf) === next_byte state = "a" else valid = false break end # The first continuation byte for the remaining 3 byte characters # (those with start byte 0xed) # State: 'd' # o Input = 0x80-0x9F: change state to A # o Others: ERROR when "d" # puts "state: d" if DEBUG if (0x80..0x9f) === next_byte state = "a" else valid = false break end # The first continuation byte for some 4 byte characters # (those with start bytes in: 0xf1-0xf3) # State: 'e' # o Input = 0x80-0xBF: change state to B # o Others: ERROR when "e" # puts "state: e" if DEBUG if (0x80..0xbf) === next_byte state = "b" else valid = false break end # The first continuation byte for some special 4 byte characters # (those with start byte 0xf0) # State: 'f' # o Input = 0x90-0xBF: change state to B # o Others: ERROR when "f" # puts "state: f" if DEBUG if (0x90..0xbf) === next_byte state = "b" else valid = false break end # The first continuation byte for the remaining 4 byte characters # (those with start byte 0xf4) # State: 'g' # o Input = 0x80-0x8F: change state to B # o Others: ERROR when "g" # puts "state: g" if DEBUG if (0x80..0x8f) === next_byte state = "b" else valid = false break end # else raise RuntimeError, "state: default" end end # # puts "State at end: #{state}" if DEBUG # Catch truncation at end of string if valid and state != 'start' # puts "Resetting valid value" if DEBUG valid = false end # valid end
Ref: unicode.org/mail-arch/unicode-ml/y2003-m02/att-0467/01-The_Algorithm_to_Valide_an_UTF-8_String
CONSIDER replacing this with a dependency on the utf8_validator gem. This code has been copied from there.
Source
# File lib/connection/heartbeats.rb, line 13 def _validate_hbheader() return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats. OK. parts = @connect_headers[:"heart-beat"].split(",") if (parts.size != 2) || (parts[0] != parts[0].to_i.to_s) || (parts[1] != parts[1].to_i.to_s) raise Stomp::Error::InvalidHeartBeatHeaderError end end
Source
# File lib/connection/netio.rb, line 290 def _wire_write(sock, data) # p [ "debug_01", @protocol, @usecrlf ] dwrdbg = ENV['DWRDBG'] ? true : false if @protocol >= Stomp::SPL_12 && @usecrlf wiredata = "#{data}#{Stomp::CR}#{Stomp::LF}" # p [ "wiredataout_01:", wiredata ] sock.write(wiredata) else p [ "_wire_write_begin:", "#{data}" ] if dwrdbg if @jruby && @ssl p [ "_wire_write_jrbeg:" ] if dwrdbg # Same results for all of these write methods. # sock.puts data # sock.print "#{data}\n" # sock.syswrite "#{data}\n" sock.write "#{data}\n" p [ "_wire_write_jrend:" ] if dwrdbg else sock.puts data end p [ "_wire_write_end:" ] if dwrdbg end end
Use CRLF if protocol is >= 1.2, and the client requested CRLF
Source
# File lib/connection/utils.rb, line 201 def change_host @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize] # Set first as master and send it to the end of array current_host = @parameters[:hosts].shift @parameters[:hosts] << current_host @ssl = current_host[:ssl] @host = current_host[:host] @port = current_host[:port] || Connection::default_port(@ssl) @login = current_host[:login] || "" @passcode = current_host[:passcode] || "" end
change_host
selects the next host for retries.
Source
# File lib/connection/netio.rb, line 503 def close_socket() begin # Need to set @closed = true before closing the socket # within the @read_semaphore thread @closed = true @read_semaphore.synchronize do @socket.close end rescue #Ignoring if already closed end @closed end
close_socket
closes the current open socket, and hence the connection.
Source
# File lib/connection/netio.rb, line 541 def connect(used_socket) @connect_headers = {} unless @connect_headers # Caller said nil/false headers = @connect_headers.clone headers[:login] = @login unless @login.to_s.empty? headers[:passcode] = @passcode unless @login.to_s.empty? _pre_connect if !@hhas10 && @stompconn _transmit(used_socket, Stomp::CMD_STOMP, headers) else _transmit(used_socket, Stomp::CMD_CONNECT, headers) end connread = true noiosel = false @connection_frame = _receive(used_socket, connread, noiosel) _post_connect @disconnect_receipt = nil @session = @connection_frame.headers["session"] if @connection_frame # replay any subscriptions. @subscriptions.each {|k,v| _transmit(used_socket, Stomp::CMD_SUBSCRIBE, v) } end
connect performs a basic STOMP CONNECT operation.
Source
# File lib/connection/utils.rb, line 235 def increase_reconnect_delay @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay] @reconnect_delay end
increase_reconnect_delay
increases the reconnect delay for the next connection attempt.
Source
# File lib/connection/utils.rb, line 44 def log_params() lparms = @parameters.clone if @parameters lparms = {} unless lparms lparms[:cur_host] = @host lparms[:cur_port] = @port lparms[:cur_login] = @login lparms[:cur_passcode] = @passcode lparms[:cur_ssl] = @ssl lparms[:cur_recondelay] = @reconnect_delay lparms[:cur_parseto] = @parse_timeout lparms[:cur_conattempts] = @connection_attempts lparms[:cur_failure] = @failure # To assist in debugging lparms[:openstat] = open? # lparms end
Create parameters for any callback logger.
Source
# File lib/connection/utils.rb, line 229 def max_reconnect_attempts? !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts] end
max_reconnect_attempts? returns nil or the number of maximum reconnect attempts.
Source
# File lib/connection/netio.rb, line 518 def open_socket() used_socket = @ssl ? open_ssl_socket : open_tcp_socket # try to close the old connection if any close_socket @closed = false if @parameters # nil in some rspec tests unless @reconnect_delay @reconnect_delay = @parameters[:initial_reconnect_delay] || iosto1 end end # Use keepalive used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) # TCP_NODELAY option (disables Nagle's algorithm) used_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, !!(@parameters && @parameters[:tcp_nodelay])) @iosto = @parse_timeout ? @parse_timeout.to_f : 0.0 used_socket end
open_socket
opens a TCP or SSL soclet as required.
Source
# File lib/connection/netio.rb, line 328 def open_ssl_socket() require 'openssl' unless defined?(OpenSSL) ossdbg = ENV['OSSDBG'] ? true : false begin # Any raised SSL exceptions ctx = @sslctx_newparm ? OpenSSL::SSL::SSLContext.new(@sslctx_newparm) : OpenSSL::SSL::SSLContext.new ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE # Assume for now # # Note: if a client uses :ssl => true this would result in the gem using # the _default_ Ruby ciphers list. This is _known_ to fail in later # Ruby releases. The gem now detects :ssl => true, and replaces that # with: # * :ssl => Stomp::SSLParams.new # # The above results in the use of Stomp default parameters. # # To specifically request Stomp default parameters, use: # * :ssl => Stomp::SSLParams.new(..., :ciphers => Stomp::DEFAULT_CIPHERS) # # If connecting with an SSLParams instance, and the _default_ Ruby # ciphers list is actually required, use: # * :ssl => Stomp::SSLParams.new(..., :use_ruby_ciphers => true) # # If a custom ciphers list is required, connect with: # * :ssl => Stomp::SSLParams.new(..., :ciphers => custom_ciphers_list) # if @ssl != true # # Here @ssl is: # * an instance of Stomp::SSLParams # Control would not be here if @ssl == false or @ssl.nil?. # # Back reference the SSLContext @ssl.ctx = ctx # Server authentication parameters if required if @ssl.ts_files ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER truststores = OpenSSL::X509::Store.new fl = @ssl.ts_files.split(",") fl.each do |fn| # Add next cert file listed raise Stomp::Error::SSLNoTruststoreFileError if !File::exists?(fn) raise Stomp::Error::SSLUnreadableTruststoreFileError if !File::readable?(fn) truststores.add_file(fn) end ctx.cert_store = truststores end # p [ "OSSL50", "old code starts" ] if ossdbg usecert = nil usekey = nil # Client authentication # If cert exists as a file, then it should not be input as text raise Stomp::Error::SSLClientParamsError if !@ssl.cert_file.nil? && !@ssl.cert_text.nil? # If cert exists as file, then key must exist, either as text or file raise Stomp::Error::SSLClientParamsError if !@ssl.cert_file.nil? && @ssl.key_file.nil? && @ssl.key_text.nil? if @ssl.cert_file raise Stomp::Error::SSLNoCertFileError if !File::exists?(@ssl.cert_file) raise Stomp::Error::SSLUnreadableCertFileError if !File::readable?(@ssl.cert_file) p [ "OSSL51", "old code cert file read" ] if ossdbg usecert = OpenSSL::X509::Certificate.new(File.read(@ssl.cert_file)) end # If cert exists as file, then key must exist, either as text or file raise Stomp::Error::SSLClientParamsError if !@ssl.cert_text.nil? && @ssl.key_file.nil? && @ssl.key_text.nil? if @ssl.cert_text p [ "OSSL52", "old code cert text get" ] if ossdbg usecert = OpenSSL::X509::Certificate.new(@ssl.cert_text) end # If key exists as a text, then it should not be input as file raise Stomp::Error::SSLClientParamsError if !@ssl.key_text.nil? && !@ssl.key_file.nil? if @ssl.key_file raise Stomp::Error::SSLNoKeyFileError if !File::exists?(@ssl.key_file) raise Stomp::Error::SSLUnreadableKeyFileError if !File::readable?(@ssl.key_file) p [ "OSSL53", "old code key file read" ] if ossdbg usekey = OpenSSL::PKey::RSA.new(File.read(@ssl.key_file), @ssl.key_password) end if @ssl.key_text nt = @ssl.key_text.gsub(/\t/, "") p [ "OSSL54", "old code key text get" ] if ossdbg usekey = OpenSSL::PKey::RSA.new(nt, @ssl.key_password) end # # This style of code because: in newer Ruby versions the 'cert' # and 'key' attributes are deprecated. It is suggested that the # 'add_certificate' method be used instead. # if ctx.respond_to?(:add_certificate) # Newer Ruby version ?? p [ "OSSL55", "new code option", usecert, usekey ] if ossdbg if !usecert.nil? && !usekey.nil? p [ "OSSL55", "new code add_certificate" ] if ossdbg ctx.add_certificate(usecert, usekey) else p [ "OSSL56", "new code SKIP add_certificate" ] if ossdbg end else # Older Ruby versions p [ "OSSL56", "old code option", usecert, usekey ] if ossdbg ctx.cert = usecert ctx.key = usekey end p [ "OSSL99", "old code ends" ] if ossdbg # Cipher list # As of this writing, there are numerous problems with supplying # cipher lists to jruby. So we do not attempt to do that here. if !@ssl.use_ruby_ciphers # No Ruby ciphers (the default) if @ssl.ciphers # User ciphers list? ctx.ciphers = @ssl.ciphers # Accept user supplied ciphers else ctx.ciphers = Stomp::DEFAULT_CIPHERS # Just use Stomp defaults end end unless @jruby # Set SSLContext Options if user asks for it in Stomp::SSLParams # and SSL supports it. if @ssl.ssl_ctxopts && ctx.respond_to?(:options=) ctx.options = @ssl.ssl_ctxopts end end # ssl = nil slog(:on_ssl_connecting, log_params) # _dump_ctx(ctx) Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do tcp_socket = TCPSocket.open(@host, @port) ssl = OpenSSL::SSL::SSLSocket.new(tcp_socket, ctx) ssl.hostname = @host if ssl.respond_to? :hostname= ssl.sync_close = true # Sync ssl close with underlying TCP socket ssl.connect if (ssl.context.verify_mode != OpenSSL::SSL::VERIFY_NONE) && @ssl_post_conn_check ssl.post_connection_check(@host) end end def ssl.ready? @ssl_ready_lock ||= Mutex.new @ssl_ready_lock.synchronize do ! @rbuffer.empty? || @io.ready? end end if @ssl != true # Pass back results if possible if RUBY_VERSION =~ /1\.8\.[56]/ @ssl.verify_result = "N/A for Ruby #{RUBY_VERSION}" else @ssl.verify_result = ssl.verify_result end @ssl.peer_cert = ssl.peer_cert end slog(:on_ssl_connected, log_params) ssl rescue Exception => ex lp = log_params.clone lp[:ssl_exception] = ex slog(:on_ssl_connectfail, lp) if ssl # shut down the TCP socket - we just failed to do the SSL handshake in time ssl.close end # puts ex.backtrace if ossdbg $stdout.flush if ossdbg raise # Reraise end end
open_ssl_socket
opens an SSL socket.
Source
# File lib/connection/netio.rb, line 315 def open_tcp_socket() ## $stderr.print("h: #{@host}, p: #{@port}\n") tcp_socket = nil slog(:on_connecting, log_params) Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do tcp_socket = TCPSocket.open(@host, @port) end tcp_socket end
open_tcp_socket
opens a TCP socket.
Source
# File lib/connection/utils.rb, line 39 def parse_char(char) RUBY_VERSION > '1.9' ? char : char.chr end
Handle 1.9+ character representation.
Source
# File lib/connection/utils.rb, line 163 def refine_params(params) params = params.uncamelize_and_symbolize_keys default_params = { :connect_headers => {}, :reliable => true, # Failover parameters :initial_reconnect_delay => 0.01, :max_reconnect_delay => 30.0, :use_exponential_back_off => true, :back_off_multiplier => 2, :max_reconnect_attempts => 0, :randomize => false, :connect_timeout => 0, # Parse Timeout :parse_timeout => 5, :dmh => false, # Closed check logic :closed_check => true, :hbser => false, :stompconn => false, :max_hbread_fails => 0, :max_hbrlck_fails => 0, :fast_hbs_adjust => 0.0, :connread_timeout => 0, :tcp_nodelay => true, :start_timeout => 0, :sslctx_newparm => nil, :ssl_post_conn_check => true, } res_params = default_params.merge(params) if res_params[:dmh] res_params = _expand_hosts(res_params) end return res_params end
refine_params
sets up defaults for a Hash
initialize.
Source
# File lib/connection/utils.rb, line 107 def socket() @socket_semaphore.synchronize do used_socket = @socket used_socket = nil if closed? while used_socket.nil? || !@failure.nil? @failure = nil begin used_socket = open_socket() # sets @closed = false if OK # Open is complete connect(used_socket) slog(:on_connected, log_params) @connection_attempts = 0 rescue @failure = $! used_socket = nil @closed = true raise unless @reliable raise if @failure.is_a?(Stomp::Error::LoggerConnectionError) # Catch errors which are: # a) emitted from corrupted 1.1+ 'connect' (caller programming error) # b) should never be retried raise if @failure.is_a?(Stomp::Error::ProtocolError11p) begin unless slog(:on_connectfail,log_params) $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n" end rescue Exception => aex raise if aex.is_a?(Stomp::Error::LoggerConnectionError) end if max_reconnect_attempts? $stderr.print "In socket() Reached MaxReconnectAttempts" ### _dump_threads() mt = @parameters[:client_main] if !mt.nil? mt.raise Stomp::Error::MaxReconnectAttempts Thread::exit end raise Stomp::Error::MaxReconnectAttempts end sleep(@reconnect_delay) @connection_attempts += 1 if @parameters change_host() increase_reconnect_delay() end end end @socket = used_socket end end
socket creates and returns a new socket for use by the connection.
Source
# File lib/connection/netio.rb, line 201 def transmit(command, headers = {}, body = '') # p [ "XMIT01", command, headers ] # The transmit may fail so we may need to retry. while true begin used_socket = socket() _transmit(used_socket, command, headers, body) return rescue Stomp::Error::MaxReconnectAttempts => e _ = e raise rescue @failure = $! raise unless @reliable errstr = "transmit to #{@host} failed: #{$!}\n" unless slog(:on_miscerr, log_params, "es_trans: " + errstr) $stderr.print errstr end # !!! This loop initiates a re-connect !!! _reconn_prep() end end end
transmit logically puts a Message
on the wire.