summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel2.result3
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel2.test3
-rw-r--r--sql/rpl_parallel.cc27
3 files changed, 29 insertions, 4 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel2.result b/mysql-test/suite/rpl/r/rpl_parallel2.result
index f79661ee6fb..644870475dd 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel2.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel2.result
@@ -1,8 +1,10 @@
include/rpl_init.inc [topology=1->2]
*** MDEV-5509: Incorrect value for Seconds_Behind_Master if parallel replication ***
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+set @old_parallel_mode= @@GLOBAL.slave_parallel_mode;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
+set global slave_parallel_mode= optimistic;
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b INT);
CALL mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave");
@@ -127,6 +129,7 @@ UNLOCK TABLES;
UNLOCK TABLES;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+set global slave_parallel_mode= @old_parallel_mode;
include/start_slave.inc
DROP TABLE t1, t2;
include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel2.test b/mysql-test/suite/rpl/t/rpl_parallel2.test
index 3a9c801175f..8934b15e546 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel2.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel2.test
@@ -8,8 +8,10 @@
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+set @old_parallel_mode= @@GLOBAL.slave_parallel_mode;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
+set global slave_parallel_mode= optimistic;
--source include/start_slave.inc
--connection server_1
@@ -219,6 +221,7 @@ UNLOCK TABLES;
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+set global slave_parallel_mode= @old_parallel_mode;
--source include/start_slave.inc
--connection server_1
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index fb6f23af295..e58729ebbf3 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -369,13 +369,14 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
}
-static void
+static bool
do_ftwrl_wait(rpl_group_info *rgi,
bool *did_enter_cond, PSI_stage_info *old_stage)
{
THD *thd= rgi->thd;
rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 sub_id= rgi->gtid_sub_id;
+ bool aborted= false;
DBUG_ENTER("do_ftwrl_wait");
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
@@ -397,7 +398,10 @@ do_ftwrl_wait(rpl_group_info *rgi,
do
{
if (entry->force_abort || rgi->worker_error)
+ {
+ aborted= true;
break;
+ }
if (thd->check_killed())
{
thd->send_kill_message();
@@ -417,7 +421,7 @@ do_ftwrl_wait(rpl_group_info *rgi,
if (sub_id > entry->largest_started_sub_id)
entry->largest_started_sub_id= sub_id;
- DBUG_VOID_RETURN;
+ DBUG_RETURN(aborted);
}
@@ -500,7 +504,22 @@ rpl_unpause_after_ftwrl(THD *thd)
mysql_mutex_lock(&e->LOCK_parallel_entry);
rpt->pause_for_ftwrl = false;
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- e->pause_sub_id= (uint64)ULONGLONG_MAX;
+ /*
+ Do not change pause_sub_id if force_abort is set.
+ force_abort is set in case of STOP SLAVE.
+
+ Reason: If pause_sub_id is not changed and force_abort_is set,
+ any parallel slave thread waiting in do_ftwrl_wait() will
+ on wakeup return from do_ftwrl_wait() with 1. This will set
+ skip_event_group to 1 in handle_rpl_parallel_thread() and the
+ parallel thread will abort at once.
+
+ If pause_sub_id is changed, the code in handle_rpl_parallel_thread()
+ would continue to execute the transaction in the queue, which would
+ cause some transactions to be lost.
+ */
+ if (!e->force_abort)
+ e->pause_sub_id= (uint64)ULONGLONG_MAX;
mysql_cond_broadcast(&e->COND_parallel_entry);
mysql_mutex_unlock(&e->LOCK_parallel_entry);
}
@@ -1155,7 +1174,7 @@ handle_rpl_parallel_thread(void *arg)
rgi->worker_error= 1;
}
if (likely(!skip_event_group))
- do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
+ skip_event_group= do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
/*
Register ourself to wait for the previous commit, if we need to do