summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysys/hash.c12
-rw-r--r--sql/mysqld.cc3
-rw-r--r--sql/mysqld.h2
-rw-r--r--sql/rpl_mi.cc61
-rw-r--r--sql/rpl_mi.h4
-rw-r--r--sql/rpl_parallel.cc28
-rw-r--r--sql/slave.cc22
-rw-r--r--sql/slave.h2
-rw-r--r--sql/sql_repl.cc57
9 files changed, 132 insertions, 59 deletions
diff --git a/mysys/hash.c b/mysys/hash.c
index 344b698a433..c2f3555396e 100644
--- a/mysys/hash.c
+++ b/mysys/hash.c
@@ -115,14 +115,19 @@ my_hash_init2(HASH *hash, uint growth_size, CHARSET_INFO *charset,
static inline void my_hash_free_elements(HASH *hash)
{
+ uint records= hash->records;
+ /*
+ Set records to 0 early to guard against anyone looking at the structure
+ during the free process
+ */
+ hash->records= 0;
if (hash->free)
{
HASH_LINK *data=dynamic_element(&hash->array,0,HASH_LINK*);
- HASH_LINK *end= data + hash->records;
+ HASH_LINK *end= data + records;
while (data < end)
(*hash->free)((data++)->data);
}
- hash->records=0;
}
@@ -519,6 +524,9 @@ my_bool my_hash_insert(HASH *info, const uchar *record)
The record with the same record ptr is removed.
If there is a free-function it's called if record was found.
+ hash->free() is guarantee to be called only after the row has been
+ deleted from the hash and the hash can be reused by other threads.
+
@return
@retval 0 ok
@retval 1 Record not found
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index ca45e9cdcdb..3e0cdf2d10e 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -863,7 +863,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_system_variables_hash, key_LOCK_thd_data,
key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock,
- key_master_info_sleep_lock,
+ key_master_info_sleep_lock, key_master_info_start_stop_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_rpl_group_info_sleep_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
@@ -933,6 +933,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
{ &key_LOG_LOCK_log, "LOG::LOCK_log", 0},
{ &key_master_info_data_lock, "Master_info::data_lock", 0},
+ { &key_master_info_start_stop_lock, "Master_info::start_stop_lock", 0},
{ &key_master_info_run_lock, "Master_info::run_lock", 0},
{ &key_master_info_sleep_lock, "Master_info::sleep_lock", 0},
{ &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0},
diff --git a/sql/mysqld.h b/sql/mysqld.h
index e7394230fe4..4538b7c4c70 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -266,7 +266,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_thd_data,
key_LOCK_user_conn, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock,
- key_master_info_sleep_lock,
+ key_master_info_sleep_lock, key_master_info_start_stop_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_rpl_group_info_sleep_lock,
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 7aa84e72bef..8e9fcb85082 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -80,6 +80,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock,
+ MY_MUTEX_INIT_SLOW);
mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
@@ -117,6 +119,7 @@ Master_info::~Master_info()
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&sleep_lock);
+ mysql_mutex_destroy(&start_stop_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
@@ -727,17 +730,28 @@ uchar *get_key_master_info(Master_info *mi, size_t *length,
return (uchar*) mi->cmp_connection_name.str;
}
+/*
+ Delete a master info
+
+ Called from my_hash_delete(&master_info_hash)
+ Stops associated slave threads and frees master_info
+*/
+
void free_key_master_info(Master_info *mi)
{
DBUG_ENTER("free_key_master_info");
+ mysql_mutex_unlock(&LOCK_active_mi);
+
/* Ensure that we are not in reset_slave while this is done */
- lock_slave_threads(mi);
+ mi->lock_slave_threads();
terminate_slave_threads(mi,SLAVE_FORCE_ALL);
/* We use 2 here instead of 1 just to make it easier when debugging */
mi->killed= 2;
end_master_info(mi);
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
delete mi;
+
+ mysql_mutex_lock(&LOCK_active_mi);
DBUG_VOID_RETURN;
}
@@ -904,6 +918,7 @@ Master_info_index::Master_info_index()
void Master_info_index::free_connections()
{
+ mysql_mutex_assert_owner(&LOCK_active_mi);
my_hash_reset(&master_info_hash);
}
@@ -936,7 +951,6 @@ bool Master_info_index::init_all_master_info()
File index_file_nr;
DBUG_ENTER("init_all_master_info");
- mysql_mutex_assert_owner(&LOCK_active_mi);
DBUG_ASSERT(master_info_index);
if ((index_file_nr= my_open(index_file_name,
@@ -984,7 +998,6 @@ bool Master_info_index::init_all_master_info()
DBUG_RETURN(1);
}
- lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
create_logfile_name_with_suffix(buf_master_info_file,
@@ -999,6 +1012,7 @@ bool Master_info_index::init_all_master_info()
sql_print_information("Reading Master_info: '%s' Relay_info:'%s'",
buf_master_info_file, buf_relay_log_info_file);
+ mi->lock_slave_threads();
if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file,
0, thread_mask))
{
@@ -1012,14 +1026,14 @@ bool Master_info_index::init_all_master_info()
if (master_info_index->add_master_info(mi, FALSE))
DBUG_RETURN(1);
succ_num++;
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
}
else
{
/* Master_info already in HASH */
sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
(int) connection_name.length, connection_name.str);
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
delete mi;
}
continue;
@@ -1036,7 +1050,7 @@ bool Master_info_index::init_all_master_info()
/* Master_info was already registered */
sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
(int) connection_name.length, connection_name.str);
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
delete mi;
continue;
}
@@ -1065,7 +1079,7 @@ bool Master_info_index::init_all_master_info()
(int) connection_name.length,
connection_name.str);
}
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
}
}
@@ -1268,7 +1282,12 @@ bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg,
/* Add a Master_info class to Hash Table */
bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
{
- if (!my_hash_insert(&master_info_hash, (uchar*) mi))
+ /*
+ We have to protect against shutdown to ensure we are not calling
+ my_hash_insert() while my_hash_free() is in progress
+ */
+ if (unlikely(shutdown_in_progress) ||
+ !my_hash_insert(&master_info_hash, (uchar*) mi))
{
if (global_system_variables.log_warnings > 1)
sql_print_information("Added new Master_info '%.*s' to hash table",
@@ -1392,23 +1411,31 @@ bool give_error_if_slave_running(bool already_locked)
@return
0 No Slave SQL thread is running
# Number of slave SQL thread running
+
+ Note that during shutdown we return 1. This is needed to ensure we
+ don't try to resize thread pool during shutdown as during shutdown
+ master_info_hash may be freeing the hash and during that time
+ hash entries can't be accessed.
*/
uint any_slave_sql_running()
{
uint count= 0;
+ HASH *hash;
DBUG_ENTER("any_slave_sql_running");
mysql_mutex_lock(&LOCK_active_mi);
- if (likely(master_info_index)) // Not shutdown
+ if (unlikely(shutdown_in_progress || !master_info_index))
{
- HASH *hash= &master_info_index->master_info_hash;
- for (uint i= 0; i< hash->records; ++i)
- {
- Master_info *mi= (Master_info *)my_hash_element(hash, i);
- if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
- count++;
- }
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return 1;
+ }
+ hash= &master_info_index->master_info_hash;
+ for (uint i= 0; i< hash->records; ++i)
+ {
+ Master_info *mi= (Master_info *)my_hash_element(hash, i);
+ if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
+ count++;
}
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(count);
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 32591c56caa..5c42378ca2c 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -81,6 +81,8 @@ class Master_info : public Slave_reporting_capability
}
void release();
void wait_until_free();
+ void lock_slave_threads();
+ void unlock_slave_threads();
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
@@ -99,7 +101,7 @@ class Master_info : public Slave_reporting_capability
File fd; // we keep the file open, so we need to remember the file pointer
IO_CACHE file;
- mysql_mutex_t data_lock, run_lock, sleep_lock;
+ mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock;
mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
THD *io_thd;
MYSQL* mysql;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ff2c7d68467..474c6f063c7 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1378,10 +1378,24 @@ dealloc_gco(group_commit_orderer *gco)
my_free(gco);
}
+/**
+ Change thread count for global parallel worker threads
+
+ @param pool parallel thread pool
+ @param new_count Number of threads to be in pool. 0 in shutdown
+ @param force Force thread count to new_count even if slave
+ threads are running
+
+ By default we don't resize pool of there are running threads.
+ However during shutdown we will always do it.
+ This is needed as any_slave_sql_running() returns 1 during shutdown
+ as we don't want to access master_info while
+ Master_info_index::free_connections are running.
+*/
static int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
- uint32 new_count)
+ uint32 new_count, bool force)
{
uint32 i;
rpl_parallel_thread **new_list= NULL;
@@ -1403,7 +1417,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
If we are about to delete pool, do an extra check that there are no new
slave threads running since we marked pool busy
*/
- if (!new_count)
+ if (!new_count && !force)
{
if (any_slave_sql_running())
{
@@ -1556,8 +1570,7 @@ err:
int rpl_parallel_resize_pool_if_no_slaves(void)
{
/* master_info_index is set to NULL on shutdown */
- if (opt_slave_parallel_threads > 0 && !any_slave_sql_running() &&
- master_info_index)
+ if (opt_slave_parallel_threads > 0 && !any_slave_sql_running())
return rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
return 0;
}
@@ -1567,7 +1580,8 @@ int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
{
if (!pool->count)
- return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads);
+ return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
+ 0);
return 0;
}
@@ -1575,7 +1589,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
int
rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
{
- return rpl_parallel_change_thread_count(pool, 0);
+ return rpl_parallel_change_thread_count(pool, 0, 0);
}
@@ -1854,7 +1868,7 @@ rpl_parallel_thread_pool::destroy()
{
if (!inited)
return;
- rpl_parallel_change_thread_count(this, 0);
+ rpl_parallel_change_thread_count(this, 0, 1);
mysql_mutex_destroy(&LOCK_rpl_thread_pool);
mysql_cond_destroy(&COND_rpl_thread_pool);
inited= false;
diff --git a/sql/slave.cc b/sql/slave.cc
index a2b8439b4b4..1bb2b534693 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -228,16 +228,14 @@ void init_thread_mask(int* mask,Master_info* mi,bool inverse)
/*
- lock_slave_threads()
+ lock_slave_threads() against other threads doing STOP, START or RESET SLAVE
+
*/
-void lock_slave_threads(Master_info* mi)
+void Master_info::lock_slave_threads()
{
DBUG_ENTER("lock_slave_threads");
-
- //TODO: see if we can do this without dual mutex
- mysql_mutex_lock(&mi->run_lock);
- mysql_mutex_lock(&mi->rli.run_lock);
+ mysql_mutex_lock(&start_stop_lock);
DBUG_VOID_RETURN;
}
@@ -246,13 +244,10 @@ void lock_slave_threads(Master_info* mi)
unlock_slave_threads()
*/
-void unlock_slave_threads(Master_info* mi)
+void Master_info::unlock_slave_threads()
{
DBUG_ENTER("unlock_slave_threads");
-
- //TODO: see if we can do this without dual mutex
- mysql_mutex_unlock(&mi->rli.run_lock);
- mysql_mutex_unlock(&mi->run_lock);
+ mysql_mutex_unlock(&start_stop_lock);
DBUG_VOID_RETURN;
}
@@ -374,7 +369,6 @@ int init_slave()
accepted. However bootstrap may conflict with us if it does START SLAVE.
So it's safer to take the lock.
*/
- mysql_mutex_lock(&LOCK_active_mi);
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
goto err;
@@ -383,7 +377,6 @@ int init_slave()
if (!master_info_index || master_info_index->init_all_master_info())
{
sql_print_error("Failed to initialize multi master structures");
- mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(1);
}
if (!(active_mi= new Master_info(&default_master_connection_name,
@@ -441,7 +434,6 @@ int init_slave()
}
end:
- mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(error);
err:
@@ -6159,7 +6151,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
suppress_warnings= 0;
mi->report(ERROR_LEVEL, last_errno, NULL,
"error %s to master '%s@%s:%d'"
- " - retry-time: %d retries: %lu message: %s",
+ " - retry-time: %d maximum-retries: %lu message: %s",
(reconnect ? "reconnecting" : "connecting"),
mi->user, mi->host, mi->port,
mi->connect_retry, master_retry_count,
diff --git a/sql/slave.h b/sql/slave.h
index cfff79da0e4..abcb4df984b 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -219,8 +219,6 @@ void close_active_mi(); /* clean up slave threads data */
void clear_until_condition(Relay_log_info* rli);
void clear_slave_error(Relay_log_info* rli);
void end_relay_log_info(Relay_log_info* rli);
-void lock_slave_threads(Master_info* mi);
-void unlock_slave_threads(Master_info* mi);
void init_thread_mask(int* mask,Master_info* mi,bool inverse);
Format_description_log_event *
read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 29980dbbbf1..7d4fb110229 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -2833,7 +2833,16 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
relay_log_info_file, 0,
&mi->cmp_connection_name);
- lock_slave_threads(mi); // this allows us to cleanly read slave_running
+ mi->lock_slave_threads();
+ if (mi->killed)
+ {
+ /* connection was deleted while we waited for lock_slave_threads */
+ mi->unlock_slave_threads();
+ my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(-1);
+ }
+
// Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */);
@@ -2968,7 +2977,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
ER(ER_UNTIL_COND_IGNORED));
if (!slave_errno)
- slave_errno = start_slave_threads(0 /*no mutex */,
+ slave_errno = start_slave_threads(1,
1 /* wait for start */,
mi,
master_info_file_tmp,
@@ -2984,7 +2993,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
}
err:
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
if (slave_errno)
{
@@ -3024,8 +3033,12 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
DBUG_RETURN(-1);
THD_STAGE_INFO(thd, stage_killing_slave);
int thread_mask;
- lock_slave_threads(mi);
- // Get a mask of _running_ threads
+ mi->lock_slave_threads();
+ /*
+ Get a mask of _running_ threads.
+ We don't have to test for mi->killed as the thread_mask will take care
+ of checking if threads exists
+ */
init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
/*
Below we will stop all running threads.
@@ -3038,8 +3051,7 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
if (thread_mask)
{
- slave_errno= terminate_slave_threads(mi,thread_mask,
- 1 /*skip lock */);
+ slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */);
}
else
{
@@ -3048,7 +3060,8 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
ER(ER_SLAVE_WAS_NOT_RUNNING));
}
- unlock_slave_threads(mi);
+
+ mi->unlock_slave_threads();
if (slave_errno)
{
@@ -3083,11 +3096,20 @@ int reset_slave(THD *thd, Master_info* mi)
char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("reset_slave");
- lock_slave_threads(mi);
+ mi->lock_slave_threads();
+ if (mi->killed)
+ {
+ /* connection was deleted while we waited for lock_slave_threads */
+ mi->unlock_slave_threads();
+ my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(-1);
+ }
+
init_thread_mask(&thread_mask,mi,0 /* not inverse */);
if (thread_mask) // We refuse if any slave thread is running
{
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
mi->connection_name.str);
DBUG_RETURN(ER_SLAVE_MUST_STOP);
@@ -3152,7 +3174,7 @@ int reset_slave(THD *thd, Master_info* mi)
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
if (error)
my_error(sql_errno, MYF(0), errmsg);
DBUG_RETURN(error);
@@ -3286,7 +3308,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
lex_mi->port))
DBUG_RETURN(TRUE);
- lock_slave_threads(mi);
+ mi->lock_slave_threads();
+ if (mi->killed)
+ {
+ /* connection was deleted while we waited for lock_slave_threads */
+ mi->unlock_slave_threads();
+ my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(TRUE);
+ }
+
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
if (thread_mask) // We refuse if any slave thread is running
{
@@ -3593,7 +3624,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
mysql_mutex_unlock(&mi->rli.data_lock);
err:
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
if (ret == FALSE)
my_ok(thd);
DBUG_RETURN(ret);