class MysqlBinlog::BinlogEventParser

Parse binary log events from a provided binary log. Must be driven externally, but handles all the details of parsing an event header and the content of the various event types.

Attributes

binlog[RW]

The binary log object this event parser will parse events from.

parser[RW]

The binary log field parser extracted from the binlog object for convenience.

reader[RW]

The binary log reader extracted from the binlog object for convenience.

Public Class Methods

new(binlog_instance) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 198
# Create an event parser bound to +binlog_instance+.
#
# Stores the binlog object itself, the reader and field parser it exposes,
# and an initially empty table map (filled in later by table_map_event and
# consulted by the row event parsers).
def initialize(binlog_instance)
  @binlog, @reader, @parser =
    binlog_instance, binlog_instance.reader, binlog_instance.field_parser
  @table_map = {}
end

Public Instance Methods

_generic_rows_event_vh() click to toggle source

Parse the variable header from a v2 rows event. This is only used for ROWS_V_EXTRAINFO_TAG which is used by NDB. Ensure it can be skipped properly but don't bother parsing it.

# File lib/mysql_binlog/binlog_event_parser.rb, line 574
# Skip over the variable header of a v2 rows event.
#
# Reads a uint16 length, subtracts 2 from it, and consumes that many bytes
# from the reader when the result is positive. Returns whatever the reader
# returned for the skipped bytes, or nil when there was nothing to skip.
def _generic_rows_event_vh
  bytes_to_skip = parser.read_uint16 - 2
  reader.read(bytes_to_skip) if bytes_to_skip > 0
end
delete_rows_event_v1(header)
delete_rows_event_v2(header)
diff_row_images(before, after) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 526
# Compare the before and after images of an updated row.
#
# Each image is an array with one entry per table column. An entry is either
# a single-pair hash ({ column_index => value }) or nil when the column was
# not present in that image (see _generic_rows_event_row_image).
#
# Returns a hash keyed by column index containing
# { before: old_value, after: new_value } for every column whose value
# differs between the two images.
#
# Columns missing from either image are skipped: with only one side known
# there is nothing to compare, and calling +first+ on the nil entry would
# raise NoMethodError.
def diff_row_images(before, after)
  diff = {}
  before.each_with_index do |before_column, index|
    after_column = after[index]
    next if before_column.nil? || after_column.nil?

    # Each entry holds exactly one pair; +first+ yields [index, value].
    before_value = before_column.first[1]
    after_value = after_column.first[1]
    if before_value != after_value
      diff[index] = { before: before_value, after: after_value }
    end
  end
  diff
end
event_header() click to toggle source

Parse an event header, which is consistent for all event types.

Documented in sql/log_event.h line ~749 as “Common-Header”

Implemented in sql/log_event.cc line ~936 in Log_event::write_header

# File lib/mysql_binlog/binlog_event_parser.rb, line 210
# Read the common event header and return it as a hash.
#
# Fields are consumed in order: timestamp (uint32), event type (uint8),
# server id (uint32), event length (uint32), next position (uint32) and a
# 2-byte flag bitmap decoded with EVENT_HEADER_FLAGS. An event type code
# with no entry in EVENT_TYPES is reported as :unknown_<code>.
def event_header
  timestamp = parser.read_uint32
  type_code = parser.read_uint8

  {
    :timestamp     => timestamp,
    :event_type    => EVENT_TYPES[type_code] || "unknown_#{type_code}".to_sym,
    :server_id     => parser.read_uint32,
    :event_length  => parser.read_uint32,
    :next_position => parser.read_uint32,
    :flags         => parser.read_uint_bitmap_by_size_and_name(2, EVENT_HEADER_FLAGS),
  }
end
format_description_event(header) click to toggle source

Parse fields for a Format_description event.

Implemented in sql/log_event.cc line ~4123 in Format_description_log_event::write

# File lib/mysql_binlog/binlog_event_parser.rb, line 226
# Read the fields of a Format_description event.
#
# Consumes, in order: binlog version (uint16), server version (50-byte
# string read via read_nstringz), create timestamp (uint32) and header
# length (uint8). The +header+ argument is not used.
def format_description_event(header)
  {
    :binlog_version   => parser.read_uint16,
    :server_version   => parser.read_nstringz(50),
    :create_timestamp => parser.read_uint32,
    :header_length    => parser.read_uint8,
  }
end
format_gtid(sid, gno_or_ivs) click to toggle source

