class MysqlPR::Protocol

MySQL network protocol

Constants

MAX_PACKET_LENGTH
VERSION

Attributes

affected_rows[R]
charset[RW]
insert_id[R]
message[R]
server_info[R]
server_status[R]
server_version[R]
sqlstate[R]
thread_id[R]
warning_count[R]

Public Class Methods

net2value(pkt, type, unsigned) click to toggle source

Convert netdata to Ruby value

Argument

data
Packet

packet data

type
Integer

field type

unsigned
true or false

true if value is unsigned

Return

Object

converted value.

# File lib/mysql-pr/protocol.rb, line 23
def self.net2value(pkt, type, unsigned)
  case type
  when Field::TYPE_STRING, Field::TYPE_VAR_STRING, Field::TYPE_NEWDECIMAL, Field::TYPE_BLOB
    return pkt.lcs
  when Field::TYPE_TINY
    v = pkt.utiny
    return unsigned ? v : v < 128 ? v : v-256
  when Field::TYPE_SHORT
    v = pkt.ushort
    return unsigned ? v : v < 32768 ? v : v-65536
  when Field::TYPE_INT24, Field::TYPE_LONG
    v = pkt.ulong
    return unsigned ? v : v < 2**32/2 ? v : v-2**32
  when Field::TYPE_LONGLONG
    n1, n2 = pkt.ulong, pkt.ulong
    v = (n2 << 32) | n1
    return unsigned ? v : v < 2**64/2 ? v : v-2**64
  when Field::TYPE_FLOAT
    return pkt.read(4).unpack('e').first
  when Field::TYPE_DOUBLE
    return pkt.read(8).unpack('E').first
  when Field::TYPE_DATE
    len = pkt.utiny
    y, m, d = pkt.read(len).unpack("vCC")
    t = MysqlPR::Time.new(y, m, d, nil, nil, nil)
    return t
  when Field::TYPE_DATETIME, Field::TYPE_TIMESTAMP
    len = pkt.utiny
    y, m, d, h, mi, s, sp = pkt.read(len).unpack("vCCCCCV")
    return MysqlPR::Time.new(y, m, d, h, mi, s, false, sp)
  when Field::TYPE_TIME
    len = pkt.utiny
    sign, d, h, mi, s, sp = pkt.read(len).unpack("CVCCCV")
    h = d.to_i * 24 + h.to_i
    return MysqlPR::Time.new(0, 0, 0, h, mi, s, sign!=0, sp)
  when Field::TYPE_YEAR
    return pkt.ushort
  when Field::TYPE_BIT
    return pkt.lcs
  else
    raise "not implemented: type=#{type}"
  end
end
new(host, port, socket, conn_timeout, read_timeout, write_timeout) click to toggle source

make socket connection to server.

Argument

host
String

if “localhost” or “” nil then use UNIXSocket. Otherwise use TCPSocket

port
Integer

port number using by TCPSocket

socket
String

socket file name using by UNIXSocket

conn_timeout
Integer

connect timeout (sec).

read_timeout
Integer

read timeout (sec).

write_timeout
Integer

write timeout (sec).

Exception

ClientError

connection timeout

# File lib/mysql-pr/protocol.rb, line 156
def initialize(host, port, socket, conn_timeout, read_timeout, write_timeout)
  @insert_id = 0
  @warning_count = 0
  @gc_stmt_queue = []   # stmt id list which GC destroy.
  set_state :INIT
  @read_timeout = read_timeout
  @write_timeout = write_timeout
  begin
    Timeout.timeout conn_timeout do
      if host.nil? or host.empty? or host == "localhost"
        socket ||= ENV["MYSQL_UNIX_PORT"] || MYSQL_UNIX_PORT
        @sock = UNIXSocket.new socket
      else
        port ||= ENV["MYSQL_TCP_PORT"] || (Socket.getservbyname("mysql","tcp") rescue MYSQL_TCP_PORT)
        @sock = TCPSocket.new host, port
      end
    end
  rescue Timeout::Error
    raise ClientError, "connection timeout"
  end
end
value2net(v) click to toggle source

convert Ruby value to netdata

Argument

v
Object

Ruby value.

Return

Integer

type of column. Field::TYPE_*

String

netdata

Exception

ProtocolError

value too large / value is not supported

