class Mysql::Protocol

MySQL network protocol

Constants

MAX_PACKET_LENGTH
VERSION

Attributes

affected_rows[R]
charset[RW]
client_flags[R]
get_server_public_key[R]
insert_id[R]
message[R]
server_info[R]
server_status[R]
server_version[R]
sqlstate[R]
thread_id[R]
warning_count[R]

Public Class Methods

net2value(pkt, type, unsigned) click to toggle source

Convert netdata to Ruby value @param pkt [Packet] packet data @param type [Integer] field type @param unsigned [true or false] true if value is unsigned @return [Object] converted value.

# File lib/mysql/protocol.rb, line 22
# Decode one column value from +pkt+ into a Ruby value.
#
# @param pkt [Packet] packet positioned at the value to decode
# @param type [Integer] field type (Field::TYPE_*)
# @param unsigned [true, false] true if an integer value is unsigned
# @return [Object] String for string/decimal/blob/json/bit types, Integer for
#   integer and year types, Float for float/double, Time for
#   date/datetime/timestamp (nil when Time.new rejects the components), and a
#   number of seconds (negative when the sign byte is non-zero) for TYPE_TIME
# @raise [RuntimeError] when +type+ is not one of the handled field types
def self.net2value(pkt, type, unsigned)
  case type
  when Field::TYPE_STRING, Field::TYPE_VAR_STRING, Field::TYPE_NEWDECIMAL, Field::TYPE_BLOB, Field::TYPE_JSON
    return pkt.lcs
  when Field::TYPE_TINY
    v = pkt.utiny
    return unsigned ? v : v < 128 ? v : v-256
  when Field::TYPE_SHORT
    v = pkt.ushort
    return unsigned ? v : v < 32768 ? v : v-65536
  when Field::TYPE_INT24, Field::TYPE_LONG
    v = pkt.ulong
    return unsigned ? v : v < 0x8000_0000 ? v : v-0x10000_0000
  when Field::TYPE_LONGLONG
    # Two little-endian 32-bit halves, low half first.
    n1, n2 = pkt.ulong, pkt.ulong
    v = (n2 << 32) | n1
    return unsigned ? v : v < 0x8000_0000_0000_0000 ? v : v-0x10000_0000_0000_0000
  when Field::TYPE_FLOAT
    return pkt.read(4).unpack('e').first
  when Field::TYPE_DOUBLE
    return pkt.read(8).unpack('E').first
  when Field::TYPE_DATE
    len = pkt.utiny
    y, m, d = pkt.read(len).unpack("vCC")
    t = Time.new(y, m, d) rescue nil
    return t
  when Field::TYPE_DATETIME, Field::TYPE_TIMESTAMP
    len = pkt.utiny
    # Fields missing from a short value unpack as nil; to_i turns them into 0.
    y, m, d, h, mi, s, sp = pkt.read(len).unpack("vCCCCCV")
    # Hand numerator and denominator to Rational separately: dividing the
    # integers first would drop the microseconds.
    sec = Rational(s.to_i*1000000+sp.to_i, 1000000)
    return (Time.new(y, m, d, h, mi, sec) rescue nil)
  when Field::TYPE_TIME
    len = pkt.utiny
    sign, d, h, mi, s, sp = pkt.read(len).unpack("CVCCCV")
    r = d.to_i*86400 + h.to_i*3600 + mi.to_i*60 + s.to_i + sp.to_f/1000000
    r *= -1 if sign != 0
    return r
  when Field::TYPE_YEAR
    return pkt.ushort
  when Field::TYPE_BIT
    return pkt.lcs
  else
    raise "not implemented: type=#{type}"
  end
end
new(opts) click to toggle source

make socket connection to server. @param opts [Hash] @option :host [String] hostname mysqld running @option :username [String] username to connect to mysqld @option :password [String] password to connect to mysqld @option :database [String] initial database name @option :port [String] port number (used if host is not ‘localhost’ or nil) @option :socket [String] socket filename (used if host is ‘localhost’ or nil) @option :flags [Integer] connection flag. Mysql::CLIENT_* ORed @option :charset [Mysql::Charset] character set @option :connect_timeout [Numeric, nil] @option :read_timeout [Numeric, nil] @option :write_timeout [Numeric, nil] @option :local_infile [Boolean] @option :load_data_local_dir [String] @option :ssl_mode [Integer] @option :get_server_public_key [Boolean] @raise [ClientError] connection timeout