6d9190a2-cca6-11e8-aa8c-42010aef0019:551845019

# File lib/mysql_binlog/binlog_event_parser.rb, line 645
# Render a GTID (or GTID set) as "<formatted sid>:<gno_or_ivs>".
#
# +sid+ is passed through format_gtid_sid; +gno_or_ivs+ is interpolated
# as-is, so it may be a single transaction number or a preformatted
# interval list.
def format_gtid(sid, gno_or_ivs)
  formatted_sid = format_gtid_sid(sid)
  "#{formatted_sid}:#{gno_or_ivs}"
end
format_gtid_sid(sid) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 640
# Format a 16-byte SID in the dashed 4-2-2-2-6 byte grouping, with each
# group rendered as hex by in_hex.
def format_gtid_sid(sid)
  # [offset, length] of each dash-separated group within the SID.
  groups = [[0, 4], [4, 2], [6, 2], [8, 2], [10, 6]]
  hex_groups = groups.map { |offset, length| in_hex(sid[offset, length]) }
  hex_groups.join("-")
end
generic_rows_event_v1(header) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 615
# Parse a version 1 rows event. Delegates to _generic_rows_event with
# contains_vh: false, so no variable header is skipped before the column
# data is read. Returns whatever _generic_rows_event returns.
def generic_rows_event_v1(header)
  _generic_rows_event(header, contains_vh: false)
end
generic_rows_event_v2(header) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 623
# Parse a version 2 rows event. Delegates to _generic_rows_event with
# contains_vh: true, so the variable header is skipped before the column
# data is read. Returns whatever _generic_rows_event returns.
def generic_rows_event_v2(header)
  _generic_rows_event(header, contains_vh: true)
end
gtid_log_event(header) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 670
# Read the fields of a GTID log event.
#
# Consumes, in order: flags (uint8), a 16-byte SID, the transaction number
# (uint64), then the logical timestamp block: type (uint8), last committed
# (uint64) and sequence number (uint64). The SID and transaction number are
# combined into a single string by format_gtid. +header+ is not used.
def gtid_log_event(header)
  event_flags = parser.read_uint8
  source_id = parser.read_nstring(16)
  transaction_number = parser.read_uint64

  logical_timestamp = {
    type: parser.read_uint8,
    last_committed: parser.read_uint64,
    sequence_number: parser.read_uint64,
  }

  {
    flags: event_flags,
    gtid: format_gtid(source_id, transaction_number),
    lts: logical_timestamp,
  }
end
in_hex(bytes) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 636
# Return the lowercase hexadecimal representation of +bytes+, two hex
# digits per byte, with no separators.
def in_hex(bytes)
  # "H*" unpacks every byte as a high-nibble-first hex pair.
  bytes.unpack("H*").first
end
intvar_event(header) click to toggle source

Parse fields for an Intvar event.

Implemented in sql/log_event.cc line ~5326 in Intvar_log_event::write

# File lib/mysql_binlog/binlog_event_parser.rb, line 347
# Read the fields of an Intvar event.
#
# Consumes a uint8 type code and a uint64 value. The type code is returned
# both raw (:intvar_type) and as looked up in INTVAR_EVENT_INTVAR_TYPES
# (:intvar_name). +header+ is not used.
def intvar_event(header)
  type_code = parser.read_uint8

  {
    :intvar_type  => type_code,
    :intvar_name  => INTVAR_EVENT_INTVAR_TYPES[type_code],
    :intvar_value => parser.read_uint64,
  }
end
previous_gtids_log_event(header) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 649
# Read the fields of a Previous_gtids log event.
#
# The payload is a uint64 count of SIDs; each SID is 16 bytes followed by a
# uint64 interval count and that many (start, end) uint64 pairs. Every SID
# is rendered with format_gtid, with its intervals formatted as "start-end"
# and joined by ":". Returns { previous_gtids: [formatted, ...] }.
# +header+ is not used.
def previous_gtids_log_event(header)
  sid_count = parser.read_uint64

  gtid_sets = sid_count.times.map do
    source_id = parser.read_nstring(16)
    interval_count = parser.read_uint64

    intervals = interval_count.times.map do
      first = parser.read_uint64
      last = parser.read_uint64
      "#{first}-#{last}"
    end

    format_gtid(source_id, intervals.join(":"))
  end

  { previous_gtids: gtid_sets }
end
query_event(header) click to toggle source

Parse fields for a Query event.

Implemented in sql/log_event.cc line ~2214 in Query_log_event::write

