summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2015-03-11 09:18:16 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2015-03-11 09:18:16 +0100
commited04c40b01c122436eda6552c550d62ce8a3920b (patch)
treeb36044637c34b07c0528f82c8a238fdb868ea1ab
parenta7fd11b31d52b62ef7b61783bb83a5e62271307b (diff)
downloadmariadb-git-ed04c40b01c122436eda6552c550d62ce8a3920b.tar.gz
MDEV-5289: master server starts slave parallel threads
Delay spawning parallel replication worker threads until a slave SQL thread is running, and de-spawn them when the last SQL thread stops. This is especially useful to avoid needless threads on a master in a setup where same my.cnf is used on masters and slaves.
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel.result14
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel.test11
-rw-r--r--mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result15
-rw-r--r--mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf5
-rw-r--r--mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test9
-rw-r--r--sql/rpl_mi.cc28
-rw-r--r--sql/rpl_mi.h1
-rw-r--r--sql/rpl_parallel.cc63
-rw-r--r--sql/rpl_parallel.h6
-rw-r--r--sql/slave.cc9
-rw-r--r--sql/sys_vars.cc8
11 files changed, 116 insertions, 53 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 3c66a541cc1..499ca23a645 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -4,8 +4,22 @@ SET GLOBAL slave_parallel_threads=10;
ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*)))
+OK
+include/stop_slave.inc
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
+include/start_slave.inc
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*)))
+OK
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 7397ede14b3..cafdffe81ac 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -12,9 +12,20 @@ SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
+
+# Check that we do not spawn any worker threads when no slave is running.
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+
CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc
+# Check that worker threads get spawned when slave starts.
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+# ... and that worker threads get removed when slave stops.
+--source include/stop_slave.inc
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+--source include/start_slave.inc
+SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
diff --git a/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result
index 2956d04c065..56aa5976f91 100644
--- a/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result
+++ b/mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result
@@ -1,13 +1,22 @@
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
-SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default';
-must be zero because of default
-0
+SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf';
+must be 20 because of .cnf
+20
SELECT @@SESSION.slave_parallel_threads as 'no session var';
ERROR HY000: Variable 'slave_parallel_threads' is a GLOBAL variable
SET GLOBAL slave_parallel_threads= 0;
SET GLOBAL slave_parallel_threads= DEFAULT;
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default';
+must be 0 because of default
+0
SET GLOBAL slave_parallel_threads= 10;
SELECT @@GLOBAL.slave_parallel_threads;
@@GLOBAL.slave_parallel_threads
10
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
+OK
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
diff --git a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf
new file mode 100644
index 00000000000..02bdb44e1de
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.cnf
@@ -0,0 +1,5 @@
+# Use default setting for mysqld processes
+!include include/default_mysqld.cnf
+
+[mysqld.1]
+slave_parallel_threads=20
diff --git a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test
index 8e987489d86..b567b7f8854 100644
--- a/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test
+++ b/mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test
@@ -2,13 +2,20 @@
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
-SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default';
+# Check that we don't spawn worker threads at server startup, when no
+# slave is configured (MDEV-5289).
+SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
+
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.slave_parallel_threads as 'no session var';
SET GLOBAL slave_parallel_threads= 0;
SET GLOBAL slave_parallel_threads= DEFAULT;
+SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default';
SET GLOBAL slave_parallel_threads= 10;
SELECT @@GLOBAL.slave_parallel_threads;
+# Check that we don't spawn worker threads when no slave is started.
+SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 081265ccf58..50cd44df824 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -1248,7 +1248,7 @@ bool Master_info_index::remove_master_info(LEX_STRING *name)
bool Master_info_index::give_error_if_slave_running()
{
- DBUG_ENTER("warn_if_slave_running");
+ DBUG_ENTER("give_error_if_slave_running");
mysql_mutex_assert_owner(&LOCK_active_mi);
if (!this) // master_info_index is set to NULL on server shutdown
return TRUE;
@@ -1269,6 +1269,32 @@ bool Master_info_index::give_error_if_slave_running()
/**
+ Master_info_index::any_slave_sql_running()
+
+ The LOCK_active_mi must be held while calling this function.
+
+ @return
+ TRUE If some slave SQL thread is running.
+ FALSE No slave SQL thread is running
+*/
+
+bool Master_info_index::any_slave_sql_running()
+{
+ DBUG_ENTER("any_slave_sql_running");
+ if (!this) // master_info_index is set to NULL on server shutdown
+ return TRUE;
+
+ for (uint i= 0; i< master_info_hash.records; ++i)
+ {
+ Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i);
+ if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
+ DBUG_RETURN(TRUE);
+ }
+ DBUG_RETURN(FALSE);
+}
+
+
+/**
Master_info_index::start_all_slaves()
Start all slaves that was not running.
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index ebb1b541728..2b0b40feb3d 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -218,6 +218,7 @@ public:
Master_info *get_master_info(LEX_STRING *connection_name,
Sql_condition::enum_warning_level warning);
bool give_error_if_slave_running();
+ bool any_slave_sql_running();
bool start_all_slaves(THD *thd);
bool stop_all_slaves(THD *thd);
};
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ec177280fea..2f47ad8892b 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -944,9 +944,9 @@ dealloc_gco(group_commit_orderer *gco)
}
-int
+static int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
- uint32 new_count, bool skip_check)
+ uint32 new_count)
{
uint32 i;
rpl_parallel_thread **new_list= NULL;
@@ -991,24 +991,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
new_free_list= new_list[i];
}
- if (!skip_check)
- {
- mysql_mutex_lock(&LOCK_active_mi);
- if (master_info_index->give_error_if_slave_running())
- {
- mysql_mutex_unlock(&LOCK_active_mi);
- goto err;
- }
- if (pool->changing)
- {
- mysql_mutex_unlock(&LOCK_active_mi);
- my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0));
- goto err;
- }
- pool->changing= true;
- mysql_mutex_unlock(&LOCK_active_mi);
- }
-
/*
Grab each old thread in turn, and signal it to stop.
@@ -1068,13 +1050,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
}
- if (!skip_check)
- {
- mysql_mutex_lock(&LOCK_active_mi);
- pool->changing= false;
- mysql_mutex_unlock(&LOCK_active_mi);
- }
-
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
@@ -1101,16 +1076,26 @@ err:
}
my_free(new_list);
}
- if (!skip_check)
- {
- mysql_mutex_lock(&LOCK_active_mi);
- pool->changing= false;
- mysql_mutex_unlock(&LOCK_active_mi);
- }
return 1;
}
+int
+rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
+{
+ if (!pool->count)
+ return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads);
+ return 0;
+}
+
+
+int
+rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
+{
+ return rpl_parallel_change_thread_count(pool, 0);
+}
+
+
void
rpl_parallel_thread::batch_free()
{
@@ -1354,7 +1339,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : count(0), threads(0), free_list(0), changing(false), inited(false)
+ : count(0), threads(0), free_list(0), inited(false)
{
}
@@ -1369,10 +1354,14 @@ rpl_parallel_thread_pool::init(uint32 size)
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
- changing= false;
inited= true;
- return rpl_parallel_change_thread_count(this, size, true);
+ /*
+ The pool is initially empty. Threads will be spawned when a slave SQL
+ thread is started.
+ */
+
+ return 0;
}
@@ -1381,7 +1370,7 @@ rpl_parallel_thread_pool::destroy()
{
if (!inited)
return;
- rpl_parallel_change_thread_count(this, 0, true);
+ rpl_parallel_change_thread_count(this, 0);
mysql_mutex_destroy(&LOCK_rpl_thread_pool);
mysql_cond_destroy(&COND_rpl_thread_pool);
inited= false;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index d60b2200c17..00421d6f5c9 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -204,7 +204,6 @@ struct rpl_parallel_thread_pool {
struct rpl_parallel_thread *free_list;
mysql_mutex_t LOCK_rpl_thread_pool;
mysql_cond_t COND_rpl_thread_pool;
- bool changing;
bool inited;
rpl_parallel_thread_pool();
@@ -314,9 +313,8 @@ struct rpl_parallel {
extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
-extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
- uint32 new_count,
- bool skip_check= false);
+extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
+extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
#endif /* RPL_PARALLEL_H */
diff --git a/sql/slave.cc b/sql/slave.cc
index 1e1edcbe7c9..703338c435c 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -652,6 +652,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
mysql_mutex_unlock(log_lock);
+
+ if (opt_slave_parallel_threads > 0 &&
+ !master_info_index->any_slave_sql_running())
+ rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
}
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
{
@@ -958,7 +962,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mi);
if (!error && (thread_mask & SLAVE_SQL))
{
- error= start_slave_thread(
+ if (opt_slave_parallel_threads > 0)
+ error= rpl_parallel_activate_pool(&global_rpl_thread_pool);
+ if (!error)
+ error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
key_thread_slave_sql,
#endif
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index f1942d08591..31bd712ee67 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1752,16 +1752,12 @@ check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
static bool
fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
{
- bool running;
- bool err= false;
+ bool err;
mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
- running= master_info_index->give_error_if_slave_running();
+ err= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi);
- if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool,
- opt_slave_parallel_threads))
- err= true;
mysql_mutex_lock(&LOCK_global_system_variables);
return err;