summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
author: unknown <knielsen@knielsen-hq.org> 2014-03-21 10:11:28 +0100
committer: unknown <knielsen@knielsen-hq.org> 2014-03-21 10:11:28 +0100
commit: a5418c5540f06e2ec4016734e9f1cc5c50c01de0 (patch)
tree: 4e21b26065c2691e19322d4671ff7986e716672f /sql/rpl_parallel.cc
parent: e59dec0345bba78ba83928060c20c5861cf205f1 (diff)
download: mariadb-git-a5418c5540f06e2ec4016734e9f1cc5c50c01de0.tar.gz
MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction
When a transaction fails in parallel replication, it should signal the error to any following transactions doing wait_for_prior_commit() on it. But the code for this was incorrect, and would not correctly remember a prior error when sending the signal. This caused corruption when the slave stopped due to an error. Fix by remembering the error code when we first get an error, and passing the saved error code to wakeup_subsequent_commits(). Thanks to nanyi607rao, who reported this bug on maria-developers@lists.launchpad.net and analysed the root cause.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- sql/rpl_parallel.cc 42
1 files changed, 22 insertions, 20 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 3218acf2525..af797d55da3 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -20,6 +20,8 @@
struct rpl_parallel_thread_pool global_rpl_thread_pool;
+static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
+ int err);
static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
@@ -94,10 +96,11 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
static void
-finish_event_group(THD *thd, int err, uint64 sub_id,
- rpl_parallel_entry *entry, rpl_group_info *rgi)
+finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
+ rpl_group_info *rgi)
{
wait_for_commit *wfc= &rgi->commit_orderer;
+ int err;
/*
Remove any left-over registration to wait for a prior commit to
@@ -120,10 +123,10 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
waiting for us will in any case receive the error back from their
wait_for_prior_commit() call.
*/
- if (err)
+ if (rgi->worker_error)
wfc->unregister_wait_for_prior_commit();
- else
- err= wfc->wait_for_prior_commit(thd);
+ else if ((err= wfc->wait_for_prior_commit(thd)))
+ signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL;
/*
@@ -150,7 +153,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
not yet started should just skip their group, preparing for stop of the
SQL driver thread.
*/
- if (unlikely(rgi->is_error) &&
+ if (unlikely(rgi->worker_error) &&
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
entry->stop_on_error_sub_id= sub_id;
/*
@@ -163,14 +166,14 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
- wfc->wakeup_subsequent_commits(err);
+ wfc->wakeup_subsequent_commits(rgi->worker_error);
}
static void
-signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
+signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
{
- rgi->is_error= true;
+ rgi->worker_error= err;
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
rgi->rli->stop_for_until= false;
@@ -294,7 +297,6 @@ handle_rpl_parallel_thread(void *arg)
continue;
}
- err= 0;
group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
@@ -346,12 +348,12 @@ handle_rpl_parallel_thread(void *arg)
did_enter_cond= true;
do
{
- if (thd->check_killed() && !rgi->is_error)
+ if (thd->check_killed() && !rgi->worker_error)
{
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
/*
Even though we were killed, we need to continue waiting for the
prior event groups to signal that we can continue. Otherwise we
@@ -417,7 +419,7 @@ handle_rpl_parallel_thread(void *arg)
*/
rgi->cleanup_context(thd, true);
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
- thd->wait_for_commit_ptr->wakeup_subsequent_commits(err);
+ thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error);
}
thd->wait_for_commit_ptr= &rgi->commit_orderer;
@@ -430,7 +432,7 @@ handle_rpl_parallel_thread(void *arg)
{
/* Error. */
slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
}
else if (!res)
{
@@ -460,7 +462,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->is_error && !skip_event_group)
+ if (!rgi->worker_error && !skip_event_group)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -474,15 +476,15 @@ handle_rpl_parallel_thread(void *arg)
events->next= qevs_to_free;
qevs_to_free= events;
- if (err)
+ if (unlikely(err) && !rgi->worker_error)
{
slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi);
+ signal_error_to_sql_driver_thread(thd, rgi, err);
}
if (end_of_group)
{
in_event_group= false;
- finish_event_group(thd, err, event_gtid_sub_id, entry, rgi);
+ finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free;
rgis_to_free= rgi;
group_rgi= rgi= NULL;
@@ -541,9 +543,9 @@ handle_rpl_parallel_thread(void *arg)
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
thd->wait_for_prior_commit();
- finish_event_group(thd, 1, group_rgi->gtid_sub_id,
+ signal_error_to_sql_driver_thread(thd, group_rgi, 1);
+ finish_event_group(thd, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, group_rgi);
- signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);