# File lib/mysql_binlog/binlog_event_parser.rb, line 331
# Read the fields of a Query event.
#
# Consumes thread id (uint32), elapsed time (uint32), database name length
# (uint8) and error code (uint16), then the status block via
# _query_event_status, then the database name (length + 1 bytes, read with
# read_nstringz). Whatever remains of the event is the query text, of
# which at most binlog.max_query_length bytes are read.
def query_event(header)
  fields = {
    :thread_id    => parser.read_uint32,
    :elapsed_time => parser.read_uint32,
  }
  db_name_size = parser.read_uint8
  fields[:error_code] = parser.read_uint16
  fields[:status] = _query_event_status(header, fields)
  fields[:db] = parser.read_nstringz(db_name_size + 1)

  # Cap the amount of query text read at the configured maximum.
  readable = [reader.remaining(header), binlog.max_query_length].min
  fields[:query] = reader.read(readable)

  fields
end
rand_event(header) click to toggle source

Parse fields for a Rand event.

Implemented in sql/log_event.cc line ~5454 in Rand_log_event::write

# File lib/mysql_binlog/binlog_event_parser.rb, line 369
# Read the fields of a Rand event: two uint64 seeds, returned as :seed1 and
# :seed2 in the order they appear. +header+ is not used.
def rand_event(header)
  {
    :seed1 => parser.read_uint64,
    :seed2 => parser.read_uint64,
  }
end
rotate_event(header) click to toggle source

Parse fields for a Rotate event.

Implemented in sql/log_event.cc line ~5157 in Rotate_log_event::write

# File lib/mysql_binlog/binlog_event_parser.rb, line 238
# Read the fields of a Rotate event.
#
# Consumes a uint64 position; the rest of the event (as reported by
# reader.remaining) is the name, read with read_nstring. Returns
# { :pos => position, :name => name }.
def rotate_event(header)
  position = parser.read_uint64
  name = parser.read_nstring(reader.remaining(header))

  { :pos => position, :name => name }
end
rows_query_log_event(header) click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 631
# Read a Rows_query log event.
#
# The first byte is a length field that is read and discarded; the
# remaining header[:payload_length] - 1 bytes are returned as the query.
def rows_query_log_event(header)
  reader.read(1) # discard the unused one-byte length prefix
  query_size = header[:payload_length] - 1

  { query: reader.read(query_size) }
end
table_map_event(header) click to toggle source

Parse fields for a Table_map event.

Implemented in sql/log_event.cc line ~8638 in Table_map_log_event::write_data_header and Table_map_log_event::write_data_body

# File lib/mysql_binlog/binlog_event_parser.rb, line 434
# Read the fields of a Table_map event and remember the table description.
#
# Consumes the table id (uint48), a 2-byte flag bitmap, the database and
# table names, the column count, one type byte per column, the per-column
# metadata block and the nullable bitmap. The resulting entry is stored in
# @table_map under the table id (before its fields are filled in) and is
# also returned as :map_entry.
#
# When a column's metadata carries a :type key, that type replaces the one
# taken from the type byte and the key is removed from the metadata.
# Type bytes with no entry in MYSQL_TYPES become :unknown_<code>.
def table_map_event(header)
  table_id = parser.read_uint48
  flags = parser.read_uint_bitmap_by_size_and_name(2, TABLE_MAP_EVENT_FLAGS)

  entry = {}
  @table_map[table_id] = entry
  entry[:db] = parser.read_lpstringz
  entry[:table] = parser.read_lpstringz

  column_count = parser.read_varint
  types = parser.read_uint8_array(column_count).map do |code|
    MYSQL_TYPES[code] || "unknown_#{code}".to_sym
  end
  metadata = _table_map_event_column_metadata(types)
  nullable = parser.read_bit_array(column_count)

  entry[:columns] = column_count.times.map do |index|
    column_metadata = metadata[index]
    column_type = types[index]

    # Overloaded types report their real type through the metadata.
    if column_metadata && column_metadata[:type]
      column_type = column_metadata.delete(:type)
    end

    {
      :type     => column_type,
      :nullable => nullable[index],
      :metadata => column_metadata,
    }
  end

  {
    :table_id  => table_id,
    :flags     => flags,
    :map_entry => entry,
  }
end
table_metadata_event(header) click to toggle source

Parse fields for a Table_metadata event, which is specific to Twitter MySQL releases at the moment.