# File lib/mysql/protocol.rb, line 142
# Store the options, set the initial protocol state and open the socket.
#
# A UNIX socket is used when :host is nil, empty or "localhost"; otherwise a
# TCP connection is opened to :host. No packet is exchanged here.
#
# @param opts [Hash] connection options (:host, :port, :socket,
#   :connect_timeout, :get_server_public_key, ...)
# @raise [ClientError] when connecting times out (Errno::ETIMEDOUT)
def initialize(opts)
  @opts = opts
  @charset = Mysql::Charset.by_name("utf8mb4")
  @insert_id = 0
  @warning_count = 0
  @gc_stmt_queue = []   # statement ids pushed by gc_stmt, drained by set_state
  set_state :INIT
  @get_server_public_key = @opts[:get_server_public_key]
  begin
    host = @opts[:host]
    if host.nil? || host.empty? || host == "localhost"
      # Socket path: explicit option, then environment, then built-in default.
      path = @opts[:socket] || ENV["MYSQL_UNIX_PORT"] || MYSQL_UNIX_PORT
      @socket = Socket.unix(path)
    else
      # Port: explicit option, then environment, then services lookup, then default.
      port = @opts[:port] || ENV["MYSQL_TCP_PORT"] || (Socket.getservbyname("mysql","tcp") rescue MYSQL_TCP_PORT)
      @socket = Socket.tcp(host, port, connect_timeout: @opts[:connect_timeout])
    end
  rescue Errno::ETIMEDOUT
    raise ClientError, "connection timeout"
  end
end
value2net(v) click to toggle source

convert Ruby value to netdata @param v [Object] Ruby value. @return [Integer] type of column. Field::TYPE_* @return [String] netdata @raise [ProtocolError] value too large / value is not supported

# File lib/mysql/protocol.rb, line 72
# Encode a Ruby value as a (field type, wire bytes) pair.
#
# @param v [nil, Integer, Float, String, Time] value to encode
# @return [Array(Integer, String)] Field::TYPE_* code (with 0x8000 OR-ed in for
#   integers that only fit an unsigned 64-bit value) and the packed bytes
# @raise [ProtocolError] integer outside -2**63 .. 2**64-1, or unsupported class
def self.value2net(v)
  case v
  when nil
    [Field::TYPE_NULL, ""]
  when Integer
    if (-0x8000_0000...0x8000_0000).cover?(v)
      [Field::TYPE_LONG, [v].pack('V')]
    elsif (-0x8000_0000_0000_0000...0x8000_0000_0000_0000).cover?(v)
      # 64-bit values go out as two little-endian 32-bit words, low word first.
      [Field::TYPE_LONGLONG, [v&0xffffffff, v>>32].pack("VV")]
    elsif (0x8000_0000_0000_0000..0xffff_ffff_ffff_ffff).cover?(v)
      [Field::TYPE_LONGLONG | 0x8000, [v&0xffffffff, v>>32].pack("VV")]
    else
      raise ProtocolError, "value too large: #{v}"
    end
  when Float
    [Field::TYPE_DOUBLE, [v].pack("E")]
  when String
    [Field::TYPE_STRING, Packet.lcs(v)]
  when Time
    # Leading 11 is the byte length of the date/time fields that follow.
    parts = [11, v.year, v.month, v.day, v.hour, v.min, v.sec, v.usec]
    [Field::TYPE_DATETIME, parts.pack("CvCCCCCV")]
  else
    raise ProtocolError, "class #{v.class} is not supported"
  end
end

Public Instance Methods

authenticate() click to toggle source

Initial negotiation and authentication. The connection charset is taken from the :charset option (a Mysql::Charset or a charset name); when it is not given, the server’s charset is used. @raise [ProtocolError] The old style password is not supported

