diff options
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r-- | sql/rpl_mi.cc | 442 |
1 files changed, 328 insertions, 114 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index a141d238f78..f30b7e161f2 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -40,7 +40,9 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, sync_counter(0), heartbeat_period(0), received_heartbeats(0), master_id(0), prev_master_id(0), using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), - gtid_reconnect_event_skip_count(0), gtid_event_seen(false) + gtid_reconnect_event_skip_count(0), gtid_event_seen(false), + in_start_all_slaves(0), in_stop_all_slaves(0), + users(0), killed(0) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -81,6 +83,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, bzero((char*) &file, sizeof(file)); mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock, + MY_MUTEX_INIT_SLOW); mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION); mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); @@ -90,8 +94,27 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL); } + +/** + Wait until no one is using Master_info +*/ + +void Master_info::wait_until_free() +{ + mysql_mutex_lock(&sleep_lock); + killed= 1; + while (users) + mysql_cond_wait(&sleep_cond, &sleep_lock); + mysql_mutex_unlock(&sleep_lock); +} + +/** + Delete master_info +*/ + Master_info::~Master_info() { + wait_until_free(); #ifdef WITH_WSREP /* Do not free "wsrep" rpl_filter. It will eventually be freed by @@ -106,6 +129,7 @@ Master_info::~Master_info() mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&sleep_lock); + mysql_mutex_destroy(&start_stop_lock); mysql_cond_destroy(&data_cond); mysql_cond_destroy(&start_cond); mysql_cond_destroy(&stop_cond); @@ -841,12 +865,28 @@ uchar *get_key_master_info(Master_info *mi, size_t *length, return (uchar*) mi->cmp_connection_name.str; } +/* + Delete a master info + + Called from my_hash_delete(&master_info_hash) + Stops associated slave threads and frees master_info +*/ + void free_key_master_info(Master_info *mi) { DBUG_ENTER("free_key_master_info"); + mysql_mutex_unlock(&LOCK_active_mi); + + /* Ensure that we are not in reset_slave while this is done */ + mi->lock_slave_threads(); terminate_slave_threads(mi,SLAVE_FORCE_ALL); + /* We use 2 here instead of 1 just to make it easier when debugging */ + mi->killed= 2; end_master_info(mi); + mi->unlock_slave_threads(); delete mi; + + mysql_mutex_lock(&LOCK_active_mi); DBUG_VOID_RETURN; } @@ -1002,9 +1042,28 @@ Master_info_index::Master_info_index() index_file.file= -1; } + +/** + Free all connection threads + + This is done during early stages of shutdown + to give connection threads and slave threads time + to die before ~Master_info_index is called +*/ + +void Master_info_index::free_connections() +{ + mysql_mutex_assert_owner(&LOCK_active_mi); + my_hash_reset(&master_info_hash); +} + + +/** + Free all connection threads and free structures +*/ + Master_info_index::~Master_info_index() { - /* This will close connection for all objects in the cache */ my_hash_free(&master_info_hash); end_io_cache(&index_file); if (index_file.file >= 0) @@ -1025,9 +1084,9 @@ bool Master_info_index::init_all_master_info() int err_num= 0, succ_num= 0; // The number of success read Master_info char sign[MAX_CONNECTION_NAME+1]; File index_file_nr; + THD *thd; DBUG_ENTER("init_all_master_info"); - mysql_mutex_assert_owner(&LOCK_active_mi); DBUG_ASSERT(master_info_index); if ((index_file_nr= my_open(index_file_name, @@ -1057,6 +1116,10 @@ bool Master_info_index::init_all_master_info() DBUG_RETURN(1); } + thd= new THD(next_thread_id()); /* Needed by start_slave_threads */ + thd->thread_stack= (char*) &thd; + thd->store_globals(); + reinit_io_cache(&index_file, READ_CACHE, 0L,0,0); while (!init_strvar_from_file(sign, sizeof(sign), &index_file, NULL)) @@ -1072,10 +1135,9 @@ bool Master_info_index::init_all_master_info() mi->error()) { delete mi; - DBUG_RETURN(1); + goto error; } - lock_slave_threads(mi); init_thread_mask(&thread_mask,mi,0 /*not inverse*/); create_logfile_name_with_suffix(buf_master_info_file, @@ -1090,6 +1152,7 @@ bool Master_info_index::init_all_master_info() sql_print_information("Reading Master_info: '%s' Relay_info:'%s'", buf_master_info_file, buf_relay_log_info_file); + mi->lock_slave_threads(); if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file, 0, thread_mask)) { @@ -1101,17 +1164,18 @@ bool Master_info_index::init_all_master_info() { /* Master_info is not in HASH; Add it */ if (master_info_index->add_master_info(mi, FALSE)) - DBUG_RETURN(1); + goto error; succ_num++; - unlock_slave_threads(mi); + mi->unlock_slave_threads(); } else { /* Master_info already in HASH */ sql_print_error(ER_THD_OR_DEFAULT(current_thd, ER_CONNECTION_ALREADY_EXISTS), + (int) connection_name.length, connection_name.str, (int) connection_name.length, connection_name.str); - unlock_slave_threads(mi); + mi->unlock_slave_threads(); delete mi; } continue; @@ -1128,23 +1192,23 @@ bool Master_info_index::init_all_master_info() /* Master_info was already registered */ sql_print_error(ER_THD_OR_DEFAULT(current_thd, ER_CONNECTION_ALREADY_EXISTS), + (int) connection_name.length, connection_name.str, (int) connection_name.length, connection_name.str); - unlock_slave_threads(mi); + mi->unlock_slave_threads(); delete mi; continue; } /* Master_info was not registered; add it */ if (master_info_index->add_master_info(mi, FALSE)) - DBUG_RETURN(1); + goto error; succ_num++; - unlock_slave_threads(mi); if (!opt_skip_slave_start) { if (start_slave_threads(current_thd, 1 /* need mutex */, - 0 /* no wait for start*/, + 1 /* wait for start*/, mi, buf_master_info_file, buf_relay_log_info_file, @@ -1160,8 +1224,11 @@ bool Master_info_index::init_all_master_info() (int) connection_name.length, connection_name.str); } + mi->unlock_slave_threads(); } } + thd->reset_globals(); + delete thd; if (!err_num) // No Error on read Master_info { @@ -1169,16 +1236,19 @@ bool Master_info_index::init_all_master_info() sql_print_information("Reading of all Master_info entries succeded"); DBUG_RETURN(0); } - else if (succ_num) // Have some Error and some Success + if (succ_num) // Have some Error and some Success { sql_print_warning("Reading of some Master_info entries failed"); DBUG_RETURN(1); } - else // All failed - { - sql_print_error("Reading of all Master_info entries failed!"); - DBUG_RETURN(1); - } + + sql_print_error("Reading of all Master_info entries failed!"); + DBUG_RETURN(1); + +error: + thd->reset_globals(); + delete thd; + DBUG_RETURN(1); } @@ -1211,6 +1281,71 @@ bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name, /** + Get Master_info for a connection and lock the object from deletion + + @param + connection_name Connection name + warning WARN_LEVEL_NOTE -> Don't print anything + WARN_LEVEL_WARN -> Issue warning if not exists + WARN_LEVEL_ERROR-> Issue error if not exists +*/ + +Master_info *get_master_info(const LEX_STRING *connection_name, + Sql_condition::enum_warning_level warning) +{ + Master_info *mi; + DBUG_ENTER("get_master_info"); + + /* Protect against inserts into hash */ + mysql_mutex_lock(&LOCK_active_mi); + /* + The following can only be true during shutdown when slave has been killed + but some other threads are still trying to access slave statistics. + */ + if (unlikely(!master_info_index)) + { + if (warning != Sql_condition::WARN_LEVEL_NOTE) + my_error(WARN_NO_MASTER_INFO, + MYF(warning == Sql_condition::WARN_LEVEL_WARN ? + ME_JUST_WARNING : 0), + (int) connection_name->length, connection_name->str); + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(0); + } + if ((mi= master_info_index->get_master_info(connection_name, warning))) + { + /* + We have to use sleep_lock here. If we would use LOCK_active_mi + then we would take locks in wrong order in Master_info::release() + */ + mysql_mutex_lock(&mi->sleep_lock); + mi->users++; + DBUG_PRINT("info",("users: %d", mi->users)); + mysql_mutex_unlock(&mi->sleep_lock); + } + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(mi); +} + + +/** + Release master info. + Signals ~Master_info that it's now safe to delete it +*/ + +void Master_info::release() +{ + mysql_mutex_lock(&sleep_lock); + if (!--users && killed) + { + /* Signal ~Master_info that it's ok to now free it */ + mysql_cond_signal(&sleep_cond); + } + mysql_mutex_unlock(&sleep_lock); +} + + +/** Get Master_info for a connection @param @@ -1232,8 +1367,6 @@ Master_info_index::get_master_info(const LEX_STRING *connection_name, ("connection_name: '%.*s'", (int) connection_name->length, connection_name->str)); - mysql_mutex_assert_owner(&LOCK_active_mi); - /* Make name lower case for comparison */ res= strmake(buff, connection_name->str, connection_name->length); my_casedn_str(system_charset_info, buff); @@ -1299,7 +1432,12 @@ bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg, /* Add a Master_info class to Hash Table */ bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file) { - if (!my_hash_insert(&master_info_hash, (uchar*) mi)) + /* + We have to protect against shutdown to ensure we are not calling + my_hash_insert() while my_hash_free() is in progress + */ + if (unlikely(shutdown_in_progress) || + !my_hash_insert(&master_info_hash, (uchar*) mi)) { if (global_system_variables.log_warnings > 1) sql_print_information("Added new Master_info '%.*s' to hash table", @@ -1325,105 +1463,131 @@ bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file) atomic */ -bool Master_info_index::remove_master_info(LEX_STRING *name) +bool Master_info_index::remove_master_info(Master_info *mi) { - Master_info* mi; DBUG_ENTER("remove_master_info"); + mysql_mutex_assert_owner(&LOCK_active_mi); - if ((mi= get_master_info(name, Sql_condition::WARN_LEVEL_WARN))) + // Delete Master_info and rewrite others to file + if (!my_hash_delete(&master_info_hash, (uchar*) mi)) { - // Delete Master_info and rewrite others to file - if (!my_hash_delete(&master_info_hash, (uchar*) mi)) + File index_file_nr; + + // Close IO_CACHE and FILE handler fisrt + end_io_cache(&index_file); + my_close(index_file.file, MYF(MY_WME)); + + // Reopen File and truncate it + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_TRUNC | O_BINARY , + MYF(MY_WME))) < 0 || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, WRITE_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) { - File index_file_nr; - - // Close IO_CACHE and FILE handler fisrt - end_io_cache(&index_file); - my_close(index_file.file, MYF(MY_WME)); - - // Reopen File and truncate it - if ((index_file_nr= my_open(index_file_name, - O_RDWR | O_CREAT | O_TRUNC | O_BINARY , - MYF(MY_WME))) < 0 || - init_io_cache(&index_file, index_file_nr, - IO_SIZE, WRITE_CACHE, - my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), - 0, MYF(MY_WME | MY_WAIT_IF_FULL))) - { - int error= my_errno; - if (index_file_nr >= 0) - my_close(index_file_nr,MYF(0)); - - sql_print_error("Create of Master Info Index file '%s' failed with " - "error: %M", - index_file_name, error); - DBUG_RETURN(TRUE); - } + int error= my_errno; + if (index_file_nr >= 0) + my_close(index_file_nr,MYF(0)); - // Rewrite Master_info.index - for (uint i= 0; i< master_info_hash.records; ++i) - { - Master_info *tmp_mi; - tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); - write_master_name_to_index_file(&tmp_mi->connection_name, 0); - } - my_sync(index_file_nr, MYF(MY_WME)); + sql_print_error("Create of Master Info Index file '%s' failed with " + "error: %M", + index_file_name, error); + DBUG_RETURN(TRUE); + } + + // Rewrite Master_info.index + for (uint i= 0; i< master_info_hash.records; ++i) + { + Master_info *tmp_mi; + tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); + write_master_name_to_index_file(&tmp_mi->connection_name, 0); } + if (my_sync(index_file_nr, MYF(MY_WME))) + DBUG_RETURN(TRUE); } DBUG_RETURN(FALSE); } /** - Master_info_index::give_error_if_slave_running() + give_error_if_slave_running() + + @param + already_locked 0 if we need to lock, 1 if we have LOCK_active_mi_locked @return TRUE If some slave is running. An error is printed FALSE No slave is running */ -bool Master_info_index::give_error_if_slave_running() +bool give_error_if_slave_running(bool already_locked) { + bool ret= 0; DBUG_ENTER("give_error_if_slave_running"); - mysql_mutex_assert_owner(&LOCK_active_mi); - for (uint i= 0; i< master_info_hash.records; ++i) + if (!already_locked) + mysql_mutex_lock(&LOCK_active_mi); + if (!master_info_index) { - Master_info *mi; - mi= (Master_info *) my_hash_element(&master_info_hash, i); - if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + my_error(ER_SERVER_SHUTDOWN, MYF(0)); + ret= 1; + } + else + { + HASH *hash= &master_info_index->master_info_hash; + for (uint i= 0; i< hash->records; ++i) { - my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, - mi->connection_name.str); - DBUG_RETURN(TRUE); + Master_info *mi; + mi= (Master_info *) my_hash_element(hash, i); + if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + { + my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, + mi->connection_name.str); + ret= 1; + break; + } } } - DBUG_RETURN(FALSE); + if (!already_locked) + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(ret); } /** - Master_info_index::any_slave_sql_running() - - The LOCK_active_mi must be held while calling this function. + any_slave_sql_running() @return 0 No Slave SQL thread is running # Number of slave SQL thread running + + Note that during shutdown we return 1. This is needed to ensure we + don't try to resize thread pool during shutdown as during shutdown + master_info_hash may be freeing the hash and during that time + hash entries can't be accessed. */ -uint Master_info_index::any_slave_sql_running() +uint any_slave_sql_running() { uint count= 0; + HASH *hash; DBUG_ENTER("any_slave_sql_running"); - mysql_mutex_assert_owner(&LOCK_active_mi); - for (uint i= 0; i< master_info_hash.records; ++i) + mysql_mutex_lock(&LOCK_active_mi); + if (unlikely(shutdown_in_progress || !master_info_index)) { - Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i); + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(1); + } + hash= &master_info_index->master_info_hash; + for (uint i= 0; i< hash->records; ++i) + { + Master_info *mi= (Master_info *)my_hash_element(hash, i); if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) count++; } + mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(count); } @@ -1436,15 +1600,25 @@ uint Master_info_index::any_slave_sql_running() @return TRUE Error FALSE Everything ok. + + This code is written so that we don't keep LOCK_active_mi active + while we are starting a slave. */ bool Master_info_index::start_all_slaves(THD *thd) { bool result= FALSE; - DBUG_ENTER("warn_if_slave_running"); + DBUG_ENTER("start_all_slaves"); mysql_mutex_assert_owner(&LOCK_active_mi); - for (uint i= 0; i< master_info_hash.records; ++i) + for (uint i= 0; i< master_info_hash.records; i++) + { + Master_info *mi; + mi= (Master_info *) my_hash_element(&master_info_hash, i); + mi->in_start_all_slaves= 0; + } + + for (uint i= 0; i< master_info_hash.records; ) { int error; Master_info *mi; @@ -1454,25 +1628,40 @@ bool Master_info_index::start_all_slaves(THD *thd) Try to start all slaves that are configured (host is defined) and are not already running */ - if ((mi->slave_running == MYSQL_SLAVE_NOT_RUN || - !mi->rli.slave_running) && *mi->host) + if (!((mi->slave_running == MYSQL_SLAVE_NOT_RUN || + !mi->rli.slave_running) && *mi->host) || + mi->in_start_all_slaves) { - if ((error= start_slave(thd, mi, 1))) - { - my_error(ER_CANT_START_STOP_SLAVE, MYF(0), - "START", - (int) mi->connection_name.length, - mi->connection_name.str); - result= 1; - if (error < 0) // fatal error - break; - } - else if (thd) - push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, - ER_SLAVE_STARTED, ER_THD(thd, ER_SLAVE_STARTED), - (int) mi->connection_name.length, - mi->connection_name.str); + i++; + continue; + } + mi->in_start_all_slaves= 1; + + mysql_mutex_lock(&mi->sleep_lock); + mi->users++; // Mark used + mysql_mutex_unlock(&mi->sleep_lock); + mysql_mutex_unlock(&LOCK_active_mi); + error= start_slave(thd, mi, 1); + mi->release(); + mysql_mutex_lock(&LOCK_active_mi); + if (error) + { + my_error(ER_CANT_START_STOP_SLAVE, MYF(0), + "START", + (int) mi->connection_name.length, + mi->connection_name.str); + result= 1; + if (error < 0) // fatal error + break; } + else if (thd) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, + ER_SLAVE_STARTED, ER_THD(thd, ER_SLAVE_STARTED), + (int) mi->connection_name.length, + mi->connection_name.str); + /* Restart from first element as master_info_hash may have changed */ + i= 0; + continue; } DBUG_RETURN(result); } @@ -1488,39 +1677,64 @@ bool Master_info_index::start_all_slaves(THD *thd) @return TRUE Error FALSE Everything ok. + + This code is written so that we don't keep LOCK_active_mi active + while we are stopping a slave. */ bool Master_info_index::stop_all_slaves(THD *thd) { bool result= FALSE; - DBUG_ENTER("warn_if_slave_running"); + DBUG_ENTER("stop_all_slaves"); mysql_mutex_assert_owner(&LOCK_active_mi); DBUG_ASSERT(thd); - for (uint i= 0; i< master_info_hash.records; ++i) + for (uint i= 0; i< master_info_hash.records; i++) + { + Master_info *mi; + mi= (Master_info *) my_hash_element(&master_info_hash, i); + mi->in_stop_all_slaves= 0; + } + + for (uint i= 0; i< master_info_hash.records ;) { int error; Master_info *mi; mi= (Master_info *) my_hash_element(&master_info_hash, i); - if ((mi->slave_running != MYSQL_SLAVE_NOT_RUN || - mi->rli.slave_running)) + if (!(mi->slave_running != MYSQL_SLAVE_NOT_RUN || + mi->rli.slave_running) || + mi->in_stop_all_slaves) { - if ((error= stop_slave(thd, mi, 1))) - { - my_error(ER_CANT_START_STOP_SLAVE, MYF(0), - "STOP", - (int) mi->connection_name.length, - mi->connection_name.str); - result= 1; - if (error < 0) // Fatal error - break; - } - else - push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, - ER_SLAVE_STOPPED, ER_THD(thd, ER_SLAVE_STOPPED), - (int) mi->connection_name.length, - mi->connection_name.str); + i++; + continue; } + mi->in_stop_all_slaves= 1; // Protection for loops + + mysql_mutex_lock(&mi->sleep_lock); + mi->users++; // Mark used + mysql_mutex_unlock(&mi->sleep_lock); + mysql_mutex_unlock(&LOCK_active_mi); + error= stop_slave(thd, mi, 1); + mi->release(); + mysql_mutex_lock(&LOCK_active_mi); + if (error) + { + my_error(ER_CANT_START_STOP_SLAVE, MYF(0), + "STOP", + (int) mi->connection_name.length, + mi->connection_name.str); + result= 1; + if (error < 0) // Fatal error + break; + } + else + push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, + ER_SLAVE_STOPPED, ER_THD(thd, ER_SLAVE_STOPPED), + (int) mi->connection_name.length, + mi->connection_name.str); + /* Restart from first element as master_info_hash may have changed */ + i= 0; + continue; } DBUG_RETURN(result); } |