summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSujatha <sujatha.sivakumar@mariadb.com>2019-09-30 13:22:37 +0530
committerSujatha <sujatha.sivakumar@mariadb.com>2019-09-30 13:22:37 +0530
commit9b80f9300d6060f754d95d5a7b92706ee7d47e5e (patch)
tree37bd6a54bf5838813dfee17c17aae97816f82a55
parent677cc64428c0e6b0f596c6e861a5a0f7344078c6 (diff)
downloadmariadb-git-9b80f9300d6060f754d95d5a7b92706ee7d47e5e.tar.gz
MDEV-20645: Replication consistency is broken as workers miss the error notification from an earlier failed group.
Analysis: ======== In general if there are three groups. 1 - Inserts 32 which fails due to local entry '32' on slave. 2 - Inserts 33 3 - Inserts 34 Each group considers itself as a waiter and it waits for prior group 'waitee'. This is done in 'register_wait_for_prior_event_group_commit'. If there is no other parallel group being scheduled then no waitee will be there. Let us assume 3 groups are being scheduled in parallel. 3-> waits for 2-> waits for->1 '1' upon completion it checks is there any registered subsequent waiter. If so it wakes up the subsequent waiter with its execution status. This execution status is stored in wakeup_error. If '1' failed then it sends corresponding wakeup_error to 2. Then '2' aborts and it propagates error to '3'. So all further commits are aborted. This mechanism works only when all transactions reach a stage where they are waiting for their prior commit to complete. In case of optimistic following scenario occurs. 1,2,3 are scheduled in parallel. 3 - Reaches group_commit_code waits for 2 to complete. 1 - errors out sets stop_on_error_sub_id=1. When a group execution results in error its corresponding sub_id is set to 'stop_on_error_sub_id'. Any new groups queued for execution will check if their sub_id is > stop_on_error_sub_id. If it is true their execution will be skipped as prior group execution failed. 'skip_event_group=1' will be set. Since the execution of SQL thread is about to stop we just skip execution of all the following event groups. We still do all the normal waiting and wakeup processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. Upon error '1' transaction checks for registered waiters. Since no one is there it simply goes away. 2 - Starts the execution. It checks do I have a waitee. Since wait_commit_sub_id == entry->last_committed_sub_id no waitee is set. Secondly: 'entry->stop_on_error_sub_id' is set by '1'st execution. Now 'handle_parallel_thread' code checks if the current group 'sub_id' is greater than the 'sub_id' set within 'stop_on_error_sub_id'. Since the above is true 'skip_event_group=true' is set. Simply call 'wait_for_prior_commit' to wakeup all waiters. Group '2' didn't had any waitee and its execution is skipped. Hence its wakeup_error=0.It sends a positive wakeup signal to '3'. Which commits. This results in a missed transaction. i.e 33 is missed and 34 is committed. Fix: === When a worker learns that an earlier transaction execution has failed, and it should not proceed for further execution, it should mark its own execution status as failed so that it alerts its followers to abort as well.
-rw-r--r--mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.result57
-rw-r--r--mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.test1
-rw-r--r--mysql-test/suite/rpl/include/rpl_parallel_ignored_errors.inc112
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel_ignored_errors.result57
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_ignored_errors.test1
-rw-r--r--sql/rpl_parallel.cc16
6 files changed, 244 insertions, 0 deletions
diff --git a/mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.result b/mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.result
new file mode 100644
index 00000000000..9fbd329d58c
--- /dev/null
+++ b/mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.result
@@ -0,0 +1,57 @@
+include/master-slave.inc
+[connection master]
+connection server_2;
+include/stop_slave.inc
+connection server_2;
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL slave_parallel_mode='optimistic';
+SET GLOBAL slave_parallel_threads= 3;
+CHANGE MASTER TO master_use_gtid=slave_pos;
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+include/start_slave.inc
+connection server_2;
+connection server_1;
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
+include/save_master_gtid.inc
+connection server_1;
+connection server_2;
+include/sync_with_master_gtid.inc
+connection server_2;
+connect con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
+BEGIN;
+INSERT INTO t1 VALUES (32);
+connection server_1;
+INSERT INTO t1 VALUES (32);
+connection server_2;
+SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
+SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
+connection server_1;
+SET gtid_seq_no=100;
+INSERT INTO t1 VALUES (33);
+connection server_2;
+SET debug_sync='now WAIT_FOR reached_pause';
+connection server_1;
+INSERT INTO t1 VALUES (34);
+connection server_2;
+connection con_temp2;
+COMMIT;
+connection server_2;
+include/stop_slave.inc
+connection server_2;
+include/assert.inc [table t1 should have zero rows where a>32]
+connection server_2;
+SELECT * FROM t1 WHERE a>32;
+a
+DELETE FROM t1 WHERE a=32;
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+SET GLOBAL slave_parallel_mode=@old_parallel_mode;
+SET GLOBAL debug_dbug=@old_debug;
+SET DEBUG_SYNC= 'RESET';
+include/start_slave.inc
+connection server_2;
+connection server_1;
+DROP TABLE t1;
+include/rpl_end.inc
diff --git a/mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.test b/mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.test
new file mode 100644
index 00000000000..8a26778c8f2
--- /dev/null
+++ b/mysql-test/suite/binlog_encryption/rpl_parallel_ignored_errors.test
@@ -0,0 +1 @@
+--source suite/rpl/include/rpl_parallel_ignored_errors.inc
diff --git a/mysql-test/suite/rpl/include/rpl_parallel_ignored_errors.inc b/mysql-test/suite/rpl/include/rpl_parallel_ignored_errors.inc
new file mode 100644
index 00000000000..25da12f30a3
--- /dev/null
+++ b/mysql-test/suite/rpl/include/rpl_parallel_ignored_errors.inc
@@ -0,0 +1,112 @@
+# ==== Purpose ====
+#
+# Test verifies that, in parallel replication, transaction failure notification
+# is propagated to all the workers. Workers should abort the execution of
+# transaction event groups, whose event positions are higher than the failing
+# transaction group.
+#
+# ==== Implementation ====
+#
+# Steps:
+# 0 - Create a table t1 on master which has a primary key. Enable parallel
+# replication on slave with slave_parallel_mode='optimistic' and
+# slave_parallel_threads=3.
+# 1 - On slave start a transaction and execute a local INSERT statement
+# which will insert value 32. This is done to block the INSERT coming
+# from master.
+# 2 - On master execute an INSERT statement with value 32, so that it is
+# blocked on slave.
+# 3 - On slave enable a debug sync point such that it holds the worker thread
+# execution as soon as work is scheduled to it.
+# 4 - INSERT value 33 on master. It will be held on slave by other worker
+# thread due to debug simulation.
+# 5 - INSERT value 34 on master.
+# 6 - On slave, enusre that INSERT 34 has reached a state where it waits for
+# its prior transactions to commit.
+# 7 - Commit the local INSERT 32 on slave server so that first worker will
+# error out.
+# 8 - Now send a continue signal to second worker processing 33. It should
+# wakeup and propagate the error to INSERT 34.
+# 9 - Upon slave stop due to error, check that no rows are found after the
+# failed INSERT 32.
+#
+# ==== References ====
+#
+# MDEV-20645: Replication consistency is broken as workers miss the error
+# notification from an earlier failed group.
+#
+
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/have_binlog_format_statement.inc
+--source include/master-slave.inc
+
+--enable_connect_log
+--connection server_2
+--source include/stop_slave.inc
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL slave_parallel_mode='optimistic';
+SET GLOBAL slave_parallel_threads= 3;
+CHANGE MASTER TO master_use_gtid=slave_pos;
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+--source include/start_slave.inc
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+
+--connect (con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+BEGIN;
+INSERT INTO t1 VALUES (32);
+
+--connection server_1
+INSERT INTO t1 VALUES (32);
+
+--connection server_2
+--let $wait_condition= SELECT COUNT(*) = 1 FROM information_schema.processlist WHERE info like "INSERT INTO t1 VALUES (32)"
+--source include/wait_condition.inc
+SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
+SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
+
+--connection server_1
+SET gtid_seq_no=100;
+INSERT INTO t1 VALUES (33);
+
+--connection server_2
+SET debug_sync='now WAIT_FOR reached_pause';
+
+--connection server_1
+INSERT INTO t1 VALUES (34);
+
+--connection server_2
+--let $wait_condition= SELECT COUNT(*) = 1 FROM information_schema.processlist WHERE state like "Waiting for prior transaction to commit"
+--source include/wait_condition.inc
+--connection con_temp2
+COMMIT;
+
+# Clean up.
+--connection server_2
+--source include/stop_slave.inc
+--let $assert_cond= COUNT(*) = 0 FROM t1 WHERE a>32
+--let $assert_text= table t1 should have zero rows where a>32
+--source include/assert.inc
+SELECT * FROM t1 WHERE a>32;
+DELETE FROM t1 WHERE a=32;
+
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+SET GLOBAL slave_parallel_mode=@old_parallel_mode;
+SET GLOBAL debug_dbug=@old_debug;
+SET DEBUG_SYNC= 'RESET';
+--source include/start_slave.inc
+
+--connection server_1
+DROP TABLE t1;
+--disable_connect_log
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_ignored_errors.result b/mysql-test/suite/rpl/r/rpl_parallel_ignored_errors.result
new file mode 100644
index 00000000000..9fbd329d58c
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_parallel_ignored_errors.result
@@ -0,0 +1,57 @@
+include/master-slave.inc
+[connection master]
+connection server_2;
+include/stop_slave.inc
+connection server_2;
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL slave_parallel_mode='optimistic';
+SET GLOBAL slave_parallel_threads= 3;
+CHANGE MASTER TO master_use_gtid=slave_pos;
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+include/start_slave.inc
+connection server_2;
+connection server_1;
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=InnoDB;
+include/save_master_gtid.inc
+connection server_1;
+connection server_2;
+include/sync_with_master_gtid.inc
+connection server_2;
+connect con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
+BEGIN;
+INSERT INTO t1 VALUES (32);
+connection server_1;
+INSERT INTO t1 VALUES (32);
+connection server_2;
+SET GLOBAL debug_dbug="+d,hold_worker_on_schedule";
+SET debug_sync="debug_sync_action SIGNAL reached_pause WAIT_FOR continue_worker";
+connection server_1;
+SET gtid_seq_no=100;
+INSERT INTO t1 VALUES (33);
+connection server_2;
+SET debug_sync='now WAIT_FOR reached_pause';
+connection server_1;
+INSERT INTO t1 VALUES (34);
+connection server_2;
+connection con_temp2;
+COMMIT;
+connection server_2;
+include/stop_slave.inc
+connection server_2;
+include/assert.inc [table t1 should have zero rows where a>32]
+connection server_2;
+SELECT * FROM t1 WHERE a>32;
+a
+DELETE FROM t1 WHERE a=32;
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+SET GLOBAL slave_parallel_mode=@old_parallel_mode;
+SET GLOBAL debug_dbug=@old_debug;
+SET DEBUG_SYNC= 'RESET';
+include/start_slave.inc
+connection server_2;
+connection server_1;
+DROP TABLE t1;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_ignored_errors.test b/mysql-test/suite/rpl/t/rpl_parallel_ignored_errors.test
new file mode 100644
index 00000000000..90f09a76546
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_parallel_ignored_errors.test
@@ -0,0 +1 @@
+--source include/rpl_parallel_ignored_errors.inc
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 8fef2d66635..574730b96c1 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -228,6 +228,12 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
entry->stop_on_error_sub_id= sub_id;
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ DBUG_EXECUTE_IF("hold_worker_on_schedule", {
+ if (entry->stop_on_error_sub_id < (uint64)ULONGLONG_MAX)
+ {
+ debug_sync_set_action(thd, STRING_WITH_LEN("now SIGNAL continue_worker"));
+ }
+ });
if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
wait_for_pending_deadlock_kill(thd, rgi);
@@ -1096,6 +1102,13 @@ handle_rpl_parallel_thread(void *arg)
bool did_enter_cond= false;
PSI_stage_info old_stage;
+ DBUG_EXECUTE_IF("hold_worker_on_schedule", {
+ if (rgi->current_gtid.domain_id == 0 &&
+ rgi->current_gtid.seq_no == 100) {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL reached_pause WAIT_FOR continue_worker"));
+ }
+ });
DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
if (rgi->current_gtid.domain_id == 0 &&
rgi->current_gtid.seq_no == 100) {
@@ -1137,7 +1150,10 @@ handle_rpl_parallel_thread(void *arg)
skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
+ {
skip_event_group= true;
+ rgi->worker_error= 1;
+ }
if (likely(!skip_event_group))
do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);