summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2014-12-05 16:09:48 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-12-06 08:49:50 +0100
commitdb21fddc3740dfa48f3443751c48282467afac5e (patch)
tree3ed31a4a5ce9abf3fe98cdd9c9bdf68ba8b5832b /sql/rpl_parallel.cc
parent1e3f09f1638e2bdec6029f6c98317d17d7ca76d1 (diff)
downloadmariadb-git-db21fddc3740dfa48f3443751c48282467afac5e.tar.gz
MDEV-6676: Optimistic parallel replication
Implement a new mode for parallel replication. In this mode, all transactions are optimistically attempted applied in parallel. In case of conflicts, the offending transaction is rolled back and retried later non-parallel. This is an early-release patch to facilitate testing, more changes to user interface / options will be expected. The new mode is not enabled by default.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc183
1 files changed, 159 insertions, 24 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 2050ccdb3d7..89d9289d166 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -250,7 +250,13 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
asynchroneously to allow the former to complete its commit.
In this case, we convert the 'killed' error into a deadlock error, and retry
- the later transaction. */
+ the later transaction.
+
+ If we are doing optimistic parallel apply of transactions not known to be
+ safe, we convert any error to a deadlock error, but then at retry we will
+ wait for prior transactions to commit first, so that the retries can be
+ done non-speculative.
+*/
static void
convert_kill_to_deadlock_error(rpl_group_info *rgi)
{
@@ -260,8 +266,10 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
if (!thd->get_stmt_da()->is_error())
return;
err_code= thd->get_stmt_da()->sql_errno();
- if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
- rgi->killed_for_retry)
+ if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
+ err_code != ER_PRIOR_COMMIT_FAILED) ||
+ ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
+ rgi->killed_for_retry))
{
thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0));
@@ -354,9 +362,43 @@ do_retry:
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- register_wait_for_prior_event_group_commit(rgi, entry);
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ for (;;)
+ {
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ register_wait_for_prior_event_group_commit(rgi, entry);
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+ if (rgi->speculation != rpl_group_info::SPECULATE_OPTIMISTIC)
+ break;
+
+ /*
+ We speculatively tried to run this in parallel with prior event groups,
+ but it did not work for some reason.
+
+ So let us wait for all prior transactions to complete before trying
+ again. This way, we avoid repeatedly retrying and failing a small
+ transaction that conflicts with a prior long-running one.
+ */
+ if (!(err= thd->wait_for_prior_commit()))
+ {
+ rgi->speculation = rpl_group_info::SPECULATE_WAIT;
+ break;
+ }
+
+ convert_kill_to_deadlock_error(rgi);
+ if (!has_temporary_error(thd))
+ goto err;
+ /*
+ If we get a temporary error such as a deadlock kill, we can safely
+ ignore it, as we already rolled back.
+
+ But we still want to retry the wait for the prior transaction to
+ complete its commit.
+ */
+ thd->clear_error();
+ if(thd->wait_for_commit_ptr)
+ thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
+ }
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
@@ -550,6 +592,13 @@ handle_rpl_parallel_thread(void *arg)
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
+ /*
+ We need to use (at least) REPEATABLE READ isolation level. Otherwise
+ speculative parallel apply can run out-of-order and give wrong results
+ for statement-based replication.
+ */
+ thd->variables.tx_isolation= ISO_REPEATABLE_READ;
+
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
@@ -639,6 +688,7 @@ handle_rpl_parallel_thread(void *arg)
}
});
+ thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -662,11 +712,11 @@ handle_rpl_parallel_thread(void *arg)
in parallel with.
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (!gco->installed)
+ if (!(gco->flags & group_commit_orderer::INSTALLED))
{
if (gco->prev_gco)
gco->prev_gco->next_gco= gco;
- gco->installed= true;
+ gco->flags|= group_commit_orderer::INSTALLED;
}
wait_count= gco->wait_count;
if (wait_count > entry->count_committing_event_groups)
@@ -766,6 +816,18 @@ handle_rpl_parallel_thread(void *arg)
/* We have to apply the event. */
}
}
+ /*
+ If we are optimistically running transactions in parallel, but this
+ particular event group should not run in parallel with what came
+ before, then wait now for the prior transaction to complete its
+ commit.
+ */
+ if (rgi->speculation == rpl_group_info::SPECULATE_WAIT &&
+ (err= thd->wait_for_prior_commit()))
+ {
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
+ }
}
group_ending= is_group_ending(qev->ev, event_type);
@@ -1319,7 +1381,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
gco->wait_count= wait_count;
gco->prev_gco= prev;
gco->next_gco= NULL;
- gco->installed= false;
+ gco->flags= 0;
return gco;
}
@@ -1849,6 +1911,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
bool did_enter_cond= false;
PSI_stage_info old_stage;
+ DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
/* Handle master log name change, seen in Rotate_log_event. */
typ= ev->get_type_code();
if (unlikely(typ == ROTATE_EVENT))
@@ -1929,7 +1992,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
+ !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ?
0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
@@ -1969,6 +2033,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ bool new_gco;
+ ulonglong mode= rli->mi->parallel_mode;
+ uchar gtid_flags= gtid_ev->flags2;
+ group_commit_orderer *gco;
+ uint8 force_switch_flag;
+ enum rpl_group_info::enum_speculation speculation;
if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
@@ -1995,19 +2065,87 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
rgi->wait_commit_sub_id= e->current_sub_id;
rgi->wait_commit_group_info= e->current_group_info;
- if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
- e->last_commit_id == gtid_ev->commit_id))
+ speculation= rpl_group_info::SPECULATE_NO;
+ new_gco= true;
+ force_switch_flag= 0;
+ gco= e->current_gco;
+ if (likely(gco))
+ {
+ uint8 flags= gco->flags;
+
+ if (!(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
+ e->last_commit_id != gtid_ev->commit_id)
+ flags|= group_commit_orderer::MULTI_BATCH;
+ /* Make sure we do not attempt to run DDL in parallel speculatively. */
+ if (gtid_flags & Gtid_log_event::FL_DDL)
+ flags|= (force_switch_flag= group_commit_orderer::FORCE_SWITCH);
+
+ if (!(flags & group_commit_orderer::MULTI_BATCH))
+ {
+ /*
+ Still the same batch of event groups that group-committed together
+ on the master, so we can run in parallel.
+ */
+ new_gco= false;
+ }
+ else if ((mode & SLAVE_PARALLEL_TRX) &&
+ !(flags & group_commit_orderer::FORCE_SWITCH))
+ {
+ /*
+ In transactional parallel mode, we optimistically attempt to run
+ non-DDL in parallel. In case of conflicts, we catch the conflict as
+ a deadlock or other error, roll back and retry serially.
+
+ The assumption is that only a few event groups will be
+ non-transactional or otherwise unsuitable for parallel apply. Those
+ transactions are still scheduled in parallel, but we set a flag that
+ will make the worker thread wait for everything before to complete
+ before starting.
+ */
+ new_gco= false;
+ if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
+ !(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
+ ((gtid_flags & Gtid_log_event::FL_WAITED) &&
+ !(mode & SLAVE_PARALLEL_WAITING)))
+ {
+ /*
+ This transaction should not be speculatively run in parallel with
+ what came before, either because it cannot safely be rolled back in
+ case of a conflict, or because it was marked as likely to conflict
+ and require expensive rollback and retry.
+
+ Here we mark it as such, and then the worker thread will do a
+ wait_for_prior_commit() before starting it. We do not introduce a
+ new group_commit_orderer, since we still want following transactions
+ to run in parallel with transactions prior to this one.
+ */
+ speculation= rpl_group_info::SPECULATE_WAIT;
+ }
+ else
+ speculation= rpl_group_info::SPECULATE_OPTIMISTIC;
+ }
+ gco->flags= flags;
+ }
+ rgi->speculation= speculation;
+
+ if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
+ e->last_commit_id= gtid_ev->commit_id;
+ else
+ e->last_commit_id= 0;
+
+ if (new_gco)
{
/*
- A new batch of transactions that group-committed together on the master.
+ Do not run this event group in parallel with what came before; instead
+ wait for everything prior to at least have started its commit phase, to
+ avoid any risk of performing any conflicting action too early.
- Remember the count that marks the end of the previous group committed
- batch, and allocate a new gco.
+ Remember the count that marks the end of the previous batch of event
+ groups that run in parallel, and allocate a new gco.
*/
uint64 count= e->count_queued_event_groups;
- group_commit_orderer *gco;
- if (!(gco= cur_thread->get_gco(count, e->current_gco)))
+ if (!(gco= cur_thread->get_gco(count, gco)))
{
cur_thread->free_rgi(rgi);
cur_thread->free_qev(qev);
@@ -2016,14 +2154,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
delete ev;
return 1;
}
- e->current_gco= rgi->gco= gco;
+ gco->flags|= force_switch_flag;
+ e->current_gco= gco;
}
- else
- rgi->gco= e->current_gco;
- if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
- e->last_commit_id= gtid_ev->commit_id;
- else
- e->last_commit_id= 0;
+ rgi->gco= gco;
+
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
++e->count_queued_event_groups;