diff options
-rw-r--r-- | sql/handler.cc | 2 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 23 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 4 | ||||
-rw-r--r-- | sql/rpl_rli.h | 1 | ||||
-rw-r--r-- | sql/sql_class.cc | 24 | ||||
-rw-r--r-- | sql/sql_class.h | 6 |
6 files changed, 32 insertions, 28 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index 87fa6f9e959..7eaaaf63f00 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1893,7 +1893,7 @@ int ha_rollback_trans(THD *thd, bool all) trans == &thd->transaction.stmt); #ifdef HAVE_REPLICATION - if (is_real_trans && !(thd->rgi_slave && thd->rgi_slave->aborted)) + if (is_real_trans) { /* In parallel replication, if we need to rollback during commit, we must diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index a87cd0b50a3..29cda30738c 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -173,7 +173,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, mark_start_commit() calls can be made and it is safe to de-allocate the GCO. */ - err= wfc->wait_for_prior_commit(thd); + err= wfc->wait_for_prior_commit(thd, true); if (unlikely(err) && !rgi->worker_error) signal_error_to_sql_driver_thread(thd, rgi, err); thd->wait_for_commit_ptr= NULL; @@ -304,8 +304,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err) debug_sync_set_action(thd, STRING_WITH_LEN("now WAIT_FOR cont_worker2")); }}); - if (!rgi->aborted) - rgi->unmark_start_commit(); + rgi->unmark_start_commit(); rgi->cleanup_context(thd, true); rgi->rli->abort_slave= true; @@ -1391,25 +1390,14 @@ handle_rpl_parallel_thread(void *arg) if (!err) #endif { - if (unlikely(thd->check_killed())) - { - - thd->clear_error(); - thd->get_stmt_da()->reset_diagnostics_area(); - thd->send_kill_message(); - err= 1; - } - else if ((entry->stop_abrupt(rgi->rli) && - entry->rgi_is_safe_to_terminate(rgi))) + if (unlikely(thd->check_killed()) || + (entry->stop_abrupt(rgi->rli) && + entry->rgi_is_safe_to_terminate(rgi))) { - /* - Temporarily separated branch, will merge with above - */ thd->clear_error(); thd->get_stmt_da()->reset_diagnostics_area(); thd->send_kill_message(); err= 1; - rgi->aborted= true; } else err= rpt_handle_event(qev, rpt); @@ -1990,7 +1978,6 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; rgi->retry_event_count= 0; rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE; - rgi->aborted= 0; return rgi; } diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 5077f2fa1c8..48fd9b1f435 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -2147,7 +2147,6 @@ rpl_group_info::reinit(Relay_log_info *rli) did_mark_start_commit= false; gtid_ev_flags2= 0; last_master_timestamp = 0; - aborted= 0; gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; speculation= SPECULATE_NO; commit_orderer.reinit(); @@ -2156,8 +2155,7 @@ rpl_group_info::reinit(Relay_log_info *rli) rpl_group_info::rpl_group_info(Relay_log_info *rli) : thd(0), wait_commit_sub_id(0), wait_commit_group_info(0), parallel_entry(0), - deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false), - aborted(0) + deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) { reinit(rli); bzero(¤t_gtid, sizeof(current_gtid)); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 89079e593d0..1391c5cde82 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -749,7 +749,6 @@ struct rpl_group_info */ char future_event_master_log_name[FN_REFLEN]; bool is_parallel_exec; - bool aborted; /* When gtid_pending is true, we have not yet done record_gtid(). */ bool gtid_pending; int worker_error; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 73bb654080a..8f85adbb536 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -7614,7 +7614,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) */ int -wait_for_commit::wait_for_prior_commit2(THD *thd) +wait_for_commit::wait_for_prior_commit2(THD *thd, bool force_wait) { PSI_stage_info old_stage; wait_for_commit *loc_waitee; @@ -7639,7 +7639,7 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) &stage_waiting_for_prior_transaction_to_commit, &old_stage); while ((loc_waitee= this->waitee.load(std::memory_order_relaxed)) && - likely(!thd->check_killed(1))) + (likely(!thd->check_killed(1)) || force_wait)) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); if (!loc_waitee) { @@ -7647,6 +7647,23 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); goto end; } +#ifdef HAVE_REPLICATION + { + DBUG_ASSERT(thd->rgi_slave && thd->rgi_slave->parallel_entry); + rpl_group_info *rgi= thd->rgi_slave; + rpl_parallel_entry *e= rgi->parallel_entry; + group_commit_orderer *gco= rgi->gco; + + /* + If the wait is interrupted by kill but there is a GCO which is still + finalizing its commit, we can't yet unregister our waitee, as the order + for garbage collection of GCO's needs to still be maintained. + */ + if (gco->prev_gco && + (e->last_committed_sub_id <= gco->prev_gco->last_sub_id)) + goto end_kill_message; + } +#endif /* Wait was interrupted by kill. We need to unregister our wait and give the error. But if a wakeup is already in progress, then we must ignore the @@ -7671,6 +7688,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); this->waitee.store(NULL, std::memory_order_relaxed); +#ifdef HAVE_REPLICATION +end_kill_message: +#endif wakeup_error= thd->killed_errno(); if (!wakeup_error) wakeup_error= ER_QUERY_INTERRUPTED; diff --git a/sql/sql_class.h b/sql/sql_class.h index d0c3e0244e7..a07805140e3 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2144,14 +2144,14 @@ struct wait_for_commit bool commit_started; void register_wait_for_prior_commit(wait_for_commit *waitee); - int wait_for_prior_commit(THD *thd) + int wait_for_prior_commit(THD *thd, bool force_wait= false) { /* Quick inline check, to avoid function call and locking in the common case where no wakeup is registered, or a registered wait was already signalled. */ if (waitee.load(std::memory_order_acquire)) - return wait_for_prior_commit2(thd); + return wait_for_prior_commit2(thd, force_wait); else { if (wakeup_error) @@ -2205,7 +2205,7 @@ struct wait_for_commit void wakeup(int wakeup_error); - int wait_for_prior_commit2(THD *thd); + int wait_for_prior_commit2(THD *thd, bool force_wait= false); void wakeup_subsequent_commits2(int wakeup_error); void unregister_wait_for_prior_commit2(); |