diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-11-05 12:01:26 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-11-05 12:01:26 +0100 |
commit | c834242ad42e3b7a4ed3a1a5de086754acbe52a1 (patch) | |
tree | 026d77a1e29793353622bee3da0724c753fca604 /sql/rpl_parallel.cc | |
parent | bf603250b02c936a271d628c93078cba3d081823 (diff) | |
download | mariadb-git-c834242ad42e3b7a4ed3a1a5de086754acbe52a1.tar.gz |
MDEV-4506: Parallel replication
MDEV-5217: SQL thread hangs during stop if error occurs in the middle of an event group
Normally, when we stop the slave SQL thread in parallel replication, we want
the worker threads to continue processing events until the end of the current
event group. But if we stop due to an error that prevents further events from
being queued, such as an error reading the relay log, no more events can be
queued for the workers, so they have to abort even if they are in the middle
of an event group. There was a bug that we would deadlock, the workers
waiting for more events to be queued for the event group, the SQL thread
stopped and waiting for the workers to complete their current event group
before exiting.
Fixed by now signalling from the SQL thread to all workers when it is about
to exit, and cleaning up in all workers when so signalled.
This patch fixes one of multiple problems reported in MDEV-5217.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 124 |
1 files changed, 89 insertions, 35 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 8328dd24128..cac47c56f3b 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -120,6 +120,46 @@ sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) } +static void +finish_event_group(THD *thd, int err, uint64 sub_id, + rpl_parallel_entry *entry, wait_for_commit *wfc) +{ + /* + Remove any left-over registration to wait for a prior commit to + complete. Normally, such wait would already have been removed at + this point by wait_for_prior_commit(), but eg. in error case we + might have skipped waiting, so we would need to remove it explicitly. + */ + wfc->unregister_wait_for_prior_commit(); + thd->wait_for_commit_ptr= NULL; + + /* + Record that this event group has finished (eg. transaction is + committed, if transactional), so other event groups will no longer + attempt to wait for us to commit. Once we have increased + entry->last_committed_sub_id, no other threads will execute + register_wait_for_prior_commit() against us. Thus, by doing one + extra (usually redundant) wakeup_subsequent_commits() we can ensure + that no register_wait_for_prior_commit() can ever happen without a + subsequent wakeup_subsequent_commits() to wake it up. + + We can race here with the next transactions, but that is fine, as + long as we check that we do not decrease last_committed_sub_id. If + this commit is done, then any prior commits will also have been + done and also no longer need waiting for. + */ + mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (entry->last_committed_sub_id < sub_id) + { + entry->last_committed_sub_id= sub_id; + mysql_cond_broadcast(&entry->COND_parallel_entry); + } + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + + wfc->wakeup_subsequent_commits(err); +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -128,6 +168,7 @@ handle_rpl_parallel_thread(void *arg) struct rpl_parallel_thread::queued_event *events; bool group_standalone= true; bool in_event_group= false; + rpl_group_info *group_rgi= NULL; uint64 event_gtid_sub_id= 0; int err; @@ -174,7 +215,8 @@ handle_rpl_parallel_thread(void *arg) old_msg= thd->proc_info; thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, "Waiting for work from SQL thread"); - while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) + while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed && + !(rpt->current_entry && rpt->current_entry->force_abort)) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); rpt->dequeue(events); thd->exit_cond(old_msg); @@ -200,6 +242,7 @@ handle_rpl_parallel_thread(void *arg) } err= 0; + group_rgi= rgi; /* Handle a new event group, which will be initiated by a GTID event. */ if ((event_type= events->ev->get_type_code()) == GTID_EVENT) { @@ -295,41 +338,10 @@ handle_rpl_parallel_thread(void *arg) if (end_of_group) { in_event_group= false; - - /* - Remove any left-over registration to wait for a prior commit to - complete. Normally, such wait would already have been removed at - this point by wait_for_prior_commit(), but eg. in error case we - might have skipped waiting, so we would need to remove it explicitly. - */ - rgi->commit_orderer.unregister_wait_for_prior_commit(); - thd->wait_for_commit_ptr= NULL; - - /* - Record that this event group has finished (eg. transaction is - committed, if transactional), so other event groups will no longer - attempt to wait for us to commit. Once we have increased - entry->last_committed_sub_id, no other threads will execute - register_wait_for_prior_commit() against us. Thus, by doing one - extra (usually redundant) wakeup_subsequent_commits() we can ensure - that no register_wait_for_prior_commit() can ever happen without a - subsequent wakeup_subsequent_commits() to wake it up. - - We can race here with the next transactions, but that is fine, as - long as we check that we do not decrease last_committed_sub_id. If - this commit is done, then any prior commits will also have been - done and also no longer need waiting for. - */ - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (entry->last_committed_sub_id < event_gtid_sub_id) - { - entry->last_committed_sub_id= event_gtid_sub_id; - mysql_cond_broadcast(&entry->COND_parallel_entry); - } - mysql_mutex_unlock(&entry->LOCK_parallel_entry); - - rgi->commit_orderer.wakeup_subsequent_commits(err); + finish_event_group(thd, err, event_gtid_sub_id, entry, + &rgi->commit_orderer); delete rgi; + group_rgi= rgi= NULL; } events= next; @@ -349,6 +361,27 @@ handle_rpl_parallel_thread(void *arg) goto more_events; } + if (in_event_group && group_rgi->parallel_entry->force_abort) + { + /* + We are asked to abort, without getting the remaining events in the + current event group. + + We have to rollback the current transaction and update the last + sub_id value so that SQL thread will know we are done with the + half-processed event group. + */ + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + group_rgi->is_error= true; + finish_event_group(thd, 1, group_rgi->gtid_sub_id, + group_rgi->parallel_entry, &group_rgi->commit_orderer); + group_rgi->cleanup_context(thd, true); + group_rgi->rli->abort_slave= true; + in_event_group= false; + delete group_rgi; + group_rgi= NULL; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + } if (!in_event_group) { rpt->current_entry= NULL; @@ -646,6 +679,8 @@ rpl_parallel::find(uint32 domain_id) MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); } + else + e->force_abort= false; return e; } @@ -657,6 +692,25 @@ rpl_parallel::wait_for_done() struct rpl_parallel_entry *e; uint32 i; + /* + First signal all workers that they must force quit; no more events will + be queued to complete any partial event groups executed. + */ + for (i= 0; i < domain_hash.records; ++i) + { + rpl_parallel_thread *rpt; + + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + e->force_abort= true; + if ((rpt= e->rpl_thread)) + { + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (rpt->current_entry == e) + mysql_cond_signal(&rpt->COND_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + } + for (i= 0; i < domain_hash.records; ++i) { e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); |