diff options
-rw-r--r-- | mysys/hash.c | 12 | ||||
-rw-r--r-- | sql/mysqld.cc | 3 | ||||
-rw-r--r-- | sql/mysqld.h | 2 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 61 | ||||
-rw-r--r-- | sql/rpl_mi.h | 4 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 28 | ||||
-rw-r--r-- | sql/slave.cc | 22 | ||||
-rw-r--r-- | sql/slave.h | 2 | ||||
-rw-r--r-- | sql/sql_repl.cc | 57 |
9 files changed, 132 insertions, 59 deletions
diff --git a/mysys/hash.c b/mysys/hash.c index 344b698a433..c2f3555396e 100644 --- a/mysys/hash.c +++ b/mysys/hash.c @@ -115,14 +115,19 @@ my_hash_init2(HASH *hash, uint growth_size, CHARSET_INFO *charset, static inline void my_hash_free_elements(HASH *hash) { + uint records= hash->records; + /* + Set records to 0 early to guard against anyone looking at the structure + during the free process + */ + hash->records= 0; if (hash->free) { HASH_LINK *data=dynamic_element(&hash->array,0,HASH_LINK*); - HASH_LINK *end= data + hash->records; + HASH_LINK *end= data + records; while (data < end) (*hash->free)((data++)->data); } - hash->records=0; } @@ -519,6 +524,9 @@ my_bool my_hash_insert(HASH *info, const uchar *record) The record with the same record ptr is removed. If there is a free-function it's called if record was found. + hash->free() is guarantee to be called only after the row has been + deleted from the hash and the hash can be reused by other threads. + @return @retval 0 ok @retval 1 Record not found diff --git a/sql/mysqld.cc b/sql/mysqld.cc index ca45e9cdcdb..3e0cdf2d10e 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -863,7 +863,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_system_variables_hash, key_LOCK_thd_data, key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log, key_master_info_data_lock, key_master_info_run_lock, - key_master_info_sleep_lock, + key_master_info_sleep_lock, key_master_info_start_stop_lock, key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock, key_rpl_group_info_sleep_lock, key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, @@ -933,6 +933,7 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL}, { &key_LOG_LOCK_log, "LOG::LOCK_log", 0}, { &key_master_info_data_lock, "Master_info::data_lock", 0}, + { &key_master_info_start_stop_lock, "Master_info::start_stop_lock", 0}, { &key_master_info_run_lock, "Master_info::run_lock", 0}, { &key_master_info_sleep_lock, "Master_info::sleep_lock", 0}, { &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0}, diff --git a/sql/mysqld.h b/sql/mysqld.h index e7394230fe4..4538b7c4c70 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -266,7 +266,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_thd_data, key_LOCK_user_conn, key_LOG_LOCK_log, key_master_info_data_lock, key_master_info_run_lock, - key_master_info_sleep_lock, + key_master_info_sleep_lock, key_master_info_start_stop_lock, key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock, key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, key_rpl_group_info_sleep_lock, diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 7aa84e72bef..8e9fcb85082 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -80,6 +80,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, bzero((char*) &file, sizeof(file)); mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock, + MY_MUTEX_INIT_SLOW); mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); @@ -117,6 +119,7 @@ Master_info::~Master_info() mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&sleep_lock); + mysql_mutex_destroy(&start_stop_lock); mysql_cond_destroy(&data_cond); mysql_cond_destroy(&start_cond); mysql_cond_destroy(&stop_cond); @@ -727,17 +730,28 @@ uchar *get_key_master_info(Master_info *mi, size_t *length, return (uchar*) mi->cmp_connection_name.str; } +/* + Delete a master info + + Called from my_hash_delete(&master_info_hash) + Stops associated slave threads and frees master_info +*/ + void free_key_master_info(Master_info *mi) { DBUG_ENTER("free_key_master_info"); + mysql_mutex_unlock(&LOCK_active_mi); + /* Ensure that we are not in reset_slave while this is done */ - lock_slave_threads(mi); + mi->lock_slave_threads(); terminate_slave_threads(mi,SLAVE_FORCE_ALL); /* We use 2 here instead of 1 just to make it easier when debugging */ mi->killed= 2; end_master_info(mi); - unlock_slave_threads(mi); + mi->unlock_slave_threads(); delete mi; + + mysql_mutex_lock(&LOCK_active_mi); DBUG_VOID_RETURN; } @@ -904,6 +918,7 @@ Master_info_index::Master_info_index() void Master_info_index::free_connections() { + mysql_mutex_assert_owner(&LOCK_active_mi); my_hash_reset(&master_info_hash); } @@ -936,7 +951,6 @@ bool Master_info_index::init_all_master_info() File index_file_nr; DBUG_ENTER("init_all_master_info"); - mysql_mutex_assert_owner(&LOCK_active_mi); DBUG_ASSERT(master_info_index); if ((index_file_nr= my_open(index_file_name, @@ -984,7 +998,6 @@ bool Master_info_index::init_all_master_info() DBUG_RETURN(1); } - lock_slave_threads(mi); init_thread_mask(&thread_mask,mi,0 /*not inverse*/); create_logfile_name_with_suffix(buf_master_info_file, @@ -999,6 +1012,7 @@ bool Master_info_index::init_all_master_info() sql_print_information("Reading Master_info: '%s' Relay_info:'%s'", buf_master_info_file, buf_relay_log_info_file); + mi->lock_slave_threads(); if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file, 0, thread_mask)) { @@ -1012,14 +1026,14 @@ bool Master_info_index::init_all_master_info() if (master_info_index->add_master_info(mi, FALSE)) DBUG_RETURN(1); succ_num++; - unlock_slave_threads(mi); + mi->unlock_slave_threads(); } else { /* Master_info already in HASH */ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), (int) connection_name.length, connection_name.str); - unlock_slave_threads(mi); + mi->unlock_slave_threads(); delete mi; } continue; @@ -1036,7 +1050,7 @@ bool Master_info_index::init_all_master_info() /* Master_info was already registered */ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), (int) connection_name.length, connection_name.str); - unlock_slave_threads(mi); + mi->unlock_slave_threads(); delete mi; continue; } @@ -1065,7 +1079,7 @@ bool Master_info_index::init_all_master_info() (int) connection_name.length, connection_name.str); } - unlock_slave_threads(mi); + mi->unlock_slave_threads(); } } @@ -1268,7 +1282,12 @@ bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg, /* Add a Master_info class to Hash Table */ bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file) { - if (!my_hash_insert(&master_info_hash, (uchar*) mi)) + /* + We have to protect against shutdown to ensure we are not calling + my_hash_insert() while my_hash_free() is in progress + */ + if (unlikely(shutdown_in_progress) || + !my_hash_insert(&master_info_hash, (uchar*) mi)) { if (global_system_variables.log_warnings > 1) sql_print_information("Added new Master_info '%.*s' to hash table", @@ -1392,23 +1411,31 @@ bool give_error_if_slave_running(bool already_locked) @return 0 No Slave SQL thread is running # Number of slave SQL thread running + + Note that during shutdown we return 1. This is needed to ensure we + don't try to resize thread pool during shutdown as during shutdown + master_info_hash may be freeing the hash and during that time + hash entries can't be accessed. */ uint any_slave_sql_running() { uint count= 0; + HASH *hash; DBUG_ENTER("any_slave_sql_running"); mysql_mutex_lock(&LOCK_active_mi); - if (likely(master_info_index)) // Not shutdown + if (unlikely(shutdown_in_progress || !master_info_index)) { - HASH *hash= &master_info_index->master_info_hash; - for (uint i= 0; i< hash->records; ++i) - { - Master_info *mi= (Master_info *)my_hash_element(hash, i); - if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) - count++; - } + mysql_mutex_unlock(&LOCK_active_mi); + return 1; + } + hash= &master_info_index->master_info_hash; + for (uint i= 0; i< hash->records; ++i) + { + Master_info *mi= (Master_info *)my_hash_element(hash, i); + if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + count++; } mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(count); diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 32591c56caa..5c42378ca2c 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -81,6 +81,8 @@ class Master_info : public Slave_reporting_capability } void release(); void wait_until_free(); + void lock_slave_threads(); + void unlock_slave_threads(); /* the variables below are needed because we can change masters on the fly */ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ @@ -99,7 +101,7 @@ class Master_info : public Slave_reporting_capability File fd; // we keep the file open, so we need to remember the file pointer IO_CACHE file; - mysql_mutex_t data_lock, run_lock, sleep_lock; + mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock; mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond; THD *io_thd; MYSQL* mysql; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index ff2c7d68467..474c6f063c7 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1378,10 +1378,24 @@ dealloc_gco(group_commit_orderer *gco) my_free(gco); } +/** + Change thread count for global parallel worker threads + + @param pool parallel thread pool + @param new_count Number of threads to be in pool. 0 in shutdown + @param force Force thread count to new_count even if slave + threads are running + + By default we don't resize pool of there are running threads. + However during shutdown we will always do it. + This is needed as any_slave_sql_running() returns 1 during shutdown + as we don't want to access master_info while + Master_info_index::free_connections are running. +*/ static int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, - uint32 new_count) + uint32 new_count, bool force) { uint32 i; rpl_parallel_thread **new_list= NULL; @@ -1403,7 +1417,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, If we are about to delete pool, do an extra check that there are no new slave threads running since we marked pool busy */ - if (!new_count) + if (!new_count && !force) { if (any_slave_sql_running()) { @@ -1556,8 +1570,7 @@ err: int rpl_parallel_resize_pool_if_no_slaves(void) { /* master_info_index is set to NULL on shutdown */ - if (opt_slave_parallel_threads > 0 && !any_slave_sql_running() && - master_info_index) + if (opt_slave_parallel_threads > 0 && !any_slave_sql_running()) return rpl_parallel_inactivate_pool(&global_rpl_thread_pool); return 0; } @@ -1567,7 +1580,8 @@ int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) { if (!pool->count) - return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads); + return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads, + 0); return 0; } @@ -1575,7 +1589,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool) { - return rpl_parallel_change_thread_count(pool, 0); + return rpl_parallel_change_thread_count(pool, 0, 0); } @@ -1854,7 +1868,7 @@ rpl_parallel_thread_pool::destroy() { if (!inited) return; - rpl_parallel_change_thread_count(this, 0); + rpl_parallel_change_thread_count(this, 0, 1); mysql_mutex_destroy(&LOCK_rpl_thread_pool); mysql_cond_destroy(&COND_rpl_thread_pool); inited= false; diff --git a/sql/slave.cc b/sql/slave.cc index a2b8439b4b4..1bb2b534693 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -228,16 +228,14 @@ void init_thread_mask(int* mask,Master_info* mi,bool inverse) /* - lock_slave_threads() + lock_slave_threads() against other threads doing STOP, START or RESET SLAVE + */ -void lock_slave_threads(Master_info* mi) +void Master_info::lock_slave_threads() { DBUG_ENTER("lock_slave_threads"); - - //TODO: see if we can do this without dual mutex - mysql_mutex_lock(&mi->run_lock); - mysql_mutex_lock(&mi->rli.run_lock); + mysql_mutex_lock(&start_stop_lock); DBUG_VOID_RETURN; } @@ -246,13 +244,10 @@ void lock_slave_threads(Master_info* mi) unlock_slave_threads() */ -void unlock_slave_threads(Master_info* mi) +void Master_info::unlock_slave_threads() { DBUG_ENTER("unlock_slave_threads"); - - //TODO: see if we can do this without dual mutex - mysql_mutex_unlock(&mi->rli.run_lock); - mysql_mutex_unlock(&mi->run_lock); + mysql_mutex_unlock(&start_stop_lock); DBUG_VOID_RETURN; } @@ -374,7 +369,6 @@ int init_slave() accepted. However bootstrap may conflict with us if it does START SLAVE. So it's safer to take the lock. */ - mysql_mutex_lock(&LOCK_active_mi); if (pthread_key_create(&RPL_MASTER_INFO, NULL)) goto err; @@ -383,7 +377,6 @@ int init_slave() if (!master_info_index || master_info_index->init_all_master_info()) { sql_print_error("Failed to initialize multi master structures"); - mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(1); } if (!(active_mi= new Master_info(&default_master_connection_name, @@ -441,7 +434,6 @@ int init_slave() } end: - mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(error); err: @@ -6159,7 +6151,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, suppress_warnings= 0; mi->report(ERROR_LEVEL, last_errno, NULL, "error %s to master '%s@%s:%d'" - " - retry-time: %d retries: %lu message: %s", + " - retry-time: %d maximum-retries: %lu message: %s", (reconnect ? "reconnecting" : "connecting"), mi->user, mi->host, mi->port, mi->connect_retry, master_retry_count, diff --git a/sql/slave.h b/sql/slave.h index cfff79da0e4..abcb4df984b 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -219,8 +219,6 @@ void close_active_mi(); /* clean up slave threads data */ void clear_until_condition(Relay_log_info* rli); void clear_slave_error(Relay_log_info* rli); void end_relay_log_info(Relay_log_info* rli); -void lock_slave_threads(Master_info* mi); -void unlock_slave_threads(Master_info* mi); void init_thread_mask(int* mask,Master_info* mi,bool inverse); Format_description_log_event * read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 29980dbbbf1..7d4fb110229 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -2833,7 +2833,16 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) relay_log_info_file, 0, &mi->cmp_connection_name); - lock_slave_threads(mi); // this allows us to cleanly read slave_running + mi->lock_slave_threads(); + if (mi->killed) + { + /* connection was deleted while we waited for lock_slave_threads */ + mi->unlock_slave_threads(); + my_error(WARN_NO_MASTER_INFO, mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(-1); + } + // Get a mask of _stopped_ threads init_thread_mask(&thread_mask,mi,1 /* inverse */); @@ -2968,7 +2977,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) ER(ER_UNTIL_COND_IGNORED)); if (!slave_errno) - slave_errno = start_slave_threads(0 /*no mutex */, + slave_errno = start_slave_threads(1, 1 /* wait for start */, mi, master_info_file_tmp, @@ -2984,7 +2993,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) } err: - unlock_slave_threads(mi); + mi->unlock_slave_threads(); if (slave_errno) { @@ -3024,8 +3033,12 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) DBUG_RETURN(-1); THD_STAGE_INFO(thd, stage_killing_slave); int thread_mask; - lock_slave_threads(mi); - // Get a mask of _running_ threads + mi->lock_slave_threads(); + /* + Get a mask of _running_ threads. + We don't have to test for mi->killed as the thread_mask will take care + of checking if threads exists + */ init_thread_mask(&thread_mask,mi,0 /* not inverse*/); /* Below we will stop all running threads. @@ -3038,8 +3051,7 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) if (thread_mask) { - slave_errno= terminate_slave_threads(mi,thread_mask, - 1 /*skip lock */); + slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */); } else { @@ -3048,7 +3060,8 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING, ER(ER_SLAVE_WAS_NOT_RUNNING)); } - unlock_slave_threads(mi); + + mi->unlock_slave_threads(); if (slave_errno) { @@ -3083,11 +3096,20 @@ int reset_slave(THD *thd, Master_info* mi) char relay_log_info_file_tmp[FN_REFLEN]; DBUG_ENTER("reset_slave"); - lock_slave_threads(mi); + mi->lock_slave_threads(); + if (mi->killed) + { + /* connection was deleted while we waited for lock_slave_threads */ + mi->unlock_slave_threads(); + my_error(WARN_NO_MASTER_INFO, mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(-1); + } + init_thread_mask(&thread_mask,mi,0 /* not inverse */); if (thread_mask) // We refuse if any slave thread is running { - unlock_slave_threads(mi); + mi->unlock_slave_threads(); my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, mi->connection_name.str); DBUG_RETURN(ER_SLAVE_MUST_STOP); @@ -3152,7 +3174,7 @@ int reset_slave(THD *thd, Master_info* mi) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: - unlock_slave_threads(mi); + mi->unlock_slave_threads(); if (error) my_error(sql_errno, MYF(0), errmsg); DBUG_RETURN(error); @@ -3286,7 +3308,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) lex_mi->port)) DBUG_RETURN(TRUE); - lock_slave_threads(mi); + mi->lock_slave_threads(); + if (mi->killed) + { + /* connection was deleted while we waited for lock_slave_threads */ + mi->unlock_slave_threads(); + my_error(WARN_NO_MASTER_INFO, mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(TRUE); + } + init_thread_mask(&thread_mask,mi,0 /*not inverse*/); if (thread_mask) // We refuse if any slave thread is running { @@ -3593,7 +3624,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) mysql_mutex_unlock(&mi->rli.data_lock); err: - unlock_slave_threads(mi); + mi->unlock_slave_threads(); if (ret == FALSE) my_ok(thd); DBUG_RETURN(ret); |