# File lib/mysql-pr/protocol.rb, line 75
def self.value2net(v)
  case v
  when nil
    type = Field::TYPE_NULL
    val = ""
  when Integer
    if v >= 0
      if v < 256
        type = Field::TYPE_TINY | 0x8000
        val = [v].pack("C")
      elsif v < 256**2
        type = Field::TYPE_SHORT | 0x8000
        val = [v].pack("v")
      elsif v < 256**4
        type = Field::TYPE_LONG | 0x8000
        val = [v].pack("V")
      elsif v < 256**8
        type = Field::TYPE_LONGLONG | 0x8000
        val = [v&0xffffffff, v>>32].pack("VV")
      else
        raise ProtocolError, "value too large: #{v}"
      end
    else
      if -v <= 256/2
        type = Field::TYPE_TINY
        val = [v].pack("C")
      elsif -v <= 256**2/2
        type = Field::TYPE_SHORT
        val = [v].pack("v")
      elsif -v <= 256**4/2
        type = Field::TYPE_LONG
        val = [v].pack("V")
      elsif -v <= 256**8/2
        type = Field::TYPE_LONGLONG
        val = [v&0xffffffff, v>>32].pack("VV")
      else
        raise ProtocolError, "value too large: #{v}"
      end
    end
  when Float
    type = Field::TYPE_DOUBLE
    val = [v].pack("E")
  when String
    type = Field::TYPE_STRING
    val = Packet.lcs(v)
  when MysqlPR::Time, ::Time
    type = Field::TYPE_DATETIME
    val = [7, v.year, v.month, v.day, v.hour, v.min, v.sec].pack("CvCCCCC")
  else
    raise ProtocolError, "class #{v.class} is not supported"
  end
  return type, val
end

Public Instance Methods

authenticate(user, passwd, db, flag, charset) click to toggle source

initial negotiate and authenticate.

Argument

user
String / nil

username

passwd
String / nil

password

db
String / nil

default database name. nil: no default.

flag
Integer

client flag

charset
MysqlPR::Charset / nil

charset for connection. nil: use server’s charset

Exception

ProtocolError

The old style password is not supported

# File lib/mysql-pr/protocol.rb, line 191
def authenticate(user, passwd, db, flag, charset)
  check_state :INIT
  @authinfo = [user, passwd, db, flag, charset]
  reset
  init_packet = InitialPacket.parse read
  @server_info = init_packet.server_version
  @server_version = init_packet.server_version.split(/\D/)[0,3].inject{|a,b|a.to_i*100+b.to_i}
  @thread_id = init_packet.thread_id
  client_flags = CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_TRANSACTIONS | CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION
  client_flags |= CLIENT_CONNECT_WITH_DB if db
  client_flags |= flag
  @charset = charset
  unless @charset
    @charset = Charset.by_number(init_packet.server_charset)
    @charset.encoding       # raise error if unsupported charset
  end
  netpw = encrypt_password passwd, init_packet.scramble_buff
  write AuthenticationPacket.serialize(client_flags, 1024**3, @charset.number, user, netpw, db)
  raise ProtocolError, 'The old style password is not supported' if read.to_s == "\xfe"
  set_state :READY
end
close() click to toggle source
# File lib/mysql-pr/protocol.rb, line 178
def close
  @sock.close
end
field_list_command(table, field) click to toggle source

Field list command

Argument

table
String

table name.

field
String / nil

field name that may contain wild card.

Return

Array of Field

field list

# File lib/mysql-pr/protocol.rb, line 310
def field_list_command(table, field)
  synchronize do
    reset
    write [COM_FIELD_LIST, table, 0, field].pack("Ca*Ca*")
    fields = []
    until (data = read).eof?
      fields.push Field.new(FieldPacket.parse(data))
    end
    return fields
  end
end
gc_stmt(stmt_id) click to toggle source
# File lib/mysql-pr/protocol.rb, line 445
def gc_stmt(stmt_id)
  @gc_stmt_queue.push stmt_id
end
get_result() click to toggle source

get result of query.

Return

integer / nil

number of fields of results. nil if no results.

# File lib/mysql-pr/protocol.rb, line 242
def get_result
  begin
    res_packet = ResultPacket.parse read
    if res_packet.field_count.to_i > 0  # result data exists
      set_state :FIELD
      return res_packet.field_count
    end
    if res_packet.field_count.nil?      # LOAD DATA LOCAL INFILE
      filename = res_packet.message
      File.open(filename){|f| write f}
      write nil  # EOF mark
      read
    end
    @affected_rows, @insert_id, @server_status, @warning_count, @message =
      res_packet.affected_rows, res_packet.insert_id, res_packet.server_status, res_packet.warning_count, res_packet.message
    set_state :READY
    return nil
  rescue
    set_state :READY
    raise
  end
end
kill_command(pid) click to toggle source

Kill command

# File lib/mysql-pr/protocol.rb, line 347
def kill_command(pid)
  simple_command [COM_PROCESS_KILL, pid].pack("CV")
end
ping_command() click to toggle source

Ping command

# File lib/mysql-pr/protocol.rb, line 342
def ping_command
  simple_command [COM_PING].pack("C")
end
process_info_command() click to toggle source

Process info command

Return

Array of Field

field list

