summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/handler.cc2
-rw-r--r--sql/rpl_parallel.cc23
-rw-r--r--sql/rpl_rli.cc4
-rw-r--r--sql/rpl_rli.h1
-rw-r--r--sql/sql_class.cc24
-rw-r--r--sql/sql_class.h6
6 files changed, 32 insertions, 28 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 87fa6f9e959..7eaaaf63f00 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1893,7 +1893,7 @@ int ha_rollback_trans(THD *thd, bool all)
trans == &thd->transaction.stmt);
#ifdef HAVE_REPLICATION
- if (is_real_trans && !(thd->rgi_slave && thd->rgi_slave->aborted))
+ if (is_real_trans)
{
/*
In parallel replication, if we need to rollback during commit, we must
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index a87cd0b50a3..29cda30738c 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -173,7 +173,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
mark_start_commit() calls can be made and it is safe to de-allocate
the GCO.
*/
- err= wfc->wait_for_prior_commit(thd);
+ err= wfc->wait_for_prior_commit(thd, true);
if (unlikely(err) && !rgi->worker_error)
signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL;
@@ -304,8 +304,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
debug_sync_set_action(thd, STRING_WITH_LEN("now WAIT_FOR cont_worker2"));
}});
- if (!rgi->aborted)
- rgi->unmark_start_commit();
+ rgi->unmark_start_commit();
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
@@ -1391,25 +1390,14 @@ handle_rpl_parallel_thread(void *arg)
if (!err)
#endif
{
- if (unlikely(thd->check_killed()))
- {
-
- thd->clear_error();
- thd->get_stmt_da()->reset_diagnostics_area();
- thd->send_kill_message();
- err= 1;
- }
- else if ((entry->stop_abrupt(rgi->rli) &&
- entry->rgi_is_safe_to_terminate(rgi)))
+ if (unlikely(thd->check_killed()) ||
+ (entry->stop_abrupt(rgi->rli) &&
+ entry->rgi_is_safe_to_terminate(rgi)))
{
- /*
- Temporarily separated branch, will merge with above
- */
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
thd->send_kill_message();
err= 1;
- rgi->aborted= true;
}
else
err= rpt_handle_event(qev, rpt);
@@ -1990,7 +1978,6 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
- rgi->aborted= 0;
return rgi;
}
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 5077f2fa1c8..48fd9b1f435 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2147,7 +2147,6 @@ rpl_group_info::reinit(Relay_log_info *rli)
did_mark_start_commit= false;
gtid_ev_flags2= 0;
last_master_timestamp = 0;
- aborted= 0;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
commit_orderer.reinit();
@@ -2156,8 +2155,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
- aborted(0)
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 89079e593d0..1391c5cde82 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -749,7 +749,6 @@ struct rpl_group_info
*/
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
- bool aborted;
/* When gtid_pending is true, we have not yet done record_gtid(). */
bool gtid_pending;
int worker_error;
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 73bb654080a..8f85adbb536 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -7614,7 +7614,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
*/
int
-wait_for_commit::wait_for_prior_commit2(THD *thd)
+wait_for_commit::wait_for_prior_commit2(THD *thd, bool force_wait)
{
PSI_stage_info old_stage;
wait_for_commit *loc_waitee;
@@ -7639,7 +7639,7 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
&stage_waiting_for_prior_transaction_to_commit,
&old_stage);
while ((loc_waitee= this->waitee.load(std::memory_order_relaxed)) &&
- likely(!thd->check_killed(1)))
+ (likely(!thd->check_killed(1)) || force_wait))
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
if (!loc_waitee)
{
@@ -7647,6 +7647,23 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
goto end;
}
+#ifdef HAVE_REPLICATION
+ {
+ DBUG_ASSERT(thd->rgi_slave && thd->rgi_slave->parallel_entry);
+ rpl_group_info *rgi= thd->rgi_slave;
+ rpl_parallel_entry *e= rgi->parallel_entry;
+ group_commit_orderer *gco= rgi->gco;
+
+ /*
+ If the wait is interrupted by kill but there is a GCO which is still
+ finalizing its commit, we can't yet unregister our waitee, as the order
+ for garbage collection of GCO's needs to still be maintained.
+ */
+ if (gco->prev_gco &&
+ (e->last_committed_sub_id <= gco->prev_gco->last_sub_id))
+ goto end_kill_message;
+ }
+#endif
/*
Wait was interrupted by kill. We need to unregister our wait and give the
error. But if a wakeup is already in progress, then we must ignore the
@@ -7671,6 +7688,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
this->waitee.store(NULL, std::memory_order_relaxed);
+#ifdef HAVE_REPLICATION
+end_kill_message:
+#endif
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index d0c3e0244e7..a07805140e3 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2144,14 +2144,14 @@ struct wait_for_commit
bool commit_started;
void register_wait_for_prior_commit(wait_for_commit *waitee);
- int wait_for_prior_commit(THD *thd)
+ int wait_for_prior_commit(THD *thd, bool force_wait= false)
{
/*
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
if (waitee.load(std::memory_order_acquire))
- return wait_for_prior_commit2(thd);
+ return wait_for_prior_commit2(thd, force_wait);
else
{
if (wakeup_error)
@@ -2205,7 +2205,7 @@ struct wait_for_commit
void wakeup(int wakeup_error);
- int wait_for_prior_commit2(THD *thd);
+ int wait_for_prior_commit2(THD *thd, bool force_wait= false);
void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2();