diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result | 42 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt | 2 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test | 66 | ||||
-rw-r--r-- | sql/rpl_mi.h | 21 | ||||
-rw-r--r-- | sql/slave.cc | 200 |
5 files changed, 321 insertions, 10 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result b/mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result new file mode 100644 index 00000000000..dc6a67b48d2 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result @@ -0,0 +1,42 @@ +include/master-slave.inc +[connection master] +connection slave; +call mtr.add_suppression("Slave IO thread did not receive an expected Rows-log end-of-statement"); +call mtr.add_suppression("Relay log write failure: could not queue event from master"); +SET @save_debug= @@global.debug; +SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss"; +include/stop_slave.inc +CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT, MASTER_USE_GTID=SLAVE_POS; +connection master; +CREATE TABLE t (a INT, b text(8192));; +INSERT INTO t values (1, repeat('b', 8192)), (1, repeat('b', 8192)); +connection slave; +START SLAVE IO_THREAD; +include/wait_for_slave_io_error.inc [errno=1595] +SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss"; +include/start_slave.inc +connection master; +connection slave; +connection slave; +include/stop_slave.inc +connection master; +SET @save_log_bin_compress= @@GLOBAL.log_bin_compress; +SET @save_log_bin_compress_min_len= @@GLOBAL.log_bin_compress_min_len; +SET @@GLOBAL.log_bin_compress=ON; +SET @@GLOBAL.log_bin_compress_min_len=10; +INSERT INTO t values (2, repeat('b', 8192)), (2, repeat('b', 8192)); +connection slave; +SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss"; +START SLAVE IO_THREAD; +include/wait_for_slave_io_error.inc [errno=1595] +SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss"; +include/start_slave.inc +connection master; +connection slave; +connection master; +SET @@GLOBAL.log_bin_compress= @save_log_bin_compress; +SET @@GLOBAL.log_bin_compress_min_len= @save_log_bin_compress_min_len; +DROP TABLE t; +connection slave; +SET GLOBAL debug_dbug= @save_debug; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt new file mode 100644 index 00000000000..144bbca0730 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt @@ -0,0 +1,2 @@ +--binlog-row-event-max-size=8192 + diff --git a/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test new file mode 100644 index 00000000000..5b2d99f3bf1 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test @@ -0,0 +1,66 @@ +--source include/have_debug.inc +--source include/have_binlog_format_row.inc +--source include/master-slave.inc + +# Loss of STMT_END flagged event must error out the IO thread +--connection slave +call mtr.add_suppression("Slave IO thread did not receive an expected Rows-log end-of-statement"); +call mtr.add_suppression("Relay log write failure: could not queue event from master"); + +SET @save_debug= @@global.debug; +SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss"; +--source include/stop_slave.inc +--replace_result $MASTER_MYPORT MASTER_PORT +--eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, MASTER_USE_GTID=SLAVE_POS + +--connection master +--let $max_row_size=8192 +--eval CREATE TABLE t (a INT, b text($max_row_size)); +--eval INSERT INTO t values (1, repeat('b', $max_row_size)), (1, repeat('b', $max_row_size)) + +# Prove that the missed STMT_END marked rows-event causes the io thread stop. +--connection slave +START SLAVE IO_THREAD; +--let $slave_io_errno=1595 +--source include/wait_for_slave_io_error.inc +SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss"; +--source include/start_slave.inc + +--connection master +sync_slave_with_master; + +# Compressed version of the above +--connection slave +--source include/stop_slave.inc + +--connection master +SET @save_log_bin_compress= @@GLOBAL.log_bin_compress; +SET @save_log_bin_compress_min_len= @@GLOBAL.log_bin_compress_min_len; + +SET @@GLOBAL.log_bin_compress=ON; +SET @@GLOBAL.log_bin_compress_min_len=10; + +--eval INSERT INTO t values (2, repeat('b', $max_row_size)), (2, repeat('b', $max_row_size)) + +# Prove that the missed STMT_END marked rows-event causes the io thread stop. +--connection slave +SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss"; +START SLAVE IO_THREAD; +--let $slave_io_errno=1595 +--source include/wait_for_slave_io_error.inc +SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss"; +--source include/start_slave.inc + +--connection master +sync_slave_with_master; + +# cleanup + +--connection master +SET @@GLOBAL.log_bin_compress= @save_log_bin_compress; +SET @@GLOBAL.log_bin_compress_min_len= @save_log_bin_compress_min_len; +DROP TABLE t; +sync_slave_with_master; +SET GLOBAL debug_dbug= @save_debug; + +--source include/rpl_end.inc diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index ccc1be6e5ce..b304e45f86a 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -133,6 +133,19 @@ public: extern TYPELIB slave_parallel_mode_typelib; +typedef struct st_rows_event_tracker +{ + char binlog_file_name[FN_REFLEN]; + my_off_t first_seen; + my_off_t last_seen; + bool stmt_end_seen; + void update(const char* file_name, size_t pos, + const char* buf, + const Format_description_log_event *fdle); + void reset(); + bool check_and_report(const char* file_name, size_t pos); +} Rows_event_tracker; + /***************************************************************************** Replication IO Thread @@ -301,6 +314,14 @@ class Master_info : public Slave_reporting_capability uint64 gtid_reconnect_event_skip_count; /* gtid_event_seen is false until we receive first GTID event from master. */ bool gtid_event_seen; + /** + The struct holds some history of Rows- log-event reading/queuing + by the receiver thread. Its fields are updated per each such event + at time of queue_event(), and they are checked to detect + the Rows- event group integrity violation at time of first non-Rows- + event gets handled. + */ + Rows_event_tracker rows_event_tracker; bool in_start_all_slaves, in_stop_all_slaves; bool in_flush_all_relay_logs; uint users; /* Active user for object */ diff --git a/sql/slave.cc b/sql/slave.cc index c686553fdd5..e5c502c2de5 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3349,7 +3349,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, we suppress prints to .err file as long as the reconnect happens without problems */ - *suppress_warnings= TRUE; + *suppress_warnings= + global_system_variables.log_warnings < 2 ? TRUE : FALSE; } else { @@ -4274,6 +4275,7 @@ pthread_handler_t handle_slave_io(void *arg) mi->abort_slave = 0; mysql_mutex_unlock(&mi->run_lock); mysql_cond_broadcast(&mi->start_cond); + mi->rows_event_tracker.reset(); DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu", mi->master_log_name, mi->master_log_pos)); @@ -4356,6 +4358,10 @@ connected: */ mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; mi->gtid_event_seen= false; + /* + Reset stale state of the rows-event group tracker at reconnect. + */ + mi->rows_event_tracker.reset(); } #ifdef ENABLED_DEBUG_SYNC @@ -5752,7 +5758,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) char* new_buf = NULL; char new_buf_arr[4096]; bool is_malloc = false; - + bool is_rows_event= false; /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -6186,11 +6192,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) got_gtid_event= true; if (mi->using_gtid == Master_info::USE_GTID_NO) goto default_action; - if (unlikely(!mi->gtid_event_seen)) + if (unlikely(mi->gtid_reconnect_event_skip_count)) { - mi->gtid_event_seen= true; - if (mi->gtid_reconnect_event_skip_count) + if (likely(!mi->gtid_event_seen)) { + mi->gtid_event_seen= true; /* If we are reconnecting, and we need to skip a partial event group already queued to the relay log before the reconnect, then we check @@ -6219,13 +6225,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); goto err; } + if (global_system_variables.log_warnings > 1) + { + bool first= true; + StringBuffer<1024> gtid_text; + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + sql_print_information("Slave IO thread is reconnected to " + "receive Gtid_log_event %s. It is to skip %llu " + "already received events including the gtid one", + gtid_text.ptr(), + mi->events_queued_since_last_gtid); + } + goto default_action; } - } + else + { + bool first; + StringBuffer<1024> gtid_text; - if (unlikely(mi->gtid_reconnect_event_skip_count)) - { - goto default_action; + gtid_text.append(STRING_WITH_LEN("Last received gtid: ")); + first= true; + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + gtid_text.append(STRING_WITH_LEN(", currently received: ")); + first= true; + rpl_slave_state_tostring_helper(>id_text, &event_gtid, &first); + + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + sql_print_error("Slave IO thread has received a new Gtid_log_event " + "while skipping already logged events " + "after reconnect. %s. %llu remains to be skipped. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + goto err; + } } + mi->gtid_event_seen= true; /* We have successfully queued to relay log everything before this GTID, so @@ -6292,8 +6330,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; } } - buf = new_buf; is_compress_event = true; + buf = new_buf; + /* + As we are uncertain about compressed V2 rows events, we don't track + them + */ + if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET])) + goto default_action; + /* fall through */ + case WRITE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + { + is_rows_event= true; + mi->rows_event_tracker.update(mi->master_log_name, + mi->master_log_pos, + buf, + mi->rli.relay_log. + description_event_for_queue); + + DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss", + { + mi->rows_event_tracker.stmt_end_seen= false; + }); + } goto default_action; #ifndef DBUG_OFF @@ -6352,6 +6416,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } /* + Integrity of Rows- event group check. + A sequence of Rows- events must end with STMT_END_F flagged one. + Even when Heartbeat event interrupts Rows- events flow this must indicate a + malfunction e.g logging on the master. + */ + if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) && + !is_rows_event && + mi->rows_event_tracker.check_and_report(mi->master_log_name, + mi->master_log_pos)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + + /* If we filter events master-side (eg. @@skip_replication), we will see holes in the event positions from the master. If we see such a hole, adjust mi->master_log_pos accordingly so we maintain the correct position (for @@ -6519,6 +6598,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) The whole of the current event group is queued. So in case of reconnect we can start from after the current GTID. */ + if (mi->gtid_reconnect_event_skip_count) + { + bool first= true; + StringBuffer<1024> gtid_text; + + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + sql_print_error("Slave IO thread received a terminal event from " + "group %s whose retrieval was interrupted " + "with reconnect. We still had %llu events to read. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + } mi->gtid_current_pos.update(&mi->last_queued_gtid); mi->events_queued_since_last_gtid= 0; @@ -7518,6 +7612,92 @@ bool rpl_master_erroneous_autoinc(THD *thd) return FALSE; } + +static bool get_row_event_stmt_end(const char* buf, + const Format_description_log_event *fdle) +{ + uint8 const common_header_len= fdle->common_header_len; + Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; + + uint8 const post_header_len= fdle->post_header_len[event_type-1]; + const char *flag_start= buf + common_header_len; + /* + The term 4 below signifies that master is of 'an intermediate source', see + Rows_log_event::Rows_log_event. + */ + flag_start += RW_MAPID_OFFSET + (post_header_len == 6) ? 4 : RW_FLAGS_OFFSET; + + return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0; +} + + +/* + Reset log event tracking data. +*/ + +void Rows_event_tracker::reset() +{ + binlog_file_name[0]= 0; + first_seen= last_seen= 0; + stmt_end_seen= false; +} + + +/* + Update log event tracking data. + + The first- and last- seen event binlog position get memorized, as + well as the end-of-statement status of the last one. +*/ + +void Rows_event_tracker::update(const char* file_name, size_t pos, + const char* buf, + const Format_description_log_event *fdle) +{ + if (!first_seen) + { + first_seen= pos; + strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1); + } + last_seen= pos; + DBUG_ASSERT(stmt_end_seen == 0); // We can only have one + stmt_end_seen= get_row_event_stmt_end(buf, fdle); +}; + + +/** + The function is called at next event reading + after a sequence of Rows- log-events. It checks the end-of-statement status + of the past sequence to report on any isssue. + In the positive case the tracker gets reset. + + @return true when the Rows- event group integrity found compromised, + false otherwise. +*/ +bool Rows_event_tracker::check_and_report(const char* file_name, + size_t pos) +{ + if (last_seen) + { + // there was at least one "block" event previously + if (!stmt_end_seen) + { + sql_print_error("Slave IO thread did not receive an expected " + "Rows-log end-of-statement for event starting " + "at log '%s' position %llu " + "whose last block was seen at log '%s' position %llu. " + "The end-of-statement should have been delivered " + "before the current one at log '%s' position %llu", + binlog_file_name, first_seen, + binlog_file_name, last_seen, file_name, pos); + return true; + } + reset(); + } + + return false; +} + /** @} (end of group Replication) */ |