summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-11-05 12:01:26 +0100
committerunknown <knielsen@knielsen-hq.org>2013-11-05 12:01:26 +0100
commitc834242ad42e3b7a4ed3a1a5de086754acbe52a1 (patch)
tree026d77a1e29793353622bee3da0724c753fca604 /sql/rpl_parallel.cc
parentbf603250b02c936a271d628c93078cba3d081823 (diff)
downloadmariadb-git-c834242ad42e3b7a4ed3a1a5de086754acbe52a1.tar.gz
MDEV-4506: Parallel replication
MDEV-5217: SQL thread hangs during stop if error occurs in the middle of an event group Normally, when we stop the slave SQL thread in parallel replication, we want the worker threads to continue processing events until the end of the current event group. But if we stop due to an error that prevents further events from being queued, such as an error reading the relay log, no more events can be queued for the workers, so they have to abort even if they are in the middle of an event group. There was a bug that we would deadlock, the workers waiting for more events to be queued for the event group, the SQL thread stopped and waiting for the workers to complete their current event group before exiting. Fixed by now signalling from the SQL thread to all workers when it is about to exit, and cleaning up in all workers when so signalled. This patch fixes one of multiple problems reported in MDEV-5217.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc124
1 files changed, 89 insertions, 35 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 8328dd24128..cac47c56f3b 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -120,6 +120,46 @@ sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
}
+static void
+finish_event_group(THD *thd, int err, uint64 sub_id,
+ rpl_parallel_entry *entry, wait_for_commit *wfc)
+{
+ /*
+ Remove any left-over registration to wait for a prior commit to
+ complete. Normally, such wait would already have been removed at
+ this point by wait_for_prior_commit(), but eg. in error case we
+ might have skipped waiting, so we would need to remove it explicitly.
+ */
+ wfc->unregister_wait_for_prior_commit();
+ thd->wait_for_commit_ptr= NULL;
+
+ /*
+ Record that this event group has finished (eg. transaction is
+ committed, if transactional), so other event groups will no longer
+ attempt to wait for us to commit. Once we have increased
+ entry->last_committed_sub_id, no other threads will execute
+ register_wait_for_prior_commit() against us. Thus, by doing one
+ extra (usually redundant) wakeup_subsequent_commits() we can ensure
+ that no register_wait_for_prior_commit() can ever happen without a
+ subsequent wakeup_subsequent_commits() to wake it up.
+
+ We can race here with the next transactions, but that is fine, as
+ long as we check that we do not decrease last_committed_sub_id. If
+ this commit is done, then any prior commits will also have been
+ done and also no longer need waiting for.
+ */
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (entry->last_committed_sub_id < sub_id)
+ {
+ entry->last_committed_sub_id= sub_id;
+ mysql_cond_broadcast(&entry->COND_parallel_entry);
+ }
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+ wfc->wakeup_subsequent_commits(err);
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -128,6 +168,7 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
+ rpl_group_info *group_rgi= NULL;
uint64 event_gtid_sub_id= 0;
int err;
@@ -174,7 +215,8 @@ handle_rpl_parallel_thread(void *arg)
old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread");
- while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
+ while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed &&
+ !(rpt->current_entry && rpt->current_entry->force_abort))
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
rpt->dequeue(events);
thd->exit_cond(old_msg);
@@ -200,6 +242,7 @@ handle_rpl_parallel_thread(void *arg)
}
err= 0;
+ group_rgi= rgi;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{
@@ -295,41 +338,10 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group)
{
in_event_group= false;
-
- /*
- Remove any left-over registration to wait for a prior commit to
- complete. Normally, such wait would already have been removed at
- this point by wait_for_prior_commit(), but eg. in error case we
- might have skipped waiting, so we would need to remove it explicitly.
- */
- rgi->commit_orderer.unregister_wait_for_prior_commit();
- thd->wait_for_commit_ptr= NULL;
-
- /*
- Record that this event group has finished (eg. transaction is
- committed, if transactional), so other event groups will no longer
- attempt to wait for us to commit. Once we have increased
- entry->last_committed_sub_id, no other threads will execute
- register_wait_for_prior_commit() against us. Thus, by doing one
- extra (usually redundant) wakeup_subsequent_commits() we can ensure
- that no register_wait_for_prior_commit() can ever happen without a
- subsequent wakeup_subsequent_commits() to wake it up.
-
- We can race here with the next transactions, but that is fine, as
- long as we check that we do not decrease last_committed_sub_id. If
- this commit is done, then any prior commits will also have been
- done and also no longer need waiting for.
- */
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (entry->last_committed_sub_id < event_gtid_sub_id)
- {
- entry->last_committed_sub_id= event_gtid_sub_id;
- mysql_cond_broadcast(&entry->COND_parallel_entry);
- }
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
-
- rgi->commit_orderer.wakeup_subsequent_commits(err);
+ finish_event_group(thd, err, event_gtid_sub_id, entry,
+ &rgi->commit_orderer);
delete rgi;
+ group_rgi= rgi= NULL;
}
events= next;
@@ -349,6 +361,27 @@ handle_rpl_parallel_thread(void *arg)
goto more_events;
}
+ if (in_event_group && group_rgi->parallel_entry->force_abort)
+ {
+ /*
+ We are asked to abort, without getting the remaining events in the
+ current event group.
+
+ We have to rollback the current transaction and update the last
+ sub_id value so that SQL thread will know we are done with the
+ half-processed event group.
+ */
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ group_rgi->is_error= true;
+ finish_event_group(thd, 1, group_rgi->gtid_sub_id,
+ group_rgi->parallel_entry, &group_rgi->commit_orderer);
+ group_rgi->cleanup_context(thd, true);
+ group_rgi->rli->abort_slave= true;
+ in_event_group= false;
+ delete group_rgi;
+ group_rgi= NULL;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ }
if (!in_event_group)
{
rpt->current_entry= NULL;
@@ -646,6 +679,8 @@ rpl_parallel::find(uint32 domain_id)
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
}
+ else
+ e->force_abort= false;
return e;
}
@@ -657,6 +692,25 @@ rpl_parallel::wait_for_done()
struct rpl_parallel_entry *e;
uint32 i;
+ /*
+ First signal all workers that they must force quit; no more events will
+ be queued to complete any partial event groups executed.
+ */
+ for (i= 0; i < domain_hash.records; ++i)
+ {
+ rpl_parallel_thread *rpt;
+
+ e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ e->force_abort= true;
+ if ((rpt= e->rpl_thread))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (rpt->current_entry == e)
+ mysql_cond_signal(&rpt->COND_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+ }
+
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);