diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-03-11 09:18:16 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-03-11 09:18:16 +0100 |
commit | ed04c40b01c122436eda6552c550d62ce8a3920b (patch) | |
tree | b36044637c34b07c0528f82c8a238fdb868ea1ab | |
parent | a7fd11b31d52b62ef7b61783bb83a5e62271307b (diff) | |
download | mariadb-git-ed04c40b01c122436eda6552c550d62ce8a3920b.tar.gz |
MDEV-5289: master server starts slave parallel threads
Delay spawning parallel replication worker threads until a slave SQL
thread is running, and de-spawn them when the last SQL thread stops.
This is especially useful to avoid needless threads on a master in a
setup where same my.cnf is used on masters and slaves.
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel.result | 14 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 11 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result | 15 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf | 5 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test | 9 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 28 | ||||
-rw-r--r-- | sql/rpl_mi.h | 1 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 63 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 6 | ||||
-rw-r--r-- | sql/slave.cc | 9 | ||||
-rw-r--r-- | sql/sys_vars.cc | 8 |
11 files changed, 116 insertions, 53 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index 3c66a541cc1..499ca23a645 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -4,8 +4,22 @@ SET GLOBAL slave_parallel_threads=10; ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first include/stop_slave.inc SET GLOBAL slave_parallel_threads=10; +SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) +OK CHANGE MASTER TO master_use_gtid=slave_pos; include/start_slave.inc +SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) +OK +include/stop_slave.inc +SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) +OK +include/start_slave.inc +SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) +OK *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 7397ede14b3..cafdffe81ac 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -12,9 +12,20 @@ SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; SET GLOBAL slave_parallel_threads=10; --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=10; + +# Check that we do not spawn any worker threads when no slave is running. +SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; + CHANGE MASTER TO master_use_gtid=slave_pos; --source include/start_slave.inc +# Check that worker threads get spawned when slave starts. +SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +# ... and that worker threads get removed when slave stops. +--source include/stop_slave.inc +SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +--source include/start_slave.inc +SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; --echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** diff --git a/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result index 2956d04c065..56aa5976f91 100644 --- a/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result +++ b/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result @@ -1,13 +1,22 @@ SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads; -SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default'; -must be zero because of default -0 +SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) +OK +SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf'; +must be 20 because of .cnf +20 SELECT @@SESSION.slave_parallel_threads as 'no session var'; ERROR HY000: Variable 'slave_parallel_threads' is a GLOBAL variable SET GLOBAL slave_parallel_threads= 0; SET GLOBAL slave_parallel_threads= DEFAULT; +SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default'; +must be 0 because of default +0 SET GLOBAL slave_parallel_threads= 10; SELECT @@GLOBAL.slave_parallel_threads; @@GLOBAL.slave_parallel_threads 10 +SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; +IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) +OK SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads; diff --git a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf new file mode 100644 index 00000000000..02bdb44e1de --- /dev/null +++ b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf @@ -0,0 +1,5 @@ +# Use default setting for mysqld processes +!include include/default_mysqld.cnf + +[mysqld.1] +slave_parallel_threads=20 diff --git a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test index 8e987489d86..b567b7f8854 100644 --- a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test +++ b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test @@ -2,13 +2,20 @@ SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads; -SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default'; +# Check that we don't spawn worker threads at server startup, when no +# slave is configured (MDEV-5289). +SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; + +SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf'; --error ER_INCORRECT_GLOBAL_LOCAL_VAR SELECT @@SESSION.slave_parallel_threads as 'no session var'; SET GLOBAL slave_parallel_threads= 0; SET GLOBAL slave_parallel_threads= DEFAULT; +SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default'; SET GLOBAL slave_parallel_threads= 10; SELECT @@GLOBAL.slave_parallel_threads; +# Check that we don't spawn worker threads when no slave is started. +SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user"; SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads; diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 081265ccf58..50cd44df824 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -1248,7 +1248,7 @@ bool Master_info_index::remove_master_info(LEX_STRING *name) bool Master_info_index::give_error_if_slave_running() { - DBUG_ENTER("warn_if_slave_running"); + DBUG_ENTER("give_error_if_slave_running"); mysql_mutex_assert_owner(&LOCK_active_mi); if (!this) // master_info_index is set to NULL on server shutdown return TRUE; @@ -1269,6 +1269,32 @@ bool Master_info_index::give_error_if_slave_running() /** + Master_info_index::any_slave_sql_running() + + The LOCK_active_mi must be held while calling this function. + + @return + TRUE If some slave SQL thread is running. + FALSE No slave SQL thread is running +*/ + +bool Master_info_index::any_slave_sql_running() +{ + DBUG_ENTER("any_slave_sql_running"); + if (!this) // master_info_index is set to NULL on server shutdown + return TRUE; + + for (uint i= 0; i< master_info_hash.records; ++i) + { + Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i); + if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + DBUG_RETURN(TRUE); + } + DBUG_RETURN(FALSE); +} + + +/** Master_info_index::start_all_slaves() Start all slaves that was not running. diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index ebb1b541728..2b0b40feb3d 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -218,6 +218,7 @@ public: Master_info *get_master_info(LEX_STRING *connection_name, Sql_condition::enum_warning_level warning); bool give_error_if_slave_running(); + bool any_slave_sql_running(); bool start_all_slaves(THD *thd); bool stop_all_slaves(THD *thd); }; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index ec177280fea..2f47ad8892b 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -944,9 +944,9 @@ dealloc_gco(group_commit_orderer *gco) } -int +static int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, - uint32 new_count, bool skip_check) + uint32 new_count) { uint32 i; rpl_parallel_thread **new_list= NULL; @@ -991,24 +991,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, new_free_list= new_list[i]; } - if (!skip_check) - { - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index->give_error_if_slave_running()) - { - mysql_mutex_unlock(&LOCK_active_mi); - goto err; - } - if (pool->changing) - { - mysql_mutex_unlock(&LOCK_active_mi); - my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0)); - goto err; - } - pool->changing= true; - mysql_mutex_unlock(&LOCK_active_mi); - } - /* Grab each old thread in turn, and signal it to stop. @@ -1068,13 +1050,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); } - if (!skip_check) - { - mysql_mutex_lock(&LOCK_active_mi); - pool->changing= false; - mysql_mutex_unlock(&LOCK_active_mi); - } - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); mysql_cond_broadcast(&pool->COND_rpl_thread_pool); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); @@ -1101,16 +1076,26 @@ err: } my_free(new_list); } - if (!skip_check) - { - mysql_mutex_lock(&LOCK_active_mi); - pool->changing= false; - mysql_mutex_unlock(&LOCK_active_mi); - } return 1; } +int +rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) +{ + if (!pool->count) + return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads); + return 0; +} + + +int +rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool) +{ + return rpl_parallel_change_thread_count(pool, 0); +} + + void rpl_parallel_thread::batch_free() { @@ -1354,7 +1339,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : count(0), threads(0), free_list(0), changing(false), inited(false) + : count(0), threads(0), free_list(0), inited(false) { } @@ -1369,10 +1354,14 @@ rpl_parallel_thread_pool::init(uint32 size) mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, MY_MUTEX_INIT_SLOW); mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL); - changing= false; inited= true; - return rpl_parallel_change_thread_count(this, size, true); + /* + The pool is initially empty. Threads will be spawned when a slave SQL + thread is started. + */ + + return 0; } @@ -1381,7 +1370,7 @@ rpl_parallel_thread_pool::destroy() { if (!inited) return; - rpl_parallel_change_thread_count(this, 0, true); + rpl_parallel_change_thread_count(this, 0); mysql_mutex_destroy(&LOCK_rpl_thread_pool); mysql_cond_destroy(&COND_rpl_thread_pool); inited= false; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index d60b2200c17..00421d6f5c9 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -204,7 +204,6 @@ struct rpl_parallel_thread_pool { struct rpl_parallel_thread *free_list; mysql_mutex_t LOCK_rpl_thread_pool; mysql_cond_t COND_rpl_thread_pool; - bool changing; bool inited; rpl_parallel_thread_pool(); @@ -314,9 +313,8 @@ struct rpl_parallel { extern struct rpl_parallel_thread_pool global_rpl_thread_pool; -extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, - uint32 new_count, - bool skip_check= false); +extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); +extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); #endif /* RPL_PARALLEL_H */ diff --git a/sql/slave.cc b/sql/slave.cc index 1e1edcbe7c9..703338c435c 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -652,6 +652,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); mysql_mutex_unlock(log_lock); + + if (opt_slave_parallel_threads > 0 && + !master_info_index->any_slave_sql_running()) + rpl_parallel_inactivate_pool(&global_rpl_thread_pool); } if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) { @@ -958,7 +962,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mi); if (!error && (thread_mask & SLAVE_SQL)) { - error= start_slave_thread( + if (opt_slave_parallel_threads > 0) + error= rpl_parallel_activate_pool(&global_rpl_thread_pool); + if (!error) + error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_sql, #endif diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index f1942d08591..31bd712ee67 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1752,16 +1752,12 @@ check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var) static bool fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type) { - bool running; - bool err= false; + bool err; mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); - running= master_info_index->give_error_if_slave_running(); + err= master_info_index->give_error_if_slave_running(); mysql_mutex_unlock(&LOCK_active_mi); - if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool, - opt_slave_parallel_threads)) - err= true; mysql_mutex_lock(&LOCK_global_system_variables); return err; |