diff options
author | unknown <knielsen@knielsen-hq.org> | 2014-03-21 10:11:28 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2014-03-21 10:11:28 +0100 |
commit | a5418c5540f06e2ec4016734e9f1cc5c50c01de0 (patch) | |
tree | 4e21b26065c2691e19322d4671ff7986e716672f /sql/rpl_parallel.cc | |
parent | e59dec0345bba78ba83928060c20c5861cf205f1 (diff) | |
download | mariadb-git-a5418c5540f06e2ec4016734e9f1cc5c50c01de0.tar.gz |
MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction
When a transaction fails in parallel replication, it should signal the error
to any following transactions doing wait_for_prior_commit() on it. But the
code for this was incorrect, and would not correctly remember a prior error
when sending the signal. This caused corruption when the slave stopped due to
an error.
Fix by remembering the error code when we first get an error, and passing the
saved error code to wakeup_subsequent_commits().
Thanks to nanyi607rao who reported this bug on
maria-developers@lists.launchpad.net and analysed the root cause.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 42 |
1 file changed, 22 insertions, 20 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 3218acf2525..af797d55da3 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -20,6 +20,8 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool; +static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, + int err); static int rpt_handle_event(rpl_parallel_thread::queued_event *qev, @@ -94,10 +96,11 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) static void -finish_event_group(THD *thd, int err, uint64 sub_id, - rpl_parallel_entry *entry, rpl_group_info *rgi) +finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, + rpl_group_info *rgi) { wait_for_commit *wfc= &rgi->commit_orderer; + int err; /* Remove any left-over registration to wait for a prior commit to @@ -120,10 +123,10 @@ finish_event_group(THD *thd, int err, uint64 sub_id, waiting for us will in any case receive the error back from their wait_for_prior_commit() call. */ - if (err) + if (rgi->worker_error) wfc->unregister_wait_for_prior_commit(); - else - err= wfc->wait_for_prior_commit(thd); + else if ((err= wfc->wait_for_prior_commit(thd))) + signal_error_to_sql_driver_thread(thd, rgi, err); thd->wait_for_commit_ptr= NULL; /* @@ -150,7 +153,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id, not yet started should just skip their group, preparing for stop of the SQL driver thread. 
*/ - if (unlikely(rgi->is_error) && + if (unlikely(rgi->worker_error) && entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) entry->stop_on_error_sub_id= sub_id; /* @@ -163,14 +166,14 @@ finish_event_group(THD *thd, int err, uint64 sub_id, thd->clear_error(); thd->get_stmt_da()->reset_diagnostics_area(); - wfc->wakeup_subsequent_commits(err); + wfc->wakeup_subsequent_commits(rgi->worker_error); } static void -signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) +signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err) { - rgi->is_error= true; + rgi->worker_error= err; rgi->cleanup_context(thd, true); rgi->rli->abort_slave= true; rgi->rli->stop_for_until= false; @@ -294,7 +297,6 @@ handle_rpl_parallel_thread(void *arg) continue; } - err= 0; group_rgi= rgi; gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. */ @@ -346,12 +348,12 @@ handle_rpl_parallel_thread(void *arg) did_enter_cond= true; do { - if (thd->check_killed() && !rgi->is_error) + if (thd->check_killed() && !rgi->worker_error) { DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi); + signal_error_to_sql_driver_thread(thd, rgi, 1); /* Even though we were killed, we need to continue waiting for the prior event groups to signal that we can continue. Otherwise we @@ -417,7 +419,7 @@ handle_rpl_parallel_thread(void *arg) */ rgi->cleanup_context(thd, true); thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); - thd->wait_for_commit_ptr->wakeup_subsequent_commits(err); + thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error); } thd->wait_for_commit_ptr= &rgi->commit_orderer; @@ -430,7 +432,7 @@ handle_rpl_parallel_thread(void *arg) { /* Error. 
*/ slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi); + signal_error_to_sql_driver_thread(thd, rgi, 1); } else if (!res) { @@ -460,7 +462,7 @@ handle_rpl_parallel_thread(void *arg) processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. */ - if (!rgi->is_error && !skip_event_group) + if (!rgi->worker_error && !skip_event_group) err= rpt_handle_event(events, rpt); else err= thd->wait_for_prior_commit(); @@ -474,15 +476,15 @@ handle_rpl_parallel_thread(void *arg) events->next= qevs_to_free; qevs_to_free= events; - if (err) + if (unlikely(err) && !rgi->worker_error) { slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi); + signal_error_to_sql_driver_thread(thd, rgi, err); } if (end_of_group) { in_event_group= false; - finish_event_group(thd, err, event_gtid_sub_id, entry, rgi); + finish_event_group(thd, event_gtid_sub_id, entry, rgi); rgi->next= rgis_to_free; rgis_to_free= rgi; group_rgi= rgi= NULL; @@ -541,9 +543,9 @@ handle_rpl_parallel_thread(void *arg) */ mysql_mutex_unlock(&rpt->LOCK_rpl_thread); thd->wait_for_prior_commit(); - finish_event_group(thd, 1, group_rgi->gtid_sub_id, + signal_error_to_sql_driver_thread(thd, group_rgi, 1); + finish_event_group(thd, group_rgi->gtid_sub_id, group_rgi->parallel_entry, group_rgi); - signal_error_to_sql_driver_thread(thd, group_rgi); in_event_group= false; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->free_rgi(group_rgi); |