# File lib/mysql/protocol.rb, line 170
# Perform the initial handshake and authenticate.
#
# Reads the server's initial packet, records server information, builds the
# client capability flags from the options, selects the connection charset
# (the :charset option, else the server's charset), runs enable_ssl and then
# delegates the credential exchange to Authenticator. The state becomes :READY
# on success.
#
# @raise [RuntimeError] 'command out of sync' when not in the :INIT state
def authenticate
  check_state :INIT
  reset
  init_packet = InitialPacket.parse read
  @server_info = init_packet.server_version
  # "8.0.30-xyz" -> 80030: first three numeric components, two digits each.
  @server_version = init_packet.server_version.split(/\D/)[0,3].inject{|a,b|a.to_i*100+b.to_i}
  @server_capabilities = init_packet.server_capabilities
  @thread_id = init_packet.thread_id
  @client_flags = CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_TRANSACTIONS | CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH
  @client_flags |= CLIENT_LOCAL_FILES if @opts[:local_infile] || @opts[:load_data_local_dir]
  @client_flags |= CLIENT_CONNECT_WITH_DB if @opts[:database]
  # :flags is optional; Integer#| raises TypeError for nil, so only OR it in when present.
  @client_flags |= @opts[:flags] if @opts[:flags]
  if @opts[:charset]
    @charset = @opts[:charset].is_a?(Charset) ? @opts[:charset] : Charset.by_name(@opts[:charset])
  else
    @charset = Charset.by_number(init_packet.server_charset)
    @charset.encoding       # raise error if unsupported charset
  end
  enable_ssl
  Authenticator.new(self).authenticate(@opts[:username], @opts[:password].to_s, @opts[:database], init_packet.scramble_buff, init_packet.auth_plugin)
  set_state :READY
end
check_state(st) click to toggle source
# File lib/mysql/protocol.rb, line 413
# Ensure the protocol is currently in state +st+.
#
# @param st [Symbol] expected state
# @raise [RuntimeError] 'command out of sync' when @state differs from +st+
def check_state(st)
  return if @state == st

  raise 'command out of sync'
end
close() click to toggle source
# File lib/mysql/protocol.rb, line 163
# Close the underlying socket (@socket). No packet is sent here.
def close
  @socket.close
end
enable_ssl() click to toggle source
# File lib/mysql/protocol.rb, line 193
# Switch the connection to TLS according to the :ssl_mode option.
#
# * disabled  - nothing is done.
# * preferred - TLS is attempted unless the socket is a UNIX socket or the
#   server lacks CLIENT_SSL; a failed attempt is ignored.
# * required  - TLS is attempted; a missing server capability or a failed
#   attempt raises.
#
# Each mode may be given as the SSL_MODE_* constant, its numeric string or its
# name. On failure CLIENT_SSL is cleared from @client_flags again.
#
# @raise [ClientError::SslConnectionError] required but not supported by the server
# @raise [ClientError] unknown :ssl_mode value
def enable_ssl
  # Remember "preferred" from the case below so the fallback in the rescue
  # also applies to the '2' / 'preferred' spellings, not only to the constant.
  preferred = false
  case @opts[:ssl_mode]
  when SSL_MODE_DISABLED, '1', 'disabled'
    return
  when SSL_MODE_PREFERRED, '2', 'preferred'
    return if @socket.local_address.unix?
    return if @server_capabilities & CLIENT_SSL == 0
    preferred = true
  when SSL_MODE_REQUIRED, '3', 'required'
    if @server_capabilities & CLIENT_SSL == 0
      raise ClientError::SslConnectionError, "SSL is required but the server doesn't support it"
    end
  else
    raise ClientError, "ssl_mode #{@opts[:ssl_mode]} is not supported"
  end
  begin
    @client_flags |= CLIENT_SSL
    write Protocol::TlsAuthenticationPacket.serialize(@client_flags, 1024**3, @charset.number)
    @socket = OpenSSL::SSL::SSLSocket.new(@socket)
    @socket.sync_close = true
    @socket.connect
  rescue => e
    @client_flags &= ~CLIENT_SSL
    return if preferred
    raise e
  end