# File lib/mysql-pr/protocol.rb, line 325
def process_info_command
  check_state :READY
  begin
    reset
    write [COM_PROCESS_INFO].pack("C")
    field_count = read.lcb
    fields = field_count.times.map{Field.new FieldPacket.parse(read)}
    read_eof_packet
    set_state :RESULT
    return fields
  rescue
    set_state :READY
    raise
  end
end
query_command(query) click to toggle source

Query command

Argument

query
String

query string

Return

Integer / nil

number of fields of results. nil if no results.

# File lib/mysql-pr/protocol.rb, line 227
def query_command(query)
  check_state :READY
  begin
    reset
    write [COM_QUERY, @charset.convert(query)].pack("Ca*")
    get_result
  rescue
    set_state :READY
    raise
  end
end
quit_command() click to toggle source

Quit command

# File lib/mysql-pr/protocol.rb, line 214
def quit_command
  synchronize do
    reset
    write [COM_QUIT].pack("C")
    close
  end
end
refresh_command(op) click to toggle source

Refresh command

# File lib/mysql-pr/protocol.rb, line 352
def refresh_command(op)
  simple_command [COM_REFRESH, op].pack("CC")
end
retr_all_records(nfields) click to toggle source

Retrieve all records for simple query

Argument

nfields
Integer

number of fields

Return

Array of Array of String

all records

# File lib/mysql-pr/protocol.rb, line 288
def retr_all_records(nfields)
  check_state :RESULT
  enc = charset.encoding
  begin
    all_recs = []
    until (pkt = read).eof?
      all_recs.push RawRecord.new(pkt, nfields, enc)
    end
    pkt.read(3)
    @server_status = pkt.utiny
    all_recs
  ensure
    set_state :READY
  end
end
retr_fields(n) click to toggle source

Retrieve n fields

Argument

n
Integer

number of fields

Return

Array of MysqlPR::Field

field list

# File lib/mysql-pr/protocol.rb, line 270
def retr_fields(n)
  check_state :FIELD
  begin
    fields = n.times.map{Field.new FieldPacket.parse(read)}
    read_eof_packet
    set_state :RESULT
    fields
  rescue
    set_state :READY
    raise
  end
end
set_option_command(opt) click to toggle source

Set option command

# File lib/mysql-pr/protocol.rb, line 357
def set_option_command(opt)
  simple_command [COM_SET_OPTION, opt].pack("Cv")
end
shutdown_command(level) click to toggle source

Shutdown command

# File lib/mysql-pr/protocol.rb, line 362
def shutdown_command(level)
  simple_command [COM_SHUTDOWN, level].pack("CC")
end
statistics_command() click to toggle source

Statistics command

# File lib/mysql-pr/protocol.rb, line 367
def statistics_command
  simple_command [COM_STATISTICS].pack("C")
end
stmt_close_command(stmt_id) click to toggle source

Stmt close command

Argument

stmt_id
Integer

statement id

# File lib/mysql-pr/protocol.rb, line 438
def stmt_close_command(stmt_id)
  synchronize do
    reset
    write [COM_STMT_CLOSE, stmt_id].pack("CV")
  end
end
stmt_execute_command(stmt_id, values) click to toggle source

Stmt execute command

Argument

stmt_id
Integer

statement id

values
Array

parameters

Return

Integer

number of fields

# File lib/mysql-pr/protocol.rb, line 403
def stmt_execute_command(stmt_id, values)
  check_state :READY
  begin
    reset
    write ExecutePacket.serialize(stmt_id, MysqlPR::Stmt::CURSOR_TYPE_NO_CURSOR, values)
    get_result
  rescue
    set_state :READY
    raise
  end
end
stmt_prepare_command(stmt) click to toggle source

Stmt prepare command

Argument

stmt
String

prepared statement

Return

Integer

statement id

Integer

number of parameters

Array of Field

field list

# File lib/mysql-pr/protocol.rb, line 378
def stmt_prepare_command(stmt)
  synchronize do
    reset
    write [COM_STMT_PREPARE, charset.convert(stmt)].pack("Ca*")
    res_packet = PrepareResultPacket.parse read
    if res_packet.param_count > 0
      res_packet.param_count.times{read}    # skip parameter packet
      read_eof_packet
    end
    if res_packet.field_count > 0
      fields = res_packet.field_count.times.map{Field.new FieldPacket.parse(read)}
      read_eof_packet
    else
      fields = []
    end
    return res_packet.statement_id, res_packet.param_count, fields
  end
end
stmt_retr_all_records(fields, charset) click to toggle source

Retrieve all records for prepared statement

Argument

fields
Array of MysqlPR::Fields

field list

charset
MysqlPR::Charset

Return

Array of Array of Object

all records

# File lib/mysql-pr/protocol.rb, line 421
def stmt_retr_all_records(fields, charset)
  check_state :RESULT
  enc = charset.encoding
  begin
    all_recs = []
    until (pkt = read).eof?
      all_recs.push StmtRawRecord.new(pkt, fields, enc)
    end
    all_recs
  ensure
    set_state :READY
  end