Implemented in sql/log_event.cc line ~8772 (in Twitter MySQL) in Table_metadata_log_event::write_data_header and Table_metadata_log_event::write_data_body

# File lib/mysql_binlog/binlog_event_parser.rb, line 472
# Read the fields of a Table_metadata event.
#
# Consumes the table id (uint48), the column count (uint16) and a uint16
# flags field, then one descriptor per column. Each descriptor is stored as
# :description on the matching column of the table's entry in @table_map,
# and the list of descriptors is also returned as :columns.
#
# The table must already be present in @table_map with at least that many
# columns; otherwise the column lookup raises NoMethodError.
def table_metadata_event(header)
  table_id = parser.read_uint48
  column_count = parser.read_uint16

  fields = {
    :table => @table_map[table_id],
    :flags => parser.read_uint16,
  }

  fields[:columns] = column_count.times.map do |index|
    parser.read_uint32 # descriptor length: consumed, value not needed
    type_code = parser.read_uint8
    column = @table_map[table_id][:columns][index]

    # Hash values are evaluated top to bottom, matching the wire order.
    column[:description] = {
      :type          => MYSQL_TYPES[type_code] || "unknown_#{type_code}".to_sym,
      :length        => parser.read_uint32,
      :scale         => parser.read_uint8,
      :character_set => COLLATION[parser.read_uint16],
      :flags         => parser.read_uint_bitmap_by_size_and_name(2, TABLE_METADATA_EVENT_COLUMN_FLAGS),
      :name          => parser.read_varstring,
      :type_name     => parser.read_varstring,
      :comment       => parser.read_varstring,
    }
  end

  fields
end
update_rows_event_v1(header)
update_rows_event_v2(header)
write_rows_event_v1(header)
write_rows_event_v2(header)
xid_event(header) click to toggle source

Parse fields for an Xid event.

Implemented in sql/log_event.cc line ~5559 in Xid_log_event::write

# File lib/mysql_binlog/binlog_event_parser.rb, line 360
# Read the fields of an Xid event: a single uint64 transaction id, returned
# as :xid. +header+ is not used.
def xid_event(header)
  { :xid => parser.read_uint64 }
end

Private Instance Methods

_generic_rows_event(header, contains_vh: false) click to toggle source

Parse fields for any of the row-based replication row events:

  • Write_rows which is used for INSERT.

  • Update_rows which is used for UPDATE.

  • Delete_rows which is used for DELETE.

Implemented in sql/log_event.cc line ~8039 in Rows_log_event::write_data_header and Rows_log_event::write_data_body

# File lib/mysql_binlog/binlog_event_parser.rb, line 589
# Read the shared layout of write/update/delete rows events.
#
# Consumes the table id (uint48, resolved through @table_map), a 2-byte
# flag bitmap, the variable header when +contains_vh+ is true (v2 events),
# the column count, and one columns-used bitmap per image the event type
# carries: :after for writes, :before for deletes, :before then :after for
# updates. The row images themselves are read by
# _generic_rows_event_row_images and returned under :row_image.
def _generic_rows_event(header, contains_vh: false)
  table_id = parser.read_uint48
  fields = {
    :table => @table_map[table_id],
    :flags => parser.read_uint_bitmap_by_size_and_name(2, GENERIC_ROWS_EVENT_FLAGS),
  }

  # Only NDB populates the v2 variable header; it is skipped, not parsed.
  _generic_rows_event_vh if contains_vh

  column_count = parser.read_varint
  event_type = header[:event_type]

  has_before = [
    :delete_rows_event_v1, :delete_rows_event_v2,
    :update_rows_event_v1, :update_rows_event_v2,
  ].include?(event_type)
  has_after = [
    :write_rows_event_v1, :write_rows_event_v2,
    :update_rows_event_v1, :update_rows_event_v2,
  ].include?(event_type)

  columns_used = {}
  columns_used[:before] = parser.read_bit_array(column_count) if has_before
  columns_used[:after]  = parser.read_bit_array(column_count) if has_after

  fields[:row_image] = _generic_rows_event_row_images(header, fields, columns_used)
  fields
end
_generic_rows_event_row_image(header, fields, columns_used) click to toggle source

Parse a single row image, which is composed of a series of columns. Not all columns are present in the row image; the columns_used array of true and false values identifies which columns are present.