end
gc_stmt(stmt_id) click to toggle source
# File lib/mysql/protocol.rb, line 409
# Queue a statement id for closing; set_state drains the queue the next time
# the state becomes :READY.
#
# @param stmt_id [Integer] statement id
def gc_stmt(stmt_id)
  @gc_stmt_queue << stmt_id
end
get_result() click to toggle source

Get result of query. @return [Integer, nil] number of fields of results. nil if no results.

# File lib/mysql/protocol.rb, line 246
# Read and interpret the result packet of a command.
#
# With a positive field count the state becomes :FIELD and the count is
# returned. Otherwise the packet's counters and message are stored, the state
# becomes :READY and nil is returned; a nil field count first triggers
# send_local_file with the packet's message (LOAD DATA LOCAL INFILE).
# Any error resets the state to :READY before it is re-raised.
#
# @return [Integer, nil] number of result fields, nil if there is no result set
def get_result
  res_packet = ResultPacket.parse(read)
  field_count = res_packet.field_count
  if field_count.to_i > 0               # result data exists
    set_state :FIELD
    return field_count
  end
  send_local_file(res_packet.message) if field_count.nil?   # LOAD DATA LOCAL INFILE
  @affected_rows, @insert_id, @server_status, @warning_count, @message = [
    res_packet.affected_rows,
    res_packet.insert_id,
    res_packet.server_status,
    res_packet.warning_count,
    res_packet.message,
  ]
  set_state :READY
  nil
rescue
  set_state :READY
  raise
end
kill_command(pid) click to toggle source

Kill command

# File lib/mysql/protocol.rb, line 320
# Send COM_PROCESS_KILL for +pid+ through simple_command.
#
# @param pid [Integer] packed as an unsigned 32-bit little-endian value
# @return [String] whatever simple_command returns
def kill_command(pid)
  payload = [COM_PROCESS_KILL, pid].pack("CV")
  simple_command(payload)
end
ping_command() click to toggle source

Ping command

# File lib/mysql/protocol.rb, line 315
# Send COM_PING through simple_command.
#
# @return [String] whatever simple_command returns
def ping_command
  payload = [COM_PING].pack("C")
  simple_command(payload)
end
query_command(query) click to toggle source

Query command @param query [String] query string @return [Integer, nil] number of fields of results. nil if no results.

# File lib/mysql/protocol.rb, line 232
# Send COM_QUERY with +query+ converted by the connection charset, then read
# the result header via get_result.
#
# The state check happens before the guarded section, so an out-of-sync call
# does not touch the state; any later error resets it to :READY.
#
# @param query [String] query string
# @return [Integer, nil] number of result fields, nil if there is no result set
def query_command(query)
  check_state :READY
  begin
    reset
    payload = [COM_QUERY, @charset.convert(query)].pack("Ca*")
    write(payload)
    get_result
  rescue
    set_state :READY
    raise
  end
end
quit_command() click to toggle source

Quit command

# File lib/mysql/protocol.rb, line 221
# Send COM_QUIT and close the socket, all inside synchronize.
def quit_command
  synchronize do
    reset
    quit_packet = [COM_QUIT].pack("C")
    write(quit_packet)
    close
  end
end
read() click to toggle source

Read one packet data @return [Packet] packet data @raise [ProtocolError] invalid packet sequence number