end

Private Instance Methods

check_state(st) click to toggle source
# File lib/mysql-pr/protocol.rb, line 451
def check_state(st)
  raise 'command out of sync' unless @state == st
end
encrypt_password(plain, scramble) click to toggle source

Encrypt password

Argument

plain
String

plain password.

scramble
String

scramble code from initial packet.

Return

String

encrypted password

# File lib/mysql-pr/protocol.rb, line 584
def encrypt_password(plain, scramble)
  return "" if plain.nil? or plain.empty?
  hash_stage1 = Digest::SHA1.digest plain
  hash_stage2 = Digest::SHA1.digest hash_stage1
  return hash_stage1.unpack("C*").zip(Digest::SHA1.digest(scramble+hash_stage2).unpack("C*")).map{|a,b| a^b}.pack("C*")
end
read() click to toggle source

Read one packet data

Return

Packet

packet data

Exception

ProtocolError

invalid packet sequence number

# File lib/mysql-pr/protocol.rb, line 489
def read
  ret = ""
  len = nil
  begin
    Timeout.timeout @read_timeout do
      header = @sock.read(4)
      raise EOFError unless header && header.length == 4
      len1, len2, seq = header.unpack("CvC")
      len = (len2 << 8) + len1
      raise ProtocolError, "invalid packet: sequence number mismatch(#{seq} != #{@seq}(expected))" if @seq != seq
      @seq = (@seq + 1) % 256
      ret = @sock.read(len)
      raise EOFError unless ret && ret.length == len
    end
  rescue EOFError
    raise ClientError::ServerGoneError, 'The MySQL server has gone away'
  rescue Timeout::Error
    raise ClientError, "read timeout"
  end while len == MAX_PACKET_LENGTH

  @sqlstate = "00000"

  # Error packet
  if ret[0] == ?\xff
    f, errno, marker, @sqlstate, message = ret.unpack("Cvaa5a*")
    unless marker == "#"
      f, errno, message = ret.unpack("Cva*")    # Version 4.0 Error
      @sqlstate = ""
    end
    if MysqlPR::ServerError::ERROR_MAP.key? errno
      raise MysqlPR::ServerError::ERROR_MAP[errno].new(message, @sqlstate)
    end
    raise MysqlPR::ServerError.new(message, @sqlstate)
  end
  Packet.new(ret)
end
read_eof_packet() click to toggle source

Read EOF packet

Exception

ProtocolError

packet is not EOF

# File lib/mysql-pr/protocol.rb, line 561
def read_eof_packet
  raise ProtocolError, "packet is not EOF" unless read.eof?
end
reset() click to toggle source

Reset sequence number

# File lib/mysql-pr/protocol.rb, line 480
def reset
  @seq = 0    # packet counter. reset by each command
end
set_state(st) click to toggle source
# File lib/mysql-pr/protocol.rb, line 455
def set_state(st)
  @state = st
  if st == :READY
    gc_disabled = GC.disable unless RUBY_PLATFORM == 'java'
    begin
      while st = @gc_stmt_queue.shift
        reset
        write [COM_STMT_CLOSE, st].pack("CV")
      end
    ensure
      GC.enable unless gc_disabled unless RUBY_PLATFORM == 'java'
    end
  end
end
simple_command(packet) click to toggle source

Send simple command

Argument

packet
String

packet data

Return

String

received data

# File lib/mysql-pr/protocol.rb, line 570
def simple_command(packet)
  synchronize do
    reset
    write packet
    read.to_s
  end
end
synchronize() { || ... } click to toggle source
# File lib/mysql-pr/protocol.rb, line 470
def synchronize
  begin
    check_state :READY
    return yield
  ensure
    set_state :READY
  end
end
write(data) click to toggle source

Write one packet data

Argument

data
String / IO

packet data. If data is nil, write empty packet.

# File lib/mysql-pr/protocol.rb, line 529
def write(data)
  begin
    @sock.sync = false
    if data.nil?
      Timeout.timeout @write_timeout do
        @sock.write [0, 0, @seq].pack("CvC")
      end
      @seq = (@seq + 1) % 256
    else
      data = StringIO.new data if data.is_a? String
      while d = data.read(MAX_PACKET_LENGTH)
        Timeout.timeout @write_timeout do
          @sock.write [d.length%256, d.length/256, @seq].pack("CvC")
          @sock.write d
        end
        @seq = (@seq + 1) % 256
      end
    end
    @sock.sync = true
    Timeout.timeout @write_timeout do
      @sock.flush
    end
  rescue Errno::EPIPE
    raise ClientError::ServerGoneError, 'The MySQL server has gone away'
  rescue Timeout::Error
    raise ClientError, "write timeout"
  end
end