summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-07-08 16:47:07 +0200
committerunknown <knielsen@knielsen-hq.org>2013-07-08 16:47:07 +0200
commita99356fbe72fbca61617edabc5a8928da4343c96 (patch)
treed685addaf9908478c735a57d271d937b7133c60f
parente654be3865d7c8a6ad6339b2de2c45f02c9f7981 (diff)
downloadmariadb-git-a99356fbe72fbca61617edabc5a8928da4343c96.tar.gz
MDEV-4506: Parallel replication: intermediate commit.
Fix a bunch of issues found with locking, ordering, and non-thread-safe stuff in Relay_log_info. Now able to do a simple benchmark, showing 4.5 times speedup for applying a binlog with 10000 REPLACE statements.
-rw-r--r--sql/log.h2
-rw-r--r--sql/log_event.cc2
-rw-r--r--sql/log_event_old.cc4
-rw-r--r--sql/rpl_parallel.cc132
-rw-r--r--sql/rpl_parallel.h11
-rw-r--r--sql/rpl_rli.cc8
-rw-r--r--sql/rpl_rli.h12
-rw-r--r--sql/slave.cc3
8 files changed, 109 insertions, 65 deletions
diff --git a/sql/log.h b/sql/log.h
index 48cc568da11..efb560dc245 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -408,7 +408,7 @@ private:
class binlog_cache_mngr;
struct rpl_gtid;
-class wait_for_commit;
+struct wait_for_commit;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 1f8685e34b8..cb7bc3924f5 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -9101,7 +9101,7 @@ int Rows_log_event::do_apply_event(struct rpl_group_info *rgi)
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rgi->thd == thd);
/*
If there is no locks taken, this is the first binrow event seen
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index 4be3e2720de..d3e9d47d64a 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -68,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->sql_thd == ev_thd);
+ DBUG_ASSERT(rgi->thd == ev_thd);
/*
If there is no locks taken, this is the first binrow event seen
@@ -1481,7 +1481,7 @@ int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rgi->thd == thd);
/*
If there is no locks taken, this is the first binrow event seen
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 8f97c19e5ad..63066d8d7c0 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -38,15 +38,16 @@
everything needs to be correctly rolled back and stopped in all threads,
to ensure a consistent slave replication state.
- - We need some knob on the master to allow the user to deliberately delay
- commits waiting for more transactions to join group commit, to increase
- potential for parallel execution on the slave.
-
- Handle the case of a partial event group. This occurs when the master
crashes in the middle of writing the event group to the binlog. The
slave rolls back the transaction; parallel execution needs to be able
to deal with this wrt. commit_orderer and such.
+ - Relay_log_info::is_in_group(). This needs to be handled correctly in all
+ callers. I think it needs to be split into two, one version in
+ Relay_log_info to be used from next_event() in slave.cc, one to be used in
+ per-transaction stuff.
+
- We should fail if we connect to the master with opt_slave_parallel_threads
greater than zero and master does not support GTID. Just to avoid a bunch
of potential problems, we won't be able to do any parallel replication
@@ -58,12 +59,12 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool;
static void
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
- THD *thd,
struct rpl_parallel_thread *rpt)
{
int err;
struct rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
+ THD *thd= rgi->thd;
thd->rli_slave= rli;
thd->rpl_filter = rli->mi->rpl_filter;
@@ -143,6 +144,7 @@ handle_rpl_parallel_thread(void *arg)
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 wait_for_sub_id;
+ uint64 wait_start_sub_id;
bool end_of_group;
if (event_type == GTID_EVENT)
@@ -155,14 +157,28 @@ handle_rpl_parallel_thread(void *arg)
/* Save this, as it gets cleared once event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
+ rgi->thd= thd;
+
/*
Register ourself to wait for the previous commit, if we need to do
such registration _and_ that previous commit has not already
occured.
+
+ Also do not start parallel execution of this event group until all prior
+ groups that are not safe to run in parallel with it have committed.
*/
- if ((wait_for_sub_id= rgi->wait_commit_sub_id))
+ wait_for_sub_id= rgi->wait_commit_sub_id;
+ wait_start_sub_id= rgi->wait_start_sub_id;
+ if (wait_for_sub_id || wait_start_sub_id)
{
mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (wait_start_sub_id)
+ {
+ while (wait_start_sub_id > entry->last_committed_sub_id)
+ mysql_cond_wait(&entry->COND_parallel_entry,
+ &entry->LOCK_parallel_entry);
+ }
+ rgi->wait_start_sub_id= 0; /* No need to check again. */
if (wait_for_sub_id > entry->last_committed_sub_id)
{
wait_for_commit *waitee=
@@ -176,7 +192,7 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
- rpt_handle_event(events, thd, rpt);
+ rpt_handle_event(events, rpt);
end_of_group=
in_event_group &&
@@ -376,6 +392,7 @@ err:
while (new_free_list->running)
mysql_cond_wait(&new_free_list->COND_rpl_thread,
&new_free_list->LOCK_rpl_thread);
+ mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
my_free(new_free_list);
new_free_list= next;
}
@@ -503,8 +520,7 @@ rpl_parallel::wait_for_done()
bool
-rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
- THD *parent_thd)
+rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
{
rpl_parallel_entry *e;
rpl_parallel_thread *cur_thread;
@@ -530,51 +546,15 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
if (!(e= find(gtid_ev->domain_id)) ||
- !(e->current_group_info= rgi= new rpl_group_info(rli)) ||
+ !(rgi= new rpl_group_info(rli)) ||
event_group_new_gtid(rgi, gtid_ev))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
return true;
}
- /* Check if we already have a worker thread for this entry. */
- cur_thread= e->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != e)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- e->rpl_thread= cur_thread= NULL;
- }
- }
-
- if (!cur_thread)
- {
- /*
- Nothing else is currently running in this domain. We can spawn a new
- thread to do this event group in parallel with anything else that might
- be running in other domains.
- */
- if (gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
- {
- e->last_server_id= gtid_ev->server_id;
- e->last_seq_no= gtid_ev->seq_no;
- e->last_commit_id= gtid_ev->commit_id;
- }
- else
- {
- e->last_server_id= 0;
- e->last_seq_no= 0;
- e->last_commit_id= 0;
- }
- cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
- rgi->wait_commit_sub_id= 0;
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
- e->last_commit_id == gtid_ev->commit_id)
+ if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
+ e->last_commit_id == gtid_ev->commit_id)
{
/*
We are already executing something else in this domain. But the two
@@ -588,19 +568,63 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
rgi->wait_commit_sub_id= e->current_sub_id;
rgi->wait_commit_group_info= e->current_group_info;
+ rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
e->rpl_thread= cur_thread= rpt;
/* get_thread() returns with the LOCK_rpl_thread locked. */
}
else
{
- /*
- We are still executing the previous event group for this replication
- domain, and we have to wait for that to finish before we can start on
- the next one. So just re-use the thread.
- */
+ /* Check if we already have a worker thread for this entry. */
+ cur_thread= e->rpl_thread;
+ if (cur_thread)
+ {
+ mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
+ if (cur_thread->current_entry != e)
+ {
+ /* Not ours anymore, we need to grab a new one. */
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ e->rpl_thread= cur_thread= NULL;
+ }
+ }
+
+ if (!cur_thread)
+ {
+ /*
+ Nothing else is currently running in this domain. We can spawn a new
+ thread to do this event group in parallel with anything else that might
+ be running in other domains.
+ */
+ cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
+ /* get_thread() returns with the LOCK_rpl_thread locked. */
+ }
+ else
+ {
+ /*
+ We are still executing the previous event group for this replication
+ domain, and we have to wait for that to finish before we can start on
+ the next one. So just re-use the thread.
+ */
+ }
+
rgi->wait_commit_sub_id= 0;
+ rgi->wait_start_sub_id= 0;
+ e->prev_groupcommit_sub_id= e->current_sub_id;
+ }
+
+ if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
+ {
+ e->last_server_id= gtid_ev->server_id;
+ e->last_seq_no= gtid_ev->seq_no;
+ e->last_commit_id= gtid_ev->commit_id;
+ }
+ else
+ {
+ e->last_server_id= 0;
+ e->last_seq_no= 0;
+ e->last_commit_id= 0;
}
+ e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
current= rgi->parallel_entry= e;
}
@@ -612,7 +636,7 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
but they might be from an old master).
*/
qev->rgi= serial_rgi;
- rpt_handle_event(qev, parent_thd, NULL);
+ rpt_handle_event(qev, NULL);
delete_or_keep_event_post_apply(rli, typ, qev->ev);
return false;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index a84722e9263..304263c3477 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -60,6 +60,15 @@ struct rpl_parallel_entry {
mysql_cond_t COND_parallel_entry;
uint64 current_sub_id;
struct rpl_group_info *current_group_info;
+ /*
+ The sub_id of the last event group in the previous batch of group-committed
+ transactions.
+
+ When we spawn parallel worker threads for the next group-committed batch,
+ they first need to wait for this sub_id to be committed before it is safe
+ to start executing them.
+ */
+ uint64 prev_groupcommit_sub_id;
};
struct rpl_parallel {
HASH domain_hash;
@@ -69,7 +78,7 @@ struct rpl_parallel {
~rpl_parallel();
rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done();
- bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd);
+ bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index f189f9adffa..8fb22266d5e 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1226,7 +1226,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
- if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
+ if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
inc_event_relay_log_pos();
else
{
@@ -1267,7 +1267,7 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
- DBUG_ASSERT(sql_thd == thd);
+ DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
@@ -1534,8 +1534,8 @@ end:
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
- : rli(rli_), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0),
- parallel_entry(0)
+ : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0)
{
bzero(&current_gtid, sizeof(current_gtid));
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index c22773f9810..294f2ba885a 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -604,6 +604,7 @@ private:
struct rpl_group_info
{
Relay_log_info *rli;
+ THD *thd;
/*
Current GTID being processed.
The sub_id gives the binlog order within one domain_id. A zero sub_id
@@ -630,10 +631,19 @@ struct rpl_group_info
*/
uint64 wait_commit_sub_id;
struct rpl_group_info *wait_commit_group_info;
+ /*
+ If non-zero, the event group must wait for this sub_id to be committed
+ before the execution of the event group is allowed to start.
+
+ (When we execute in parallel the transactions that group committed
+ together on the master, we still need to wait for any prior transactions
+ to have committed).
+ */
+ uint64 wait_start_sub_id;
struct rpl_parallel_entry *parallel_entry;
- rpl_group_info(Relay_log_info *rli);
+ rpl_group_info(Relay_log_info *rli_);
~rpl_group_info() { };
};
diff --git a/sql/slave.cc b/sql/slave.cc
index 474a6f902d2..b9ef3172364 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3246,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
}
if (opt_slave_parallel_threads > 0)
- DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, thd));
+ DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev));
/*
For GTID, allocate a new sub_id for the given domain_id.
@@ -3995,6 +3995,7 @@ pthread_handler_t handle_slave_sql(void *arg)
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->rpl_filter = mi->rpl_filter;
+ serial_rgi.thd= thd;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);