# File lib/mysql/protocol.rb, line 449
# Read one logical packet from the socket.
#
# Physical packets are read repeatedly while the payload length equals
# MAX_PACKET_LENGTH; their payloads are concatenated. Each physical packet's
# sequence number must equal @seq, which is then advanced modulo 256.
# @sqlstate is reset to "00000" before the payload is inspected.
#
# A payload starting with 0xff is treated as an error packet and raised as
# Mysql::ServerError, or as the class registered for its error number in
# Mysql::ServerError::ERROR_MAP.
#
# @return [Packet] packet built from the accumulated payload
# @raise [ProtocolError] sequence number mismatch
# @raise [ClientError::ServerGoneError] header or payload shorter than requested
# @raise [ClientError] read timeout
# @raise [Mysql::ServerError] the server sent an error packet
def read
  data = ''
  len = nil
  begin
    header = read_timeout(4, @opts[:read_timeout])
    raise EOFError unless header && header.length == 4
    # Header: payload length as low byte + 16-bit little-endian high part,
    # followed by a one-byte sequence number.
    len1, len2, seq = header.unpack("CvC")
    len = (len2 << 8) + len1
    raise ProtocolError, "invalid packet: sequence number mismatch(#{seq} != #{@seq}(expected))" if @seq != seq
    @seq = (@seq + 1) % 256
    ret = read_timeout(len, @opts[:read_timeout])
    raise EOFError unless ret && ret.length == len
    data.concat ret
  rescue EOFError
    raise ClientError::ServerGoneError, 'MySQL server has gone away'
  rescue Errno::ETIMEDOUT
    raise ClientError, "read timeout"
  end while len == MAX_PACKET_LENGTH   # do-while: a full-size packet means more follows

  @sqlstate = "00000"

  # Error packet
  if data[0] == ?\xff
    # Layout tried first: 0xff, error number, "#" marker, 5-char SQLSTATE, message.
    _, errno, marker, @sqlstate, message = data.unpack("Cvaa5a*")
    unless marker == "#"
      # No marker: re-unpack without the SQLSTATE part.
      _, errno, message = data.unpack("Cva*")    # Version 4.0 Error
      @sqlstate = ""
    end
    message.force_encoding(@charset.encoding)
    if Mysql::ServerError::ERROR_MAP.key? errno
      raise Mysql::ServerError::ERROR_MAP[errno].new(message, @sqlstate)
    end
    raise Mysql::ServerError.new(message, @sqlstate, errno)
  end
  Packet.new(data)
end
read_eof_packet() click to toggle source

Read EOF packet @raise [ProtocolError] packet is not EOF

# File lib/mysql/protocol.rb, line 554
# Read the next packet and require it to be an EOF packet.
#
# @raise [ProtocolError] the packet read does not report eof?
def read_eof_packet
  return if read.eof?

  raise ProtocolError, "packet is not EOF"
end
read_timeout(len, timeout) click to toggle source
# File lib/mysql/protocol.rb, line 486
# Read up to +len+ bytes from the socket, giving up after +timeout+ seconds.
#
# Without a timeout (nil or 0) this is a plain blocking @socket.read(len).
# With a timeout, non-blocking reads are repeated until +len+ bytes have been
# collected, the deadline passes, or the peer closes the connection.
#
# @param len [Integer] number of bytes wanted
# @param timeout [Numeric, nil] seconds; nil or 0 disables the timeout
# @return [String, nil] the bytes read; shorter than +len+ when EOF was hit
#   (nil is only possible on the no-timeout path, from IO#read)
# @raise [Errno::ETIMEDOUT] the deadline passed before +len+ bytes arrived
def read_timeout(len, timeout)
  return @socket.read(len) if timeout.nil? || timeout == 0
  result = +''
  e = Time.now + timeout
  while result.bytesize < len
    now = Time.now
    raise Errno::ETIMEDOUT if now > e
    r = @socket.read_nonblock(len - result.bytesize, exception: false)
    case r
    when :wait_readable
      IO.select([@socket], nil, nil, e - now)
    when :wait_writable
      IO.select(nil, [@socket], nil, e - now)
    when nil
      # EOF: read_nonblock(exception: false) returns nil. Stop and hand back
      # the short result, like the blocking path does, instead of appending nil.
      break
    else
      result << r
    end
  end
  result
end
refresh_command(op) click to toggle source

Refresh command

# File lib/mysql/protocol.rb, line 325
# Send COM_REFRESH with the one-byte option +op+ through simple_command.
#
# @param op [Integer] packed as a single byte
# @return [String] whatever simple_command returns
def refresh_command(op)
  payload = [COM_REFRESH, op].pack("CC")
  simple_command(payload)
end
reset() click to toggle source

Reset sequence number

# File lib/mysql/protocol.rb, line 442
# Reset the packet sequence number (@seq) to 0. read checks incoming packets
# against @seq and both read and write advance it per packet.
def reset
  @seq = 0    # packet counter. reset by each command
end
retr_all_records(fields) click to toggle source

Retrieve all records for simple query @param fields [Array<Mysql::Field>] field list @return [Array<Array<String>>] all records

