class Mysql::Protocol
MySQL network protocol
Constants
- MAX_PACKET_LENGTH
- VERSION
Attributes
Public Class Methods
Convert netdata to Ruby value @param data [Packet] packet data @param type [Integer] field type @param unsigned [true or false] true if value is unsigned @return [Object] converted value.
# File lib/mysql/protocol.rb, line 22 def self.net2value(pkt, type, unsigned) case type when Field::TYPE_STRING, Field::TYPE_VAR_STRING, Field::TYPE_NEWDECIMAL, Field::TYPE_BLOB, Field::TYPE_JSON return pkt.lcs when Field::TYPE_TINY v = pkt.utiny return unsigned ? v : v < 128 ? v : v-256 when Field::TYPE_SHORT v = pkt.ushort return unsigned ? v : v < 32768 ? v : v-65536 when Field::TYPE_INT24, Field::TYPE_LONG v = pkt.ulong return unsigned ? v : v < 0x8000_0000 ? v : v-0x10000_0000 when Field::TYPE_LONGLONG n1, n2 = pkt.ulong, pkt.ulong v = (n2 << 32) | n1 return unsigned ? v : v < 0x8000_0000_0000_0000 ? v : v-0x10000_0000_0000_0000 when Field::TYPE_FLOAT return pkt.read(4).unpack('e').first when Field::TYPE_DOUBLE return pkt.read(8).unpack('E').first when Field::TYPE_DATE len = pkt.utiny y, m, d = pkt.read(len).unpack("vCC") t = Time.new(y, m, d) rescue nil return t when Field::TYPE_DATETIME, Field::TYPE_TIMESTAMP len = pkt.utiny y, m, d, h, mi, s, sp = pkt.read(len).unpack("vCCCCCV") return Time.new(y, m, d, h, mi, Rational((s.to_i*1000000+sp.to_i)/1000000)) rescue nil when Field::TYPE_TIME len = pkt.utiny sign, d, h, mi, s, sp = pkt.read(len).unpack("CVCCCV") r = d.to_i*86400 + h.to_i*3600 + mi.to_i*60 + s.to_i + sp.to_f/1000000 r *= -1 if sign != 0 return r when Field::TYPE_YEAR return pkt.ushort when Field::TYPE_BIT return pkt.lcs else raise "not implemented: type=#{type}" end end
make socket connection to server. @param opts [Hash] @option :host [String] hostname mysqld running @option :username [String] username to connect to mysqld @option :password [String] password to connect to mysqld @option :database [String] initial database name @option :port [String] port number (used if host is not ‘localhost’ or nil) @option :socket [String] socket filename (used if host is ‘localhost’ or nil) @option :flags [Integer] connection flag. Mysql::CLIENT_* ORed @option :charset [Mysql::Charset] character set @option :connect_timeout [Numeric, nil] @option :read_timeout [Numeric, nil] @option :write_timeout [Numeric, nil] @option :local_infile [Boolean] @option :load_data_local_dir [String] @option :ssl_mode [Integer] @option :get_server_public_key [Boolean] @raise [ClientError] connection timeout
# File lib/mysql/protocol.rb, line 142 def initialize(opts) @opts = opts @charset = Mysql::Charset.by_name("utf8mb4") @insert_id = 0 @warning_count = 0 @gc_stmt_queue = [] # stmt id list which GC destroy. set_state :INIT @get_server_public_key = @opts[:get_server_public_key] begin if @opts[:host].nil? or @opts[:host].empty? or @opts[:host] == "localhost" socket = @opts[:socket] || ENV["MYSQL_UNIX_PORT"] || MYSQL_UNIX_PORT @socket = Socket.unix(socket) else port = @opts[:port] || ENV["MYSQL_TCP_PORT"] || (Socket.getservbyname("mysql","tcp") rescue MYSQL_TCP_PORT) @socket = Socket.tcp(@opts[:host], port, connect_timeout: @opts[:connect_timeout]) end rescue Errno::ETIMEDOUT raise ClientError, "connection timeout" end end
convert Ruby value to netdata @param v [Object] Ruby value. @return [Integer] type of column. Field::TYPE_* @return [String] netdata @raise [ProtocolError] value too large / value is not supported
# File lib/mysql/protocol.rb, line 72 def self.value2net(v) case v when nil type = Field::TYPE_NULL val = "" when Integer if -0x8000_0000 <= v && v < 0x8000_0000 type = Field::TYPE_LONG val = [v].pack('V') elsif -0x8000_0000_0000_0000 <= v && v < 0x8000_0000_0000_0000 type = Field::TYPE_LONGLONG val = [v&0xffffffff, v>>32].pack("VV") elsif 0x8000_0000_0000_0000 <= v && v <= 0xffff_ffff_ffff_ffff type = Field::TYPE_LONGLONG | 0x8000 val = [v&0xffffffff, v>>32].pack("VV") else raise ProtocolError, "value too large: #{v}" end when Float type = Field::TYPE_DOUBLE val = [v].pack("E") when String type = Field::TYPE_STRING val = Packet.lcs(v) when Time type = Field::TYPE_DATETIME val = [11, v.year, v.month, v.day, v.hour, v.min, v.sec, v.usec].pack("CvCCCCCV") else raise ProtocolError, "class #{v.class} is not supported" end return type, val end
Public Instance Methods
initial negotiate and authenticate. @param charset [Mysql::Charset, nil] charset for connection. nil: use server’s charset @raise [ProtocolError] The old style password is not supported
# File lib/mysql/protocol.rb, line 170 def authenticate check_state :INIT reset init_packet = InitialPacket.parse read @server_info = init_packet.server_version @server_version = init_packet.server_version.split(/\D/)[0,3].inject{|a,b|a.to_i*100+b.to_i} @server_capabilities = init_packet.server_capabilities @thread_id = init_packet.thread_id @client_flags = CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_TRANSACTIONS | CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH @client_flags |= CLIENT_LOCAL_FILES if @opts[:local_infile] || @opts[:load_data_local_dir] @client_flags |= CLIENT_CONNECT_WITH_DB if @opts[:database] @client_flags |= @opts[:flags] if @opts[:charset] @charset = @opts[:charset].is_a?(Charset) ? @opts[:charset] : Charset.by_name(@opts[:charset]) else @charset = Charset.by_number(init_packet.server_charset) @charset.encoding # raise error if unsupported charset end enable_ssl Authenticator.new(self).authenticate(@opts[:username], @opts[:password].to_s, @opts[:database], init_packet.scramble_buff, init_packet.auth_plugin) set_state :READY end
# File lib/mysql/protocol.rb, line 413 def check_state(st) raise 'command out of sync' unless @state == st end
# File lib/mysql/protocol.rb, line 163 def close @socket.close end
# File lib/mysql/protocol.rb, line 193 def enable_ssl case @opts[:ssl_mode] when SSL_MODE_DISABLED, '1', 'disabled' return when SSL_MODE_PREFERRED, '2', 'preferred' return if @socket.local_address.unix? return if @server_capabilities & CLIENT_SSL == 0 when SSL_MODE_REQUIRED, '3', 'required' if @server_capabilities & CLIENT_SSL == 0 raise ClientError::SslConnectionError, "SSL is required but the server doesn't support it" end else raise ClientError, "ssl_mode #{@opts[:ssl_mode]} is not supported" end begin @client_flags |= CLIENT_SSL write Protocol::TlsAuthenticationPacket.serialize(@client_flags, 1024**3, @charset.number) @socket = OpenSSL::SSL::SSLSocket.new(@socket) @socket.sync_close = true @socket.connect rescue => e @client_flags &= ~CLIENT_SSL return if @opts[:ssl_mode] == SSL_MODE_PREFERRED raise e end end
# File lib/mysql/protocol.rb, line 409 def gc_stmt(stmt_id) @gc_stmt_queue.push stmt_id end
get result of query. @return [integer, nil] number of fields of results. nil if no results.
# File lib/mysql/protocol.rb, line 246 def get_result begin res_packet = ResultPacket.parse read if res_packet.field_count.to_i > 0 # result data exists set_state :FIELD return res_packet.field_count end if res_packet.field_count.nil? # LOAD DATA LOCAL INFILE send_local_file(res_packet.message) end @affected_rows, @insert_id, @server_status, @warning_count, @message = res_packet.affected_rows, res_packet.insert_id, res_packet.server_status, res_packet.warning_count, res_packet.message set_state :READY return nil rescue set_state :READY raise end end
Kill command
# File lib/mysql/protocol.rb, line 320 def kill_command(pid) simple_command [COM_PROCESS_KILL, pid].pack("CV") end
Ping command
# File lib/mysql/protocol.rb, line 315 def ping_command simple_command [COM_PING].pack("C") end
Query command @param query [String] query string @return [Integer, nil] number of fields of results. nil if no results.
# File lib/mysql/protocol.rb, line 232 def query_command(query) check_state :READY begin reset write [COM_QUERY, @charset.convert(query)].pack("Ca*") get_result rescue set_state :READY raise end end
Quit command
# File lib/mysql/protocol.rb, line 221 def quit_command synchronize do reset write [COM_QUIT].pack("C") close end end
Read one packet data @return [Packet] packet data @rails [ProtocolError] invalid packet sequence number
# File lib/mysql/protocol.rb, line 449 def read data = '' len = nil begin header = read_timeout(4, @opts[:read_timeout]) raise EOFError unless header && header.length == 4 len1, len2, seq = header.unpack("CvC") len = (len2 << 8) + len1 raise ProtocolError, "invalid packet: sequence number mismatch(#{seq} != #{@seq}(expected))" if @seq != seq @seq = (@seq + 1) % 256 ret = read_timeout(len, @opts[:read_timeout]) raise EOFError unless ret && ret.length == len data.concat ret rescue EOFError raise ClientError::ServerGoneError, 'MySQL server has gone away' rescue Errno::ETIMEDOUT raise ClientError, "read timeout" end while len == MAX_PACKET_LENGTH @sqlstate = "00000" # Error packet if data[0] == ?\xff _, errno, marker, @sqlstate, message = data.unpack("Cvaa5a*") unless marker == "#" _, errno, message = data.unpack("Cva*") # Version 4.0 Error @sqlstate = "" end message.force_encoding(@charset.encoding) if Mysql::ServerError::ERROR_MAP.key? errno raise Mysql::ServerError::ERROR_MAP[errno].new(message, @sqlstate) end raise Mysql::ServerError.new(message, @sqlstate, errno) end Packet.new(data) end
Read EOF packet @raise [ProtocolError] packet is not EOF
# File lib/mysql/protocol.rb, line 554 def read_eof_packet raise ProtocolError, "packet is not EOF" unless read.eof? end
# File lib/mysql/protocol.rb, line 486 def read_timeout(len, timeout) return @socket.read(len) if timeout.nil? || timeout == 0 result = '' e = Time.now + timeout while result.size < len now = Time.now raise Errno::ETIMEDOUT if now > e r = @socket.read_nonblock(len - result.size, exception: false) case r when :wait_readable IO.select([@socket], nil, nil, e - now) next when :wait_writable IO.select(nil, [@socket], nil, e - now) next else result << r end end return result end
Refresh command
# File lib/mysql/protocol.rb, line 325 def refresh_command(op) simple_command [COM_REFRESH, op].pack("CC") end
Reset sequence number
# File lib/mysql/protocol.rb, line 442 def reset @seq = 0 # packet counter. reset by each command end
Retrieve all records for simple query @param fields [Array<Mysql::Field>] number of fields @return [Array<Array<String>>] all records
# File lib/mysql/protocol.rb, line 298 def retr_all_records(fields) check_state :RESULT enc = charset.encoding begin all_recs = [] until (pkt = read).eof? all_recs.push RawRecord.new(pkt, fields, enc) end pkt.read(3) @server_status = pkt.utiny all_recs ensure set_state :READY end end
Retrieve n fields @param n [Integer] number of fields @return [Array<Mysql::Field>] field list
# File lib/mysql/protocol.rb, line 282 def retr_fields(n) check_state :FIELD begin fields = n.times.map{Field.new FieldPacket.parse(read)} read_eof_packet set_state :RESULT fields rescue set_state :READY raise end end
send local file to server
# File lib/mysql/protocol.rb, line 267 def send_local_file(filename) filename = File.absolute_path(filename) if @opts[:local_infile] || @opts[:load_data_local_dir] && filename.start_with?(@opts[:load_data_local_dir]) File.open(filename){|f| write f} else raise ClientError::LoadDataLocalInfileRejected, 'LOAD DATA LOCAL INFILE file request rejected due to restrictions on access.' end ensure write nil # EOF mark read end
Set option command
# File lib/mysql/protocol.rb, line 330 def set_option_command(opt) simple_command [COM_SET_OPTION, opt].pack("Cv") end
# File lib/mysql/protocol.rb, line 417 def set_state(st) @state = st if st == :READY && !@gc_stmt_queue.empty? gc_disabled = GC.disable begin while st = @gc_stmt_queue.shift reset write [COM_STMT_CLOSE, st].pack("CV") end ensure GC.enable unless gc_disabled end end end
Shutdown command
# File lib/mysql/protocol.rb, line 335 def shutdown_command(level) simple_command [COM_SHUTDOWN, level].pack("CC") end
Send simple command
- @param packet
- String
-
packet data
@return [String] received data
# File lib/mysql/protocol.rb, line 561 def simple_command(packet) synchronize do reset write packet read.to_s end end
Statistics command
# File lib/mysql/protocol.rb, line 340 def statistics_command simple_command [COM_STATISTICS].pack("C") end
Stmt
close command @param stmt_id [Integer] statement id
# File lib/mysql/protocol.rb, line 402 def stmt_close_command(stmt_id) synchronize do reset write [COM_STMT_CLOSE, stmt_id].pack("CV") end end
Stmt
execute command @param stmt_id [Integer] statement id @param values [Array] parameters @return [Integer] number of fields
# File lib/mysql/protocol.rb, line 370 def stmt_execute_command(stmt_id, values) check_state :READY begin reset write ExecutePacket.serialize(stmt_id, Mysql::Stmt::CURSOR_TYPE_NO_CURSOR, values) get_result rescue set_state :READY raise end end
Stmt
prepare command @param stmt [String] prepared statement @return [Array<Integer, Integer, Array<Field>>] statement id, number of parameters, field list
# File lib/mysql/protocol.rb, line 347 def stmt_prepare_command(stmt) synchronize do reset write [COM_STMT_PREPARE, charset.convert(stmt)].pack("Ca*") res_packet = PrepareResultPacket.parse read if res_packet.param_count > 0 res_packet.param_count.times{read} # skip parameter packet read_eof_packet end if res_packet.field_count > 0 fields = res_packet.field_count.times.map{Field.new FieldPacket.parse(read)} read_eof_packet else fields = [] end return res_packet.statement_id, res_packet.param_count, fields end end
Retrieve all records for prepared statement @param fields [Array of Mysql::Fields] field list @param charset [Mysql::Charset] @return [Array<Array<Object>>] all records
# File lib/mysql/protocol.rb, line 386 def stmt_retr_all_records(fields, charset) check_state :RESULT enc = charset.encoding begin all_recs = [] until (pkt = read).eof? all_recs.push StmtRawRecord.new(pkt, fields, enc) end all_recs ensure set_state :READY end end
# File lib/mysql/protocol.rb, line 432 def synchronize begin check_state :READY return yield ensure set_state :READY end end
Write one packet data @param data [String, IO, nil] packet data. If data is nil, write empty packet.
# File lib/mysql/protocol.rb, line 510 def write(data) begin @socket.sync = false if data.nil? write_timeout([0, 0, @seq].pack("CvC"), @opts[:write_timeout]) @seq = (@seq + 1) % 256 else data = StringIO.new data if data.is_a? String while d = data.read(MAX_PACKET_LENGTH) write_timeout([d.length%256, d.length/256, @seq].pack("CvC")+d, @opts[:write_timeout]) @seq = (@seq + 1) % 256 end end @socket.sync = true @socket.flush rescue Errno::EPIPE raise ClientError::ServerGoneError, 'MySQL server has gone away' rescue Errno::ETIMEDOUT raise ClientError, "write timeout" end end
# File lib/mysql/protocol.rb, line 532 def write_timeout(data, timeout) return @socket.write(data) if timeout.nil? || timeout == 0 len = 0 e = Time.now + timeout while len < data.size now = Time.now raise Errno::ETIMEDOUT if now > e l = @socket.write_nonblock(data[len..-1], exception: false) case l when :wait_readable IO.select([@socket], nil, nil, e - now) when :wait_writable IO.select(nil, [@socket], nil, e - now) else len += l end end return len end