diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel2.result | 3 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel2.test | 3 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 27 |
3 files changed, 29 insertions, 4 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel2.result b/mysql-test/suite/rpl/r/rpl_parallel2.result index f79661ee6fb..644870475dd 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel2.result +++ b/mysql-test/suite/rpl/r/rpl_parallel2.result @@ -1,8 +1,10 @@ include/rpl_init.inc [topology=1->2] *** MDEV-5509: Incorrect value for Seconds_Behind_Master if parallel replication *** SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +set @old_parallel_mode= @@GLOBAL.slave_parallel_mode; include/stop_slave.inc SET GLOBAL slave_parallel_threads=5; +set global slave_parallel_mode= optimistic; include/start_slave.inc CREATE TABLE t1 (a INT PRIMARY KEY, b INT); CALL mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave"); @@ -127,6 +129,7 @@ UNLOCK TABLES; UNLOCK TABLES; include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; +set global slave_parallel_mode= @old_parallel_mode; include/start_slave.inc DROP TABLE t1, t2; include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel2.test b/mysql-test/suite/rpl/t/rpl_parallel2.test index 3a9c801175f..8934b15e546 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel2.test +++ b/mysql-test/suite/rpl/t/rpl_parallel2.test @@ -8,8 +8,10 @@ --connection server_2 SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +set @old_parallel_mode= @@GLOBAL.slave_parallel_mode; --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=5; +set global slave_parallel_mode= optimistic; --source include/start_slave.inc --connection server_1 @@ -219,6 +221,7 @@ UNLOCK TABLES; --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; +set global slave_parallel_mode= @old_parallel_mode; --source include/start_slave.inc --connection server_1 diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index fb6f23af295..e58729ebbf3 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -369,13 +369,14 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, } -static void +static bool do_ftwrl_wait(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage) { THD *thd= rgi->thd; rpl_parallel_entry *entry= rgi->parallel_entry; uint64 sub_id= rgi->gtid_sub_id; + bool aborted= false; DBUG_ENTER("do_ftwrl_wait"); mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); @@ -397,7 +398,10 @@ do_ftwrl_wait(rpl_group_info *rgi, do { if (entry->force_abort || rgi->worker_error) + { + aborted= true; break; + } if (thd->check_killed()) { thd->send_kill_message(); @@ -417,7 +421,7 @@ do_ftwrl_wait(rpl_group_info *rgi, if (sub_id > entry->largest_started_sub_id) entry->largest_started_sub_id= sub_id; - DBUG_VOID_RETURN; + DBUG_RETURN(aborted); } @@ -500,7 +504,22 @@ rpl_unpause_after_ftwrl(THD *thd) mysql_mutex_lock(&e->LOCK_parallel_entry); rpt->pause_for_ftwrl = false; mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - e->pause_sub_id= (uint64)ULONGLONG_MAX; + /* + Do not change pause_sub_id if force_abort is set. + force_abort is set in case of STOP SLAVE. + + Reason: If pause_sub_id is not changed and force_abort_is set, + any parallel slave thread waiting in do_ftwrl_wait() will + on wakeup return from do_ftwrl_wait() with 1. This will set + skip_event_group to 1 in handle_rpl_parallel_thread() and the + parallel thread will abort at once. + + If pause_sub_id is changed, the code in handle_rpl_parallel_thread() + would continue to execute the transaction in the queue, which would + cause some transactions to be lost. + */ + if (!e->force_abort) + e->pause_sub_id= (uint64)ULONGLONG_MAX; mysql_cond_broadcast(&e->COND_parallel_entry); mysql_mutex_unlock(&e->LOCK_parallel_entry); } @@ -1155,7 +1174,7 @@ handle_rpl_parallel_thread(void *arg) rgi->worker_error= 1; } if (likely(!skip_event_group)) - do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); + skip_event_group= do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); /* Register ourself to wait for the previous commit, if we need to do |