# File lib/mysql/protocol.rb, line 298
# Read every row packet of a simple-query result up to the EOF packet.
#
# Each row packet is wrapped in a RawRecord together with +fields+ and the
# connection charset's encoding. After the EOF packet, three bytes are skipped
# and the next byte is stored in @server_status. The state is :READY afterwards,
# also when reading fails.
# NOTE(review): only one byte of the status is read here (utiny) - confirm
# that the high byte of the status flags is not needed.
#
# @param fields [Array<Mysql::Field>] field list
# @return [Array<RawRecord>] all records
# @raise [RuntimeError] 'command out of sync' when not in the :RESULT state
def retr_all_records(fields)
  check_state :RESULT
  enc = charset.encoding
  begin
    records = []
    pkt = read
    until pkt.eof?
      records << RawRecord.new(pkt, fields, enc)
      pkt = read
    end
    pkt.read(3)
    @server_status = pkt.utiny
    records
  ensure
    set_state :READY
  end
end
retr_fields(n) click to toggle source

Retrieve n fields @param n [Integer] number of fields @return [Array<Mysql::Field>] field list

# File lib/mysql/protocol.rb, line 282
# Read +n+ field packets followed by an EOF packet.
#
# On success the state becomes :RESULT; on any error after the state check it
# is reset to :READY and the error is re-raised.
#
# @param n [Integer] number of fields
# @return [Array<Mysql::Field>] field list
# @raise [RuntimeError] 'command out of sync' when not in the :FIELD state
def retr_fields(n)
  check_state :FIELD
  begin
    field_list = (1..n).map do
      Field.new(FieldPacket.parse(read))
    end
    read_eof_packet
    set_state :RESULT
    field_list
  rescue
    set_state :READY
    raise
  end
end
send_local_file(filename) click to toggle source

send local file to server

# File lib/mysql/protocol.rb, line 267
# Send the contents of a local file to the server (LOAD DATA LOCAL INFILE).
#
# The file is sent only when the :local_infile option is set, or when the file
# lies inside the directory given by :load_data_local_dir. In every case,
# including rejection, an empty packet (EOF mark) is written and one packet is
# read afterwards.
#
# @param filename [String] file name (get_result passes the result packet's message)
# @raise [ClientError::LoadDataLocalInfileRejected] the file is not permitted
def send_local_file(filename)
  filename = File.absolute_path(filename)
  allowed_dir = @opts[:load_data_local_dir]
  # Compare against the directory path WITH a trailing separator: a bare
  # prefix test would also accept siblings such as "/dir_other/x" for "/dir".
  inside_allowed_dir = allowed_dir &&
                       filename.start_with?(File.join(File.absolute_path(allowed_dir), ''))
  if @opts[:local_infile] || inside_allowed_dir
    File.open(filename){|f| write f}
  else
    raise ClientError::LoadDataLocalInfileRejected, 'LOAD DATA LOCAL INFILE file request rejected due to restrictions on access.'
  end
ensure
  write nil # EOF mark
  read
end
set_option_command(opt) click to toggle source

Set option command

# File lib/mysql/protocol.rb, line 330
# Send COM_SET_OPTION with +opt+ through simple_command.
#
# @param opt [Integer] packed as an unsigned 16-bit little-endian value
# @return [String] whatever simple_command returns
def set_option_command(opt)
  payload = [COM_SET_OPTION, opt].pack("Cv")
  simple_command(payload)
end
set_state(st) click to toggle source
# File lib/mysql/protocol.rb, line 417
# Set the protocol state.
#
# On entering :READY with queued statement ids (see gc_stmt), a COM_STMT_CLOSE
# packet is written for each queued id. GC is disabled while the queue is
# drained and re-enabled afterwards unless it was already disabled.
#
# @param st [Symbol] new state
# @return [nil]
def set_state(st)
  @state = st
  return unless st == :READY
  return if @gc_stmt_queue.empty?

  gc_was_disabled = GC.disable   # GC.disable reports whether GC was already off
  begin
    while (stmt_id = @gc_stmt_queue.shift)
      reset
      write [COM_STMT_CLOSE, stmt_id].pack("CV")
    end
  ensure
    GC.enable unless gc_was_disabled
  end
