diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel2.result | 3 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel2.test | 3 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 27 |
3 files changed, 29 insertions, 4 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel2.result b/mysql-test/suite/rpl/r/rpl_parallel2.result index 42270916e4f..559c56271b8 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel2.result +++ b/mysql-test/suite/rpl/r/rpl_parallel2.result @@ -2,8 +2,10 @@ include/rpl_init.inc [topology=1->2] *** MDEV-5509: Incorrect value for Seconds_Behind_Master if parallel replication *** connection server_2; SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +set @old_parallel_mode= @@GLOBAL.slave_parallel_mode; include/stop_slave.inc SET GLOBAL slave_parallel_threads=5; +set global slave_parallel_mode= optimistic; include/start_slave.inc connection server_1; CREATE TABLE t1 (a INT PRIMARY KEY, b INT); @@ -157,6 +159,7 @@ UNLOCK TABLES; connection server_2; include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; +set global slave_parallel_mode= @old_parallel_mode; include/start_slave.inc connection server_1; DROP TABLE t1, t2; diff --git a/mysql-test/suite/rpl/t/rpl_parallel2.test b/mysql-test/suite/rpl/t/rpl_parallel2.test index 3a9c801175f..8934b15e546 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel2.test +++ b/mysql-test/suite/rpl/t/rpl_parallel2.test @@ -8,8 +8,10 @@ --connection server_2 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +set @old_parallel_mode= @@GLOBAL.slave_parallel_mode; --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=5; +set global slave_parallel_mode= optimistic; --source include/start_slave.inc --connection server_1 @@ -219,6 +221,7 @@ UNLOCK TABLES; --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; +set global slave_parallel_mode= @old_parallel_mode; --source include/start_slave.inc --connection server_1 diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index eef6f734f1f..154636480ca 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -396,13 +396,14 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, } -static void +static bool do_ftwrl_wait(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage) { THD *thd= rgi->thd; rpl_parallel_entry *entry= rgi->parallel_entry; uint64 sub_id= rgi->gtid_sub_id; + bool aborted= false; DBUG_ENTER("do_ftwrl_wait"); mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); @@ -425,7 +426,10 @@ do_ftwrl_wait(rpl_group_info *rgi, do { if (entry->force_abort || rgi->worker_error) + { + aborted= true; break; + } if (unlikely(thd->check_killed())) { slave_output_error_info(rgi, thd); @@ -444,7 +448,7 @@ do_ftwrl_wait(rpl_group_info *rgi, if (sub_id > entry->largest_started_sub_id) entry->largest_started_sub_id= sub_id; - DBUG_VOID_RETURN; + DBUG_RETURN(aborted); } @@ -530,7 +534,22 @@ rpl_unpause_after_ftwrl(THD *thd) mysql_mutex_lock(&e->LOCK_parallel_entry); rpt->pause_for_ftwrl = false; mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - e->pause_sub_id= (uint64)ULONGLONG_MAX; + /* + Do not change pause_sub_id if force_abort is set. + force_abort is set in case of STOP SLAVE. + + Reason: If pause_sub_id is not changed and force_abort_is set, + any parallel slave thread waiting in do_ftwrl_wait() will + on wakeup return from do_ftwrl_wait() with 1. This will set + skip_event_group to 1 in handle_rpl_parallel_thread() and the + parallel thread will abort at once. + + If pause_sub_id is changed, the code in handle_rpl_parallel_thread() + would continue to execute the transaction in the queue, which would + cause some transactions to be lost. + */ + if (!e->force_abort) + e->pause_sub_id= (uint64)ULONGLONG_MAX; mysql_cond_broadcast(&e->COND_parallel_entry); mysql_mutex_unlock(&e->LOCK_parallel_entry); } @@ -1222,7 +1241,7 @@ handle_rpl_parallel_thread(void *arg) rgi->worker_error= 1; } if (likely(!skip_event_group)) - do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); + skip_event_group= do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); /* Register ourself to wait for the previous commit, if we need to do |