summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc23
1 files changed, 5 insertions, 18 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index a87cd0b50a3..29cda30738c 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -173,7 +173,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
mark_start_commit() calls can be made and it is safe to de-allocate
the GCO.
*/
- err= wfc->wait_for_prior_commit(thd);
+ err= wfc->wait_for_prior_commit(thd, true);
if (unlikely(err) && !rgi->worker_error)
signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL;
@@ -304,8 +304,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
debug_sync_set_action(thd, STRING_WITH_LEN("now WAIT_FOR cont_worker2"));
}});
- if (!rgi->aborted)
- rgi->unmark_start_commit();
+ rgi->unmark_start_commit();
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
@@ -1391,25 +1390,14 @@ handle_rpl_parallel_thread(void *arg)
if (!err)
#endif
{
- if (unlikely(thd->check_killed()))
- {
-
- thd->clear_error();
- thd->get_stmt_da()->reset_diagnostics_area();
- thd->send_kill_message();
- err= 1;
- }
- else if ((entry->stop_abrupt(rgi->rli) &&
- entry->rgi_is_safe_to_terminate(rgi)))
+ if (unlikely(thd->check_killed()) ||
+ (entry->stop_abrupt(rgi->rli) &&
+ entry->rgi_is_safe_to_terminate(rgi)))
{
- /*
- Temporarily separated branch, will merge with above
- */
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
thd->send_kill_message();
err= 1;
- rgi->aborted= true;
}
else
err= rpt_handle_event(qev, rpt);
@@ -1990,7 +1978,6 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
- rgi->aborted= 0;
return rgi;
}