diff options
-rw-r--r-- | sql/rpl_parallel.cc | 124 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 7 |
2 files changed, 96 insertions, 35 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 8328dd24128..cac47c56f3b 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -120,6 +120,46 @@ sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) } +static void +finish_event_group(THD *thd, int err, uint64 sub_id, + rpl_parallel_entry *entry, wait_for_commit *wfc) +{ + /* + Remove any left-over registration to wait for a prior commit to + complete. Normally, such wait would already have been removed at + this point by wait_for_prior_commit(), but eg. in error case we + might have skipped waiting, so we would need to remove it explicitly. + */ + wfc->unregister_wait_for_prior_commit(); + thd->wait_for_commit_ptr= NULL; + + /* + Record that this event group has finished (eg. transaction is + committed, if transactional), so other event groups will no longer + attempt to wait for us to commit. Once we have increased + entry->last_committed_sub_id, no other threads will execute + register_wait_for_prior_commit() against us. Thus, by doing one + extra (usually redundant) wakeup_subsequent_commits() we can ensure + that no register_wait_for_prior_commit() can ever happen without a + subsequent wakeup_subsequent_commits() to wake it up. + + We can race here with the next transactions, but that is fine, as + long as we check that we do not decrease last_committed_sub_id. If + this commit is done, then any prior commits will also have been + done and also no longer need waiting for. + */ + mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (entry->last_committed_sub_id < sub_id) + { + entry->last_committed_sub_id= sub_id; + mysql_cond_broadcast(&entry->COND_parallel_entry); + } + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + + wfc->wakeup_subsequent_commits(err); +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -128,6 +168,7 @@ handle_rpl_parallel_thread(void *arg) struct rpl_parallel_thread::queued_event *events; bool group_standalone= true; bool in_event_group= false; + rpl_group_info *group_rgi= NULL; uint64 event_gtid_sub_id= 0; int err; @@ -174,7 +215,8 @@ handle_rpl_parallel_thread(void *arg) old_msg= thd->proc_info; thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, "Waiting for work from SQL thread"); - while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) + while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed && + !(rpt->current_entry && rpt->current_entry->force_abort)) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); rpt->dequeue(events); thd->exit_cond(old_msg); @@ -200,6 +242,7 @@ handle_rpl_parallel_thread(void *arg) } err= 0; + group_rgi= rgi; /* Handle a new event group, which will be initiated by a GTID event. */ if ((event_type= events->ev->get_type_code()) == GTID_EVENT) { @@ -295,41 +338,10 @@ handle_rpl_parallel_thread(void *arg) if (end_of_group) { in_event_group= false; - - /* - Remove any left-over registration to wait for a prior commit to - complete. Normally, such wait would already have been removed at - this point by wait_for_prior_commit(), but eg. in error case we - might have skipped waiting, so we would need to remove it explicitly. - */ - rgi->commit_orderer.unregister_wait_for_prior_commit(); - thd->wait_for_commit_ptr= NULL; - - /* - Record that this event group has finished (eg. transaction is - committed, if transactional), so other event groups will no longer - attempt to wait for us to commit. Once we have increased - entry->last_committed_sub_id, no other threads will execute - register_wait_for_prior_commit() against us. Thus, by doing one - extra (usually redundant) wakeup_subsequent_commits() we can ensure - that no register_wait_for_prior_commit() can ever happen without a - subsequent wakeup_subsequent_commits() to wake it up. - - We can race here with the next transactions, but that is fine, as - long as we check that we do not decrease last_committed_sub_id. If - this commit is done, then any prior commits will also have been - done and also no longer need waiting for. - */ - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (entry->last_committed_sub_id < event_gtid_sub_id) - { - entry->last_committed_sub_id= event_gtid_sub_id; - mysql_cond_broadcast(&entry->COND_parallel_entry); - } - mysql_mutex_unlock(&entry->LOCK_parallel_entry); - - rgi->commit_orderer.wakeup_subsequent_commits(err); + finish_event_group(thd, err, event_gtid_sub_id, entry, + &rgi->commit_orderer); delete rgi; + group_rgi= rgi= NULL; } events= next; @@ -349,6 +361,27 @@ handle_rpl_parallel_thread(void *arg) goto more_events; } + if (in_event_group && group_rgi->parallel_entry->force_abort) + { + /* + We are asked to abort, without getting the remaining events in the + current event group. + + We have to rollback the current transaction and update the last + sub_id value so that SQL thread will know we are done with the + half-processed event group. + */ + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + group_rgi->is_error= true; + finish_event_group(thd, 1, group_rgi->gtid_sub_id, + group_rgi->parallel_entry, &group_rgi->commit_orderer); + group_rgi->cleanup_context(thd, true); + group_rgi->rli->abort_slave= true; + in_event_group= false; + delete group_rgi; + group_rgi= NULL; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + } if (!in_event_group) { rpt->current_entry= NULL; @@ -646,6 +679,8 @@ rpl_parallel::find(uint32 domain_id) MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); } + else + e->force_abort= false; return e; } @@ -657,6 +692,25 @@ rpl_parallel::wait_for_done() struct rpl_parallel_entry *e; uint32 i; + /* + First signal all workers that they must force quit; no more events will + be queued to complete any partial event groups executed. + */ + for (i= 0; i < domain_hash.records; ++i) + { + rpl_parallel_thread *rpt; + + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + e->force_abort= true; + if ((rpt= e->rpl_thread)) + { + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (rpt->current_entry == e) + mysql_cond_signal(&rpt->COND_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + } + for (i= 0; i < domain_hash.records; ++i) { e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 0b9619e5e83..0e88e09652b 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -76,6 +76,13 @@ struct rpl_parallel_entry { uint64 last_seq_no; uint64 last_commit_id; bool active; + /* + Set when SQL thread is shutting down, and no more events can be processed, + so worker threads must force abort any current transactions without + waiting for event groups to complete. + */ + bool force_abort; + rpl_parallel_thread *rpl_thread; /* The sub_id of the last transaction to commit within this domain_id. |