class MysqlBinlog::BinlogEventParser
Parse binary log events from a provided binary log. Must be driven externally, but handles all the details of parsing an event header and the content of the various event types.
Attributes
The binary log object this event parser will parse events from.
The binary log field parser extracted from the binlog object for convenience.
The binary log reader extracted from the binlog object for convenience.
Public Class Methods
# File lib/mysql_binlog/binlog_event_parser.rb, line 198 def initialize(binlog_instance) @binlog = binlog_instance @reader = binlog_instance.reader @parser = binlog_instance.field_parser @table_map = {} end
Public Instance Methods
Parse the variable header from a v2 rows event. This is only used for ROWS_V_EXTRAINFO_TAG which is used by NDB. Ensure it can be skipped properly but don't bother parsing it.
# File lib/mysql_binlog/binlog_event_parser.rb, line 574 def _generic_rows_event_vh vh_payload_len = parser.read_uint16 - 2 return unless vh_payload_len > 0 reader.read(vh_payload_len) end
# File lib/mysql_binlog/binlog_event_parser.rb, line 526 def diff_row_images(before, after) diff = {} before.each_with_index do |before_column, index| after_column = after[index] before_value = before_column.first[1] after_value = after_column.first[1] if before_value != after_value diff[index] = { before: before_value, after: after_value } end end diff end
Parse an event header, which is consistent for all event types.
Documented in sql/log_event.h line ~749 as “Common-Header”
Implemented in sql/log_event.cc line ~936 in Log_event::write_header
# File lib/mysql_binlog/binlog_event_parser.rb, line 210 def event_header header = {} header[:timestamp] = parser.read_uint32 event_type = parser.read_uint8 header[:event_type] = EVENT_TYPES[event_type] || "unknown_#{event_type}".to_sym header[:server_id] = parser.read_uint32 header[:event_length] = parser.read_uint32 header[:next_position] = parser.read_uint32 header[:flags] = parser.read_uint_bitmap_by_size_and_name(2, EVENT_HEADER_FLAGS) header end
Parse fields for a Format_description
event.
Implemented in sql/log_event.cc line ~4123 in Format_description_log_event::write
# File lib/mysql_binlog/binlog_event_parser.rb, line 226 def format_description_event(header) fields = {} fields[:binlog_version] = parser.read_uint16 fields[:server_version] = parser.read_nstringz(50) fields[:create_timestamp] = parser.read_uint32 fields[:header_length] = parser.read_uint8 fields end
6d9190a2-cca6-11e8-aa8c-42010aef0019:551845019
# File lib/mysql_binlog/binlog_event_parser.rb, line 645 def format_gtid(sid, gno_or_ivs) "#{format_gtid_sid(sid)}:#{gno_or_ivs}" end
# File lib/mysql_binlog/binlog_event_parser.rb, line 640 def format_gtid_sid(sid) [0..3, 4..5, 6..7, 8..9, 10..15].map { |r| in_hex(sid[r]) }.join("-") end
# File lib/mysql_binlog/binlog_event_parser.rb, line 615 def generic_rows_event_v1(header) _generic_rows_event(header, contains_vh: false) end
# File lib/mysql_binlog/binlog_event_parser.rb, line 623 def generic_rows_event_v2(header) _generic_rows_event(header, contains_vh: true) end
# File lib/mysql_binlog/binlog_event_parser.rb, line 670 def gtid_log_event(header) flags = parser.read_uint8 sid = parser.read_nstring(16) gno = parser.read_uint64 lts_type = parser.read_uint8 lts_last_committed = parser.read_uint64 lts_sequence_number = parser.read_uint64 { flags: flags, gtid: format_gtid(sid, gno), lts: { type: lts_type, last_committed: lts_last_committed, sequence_number: lts_sequence_number, }, } end
# File lib/mysql_binlog/binlog_event_parser.rb, line 636 def in_hex(bytes) bytes.each_byte.map { |c| "%02x" % c.ord }.join end
Parse fields for an Intvar
event.
Implemented in sql/log_event.cc line ~5326 in Intvar_log_event::write
# File lib/mysql_binlog/binlog_event_parser.rb, line 347 def intvar_event(header) fields = {} fields[:intvar_type] = parser.read_uint8 fields[:intvar_name] = INTVAR_EVENT_INTVAR_TYPES[fields[:intvar_type]] fields[:intvar_value] = parser.read_uint64 fields end
# File lib/mysql_binlog/binlog_event_parser.rb, line 649 def previous_gtids_log_event(header) n_sids = parser.read_uint64 gtids = [] n_sids.times do sid = parser.read_nstring(16) n_ivs = parser.read_uint64 ivs = [] n_ivs.times do iv_start = parser.read_uint64 iv_end = parser.read_uint64 ivs << "#{iv_start}-#{iv_end}" end gtids << format_gtid(sid, ivs.join(":")) end { previous_gtids: gtids } end
Parse fields for a Query
event.
Implemented in sql/log_event.cc line ~2214 in Query_log_event::write
# File lib/mysql_binlog/binlog_event_parser.rb, line 331 def query_event(header) fields = {} fields[:thread_id] = parser.read_uint32 fields[:elapsed_time] = parser.read_uint32 db_length = parser.read_uint8 fields[:error_code] = parser.read_uint16 fields[:status] = _query_event_status(header, fields) fields[:db] = parser.read_nstringz(db_length + 1) query_length = reader.remaining(header) fields[:query] = reader.read([query_length, binlog.max_query_length].min) fields end
Parse fields for an Rand
event.
Implemented in sql/log_event.cc line ~5454 in Rand_log_event::write
# File lib/mysql_binlog/binlog_event_parser.rb, line 369 def rand_event(header) fields = {} fields[:seed1] = parser.read_uint64 fields[:seed2] = parser.read_uint64 fields end
Parse fields for a Rotate
event.
Implemented in sql/log_event.cc line ~5157 in Rotate_log_event::write
# File lib/mysql_binlog/binlog_event_parser.rb, line 238 def rotate_event(header) fields = {} fields[:pos] = parser.read_uint64 name_length = reader.remaining(header) fields[:name] = parser.read_nstring(name_length) fields end
# File lib/mysql_binlog/binlog_event_parser.rb, line 631 def rows_query_log_event(header) reader.read(1) # skip useless byte length which is unused { query: reader.read(header[:payload_length]-1) } end
Parse fields for a Table_map
event.
Implemented in sql/log_event.cc line ~8638 in Table_map_log_event::write_data_header and Table_map_log_event::write_data_body
# File lib/mysql_binlog/binlog_event_parser.rb, line 434 def table_map_event(header) fields = {} fields[:table_id] = parser.read_uint48 fields[:flags] = parser.read_uint_bitmap_by_size_and_name(2, TABLE_MAP_EVENT_FLAGS) map_entry = @table_map[fields[:table_id]] = {} map_entry[:db] = parser.read_lpstringz map_entry[:table] = parser.read_lpstringz columns = parser.read_varint columns_type = parser.read_uint8_array(columns).map { |c| MYSQL_TYPES[c] || "unknown_#{c}".to_sym } columns_metadata = _table_map_event_column_metadata(columns_type) columns_nullable = parser.read_bit_array(columns) # Remap overloaded types before we piece together the entire event. columns.times do |c| if columns_metadata[c] and columns_metadata[c][:type] columns_type[c] = columns_metadata[c][:type] columns_metadata[c].delete :type end end map_entry[:columns] = columns.times.map do |c| { :type => columns_type[c], :nullable => columns_nullable[c], :metadata => columns_metadata[c], } end fields[:map_entry] = map_entry fields end
Parse fields for a Table_metadata
event, which is specific to Twitter MySQL releases at the moment.
Implemented in sql/log_event.cc line ~8772 (in Twitter MySQL) in Table_metadata_log_event::write_data_header and Table_metadata_log_event::write_data_body
# File lib/mysql_binlog/binlog_event_parser.rb, line 472 def table_metadata_event(header) fields = {} table_id = parser.read_uint48 columns = parser.read_uint16 fields[:table] = @table_map[table_id] fields[:flags] = parser.read_uint16 fields[:columns] = columns.times.map do |c| descriptor_length = parser.read_uint32 column_type = parser.read_uint8 @table_map[table_id][:columns][c][:description] = { :type => MYSQL_TYPES[column_type] || "unknown_#{column_type}".to_sym, :length => parser.read_uint32, :scale => parser.read_uint8, :character_set => COLLATION[parser.read_uint16], :flags => parser.read_uint_bitmap_by_size_and_name(2, TABLE_METADATA_EVENT_COLUMN_FLAGS), :name => parser.read_varstring, :type_name => parser.read_varstring, :comment => parser.read_varstring, } end fields end
Parse fields for an Xid
event.
Implemented in sql/log_event.cc line ~5559 in Xid_log_event::write
# File lib/mysql_binlog/binlog_event_parser.rb, line 360 def xid_event(header) fields = {} fields[:xid] = parser.read_uint64 fields end
Private Instance Methods
Parse fields for any of the row-based replication row events:
-
Write_rows
which is used forINSERT
. -
Update_rows
which is used forUPDATE
. -
Delete_rows
which is used forDELETE
.
Implemented in sql/log_event.cc line ~8039 in Rows_log_event::write_data_header and Rows_log_event::write_data_body
# File lib/mysql_binlog/binlog_event_parser.rb, line 589 def _generic_rows_event(header, contains_vh: false) fields = {} table_id = parser.read_uint48 fields[:table] = @table_map[table_id] fields[:flags] = parser.read_uint_bitmap_by_size_and_name(2, GENERIC_ROWS_EVENT_FLAGS) # Rows_log_event v2 events contain a variable-sized header. Only NDB # uses it right now, so let's just make sure it's skipped properly. _generic_rows_event_vh if contains_vh columns = parser.read_varint columns_used = {} case header[:event_type] when :write_rows_event_v1, :write_rows_event_v2 columns_used[:after] = parser.read_bit_array(columns) when :delete_rows_event_v1, :delete_rows_event_v2 columns_used[:before] = parser.read_bit_array(columns) when :update_rows_event_v1, :update_rows_event_v2 columns_used[:before] = parser.read_bit_array(columns) columns_used[:after] = parser.read_bit_array(columns) end fields[:row_image] = _generic_rows_event_row_images(header, fields, columns_used) fields end
Parse a single row image, which is comprised of a series of columns. Not all columns are present in the row image, the columns_used array of true and false values identifies which columns are present.
# File lib/mysql_binlog/binlog_event_parser.rb, line 500 def _generic_rows_event_row_image(header, fields, columns_used) row_image = [] start_position = reader.position columns_null = parser.read_bit_array(fields[:table][:columns].size) fields[:table][:columns].each_with_index do |column, column_index| #puts "column #{column_index} #{column}: used=#{columns_used[column_index]}, null=#{columns_null[column_index]}" if !columns_used[column_index] row_image << nil elsif columns_null[column_index] row_image << { column_index => nil } else value = parser.read_mysql_type(column[:type], column[:metadata]) row_image << { column_index => value, } end end end_position = reader.position { image: row_image, size: end_position-start_position } end
Parse the row images present in a row-based replication row event. This is rather incomplete right now due missing support for many MySQL types, but can parse some basic events.
# File lib/mysql_binlog/binlog_event_parser.rb, line 542 def _generic_rows_event_row_images(header, fields, columns_used) row_images = [] end_position = reader.position + reader.remaining(header) while reader.position < end_position row_image = {} case header[:event_type] when :write_rows_event_v1, :write_rows_event_v2 row_image[:after] = _generic_rows_event_row_image(header, fields, columns_used[:after]) when :delete_rows_event_v1, :delete_rows_event_v1 row_image[:before] = _generic_rows_event_row_image(header, fields, columns_used[:before]) when :update_rows_event_v1, :update_rows_event_v2 row_image[:before] = _generic_rows_event_row_image(header, fields, columns_used[:before]) row_image[:after] = _generic_rows_event_row_image(header, fields, columns_used[:after]) row_image[:diff] = diff_row_images(row_image[:before][:image], row_image[:after][:image]) end row_images << row_image end # We may have read too much, especially if any of the fields in the row # image were misunderstood. Raise a more specific exception here instead # of the generic OverReadException from the entire event. if reader.position > end_position raise OverReadException.new("Read past end of row image") end row_images end
Parse a dynamic status
structure within a query_event
, which consists of a status_length (uint16) followed by a number of status variables (determined by the status_length
) each of which consist of:
-
A type code (
uint8
), one ofQUERY_EVENT_STATUS_TYPES
. -
The content itself, determined by the type. Additional processing is required based on the type.
# File lib/mysql_binlog/binlog_event_parser.rb, line 271 def _query_event_status(header, fields) status = {} status_length = parser.read_uint16 end_position = reader.position + status_length while reader.position < end_position status_type_id = parser.read_uint8 status_type = QUERY_EVENT_STATUS_TYPES[status_type_id] status[status_type] = case status_type when :flags2 parser.read_uint_bitmap_by_size_and_name(4, QUERY_EVENT_FLAGS2) when :sql_mode parser.read_uint64 when :catalog_deprecated parser.read_lpstringz when :auto_increment { :increment => parser.read_uint16, :offset => parser.read_uint16, } when :charset { :character_set_client => COLLATION[parser.read_uint16], :collation_connection => COLLATION[parser.read_uint16], :collation_server => COLLATION[parser.read_uint16], } when :time_zone parser.read_lpstring when :catalog parser.read_lpstring when :lc_time_names parser.read_uint16 when :charset_database parser.read_uint16 when :table_map_for_update parser.read_uint64 when :updated_db_names _query_event_status_updated_db_names when :commit_ts parser.read_uint64 when :microseconds parser.read_uint24 else raise "Unknown status type #{status_type_id}" end end # We may have read too much due to an invalid string read especially. # Raise a more specific exception here instead of the generic # OverReadException from the entire event. if reader.position > end_position raise OverReadException.new("Read past end of Query event status field") end status end
# File lib/mysql_binlog/binlog_event_parser.rb, line 246 def _query_event_status_updated_db_names db_count = parser.read_uint8 return nil if db_count == QUERY_EVENT_OVER_MAX_DBS_IN_EVENT_MTS db_names = [] db_count.times do |n| db_name = "" loop do c = reader.read(1) break if c == "\0" db_name << c end db_names << db_name end db_names end
Parse column metadata within a Table_map
event.
# File lib/mysql_binlog/binlog_event_parser.rb, line 421 def _table_map_event_column_metadata(columns_type) length = parser.read_varint columns_type.map do |column| _table_map_event_column_metadata_read(column) end end
Parse a number of bytes from the metadata section of a Table_map
event representing various fields based on the column type of the column being processed.
# File lib/mysql_binlog/binlog_event_parser.rb, line 379 def _table_map_event_column_metadata_read(column_type) case column_type when :float, :double { :size => parser.read_uint8 } when :varchar { :max_length => parser.read_uint16 } when :bit bits = parser.read_uint8 bytes = parser.read_uint8 { :bits => (bytes * 8) + bits } when :newdecimal { :precision => parser.read_uint8, :decimals => parser.read_uint8, } when :blob, :geometry, :json { :length_size => parser.read_uint8 } when :string, :var_string # The :string type sets a :real_type field to indicate the actual type # which is fundamentally incompatible with :string parsing. Setting # a :type key in this hash will cause table_map_event to override the # main field :type with the provided type here. # See Field_string::do_save_field_metadata for reference. metadata = (parser.read_uint8 << 8) + parser.read_uint8 real_type = MYSQL_TYPES[metadata >> 8] case real_type when :enum, :set { :type => real_type, :size => metadata & 0x00ff } else { :max_length => (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff) } end when :timestamp2, :datetime2, :time2 { :decimals => parser.read_uint8, } end end