summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/rpl_parallel.cc124
-rw-r--r--sql/rpl_parallel.h7
2 files changed, 96 insertions, 35 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 8328dd24128..cac47c56f3b 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -120,6 +120,46 @@ sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
}
+static void
+finish_event_group(THD *thd, int err, uint64 sub_id,
+ rpl_parallel_entry *entry, wait_for_commit *wfc)
+{
+ /*
+ Remove any left-over registration to wait for a prior commit to
+ complete. Normally, such wait would already have been removed at
+ this point by wait_for_prior_commit(), but eg. in error case we
+ might have skipped waiting, so we would need to remove it explicitly.
+ */
+ wfc->unregister_wait_for_prior_commit();
+ thd->wait_for_commit_ptr= NULL;
+
+ /*
+ Record that this event group has finished (eg. transaction is
+ committed, if transactional), so other event groups will no longer
+ attempt to wait for us to commit. Once we have increased
+ entry->last_committed_sub_id, no other threads will execute
+ register_wait_for_prior_commit() against us. Thus, by doing one
+ extra (usually redundant) wakeup_subsequent_commits() we can ensure
+ that no register_wait_for_prior_commit() can ever happen without a
+ subsequent wakeup_subsequent_commits() to wake it up.
+
+ We can race here with the next transactions, but that is fine, as
+ long as we check that we do not decrease last_committed_sub_id. If
+ this commit is done, then any prior commits will also have been
+ done and also no longer need waiting for.
+ */
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (entry->last_committed_sub_id < sub_id)
+ {
+ entry->last_committed_sub_id= sub_id;
+ mysql_cond_broadcast(&entry->COND_parallel_entry);
+ }
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+ wfc->wakeup_subsequent_commits(err);
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -128,6 +168,7 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
+ rpl_group_info *group_rgi= NULL;
uint64 event_gtid_sub_id= 0;
int err;
@@ -174,7 +215,8 @@ handle_rpl_parallel_thread(void *arg)
old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread");
- while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
+ while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed &&
+ !(rpt->current_entry && rpt->current_entry->force_abort))
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
rpt->dequeue(events);
thd->exit_cond(old_msg);
@@ -200,6 +242,7 @@ handle_rpl_parallel_thread(void *arg)
}
err= 0;
+ group_rgi= rgi;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{
@@ -295,41 +338,10 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group)
{
in_event_group= false;
-
- /*
- Remove any left-over registration to wait for a prior commit to
- complete. Normally, such wait would already have been removed at
- this point by wait_for_prior_commit(), but eg. in error case we
- might have skipped waiting, so we would need to remove it explicitly.
- */
- rgi->commit_orderer.unregister_wait_for_prior_commit();
- thd->wait_for_commit_ptr= NULL;
-
- /*
- Record that this event group has finished (eg. transaction is
- committed, if transactional), so other event groups will no longer
- attempt to wait for us to commit. Once we have increased
- entry->last_committed_sub_id, no other threads will execute
- register_wait_for_prior_commit() against us. Thus, by doing one
- extra (usually redundant) wakeup_subsequent_commits() we can ensure
- that no register_wait_for_prior_commit() can ever happen without a
- subsequent wakeup_subsequent_commits() to wake it up.
-
- We can race here with the next transactions, but that is fine, as
- long as we check that we do not decrease last_committed_sub_id. If
- this commit is done, then any prior commits will also have been
- done and also no longer need waiting for.
- */
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (entry->last_committed_sub_id < event_gtid_sub_id)
- {
- entry->last_committed_sub_id= event_gtid_sub_id;
- mysql_cond_broadcast(&entry->COND_parallel_entry);
- }
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
-
- rgi->commit_orderer.wakeup_subsequent_commits(err);
+ finish_event_group(thd, err, event_gtid_sub_id, entry,
+ &rgi->commit_orderer);
delete rgi;
+ group_rgi= rgi= NULL;
}
events= next;
@@ -349,6 +361,27 @@ handle_rpl_parallel_thread(void *arg)
goto more_events;
}
+ if (in_event_group && group_rgi->parallel_entry->force_abort)
+ {
+ /*
+ We are asked to abort, without getting the remaining events in the
+ current event group.
+
+ We have to rollback the current transaction and update the last
+ sub_id value so that SQL thread will know we are done with the
+ half-processed event group.
+ */
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ group_rgi->is_error= true;
+ finish_event_group(thd, 1, group_rgi->gtid_sub_id,
+ group_rgi->parallel_entry, &group_rgi->commit_orderer);
+ group_rgi->cleanup_context(thd, true);
+ group_rgi->rli->abort_slave= true;
+ in_event_group= false;
+ delete group_rgi;
+ group_rgi= NULL;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ }
if (!in_event_group)
{
rpt->current_entry= NULL;
@@ -646,6 +679,8 @@ rpl_parallel::find(uint32 domain_id)
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
}
+ else
+ e->force_abort= false;
return e;
}
@@ -657,6 +692,25 @@ rpl_parallel::wait_for_done()
struct rpl_parallel_entry *e;
uint32 i;
+ /*
+ First signal all workers that they must force quit; no more events will
+ be queued to complete any partial event groups executed.
+ */
+ for (i= 0; i < domain_hash.records; ++i)
+ {
+ rpl_parallel_thread *rpt;
+
+ e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ e->force_abort= true;
+ if ((rpt= e->rpl_thread))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (rpt->current_entry == e)
+ mysql_cond_signal(&rpt->COND_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+ }
+
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 0b9619e5e83..0e88e09652b 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -76,6 +76,13 @@ struct rpl_parallel_entry {
uint64 last_seq_no;
uint64 last_commit_id;
bool active;
+ /*
+ Set when SQL thread is shutting down, and no more events can be processed,
+ so worker threads must force abort any current transactions without
+ waiting for event groups to complete.
+ */
+ bool force_abort;
+
rpl_parallel_thread *rpl_thread;
/*
The sub_id of the last transaction to commit within this domain_id.