# File lib/mysql_binlog/binlog_event_parser.rb, line 500
# Parse a single row image.
#
# +fields+ must carry the table description under fields[:table][:columns]
# and +columns_used+ is the array of booleans saying which of those columns
# are present in this image. +header+ is not used.
#
# Returns { image: [...], size: bytes_consumed }, where the image has one
# entry per table column: nil for a column that is not present,
# { column_index => nil } for a present column whose value is NULL, and
# { column_index => value } otherwise.
#
# The NULL bitmap that starts a row image holds one bit per column that is
# *present* in the image, not one bit per table column, so it is sized by
# the number of used columns and indexed by a running count of them. For a
# full row image (every column used) this reads and indexes exactly as a
# per-table-column bitmap would; the difference only matters for partial
# images such as those logged with binlog_row_image=MINIMAL.
def _generic_rows_event_row_image(header, fields, columns_used)
  start_position = reader.position
  columns = fields[:table][:columns]

  used_count = columns.each_index.count { |column_index| columns_used[column_index] }
  columns_null = parser.read_bit_array(used_count)

  null_index = 0 # position of the current column within the NULL bitmap
  row_image = columns.each_with_index.map do |column, column_index|
    next nil unless columns_used[column_index]

    is_null = columns_null[null_index]
    null_index += 1

    value = is_null ? nil : parser.read_mysql_type(column[:type], column[:metadata])
    { column_index => value }
  end

  {
    image: row_image,
    size: reader.position - start_position
  }
end
_generic_rows_event_row_images(header, fields, columns_used) click to toggle source

Parse the row images present in a row-based replication row event. This is rather incomplete right now due to missing support for many MySQL types, but can parse some basic events.

# File lib/mysql_binlog/binlog_event_parser.rb, line 542
# Parse every row image remaining in a rows event.
#
# Reads images until the reader reaches the end of the event (current
# position plus reader.remaining(header)). Each element of the returned
# array is a hash holding :after for write events, :before for delete
# events, and :before, :after and :diff (see diff_row_images) for update
# events. +columns_used+ supplies the matching :before / :after bitmaps.
#
# Raises OverReadException when parsing ran past the end of the event.
def _generic_rows_event_row_images(header, fields, columns_used)
  row_images = []
  end_position = reader.position + reader.remaining(header)
  while reader.position < end_position
    row_image = {}
    case header[:event_type]
    when :write_rows_event_v1, :write_rows_event_v2
      row_image[:after]  = _generic_rows_event_row_image(header, fields, columns_used[:after])
    # Both delete versions must be listed: an unmatched event type reads
    # nothing, so the reader never advances and this loop never terminates.
    when :delete_rows_event_v1, :delete_rows_event_v2
      row_image[:before] = _generic_rows_event_row_image(header, fields, columns_used[:before])
    when :update_rows_event_v1, :update_rows_event_v2
      row_image[:before] = _generic_rows_event_row_image(header, fields, columns_used[:before])
      row_image[:after]  = _generic_rows_event_row_image(header, fields, columns_used[:after])
      row_image[:diff] = diff_row_images(row_image[:before][:image], row_image[:after][:image])
    end
    row_images << row_image
  end

  # We may have read too much, especially if any of the fields in the row
  # image were misunderstood. Raise a more specific exception here instead
  # of the generic OverReadException from the entire event.
  if reader.position > end_position
    raise OverReadException.new("Read past end of row image")
  end

  row_images
end
_query_event_status(header, fields) click to toggle source

Parse a dynamic status structure within a query_event, which consists of a status_length (uint16) followed by a number of status variables (determined by the status_length), each of which consists of:

  • A type code (uint8), one of QUERY_EVENT_STATUS_TYPES.

  • The content itself, determined by the type. Additional processing is required based on the type.

