summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Brandon Nesterenko <brandon.nesterenko@mariadb.com> 2023-05-15 15:29:26 -0600
committer Brandon Nesterenko <brandon.nesterenko@mariadb.com> 2023-05-17 07:38:34 -0600
commit 306161d0c42430e6d0e1b900d4ab3eda773dbe76 (patch)
tree 8adce7199e590b54a630e70c32f7235f74d1ebde
parent 1dd7a094f64201b2def98885e2fb459aac63c1fc (diff)
download mariadb-git-bb-10.4-MDEV-13915-leader-abort.tar.gz
MDEV-13915: Previous GCO freed while in use (branch: bb-10.4-MDEV-13915-leader-abort)
If a transaction belonging to a later GCO is killed while in do_gco_wait, it skips all of its remaining waits and finishes up. Finishing up includes the garbage collection of previous GCOs, which may still be in their commit phase. This incremental patch fixes that logic in two ways: a killed transaction from a later GCO no longer unregisters its wait while a previous GCO is still committing, and the final wait_for_prior_commit call in finish_event_group now waits for the prior commit to complete, even if the thread has been killed, before moving on to garbage collection.
-rw-r--r-- sql/handler.cc 2
-rw-r--r-- sql/rpl_parallel.cc 23
-rw-r--r-- sql/rpl_rli.cc 4
-rw-r--r-- sql/rpl_rli.h 1
-rw-r--r-- sql/sql_class.cc 24
-rw-r--r-- sql/sql_class.h 6
6 files changed, 32 insertions, 28 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 87fa6f9e959..7eaaaf63f00 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1893,7 +1893,7 @@ int ha_rollback_trans(THD *thd, bool all)
trans == &thd->transaction.stmt);
#ifdef HAVE_REPLICATION
- if (is_real_trans && !(thd->rgi_slave && thd->rgi_slave->aborted))
+ if (is_real_trans)
{
/*
In parallel replication, if we need to rollback during commit, we must
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index a87cd0b50a3..29cda30738c 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -173,7 +173,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
mark_start_commit() calls can be made and it is safe to de-allocate
the GCO.
*/
- err= wfc->wait_for_prior_commit(thd);
+ err= wfc->wait_for_prior_commit(thd, true);
if (unlikely(err) && !rgi->worker_error)
signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL;
@@ -304,8 +304,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
debug_sync_set_action(thd, STRING_WITH_LEN("now WAIT_FOR cont_worker2"));
}});
- if (!rgi->aborted)
- rgi->unmark_start_commit();
+ rgi->unmark_start_commit();
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
@@ -1391,25 +1390,14 @@ handle_rpl_parallel_thread(void *arg)
if (!err)
#endif
{
- if (unlikely(thd->check_killed()))
- {
-
- thd->clear_error();
- thd->get_stmt_da()->reset_diagnostics_area();
- thd->send_kill_message();
- err= 1;
- }
- else if ((entry->stop_abrupt(rgi->rli) &&
- entry->rgi_is_safe_to_terminate(rgi)))
+ if (unlikely(thd->check_killed()) ||
+ (entry->stop_abrupt(rgi->rli) &&
+ entry->rgi_is_safe_to_terminate(rgi)))
{
- /*
- Temporarily separated branch, will merge with above
- */
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
thd->send_kill_message();
err= 1;
- rgi->aborted= true;
}
else
err= rpt_handle_event(qev, rpt);
@@ -1990,7 +1978,6 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
- rgi->aborted= 0;
return rgi;
}
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 5077f2fa1c8..48fd9b1f435 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2147,7 +2147,6 @@ rpl_group_info::reinit(Relay_log_info *rli)
did_mark_start_commit= false;
gtid_ev_flags2= 0;
last_master_timestamp = 0;
- aborted= 0;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
commit_orderer.reinit();
@@ -2156,8 +2155,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
- aborted(0)
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 89079e593d0..1391c5cde82 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -749,7 +749,6 @@ struct rpl_group_info
*/
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
- bool aborted;
/* When gtid_pending is true, we have not yet done record_gtid(). */
bool gtid_pending;
int worker_error;
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 73bb654080a..8f85adbb536 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -7614,7 +7614,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
*/
int
-wait_for_commit::wait_for_prior_commit2(THD *thd)
+wait_for_commit::wait_for_prior_commit2(THD *thd, bool force_wait)
{
PSI_stage_info old_stage;
wait_for_commit *loc_waitee;
@@ -7639,7 +7639,7 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
&stage_waiting_for_prior_transaction_to_commit,
&old_stage);
while ((loc_waitee= this->waitee.load(std::memory_order_relaxed)) &&
- likely(!thd->check_killed(1)))
+ (likely(!thd->check_killed(1)) || force_wait))
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
if (!loc_waitee)
{
@@ -7647,6 +7647,23 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
goto end;
}
+#ifdef HAVE_REPLICATION
+ {
+ DBUG_ASSERT(thd->rgi_slave && thd->rgi_slave->parallel_entry);
+ rpl_group_info *rgi= thd->rgi_slave;
+ rpl_parallel_entry *e= rgi->parallel_entry;
+ group_commit_orderer *gco= rgi->gco;
+
+ /*
+ If the wait is interrupted by kill but there is a GCO which is still
+ finalizing its commit, we can't yet unregister our waitee, as the order
+ for garbage collection of GCO's needs to still be maintained.
+ */
+ if (gco->prev_gco &&
+ (e->last_committed_sub_id <= gco->prev_gco->last_sub_id))
+ goto end_kill_message;
+ }
+#endif
/*
Wait was interrupted by kill. We need to unregister our wait and give the
error. But if a wakeup is already in progress, then we must ignore the
@@ -7671,6 +7688,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
this->waitee.store(NULL, std::memory_order_relaxed);
+#ifdef HAVE_REPLICATION
+end_kill_message:
+#endif
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index d0c3e0244e7..a07805140e3 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2144,14 +2144,14 @@ struct wait_for_commit
bool commit_started;
void register_wait_for_prior_commit(wait_for_commit *waitee);
- int wait_for_prior_commit(THD *thd)
+ int wait_for_prior_commit(THD *thd, bool force_wait= false)
{
/*
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
if (waitee.load(std::memory_order_acquire))
- return wait_for_prior_commit2(thd);
+ return wait_for_prior_commit2(thd, force_wait);
else
{
if (wakeup_error)
@@ -2205,7 +2205,7 @@ struct wait_for_commit
void wakeup(int wakeup_error);
- int wait_for_prior_commit2(THD *thd);
+ int wait_for_prior_commit2(THD *thd, bool force_wait= false);
void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2();