diff options
30 files changed, 2079 insertions, 123 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 6a52c9fe29a..94443791441 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -1002,6 +1002,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, switch (ev_type) { case QUERY_EVENT: + case QUERY_COMPRESSED_EVENT: { Query_log_event *qe= (Query_log_event*)ev; if (!qe->is_trans_keyword()) @@ -1243,6 +1244,12 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, case WRITE_ROWS_EVENT_V1: case UPDATE_ROWS_EVENT_V1: case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: { Rows_log_event *e= (Rows_log_event*) ev; if (print_row_event(print_event_info, ev, e->get_table_id(), diff --git a/mysql-test/include/binlog_start_pos.inc b/mysql-test/include/binlog_start_pos.inc index a187e18b3a4..942a124d639 100644 --- a/mysql-test/include/binlog_start_pos.inc +++ b/mysql-test/include/binlog_start_pos.inc @@ -10,19 +10,19 @@ # # Format_description_log_event length = # 19 /* event common header */ + -# 58 /* misc stuff in the Format description header */ + +# 57 /* misc stuff in the Format description header */ + # number of events + # 1 /* Checksum algorithm */ + # 4 /* CRC32 length */ # -# With current number of events = 164, +# With current number of events = 171, # -# binlog_start_pos = 4 + 19 + 57 + 163 + 1 + 4 = 249. +# binlog_start_pos = 4 + 19 + 57 + 171 + 1 + 4 = 256. # ############################################################################## -let $binlog_start_pos=249; +let $binlog_start_pos=256; --disable_query_log -SET @binlog_start_pos=249; +SET @binlog_start_pos=256; --enable_query_log diff --git a/mysql-test/include/show_binlog_events2.inc b/mysql-test/include/show_binlog_events2.inc index eefefe4bfbe..84c62cced66 100644 --- a/mysql-test/include/show_binlog_events2.inc +++ b/mysql-test/include/show_binlog_events2.inc @@ -4,7 +4,7 @@ if ($binlog_start) } if (!$binlog_start) { - --let $_binlog_start=249 + --let $_binlog_start=256 } if ($binlog_file) { diff --git a/mysql-test/r/flush2.result b/mysql-test/r/flush2.result index ff5d8755f01..a66b0d5c688 100644 --- a/mysql-test/r/flush2.result +++ b/mysql-test/r/flush2.result @@ -4,6 +4,8 @@ show variables like 'log_bin%'; Variable_name Value log_bin OFF log_bin_basename +log_bin_compress OFF +log_bin_compress_min_len 256 log_bin_index log_bin_trust_function_creators ON show variables like 'relay_log%'; @@ -20,6 +22,8 @@ show variables like 'log_bin%'; Variable_name Value log_bin OFF log_bin_basename +log_bin_compress OFF +log_bin_compress_min_len 256 log_bin_index log_bin_trust_function_creators ON show variables like 'relay_log%'; diff --git a/mysql-test/r/mysqlbinlog_row_compressed.result b/mysql-test/r/mysqlbinlog_row_compressed.result new file mode 100644 index 00000000000..a612433fc2f --- /dev/null +++ b/mysql-test/r/mysqlbinlog_row_compressed.result @@ -0,0 +1,453 @@ +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; +FLUSH BINARY LOGS; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; +/*!40019 SET @@session.max_insert_delayed_threads=0*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at 4 +#<date> server id 1 end_log_pos 256 CRC32 XXX Start: xxx +ROLLBACK/*!*/; +# at 256 +#<date> server id 1 end_log_pos 285 CRC32 XXX Gtid list [] +# at 285 +#<date> server id 1 end_log_pos 329 CRC32 XXX Binlog checkpoint master-bin.000001 +# at 329 +#<date> server id 1 end_log_pos 371 CRC32 XXX GTID 0-1-1 ddl +/*!100101 SET @@session.skip_parallel_replication=0*//*!*/; +/*!100001 SET @@session.gtid_domain_id=0*//*!*/; +/*!100001 SET @@session.server_id=1*//*!*/; +/*!100001 SET @@session.gtid_seq_no=1*//*!*/; +# at 371 +#<date> server id 1 end_log_pos 533 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +use `test`/*!*/; +SET TIMESTAMP=X/*!*/; +SET @@session.pseudo_thread_id=4/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1, @@session.check_constraint_checks=1/*!*/; +SET @@session.sql_mode=1342177280/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C latin1 *//*!*/; +SET @@session.character_set_client=8,@@session.collation_connection=8,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at 533 +#<date> server id 1 end_log_pos 575 CRC32 XXX GTID 0-1-2 ddl +/*!100001 SET @@session.gtid_seq_no=2*//*!*/; +# at 575 +#<date> server id 1 end_log_pos 727 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at 727 +#<date> server id 1 end_log_pos 769 CRC32 XXX GTID 0-1-3 +/*!100001 SET @@session.gtid_seq_no=3*//*!*/; +BEGIN +/*!*/; +# at 769 +#<date> server id 1 end_log_pos 825 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 825 +#<date> server id 1 end_log_pos 893 CRC32 XXX Write_compressed_rows: table id 30 flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at 893 +#<date> server id 1 end_log_pos 966 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 966 +#<date> server id 1 end_log_pos 1008 CRC32 XXX GTID 0-1-4 +/*!100001 SET @@session.gtid_seq_no=4*//*!*/; +BEGIN +/*!*/; +# at 1008 +#<date> server id 1 end_log_pos 1064 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 1064 +#<date> server id 1 end_log_pos 1131 CRC32 XXX Write_compressed_rows: table id 30 flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +# at 1131 +#<date> server id 1 end_log_pos 1204 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1204 +#<date> server id 1 end_log_pos 1246 CRC32 XXX GTID 0-1-5 +/*!100001 SET @@session.gtid_seq_no=5*//*!*/; +BEGIN +/*!*/; +# at 1246 +#<date> server id 1 end_log_pos 1302 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 1302 +#<date> server id 1 end_log_pos 1368 CRC32 XXX Write_compressed_rows: table id 30 flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* MEDIUMINT meta=0 nullable=1 is_null=1 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at 1368 +#<date> server id 1 end_log_pos 1441 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1441 +#<date> server id 1 end_log_pos 1483 CRC32 XXX GTID 0-1-6 +/*!100001 SET @@session.gtid_seq_no=6*//*!*/; +BEGIN +/*!*/; +# at 1483 +#<date> server id 1 end_log_pos 1539 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 1539 +#<date> server id 1 end_log_pos 1606 CRC32 XXX Write_compressed_rows: table id 30 flags: STMT_END_F +### INSERT INTO `test`.`t1` +### SET +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=0 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at 1606 +#<date> server id 1 end_log_pos 1679 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1679 +#<date> server id 1 end_log_pos 1721 CRC32 XXX GTID 0-1-7 +/*!100001 SET @@session.gtid_seq_no=7*//*!*/; +BEGIN +/*!*/; +# at 1721 +#<date> server id 1 end_log_pos 1777 CRC32 XXX Table_map: `test`.`t2` mapped to number num +# at 1777 +#<date> server id 1 end_log_pos 1868 CRC32 XXX Write_compressed_rows: table id 31 flags: STMT_END_F +### INSERT INTO `test`.`t2` +### SET +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### INSERT INTO `test`.`t2` +### SET +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### INSERT INTO `test`.`t2` +### SET +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* INT meta=0 nullable=1 is_null=1 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### INSERT INTO `test`.`t2` +### SET +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=0 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at 1868 +#<date> server id 1 end_log_pos 1941 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1941 +#<date> server id 1 end_log_pos 1983 CRC32 XXX GTID 0-1-8 +/*!100001 SET @@session.gtid_seq_no=8*//*!*/; +BEGIN +/*!*/; +# at 1983 +#<date> server id 1 end_log_pos 2039 CRC32 XXX Table_map: `test`.`t2` mapped to number num +# at 2039 +#<date> server id 1 end_log_pos 2138 CRC32 XXX Update_compressed_rows: table id 31 flags: STMT_END_F +### UPDATE `test`.`t2` +### WHERE +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### SET +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### UPDATE `test`.`t2` +### WHERE +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=4 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### SET +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### UPDATE `test`.`t2` +### WHERE +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* INT meta=0 nullable=1 is_null=1 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### SET +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at 2138 +#<date> server id 1 end_log_pos 2211 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 2211 +#<date> server id 1 end_log_pos 2253 CRC32 XXX GTID 0-1-9 +/*!100001 SET @@session.gtid_seq_no=9*//*!*/; +BEGIN +/*!*/; +# at 2253 +#<date> server id 1 end_log_pos 2309 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 2309 +#<date> server id 1 end_log_pos 2401 CRC32 XXX Delete_compressed_rows: table id 30 flags: STMT_END_F +### DELETE FROM `test`.`t1` +### WHERE +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t1` +### WHERE +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=4 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### DELETE FROM `test`.`t1` +### WHERE +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=NULL /* MEDIUMINT meta=0 nullable=1 is_null=1 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t1` +### WHERE +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* TINYINT meta=0 nullable=1 is_null=0 */ +### @5=0 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @6=5 /* LONGINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at 2401 +#<date> server id 1 end_log_pos 2474 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 2474 +#<date> server id 1 end_log_pos 2516 CRC32 XXX GTID 0-1-10 +/*!100001 SET @@session.gtid_seq_no=10*//*!*/; +BEGIN +/*!*/; +# at 2516 +#<date> server id 1 end_log_pos 2572 CRC32 XXX Table_map: `test`.`t2` mapped to number num +# at 2572 +#<date> server id 1 end_log_pos 2657 CRC32 XXX Delete_compressed_rows: table id 31 flags: STMT_END_F +### DELETE FROM `test`.`t2` +### WHERE +### @1=10 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t2` +### WHERE +### @1=11 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ +### DELETE FROM `test`.`t2` +### WHERE +### @1=12 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=5 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +### DELETE FROM `test`.`t2` +### WHERE +### @1=13 /* INT meta=0 nullable=0 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### @3=2 /* INT meta=0 nullable=1 is_null=0 */ +### @4=3 /* INT meta=0 nullable=1 is_null=0 */ +### @5=0 /* INT meta=0 nullable=1 is_null=0 */ +### @6=5 /* MEDIUMINT meta=0 nullable=1 is_null=0 */ +### @7=6 /* INT meta=0 nullable=1 is_null=0 */ +### @8=7 /* INT meta=0 nullable=1 is_null=0 */ +### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ +# at 2657 +#<date> server id 1 end_log_pos 2730 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 2730 +#<date> server id 1 end_log_pos 2778 CRC32 XXX Rotate to master-bin.000002 pos: 4 +DELIMITER ; +# End of log file +ROLLBACK /* added by mysqlbinlog */; +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/; + +Test mysqlbinlog | mysql type point-in-time recovery with compressed events. + +FLUSH BINARY LOGS; +CREATE TABLE t3 (a INT PRIMARY KEY, b INT, c VARCHAR(100)); +INSERT INTO t3 VALUES (0, 10, "hello"); +BEGIN; +INSERT INTO t3 VALUES (1, 10, "cat"), (2, 10, "mouse"), (3, 10, "dog"); +INSERT INTO t3 VALUES (4, 10, "goodbye"); +COMMIT; +UPDATE t3 SET b=b+100 where a<>1; +DELETE FROM t3 WHERE a=2; +SET @old_image=@@binlog_row_image; +SET binlog_row_image=minimal; +INSERT INTO t3 VALUES (5, 20, "red"), (6, 30, "green"), (7, 40, "blue"); +INSERT INTO t3 VALUES (8, 20, "rigel"); +UPDATE t3 SET c = concat("colour of ", c) WHERE a > 5; +UPDATE t3 SET b=b*2 WHERE a IN (5,6,7); +DELETE FROM t3 WHERE a=6; +SET binlog_row_image=@old_image; +SELECT * FROM t3 ORDER BY a; +a b c +0 110 hello +1 10 cat +3 110 dog +4 110 goodbye +5 40 red +7 80 colour of blue +8 20 colour of rigel +FLUSH LOGS; +DROP TABLE t3; +SELECT * FROM t3 ORDER BY a; +a b c +0 110 hello +1 10 cat +3 110 dog +4 110 goodbye +5 40 red +7 80 colour of blue +8 20 colour of rigel +DROP TABLE t1,t2,t3; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/mysql-test/r/mysqlbinlog_row_minimal.result b/mysql-test/r/mysqlbinlog_row_minimal.result index 2737d61eca4..2fb721d4103 100644 --- a/mysql-test/r/mysqlbinlog_row_minimal.result +++ b/mysql-test/r/mysqlbinlog_row_minimal.result @@ -14,20 +14,20 @@ FLUSH BINARY LOGS; /*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; DELIMITER /*!*/; # at 4 -#<date> server id 1 end_log_pos 249 CRC32 XXX Start: xxx +#<date> server id 1 end_log_pos 256 CRC32 XXX Start: xxx ROLLBACK/*!*/; -# at 249 -#<date> server id 1 end_log_pos 278 CRC32 XXX Gtid list [] -# at 278 -#<date> server id 1 end_log_pos 322 CRC32 XXX Binlog checkpoint master-bin.000001 -# at 322 -#<date> server id 1 end_log_pos 364 CRC32 XXX GTID 0-1-1 ddl +# at 256 +#<date> server id 1 end_log_pos 285 CRC32 XXX Gtid list [] +# at 285 +#<date> server id 1 end_log_pos 329 CRC32 XXX Binlog checkpoint master-bin.000001 +# at 329 +#<date> server id 1 end_log_pos 371 CRC32 XXX GTID 0-1-1 ddl /*!100101 SET @@session.skip_parallel_replication=0*//*!*/; /*!100001 SET @@session.gtid_domain_id=0*//*!*/; /*!100001 SET @@session.server_id=1*//*!*/; /*!100001 SET @@session.gtid_seq_no=1*//*!*/; -# at 364 -#<date> server id 1 end_log_pos 548 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 371 +#<date> server id 1 end_log_pos 555 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 use `test`/*!*/; SET TIMESTAMP=X/*!*/; SET @@session.pseudo_thread_id=4/*!*/; @@ -40,23 +40,23 @@ SET @@session.lc_time_names=0/*!*/; SET @@session.collation_database=DEFAULT/*!*/; CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)) /*!*/; -# at 548 -#<date> server id 1 end_log_pos 590 CRC32 XXX GTID 0-1-2 ddl +# at 555 +#<date> server id 1 end_log_pos 597 CRC32 XXX GTID 0-1-2 ddl /*!100001 SET @@session.gtid_seq_no=2*//*!*/; -# at 590 -#<date> server id 1 end_log_pos 767 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 597 +#<date> server id 1 end_log_pos 774 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)) /*!*/; -# at 767 -#<date> server id 1 end_log_pos 809 CRC32 XXX GTID 0-1-3 +# at 774 +#<date> server id 1 end_log_pos 816 CRC32 XXX GTID 0-1-3 /*!100001 SET @@session.gtid_seq_no=3*//*!*/; BEGIN /*!*/; -# at 809 -#<date> server id 1 end_log_pos 865 CRC32 XXX Table_map: `test`.`t1` mapped to number num -# at 865 -#<date> server id 1 end_log_pos 934 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F +# at 816 +#<date> server id 1 end_log_pos 872 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 872 +#<date> server id 1 end_log_pos 941 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -68,20 +68,20 @@ BEGIN ### @7=6 /* INT meta=0 nullable=1 is_null=0 */ ### @8=7 /* INT meta=0 nullable=1 is_null=0 */ ### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */ -# at 934 -#<date> server id 1 end_log_pos 1007 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 941 +#<date> server id 1 end_log_pos 1014 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 1007 -#<date> server id 1 end_log_pos 1049 CRC32 XXX GTID 0-1-4 +# at 1014 +#<date> server id 1 end_log_pos 1056 CRC32 XXX GTID 0-1-4 /*!100001 SET @@session.gtid_seq_no=4*//*!*/; BEGIN /*!*/; -# at 1049 -#<date> server id 1 end_log_pos 1105 CRC32 XXX Table_map: `test`.`t1` mapped to number num -# at 1105 -#<date> server id 1 end_log_pos 1173 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F +# at 1056 +#<date> server id 1 end_log_pos 1112 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 1112 +#<date> server id 1 end_log_pos 1180 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=11 /* INT meta=0 nullable=0 is_null=0 */ @@ -93,20 +93,20 @@ BEGIN ### @7=6 /* INT meta=0 nullable=1 is_null=0 */ ### @8=7 /* INT meta=0 nullable=1 is_null=0 */ ### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */ -# at 1173 -#<date> server id 1 end_log_pos 1246 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 1180 +#<date> server id 1 end_log_pos 1253 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 1246 -#<date> server id 1 end_log_pos 1288 CRC32 XXX GTID 0-1-5 +# at 1253 +#<date> server id 1 end_log_pos 1295 CRC32 XXX GTID 0-1-5 /*!100001 SET @@session.gtid_seq_no=5*//*!*/; BEGIN /*!*/; -# at 1288 -#<date> server id 1 end_log_pos 1344 CRC32 XXX Table_map: `test`.`t1` mapped to number num -# at 1344 -#<date> server id 1 end_log_pos 1411 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F +# at 1295 +#<date> server id 1 end_log_pos 1351 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 1351 +#<date> server id 1 end_log_pos 1418 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=12 /* INT meta=0 nullable=0 is_null=0 */ @@ -118,20 +118,20 @@ BEGIN ### @7=6 /* INT meta=0 nullable=1 is_null=0 */ ### @8=7 /* INT meta=0 nullable=1 is_null=0 */ ### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ -# at 1411 -#<date> server id 1 end_log_pos 1484 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 1418 +#<date> server id 1 end_log_pos 1491 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 1484 -#<date> server id 1 end_log_pos 1526 CRC32 XXX GTID 0-1-6 +# at 1491 +#<date> server id 1 end_log_pos 1533 CRC32 XXX GTID 0-1-6 /*!100001 SET @@session.gtid_seq_no=6*//*!*/; BEGIN /*!*/; -# at 1526 -#<date> server id 1 end_log_pos 1582 CRC32 XXX Table_map: `test`.`t1` mapped to number num -# at 1582 -#<date> server id 1 end_log_pos 1652 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F +# at 1533 +#<date> server id 1 end_log_pos 1589 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 1589 +#<date> server id 1 end_log_pos 1659 CRC32 XXX Write_rows: table id 30 flags: STMT_END_F ### INSERT INTO `test`.`t1` ### SET ### @1=13 /* INT meta=0 nullable=0 is_null=0 */ @@ -143,20 +143,20 @@ BEGIN ### @7=6 /* INT meta=0 nullable=1 is_null=0 */ ### @8=7 /* INT meta=0 nullable=1 is_null=0 */ ### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ -# at 1652 -#<date> server id 1 end_log_pos 1725 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 1659 +#<date> server id 1 end_log_pos 1732 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 1725 -#<date> server id 1 end_log_pos 1767 CRC32 XXX GTID 0-1-7 +# at 1732 +#<date> server id 1 end_log_pos 1774 CRC32 XXX GTID 0-1-7 /*!100001 SET @@session.gtid_seq_no=7*//*!*/; BEGIN /*!*/; -# at 1767 -#<date> server id 1 end_log_pos 1823 CRC32 XXX Table_map: `test`.`t2` mapped to number num -# at 1823 -#<date> server id 1 end_log_pos 1990 CRC32 XXX Write_rows: table id 31 flags: STMT_END_F +# at 1774 +#<date> server id 1 end_log_pos 1830 CRC32 XXX Table_map: `test`.`t2` mapped to number num +# at 1830 +#<date> server id 1 end_log_pos 1997 CRC32 XXX Write_rows: table id 31 flags: STMT_END_F ### INSERT INTO `test`.`t2` ### SET ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -201,20 +201,20 @@ BEGIN ### @7=6 /* INT meta=0 nullable=1 is_null=0 */ ### @8=7 /* INT meta=0 nullable=1 is_null=0 */ ### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */ -# at 1990 -#<date> server id 1 end_log_pos 2063 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 1997 +#<date> server id 1 end_log_pos 2070 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 2063 -#<date> server id 1 end_log_pos 2105 CRC32 XXX GTID 0-1-8 +# at 2070 +#<date> server id 1 end_log_pos 2112 CRC32 XXX GTID 0-1-8 /*!100001 SET @@session.gtid_seq_no=8*//*!*/; BEGIN /*!*/; -# at 2105 -#<date> server id 1 end_log_pos 2161 CRC32 XXX Table_map: `test`.`t2` mapped to number num -# at 2161 -#<date> server id 1 end_log_pos 2235 CRC32 XXX Update_rows: table id 31 flags: STMT_END_F +# at 2112 +#<date> server id 1 end_log_pos 2168 CRC32 XXX Table_map: `test`.`t2` mapped to number num +# at 2168 +#<date> server id 1 end_log_pos 2242 CRC32 XXX Update_rows: table id 31 flags: STMT_END_F ### UPDATE `test`.`t2` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -233,20 +233,20 @@ BEGIN ### @5=NULL /* INT meta=0 nullable=1 is_null=1 */ ### SET ### @5=5 /* INT meta=0 nullable=1 is_null=0 */ -# at 2235 -#<date> server id 1 end_log_pos 2308 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 2242 +#<date> server id 1 end_log_pos 2315 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 2308 -#<date> server id 1 end_log_pos 2350 CRC32 XXX GTID 0-1-9 +# at 2315 +#<date> server id 1 end_log_pos 2357 CRC32 XXX GTID 0-1-9 /*!100001 SET @@session.gtid_seq_no=9*//*!*/; BEGIN /*!*/; -# at 2350 -#<date> server id 1 end_log_pos 2406 CRC32 XXX Table_map: `test`.`t1` mapped to number num -# at 2406 -#<date> server id 1 end_log_pos 2460 CRC32 XXX Delete_rows: table id 30 flags: STMT_END_F +# at 2357 +#<date> server id 1 end_log_pos 2413 CRC32 XXX Table_map: `test`.`t1` mapped to number num +# at 2413 +#<date> server id 1 end_log_pos 2467 CRC32 XXX Delete_rows: table id 30 flags: STMT_END_F ### DELETE FROM `test`.`t1` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -259,20 +259,20 @@ BEGIN ### DELETE FROM `test`.`t1` ### WHERE ### @1=13 /* INT meta=0 nullable=0 is_null=0 */ -# at 2460 -#<date> server id 1 end_log_pos 2533 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 2467 +#<date> server id 1 end_log_pos 2540 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 2533 -#<date> server id 1 end_log_pos 2575 CRC32 XXX GTID 0-1-10 +# at 2540 +#<date> server id 1 end_log_pos 2582 CRC32 XXX GTID 0-1-10 /*!100001 SET @@session.gtid_seq_no=10*//*!*/; BEGIN /*!*/; -# at 2575 -#<date> server id 1 end_log_pos 2631 CRC32 XXX Table_map: `test`.`t2` mapped to number num -# at 2631 -#<date> server id 1 end_log_pos 2685 CRC32 XXX Delete_rows: table id 31 flags: STMT_END_F +# at 2582 +#<date> server id 1 end_log_pos 2638 CRC32 XXX Table_map: `test`.`t2` mapped to number num +# at 2638 +#<date> server id 1 end_log_pos 2692 CRC32 XXX Delete_rows: table id 31 flags: STMT_END_F ### DELETE FROM `test`.`t2` ### WHERE ### @1=10 /* INT meta=0 nullable=0 is_null=0 */ @@ -285,13 +285,13 @@ BEGIN ### DELETE FROM `test`.`t2` ### WHERE ### @1=13 /* INT meta=0 nullable=0 is_null=0 */ -# at 2685 -#<date> server id 1 end_log_pos 2758 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +# at 2692 +#<date> server id 1 end_log_pos 2765 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 SET TIMESTAMP=X/*!*/; COMMIT /*!*/; -# at 2758 -#<date> server id 1 end_log_pos 2806 CRC32 XXX Rotate to master-bin.000002 pos: 4 +# at 2765 +#<date> server id 1 end_log_pos 2813 CRC32 XXX Rotate to master-bin.000002 pos: 4 DELIMITER ; # End of log file ROLLBACK /* added by mysqlbinlog */; diff --git a/mysql-test/r/mysqlbinlog_stmt_compressed.result b/mysql-test/r/mysqlbinlog_stmt_compressed.result new file mode 100644 index 00000000000..99f9c7e9914 --- /dev/null +++ b/mysql-test/r/mysqlbinlog_stmt_compressed.result @@ -0,0 +1,207 @@ +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; +FLUSH BINARY LOGS; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; +/*!40019 SET @@session.max_insert_delayed_threads=0*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at 4 +#<date> server id 1 end_log_pos 256 CRC32 XXX Start: xxx +ROLLBACK/*!*/; +# at 256 +#<date> server id 1 end_log_pos 285 CRC32 XXX Gtid list [] +# at 285 +#<date> server id 1 end_log_pos 329 CRC32 XXX Binlog checkpoint master-bin.000001 +# at 329 +#<date> server id 1 end_log_pos 371 CRC32 XXX GTID 0-1-1 ddl +/*!100101 SET @@session.skip_parallel_replication=0*//*!*/; +/*!100001 SET @@session.gtid_domain_id=0*//*!*/; +/*!100001 SET @@session.server_id=1*//*!*/; +/*!100001 SET @@session.gtid_seq_no=1*//*!*/; +# at 371 +#<date> server id 1 end_log_pos 533 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +use `test`/*!*/; +SET TIMESTAMP=X/*!*/; +SET @@session.pseudo_thread_id=4/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1, @@session.check_constraint_checks=1/*!*/; +SET @@session.sql_mode=1342177280/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C latin1 *//*!*/; +SET @@session.character_set_client=8,@@session.collation_connection=8,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at 533 +#<date> server id 1 end_log_pos 575 CRC32 XXX GTID 0-1-2 ddl +/*!100001 SET @@session.gtid_seq_no=2*//*!*/; +# at 575 +#<date> server id 1 end_log_pos 727 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)) +/*!*/; +# at 727 +#<date> server id 1 end_log_pos 769 CRC32 XXX GTID 0-1-3 +/*!100001 SET @@session.gtid_seq_no=3*//*!*/; +BEGIN +/*!*/; +# at 769 +#<date> server id 1 end_log_pos 897 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, "") +/*!*/; +# at 897 +#<date> server id 1 end_log_pos 970 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 970 +#<date> server id 1 end_log_pos 1012 CRC32 XXX GTID 0-1-4 +/*!100001 SET @@session.gtid_seq_no=4*//*!*/; +BEGIN +/*!*/; +# at 1012 +#<date> server id 1 end_log_pos 1140 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL) +/*!*/; +# at 1140 +#<date> server id 1 end_log_pos 1213 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1213 +#<date> server id 1 end_log_pos 1255 CRC32 XXX GTID 0-1-5 +/*!100001 SET @@session.gtid_seq_no=5*//*!*/; +BEGIN +/*!*/; +# at 1255 +#<date> server id 1 end_log_pos 1385 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A") +/*!*/; +# at 1385 +#<date> server id 1 end_log_pos 1458 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1458 +#<date> server id 1 end_log_pos 1500 CRC32 XXX GTID 0-1-6 +/*!100001 SET @@session.gtid_seq_no=6*//*!*/; +BEGIN +/*!*/; +# at 1500 +#<date> server id 1 end_log_pos 1627 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A") +/*!*/; +# at 1627 +#<date> server id 1 end_log_pos 1700 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1700 +#<date> server id 1 end_log_pos 1742 CRC32 XXX GTID 0-1-7 +/*!100001 SET @@session.gtid_seq_no=7*//*!*/; +BEGIN +/*!*/; +# at 1742 +#<date> server id 1 end_log_pos 1850 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +INSERT INTO t2 SELECT * FROM t1 +/*!*/; +# at 1850 +#<date> server id 1 end_log_pos 1923 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 1923 +#<date> server id 1 end_log_pos 1965 CRC32 XXX GTID 0-1-8 +/*!100001 SET @@session.gtid_seq_no=8*//*!*/; +BEGIN +/*!*/; +# at 1965 +#<date> server id 1 end_log_pos 2082 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL +/*!*/; +# at 2082 +#<date> server id 1 end_log_pos 2155 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 2155 +#<date> server id 1 end_log_pos 2197 CRC32 XXX GTID 0-1-9 +/*!100001 SET @@session.gtid_seq_no=9*//*!*/; +BEGIN +/*!*/; +# at 2197 +#<date> server id 1 end_log_pos 2288 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +DELETE FROM t1 +/*!*/; +# at 2288 +#<date> server id 1 end_log_pos 2361 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 2361 +#<date> server id 1 end_log_pos 2403 CRC32 XXX GTID 0-1-10 +/*!100001 SET @@session.gtid_seq_no=10*//*!*/; +BEGIN +/*!*/; +# at 2403 +#<date> server id 1 end_log_pos 2494 CRC32 XXX Query_compressed thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +DELETE FROM t2 +/*!*/; +# at 2494 +#<date> server id 1 end_log_pos 2567 CRC32 XXX Query thread_id=4 exec_time=x error_code=0 +SET TIMESTAMP=X/*!*/; +COMMIT +/*!*/; +# at 2567 +#<date> server id 1 end_log_pos 2615 CRC32 XXX Rotate to master-bin.000002 pos: 4 +DELIMITER ; +# End of log file +ROLLBACK /* added by mysqlbinlog */; +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/; + +Test mysqlbinlog | mysql type point-in-time recovery with compressed events. + +FLUSH BINARY LOGS; +CREATE TABLE t3 (a INT PRIMARY KEY, b INT, c VARCHAR(100)); +INSERT INTO t3 VALUES (0, 10, "hello"); +BEGIN; +INSERT INTO t3 VALUES (1, 10, "cat"), (2, 10, "mouse"), (3, 10, "dog"); +INSERT INTO t3 VALUES (4, 10, "goodbye"); +COMMIT; +DELETE FROM t3 WHERE a=2; +SELECT * FROM t3 ORDER BY a; +a b c +0 10 hello +1 10 cat +3 10 dog +4 10 goodbye +FLUSH LOGS; +DROP TABLE t3; +SELECT * FROM t3 ORDER BY a; +a b c +0 10 hello +1 10 cat +3 10 dog +4 10 goodbye +DROP TABLE t1,t2,t3; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result index 32d38cd41ef..203921ce08f 100644 --- a/mysql-test/r/mysqld--help.result +++ b/mysql-test/r/mysqld--help.result @@ -344,6 +344,10 @@ The following options may be given as the first argument: We strongly recommend to use either --log-basename or specify a filename to ensure that replication doesn't stop if the real hostname of the computer changes. + --log-bin-compress Whether the binary log can be compressed + --log-bin-compress-min-len[=#] + Minimum length of sql statement(in statement mode) or + record(in row mode)that can be compressed. --log-bin-index=name File that holds the names for last binary log files. --log-bin-trust-function-creators @@ -1267,6 +1271,8 @@ lc-time-names en_US local-infile TRUE lock-wait-timeout 31536000 log-bin (No default value) +log-bin-compress FALSE +log-bin-compress-min-len 256 log-bin-index (No default value) log-bin-trust-function-creators FALSE log-error diff --git a/mysql-test/suite/binlog/r/binlog_variables_log_bin.result b/mysql-test/suite/binlog/r/binlog_variables_log_bin.result index 215e14f97df..d05a28847b4 100644 --- a/mysql-test/suite/binlog/r/binlog_variables_log_bin.result +++ b/mysql-test/suite/binlog/r/binlog_variables_log_bin.result @@ -3,6 +3,10 @@ Variable_name log_bin Value ON Variable_name log_bin_basename Value MYSQLTEST_VARDIR/mysqld.1/data/other +Variable_name log_bin_compress +Value OFF +Variable_name log_bin_compress_min_len +Value 256 Variable_name log_bin_index Value MYSQLTEST_VARDIR/mysqld.1/data/mysqld-bin.index Variable_name log_bin_trust_function_creators diff --git a/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result b/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result index fb7324ced34..09f2feae9c2 100644 --- a/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result +++ b/mysql-test/suite/binlog/r/binlog_variables_log_bin_index.result @@ -3,6 +3,10 @@ Variable_name log_bin Value ON Variable_name log_bin_basename Value MYSQLTEST_VARDIR/mysqld.1/data/other +Variable_name log_bin_compress +Value OFF +Variable_name log_bin_compress_min_len +Value 256 Variable_name log_bin_index Value MYSQLTEST_VARDIR/tmp/something.index Variable_name log_bin_trust_function_creators diff --git a/mysql-test/suite/rpl/r/rpl_binlog_compress.result b/mysql-test/suite/rpl/r/rpl_binlog_compress.result new file mode 100644 index 00000000000..d729611e885 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_binlog_compress.result @@ -0,0 +1,76 @@ +include/master-slave.inc +[connection master] +set @old_log_bin_compress=@@log_bin_compress; +set @old_log_bin_compress_min_len=@@log_bin_compress_min_len; +set @old_binlog_format=@@binlog_format; +set @old_binlog_row_image=@@binlog_row_image; +set global log_bin_compress=on; +set global log_bin_compress_min_len=10; +drop table if exists t1; +Warnings: +Note 1051 Unknown table 'test.t1' +CREATE TABLE t1 (pr_id int(10) unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY, pr_page int(11) NOT NULL, pr_type varbinary(60) NOT NULL, test int, UNIQUE KEY pr_pagetype (pr_page,pr_type)) ENGINE=myisam AUTO_INCREMENT=136; +set binlog_format=statement; +insert into t1 (pr_page, pr_type, test) values(1,"one",0),(2,"two",0); +replace into t1 (pr_page, pr_type,test) values(1,"one",2); +update t1 set test=test+1 where pr_page > 1; +delete from t1 where test=1; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +connection slave; +connection slave; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +connection master; +set binlog_format=row; +insert into t1 (pr_page, pr_type, test) values(3,"three",0),(4,"four",4),(5, "five", 0); +replace into t1 (pr_page, pr_type,test) values(3,"one",2); +update t1 set test=test+1 where pr_page > 3; +delete from t1 where test=1; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +142 3 one 2 +connection slave; +connection slave; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +142 3 one 2 +connection master; +set binlog_row_image=minimal; +insert into t1 (pr_page, pr_type, test) values(6,"six",0),(7,"seven",7),(8, "eight", 0); +replace into t1 (pr_page, pr_type,test) values(6,"six",2); +update t1 set test=test+1 where pr_page > 6; +delete from t1 where test=1; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +144 7 seven 8 +142 3 one 2 +146 6 six 2 +connection slave; +connection slave; +select * from t1; +pr_id pr_page pr_type test +138 1 one 2 +140 4 four 5 +139 3 three 0 +144 7 seven 8 +142 3 one 2 +146 6 six 2 +connection master; +drop table t1; +set global log_bin_compress=@old_log_bin_compress; +set global log_bin_compress_min_len=@old_log_bin_compress_min_len; +set binlog_format=@old_binlog_format; +set binlog_row_image=@old_binlog_row_image; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_checksum.result b/mysql-test/suite/rpl/r/rpl_checksum.result index 820224d99da..e74e5af9f84 100644 --- a/mysql-test/suite/rpl/r/rpl_checksum.result +++ b/mysql-test/suite/rpl/r/rpl_checksum.result @@ -79,7 +79,7 @@ connection slave; set @@global.debug_dbug='d,simulate_slave_unaware_checksum'; start slave; include/wait_for_slave_io_error.inc [errno=1236] -Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 368, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 249.'' +Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 375, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 256.'' select count(*) as zero from t1; zero 0 diff --git a/mysql-test/suite/rpl/t/rpl_binlog_compress.test b/mysql-test/suite/rpl/t/rpl_binlog_compress.test new file mode 100644 index 00000000000..ef1e45084b6 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_binlog_compress.test @@ -0,0 +1,61 @@ +# +# Test of compressed binlog with replication +# + +source include/master-slave.inc; + +set @old_log_bin_compress=@@log_bin_compress; +set @old_log_bin_compress_min_len=@@log_bin_compress_min_len; +set @old_binlog_format=@@binlog_format; +set @old_binlog_row_image=@@binlog_row_image; + +set global log_bin_compress=on; +set global log_bin_compress_min_len=10; + +drop table if exists t1; +CREATE TABLE t1 (pr_id int(10) unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY, pr_page int(11) NOT NULL, pr_type varbinary(60) NOT NULL, test int, UNIQUE KEY pr_pagetype (pr_page,pr_type)) ENGINE=myisam AUTO_INCREMENT=136; + +set binlog_format=statement; +insert into t1 (pr_page, pr_type, test) values(1,"one",0),(2,"two",0); +replace into t1 (pr_page, pr_type,test) values(1,"one",2); +update t1 set test=test+1 where pr_page > 1; +delete from t1 where test=1; + +select * from t1; +sync_slave_with_master; +connection slave; +select * from t1; +connection master; + + +set binlog_format=row; +insert into t1 (pr_page, pr_type, test) values(3,"three",0),(4,"four",4),(5, "five", 0); +replace into t1 (pr_page, pr_type,test) values(3,"one",2); +update t1 set test=test+1 where pr_page > 3; +delete from t1 where test=1; + +select * from t1; +sync_slave_with_master; +connection slave; +select * from t1; +connection master; + + +set binlog_row_image=minimal; +insert into t1 (pr_page, pr_type, test) values(6,"six",0),(7,"seven",7),(8, "eight", 0); +replace into t1 (pr_page, pr_type,test) values(6,"six",2); +update t1 set test=test+1 where pr_page > 6; +delete from t1 where test=1; + +select * from t1; +sync_slave_with_master; +connection slave; +select * from t1; +connection master; +drop table t1; + +set global log_bin_compress=@old_log_bin_compress; +set global log_bin_compress_min_len=@old_log_bin_compress_min_len; +set binlog_format=@old_binlog_format; +set binlog_row_image=@old_binlog_row_image; +--source include/rpl_end.inc diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result index 335716e3df3..b435844b4b5 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result @@ -1619,6 +1619,34 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST OFF,ON READ_ONLY YES COMMAND_LINE_ARGUMENT NULL +VARIABLE_NAME LOG_BIN_COMPRESS +SESSION_VALUE NULL +GLOBAL_VALUE OFF +GLOBAL_VALUE_ORIGIN COMPILE-TIME +DEFAULT_VALUE OFF +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE BOOLEAN +VARIABLE_COMMENT Whether the binary log can be compressed +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST OFF,ON +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL +VARIABLE_NAME LOG_BIN_COMPRESS_MIN_LEN +SESSION_VALUE NULL +GLOBAL_VALUE 256 +GLOBAL_VALUE_ORIGIN COMPILE-TIME +DEFAULT_VALUE 256 +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE INT UNSIGNED +VARIABLE_COMMENT Minimum length of sql statement(in statement mode) or record(in row mode)that can be compressed. +NUMERIC_MIN_VALUE 10 +NUMERIC_MAX_VALUE 1024 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL VARIABLE_NAME LOG_BIN_TRUST_FUNCTION_CREATORS SESSION_VALUE NULL GLOBAL_VALUE ON diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index 6cc43cdc1dc..a7d0cc0f804 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -1759,6 +1759,34 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST NULL READ_ONLY YES COMMAND_LINE_ARGUMENT NULL +VARIABLE_NAME LOG_BIN_COMPRESS +SESSION_VALUE NULL +GLOBAL_VALUE OFF +GLOBAL_VALUE_ORIGIN COMPILE-TIME +DEFAULT_VALUE OFF +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE BOOLEAN +VARIABLE_COMMENT Whether the binary log can be compressed +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST OFF,ON +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL +VARIABLE_NAME LOG_BIN_COMPRESS_MIN_LEN +SESSION_VALUE NULL +GLOBAL_VALUE 256 +GLOBAL_VALUE_ORIGIN COMPILE-TIME +DEFAULT_VALUE 256 +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE INT UNSIGNED +VARIABLE_COMMENT Minimum length of sql statement(in statement mode) or record(in row mode)that can be compressed. +NUMERIC_MIN_VALUE 10 +NUMERIC_MAX_VALUE 1024 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL VARIABLE_NAME LOG_BIN_INDEX SESSION_VALUE NULL GLOBAL_VALUE diff --git a/mysql-test/t/mysqlbinlog_row_compressed.test b/mysql-test/t/mysqlbinlog_row_compressed.test new file mode 100644 index 00000000000..1a7ce093986 --- /dev/null +++ b/mysql-test/t/mysqlbinlog_row_compressed.test @@ -0,0 +1,68 @@ +# +# Test for compressed row event +# + +--source include/have_log_bin.inc +--source include/have_binlog_format_row.inc + +# +# +# mysqlbinlog: compressed row event +# +# + +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; + +--let $binlog = query_get_value(SHOW MASTER STATUS, File, 1) +--let $datadir = `SELECT @@datadir` + +FLUSH BINARY LOGS; +--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR +--replace_regex /\d{6} *\d*:\d\d:\d\d/<date>/ /Start:.*at startup/Start: xxx/ /SET TIMESTAMP=\d*/SET TIMESTAMP=X/ /exec_time=\d*/exec_time=x/ /mapped to number \d*/mapped to number num/ /CRC32 0x[0-9a-f]+/CRC32 XXX/ +--exec $MYSQL_BINLOG --verbose --verbose --base64-output=DECODE-ROWS $datadir/$binlog + +--echo +--echo Test mysqlbinlog | mysql type point-in-time recovery with compressed events. +--echo + +FLUSH BINARY LOGS; +--let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1) +CREATE TABLE t3 (a INT PRIMARY KEY, b INT, c VARCHAR(100)); +INSERT INTO t3 VALUES (0, 10, "hello"); +BEGIN; +INSERT INTO t3 VALUES (1, 10, "cat"), (2, 10, "mouse"), (3, 10, "dog"); +INSERT INTO t3 VALUES (4, 10, "goodbye"); +COMMIT; +UPDATE t3 SET b=b+100 where a<>1; +DELETE FROM t3 WHERE a=2; +SET @old_image=@@binlog_row_image; +SET binlog_row_image=minimal; +INSERT INTO t3 VALUES (5, 20, "red"), (6, 30, "green"), (7, 40, "blue"); +INSERT INTO t3 VALUES (8, 20, "rigel"); +UPDATE t3 SET c = concat("colour of ", c) WHERE a > 5; +UPDATE t3 SET b=b*2 WHERE a IN (5,6,7); +DELETE FROM t3 WHERE a=6; +SET binlog_row_image=@old_image; +SELECT * FROM t3 ORDER BY a; +FLUSH LOGS; +DROP TABLE t3; + +--let $MYSQLD_DATADIR= `select @@datadir` +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/$binlog_file | $MYSQL + +SELECT * FROM t3 ORDER BY a; + +DROP TABLE t1,t2,t3; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/mysql-test/t/mysqlbinlog_stmt_compressed.test b/mysql-test/t/mysqlbinlog_stmt_compressed.test new file mode 100644 index 00000000000..c4331ddf229 --- /dev/null +++ b/mysql-test/t/mysqlbinlog_stmt_compressed.test @@ -0,0 +1,59 @@ +# +# Test for compressed query event +# + +--source include/have_log_bin.inc +--source include/have_binlog_format_statement.inc + +# +# +# mysqlbinlog: compressed query event +# +# + +SET GLOBAL log_bin_compress=on; +SET GLOBAL log_bin_compress_min_len=10; +CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1)); +CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1)); +INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, ""); +INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL); +INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A"); +INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A"); +INSERT INTO t2 SELECT * FROM t1; +UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL; +DELETE FROM t1; +DELETE FROM t2; + +--let $binlog = query_get_value(SHOW MASTER STATUS, File, 1) +--let $datadir = `SELECT @@datadir` + +FLUSH BINARY LOGS; +--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR +--replace_regex /\d{6} *\d*:\d\d:\d\d/<date>/ /Start:.*at startup/Start: xxx/ /SET TIMESTAMP=\d*/SET TIMESTAMP=X/ /exec_time=\d*/exec_time=x/ /mapped to number \d*/mapped to number num/ /CRC32 0x[0-9a-f]+/CRC32 XXX/ +--exec $MYSQL_BINLOG --verbose --verbose --base64-output=DECODE-ROWS $datadir/$binlog + +--echo +--echo Test mysqlbinlog | mysql type point-in-time recovery with compressed events. +--echo + +FLUSH BINARY LOGS; +--let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1) +CREATE TABLE t3 (a INT PRIMARY KEY, b INT, c VARCHAR(100)); +INSERT INTO t3 VALUES (0, 10, "hello"); +BEGIN; +INSERT INTO t3 VALUES (1, 10, "cat"), (2, 10, "mouse"), (3, 10, "dog"); +INSERT INTO t3 VALUES (4, 10, "goodbye"); +COMMIT; +DELETE FROM t3 WHERE a=2; +SELECT * FROM t3 ORDER BY a; +FLUSH LOGS; +DROP TABLE t3; + +--let $MYSQLD_DATADIR= `select @@datadir` +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/$binlog_file | $MYSQL + +SELECT * FROM t3 ORDER BY a; + +DROP TABLE t1,t2,t3; +SET GLOBAL log_bin_compress=off; +SET GLOBAL log_bin_compress_min_len=256; diff --git a/sql/log.cc b/sql/log.cc index 2c290715741..4d903154d98 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -9838,7 +9838,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ((last_gtid_standalone && !ev->is_part_of_group(typ)) || (!last_gtid_standalone && (typ == XID_EVENT || - (typ == QUERY_EVENT && + (LOG_EVENT_IS_QUERY(typ) && (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback())))))) { diff --git a/sql/log_event.cc b/sql/log_event.cc index 01bb5e7a561..422928495a5 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -51,6 +51,7 @@ #include "rpl_utility.h" #include "rpl_constants.h" #include "sql_digest.h" +#include "zlib.h" #define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1)) @@ -702,6 +703,380 @@ char *str_to_hex(char *to, const char *from, uint len) return to; // pointer to end 0 of 'to' } +#define BINLOG_COMPRESSED_HEADER_LEN 1 +#define BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES 4 +/** + Compressed Record + Record Header: 1 Byte + 7 Bit: Always 1, mean compressed; + 4-6 Bit: Compressed algorithm - Always 0, means zlib + It maybe support other compression algorithm in the future. + 0-3 Bit: Bytes of "Record Original Length" + Record Original Length: 1-4 Bytes + Compressed Buf: +*/ + +/** + Get the length of compress content. +*/ + +uint32 binlog_get_compress_len(uint32 len) +{ + /* 5 for the begin content, 1 reserved for a '\0'*/ + return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES) + + compressBound(len) + 1); +} + +/** + Compress buf from 'src' to 'dst'. + + Note: 1) Then the caller should guarantee the length of 'dst', which + can be got by binlog_get_uncompress_len, is enough to hold + the content uncompressed. + 2) The 'comlen' should stored the length of 'dst', and it will + be set as the size of compressed content after return. + + return zero if successful, others otherwise. +*/ +int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen) +{ + uchar lenlen; + if (len & 0xFF000000) + { + dst[1] = uchar(len >> 24); + dst[2] = uchar(len >> 16); + dst[3] = uchar(len >> 8); + dst[4] = uchar(len); + lenlen = 4; + } + else if (len & 0x00FF0000) + { + dst[1] = uchar(len >> 16); + dst[2] = uchar(len >> 8); + dst[3] = uchar(len); + lenlen = 3; + } + else if (len & 0x0000FF00) + { + dst[1] = uchar(len >> 8); + dst[2] = uchar(len); + lenlen = 2; + } + else + { + dst[1] = uchar(len); + lenlen = 1; + } + dst[0] = 0x80 | (lenlen & 0x07); + + uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1; + if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen, + (const Bytef *)src, (uLongf)len) != Z_OK) + { + return 1; + } + *comlen = (uint32)tmplen + BINLOG_COMPRESSED_HEADER_LEN + lenlen; + return 0; +} + +/** + Convert a query_compressed_log_event to query_log_event + from 'src' to 'dst', the size after compression stored in 'newlen'. + + @Note: + 1) The caller should call my_free to release 'dst' if *is_malloc is + returned as true. + 2) If *is_malloc is retuened as false, then 'dst' reuses the passed-in + 'buf'. + + return zero if successful, non-zero otherwise. +*/ + +int +query_event_uncompress(const Format_description_log_event *description_event, + bool contain_checksum, const char *src, ulong src_len, + char* buf, ulong buf_size, bool* is_malloc, char **dst, + ulong *newlen) +{ + ulong len = uint4korr(src + EVENT_LEN_OFFSET); + const char *tmp = src; + const char *end = src + len; + + // bad event + if (src_len < len ) + return 1; + + DBUG_ASSERT((uchar)src[EVENT_TYPE_OFFSET] == QUERY_COMPRESSED_EVENT); + + uint8 common_header_len= description_event->common_header_len; + uint8 post_header_len= + description_event->post_header_len[QUERY_COMPRESSED_EVENT-1]; + + *is_malloc = false; + + tmp += common_header_len; + // bad event + if (end <= tmp) + return 1; + + uint db_len = (uint)tmp[Q_DB_LEN_OFFSET]; + uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET); + + tmp += post_header_len + status_vars_len + db_len + 1; + // bad event + if (end <= tmp) + return 1; + + int32 comp_len = len - (tmp - src) - + (contain_checksum ? BINLOG_CHECKSUM_LEN : 0); + uint32 un_len = binlog_get_uncompress_len(tmp); + + // bad event + if (comp_len < 0 || un_len == 0) + return 1; + + *newlen = (tmp - src) + un_len; + if(contain_checksum) + *newlen += BINLOG_CHECKSUM_LEN; + + uint32 alloc_size = ALIGN_SIZE(*newlen); + char *new_dst = NULL; + + + if (alloc_size <= buf_size) + { + new_dst = buf; + } + else + { + new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME)); + if (!new_dst) + return 1; + + *is_malloc = true; + } + + /* copy the head*/ + memcpy(new_dst, src , tmp - src); + if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), + comp_len, &un_len)) + { + if (*is_malloc) + my_free(new_dst); + + *is_malloc = false; + + return 1; + } + + new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT; + int4store(new_dst + EVENT_LEN_OFFSET, *newlen); + if(contain_checksum) + { + ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; + int4store(new_dst + clear_len, + my_checksum(0L, (uchar *)new_dst, clear_len)); + } + *dst = new_dst; + return 0; +} + +int +row_log_event_uncompress(const Format_description_log_event *description_event, + bool contain_checksum, const char *src, ulong src_len, + char* buf, ulong buf_size, bool* is_malloc, char **dst, + ulong *newlen) +{ + Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET]; + ulong len = uint4korr(src + EVENT_LEN_OFFSET); + const char *tmp = src; + char *new_dst = NULL; + const char *end = tmp + len; + + // bad event + if (src_len < len) + return 1; + + DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type)); + + uint8 common_header_len= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[type-1]; + + tmp += common_header_len + ROWS_HEADER_LEN_V1; + if (post_header_len == ROWS_HEADER_LEN_V2) + { + /* + Have variable length header, check length, + which includes length bytes + */ + + // bad event + if (end - tmp <= 2) + return 1; + + uint16 var_header_len= uint2korr(tmp); + DBUG_ASSERT(var_header_len >= 2); + + /* skip over var-len header, extracting 'chunks' */ + tmp += var_header_len; + + /* get the uncompressed event type */ + type= + (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT); + } + else + { + /* get the uncompressed event type */ + type= (Log_event_type) + (type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1); + } + + //bad event + if (end <= tmp) + return 1; + + ulong m_width = net_field_length((uchar **)&tmp); + tmp += (m_width + 7) / 8; + + if (type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_EVENT) + { + tmp += (m_width + 7) / 8; + } + + //bad event + if (end <= tmp) + return 1; + + uint32 un_len = binlog_get_uncompress_len(tmp); + //bad event + if (un_len == 0) + return 1; + + long comp_len = len - (tmp - src) - + (contain_checksum ? BINLOG_CHECKSUM_LEN : 0); + //bad event + if (comp_len <=0) + return 1; + + *newlen = (tmp - src) + un_len; + if(contain_checksum) + *newlen += BINLOG_CHECKSUM_LEN; + + uint32 alloc_size = ALIGN_SIZE(*newlen); + + *is_malloc = false; + if (alloc_size <= buf_size) + { + new_dst = buf; + } + else + { + new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME)); + if (!new_dst) + return 1; + + *is_malloc = true; + } + + /* Copy the head. */ + memcpy(new_dst, src , tmp - src); + /* Uncompress the body. */ + if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), + comp_len, &un_len)) + { + if (*is_malloc) + my_free(new_dst); + + return 1; + } + + new_dst[EVENT_TYPE_OFFSET] = type; + int4store(new_dst + EVENT_LEN_OFFSET, *newlen); + if(contain_checksum){ + ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; + int4store(new_dst + clear_len, + my_checksum(0L, (uchar *)new_dst, clear_len)); + } + *dst = new_dst; + return 0; +} + +/** + Get the length of uncompress content. + return 0 means error. +*/ + +uint32 binlog_get_uncompress_len(const char *buf) +{ + DBUG_ASSERT((buf[0] & 0xe0) == 0x80); + uint32 lenlen = buf[0] & 0x07; + uint32 len = 0; + switch(lenlen) + { + case 1: + len = uchar(buf[1]); + break; + case 2: + len = uchar(buf[1]) << 8 | uchar(buf[2]); + break; + case 3: + len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]); + break; + case 4: + len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 | + uchar(buf[3]) << 8 | uchar(buf[4]); + break; + default: + DBUG_ASSERT(lenlen >= 1 && lenlen <= 4); + break; + } + return len; +} + +/** + Uncompress the content in 'src' with length of 'len' to 'dst'. + + Note: 1) Then the caller should guarantee the length of 'dst' (which + can be got by statement_get_uncompress_len) is enough to hold + the content uncompressed. + 2) The 'newlen' should stored the length of 'dst', and it will + be set as the size of uncompressed content after return. + + return zero if successful, others otherwise. +*/ +int binlog_buf_uncompress(const char *src, char *dst, uint32 len, + uint32 *newlen) +{ + if((src[0] & 0x80) == 0) + { + return 1; + } + + uint32 lenlen= src[0] & 0x07; + uLongf buflen= *newlen; + + uint32 alg = (src[0] & 0x70) >> 4; + switch(alg) + { + case 0: + // zlib + if(uncompress((Bytef *)dst, &buflen, + (const Bytef*)src + 1 + lenlen, len - 1 - lenlen) != Z_OK) + { + return 1; + } + break; + default: + //TODO + //bad algorithm + return 1; + } + + DBUG_ASSERT(*newlen == (uint32)buflen); + *newlen = (uint32)buflen; + return 0; +} + #ifndef MYSQL_CLIENT /** @@ -828,6 +1203,13 @@ const char* Log_event::get_type_str(Log_event_type type) case TRANSACTION_CONTEXT_EVENT: return "Transaction_context"; case VIEW_CHANGE_EVENT: return "View_change"; case XA_PREPARE_LOG_EVENT: return "XA_prepare"; + case QUERY_COMPRESSED_EVENT: return "Query_compressed"; + case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed"; + case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed"; + case DELETE_ROWS_COMPRESSED_EVENT: return "Delete_rows_compressed"; + case WRITE_ROWS_COMPRESSED_EVENT_V1: return "Write_rows_compressed_v1"; + case UPDATE_ROWS_COMPRESSED_EVENT_V1: return "Update_rows_compressed_v1"; + case DELETE_ROWS_COMPRESSED_EVENT_V1: return "Delete_rows_compressed_v1"; default: return "Unknown"; /* impossible */ } @@ -1661,6 +2043,10 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case QUERY_EVENT: ev = new Query_log_event(buf, event_len, fdle, QUERY_EVENT); break; + case QUERY_COMPRESSED_EVENT: + ev = new Query_compressed_log_event(buf, event_len, fdle, + QUERY_COMPRESSED_EVENT); + break; case LOAD_EVENT: ev = new Load_log_event(buf, event_len, fdle); break; @@ -1735,6 +2121,19 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, ev = new Delete_rows_log_event(buf, event_len, fdle); break; + case WRITE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + ev = new Write_rows_compressed_log_event(buf, event_len, fdle); + break; + case UPDATE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + ev = new Update_rows_compressed_log_event(buf, event_len, fdle); + break; + case DELETE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + ev = new Delete_rows_compressed_log_event(buf, event_len, fdle); + break; + /* MySQL GTID events are ignored */ case GTID_LOG_EVENT: case ANONYMOUS_GTID_LOG_EVENT: @@ -1778,7 +2177,7 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, else { DBUG_PRINT("error",("Unknown event code: %d", - (int) buf[EVENT_TYPE_OFFSET])); + (uchar) buf[EVENT_TYPE_OFFSET])); ev= NULL; break; } @@ -2836,6 +3235,27 @@ void Log_event::print_base64(IO_CACHE* file, glob_description_event); break; } + case WRITE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + { + ev= new Write_rows_compressed_log_event((const char*) ptr, size, + glob_description_event); + break; + } + case UPDATE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + { + ev= new Update_rows_compressed_log_event((const char*) ptr, size, + glob_description_event); + break; + } + case DELETE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + { + ev= new Delete_rows_compressed_log_event((const char*) ptr, size, + glob_description_event); + break; + } default: break; } @@ -3190,6 +3610,24 @@ bool Query_log_event::write() write_footer(); } +bool Query_compressed_log_event::write() +{ + const char *query_tmp = query; + uint32 q_len_tmp = q_len; + uint32 alloc_size; + bool ret = true; + q_len = alloc_size = binlog_get_compress_len(q_len); + query = (char *)my_safe_alloca(alloc_size); + if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len)) + { + ret = Query_log_event::write(); + } + my_safe_afree((void *)query, alloc_size); + query = query_tmp; + q_len = q_len_tmp; + return ret; +} + /** The simplest constructor that could possibly work. This is used for creating static objects that have a special meaning and are invisible @@ -3382,6 +3820,16 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu cache_tye: %d", (ulong) flags2, sql_mode, cache_type)); } + +Query_compressed_log_event::Query_compressed_log_event(THD* thd_arg, const char* query_arg, + ulong query_length, bool using_trans, + bool direct, bool suppress_use, int errcode) + :Query_log_event(thd_arg, query_arg, query_length, using_trans, direct, + suppress_use, errcode), + query_buf(0) +{ + +} #endif /* MYSQL_CLIENT */ @@ -3788,6 +4236,39 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, DBUG_VOID_RETURN; } +Query_compressed_log_event::Query_compressed_log_event(const char *buf, + uint event_len, + const Format_description_log_event + *description_event, + Log_event_type event_type) + :Query_log_event(buf, event_len, description_event, event_type), + query_buf(NULL) +{ + if(query) + { + uint32 un_len=binlog_get_uncompress_len(query); + if (!un_len) + { + query = 0; + return; + } + + /* Reserve one byte for '\0' */ + query_buf = (Log_event::Byte*)my_malloc(ALIGN_SIZE(un_len + 1), + MYF(MY_WME)); + if(query_buf && + !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len)) + { + query_buf[un_len] = 0; + query = (const char *)query_buf; + q_len = un_len; + } + else + { + query= 0; + } + } +} /* Replace a binlog event read into a packet with a dummy event. Either a @@ -5056,6 +5537,15 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN; post_header_len[START_ENCRYPTION_EVENT-1]= START_ENCRYPTION_HEADER_LEN; + //compressed event + post_header_len[QUERY_COMPRESSED_EVENT-1]= QUERY_HEADER_LEN; + post_header_len[WRITE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2; + post_header_len[UPDATE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2; + post_header_len[DELETE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2; + post_header_len[WRITE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1; + post_header_len[UPDATE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1; + post_header_len[DELETE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1; + // Sanity-check that all post header lengths are initialized. int i; for (i=0; i<number_of_event_types; i++) @@ -9404,7 +9894,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, { DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); uint8 const common_header_len= description_event->common_header_len; - Log_event_type event_type= (Log_event_type) buf[EVENT_TYPE_OFFSET]; + Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; m_type= event_type; uint8 const post_header_len= description_event->post_header_len[event_type-1]; @@ -9503,8 +9993,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */ - if ((event_type == UPDATE_ROWS_EVENT) || - (event_type == UPDATE_ROWS_EVENT_V1)) + if (LOG_EVENT_IS_UPDATE_ROW(event_type)) { DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); @@ -9551,6 +10040,35 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, DBUG_VOID_RETURN; } +void Rows_log_event::uncompress_buf() +{ + uint32 un_len = binlog_get_uncompress_len((char *)m_rows_buf); + if (!un_len) + return; + + uchar *new_buf= (uchar*) my_malloc(ALIGN_SIZE(un_len), MYF(MY_WME)); + if (new_buf) + { + if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf, + m_rows_cur - m_rows_buf, &un_len)) + { + my_free(m_rows_buf); + m_rows_buf = new_buf; +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + m_curr_row= m_rows_buf; +#endif + m_rows_end= m_rows_buf + un_len; + m_rows_cur= m_rows_end; + return; + } + else + { + my_free(new_buf); + } + } + m_cols.bitmap= 0; // catch it in is_valid +} + Rows_log_event::~Rows_log_event() { if (m_cols.bitmap == m_bitbuf) // no my_malloc happened @@ -9573,7 +10091,8 @@ int Rows_log_event::get_data_size() (m_rows_cur - m_rows_buf);); int data_size= 0; - bool is_v2_event= get_type_code() > DELETE_ROWS_EVENT_V1; + Log_event_type type = get_type_code(); + bool is_v2_event= LOG_EVENT_IS_ROW_V2(type); if (is_v2_event) { data_size= ROWS_HEADER_LEN_V2 + @@ -10388,6 +10907,27 @@ bool Rows_log_event::write_data_body() return res; } + +bool Rows_log_event::write_compressed() +{ + uchar *m_rows_buf_tmp = m_rows_buf; + uchar *m_rows_cur_tmp = m_rows_cur; + bool ret = true; + uint32 comlen, alloc_size; + comlen= alloc_size= binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp); + m_rows_buf = (uchar *)my_safe_alloca(alloc_size); + if(m_rows_buf && + !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, + m_rows_cur_tmp - m_rows_buf_tmp, &comlen)) + { + m_rows_cur= comlen + m_rows_buf; + ret= Log_event::write(); + } + my_safe_afree(m_rows_buf, alloc_size); + m_rows_buf= m_rows_buf_tmp; + m_rows_cur= m_rows_cur_tmp; + return ret; +} #endif #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) @@ -11302,6 +11842,21 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg, is_transactional, WRITE_ROWS_EVENT_V1) { } + +Write_rows_compressed_log_event::Write_rows_compressed_log_event( + THD *thd_arg, + TABLE *tbl_arg, + ulong tid_arg, + bool is_transactional) + : Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional) +{ + m_type = WRITE_ROWS_COMPRESSED_EVENT_V1; +} + +bool Write_rows_compressed_log_event::write() +{ + return Rows_log_event::write_compressed(); +} #endif /* @@ -11314,6 +11869,15 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len, : Rows_log_event(buf, event_len, description_event) { } + +Write_rows_compressed_log_event::Write_rows_compressed_log_event( + const char *buf, uint event_len, + const Format_description_log_event + *description_event) +: Write_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -11805,6 +12369,28 @@ void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) {DBUG_SET("+d,simulate_my_b_fill_error");}); Rows_log_event::print_helper(file, print_event_info, "Write_rows"); } + +void Write_rows_compressed_log_event::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + char *new_buf; + ulong len; + bool is_malloc = false; + if(!row_log_event_uncompress(glob_description_event, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) + { + free_temp_buf(); + register_temp_buf(new_buf, true); + Rows_log_event::print_helper(file, print_event_info, + "Write_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, + "ERROR: uncompress write_compressed_rows failed\n"); + } +} #endif @@ -11990,7 +12576,7 @@ void issue_long_find_row_warning(Log_event_type type, if (delta > LONG_FIND_ROW_THRESHOLD) { rgi->set_long_find_row_note_printed(); - const char* evt_type= type == DELETE_ROWS_EVENT ? " DELETE" : "n UPDATE"; + const char* evt_type= LOG_EVENT_IS_DELETE_ROW(type) ? " DELETE" : "n UPDATE"; const char* scan_type= is_index_scan ? "scanning an index" : "scanning the table"; sql_print_information("The slave is applying a ROW event on behalf of a%s statement " @@ -12325,6 +12911,20 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, DELETE_ROWS_EVENT_V1) { } + +Delete_rows_compressed_log_event::Delete_rows_compressed_log_event( + THD *thd_arg, TABLE *tbl_arg, + ulong tid_arg, + bool is_transactional) + : Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional) +{ + m_type= DELETE_ROWS_COMPRESSED_EVENT_V1; +} + +bool Delete_rows_compressed_log_event::write() +{ + return Rows_log_event::write_compressed(); +} #endif /* #if !defined(MYSQL_CLIENT) */ /* @@ -12337,6 +12937,15 @@ Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len, : Rows_log_event(buf, event_len, description_event) { } + +Delete_rows_compressed_log_event::Delete_rows_compressed_log_event( + const char *buf, uint event_len, + const Format_description_log_event + *description_event) + : Delete_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -12434,6 +13043,28 @@ void Delete_rows_log_event::print(FILE *file, { Rows_log_event::print_helper(file, print_event_info, "Delete_rows"); } + +void Delete_rows_compressed_log_event::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + char *new_buf; + ulong len; + bool is_malloc = false; + if(!row_log_event_uncompress(glob_description_event, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) + { + free_temp_buf(); + register_temp_buf(new_buf, true); + Rows_log_event::print_helper(file, print_event_info, + "Delete_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, + "ERROR: uncompress delete_compressed_rows failed\n"); + } +} #endif @@ -12461,6 +13092,19 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, init(tbl_arg->rpl_write_set); } +Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg, + ulong tid, + bool is_transactional) +: Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional) +{ + m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1; +} + +bool Update_rows_compressed_log_event::write() +{ + return Rows_log_event::write_compressed(); +} + void Update_rows_log_event::init(MY_BITMAP const *cols) { /* if my_bitmap_init fails, caught in is_valid() */ @@ -12499,6 +13143,15 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, : Rows_log_event(buf, event_len, description_event) { } + +Update_rows_compressed_log_event::Update_rows_compressed_log_event( + const char *buf, uint event_len, + const Format_description_log_event + *description_event) + : Update_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -12651,6 +13304,27 @@ void Update_rows_log_event::print(FILE *file, { Rows_log_event::print_helper(file, print_event_info, "Update_rows"); } + +void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + char *new_buf; + ulong len; + bool is_malloc= false; + if(!row_log_event_uncompress(glob_description_event, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + temp_buf, UINT_MAX32, NULL, 0, &is_malloc, &new_buf, &len)) + { + free_temp_buf(); + register_temp_buf(new_buf, true); + Rows_log_event::print_helper(file, print_event_info, + "Update_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, + "ERROR: uncompress update_compressed_rows failed\n"); + } +} #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) @@ -12778,7 +13452,7 @@ err: DBUG_ASSERT(error != 0); sql_print_error("Error in Log_event::read_log_event(): " "'%s', data_len: %d, event_type: %d", - error,data_len,head[EVENT_TYPE_OFFSET]); + error,data_len,(uchar)head[EVENT_TYPE_OFFSET]); } (*arg_buf)+= data_len; (*arg_buf_len)-= data_len; diff --git a/sql/log_event.h b/sql/log_event.h index 306f78ca4c9..bbefbe26f41 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -692,11 +692,75 @@ enum Log_event_type START_ENCRYPTION_EVENT= 164, + /* + Compressed binlog event. + + Note that the order between WRITE/UPDATE/DELETE events is significant; + this is so that we can convert from the compressed to the uncompressed + event type with (type-WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT) + and similar for _V1. + */ + QUERY_COMPRESSED_EVENT = 165, + WRITE_ROWS_COMPRESSED_EVENT_V1 = 166, + UPDATE_ROWS_COMPRESSED_EVENT_V1 = 167, + DELETE_ROWS_COMPRESSED_EVENT_V1 = 168, + WRITE_ROWS_COMPRESSED_EVENT = 169, + UPDATE_ROWS_COMPRESSED_EVENT = 170, + DELETE_ROWS_COMPRESSED_EVENT = 171, + /* Add new MariaDB events here - right above this comment! */ ENUM_END_EVENT /* end marker */ }; +static inline bool LOG_EVENT_IS_QUERY(enum Log_event_type type) +{ + return type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT; +} + + +static inline bool LOG_EVENT_IS_WRITE_ROW(enum Log_event_type type) +{ + return type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 || + type == WRITE_ROWS_COMPRESSED_EVENT || + type == WRITE_ROWS_COMPRESSED_EVENT_V1; +} + + +static inline bool LOG_EVENT_IS_UPDATE_ROW(enum Log_event_type type) +{ + return type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 || + type == UPDATE_ROWS_COMPRESSED_EVENT || + type == UPDATE_ROWS_COMPRESSED_EVENT_V1; +} + + +static inline bool LOG_EVENT_IS_DELETE_ROW(enum Log_event_type type) +{ + return type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 || + type == DELETE_ROWS_COMPRESSED_EVENT || + type == DELETE_ROWS_COMPRESSED_EVENT_V1; +} + + +static inline bool LOG_EVENT_IS_ROW_COMPRESSED(enum Log_event_type type) +{ + return type == WRITE_ROWS_COMPRESSED_EVENT || + type == WRITE_ROWS_COMPRESSED_EVENT_V1 || + type == UPDATE_ROWS_COMPRESSED_EVENT || + type == UPDATE_ROWS_COMPRESSED_EVENT_V1 || + type == DELETE_ROWS_COMPRESSED_EVENT || + type == DELETE_ROWS_COMPRESSED_EVENT_V1; +} + + +static inline bool LOG_EVENT_IS_ROW_V2(enum Log_event_type type) +{ + return (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT) || + (type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT); +} + + /* The number of types we handle in Format_description_log_event (UNKNOWN_EVENT is not to be handled, it does not exist in binlogs, it does not have a @@ -2045,9 +2109,37 @@ public: /* !!! Public in this patch to allow old usage */ !strncasecmp(query, "SAVEPOINT", 9) || !strncasecmp(query, "ROLLBACK", 8); } - bool is_begin() { return !strcmp(query, "BEGIN"); } - bool is_commit() { return !strcmp(query, "COMMIT"); } - bool is_rollback() { return !strcmp(query, "ROLLBACK"); } + virtual bool is_begin() { return !strcmp(query, "BEGIN"); } + virtual bool is_commit() { return !strcmp(query, "COMMIT"); } + virtual bool is_rollback() { return !strcmp(query, "ROLLBACK"); } +}; + +class Query_compressed_log_event:public Query_log_event{ +protected: + Log_event::Byte* query_buf; // point to the uncompressed query +public: + Query_compressed_log_event(const char* buf, uint event_len, + const Format_description_log_event *description_event, + Log_event_type event_type); + ~Query_compressed_log_event() + { + if (query_buf) + my_free(query_buf); + } + Log_event_type get_type_code() { return QUERY_COMPRESSED_EVENT; } + + /* + the min length of log_bin_compress_min_len is 10, + means that Begin/Commit/Rollback would never be compressed! + */ + virtual bool is_begin() { return false; } + virtual bool is_commit() { return false; } + virtual bool is_rollback() { return false; } +#ifdef MYSQL_SERVER + Query_compressed_log_event(THD* thd_arg, const char* query_arg, ulong query_length, + bool using_trans, bool direct, bool suppress_use, int error); + virtual bool write(); +#endif }; @@ -4341,6 +4433,7 @@ public: #ifdef MYSQL_SERVER virtual bool write_data_header(); virtual bool write_data_body(); + virtual bool write_compressed(); virtual const char *get_db() { return m_table->s->db.str; } #endif /* @@ -4375,6 +4468,7 @@ protected: #endif Rows_log_event(const char *row_data, uint event_len, const Format_description_log_event *description_event); + void uncompress_buf(); #ifdef MYSQL_CLIENT void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name); @@ -4587,6 +4681,23 @@ private: #endif }; +class Write_rows_compressed_log_event : public Write_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Write_rows_compressed_log_event(THD*, TABLE*, ulong table_id, + bool is_transactional); + virtual bool write(); +#endif +#ifdef HAVE_REPLICATION + Write_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#if defined(MYSQL_CLIENT) + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; /** @class Update_rows_log_event @@ -4657,6 +4768,24 @@ protected: #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */ }; +class Update_rows_compressed_log_event : public Update_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Update_rows_compressed_log_event(THD*, TABLE*, ulong table_id, + bool is_transactional); + virtual bool write(); +#endif +#ifdef HAVE_REPLICATION + Update_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#if defined(MYSQL_CLIENT) + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; + /** @class Delete_rows_log_event @@ -4723,6 +4852,23 @@ protected: #endif }; +class Delete_rows_compressed_log_event : public Delete_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Delete_rows_compressed_log_event(THD*, TABLE*, ulong, bool is_transactional); + virtual bool write(); +#endif +#ifdef HAVE_REPLICATION + Delete_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#if defined(MYSQL_CLIENT) + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; + #include "log_event_old.h" @@ -4964,4 +5110,19 @@ extern TYPELIB binlog_checksum_typelib; @} (end of group Replication) */ + +int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen); +int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen); +uint32 binlog_get_compress_len(uint32 len); +uint32 binlog_get_uncompress_len(const char *buf); + +int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, ulong src_len, char* buf, ulong buf_size, bool* is_malloc, + char **dst, ulong *newlen); + +int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, ulong src_len, char* buf, ulong buf_size, bool* is_malloc, + char **dst, ulong *newlen); + + #endif /* _log_event_h */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 310ccb047c4..8881e0423f5 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -389,6 +389,8 @@ static DYNAMIC_ARRAY all_options; /* Global variables */ bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0; +bool opt_bin_log_compress; +uint opt_bin_log_compress_min_len; my_bool opt_log, debug_assert_if_crashed_table= 0, opt_help= 0; my_bool debug_assert_on_not_freed_memory= 0; my_bool disable_log_notes; diff --git a/sql/mysqld.h b/sql/mysqld.h index 294b463161b..02bbdf839c1 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -109,7 +109,8 @@ extern CHARSET_INFO *character_set_filesystem; extern MY_BITMAP temp_pool; extern bool opt_large_files; -extern bool opt_update_log, opt_bin_log, opt_error_log; +extern bool opt_update_log, opt_bin_log, opt_error_log, opt_bin_log_compress; +extern uint opt_bin_log_compress_min_len; extern my_bool opt_log, opt_bootstrap; extern my_bool opt_backup_history_log; extern my_bool opt_backup_progress_log; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index eeb96d8608a..ec5ea725487 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -645,7 +645,7 @@ is_group_ending(Log_event *ev, Log_event_type event_type) { if (event_type == XID_EVENT) return 1; - if (event_type == QUERY_EVENT) + if (event_type == QUERY_EVENT) // COMMIT/ROLLBACK are never compressed { Query_log_event *qev = (Query_log_event *)ev; if (qev->is_commit()) @@ -2511,7 +2511,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION); if (typ == XID_EVENT || - (typ == QUERY_EVENT && + (typ == QUERY_EVENT && // COMMIT/ROLLBACK are never compressed (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback()))) rli->gtid_skip_flag= GTID_SKIP_NOT; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 9e2977c8bc5..f687f645171 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1732,6 +1732,12 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, case DELETE_ROWS_EVENT: case UPDATE_ROWS_EVENT: case WRITE_ROWS_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: /* After the last Rows event has been applied, the saved Annotate_rows event (if any) is not needed anymore and can be deleted. diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index ade788623e3..75eb8931c35 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -7237,3 +7237,5 @@ ER_INVALID_DEFAULT_PARAM ukr "Значення за замовчуванням не підтримано для цього випадку використання параьетра" ER_BINLOG_NON_SUPPORTED_BULK eng "Only row based replication supported for bulk operations" +ER_BINLOG_UNCOMPRESS_ERROR + eng "Uncompress the compressed binlog failed" diff --git a/sql/slave.cc b/sql/slave.cc index 3a6517f54c4..f59a9142c8d 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3772,7 +3772,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) } /* Check for an event that starts or stops a transaction */ - if (typ == QUERY_EVENT) + if (LOG_EVENT_IS_QUERY(typ)) { Query_log_event *qev= (Query_log_event*) ev; /* @@ -3912,7 +3912,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, */ DBUG_EXECUTE_IF("incomplete_group_in_relay_log", if ((typ == XID_EVENT) || - ((typ == QUERY_EVENT) && + (LOG_EVENT_IS_QUERY(typ) && strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) { DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); @@ -5710,6 +5710,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) bool gtid_skip_enqueue= false; bool got_gtid_event= false; rpl_gtid event_gtid; + bool is_compress_event = false; + char* new_buf = NULL; + char new_buf_arr[4096]; + bool is_malloc = false; /* FD_q must have been prepared for the first R_a event @@ -5756,7 +5760,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) // Emulate the network corruption DBUG_EXECUTE_IF("corrupt_queue_event", - if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) { char *debug_event_buf_c = (char*) buf; int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); @@ -6190,6 +6194,51 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) inc_pos= event_len; } break; + /* + Binlog compressed event should uncompress in IO thread + */ + case QUERY_COMPRESSED_EVENT: + inc_pos= event_len; + if (query_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + buf = new_buf; + is_compress_event = true; + goto default_action; + + case WRITE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + inc_pos = event_len; + { + if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + } + buf = new_buf; + is_compress_event = true; + goto default_action; #ifndef DBUG_OFF case XID_EVENT: @@ -6208,7 +6257,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_after_2_events", { if (mi->dbug_do_disconnect && - (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) || + (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) || ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT)) && (--mi->dbug_event_counter == 0)) { @@ -6221,7 +6270,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_before_commit", { if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg))) { @@ -6241,7 +6290,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ++mi->events_queued_since_last_gtid; } - inc_pos= event_len; + if (!is_compress_event) + inc_pos= event_len; + break; } @@ -6332,8 +6383,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) /* everything is filtered out from non-master */ (s_id != mi->master_id || /* for the master meta information is necessary */ - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || /* Check whether it needs to be filtered based on domain_id @@ -6362,9 +6413,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ if (!(s_id == global_system_variables.server_id && !mi->rli.replicate_same_server_id) || - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); @@ -6405,7 +6456,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) buf[EVENT_TYPE_OFFSET])) || (!mi->last_queued_gtid_standalone && ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg)))))) { @@ -6435,6 +6486,9 @@ err: mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), error_msg.ptr()); + if(is_malloc) + my_free((void *)new_buf); + DBUG_RETURN(error); } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index b896f4567af..8fabc8f593e 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -6386,7 +6386,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; - Rows_log_event* const ev= + Rows_log_event* ev; + if (binlog_should_compress(len)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + len, is_trans, + static_cast<Write_rows_compressed_log_event*>(0)); + else + ev = binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, static_cast<Write_rows_log_event*>(0)); @@ -6434,8 +6441,15 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, DBUG_DUMP("after_row", after_row, after_size); #endif - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, + Rows_log_event* ev; + if(binlog_should_compress(before_size + after_size)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + before_size + after_size, is_trans, + static_cast<Update_rows_compressed_log_event*>(0)); + else + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, before_size + after_size, is_trans, static_cast<Update_rows_log_event*>(0)); @@ -6487,8 +6501,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, + Rows_log_event* ev; + if(binlog_should_compress(len)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + len, is_trans, + static_cast<Delete_rows_compressed_log_event*>(0)); + else + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, static_cast<Delete_rows_log_event*>(0)); @@ -6953,15 +6974,27 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, flush the pending rows event if necessary. */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, - suppress_use, errcode); + int error = 0; + /* Binlog table maps will be irrelevant after a Query_log_event (they are just removed on the slave side) so after the query log event is written to the binary log, we pretend that no table maps were written. - */ - int error= mysql_bin_log.write(&qinfo); + */ + if(binlog_should_compress(query_len)) + { + Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error= mysql_bin_log.write(&qinfo); + } + else + { + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error= mysql_bin_log.write(&qinfo); + } + binlog_table_maps= 0; DBUG_RETURN(error); } diff --git a/sql/sql_class.h b/sql/sql_class.h index 93d3a4a0cd4..eeb6c917054 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5698,6 +5698,12 @@ void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage, #define THD_EXIT_COND(P1, P2) \ thd_exit_cond(P1, P2, __func__, __FILE__, __LINE__) +inline bool binlog_should_compress(ulong len) +{ + return opt_bin_log_compress && + len >= opt_bin_log_compress_min_len; +} + #endif /* MYSQL_SERVER */ #endif /* SQL_CLASS_INCLUDED */ diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index bdf90f7caf6..c115ac5f0ec 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1629,7 +1629,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset, break; case GTID_UNTIL_STOP_AFTER_TRANSACTION: if (event_type != XID_EVENT && - (event_type != QUERY_EVENT || + (event_type != QUERY_EVENT || /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ !Query_log_event::peek_is_commit_rollback (info->packet->ptr()+*ev_offset, info->packet->length()-*ev_offset, @@ -1863,7 +1863,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, return NULL; case GTID_SKIP_TRANSACTION: if (event_type == XID_EVENT || - (event_type == QUERY_EVENT && + (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, len - ev_offset, current_checksum_alg))) diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index b6359ffbd48..27e6463eb58 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1157,6 +1157,18 @@ static Sys_var_mybool Sys_log_bin( "log_bin", "Whether the binary log is enabled", READ_ONLY GLOBAL_VAR(opt_bin_log), NO_CMD_LINE, DEFAULT(FALSE)); +static Sys_var_mybool Sys_log_bin_compress( + "log_bin_compress", "Whether the binary log can be compressed", + GLOBAL_VAR(opt_bin_log_compress), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +/* the min length is 10, means that Begin/Commit/Rollback would never be compressed! */ +static Sys_var_uint Sys_log_bin_compress_min_len( + "log_bin_compress_min_len", + "Minimum length of sql statement(in statement mode) or record(in row mode)" + "that can be compressed.", + GLOBAL_VAR(opt_bin_log_compress_min_len), + CMD_LINE(OPT_ARG), VALID_RANGE(10, 1024), DEFAULT(256), BLOCK_SIZE(1)); + static Sys_var_mybool Sys_trust_function_creators( "log_bin_trust_function_creators", "If set to FALSE (the default), then when --log-bin is used, creation " |