end
shutdown_command(level) click to toggle source

Shutdown command

# File lib/mysql/protocol.rb, line 335
# Send COM_SHUTDOWN with the one-byte +level+ through simple_command.
#
# @param level [Integer] packed as a single byte
# @return [String] whatever simple_command returns
def shutdown_command(level)
  payload = [COM_SHUTDOWN, level].pack("CC")
  simple_command(payload)
end
simple_command(packet) click to toggle source

Send simple command

@param packet [String] packet data

@return [String] received data

# File lib/mysql/protocol.rb, line 561
# Send one command packet and return the reply as a String, inside synchronize.
#
# @param packet [String] packet data
# @return [String] to_s of the packet read in reply
def simple_command(packet)
  synchronize do
    reset
    write(packet)
    reply = read
    reply.to_s
  end
end
statistics_command() click to toggle source

Statistics command

# File lib/mysql/protocol.rb, line 340
# Send COM_STATISTICS through simple_command.
#
# @return [String] whatever simple_command returns
def statistics_command
  payload = [COM_STATISTICS].pack("C")
  simple_command(payload)
end
stmt_close_command(stmt_id) click to toggle source

Stmt close command @param stmt_id [Integer] statement id

# File lib/mysql/protocol.rb, line 402
# Send COM_STMT_CLOSE for +stmt_id+ inside synchronize. No reply is read.
#
# @param stmt_id [Integer] statement id
def stmt_close_command(stmt_id)
  synchronize do
    reset
    close_packet = [COM_STMT_CLOSE, stmt_id].pack("CV")
    write(close_packet)
  end
end
stmt_execute_command(stmt_id, values) click to toggle source

Stmt execute command @param stmt_id [Integer] statement id @param values [Array] parameters @return [Integer] number of fields

# File lib/mysql/protocol.rb, line 370
# Execute a prepared statement and read the result header via get_result.
#
# The state check happens before the guarded section; any later error resets
# the state to :READY before it is re-raised.
#
# @param stmt_id [Integer] statement id
# @param values [Array] parameters
# @return [Integer, nil] whatever get_result returns
def stmt_execute_command(stmt_id, values)
  check_state :READY
  begin
    reset
    packet = ExecutePacket.serialize(stmt_id, Mysql::Stmt::CURSOR_TYPE_NO_CURSOR, values)
    write(packet)
    get_result
  rescue
    set_state :READY
    raise
  end
end
stmt_prepare_command(stmt) click to toggle source

Stmt prepare command @param stmt [String] prepared statement @return [Array<Integer, Integer, Array<Field>>] statement id, number of parameters, field list

# File lib/mysql/protocol.rb, line 347
# Prepare a statement, inside synchronize.
#
# Sends COM_STMT_PREPARE with +stmt+ converted by the connection charset and
# parses the reply. Parameter packets (if any) are read and discarded, then an
# EOF packet; field packets (if any) are parsed into Field objects, followed by
# another EOF packet.
#
# @param stmt [String] statement text
# @return [Array(Integer, Integer, Array<Field>)] statement id, number of
#   parameters, field list
def stmt_prepare_command(stmt)
  synchronize do
    reset
    write [COM_STMT_PREPARE, charset.convert(stmt)].pack("Ca*")
    res_packet = PrepareResultPacket.parse(read)
    param_count = res_packet.param_count
    if param_count > 0
      param_count.times { read }    # skip parameter packets
      read_eof_packet
    end
    fields = []
    field_count = res_packet.field_count
    if field_count > 0
      fields = (1..field_count).map { Field.new(FieldPacket.parse(read)) }
      read_eof_packet
    end
    [res_packet.statement_id, res_packet.param_count, fields]
  end
end
stmt_retr_all_records(fields, charset) click to toggle source

Retrieve all records for prepared statement @param fields [Array of Mysql::Fields] field list @param charset [Mysql::Charset] @return [Array<Array<Object>>] all records

