diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-01-07 14:45:39 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-01-07 14:45:39 +0100 |
commit | f27817c1d0e6d81392470e9086624e88ae08b11f (patch) | |
tree | 07143fafd819462ef1baf0d451d5537f1a60610b | |
parent | 4a3251595cc697bfdb15b67c07514bd3c4779e37 (diff) | |
download | mariadb-git-f27817c1d0e6d81392470e9086624e88ae08b11f.tar.gz |
MDEV-7326: Server deadlock in connection with parallel replication
The bug occurs when a transaction does a retry after all transactions have
done mark_start_commit() in a batch of group commit from the master. In this
case, the retrying transaction can unmark_start_commit() after the following
batch has already started running and de-allocated the GCO. Then after retry,
the transaction will re-do mark_start_commit() on a de-allocated GCO, and also
wakeup of later GCOs can be lost.
This was seen "in the wild" by a user, even though it is not known exactly
what circumstances can lead to retry of one transaction after all transactions
in a group have reached the commit phase.
The lifetime around GCO was somewhat clunky anyway. With this patch, a GCO
lives until rpl_parallel_entry::last_committed_sub_id has reached the last
transaction in the GCO. This guarantees that the GCO will still be alive when
a transaction does mark_start_commit(). Also, we now loop over the list of
active GCOs for wakeup, to ensure we do not lose a wakeup even in the
problematic case.
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel.result | 113 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 207 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 108 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 22 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 19 | ||||
-rw-r--r-- | sql/rpl_rli.h | 4 |
6 files changed, 419 insertions, 54 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index a3b423a49c9..7ceb5ee6622 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -1023,6 +1023,119 @@ SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; CHANGE MASTER TO master_use_gtid=slave_pos; include/start_slave.inc +*** MDEV-7326 Server deadlock in connection with parallel replication *** +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=3; +SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid"; +include/start_slave.inc +SET @old_format= @@SESSION.binlog_format; +SET binlog_format= STATEMENT; +INSERT INTO t1 VALUES (foo(50, +"rpl_parallel_start_waiting_for_prior SIGNAL t3_ready", +"rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont")); +SET DEBUG_SYNC= "now WAIT_FOR prep_ready"; +INSERT INTO t2 VALUES (foo(50, +"rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1", +"rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2")); +SET DEBUG_SYNC= "now WAIT_FOR t1_ready1"; +INSERT INTO t1 VALUES (foo(51, +"rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1", +"rpl_parallel_after_mark_start_commit SIGNAL t2_ready2")); +SET DEBUG_SYNC= "now WAIT_FOR t2_ready1"; +SET DEBUG_SYNC= "now SIGNAL t1_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t1_ready2"; +INSERT INTO t1 VALUES (52); +SET BINLOG_FORMAT= @old_format; +SELECT * FROM t2 WHERE a>=50 ORDER BY a; +a +50 +SELECT * FROM t1 WHERE a>=50 ORDER BY a; +a +50 +51 +52 +SET DEBUG_SYNC= "now SIGNAL prep_cont"; +SET DEBUG_SYNC= "now WAIT_FOR t3_ready"; +SET DEBUG_SYNC= "now SIGNAL t2_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t2_ready2"; +SET DEBUG_SYNC= "now SIGNAL t1_cont2"; +SELECT * FROM t2 WHERE a>=50 ORDER BY a; +a +50 +SELECT * FROM t1 WHERE a>=50 ORDER BY a; +a +50 +51 +52 +SET DEBUG_SYNC="reset"; +include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +include/start_slave.inc +*** MDEV-7326 Server deadlock in connection with parallel replication *** +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=3; +SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid"; +include/start_slave.inc +SET @old_format= @@SESSION.binlog_format; +SET binlog_format= STATEMENT; +INSERT INTO t1 VALUES (foo(60, +"rpl_parallel_start_waiting_for_prior SIGNAL t3_ready", +"rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont")); +SET DEBUG_SYNC= "now WAIT_FOR prep_ready"; +INSERT INTO t2 VALUES (foo(60, +"rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1", +"rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2")); +SET DEBUG_SYNC= "now WAIT_FOR t1_ready1"; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; +SET binlog_format=statement; +INSERT INTO t1 VALUES (foo(61, +"rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1", +"rpl_parallel_after_mark_start_commit SIGNAL t2_ready2")); +SET debug_sync='now WAIT_FOR master_queued1'; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; +INSERT INTO t6 VALUES (62); +SET debug_sync='now WAIT_FOR master_queued2'; +SET debug_sync='now SIGNAL master_cont1'; +SET debug_sync='RESET'; +SET BINLOG_FORMAT= @old_format; +SELECT * FROM t2 WHERE a>=60 ORDER BY a; +a +60 +SELECT * FROM t1 WHERE a>=60 ORDER BY a; +a +60 +61 +SELECT * FROM t6 WHERE a>=60 ORDER BY a; +a +62 +SET DEBUG_SYNC= "now WAIT_FOR t2_ready1"; +SET DEBUG_SYNC= "now SIGNAL t1_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t1_ready2"; +SET DEBUG_SYNC= "now SIGNAL prep_cont"; +SET DEBUG_SYNC= "now WAIT_FOR t3_ready"; +SET DEBUG_SYNC= "now SIGNAL t2_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t2_ready2"; +SET DEBUG_SYNC= "now SIGNAL t1_cont2"; +SELECT * FROM t2 WHERE a>=60 ORDER BY a; +a +60 +SELECT * FROM t1 WHERE a>=60 ORDER BY a; +a +60 +61 +SELECT * FROM t6 WHERE a>=60 ORDER BY a; +a +62 +SET DEBUG_SYNC="reset"; +include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +include/start_slave.inc include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index a56d45848a5..d4b99d4b0f7 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -1636,6 +1636,213 @@ CHANGE MASTER TO master_use_gtid=slave_pos; --source include/start_slave.inc +--echo *** MDEV-7326 Server deadlock in connection with parallel replication *** +# We use three transactions, each in a separate group commit. +# T1 does mark_start_commit(), then gets a deadlock error. +# T2 wakes up and starts running +# T1 does unmark_start_commit() +# T3 goes to wait for T2 to start its commit +# T2 does mark_start_commit() +# The bug was that at this point, T3 got deadlocked. Because T1 has unmarked(), +# T3 did not yet see the count_committing_event_groups reach its target value +# yet. But when T1 later re-did mark_start_commit(), it failed to send a wakeup +# to T3. + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=3; +SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid"; +--source include/start_slave.inc + +--connection server_1 +SET @old_format= @@SESSION.binlog_format; +SET binlog_format= STATEMENT; +# This debug_sync will linger on and be used to control T3 later. +INSERT INTO t1 VALUES (foo(50, + "rpl_parallel_start_waiting_for_prior SIGNAL t3_ready", + "rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont")); +--save_master_pos +--connection server_2 +# Wait for the debug_sync point for T3 to be set. But let the preparation +# transaction remain hanging, so that T1 and T2 will be scheduled for the +# remaining two worker threads. +SET DEBUG_SYNC= "now WAIT_FOR prep_ready"; + +--connection server_1 +INSERT INTO t2 VALUES (foo(50, + "rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1", + "rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2")); +--save_master_pos + +--connection server_2 +SET DEBUG_SYNC= "now WAIT_FOR t1_ready1"; +# T1 has now done mark_start_commit(). It will later do a rollback and retry. + +--connection server_1 +# Use a MyISAM table for T2 and T3, so they do not trigger the +# rpl_parallel_simulate_temp_err_xid DBUG insertion on XID event. +INSERT INTO t1 VALUES (foo(51, + "rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1", + "rpl_parallel_after_mark_start_commit SIGNAL t2_ready2")); + +--connection server_2 +SET DEBUG_SYNC= "now WAIT_FOR t2_ready1"; +# T2 has now started running, but has not yet done mark_start_commit() +SET DEBUG_SYNC= "now SIGNAL t1_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t1_ready2"; +# T1 has now done unmark_start_commit() in preparation for its retry. + +--connection server_1 +INSERT INTO t1 VALUES (52); +SET BINLOG_FORMAT= @old_format; +SELECT * FROM t2 WHERE a>=50 ORDER BY a; +SELECT * FROM t1 WHERE a>=50 ORDER BY a; + +--connection server_2 +# Let the preparation transaction complete, so that the same worker thread +# can continue with the transaction T3. +SET DEBUG_SYNC= "now SIGNAL prep_cont"; +SET DEBUG_SYNC= "now WAIT_FOR t3_ready"; +# T3 has now gone to wait for T2 to start committing +SET DEBUG_SYNC= "now SIGNAL t2_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t2_ready2"; +# T2 has now done mark_start_commit(). +# Let things run, and check that T3 does not get deadlocked. +SET DEBUG_SYNC= "now SIGNAL t1_cont2"; +--sync_with_master + +--connection server_1 +--save_master_pos +--connection server_2 +--sync_with_master +SELECT * FROM t2 WHERE a>=50 ORDER BY a; +SELECT * FROM t1 WHERE a>=50 ORDER BY a; +SET DEBUG_SYNC="reset"; + +# Re-spawn the worker threads to remove any DBUG injections or DEBUG_SYNC. +--source include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +--source include/start_slave.inc + + +--echo *** MDEV-7326 Server deadlock in connection with parallel replication *** +# Similar to the previous test, but with T2 and T3 in the same GCO. +# We use three transactions, T1 in one group commit and T2/T3 in another. +# T1 does mark_start_commit(), then gets a deadlock error. +# T2 wakes up and starts running +# T1 does unmark_start_commit() +# T3 goes to wait for T1 to start its commit +# T2 does mark_start_commit() +# The bug was that at this point, T3 got deadlocked. T2 increments the +# count_committing_event_groups but does not signal T3, as they are in +# the same GCO. Then later when T1 increments, it would also not signal +# T3, because now the count_committing_event_groups is not equal to the +# wait_count of T3 (it is one larger). + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=3; +SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid"; +--source include/start_slave.inc + +--connection server_1 +SET @old_format= @@SESSION.binlog_format; +SET binlog_format= STATEMENT; +# This debug_sync will linger on and be used to control T3 later. +INSERT INTO t1 VALUES (foo(60, + "rpl_parallel_start_waiting_for_prior SIGNAL t3_ready", + "rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont")); +--save_master_pos +--connection server_2 +# Wait for the debug_sync point for T3 to be set. But let the preparation +# transaction remain hanging, so that T1 and T2 will be scheduled for the +# remaining two worker threads. +SET DEBUG_SYNC= "now WAIT_FOR prep_ready"; + +--connection server_1 +INSERT INTO t2 VALUES (foo(60, + "rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1", + "rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2")); +--save_master_pos + +--connection server_2 +SET DEBUG_SYNC= "now WAIT_FOR t1_ready1"; +# T1 has now done mark_start_commit(). It will later do a rollback and retry. + +# Do T2 and T3 in a single group commit. +# Use a MyISAM table for T2 and T3, so they do not trigger the +# rpl_parallel_simulate_temp_err_xid DBUG insertion on XID event. +--connection con_temp3 +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; +SET binlog_format=statement; +send INSERT INTO t1 VALUES (foo(61, + "rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1", + "rpl_parallel_after_mark_start_commit SIGNAL t2_ready2")); + +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued1'; + +--connection con_temp4 +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; +send INSERT INTO t6 VALUES (62); + +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued2'; +SET debug_sync='now SIGNAL master_cont1'; + +--connection con_temp3 +REAP; +--connection con_temp4 +REAP; + +--connection server_1 +SET debug_sync='RESET'; +SET BINLOG_FORMAT= @old_format; +SELECT * FROM t2 WHERE a>=60 ORDER BY a; +SELECT * FROM t1 WHERE a>=60 ORDER BY a; +SELECT * FROM t6 WHERE a>=60 ORDER BY a; + +--connection server_2 +SET DEBUG_SYNC= "now WAIT_FOR t2_ready1"; +# T2 has now started running, but has not yet done mark_start_commit() +SET DEBUG_SYNC= "now SIGNAL t1_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t1_ready2"; +# T1 has now done unmark_start_commit() in preparation for its retry. + +--connection server_2 +# Let the preparation transaction complete, so that the same worker thread +# can continue with the transaction T3. +SET DEBUG_SYNC= "now SIGNAL prep_cont"; +SET DEBUG_SYNC= "now WAIT_FOR t3_ready"; +# T3 has now gone to wait for T2 to start committing +SET DEBUG_SYNC= "now SIGNAL t2_cont1"; +SET DEBUG_SYNC= "now WAIT_FOR t2_ready2"; +# T2 has now done mark_start_commit(). +# Let things run, and check that T3 does not get deadlocked. +SET DEBUG_SYNC= "now SIGNAL t1_cont2"; +--sync_with_master + +--connection server_1 +--save_master_pos +--connection server_2 +--sync_with_master +SELECT * FROM t2 WHERE a>=60 ORDER BY a; +SELECT * FROM t1 WHERE a>=60 ORDER BY a; +SELECT * FROM t6 WHERE a>=60 ORDER BY a; +SET DEBUG_SYNC="reset"; + +# Re-spawn the worker threads to remove any DBUG injections or DEBUG_SYNC. +--source include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +--source include/start_slave.inc + + # Clean up. --connection server_2 --source include/stop_slave.inc diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 1a9e269022a..46c3e4aaaf4 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -106,9 +106,10 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) static void -finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, - rpl_group_info *rgi) +finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, + rpl_parallel_entry *entry, rpl_group_info *rgi) { + THD *thd= rpt->thd; wait_for_commit *wfc= &rgi->commit_orderer; int err; @@ -139,25 +140,47 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, signal_error_to_sql_driver_thread(thd, rgi, err); thd->wait_for_commit_ptr= NULL; + mysql_mutex_lock(&entry->LOCK_parallel_entry); /* - Record that this event group has finished (eg. transaction is - committed, if transactional), so other event groups will no longer - attempt to wait for us to commit. Once we have increased - entry->last_committed_sub_id, no other threads will execute - register_wait_for_prior_commit() against us. Thus, by doing one - extra (usually redundant) wakeup_subsequent_commits() we can ensure - that no register_wait_for_prior_commit() can ever happen without a - subsequent wakeup_subsequent_commits() to wake it up. - - We can race here with the next transactions, but that is fine, as - long as we check that we do not decrease last_committed_sub_id. If - this commit is done, then any prior commits will also have been - done and also no longer need waiting for. + We need to mark that this event group started its commit phase, in case we + missed it before (otherwise we would deadlock the next event group that is + waiting for this). In most cases (normal DML), it will be a no-op. */ - mysql_mutex_lock(&entry->LOCK_parallel_entry); + rgi->mark_start_commit_no_lock(); + if (entry->last_committed_sub_id < sub_id) + { + /* + Record that this event group has finished (eg. transaction is + committed, if transactional), so other event groups will no longer + attempt to wait for us to commit. Once we have increased + entry->last_committed_sub_id, no other threads will execute + register_wait_for_prior_commit() against us. Thus, by doing one + extra (usually redundant) wakeup_subsequent_commits() we can ensure + that no register_wait_for_prior_commit() can ever happen without a + subsequent wakeup_subsequent_commits() to wake it up. + + We can race here with the next transactions, but that is fine, as + long as we check that we do not decrease last_committed_sub_id. If + this commit is done, then any prior commits will also have been + done and also no longer need waiting for. + */ entry->last_committed_sub_id= sub_id; + /* Now free any GCOs in which all transactions have committed. */ + group_commit_orderer *tmp_gco= rgi->gco; + while (tmp_gco && + (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id)) + tmp_gco= tmp_gco->prev_gco; + while (tmp_gco) + { + group_commit_orderer *prev_gco= tmp_gco->prev_gco; + tmp_gco->next_gco->prev_gco= NULL; + rpt->loc_free_gco(tmp_gco); + tmp_gco= prev_gco; + } + } + /* If this event group got error, then any following event groups that have not yet started should just skip their group, preparing for stop of the @@ -166,12 +189,6 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, if (unlikely(rgi->worker_error) && entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) entry->stop_on_error_sub_id= sub_id; - /* - We need to mark that this event group started its commit phase, in case we - missed it before (otherwise we would deadlock the next event group that is - waiting for this). In most cases (normal DML), it will be a no-op. - */ - rgi->mark_start_commit_no_lock(); mysql_mutex_unlock(&entry->LOCK_parallel_entry); thd->clear_error(); @@ -329,6 +346,7 @@ do_retry: until after the unmark. */ rgi->unmark_start_commit(); + DEBUG_SYNC(thd, "rpl_parallel_retry_after_unmark"); /* We might get the deadlock error that causes the retry during commit, while @@ -517,7 +535,7 @@ handle_rpl_parallel_thread(void *arg) bool in_event_group= false; bool skip_event_group= false; rpl_group_info *group_rgi= NULL; - group_commit_orderer *gco, *tmp_gco; + group_commit_orderer *gco; uint64 event_gtid_sub_id= 0; rpl_sql_thread_info sql_info(NULL); int err; @@ -610,7 +628,7 @@ handle_rpl_parallel_thread(void *arg) */ group_rgi->cleanup_context(thd, 1); in_event_group= false; - finish_event_group(thd, group_rgi->gtid_sub_id, + finish_event_group(rpt, group_rgi->gtid_sub_id, qev->entry_for_queued, group_rgi); rpt->loc_free_rgi(group_rgi); @@ -664,8 +682,12 @@ handle_rpl_parallel_thread(void *arg) mysql_mutex_lock(&entry->LOCK_parallel_entry); if (!gco->installed) { - if (gco->prev_gco) - gco->prev_gco->next_gco= gco; + group_commit_orderer *prev_gco= gco->prev_gco; + if (prev_gco) + { + prev_gco->last_sub_id= gco->prior_sub_id; + prev_gco->next_gco= gco; + } gco->installed= true; } wait_count= gco->wait_count; @@ -682,6 +704,8 @@ handle_rpl_parallel_thread(void *arg) if (thd->check_killed() && !rgi->worker_error) { DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); thd->send_kill_message(); slave_output_error_info(rgi, thd); signal_error_to_sql_driver_thread(thd, rgi, 1); @@ -698,18 +722,6 @@ handle_rpl_parallel_thread(void *arg) } while (wait_count > entry->count_committing_event_groups); } - if ((tmp_gco= gco->prev_gco)) - { - /* - Now all the event groups in the previous batch have entered their - commit phase, and will no longer access their gco. So we can free - it here. - */ - DBUG_ASSERT(!tmp_gco->prev_gco); - gco->prev_gco= NULL; - rpt->loc_free_gco(tmp_gco); - } - if (entry->force_abort && wait_count > entry->stop_count) { /* @@ -773,6 +785,7 @@ handle_rpl_parallel_thread(void *arg) { DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); rgi->mark_start_commit(); + DEBUG_SYNC(thd, "rpl_parallel_after_mark_start_commit"); } /* @@ -793,6 +806,7 @@ handle_rpl_parallel_thread(void *arg) thd->get_stmt_da()->reset_diagnostics_area(); my_error(ER_LOCK_DEADLOCK, MYF(0)); err= 1; + DEBUG_SYNC(thd, "rpl_parallel_simulate_temp_err_xid"); }); if (!err) #endif @@ -832,7 +846,7 @@ handle_rpl_parallel_thread(void *arg) if (end_of_group) { in_event_group= false; - finish_event_group(thd, event_gtid_sub_id, entry, rgi); + finish_event_group(rpt, event_gtid_sub_id, entry, rgi); rpt->loc_free_rgi(rgi); thd->rgi_slave= group_rgi= rgi= NULL; skip_event_group= false; @@ -873,7 +887,7 @@ handle_rpl_parallel_thread(void *arg) */ mysql_mutex_unlock(&rpt->LOCK_rpl_thread); signal_error_to_sql_driver_thread(thd, group_rgi, 1); - finish_event_group(thd, group_rgi->gtid_sub_id, + finish_event_group(rpt, group_rgi->gtid_sub_id, group_rgi->parallel_entry, group_rgi); in_event_group= false; mysql_mutex_lock(&rpt->LOCK_rpl_thread); @@ -922,7 +936,6 @@ handle_rpl_parallel_thread(void *arg) static void dealloc_gco(group_commit_orderer *gco) { - DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */); mysql_cond_destroy(&gco->COND_group_commit_orderer); my_free(gco); } @@ -1303,7 +1316,8 @@ rpl_parallel_thread::free_rgi(rpl_group_info *rgi) group_commit_orderer * -rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) +rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev, + uint64 prior_sub_id) { group_commit_orderer *gco; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -1319,6 +1333,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) gco->wait_count= wait_count; gco->prev_gco= prev; gco->next_gco= NULL; + gco->prior_sub_id= prior_sub_id; gco->installed= false; return gco; } @@ -1327,7 +1342,6 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) void rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) { - DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */); if (!loc_gco_list) loc_gco_last_ptr_ptr= &gco->next_gco; else @@ -1534,8 +1548,12 @@ static void free_rpl_parallel_entry(void *element) { rpl_parallel_entry *e= (rpl_parallel_entry *)element; - if (e->current_gco) + while (e->current_gco) + { + group_commit_orderer *prev_gco= e->current_gco->prev_gco; dealloc_gco(e->current_gco); + e->current_gco= prev_gco; + } mysql_cond_destroy(&e->COND_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry); my_free(e); @@ -2007,7 +2025,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, uint64 count= e->count_queued_event_groups; group_commit_orderer *gco; - if (!(gco= cur_thread->get_gco(count, e->current_gco))) + if (!(gco= cur_thread->get_gco(count, e->current_gco, e->current_sub_id))) { cur_thread->free_rgi(rgi); cur_thread->free_qev(qev); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 239818855b8..2604cd98527 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -39,9 +39,12 @@ struct inuse_relaylog; rpl_parallel_entry::count_committing_event_groups has reached gco->next_gco->wait_count. - - When gco->wait_count is reached for a worker and the wait completes, - the worker frees gco->prev_gco; at this point it is guaranteed not to - be needed any longer. + - The gco lives until all its event groups have completed their commit. + This is detected by rpl_parallel_entry::last_committed_sub_id being + greater than or equal gco->last_sub_id. Once this happens, the gco is + freed. Note that since update of last_committed_sub_id can happen + out-of-order, the thread that frees a given gco can be for any later + event group, not necessarily an event group from the gco being freed. */ struct group_commit_orderer { /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */ @@ -49,6 +52,16 @@ struct group_commit_orderer { uint64 wait_count; group_commit_orderer *prev_gco; group_commit_orderer *next_gco; + /* + The sub_id of last event group in this the previous GCO. + Only valid if prev_gco != NULL. + */ + uint64 prior_sub_id; + /* + The sub_id of the last event group in this GCO. Only valid when next_gco + is non-NULL. + */ + uint64 last_sub_id; bool installed; }; @@ -168,7 +181,8 @@ struct rpl_parallel_thread { LOCK_rpl_thread mutex. */ void free_rgi(rpl_group_info *rgi); - group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev); + group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev, + uint64 first_sub_id); /* Put a gco on the local free list, to be later released to the global free list by batch_free(). diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 629e046ed0a..a751dd16650 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1849,11 +1849,20 @@ void rpl_group_info::slave_close_thread_tables(THD *thd) static void -mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco) +mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco, + rpl_group_info *rgi) { + group_commit_orderer *tmp; uint64 count= ++e->count_committing_event_groups; - if (gco->next_gco && gco->next_gco->wait_count == count) - mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer); + /* Signal any following GCO whose wait_count has been reached now. */ + tmp= gco; + while ((tmp= tmp->next_gco)) + { + uint64 wait_count= tmp->wait_count; + if (wait_count > count) + break; + mysql_cond_broadcast(&tmp->COND_group_commit_orderer); + } } @@ -1862,7 +1871,7 @@ rpl_group_info::mark_start_commit_no_lock() { if (did_mark_start_commit) return; - mark_start_commit_inner(parallel_entry, gco); + mark_start_commit_inner(parallel_entry, gco, this); did_mark_start_commit= true; } @@ -1877,7 +1886,7 @@ rpl_group_info::mark_start_commit() e= this->parallel_entry; mysql_mutex_lock(&e->LOCK_parallel_entry); - mark_start_commit_inner(e, gco); + mark_start_commit_inner(e, gco, this); mysql_mutex_unlock(&e->LOCK_parallel_entry); did_mark_start_commit= true; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 9885417aa3f..fb4e3261468 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -563,6 +563,10 @@ struct rpl_group_info (When we execute in parallel the transactions that group committed together on the master, we still need to wait for any prior transactions to have reached the commit stage). + + The pointed-to gco is only valid for as long as + gtid_sub_id < parallel_entry->last_committed_sub_id. After that, it can + be freed by another thread. */ group_commit_orderer *gco; |