diff options
author | Monty <monty@mariadb.org> | 2017-01-29 22:10:56 +0200 |
---|---|---|
committer | Sergei Golubchik <serg@mariadb.org> | 2017-02-28 16:10:46 +0100 |
commit | e65f667bb60244610512efd7491fc77eccceb9db (patch) | |
tree | ff549849324d917615ab896afb7e25ee0bfc7396 | |
parent | d5c54f3990b49d7c7d6a410016e85e8e58803895 (diff) | |
download | mariadb-git-e65f667bb60244610512efd7491fc77eccceb9db.tar.gz |
MDEV-9573 'Stop slave' hangs on replication slave
The reason for this is that stop slave takes LOCK_active_mi over the
whole operation while some slave operations will also need LOCK_active_mi
which causes deadlocks.
Fixed by introducing object counting for Master_info and not taking
LOCK_active_mi over stop slave or even stop_all_slaves()
Another benefit of this approach is that it allows:
- Multiple threads can run SHOW SLAVE STATUS at the same time
- START/STOP/RESET/SLAVE STATUS on a slave will not block other slaves
- Simpler interface for handling get_master_info()
- Added some missing unlock of 'log_lock' in error condtions
- Moved rpl_parallel_inactivate_pool(&global_rpl_thread_pool) to end
of stop_slave() to not have to use LOCK_active_mi inside
terminate_slave_threads()
- Changed argument for remove_master_info() to Master_info, as we always
have this available
- Fixed core dump when doing FLUSH TABLES WITH READ LOCK and parallel
replication. Problem was that waiting for pause_for_ftwrl was not done
when deleting rpt->current_owner after a force_abort.
-rw-r--r-- | mysys/mf_iocache.c | 5 | ||||
-rw-r--r-- | sql/item_func.cc | 8 | ||||
-rw-r--r-- | sql/mysqld.cc | 66 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 375 | ||||
-rw-r--r-- | sql/rpl_mi.h | 16 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 62 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 1 | ||||
-rw-r--r-- | sql/slave.cc | 79 | ||||
-rw-r--r-- | sql/slave.h | 1 | ||||
-rw-r--r-- | sql/sql_parse.cc | 66 | ||||
-rw-r--r-- | sql/sql_reload.cc | 56 | ||||
-rw-r--r-- | sql/sql_repl.cc | 19 | ||||
-rw-r--r-- | sql/sys_vars.cc | 130 |
13 files changed, 539 insertions, 345 deletions
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index f891a22b17d..8687c2e0c48 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -1815,6 +1815,7 @@ int my_b_flush_io_cache(IO_CACHE *info, It's currently safe to call this if one has called init_io_cache() on the 'info' object, even if init_io_cache() failed. This function is also safe to call twice with the same handle. + Note that info->file is not reset as the caller may still use ut for my_close() RETURN 0 ok @@ -1850,10 +1851,12 @@ int end_io_cache(IO_CACHE *info) if (info->type == SEQ_READ_APPEND) { /* Destroy allocated mutex */ - info->type= TYPE_NOT_SET; mysql_mutex_destroy(&info->append_buffer_lock); } info->share= 0; + info->type= TYPE_NOT_SET; /* Ensure that flush_io_cache() does nothing */ + info->write_end= 0; /* Ensure that my_b_write() fails */ + info->write_function= 0; /* my_b_write will crash if used */ DBUG_RETURN(error); } /* end_io_cache */ diff --git a/sql/item_func.cc b/sql/item_func.cc index 3149ebb855d..2c43181e8bb 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -3958,12 +3958,7 @@ longlong Item_master_pos_wait::val_int() else connection_name= thd->variables.default_master_connection; - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index) // master_info_index is set to NULL on shutdown. - mi= master_info_index->get_master_info(&connection_name, - Sql_condition::WARN_LEVEL_WARN); - mysql_mutex_unlock(&LOCK_active_mi); - if (!mi) + if (!(mi= get_master_info(&connection_name, Sql_condition::WARN_LEVEL_WARN))) goto err; if ((event_count = mi->rli.wait_for_pos(thd, log_name, pos, timeout)) == -2) @@ -3971,6 +3966,7 @@ longlong Item_master_pos_wait::val_int() null_value = 1; event_count=0; } + mi->release(); #endif return event_count; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index a6b6cdfae9b..ca45e9cdcdb 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -702,12 +702,15 @@ mysql_mutex_t LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_global_system_variables, - LOCK_user_conn, LOCK_slave_list, LOCK_active_mi, + LOCK_user_conn, LOCK_slave_list, LOCK_connection_count, LOCK_error_messages, LOCK_slave_init; mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, LOCK_global_table_stats, LOCK_global_index_stats; +/* This protects against changes in master_info_index */ +mysql_mutex_t LOCK_active_mi; + /** The below lock protects access to two global server variables: max_prepared_stmt_count and prepared_stmt_count. These variables @@ -1651,7 +1654,7 @@ static void close_connections(void) mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list Events::deinit(); - end_slave(); + slave_prepare_for_shutdown(); /* Give threads time to die. @@ -1700,6 +1703,7 @@ static void close_connections(void) DBUG_PRINT("quit",("Unlocking LOCK_thread_count")); mysql_mutex_unlock(&LOCK_thread_count); } + end_slave(); /* All threads has now been aborted */ DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); mysql_mutex_lock(&LOCK_thread_count); @@ -7214,17 +7218,14 @@ static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff) var->type= SHOW_MY_BOOL; var->value= buff; - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index) + + if ((mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_NOTE))) { - mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_NOTE); - if (mi) - tmp= (my_bool) (mi->slave_running == MYSQL_SLAVE_RUN_READING && - mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN); + tmp= (my_bool) (mi->slave_running == MYSQL_SLAVE_RUN_READING && + mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN); + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); if (mi) *((my_bool *)buff)= tmp; else @@ -7256,38 +7257,26 @@ static int show_slaves_running(THD *thd, SHOW_VAR *var, char *buff) { var->type= SHOW_LONGLONG; var->value= buff; - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index) - *((longlong *)buff)= master_info_index->any_slave_sql_running(); - else - *((longlong *)buff)= 0; + *((longlong *)buff)= any_slave_sql_running(); - mysql_mutex_unlock(&LOCK_active_mi); return 0; } static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) { - Master_info *mi= NULL; - longlong tmp; - LINT_INIT(tmp); + Master_info *mi; var->type= SHOW_LONGLONG; var->value= buff; - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index) + + if ((mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_NOTE))) { - mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_NOTE); - if (mi) - tmp= mi->received_heartbeats; + *((longlong *)buff)= mi->received_heartbeats; + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); - if (mi) - *((longlong *)buff)= tmp; else var->type= SHOW_UNDEF; return 0; @@ -7297,23 +7286,16 @@ static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff) { Master_info *mi= NULL; - float tmp; - LINT_INIT(tmp); var->type= SHOW_CHAR; var->value= buff; - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index) + + if ((mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_NOTE))) { - mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_NOTE); - if (mi) - tmp= mi->heartbeat_period; + sprintf(buff, "%.3f", mi->heartbeat_period); + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); - if (mi) - sprintf(buff, "%.3f", tmp); else var->type= SHOW_UNDEF; return 0; diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 249bf7608e5..7aa84e72bef 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -40,7 +40,9 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, sync_counter(0), heartbeat_period(0), received_heartbeats(0), master_id(0), prev_master_id(0), using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), - gtid_reconnect_event_skip_count(0), gtid_event_seen(false) + gtid_reconnect_event_skip_count(0), gtid_event_seen(false), + in_start_all_slaves(0), in_stop_all_slaves(0), + users(0), killed(0) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -87,8 +89,27 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL); } + +/** + Wait until no one is using Master_info +*/ + +void Master_info::wait_until_free() +{ + mysql_mutex_lock(&sleep_lock); + killed= 1; + while (users) + mysql_cond_wait(&sleep_cond, &sleep_lock); + mysql_mutex_unlock(&sleep_lock); +} + +/** + Delete master_info +*/ + Master_info::~Master_info() { + wait_until_free(); rpl_filters.delete_element(connection_name.str, connection_name.length, (void (*)(const char*, uchar*)) free_rpl_filter); my_free(connection_name.str); @@ -709,8 +730,13 @@ uchar *get_key_master_info(Master_info *mi, size_t *length, void free_key_master_info(Master_info *mi) { DBUG_ENTER("free_key_master_info"); + /* Ensure that we are not in reset_slave while this is done */ + lock_slave_threads(mi); terminate_slave_threads(mi,SLAVE_FORCE_ALL); + /* We use 2 here instead of 1 just to make it easier when debugging */ + mi->killed= 2; end_master_info(mi); + unlock_slave_threads(mi); delete mi; DBUG_VOID_RETURN; } @@ -867,9 +893,27 @@ Master_info_index::Master_info_index() index_file.file= -1; } + +/** + Free all connection threads + + This is done during early stages of shutdown + to give connection threads and slave threads time + to die before ~Master_info_index is called +*/ + +void Master_info_index::free_connections() +{ + my_hash_reset(&master_info_hash); +} + + +/** + Free all connection threads and free structures +*/ + Master_info_index::~Master_info_index() { - /* This will close connection for all objects in the cache */ my_hash_free(&master_info_hash); end_io_cache(&index_file); if (index_file.file >= 0) @@ -1001,7 +1045,6 @@ bool Master_info_index::init_all_master_info() if (master_info_index->add_master_info(mi, FALSE)) DBUG_RETURN(1); succ_num++; - unlock_slave_threads(mi); if (!opt_skip_slave_start) { @@ -1022,6 +1065,7 @@ bool Master_info_index::init_all_master_info() (int) connection_name.length, connection_name.str); } + unlock_slave_threads(mi); } } @@ -1073,6 +1117,71 @@ bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name, /** + Get Master_info for a connection and lock the object from deletion + + @param + connection_name Connection name + warning WARN_LEVEL_NOTE -> Don't print anything + WARN_LEVEL_WARN -> Issue warning if not exists + WARN_LEVEL_ERROR-> Issue error if not exists +*/ + +Master_info *get_master_info(LEX_STRING *connection_name, + Sql_condition::enum_warning_level warning) +{ + Master_info *mi; + DBUG_ENTER("get_master_info"); + + /* Protect against inserts into hash */ + mysql_mutex_lock(&LOCK_active_mi); + /* + The following can only be true during shutdown when slave has been killed + but some other threads are still trying to access slave statistics. + */ + if (unlikely(!master_info_index)) + { + if (warning != Sql_condition::WARN_LEVEL_NOTE) + my_error(WARN_NO_MASTER_INFO, + MYF(warning == Sql_condition::WARN_LEVEL_WARN ? + ME_JUST_WARNING : 0), + (int) connection_name->length, connection_name->str); + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(0); + } + if ((mi= master_info_index->get_master_info(connection_name, warning))) + { + /* + We have to use sleep_lock here. If we would use LOCK_active_mi + then we would take locks in wrong order in Master_info::release() + */ + mysql_mutex_lock(&mi->sleep_lock); + mi->users++; + DBUG_PRINT("info",("users: %d", mi->users)); + mysql_mutex_unlock(&mi->sleep_lock); + } + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(mi); +} + + +/** + Release master info. + Signals ~Master_info that it's now safe to delete it +*/ + +void Master_info::release() +{ + mysql_mutex_lock(&sleep_lock); + if (!--users && killed) + { + /* Signal ~Master_info that it's ok to now free it */ + mysql_cond_signal(&sleep_cond); + } + mysql_mutex_unlock(&sleep_lock); +} + + +/** Get Master_info for a connection @param @@ -1094,8 +1203,6 @@ Master_info_index::get_master_info(LEX_STRING *connection_name, ("connection_name: '%.*s'", (int) connection_name->length, connection_name->str)); - mysql_mutex_assert_owner(&LOCK_active_mi); - /* Make name lower case for comparison */ res= strmake(buff, connection_name->str, connection_name->length); my_casedn_str(system_charset_info, buff); @@ -1187,105 +1294,123 @@ bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file) atomic */ -bool Master_info_index::remove_master_info(LEX_STRING *name) +bool Master_info_index::remove_master_info(Master_info *mi) { - Master_info* mi; DBUG_ENTER("remove_master_info"); + mysql_mutex_assert_owner(&LOCK_active_mi); - if ((mi= get_master_info(name, Sql_condition::WARN_LEVEL_WARN))) + // Delete Master_info and rewrite others to file + if (!my_hash_delete(&master_info_hash, (uchar*) mi)) { - // Delete Master_info and rewrite others to file - if (!my_hash_delete(&master_info_hash, (uchar*) mi)) + File index_file_nr; + + // Close IO_CACHE and FILE handler fisrt + end_io_cache(&index_file); + my_close(index_file.file, MYF(MY_WME)); + + // Reopen File and truncate it + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_TRUNC | O_BINARY , + MYF(MY_WME))) < 0 || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, WRITE_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) { - File index_file_nr; - - // Close IO_CACHE and FILE handler fisrt - end_io_cache(&index_file); - my_close(index_file.file, MYF(MY_WME)); - - // Reopen File and truncate it - if ((index_file_nr= my_open(index_file_name, - O_RDWR | O_CREAT | O_TRUNC | O_BINARY , - MYF(MY_WME))) < 0 || - init_io_cache(&index_file, index_file_nr, - IO_SIZE, WRITE_CACHE, - my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), - 0, MYF(MY_WME | MY_WAIT_IF_FULL))) - { - int error= my_errno; - if (index_file_nr >= 0) - my_close(index_file_nr,MYF(0)); - - sql_print_error("Create of Master Info Index file '%s' failed with " - "error: %M", - index_file_name, error); - DBUG_RETURN(TRUE); - } + int error= my_errno; + if (index_file_nr >= 0) + my_close(index_file_nr,MYF(0)); - // Rewrite Master_info.index - for (uint i= 0; i< master_info_hash.records; ++i) - { - Master_info *tmp_mi; - tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); - write_master_name_to_index_file(&tmp_mi->connection_name, 0); - } - my_sync(index_file_nr, MYF(MY_WME)); + sql_print_error("Create of Master Info Index file '%s' failed with " + "error: %M", + index_file_name, error); + DBUG_RETURN(TRUE); + } + + // Rewrite Master_info.index + for (uint i= 0; i< master_info_hash.records; ++i) + { + Master_info *tmp_mi; + tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); + write_master_name_to_index_file(&tmp_mi->connection_name, 0); } + if (my_sync(index_file_nr, MYF(MY_WME))) + DBUG_RETURN(TRUE); } DBUG_RETURN(FALSE); } /** - Master_info_index::give_error_if_slave_running() + give_error_if_slave_running() + + @param + already_locked 0 if we need to lock, 1 if we have LOCK_active_mi_locked @return TRUE If some slave is running. An error is printed FALSE No slave is running */ -bool Master_info_index::give_error_if_slave_running() +bool give_error_if_slave_running(bool already_locked) { + bool ret= 0; DBUG_ENTER("give_error_if_slave_running"); - mysql_mutex_assert_owner(&LOCK_active_mi); - for (uint i= 0; i< master_info_hash.records; ++i) + if (!already_locked) + mysql_mutex_lock(&LOCK_active_mi); + if (!master_info_index) { - Master_info *mi; - mi= (Master_info *) my_hash_element(&master_info_hash, i); - if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + my_error(ER_SERVER_SHUTDOWN, MYF(0)); + ret= 1; + } + else + { + HASH *hash= &master_info_index->master_info_hash; + for (uint i= 0; i< hash->records; ++i) { - my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, - mi->connection_name.str); - DBUG_RETURN(TRUE); + Master_info *mi; + mi= (Master_info *) my_hash_element(hash, i); + if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + { + my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, + mi->connection_name.str); + ret= 1; + break; + } } } - DBUG_RETURN(FALSE); + if (!already_locked) + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(ret); } /** - Master_info_index::any_slave_sql_running() - - The LOCK_active_mi must be held while calling this function. + any_slave_sql_running() @return 0 No Slave SQL thread is running # Number of slave SQL thread running */ -uint Master_info_index::any_slave_sql_running() +uint any_slave_sql_running() { uint count= 0; DBUG_ENTER("any_slave_sql_running"); - mysql_mutex_assert_owner(&LOCK_active_mi); - for (uint i= 0; i< master_info_hash.records; ++i) + mysql_mutex_lock(&LOCK_active_mi); + if (likely(master_info_index)) // Not shutdown { - Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i); - if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) - count++; + HASH *hash= &master_info_index->master_info_hash; + for (uint i= 0; i< hash->records; ++i) + { + Master_info *mi= (Master_info *)my_hash_element(hash, i); + if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + count++; + } } + mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(count); } @@ -1298,15 +1423,25 @@ uint Master_info_index::any_slave_sql_running() @return TRUE Error FALSE Everything ok. + + This code is written so that we don't keep LOCK_active_mi active + while we are starting a slave. */ bool Master_info_index::start_all_slaves(THD *thd) { bool result= FALSE; - DBUG_ENTER("warn_if_slave_running"); + DBUG_ENTER("start_all_slaves"); mysql_mutex_assert_owner(&LOCK_active_mi); - for (uint i= 0; i< master_info_hash.records; ++i) + for (uint i= 0; i< master_info_hash.records; i++) + { + Master_info *mi; + mi= (Master_info *) my_hash_element(&master_info_hash, i); + mi->in_start_all_slaves= 0; + } + + for (uint i= 0; i< master_info_hash.records; ) { int error; Master_info *mi; @@ -1316,25 +1451,40 @@ bool Master_info_index::start_all_slaves(THD *thd) Try to start all slaves that are configured (host is defined) and are not already running */ - if ((mi->slave_running == MYSQL_SLAVE_NOT_RUN || - !mi->rli.slave_running) && *mi->host) + if (!((mi->slave_running == MYSQL_SLAVE_NOT_RUN || + !mi->rli.slave_running) && *mi->host) || + mi->in_start_all_slaves) { - if ((error= start_slave(thd, mi, 1))) - { - my_error(ER_CANT_START_STOP_SLAVE, MYF(0), - "START", - (int) mi->connection_name.length, - mi->connection_name.str); - result= 1; - if (error < 0) // fatal error - break; - } - else - push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, - ER_SLAVE_STARTED, ER(ER_SLAVE_STARTED), - (int) mi->connection_name.length, - mi->connection_name.str); + i++; + continue; + } + mi->in_start_all_slaves= 1; + + mysql_mutex_lock(&mi->sleep_lock); + mi->users++; // Mark used + mysql_mutex_unlock(&mi->sleep_lock); + mysql_mutex_unlock(&LOCK_active_mi); + error= start_slave(thd, mi, 1); + mi->release(); + mysql_mutex_lock(&LOCK_active_mi); + if (error) + { + my_error(ER_CANT_START_STOP_SLAVE, MYF(0), + "START", + (int) mi->connection_name.length, + mi->connection_name.str); + result= 1; + if (error < 0) // fatal error + break; } + else + push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, + ER_SLAVE_STARTED, ER(ER_SLAVE_STARTED), + (int) mi->connection_name.length, + mi->connection_name.str); + /* Restart from first element as master_info_hash may have changed */ + i= 0; + continue; } DBUG_RETURN(result); } @@ -1348,38 +1498,63 @@ bool Master_info_index::start_all_slaves(THD *thd) @return TRUE Error FALSE Everything ok. + + This code is written so that we don't keep LOCK_active_mi active + while we are stopping a slave. */ bool Master_info_index::stop_all_slaves(THD *thd) { bool result= FALSE; - DBUG_ENTER("warn_if_slave_running"); + DBUG_ENTER("stop_all_slaves"); mysql_mutex_assert_owner(&LOCK_active_mi); - for (uint i= 0; i< master_info_hash.records; ++i) + for (uint i= 0; i< master_info_hash.records; i++) + { + Master_info *mi; + mi= (Master_info *) my_hash_element(&master_info_hash, i); + mi->in_stop_all_slaves= 0; + } + + for (uint i= 0; i< master_info_hash.records ;) { int error; Master_info *mi; mi= (Master_info *) my_hash_element(&master_info_hash, i); - if ((mi->slave_running != MYSQL_SLAVE_NOT_RUN || - mi->rli.slave_running)) + if (!(mi->slave_running != MYSQL_SLAVE_NOT_RUN || + mi->rli.slave_running) || + mi->in_stop_all_slaves) { - if ((error= stop_slave(thd, mi, 1))) - { - my_error(ER_CANT_START_STOP_SLAVE, MYF(0), - "STOP", - (int) mi->connection_name.length, - mi->connection_name.str); - result= 1; - if (error < 0) // Fatal error - break; - } - else - push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, - ER_SLAVE_STOPPED, ER(ER_SLAVE_STOPPED), - (int) mi->connection_name.length, - mi->connection_name.str); + i++; + continue; } + mi->in_stop_all_slaves= 1; // Protection for loops + + mysql_mutex_lock(&mi->sleep_lock); + mi->users++; // Mark used + mysql_mutex_unlock(&mi->sleep_lock); + mysql_mutex_unlock(&LOCK_active_mi); + error= stop_slave(thd, mi, 1); + mi->release(); + mysql_mutex_lock(&LOCK_active_mi); + if (error) + { + my_error(ER_CANT_START_STOP_SLAVE, MYF(0), + "STOP", + (int) mi->connection_name.length, + mi->connection_name.str); + result= 1; + if (error < 0) // Fatal error + break; + } + else + push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE, + ER_SLAVE_STOPPED, ER(ER_SLAVE_STOPPED), + (int) mi->connection_name.length, + mi->connection_name.str); + /* Restart from first element as master_info_hash may have changed */ + i= 0; + continue; } DBUG_RETURN(result); } diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index e58df0150f5..32591c56caa 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -79,6 +79,8 @@ class Master_info : public Slave_reporting_capability { return opt_slave_parallel_threads > 0; } + void release(); + void wait_until_free(); /* the variables below are needed because we can change masters on the fly */ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ @@ -182,7 +184,11 @@ class Master_info : public Slave_reporting_capability uint64 gtid_reconnect_event_skip_count; /* gtid_event_seen is false until we receive first GTID event from master. */ bool gtid_event_seen; + bool in_start_all_slaves, in_stop_all_slaves; + uint users; /* Active user for object */ + uint killed; }; + int init_master_info(Master_info* mi, const char* master_info_fname, const char* slave_info_fname, bool abort_if_no_master_info_file, @@ -218,13 +224,12 @@ public: bool check_duplicate_master_info(LEX_STRING *connection_name, const char *host, uint port); bool add_master_info(Master_info *mi, bool write_to_file); - bool remove_master_info(LEX_STRING *connection_name); + bool remove_master_info(Master_info *mi); Master_info *get_master_info(LEX_STRING *connection_name, Sql_condition::enum_warning_level warning); - bool give_error_if_slave_running(); - uint any_slave_sql_running(); bool start_all_slaves(THD *thd); bool stop_all_slaves(THD *thd); + void free_connections(); }; @@ -237,6 +242,8 @@ public: }; +Master_info *get_master_info(LEX_STRING *connection_name, + Sql_condition::enum_warning_level warning); bool check_master_connection_name(LEX_STRING *name); void create_logfile_name_with_suffix(char *res_file_name, size_t length, const char *info_file, @@ -246,7 +253,8 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length, uchar *get_key_master_info(Master_info *mi, size_t *length, my_bool not_used __attribute__((unused))); void free_key_master_info(Master_info *mi); - +uint any_slave_sql_running(); +bool give_error_if_slave_running(bool already_lock); #endif /* HAVE_REPLICATION */ #endif /* RPL_MI_H */ diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 23f61a82a90..ff2c7d68467 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1312,6 +1312,29 @@ handle_rpl_parallel_thread(void *arg) } if (!in_event_group) { + /* If we are in a FLUSH TABLES FOR READ LOCK, wait for it */ + while (rpt->current_entry && rpt->pause_for_ftwrl) + { + /* + We are currently in the delicate process of pausing parallel + replication while FLUSH TABLES WITH READ LOCK is starting. We must + not de-allocate the thread (setting rpt->current_owner= NULL) until + rpl_unpause_after_ftwrl() has woken us up. + */ + rpl_parallel_entry *e= rpt->current_entry; + /* + Ensure that we will unblock rpl_pause_for_ftrwl() + e->pause_sub_id may be LONGLONG_MAX if rpt->current_entry has changed + */ + DBUG_ASSERT(e->pause_sub_id == (uint64)ULONGLONG_MAX || + e->last_committed_sub_id >= e->pause_sub_id); + mysql_mutex_lock(&e->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (rpt->pause_for_ftwrl) + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + } rpt->current_owner= NULL; /* Tell wait_for_done() that we are done, if it is waiting. */ if (likely(rpt->current_entry) && @@ -1369,6 +1392,28 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, if ((res= pool_mark_busy(pool, current_thd))) return res; + /* Protect against parallel pool resizes */ + if (pool->count == new_count) + { + pool_mark_not_busy(pool); + return 0; + } + + /* + If we are about to delete pool, do an extra check that there are no new + slave threads running since we marked pool busy + */ + if (!new_count) + { + if (any_slave_sql_running()) + { + DBUG_PRINT("warning", + ("SQL threads running while trying to reset parallel pool")); + pool_mark_not_busy(pool); + return 0; // Ok to not resize pool + } + } + /* Allocate the new list of threads up-front. That way, if we fail half-way, we only need to free whatever we managed @@ -1382,7 +1427,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, { my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) + new_count*sizeof(*rpt_array)))); - goto err;; + goto err; } for (i= 0; i < new_count; ++i) @@ -1503,6 +1548,20 @@ err: return 1; } +/* + Deactivate the parallel replication thread pool, if there are now no more + SQL threads running. +*/ + +int rpl_parallel_resize_pool_if_no_slaves(void) +{ + /* master_info_index is set to NULL on shutdown */ + if (opt_slave_parallel_threads > 0 && !any_slave_sql_running() && + master_info_index) + return rpl_parallel_inactivate_pool(&global_rpl_thread_pool); + return 0; +} + int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) @@ -1814,6 +1873,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, { rpl_parallel_thread *rpt; + DBUG_ASSERT(count > 0); mysql_mutex_lock(&LOCK_rpl_thread_pool); while (unlikely(busy) || !(rpt= free_list)) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index a02c1af3b3e..0d7cd4f2e9b 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -342,6 +342,7 @@ struct rpl_parallel { extern struct rpl_parallel_thread_pool global_rpl_thread_pool; +extern int rpl_parallel_resize_pool_if_no_slaves(void); extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); diff --git a/sql/slave.cc b/sql/slave.cc index c4817ef4794..a2b8439b4b4 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -614,6 +614,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) if (!mi->inited) DBUG_RETURN(0); /* successfully do nothing */ int error,force_all = (thread_mask & SLAVE_FORCE_ALL); + int retval= 0; mysql_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock(); @@ -633,24 +634,19 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) skip_lock)) && !force_all) DBUG_RETURN(error); + retval= error; mysql_mutex_lock(log_lock); DBUG_PRINT("info",("Flushing relay-log info file.")); if (current_thd) THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file); - if (flush_relay_log_info(&mi->rli)) - DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); - - if (my_sync(mi->rli.info_fd, MYF(MY_WME))) - DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + if (flush_relay_log_info(&mi->rli) || + my_sync(mi->rli.info_fd, MYF(MY_WME))) + retval= ER_ERROR_DURING_FLUSH_LOGS; mysql_mutex_unlock(log_lock); } - if (opt_slave_parallel_threads > 0 && - master_info_index &&// master_info_index is set to NULL on server shutdown - !master_info_index->any_slave_sql_running()) - rpl_parallel_inactivate_pool(&global_rpl_thread_pool); if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) { DBUG_PRINT("info",("Terminating IO thread")); @@ -661,25 +657,26 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) skip_lock)) && !force_all) DBUG_RETURN(error); + if (!retval) + retval= error; mysql_mutex_lock(log_lock); DBUG_PRINT("info",("Flushing relay log and master info file.")); if (current_thd) THD_STAGE_INFO(current_thd, stage_flushing_relay_log_and_master_info_repository); - if (flush_master_info(mi, TRUE, FALSE)) - DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); - + if (likely(mi->fd >= 0)) + { + if (flush_master_info(mi, TRUE, FALSE) || my_sync(mi->fd, MYF(MY_WME))) + retval= ER_ERROR_DURING_FLUSH_LOGS; + } if (mi->rli.relay_log.is_open() && my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME))) - DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); - - if (my_sync(mi->fd, MYF(MY_WME))) - DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); + retval= ER_ERROR_DURING_FLUSH_LOGS; mysql_mutex_unlock(log_lock); } - DBUG_RETURN(0); + DBUG_RETURN(retval); } @@ -956,10 +953,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mi); if (!error && (thread_mask & SLAVE_SQL)) { - if (opt_slave_parallel_threads > 0) - error= rpl_parallel_activate_pool(&global_rpl_thread_pool); - if (!error) - error= start_slave_thread( + error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_sql, #endif @@ -975,10 +969,18 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, /* - Release slave threads at time of executing shutdown. + Kill slaves preparing for shutdown +*/ - SYNOPSIS - end_slave() +void slave_prepare_for_shutdown() +{ + mysql_mutex_lock(&LOCK_active_mi); + master_info_index->free_connections(); + mysql_mutex_unlock(&LOCK_active_mi); +} + +/* + Release slave threads at time of executing shutdown. */ void end_slave() @@ -996,7 +998,10 @@ void end_slave() startup parameter to the server was wrong. */ mysql_mutex_lock(&LOCK_active_mi); - /* This will call terminate_slave_threads() on all connections */ + /* + master_info_index should not have any threads anymore as they where + killed as part of slave_prepare_for_shutdown() + */ delete master_info_index; master_info_index= 0; active_mi= 0; @@ -2657,7 +2662,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, mysql_mutex_lock(&mi->data_lock); mysql_mutex_lock(&mi->rli.data_lock); + /* err_lock is to protect mi->last_error() */ mysql_mutex_lock(&mi->err_lock); + /* err_lock is to protect mi->rli.last_error() */ mysql_mutex_lock(&mi->rli.err_lock); protocol->store(mi->host, &my_charset_bin); protocol->store(mi->user, &my_charset_bin); @@ -4493,6 +4500,16 @@ pthread_handler_t handle_slave_sql(void *arg) rli->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; pthread_detach_this_thread(); + + if (opt_slave_parallel_threads > 0 && + rpl_parallel_activate_pool(&global_rpl_thread_pool)) + { + mysql_cond_broadcast(&rli->start_cond); + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, + "Failed during parallel slave pool activation"); + goto err_during_init; + } + if (init_slave_thread(thd, mi, SLAVE_THD_SQL)) { /* @@ -4862,17 +4879,7 @@ err_during_init: DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5);); mysql_mutex_unlock(&rli->run_lock); // tell the world we are done - /* - Deactivate the parallel replication thread pool, if there are now no more - SQL threads running. Do this here, when we have released all locks, but - while our THD (and current_thd) is still valid. - */ - mysql_mutex_lock(&LOCK_active_mi); - if (opt_slave_parallel_threads > 0 && - master_info_index &&// master_info_index is set to NULL on server shutdown - !master_info_index->any_slave_sql_running()) - rpl_parallel_inactivate_pool(&global_rpl_thread_pool); - mysql_mutex_unlock(&LOCK_active_mi); + rpl_parallel_resize_pool_if_no_slaves(); mysql_mutex_lock(&LOCK_thread_count); delete thd; diff --git a/sql/slave.h b/sql/slave.h index c6b78b96aca..cfff79da0e4 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -213,6 +213,7 @@ bool rpl_master_erroneous_autoinc(THD* thd); const char *print_slave_db_safe(const char *db); void skip_load_data_infile(NET* net); +void slave_prepare_for_shutdown(); void end_slave(); /* release slave threads */ void close_active_mi(); /* clean up slave threads data */ void clear_until_condition(Relay_log_info* rli); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 03bad46ae0e..97d3ce24c25 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2207,7 +2207,7 @@ err: int mysql_execute_command(THD *thd) { - int res= FALSE; + int res= 0; int up_result= 0; LEX *lex= thd->lex; /* first SELECT_LEX (have special meaning for many of non-SELECTcommands) */ @@ -2702,10 +2702,17 @@ case SQLCOM_PREPARE: if (check_global_access(thd, SUPER_ACL)) goto error; + /* + In this code it's ok to use LOCK_active_mi as we are adding new things + into master_info_index + */ mysql_mutex_lock(&LOCK_active_mi); - if (!master_info_index) + { + mysql_mutex_unlock(&LOCK_active_mi); + my_error(ER_SERVER_SHUTDOWN, MYF(0)); goto error; + } mi= master_info_index->get_master_info(&lex_mi->connection_name, Sql_condition::WARN_LEVEL_NOTE); @@ -2734,7 +2741,7 @@ case SQLCOM_PREPARE: If new master was not added, we still need to free mi. */ if (master_info_added) - master_info_index->remove_master_info(&lex_mi->connection_name); + master_info_index->remove_master_info(mi); else delete mi; } @@ -2752,22 +2759,24 @@ case SQLCOM_PREPARE: /* Accept one of two privileges */ if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL)) goto error; - mysql_mutex_lock(&LOCK_active_mi); if (lex->verbose) + { + mysql_mutex_lock(&LOCK_active_mi); res= show_all_master_info(thd); + mysql_mutex_unlock(&LOCK_active_mi); + } else { LEX_MASTER_INFO *lex_mi= &thd->lex->mi; Master_info *mi; - mi= master_info_index->get_master_info(&lex_mi->connection_name, - Sql_condition::WARN_LEVEL_ERROR); - if (mi != NULL) + if ((mi= get_master_info(&lex_mi->connection_name, + Sql_condition::WARN_LEVEL_ERROR))) { res= show_master_info(thd, mi, 0); + mi->release(); } } - mysql_mutex_unlock(&LOCK_active_mi); break; } case SQLCOM_SHOW_MASTER_STAT: @@ -3091,22 +3100,23 @@ end_with_restore_list: load_error= rpl_load_gtid_slave_state(thd); - mysql_mutex_lock(&LOCK_active_mi); - - if ((mi= (master_info_index-> - get_master_info(&lex_mi->connection_name, - Sql_condition::WARN_LEVEL_ERROR)))) + /* + We don't need to ensure that only one user is using master_info + as start_slave is protected against simultaneous usage + */ + if ((mi= get_master_info(&lex_mi->connection_name, + Sql_condition::WARN_LEVEL_ERROR))) { if (load_error) { /* - We cannot start a slave using GTID if we cannot load the GTID position - from the mysql.gtid_slave_pos table. But we can allow non-GTID - replication (useful eg. during upgrade). + We cannot start a slave using GTID if we cannot load the + GTID position from the mysql.gtid_slave_pos table. But we + can allow non-GTID replication (useful eg. during upgrade). */ if (mi->using_gtid != Master_info::USE_GTID_NO) { - mysql_mutex_unlock(&LOCK_active_mi); + mi->release(); break; } else @@ -3114,8 +3124,8 @@ end_with_restore_list: } if (!start_slave(thd, mi, 1 /* net report*/)) my_ok(thd); + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); break; } case SQLCOM_SLAVE_STOP: @@ -3145,13 +3155,17 @@ end_with_restore_list: } lex_mi= &thd->lex->mi; - mysql_mutex_lock(&LOCK_active_mi); - if ((mi= (master_info_index-> - get_master_info(&lex_mi->connection_name, - Sql_condition::WARN_LEVEL_ERROR)))) - if (!stop_slave(thd, mi, 1/* net report*/)) + if ((mi= get_master_info(&lex_mi->connection_name, + Sql_condition::WARN_LEVEL_ERROR))) + { + if (stop_slave(thd, mi, 1/* net report*/)) + res= 1; + mi->release(); + if (rpl_parallel_resize_pool_if_no_slaves()) + res= 1; + if (!res) my_ok(thd); - mysql_mutex_unlock(&LOCK_active_mi); + } break; } case SQLCOM_SLAVE_ALL_START: @@ -4317,11 +4331,13 @@ end_with_restore_list: reload_acl_and_cache binlog interactions failed */ res= 1; - } + } if (!res) my_ok(thd); } + else + res= 1; // reload_acl_and_cache failed #ifdef HAVE_REPLICATION if (lex->type & REFRESH_READ_LOCK) rpl_unpause_after_ftwrl(thd); diff --git a/sql/sql_reload.cc b/sql/sql_reload.cc index bcf2585ca2a..f9c5757f721 100644 --- a/sql/sql_reload.cc +++ b/sql/sql_reload.cc @@ -174,24 +174,20 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options, slave is not likely to have the same connection names. */ tmp_write_to_binlog= 0; - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index) + + if (!(mi= (get_master_info(&connection_name, + Sql_condition::WARN_LEVEL_ERROR)))) { - if (!(mi= (master_info_index-> - get_master_info(&connection_name, - Sql_condition::WARN_LEVEL_ERROR)))) - { - result= 1; - } - else - { - mysql_mutex_lock(&mi->data_lock); - if (rotate_relay_log(mi)) - *write_to_binlog= -1; - mysql_mutex_unlock(&mi->data_lock); - } + result= 1; + } + else + { + mysql_mutex_lock(&mi->data_lock); + if (rotate_relay_log(mi)) + *write_to_binlog= -1; + mysql_mutex_unlock(&mi->data_lock); + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); #endif } #ifdef HAVE_QUERY_CACHE @@ -349,27 +345,33 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options, LEX_MASTER_INFO* lex_mi= &thd->lex->mi; Master_info *mi; tmp_write_to_binlog= 0; - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index) + + if (!(mi= get_master_info(&lex_mi->connection_name, + Sql_condition::WARN_LEVEL_ERROR))) { - if (!(mi= (master_info_index-> - get_master_info(&lex_mi->connection_name, - Sql_condition::WARN_LEVEL_ERROR)))) - { - result= 1; - } - else if (reset_slave(thd, mi)) + result= 1; + } + else + { + /* The following will fail if slave is running */ + if (reset_slave(thd, mi)) { + mi->release(); /* NOTE: my_error() has been already called by reset_slave(). */ result= 1; } else if (mi->connection_name.length && thd->lex->reset_slave_info.all) { /* If not default connection and 'all' is used */ - master_info_index->remove_master_info(&mi->connection_name); + mi->release(); + mysql_mutex_lock(&LOCK_active_mi); + if (master_info_index->remove_master_info(mi)) + result= 1; + mysql_mutex_unlock(&LOCK_active_mi); } + else + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); } #endif if (options & REFRESH_USER_RESOURCES) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 4505ad69880..29980dbbbf1 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -3265,8 +3265,8 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) LEX_MASTER_INFO* lex_mi= &thd->lex->mi; DBUG_ENTER("change_master"); - mysql_mutex_assert_owner(&LOCK_active_mi); DBUG_ASSERT(master_info_index); + mysql_mutex_assert_owner(&LOCK_active_mi); *master_info_added= false; /* @@ -3646,7 +3646,6 @@ bool mysql_show_binlog_events(THD* thd) int old_max_allowed_packet= thd->variables.max_allowed_packet; Master_info *mi= 0; LOG_INFO linfo; - DBUG_ENTER("mysql_show_binlog_events"); Log_event::init_show_field_list(&field_list); @@ -3674,13 +3673,9 @@ bool mysql_show_binlog_events(THD* thd) } else /* showing relay log contents */ { - mysql_mutex_lock(&LOCK_active_mi); - if (!master_info_index || - !(mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_ERROR))) + if (!(mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_ERROR))) { - mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(TRUE); } binary_log= &(mi->rli.relay_log); @@ -3700,7 +3695,7 @@ bool mysql_show_binlog_events(THD* thd) if (mi) { /* We can unlock the mutex as we have a lock on the file */ - mysql_mutex_unlock(&LOCK_active_mi); + mi->release(); mi= 0; } @@ -3722,6 +3717,7 @@ bool mysql_show_binlog_events(THD* thd) goto err; } + /* These locks is here to enable syncronization with log_in_use() */ mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = &linfo; mysql_mutex_unlock(&LOCK_thread_count); @@ -3799,7 +3795,7 @@ bool mysql_show_binlog_events(THD* thd) mysql_mutex_unlock(log_lock); } else if (mi) - mysql_mutex_unlock(&LOCK_active_mi); + mi->release(); // Check that linfo is still on the function scope. DEBUG_SYNC(thd, "after_show_binlog_events"); @@ -3820,8 +3816,9 @@ err: else my_eof(thd); + /* These locks is here to enable syncronization with log_in_use() */ mysql_mutex_lock(&LOCK_thread_count); - thd->current_linfo = 0; + thd->current_linfo= 0; mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; DBUG_RETURN(ret); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 4adc19231bc..5ff79a2f235 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1527,7 +1527,6 @@ bool Sys_var_gtid_slave_pos::do_check(THD *thd, set_var *var) { String str, *res; - bool running; DBUG_ASSERT(var->type == OPT_GLOBAL); @@ -1538,11 +1537,7 @@ Sys_var_gtid_slave_pos::do_check(THD *thd, set_var *var) return true; } - mysql_mutex_lock(&LOCK_active_mi); - running= (!master_info_index || - master_info_index->give_error_if_slave_running()); - mysql_mutex_unlock(&LOCK_active_mi); - if (running) + if (give_error_if_slave_running(0)) return true; if (!(res= var->value->val_str(&str))) return true; @@ -1580,7 +1575,7 @@ Sys_var_gtid_slave_pos::global_update(THD *thd, set_var *var) mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); - if (!master_info_index || master_info_index->give_error_if_slave_running()) + if (give_error_if_slave_running(1)) err= true; else err= rpl_gtid_pos_update(thd, var->save_result.string_value.str, @@ -1766,16 +1761,7 @@ Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base) static bool check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var) { - bool running; - - mysql_mutex_lock(&LOCK_active_mi); - running= (!master_info_index || - master_info_index->give_error_if_slave_running()); - mysql_mutex_unlock(&LOCK_active_mi); - if (running) - return true; - - return false; + return give_error_if_slave_running(0); } static bool @@ -1784,10 +1770,7 @@ fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type) bool err; mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); - err= (!master_info_index || - master_info_index->give_error_if_slave_running()); - mysql_mutex_unlock(&LOCK_active_mi); + err= give_error_if_slave_running(0); mysql_mutex_lock(&LOCK_global_system_variables); return err; @@ -1810,16 +1793,7 @@ static Sys_var_ulong Sys_slave_parallel_threads( static bool check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) { - bool running; - - mysql_mutex_lock(&LOCK_active_mi); - running= (!master_info_index || - master_info_index->give_error_if_slave_running()); - mysql_mutex_unlock(&LOCK_active_mi); - if (running) - return true; - - return false; + return give_error_if_slave_running(0); } static bool @@ -1828,13 +1802,10 @@ fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type) bool running; mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); - running= (!master_info_index || - master_info_index->give_error_if_slave_running()); - mysql_mutex_unlock(&LOCK_active_mi); + running= give_error_if_slave_running(0); mysql_mutex_lock(&LOCK_global_system_variables); - return running ? true : false; + return running; } @@ -1865,16 +1836,7 @@ static Sys_var_ulong Sys_slave_parallel_max_queued( static bool check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var) { - bool running; - - mysql_mutex_lock(&LOCK_active_mi); - running= (!master_info_index || - master_info_index->give_error_if_slave_running()); - mysql_mutex_unlock(&LOCK_active_mi); - if (running) - return true; - - return false; + return give_error_if_slave_running(0); } static bool @@ -1883,13 +1845,10 @@ fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type) bool running; mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); - running= (!master_info_index || - master_info_index->give_error_if_slave_running()); - mysql_mutex_unlock(&LOCK_active_mi); + running= give_error_if_slave_running(0); mysql_mutex_lock(&LOCK_global_system_variables); - return running ? true : false; + return running; } @@ -2837,10 +2796,8 @@ Sys_var_replicate_events_marked_for_skip::global_update(THD *thd, set_var *var) DBUG_ENTER("Sys_var_replicate_events_marked_for_skip::global_update"); mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); - if (master_info_index && !master_info_index->give_error_if_slave_running()) + if (!give_error_if_slave_running(0)) result= Sys_var_enum::global_update(thd, var); - mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_global_system_variables); DBUG_RETURN(result); } @@ -4112,19 +4069,16 @@ bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var) Master_info *mi; mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); if (!var->base.length) // no base name { - mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_ERROR); + mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_ERROR); } else // has base name { - mi= master_info_index-> - get_master_info(&var->base, - Sql_condition::WARN_LEVEL_WARN); + mi= get_master_info(&var->base, + Sql_condition::WARN_LEVEL_WARN); } if (mi) @@ -4132,17 +4086,17 @@ bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var) if (mi->rli.slave_running) { my_error(ER_SLAVE_MUST_STOP, MYF(0), - mi->connection_name.length, - mi->connection_name.str); + mi->connection_name.length, + mi->connection_name.str); result= true; } else { result= set_filter_value(var->save_result.string_value.str, mi); } + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_global_system_variables); return result; } @@ -4150,8 +4104,10 @@ bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var) bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi) { bool status= true; - Rpl_filter* rpl_filter= mi ? mi->rpl_filter : global_rpl_filter; + Rpl_filter* rpl_filter= mi->rpl_filter; + /* Proctect against other threads */ + mysql_mutex_lock(&LOCK_active_mi); switch (opt_id) { case OPT_REPLICATE_DO_DB: status= rpl_filter->set_do_db(value); @@ -4172,7 +4128,7 @@ bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi) status= rpl_filter->set_wild_ignore_table(value); break; } - + mysql_mutex_unlock(&LOCK_active_mi); return status; } @@ -4185,29 +4141,24 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base) Rpl_filter *rpl_filter; mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); if (!base->length) // no base name { - mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_ERROR); + mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_ERROR); } else // has base name - { - mi= master_info_index-> - get_master_info(base, - Sql_condition::WARN_LEVEL_WARN); - } - mysql_mutex_lock(&LOCK_global_system_variables); + mi= get_master_info(base, Sql_condition::WARN_LEVEL_WARN); if (!mi) { - mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); return 0; } + rpl_filter= mi->rpl_filter; tmp.length(0); + mysql_mutex_lock(&LOCK_active_mi); switch (opt_id) { case OPT_REPLICATE_DO_DB: rpl_filter->get_do_db(&tmp); @@ -4228,9 +4179,12 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base) rpl_filter->get_wild_ignore_table(&tmp); break; } + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); + + mi->release(); ret= (uchar *) thd->strmake(tmp.ptr(), tmp.length()); - mysql_mutex_unlock(&LOCK_active_mi); return ret; } @@ -4301,17 +4255,12 @@ get_master_info_ulonglong_value(THD *thd, ptrdiff_t offset) Master_info *mi; ulonglong res= 0; // Default value mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); - mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_WARN); - if (mi) + if ((mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_WARN))) { - mysql_mutex_lock(&mi->rli.data_lock); res= *((ulonglong*) (((uchar*) mi) + master_info_offset)); - mysql_mutex_unlock(&mi->rli.data_lock); + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_global_system_variables); return res; } @@ -4326,19 +4275,16 @@ bool update_multi_source_variable(sys_var *self_var, THD *thd, if (type == OPT_GLOBAL) mysql_mutex_unlock(&LOCK_global_system_variables); - mysql_mutex_lock(&LOCK_active_mi); - mi= master_info_index-> - get_master_info(&thd->variables.default_master_connection, - Sql_condition::WARN_LEVEL_ERROR); - if (mi) + if ((mi= (get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_ERROR)))) { mysql_mutex_lock(&mi->rli.run_lock); mysql_mutex_lock(&mi->rli.data_lock); result= self->update_variable(thd, mi); mysql_mutex_unlock(&mi->rli.data_lock); mysql_mutex_unlock(&mi->rli.run_lock); + mi->release(); } - mysql_mutex_unlock(&LOCK_active_mi); if (type == OPT_GLOBAL) mysql_mutex_lock(&LOCK_global_system_variables); return result; |