# File lib/mysql/protocol.rb, line 386
# Read every row packet of a prepared-statement result up to the EOF packet.
#
# Each row packet is wrapped in a StmtRawRecord together with +fields+ and the
# encoding of +charset+. The state is :READY afterwards, also when reading fails.
#
# @param fields [Array<Mysql::Field>] field list
# @param charset [Mysql::Charset] charset whose encoding is passed to the records
# @return [Array<StmtRawRecord>] all records
# @raise [RuntimeError] 'command out of sync' when not in the :RESULT state
def stmt_retr_all_records(fields, charset)
  check_state :RESULT
  enc = charset.encoding
  begin
    records = []
    pkt = read
    until pkt.eof?
      records << StmtRawRecord.new(pkt, fields, enc)
      pkt = read
    end
    records
  ensure
    set_state :READY
  end
end
synchronize() { || ... } click to toggle source
# File lib/mysql/protocol.rb, line 432
# Run the block only when the state is :READY, and set the state to :READY
# afterwards in every case, including when the state check or the block raises.
#
# @yield the command to run
# @return [Object] the block's value
# @raise [RuntimeError] 'command out of sync' when not in the :READY state
def synchronize
  check_state :READY
  yield
ensure
  set_state :READY
end
write(data) click to toggle source

Write one packet data @param data [String, IO, nil] packet data. If data is nil, write empty packet.

# File lib/mysql/protocol.rb, line 510
# Write one logical packet to the socket.
#
# The payload is read in chunks of at most MAX_PACKET_LENGTH; each chunk is
# sent with a 4-byte header (length as low byte + 16-bit high part, then the
# sequence number) and @seq is advanced modulo 256 per chunk. A nil payload
# sends a single header with length 0.
#
# @param data [String, IO, nil] packet data. A String is wrapped in StringIO;
#   any other non-nil value is read with read(MAX_PACKET_LENGTH). If data is
#   nil, write empty packet.
# @raise [ClientError::ServerGoneError] on Errno::EPIPE
# @raise [ClientError] write timeout
def write(data)
  begin
    # sync is switched off while the chunks are written and switched back on,
    # followed by a flush, once all of them have been written. When a write
    # raises, the lines restoring sync are skipped.
    @socket.sync = false
    if data.nil?
      write_timeout([0, 0, @seq].pack("CvC"), @opts[:write_timeout])
      @seq = (@seq + 1) % 256
    else
      data = StringIO.new data if data.is_a? String
      # read(len) returns nil once the source is exhausted, which ends the loop.
      while d = data.read(MAX_PACKET_LENGTH)
        write_timeout([d.length%256, d.length/256, @seq].pack("CvC")+d, @opts[:write_timeout])
        @seq = (@seq + 1) % 256
      end
    end
    @socket.sync = true
    @socket.flush
  rescue Errno::EPIPE
    raise ClientError::ServerGoneError, 'MySQL server has gone away'
  rescue Errno::ETIMEDOUT
    raise ClientError, "write timeout"
  end
end
write_timeout(data, timeout) click to toggle source
# File lib/mysql/protocol.rb, line 532
# Write all of +data+ to the socket, giving up after +timeout+ seconds.
#
# Without a timeout (nil or 0) this is a plain blocking @socket.write(data).
# With a timeout, non-blocking writes are repeated until every byte has been
# written or the deadline passes.
#
# @param data [String] bytes to send
# @param timeout [Numeric, nil] seconds; nil or 0 disables the timeout
# @return [Integer] number of bytes written
# @raise [Errno::ETIMEDOUT] the deadline passed before everything was written
def write_timeout(data, timeout)
  return @socket.write(data) if timeout.nil? || timeout == 0
  total = data.bytesize
  len = 0
  e = Time.now + timeout
  while len < total
    now = Time.now
    raise Errno::ETIMEDOUT if now > e
    # write_nonblock reports BYTES written, so the remaining part must be cut
    # by byte offset; a character offset would resend or skip bytes of
    # multibyte strings after a partial write.
    l = @socket.write_nonblock(data.byteslice(len, total - len), exception: false)
    case l
    when :wait_readable
      IO.select([@socket], nil, nil, e - now)
    when :wait_writable
      IO.select(nil, [@socket], nil, e - now)
    else
      len += l
    end
  end
  len
end