diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 135 |
1 files changed, 115 insertions, 20 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 67d61b7cf11..65461b3f990 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -156,6 +156,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, mysql_mutex_unlock(&entry->LOCK_parallel_entry); thd->clear_error(); + thd->reset_killed(); thd->get_stmt_da()->reset_diagnostics_area(); wfc->wakeup_subsequent_commits(rgi->worker_error); } @@ -188,6 +189,25 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, } +static void +register_wait_for_prior_event_group_commit(rpl_group_info *rgi, + rpl_parallel_entry *entry) +{ + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) + { + /* + Register that the commit of this event group must wait for the + commit of the previous event group to complete before it may + complete itself, so that we preserve commit order. + */ + wait_for_commit *waitee= + &rgi->wait_commit_group_info->commit_orderer; + rgi->commit_orderer.register_wait_for_prior_commit(waitee); + } +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -205,6 +225,40 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) #endif +/* + If we detect a deadlock due to e.g. storage engine locks that conflict with + the fixed commit order, then the later transaction will be killed + asynchronously to allow the former to complete its commit. + + In this case, we convert the 'killed' error into a deadlock error, and retry + the later transaction. 
*/ +static void +convert_kill_to_deadlock_error(rpl_group_info *rgi) +{ + THD *thd= rgi->thd; + + if (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED && + rgi->killed_for_retry) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + rgi->killed_for_retry= false; + thd->reset_killed(); + } +} + + +static bool +is_group_ending(Log_event *ev, Log_event_type event_type) +{ + return event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (((Query_log_event *)ev)->is_commit() || + ((Query_log_event *)ev)->is_rollback())); +} + + static int retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, rpl_parallel_thread::queued_event *orig_qev) @@ -221,11 +275,46 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, ulonglong cur_offset, old_offset; char log_name[FN_REFLEN]; THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; ulong retries= 0; do_retry: event_count= 0; err= 0; + + /* + If we already started committing before getting the deadlock (or other + error) that caused us to need to retry, we have already signalled + subsequent transactions that we have started committing. This is + potentially a problem, as now we will roll back, and if subsequent + transactions would start to execute now, they could see an unexpected + state of the database and get e.g. key not found or duplicate key error. + + However, to get a deadlock in the first place, there must have been + another earlier transaction that is waiting for us. Thus that other + transaction has _not_ yet started to commit, and any subsequent + transactions will still be waiting at this point. + + So here, we decrement back the count of transactions that started + committing (if we already incremented it), undoing the effect of an + earlier mark_start_commit(). 
Then later, when the retry succeeds and we + commit again, we can do a new mark_start_commit() and eventually wake up + subsequent transactions at the proper time. + + We need to do the unmark before the rollback, to be sure that the + transaction we deadlocked with will not signal that it started to commit + until after the unmark. + */ + rgi->unmark_start_commit(); + + /* + We might get the deadlock error that causes the retry during commit, while + sitting in wait_for_prior_commit(). If this happens, we will have a + pending error in the wait_for_commit object. So clear this by + unregistering (and later re-registering) the wait. + */ + if(thd->wait_for_commit_ptr) + thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); rgi->cleanup_context(thd, 1); mysql_mutex_lock(&rli->data_lock); @@ -233,6 +322,10 @@ do_retry: statistic_increment(slave_retried_transactions, LOCK_status); mysql_mutex_unlock(&rli->data_lock); + mysql_mutex_lock(&entry->LOCK_parallel_entry); + register_wait_for_prior_event_group_commit(rgi, entry); + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + strcpy(log_name, ir->name); if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) { @@ -319,6 +412,9 @@ do_retry: err= 1; goto err; } + if (is_group_ending(ev, event_type)) + rgi->mark_start_commit(); + err= rpt_handle_event(qev, rpt); ++event_count; mysql_mutex_lock(&rpt->LOCK_rpl_thread); @@ -332,6 +428,7 @@ do_retry: err= dbug_simulate_tmp_error(rgi, thd);); if (err) { + convert_kill_to_deadlock_error(rgi); if (has_temporary_error(thd)) { ++retries; @@ -599,17 +696,9 @@ handle_rpl_parallel_thread(void *arg) if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) skip_event_group= true; - else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) - { - /* - Register that the commit of this event group must wait for the - commit of the previous event group to complete before it may - complete itself, so that we preserve commit order. 
- */ - wait_for_commit *waitee= - &rgi->wait_commit_group_info->commit_orderer; - rgi->commit_orderer.register_wait_for_prior_commit(waitee); - } + else + register_wait_for_prior_event_group_commit(rgi, entry); + unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, &did_enter_cond, &old_stage); @@ -651,10 +740,7 @@ handle_rpl_parallel_thread(void *arg) } } - group_ending= event_type == XID_EVENT || - (event_type == QUERY_EVENT && - (((Query_log_event *)events->ev)->is_commit() || - ((Query_log_event *)events->ev)->is_rollback())); + group_ending= is_group_ending(events->ev, event_type); if (group_ending && likely(!rgi->worker_error)) { DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); @@ -674,8 +760,12 @@ handle_rpl_parallel_thread(void *arg) delete_or_keep_event_post_apply(rgi, event_type, events->ev); DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100", err= dbug_simulate_tmp_error(rgi, thd);); - if (err && has_temporary_error(thd)) - err= retry_event_group(rgi, rpt, events); + if (err) + { + convert_kill_to_deadlock_error(rgi); + if (has_temporary_error(thd) && slave_trans_retries > 0) + err= retry_event_group(rgi, rpt, events); + } } else { @@ -691,10 +781,14 @@ handle_rpl_parallel_thread(void *arg) events->next= qevs_to_free; qevs_to_free= events; - if (unlikely(err) && !rgi->worker_error) + if (unlikely(err)) { - slave_output_error_info(rgi, thd); - signal_error_to_sql_driver_thread(thd, rgi, err); + if (!rgi->worker_error) + { + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, err); + } + thd->reset_killed(); } if (end_of_group) { @@ -1096,6 +1190,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, rgi->relay_log= rli->last_inuse_relaylog; rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; rgi->retry_event_count= 0; + rgi->killed_for_retry= false; return rgi; } |