diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-07-08 16:47:07 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-07-08 16:47:07 +0200 |
commit | a99356fbe72fbca61617edabc5a8928da4343c96 (patch) | |
tree | d685addaf9908478c735a57d271d937b7133c60f | |
parent | e654be3865d7c8a6ad6339b2de2c45f02c9f7981 (diff) | |
download | mariadb-git-a99356fbe72fbca61617edabc5a8928da4343c96.tar.gz |
MDEV-4506: Parallel replication: intermediate commit.
Fix a bunch of issues found with locking, ordering, and non-thread-safe stuff
in Relay_log_info.
Now able to do a simple benchmark, showing 4.5 times speedup for applying a
binlog with 10000 REPLACE statements.
-rw-r--r-- | sql/log.h | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 2 | ||||
-rw-r--r-- | sql/log_event_old.cc | 4 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 132 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 11 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 8 | ||||
-rw-r--r-- | sql/rpl_rli.h | 12 | ||||
-rw-r--r-- | sql/slave.cc | 3 |
8 files changed, 109 insertions, 65 deletions
diff --git a/sql/log.h b/sql/log.h index 48cc568da11..efb560dc245 100644 --- a/sql/log.h +++ b/sql/log.h @@ -408,7 +408,7 @@ private: class binlog_cache_mngr; struct rpl_gtid; -class wait_for_commit; +struct wait_for_commit; class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG { private: diff --git a/sql/log_event.cc b/sql/log_event.cc index 1f8685e34b8..cb7bc3924f5 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -9101,7 +9101,7 @@ int Rows_log_event::do_apply_event(struct rpl_group_info *rgi) do_apply_event(). We still check here to prevent future coding errors. */ - DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rgi->thd == thd); /* If there is no locks taken, this is the first binrow event seen diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 4be3e2720de..d3e9d47d64a 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -68,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info do_apply_event(). We still check here to prevent future coding errors. */ - DBUG_ASSERT(rli->sql_thd == ev_thd); + DBUG_ASSERT(rgi->thd == ev_thd); /* If there is no locks taken, this is the first binrow event seen @@ -1481,7 +1481,7 @@ int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi) do_apply_event(). We still check here to prevent future coding errors. */ - DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rgi->thd == thd); /* If there is no locks taken, this is the first binrow event seen diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 8f97c19e5ad..63066d8d7c0 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -38,15 +38,16 @@ everything needs to be correctly rolled back and stopped in all threads, to ensure a consistent slave replication state. - - We need some knob on the master to allow the user to deliberately delay - commits waiting for more transactions to join group commit, to increase - potential for parallel execution on the slave. - - Handle the case of a partial event group. This occurs when the master crashes in the middle of writing the event group to the binlog. The slave rolls back the transaction; parallel execution needs to be able to deal with this wrt. commit_orderer and such. + - Relay_log_info::is_in_group(). This needs to be handled correctly in all + callers. I think it needs to be split into two, one version in + Relay_log_info to be used from next_event() in slave.cc, one to be used in + per-transaction stuff. + - We should fail if we connect to the master with opt_slave_parallel_threads greater than zero and master does not support GTID. Just to avoid a bunch of potential problems, we won't be able to do any parallel replication @@ -58,12 +59,12 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool; static void rpt_handle_event(rpl_parallel_thread::queued_event *qev, - THD *thd, struct rpl_parallel_thread *rpt) { int err; struct rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; + THD *thd= rgi->thd; thd->rli_slave= rli; thd->rpl_filter = rli->mi->rpl_filter; @@ -143,6 +144,7 @@ handle_rpl_parallel_thread(void *arg) rpl_group_info *rgi= events->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_for_sub_id; + uint64 wait_start_sub_id; bool end_of_group; if (event_type == GTID_EVENT) @@ -155,14 +157,28 @@ handle_rpl_parallel_thread(void *arg) /* Save this, as it gets cleared once event group commits. */ event_gtid_sub_id= rgi->gtid_sub_id; + rgi->thd= thd; + /* Register ourself to wait for the previous commit, if we need to do such registration _and_ that previous commit has not already occured. + + Also do not start parallel execution of this event group until all + prior groups have committed that are not safe to run in parallel with. */ - if ((wait_for_sub_id= rgi->wait_commit_sub_id)) + wait_for_sub_id= rgi->wait_commit_sub_id; + wait_start_sub_id= rgi->wait_start_sub_id; + if (wait_for_sub_id || wait_start_sub_id) { mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (wait_start_sub_id) + { + while (wait_start_sub_id > entry->last_committed_sub_id) + mysql_cond_wait(&entry->COND_parallel_entry, + &entry->LOCK_parallel_entry); + } + rgi->wait_start_sub_id= 0; /* No need to check again. */ if (wait_for_sub_id > entry->last_committed_sub_id) { wait_for_commit *waitee= @@ -176,7 +192,7 @@ handle_rpl_parallel_thread(void *arg) thd->wait_for_commit_ptr= &rgi->commit_orderer; } - rpt_handle_event(events, thd, rpt); + rpt_handle_event(events, rpt); end_of_group= in_event_group && @@ -376,6 +392,7 @@ err: while (new_free_list->running) mysql_cond_wait(&new_free_list->COND_rpl_thread, &new_free_list->LOCK_rpl_thread); + mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread); my_free(new_free_list); new_free_list= next; } @@ -503,8 +520,7 @@ rpl_parallel::wait_for_done() bool -rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, - THD *parent_thd) +rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev) { rpl_parallel_entry *e; rpl_parallel_thread *cur_thread; @@ -530,51 +546,15 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); if (!(e= find(gtid_ev->domain_id)) || - !(e->current_group_info= rgi= new rpl_group_info(rli)) || + !(rgi= new rpl_group_info(rli)) || event_group_new_gtid(rgi, gtid_ev)) { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); return true; } - /* Check if we already have a worker thread for this entry. */ - cur_thread= e->rpl_thread; - if (cur_thread) - { - mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - if (cur_thread->current_entry != e) - { - /* Not ours anymore, we need to grab a new one. */ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - e->rpl_thread= cur_thread= NULL; - } - } - - if (!cur_thread) - { - /* - Nothing else is currently running in this domain. We can spawn a new - thread to do this event group in parallel with anything else that might - be running in other domains. - */ - if (gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) - { - e->last_server_id= gtid_ev->server_id; - e->last_seq_no= gtid_ev->seq_no; - e->last_commit_id= gtid_ev->commit_id; - } - else - { - e->last_server_id= 0; - e->last_seq_no= 0; - e->last_commit_id= 0; - } - cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e); - rgi->wait_commit_sub_id= 0; - /* get_thread() returns with the LOCK_rpl_thread locked. */ - } - else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) && - e->last_commit_id == gtid_ev->commit_id) + if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && + e->last_commit_id == gtid_ev->commit_id) { /* We are already executing something else in this domain. But the two @@ -588,19 +568,63 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); rgi->wait_commit_sub_id= e->current_sub_id; rgi->wait_commit_group_info= e->current_group_info; + rgi->wait_start_sub_id= e->prev_groupcommit_sub_id; e->rpl_thread= cur_thread= rpt; /* get_thread() returns with the LOCK_rpl_thread locked. */ } else { - /* - We are still executing the previous event group for this replication - domain, and we have to wait for that to finish before we can start on - the next one. So just re-use the thread. - */ + /* Check if we already have a worker thread for this entry. */ + cur_thread= e->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != e) + { + /* Not ours anymore, we need to grab a new one. */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + e->rpl_thread= cur_thread= NULL; + } + } + + if (!cur_thread) + { + /* + Nothing else is currently running in this domain. We can spawn a new + thread to do this event group in parallel with anything else that might + be running in other domains. + */ + cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e); + /* get_thread() returns with the LOCK_rpl_thread locked. */ + } + else + { + /* + We are still executing the previous event group for this replication + domain, and we have to wait for that to finish before we can start on + the next one. So just re-use the thread. + */ + } + rgi->wait_commit_sub_id= 0; + rgi->wait_start_sub_id= 0; + e->prev_groupcommit_sub_id= e->current_sub_id; + } + + if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) + { + e->last_server_id= gtid_ev->server_id; + e->last_seq_no= gtid_ev->seq_no; + e->last_commit_id= gtid_ev->commit_id; + } + else + { + e->last_server_id= 0; + e->last_seq_no= 0; + e->last_commit_id= 0; } + e->current_group_info= rgi; e->current_sub_id= rgi->gtid_sub_id; current= rgi->parallel_entry= e; } @@ -612,7 +636,7 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, but they might be from an old master). */ qev->rgi= serial_rgi; - rpt_handle_event(qev, parent_thd, NULL); + rpt_handle_event(qev, NULL); delete_or_keep_event_post_apply(rli, typ, qev->ev); return false; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index a84722e9263..304263c3477 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -60,6 +60,15 @@ struct rpl_parallel_entry { mysql_cond_t COND_parallel_entry; uint64 current_sub_id; struct rpl_group_info *current_group_info; + /* + The sub_id of the last event group in the previous batch of group-committed + transactions. + + When we spawn parallel worker threads for the next group-committed batch, + they first need to wait for this sub_id to be committed before it is safe + to start executing them. + */ + uint64 prev_groupcommit_sub_id; }; struct rpl_parallel { HASH domain_hash; @@ -69,7 +78,7 @@ struct rpl_parallel { ~rpl_parallel(); rpl_parallel_entry *find(uint32 domain_id); void wait_for_done(); - bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd); + bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev); }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index f189f9adffa..8fb22266d5e 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1226,7 +1226,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, middle of the "transaction". START SLAVE will resume at BEGIN while the MyISAM table has already been updated. */ - if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) + if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) inc_event_relay_log_pos(); else { @@ -1267,7 +1267,7 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) { DBUG_ENTER("Relay_log_info::cleanup_context"); - DBUG_ASSERT(sql_thd == thd); + DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd); /* 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, may have opened tables, which we cannot be sure have been closed (because @@ -1534,8 +1534,8 @@ end: rpl_group_info::rpl_group_info(Relay_log_info *rli_) - : rli(rli_), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0), - parallel_entry(0) + : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), + wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0) { bzero(¤t_gtid, sizeof(current_gtid)); } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index c22773f9810..294f2ba885a 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -604,6 +604,7 @@ private: struct rpl_group_info { Relay_log_info *rli; + THD *thd; /* Current GTID being processed. The sub_id gives the binlog order within one domain_id. A zero sub_id @@ -630,10 +631,19 @@ struct rpl_group_info */ uint64 wait_commit_sub_id; struct rpl_group_info *wait_commit_group_info; + /* + If non-zero, the event group must wait for this sub_id to be committed + before the execution of the event group is allowed to start. + + (When we execute in parallel the transactions that group committed + together on the master, we still need to wait for any prior transactions + to have commtted). + */ + uint64 wait_start_sub_id; struct rpl_parallel_entry *parallel_entry; - rpl_group_info(Relay_log_info *rli); + rpl_group_info(Relay_log_info *rli_); ~rpl_group_info() { }; }; diff --git a/sql/slave.cc b/sql/slave.cc index 474a6f902d2..b9ef3172364 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3246,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, } if (opt_slave_parallel_threads > 0) - DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, thd)); + DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev)); /* For GTID, allocate a new sub_id for the given domain_id. @@ -3995,6 +3995,7 @@ pthread_handler_t handle_slave_sql(void *arg) thd = new THD; // note that contructor of THD uses DBUG_ ! thd->thread_stack = (char*)&thd; // remember where our stack is thd->rpl_filter = mi->rpl_filter; + serial_rgi.thd= thd; DBUG_ASSERT(rli->inited); DBUG_ASSERT(rli->mi == mi); |