summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc135
1 files changed, 115 insertions, 20 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 67d61b7cf11..65461b3f990 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -156,6 +156,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error();
+ thd->reset_killed();
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
@@ -188,6 +189,25 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
+static void
+register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
+ rpl_parallel_entry *entry)
+{
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+ if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+}
+
+
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
@@ -205,6 +225,40 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
#endif
+/*
+ If we detect a deadlock due to eg. storage engine locks that conflict with
+ the fixed commit order, then the later transaction will be killed
+ asynchroneously to allow the former to complete its commit.
+
+ In this case, we convert the 'killed' error into a deadlock error, and retry
+ the later transaction. */
+static void
+convert_kill_to_deadlock_error(rpl_group_info *rgi)
+{
+ THD *thd= rgi->thd;
+
+ if (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED &&
+ rgi->killed_for_retry)
+ {
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ rgi->killed_for_retry= false;
+ thd->reset_killed();
+ }
+}
+
+
+static bool
+is_group_ending(Log_event *ev, Log_event_type event_type)
+{
+ return event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)ev)->is_commit() ||
+ ((Query_log_event *)ev)->is_rollback()));
+}
+
+
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
@@ -221,11 +275,46 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
ulonglong cur_offset, old_offset;
char log_name[FN_REFLEN];
THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
ulong retries= 0;
do_retry:
event_count= 0;
err= 0;
+
+ /*
+ If we already started committing before getting the deadlock (or other
+ error) that caused us to need to retry, we have already signalled
+ subsequent transactions that we have started committing. This is
+ potentially a problem, as now we will rollback, and if subsequent
+ transactions would start to execute now, they could see an unexpected
+ state of the database and get eg. key not found or duplicate key error.
+
+ However, to get a deadlock in the first place, there must have been
+ another earlier transaction that is waiting for us. Thus that other
+ transaction has _not_ yet started to commit, and any subsequent
+ transactions will still be waiting at this point.
+
+ So here, we decrement back the count of transactions that started
+ committing (if we already incremented it), undoing the effect of an
+ earlier mark_start_commit(). Then later, when the retry succeeds and we
+ commit again, we can do a new mark_start_commit() and eventually wake up
+ subsequent transactions at the proper time.
+
+ We need to do the unmark before the rollback, to be sure that the
+ transaction we deadlocked with will not signal that it started to commit
+ until after the unmark.
+ */
+ rgi->unmark_start_commit();
+
+ /*
+ We might get the deadlock error that causes the retry during commit, while
+ sitting in wait_for_prior_commit(). If this happens, we will have a
+ pending error in the wait_for_commit object. So clear this by
+ unregistering (and later re-registering) the wait.
+ */
+ if(thd->wait_for_commit_ptr)
+ thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
rgi->cleanup_context(thd, 1);
mysql_mutex_lock(&rli->data_lock);
@@ -233,6 +322,10 @@ do_retry:
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ register_wait_for_prior_event_group_commit(rgi, entry);
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
strcpy(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
@@ -319,6 +412,9 @@ do_retry:
err= 1;
goto err;
}
+ if (is_group_ending(ev, event_type))
+ rgi->mark_start_commit();
+
err= rpt_handle_event(qev, rpt);
++event_count;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
@@ -332,6 +428,7 @@ do_retry:
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
+ convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd))
{
++retries;
@@ -599,17 +696,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
- else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
- {
- /*
- Register that the commit of this event group must wait for the
- commit of the previous event group to complete before it may
- complete itself, so that we preserve commit order.
- */
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
+ else
+ register_wait_for_prior_event_group_commit(rgi, entry);
+
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
@@ -651,10 +740,7 @@ handle_rpl_parallel_thread(void *arg)
}
}
- group_ending= event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback()));
+ group_ending= is_group_ending(events->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@@ -674,8 +760,12 @@ handle_rpl_parallel_thread(void *arg)
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
- if (err && has_temporary_error(thd))
- err= retry_event_group(rgi, rpt, events);
+ if (err)
+ {
+ convert_kill_to_deadlock_error(rgi);
+ if (has_temporary_error(thd) && slave_trans_retries > 0)
+ err= retry_event_group(rgi, rpt, events);
+ }
}
else
{
@@ -691,10 +781,14 @@ handle_rpl_parallel_thread(void *arg)
events->next= qevs_to_free;
qevs_to_free= events;
- if (unlikely(err) && !rgi->worker_error)
+ if (unlikely(err))
{
- slave_output_error_info(rgi, thd);
- signal_error_to_sql_driver_thread(thd, rgi, err);
+ if (!rgi->worker_error)
+ {
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, err);
+ }
+ thd->reset_killed();
}
if (end_of_group)
{
@@ -1096,6 +1190,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
+ rgi->killed_for_retry= false;
return rgi;
}