diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-07-04 09:20:56 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-07-04 09:20:56 +0200 |
commit | a1cfd473469171e5a9700dbff0ee0e1eb84d6312 (patch) | |
tree | 026dad4ef4a445e81bb046d1c8c1e8c73fa1e0d6 | |
parent | 592e464a021747d7ac5b13222f5de1cd4250531c (diff) | |
download | mariadb-git-a1cfd473469171e5a9700dbff0ee0e1eb84d6312.tar.gz |
MDEV-4506: Parallel replication: Intermediate commit.
Wait for all worker threads to finish when stopping the SQL thread.
(Only a basic wait; this still needs to be fixed to include timeout
logic as in sql_slave_killed()).
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 3 | ||||
-rw-r--r-- | sql/mysqld.cc | 6 | ||||
-rw-r--r-- | sql/mysqld.h | 3 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 25 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 3 | ||||
-rw-r--r-- | sql/slave.cc | 3 |
6 files changed, 38 insertions, 5 deletions
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 5748218dc10..3ace346e006 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -44,10 +44,9 @@ query_vertical SHOW SLAVE STATUS; --source include/start_slave.inc SELECT * FROM t1; ---sleep 1 -SELECT * FROM t1; --source include/stop_slave.inc +SELECT * FROM t1; --connection s1 SET sql_log_bin=0; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 52c754993ac..816756338a4 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -894,7 +894,8 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, key_COND_wait_commit; PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; -PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; +PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, + key_COND_parallel_entry; static PSI_cond_info all_server_conds[]= { @@ -938,7 +939,8 @@ static PSI_cond_info all_server_conds[]= { &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_rpl_thread, "COND_rpl_thread", 0}, - { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0} + { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0}, + { &key_COND_parallel_entry, "COND_parallel_entry", 0} }; PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, diff --git a/sql/mysqld.h b/sql/mysqld.h index d3b17cfefe1..3475835c67b 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -283,7 +283,8 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, key_COND_wait_commit; extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; -extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; +extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, + key_COND_parallel_entry; extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, key_thread_handle_manager, key_thread_kill_server, key_thread_main, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 1a6eb9e3d50..21c3dcf6d90 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -21,6 +21,9 @@ the logic in sql_slave_killed() that waits for current event group to complete needs to be extended appropriately... + - Audit the use of Relay_log_info::data_lock. Make sure it is held + correctly in all needed places also when using parallel replication. + - We need some user-configurable limit on how far ahead the SQL thread will fetch and queue events for parallel execution (otherwise if slave gets behind we will fill up memory with pending malloc()'ed events). @@ -194,7 +197,11 @@ handle_rpl_parallel_thread(void *arg) */ mysql_mutex_lock(&entry->LOCK_parallel_entry); if (entry->last_committed_sub_id < rgi->gtid_sub_id) + { entry->last_committed_sub_id= rgi->gtid_sub_id; + if (entry->need_signal) + mysql_cond_broadcast(&entry->COND_parallel_entry); + } mysql_mutex_unlock(&entry->LOCK_parallel_entry); rgi->commit_orderer.wakeup_subsequent_commits(); @@ -463,12 +470,30 @@ rpl_parallel::find(uint32 domain_id) } mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); } return e; } +void +rpl_parallel::wait_for_done() +{ + struct rpl_parallel_entry *e; + uint32 i; + + for (i= 0; i < domain_hash.records; ++i) + { + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + while (e->current_sub_id > e->last_commit_id) + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } +} + + bool rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *parent_thd) diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index b0367efdea6..09bde20f5af 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -50,6 +50,7 @@ struct rpl_parallel_entry { uint64 last_seq_no; uint64 last_commit_id; bool active; + bool need_signal; rpl_parallel_thread *rpl_thread; /* The sub_id of the last transaction to commit within this domain_id. @@ -57,6 +58,7 @@ struct rpl_parallel_entry { */ uint64 last_committed_sub_id; mysql_mutex_t LOCK_parallel_entry; + mysql_cond_t COND_parallel_entry; uint64 current_sub_id; struct rpl_group_info *current_group_info; }; @@ -67,6 +69,7 @@ struct rpl_parallel { rpl_parallel(); ~rpl_parallel(); rpl_parallel_entry *find(uint32 domain_id); + void wait_for_done(); bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd); }; diff --git a/sql/slave.cc b/sql/slave.cc index ba4fef03639..9b3df653384 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -4342,6 +4342,9 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ rli->executed_entries++; } + if (opt_slave_parallel_threads > 0) + rli->parallel.wait_for_done(); + /* Thread stopped. Print the current replication position to the log */ { String tmp; |