# File lib/mysql_binlog/binlog_event_parser.rb, line 271
# Read the status-variable block of a Query event.
#
# The block starts with its own length (uint16) and is followed by
# (type code, value) pairs until that many bytes have been consumed. Each
# type code (uint8) is looked up in QUERY_EVENT_STATUS_TYPES and decides how
# the value is read. Returns a hash keyed by status type name.
#
# Raises RuntimeError for a type code that maps to no handled status type,
# and OverReadException when decoding ran past the end of the block.
# +header+ and +fields+ are not used.
def _query_event_status(header, fields)
  status_length = parser.read_uint16
  end_position = reader.position + status_length
  status = {}

  until reader.position >= end_position
    type_code = parser.read_uint8
    type_name = QUERY_EVENT_STATUS_TYPES[type_code]

    value =
      case type_name
      when :flags2
        parser.read_uint_bitmap_by_size_and_name(4, QUERY_EVENT_FLAGS2)
      when :sql_mode, :table_map_for_update, :commit_ts
        parser.read_uint64
      when :catalog_deprecated
        parser.read_lpstringz
      when :auto_increment
        {
          :increment => parser.read_uint16,
          :offset    => parser.read_uint16,
        }
      when :charset
        {
          :character_set_client => COLLATION[parser.read_uint16],
          :collation_connection => COLLATION[parser.read_uint16],
          :collation_server     => COLLATION[parser.read_uint16],
        }
      when :time_zone, :catalog
        parser.read_lpstring
      when :lc_time_names, :charset_database
        parser.read_uint16
      when :updated_db_names
        _query_event_status_updated_db_names
      when :microseconds
        parser.read_uint24
      else
        raise "Unknown status type #{type_code}"
      end

    status[type_name] = value
  end

  # A misread string in particular can run past the end of the block; report
  # that here rather than as a generic over-read of the whole event.
  if reader.position > end_position
    raise OverReadException.new("Read past end of Query event status field")
  end

  status
end
_query_event_status_updated_db_names() click to toggle source
# File lib/mysql_binlog/binlog_event_parser.rb, line 246
# Read the "updated database names" status variable of a Query event.
#
# A uint8 count is followed by that many NUL-terminated names, read one byte
# at a time. Returns the array of names, or nil when the count equals
# QUERY_EVENT_OVER_MAX_DBS_IN_EVENT_MTS (in which case no names are read).
def _query_event_status_updated_db_names
  name_count = parser.read_uint8
  return nil if name_count == QUERY_EVENT_OVER_MAX_DBS_IN_EVENT_MTS

  name_count.times.map do
    name = ""
    # Accumulate bytes up to, but not including, the terminating NUL.
    while (byte = reader.read(1)) != "\0"
      name << byte
    end
    name
  end
end
_table_map_event_column_metadata(columns_type) click to toggle source

Parse column metadata within a Table_map event.

# File lib/mysql_binlog/binlog_event_parser.rb, line 421
# Read the column metadata block of a Table_map event.
#
# The block begins with a varint, which is consumed but not otherwise used;
# the metadata for each column is then read in column order by
# _table_map_event_column_metadata_read. Returns one entry per element of
# +columns_type+.
def _table_map_event_column_metadata(columns_type)
  parser.read_varint # leading varint: must be consumed, value is not needed
  columns_type.map { |type| _table_map_event_column_metadata_read(type) }
end
_table_map_event_column_metadata_read(column_type) click to toggle source

Parse a number of bytes from the metadata section of a Table_map event representing various fields based on the column type of the column being processed.

# File lib/mysql_binlog/binlog_event_parser.rb, line 379
# Read the metadata bytes belonging to one column of a Table_map event.
#
# How many bytes are consumed and how they are interpreted depends on
# +column_type+. Returns a hash describing the metadata, or nil (reading
# nothing) for types not listed here.
def _table_map_event_column_metadata_read(column_type)
  case column_type
  when :float, :double
    size = parser.read_uint8
    { :size => size }
  when :varchar
    max_length = parser.read_uint16
    { :max_length => max_length }
  when :bit
    leftover_bits = parser.read_uint8
    whole_bytes = parser.read_uint8
    { :bits => (whole_bytes * 8) + leftover_bits }
  when :newdecimal
    precision = parser.read_uint8
    decimals = parser.read_uint8
    { :precision => precision, :decimals => decimals }
  when :blob, :geometry, :json
    length_size = parser.read_uint8
    { :length_size => length_size }
  when :string, :var_string
    # The two metadata bytes are combined with the first byte in the high
    # position; that high byte names the real type. For :enum and :set the
    # returned hash carries a :type key, which table_map_event uses to
    # override the column's type, since those cannot be parsed as :string.
    # See Field_string::do_save_field_metadata for reference.
    high_byte = parser.read_uint8
    low_byte = parser.read_uint8
    packed = (high_byte << 8) + low_byte
    actual_type = MYSQL_TYPES[packed >> 8]

    if actual_type == :enum || actual_type == :set
      { :type => actual_type, :size => packed & 0x00ff }
    else
      { :max_length => (((packed >> 4) & 0x300) ^ 0x300) + (packed & 0x00ff) }
    end
  when :timestamp2, :datetime2, :time2
    decimals = parser.read_uint8
    { :decimals => decimals }
  end
end