diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-12-05 16:09:48 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-12-06 08:49:50 +0100 |
commit | db21fddc3740dfa48f3443751c48282467afac5e (patch) | |
tree | 3ed31a4a5ce9abf3fe98cdd9c9bdf68ba8b5832b /sql/rpl_parallel.cc | |
parent | 1e3f09f1638e2bdec6029f6c98317d17d7ca76d1 (diff) | |
download | mariadb-git-db21fddc3740dfa48f3443751c48282467afac5e.tar.gz |
MDEV-6676: Optimistic parallel replication
Implement a new mode for parallel replication. In this mode, all transactions
are optimistically attempted applied in parallel. In case of conflicts, the
offending transaction is rolled back and retried later non-parallel.
This is an early-release patch to facilitate testing, more changes to user
interface / options will be expected. The new mode is not enabled by default.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 183 |
1 files changed, 159 insertions, 24 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 2050ccdb3d7..89d9289d166 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -250,7 +250,13 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) asynchroneously to allow the former to complete its commit. In this case, we convert the 'killed' error into a deadlock error, and retry - the later transaction. */ + the later transaction. + + If we are doing optimistic parallel apply of transactions not known to be + safe, we convert any error to a deadlock error, but then at retry we will + wait for prior transactions to commit first, so that the retries can be + done non-speculative. +*/ static void convert_kill_to_deadlock_error(rpl_group_info *rgi) { @@ -260,8 +266,10 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi) if (!thd->get_stmt_da()->is_error()) return; err_code= thd->get_stmt_da()->sql_errno(); - if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) && - rgi->killed_for_retry) + if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC && + err_code != ER_PRIOR_COMMIT_FAILED) || + ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) && + rgi->killed_for_retry)) { thd->clear_error(); my_error(ER_LOCK_DEADLOCK, MYF(0)); @@ -354,9 +362,43 @@ do_retry: statistic_increment(slave_retried_transactions, LOCK_status); mysql_mutex_unlock(&rli->data_lock); - mysql_mutex_lock(&entry->LOCK_parallel_entry); - register_wait_for_prior_event_group_commit(rgi, entry); - mysql_mutex_unlock(&entry->LOCK_parallel_entry); + for (;;) + { + mysql_mutex_lock(&entry->LOCK_parallel_entry); + register_wait_for_prior_event_group_commit(rgi, entry); + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + + if (rgi->speculation != rpl_group_info::SPECULATE_OPTIMISTIC) + break; + + /* + We speculatively tried to run this in parallel with prior event groups, + but it did not work for some reason. + + So let us wait for all prior transactions to complete before trying + again. This way, we avoid repeatedly retrying and failing a small + transaction that conflicts with a prior long-running one. + */ + if (!(err= thd->wait_for_prior_commit())) + { + rgi->speculation = rpl_group_info::SPECULATE_WAIT; + break; + } + + convert_kill_to_deadlock_error(rgi); + if (!has_temporary_error(thd)) + goto err; + /* + If we get a temporary error such as a deadlock kill, we can safely + ignore it, as we already rolled back. + + But we still want to retry the wait for the prior transaction to + complete its commit. + */ + thd->clear_error(); + if(thd->wait_for_commit_ptr) + thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); + } strmake_buf(log_name, ir->name); if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) @@ -550,6 +592,13 @@ handle_rpl_parallel_thread(void *arg) thd->set_time(); thd->variables.lock_wait_timeout= LONG_TIMEOUT; thd->system_thread_info.rpl_sql_info= &sql_info; + /* + We need to use (at least) REPEATABLE READ isolation level. Otherwise + speculative parallel apply can run out-of-order and give wrong results + for statement-based replication. + */ + thd->variables.tx_isolation= ISO_REPEATABLE_READ; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->thd= thd; @@ -639,6 +688,7 @@ handle_rpl_parallel_thread(void *arg) } }); + thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation; in_event_group= true; /* If the standalone flag is set, then this event group consists of a @@ -662,11 +712,11 @@ handle_rpl_parallel_thread(void *arg) in parallel with. */ mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (!gco->installed) + if (!(gco->flags & group_commit_orderer::INSTALLED)) { if (gco->prev_gco) gco->prev_gco->next_gco= gco; - gco->installed= true; + gco->flags|= group_commit_orderer::INSTALLED; } wait_count= gco->wait_count; if (wait_count > entry->count_committing_event_groups) @@ -766,6 +816,18 @@ handle_rpl_parallel_thread(void *arg) /* We have to apply the event. */ } } + /* + If we are optimistically running transactions in parallel, but this + particular event group should not run in parallel with what came + before, then wait now for the prior transaction to complete its + commit. + */ + if (rgi->speculation == rpl_group_info::SPECULATE_WAIT && + (err= thd->wait_for_prior_commit())) + { + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + } } group_ending= is_group_ending(qev->ev, event_type); @@ -1319,7 +1381,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) gco->wait_count= wait_count; gco->prev_gco= prev; gco->next_gco= NULL; - gco->installed= false; + gco->flags= 0; return gco; } @@ -1849,6 +1911,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, bool did_enter_cond= false; PSI_stage_info old_stage; + DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE();); /* Handle master log name change, seen in Rotate_log_event. */ typ= ev->get_type_code(); if (unlikely(typ == ROTATE_EVENT)) @@ -1929,7 +1992,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || + !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ? 0 : gtid_ev->domain_id); if (!(e= find(domain_id))) { @@ -1969,6 +2033,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + bool new_gco; + ulonglong mode= rli->mi->parallel_mode; + uchar gtid_flags= gtid_ev->flags2; + group_commit_orderer *gco; + uint8 force_switch_flag; + enum rpl_group_info::enum_speculation speculation; if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size))) { @@ -1995,19 +2065,87 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, rgi->wait_commit_sub_id= e->current_sub_id; rgi->wait_commit_group_info= e->current_group_info; - if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && - e->last_commit_id == gtid_ev->commit_id)) + speculation= rpl_group_info::SPECULATE_NO; + new_gco= true; + force_switch_flag= 0; + gco= e->current_gco; + if (likely(gco)) + { + uint8 flags= gco->flags; + + if (!(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) || + e->last_commit_id != gtid_ev->commit_id) + flags|= group_commit_orderer::MULTI_BATCH; + /* Make sure we do not attempt to run DDL in parallel speculatively. */ + if (gtid_flags & Gtid_log_event::FL_DDL) + flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH); + + if (!(flags & group_commit_orderer::MULTI_BATCH)) + { + /* + Still the same batch of event groups that group-committed together + on the master, so we can run in parallel. + */ + new_gco= false; + } + else if ((mode & SLAVE_PARALLEL_TRX) && + !(flags & group_commit_orderer::FORCE_SWITCH)) + { + /* + In transactional parallel mode, we optimistically attempt to run + non-DDL in parallel. In case of conflicts, we catch the conflict as + a deadlock or other error, roll back and retry serially. + + The assumption is that only a few event groups will be + non-transactional or otherwise unsuitable for parallel apply. Those + transactions are still scheduled in parallel, but we set a flag that + will make the worker thread wait for everything before to complete + before starting. + */ + new_gco= false; + if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) || + !(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) || + ((gtid_flags & Gtid_log_event::FL_WAITED) && + !(mode & SLAVE_PARALLEL_WAITING))) + { + /* + This transaction should not be speculatively run in parallel with + what came before, either because it cannot safely be rolled back in + case of a conflict, or because it was marked as likely to conflict + and require expensive rollback and retry. + + Here we mark it as such, and then the worker thread will do a + wait_for_prior_commit() before starting it. We do not introduce a + new group_commit_orderer, since we still want following transactions + to run in parallel with transactions prior to this one. + */ + speculation= rpl_group_info::SPECULATE_WAIT; + } + else + speculation= rpl_group_info::SPECULATE_OPTIMISTIC; + } + gco->flags= flags; + } + rgi->speculation= speculation; + + if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) + e->last_commit_id= gtid_ev->commit_id; + else + e->last_commit_id= 0; + + if (new_gco) { /* - A new batch of transactions that group-committed together on the master. + Do not run this event group in parallel with what came before; instead + wait for everything prior to at least have started its commit phase, to + avoid any risk of performing any conflicting action too early. - Remember the count that marks the end of the previous group committed - batch, and allocate a new gco. + Remember the count that marks the end of the previous batch of event + groups that run in parallel, and allocate a new gco. */ uint64 count= e->count_queued_event_groups; - group_commit_orderer *gco; - if (!(gco= cur_thread->get_gco(count, e->current_gco))) + if (!(gco= cur_thread->get_gco(count, gco))) { cur_thread->free_rgi(rgi); cur_thread->free_qev(qev); @@ -2016,14 +2154,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, delete ev; return 1; } - e->current_gco= rgi->gco= gco; + gco->flags|= force_switch_flag; + e->current_gco= gco; } - else - rgi->gco= e->current_gco; - if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) - e->last_commit_id= gtid_ev->commit_id; - else - e->last_commit_id= 0; + rgi->gco= gco; + qev->rgi= e->current_group_info= rgi; e->current_sub_id= rgi->gtid_sub_id; ++e->count_queued_event_groups; |