summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result42
-rw-r--r--mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt2
-rw-r--r--mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test66
-rw-r--r--sql/rpl_mi.h21
-rw-r--r--sql/slave.cc200
5 files changed, 321 insertions, 10 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result b/mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result
new file mode 100644
index 00000000000..dc6a67b48d2
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_row_end_of_statement_loss.result
@@ -0,0 +1,42 @@
+include/master-slave.inc
+[connection master]
+connection slave;
+call mtr.add_suppression("Slave IO thread did not receive an expected Rows-log end-of-statement");
+call mtr.add_suppression("Relay log write failure: could not queue event from master");
+SET @save_debug= @@global.debug;
+SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
+include/stop_slave.inc
+CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT, MASTER_USE_GTID=SLAVE_POS;
+connection master;
+CREATE TABLE t (a INT, b text(8192));;
+INSERT INTO t values (1, repeat('b', 8192)), (1, repeat('b', 8192));
+connection slave;
+START SLAVE IO_THREAD;
+include/wait_for_slave_io_error.inc [errno=1595]
+SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
+include/start_slave.inc
+connection master;
+connection slave;
+connection slave;
+include/stop_slave.inc
+connection master;
+SET @save_log_bin_compress= @@GLOBAL.log_bin_compress;
+SET @save_log_bin_compress_min_len= @@GLOBAL.log_bin_compress_min_len;
+SET @@GLOBAL.log_bin_compress=ON;
+SET @@GLOBAL.log_bin_compress_min_len=10;
+INSERT INTO t values (2, repeat('b', 8192)), (2, repeat('b', 8192));
+connection slave;
+SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
+START SLAVE IO_THREAD;
+include/wait_for_slave_io_error.inc [errno=1595]
+SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
+include/start_slave.inc
+connection master;
+connection slave;
+connection master;
+SET @@GLOBAL.log_bin_compress= @save_log_bin_compress;
+SET @@GLOBAL.log_bin_compress_min_len= @save_log_bin_compress_min_len;
+DROP TABLE t;
+connection slave;
+SET GLOBAL debug_dbug= @save_debug;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt
new file mode 100644
index 00000000000..144bbca0730
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss-master.opt
@@ -0,0 +1,2 @@
+--binlog-row-event-max-size=8192
+
diff --git a/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test
new file mode 100644
index 00000000000..5b2d99f3bf1
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_row_end_of_statement_loss.test
@@ -0,0 +1,66 @@
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+--source include/master-slave.inc
+
+# Loss of STMT_END flagged event must error out the IO thread
+--connection slave
+call mtr.add_suppression("Slave IO thread did not receive an expected Rows-log end-of-statement");
+call mtr.add_suppression("Relay log write failure: could not queue event from master");
+
+SET @save_debug= @@global.debug;
+SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
+--source include/stop_slave.inc
+--replace_result $MASTER_MYPORT MASTER_PORT
+--eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, MASTER_USE_GTID=SLAVE_POS
+
+--connection master
+--let $max_row_size=8192
+--eval CREATE TABLE t (a INT, b text($max_row_size));
+--eval INSERT INTO t values (1, repeat('b', $max_row_size)), (1, repeat('b', $max_row_size))
+
+# Prove that the missed STMT_END marked rows-event causes the io thread stop.
+--connection slave
+START SLAVE IO_THREAD;
+--let $slave_io_errno=1595
+--source include/wait_for_slave_io_error.inc
+SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
+--source include/start_slave.inc
+
+--connection master
+sync_slave_with_master;
+
+# Compressed version of the above
+--connection slave
+--source include/stop_slave.inc
+
+--connection master
+SET @save_log_bin_compress= @@GLOBAL.log_bin_compress;
+SET @save_log_bin_compress_min_len= @@GLOBAL.log_bin_compress_min_len;
+
+SET @@GLOBAL.log_bin_compress=ON;
+SET @@GLOBAL.log_bin_compress_min_len=10;
+
+--eval INSERT INTO t values (2, repeat('b', $max_row_size)), (2, repeat('b', $max_row_size))
+
+# Prove that the missed STMT_END marked rows-event causes the io thread stop.
+--connection slave
+SET GLOBAL debug_dbug="+d,simulate_stmt_end_rows_event_loss";
+START SLAVE IO_THREAD;
+--let $slave_io_errno=1595
+--source include/wait_for_slave_io_error.inc
+SET GLOBAL debug_dbug="-d,simulate_stmt_end_rows_event_loss";
+--source include/start_slave.inc
+
+--connection master
+sync_slave_with_master;
+
+# cleanup
+
+--connection master
+SET @@GLOBAL.log_bin_compress= @save_log_bin_compress;
+SET @@GLOBAL.log_bin_compress_min_len= @save_log_bin_compress_min_len;
+DROP TABLE t;
+sync_slave_with_master;
+SET GLOBAL debug_dbug= @save_debug;
+
+--source include/rpl_end.inc
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index ccc1be6e5ce..b304e45f86a 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -133,6 +133,19 @@ public:
extern TYPELIB slave_parallel_mode_typelib;
+typedef struct st_rows_event_tracker
+{
+ char binlog_file_name[FN_REFLEN];
+ my_off_t first_seen;
+ my_off_t last_seen;
+ bool stmt_end_seen;
+ void update(const char* file_name, size_t pos,
+ const char* buf,
+ const Format_description_log_event *fdle);
+ void reset();
+ bool check_and_report(const char* file_name, size_t pos);
+} Rows_event_tracker;
+
/*****************************************************************************
Replication IO Thread
@@ -301,6 +314,14 @@ class Master_info : public Slave_reporting_capability
uint64 gtid_reconnect_event_skip_count;
/* gtid_event_seen is false until we receive first GTID event from master. */
bool gtid_event_seen;
+ /**
+ The struct holds some history of Rows- log-event reading/queuing
+ by the receiver thread. Its fields are updated per each such event
+ at time of queue_event(), and they are checked to detect
+ the Rows- event group integrity violation at time of first non-Rows-
+ event gets handled.
+ */
+ Rows_event_tracker rows_event_tracker;
bool in_start_all_slaves, in_stop_all_slaves;
bool in_flush_all_relay_logs;
uint users; /* Active user for object */
diff --git a/sql/slave.cc b/sql/slave.cc
index c686553fdd5..e5c502c2de5 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3349,7 +3349,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
we suppress prints to .err file as long as the reconnect
happens without problems
*/
- *suppress_warnings= TRUE;
+ *suppress_warnings=
+ global_system_variables.log_warnings < 2 ? TRUE : FALSE;
}
else
{
@@ -4274,6 +4275,7 @@ pthread_handler_t handle_slave_io(void *arg)
mi->abort_slave = 0;
mysql_mutex_unlock(&mi->run_lock);
mysql_cond_broadcast(&mi->start_cond);
+ mi->rows_event_tracker.reset();
DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu",
mi->master_log_name, mi->master_log_pos));
@@ -4356,6 +4358,10 @@ connected:
*/
mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid;
mi->gtid_event_seen= false;
+ /*
+ Reset stale state of the rows-event group tracker at reconnect.
+ */
+ mi->rows_event_tracker.reset();
}
#ifdef ENABLED_DEBUG_SYNC
@@ -5752,7 +5758,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
char* new_buf = NULL;
char new_buf_arr[4096];
bool is_malloc = false;
-
+ bool is_rows_event= false;
/*
FD_q must have been prepared for the first R_a event
inside get_master_version_and_clock()
@@ -6186,11 +6192,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
got_gtid_event= true;
if (mi->using_gtid == Master_info::USE_GTID_NO)
goto default_action;
- if (unlikely(!mi->gtid_event_seen))
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
{
- mi->gtid_event_seen= true;
- if (mi->gtid_reconnect_event_skip_count)
+ if (likely(!mi->gtid_event_seen))
{
+ mi->gtid_event_seen= true;
/*
If we are reconnecting, and we need to skip a partial event group
already queued to the relay log before the reconnect, then we check
@@ -6219,13 +6225,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first);
goto err;
}
+ if (global_system_variables.log_warnings > 1)
+ {
+ bool first= true;
+ StringBuffer<1024> gtid_text;
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ sql_print_information("Slave IO thread is reconnected to "
+ "receive Gtid_log_event %s. It is to skip %llu "
+ "already received events including the gtid one",
+ gtid_text.ptr(),
+ mi->events_queued_since_last_gtid);
+ }
+ goto default_action;
}
- }
+ else
+ {
+ bool first;
+ StringBuffer<1024> gtid_text;
- if (unlikely(mi->gtid_reconnect_event_skip_count))
- {
- goto default_action;
+ gtid_text.append(STRING_WITH_LEN("Last received gtid: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ gtid_text.append(STRING_WITH_LEN(", currently received: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&gtid_text, &event_gtid, &first);
+
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("Slave IO thread has received a new Gtid_log_event "
+ "while skipping already logged events "
+ "after reconnect. %s. %llu remains to be skipped. "
+ "The number of originally read events was %llu",
+ gtid_text.ptr(),
+ mi->gtid_reconnect_event_skip_count,
+ mi->events_queued_since_last_gtid);
+ goto err;
+ }
}
+ mi->gtid_event_seen= true;
/*
We have successfully queued to relay log everything before this GTID, so
@@ -6292,8 +6330,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
}
- buf = new_buf;
is_compress_event = true;
+ buf = new_buf;
+ /*
+ As we are uncertain about compressed V2 rows events, we don't track
+ them
+ */
+ if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET]))
+ goto default_action;
+ /* fall through */
+ case WRITE_ROWS_EVENT_V1:
+ case UPDATE_ROWS_EVENT_V1:
+ case DELETE_ROWS_EVENT_V1:
+ case WRITE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case DELETE_ROWS_EVENT:
+ {
+ is_rows_event= true;
+ mi->rows_event_tracker.update(mi->master_log_name,
+ mi->master_log_pos,
+ buf,
+ mi->rli.relay_log.
+ description_event_for_queue);
+
+ DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss",
+ {
+ mi->rows_event_tracker.stmt_end_seen= false;
+ });
+ }
goto default_action;
#ifndef DBUG_OFF
@@ -6352,6 +6416,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
/*
+ Integrity of Rows- event group check.
+ A sequence of Rows- events must end with STMT_END_F flagged one.
+ Even when Heartbeat event interrupts Rows- events flow this must indicate a
+ malfunction e.g logging on the master.
+ */
+ if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) &&
+ !is_rows_event &&
+ mi->rows_event_tracker.check_and_report(mi->master_log_name,
+ mi->master_log_pos))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+
+ /*
If we filter events master-side (eg. @@skip_replication), we will see holes
in the event positions from the master. If we see such a hole, adjust
mi->master_log_pos accordingly so we maintain the correct position (for
@@ -6519,6 +6598,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
The whole of the current event group is queued. So in case of
reconnect we can start from after the current GTID.
*/
+ if (mi->gtid_reconnect_event_skip_count)
+ {
+ bool first= true;
+ StringBuffer<1024> gtid_text;
+
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ sql_print_error("Slave IO thread received a terminal event from "
+ "group %s whose retrieval was interrupted "
+ "with reconnect. We still had %llu events to read. "
+ "The number of originally read events was %llu",
+ gtid_text.ptr(),
+ mi->gtid_reconnect_event_skip_count,
+ mi->events_queued_since_last_gtid);
+ }
mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0;
@@ -7518,6 +7612,92 @@ bool rpl_master_erroneous_autoinc(THD *thd)
return FALSE;
}
+
+static bool get_row_event_stmt_end(const char* buf,
+ const Format_description_log_event *fdle)
+{
+ uint8 const common_header_len= fdle->common_header_len;
+ Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
+
+ uint8 const post_header_len= fdle->post_header_len[event_type-1];
+ const char *flag_start= buf + common_header_len;
+ /*
+ The term 4 below signifies that master is of 'an intermediate source', see
+ Rows_log_event::Rows_log_event.
+ */
+ flag_start += RW_MAPID_OFFSET + (post_header_len == 6) ? 4 : RW_FLAGS_OFFSET;
+
+ return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0;
+}
+
+
+/*
+ Reset log event tracking data.
+*/
+
+void Rows_event_tracker::reset()
+{
+ binlog_file_name[0]= 0;
+ first_seen= last_seen= 0;
+ stmt_end_seen= false;
+}
+
+
+/*
+ Update log event tracking data.
+
+ The first- and last- seen event binlog position get memorized, as
+ well as the end-of-statement status of the last one.
+*/
+
+void Rows_event_tracker::update(const char* file_name, size_t pos,
+ const char* buf,
+ const Format_description_log_event *fdle)
+{
+ if (!first_seen)
+ {
+ first_seen= pos;
+ strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1);
+ }
+ last_seen= pos;
+ DBUG_ASSERT(stmt_end_seen == 0); // We can only have one
+ stmt_end_seen= get_row_event_stmt_end(buf, fdle);
+};
+
+
+/**
+ The function is called at next event reading
+ after a sequence of Rows- log-events. It checks the end-of-statement status
+ of the past sequence to report on any isssue.
+ In the positive case the tracker gets reset.
+
+ @return true when the Rows- event group integrity found compromised,
+ false otherwise.
+*/
+bool Rows_event_tracker::check_and_report(const char* file_name,
+ size_t pos)
+{
+ if (last_seen)
+ {
+ // there was at least one "block" event previously
+ if (!stmt_end_seen)
+ {
+ sql_print_error("Slave IO thread did not receive an expected "
+ "Rows-log end-of-statement for event starting "
+ "at log '%s' position %llu "
+ "whose last block was seen at log '%s' position %llu. "
+ "The end-of-statement should have been delivered "
+ "before the current one at log '%s' position %llu",
+ binlog_file_name, first_seen,
+ binlog_file_name, last_seen, file_name, pos);
+ return true;
+ }
+ reset();
+ }
+
+ return false;
+}
+
/**
@} (end of group Replication)
*/