diff options
author | Sergey Vojtovich <svoj@mariadb.org> | 2019-03-07 08:12:26 +0400 |
---|---|---|
committer | Andrei Elkin <andrei.elkin@mariadb.com> | 2019-03-12 17:34:48 +0200 |
commit | 3568427d11f7afcd111b4c28c14cc8aba2b10807 (patch) | |
tree | 91102931e6813ca183d1f669b7e6da5725a3dce9 /sql | |
parent | e4505279388e3c1e84937b6737bf5619a15a7d4c (diff) | |
download | mariadb-git-3568427d11f7afcd111b4c28c14cc8aba2b10807.tar.gz |
MDEV-18450 Slaves wait shutdown
The patch features an optional shutdown behavior to hold on until
after all connected slaves have been sent the last binlogged event.
The connected slave is one whose START SLAVE has been acknowledged and
that has not been stopped since then, though it could technically be
reconnecting in the background.
The solution therefore disallows killing the dump thread until it has
found EOF of the latest binlog file. It is up to the shutdown
requester (DBA) to set up a sufficiently large shutdown timeout value
for shutdown to wait patiently until lagging slaves have been
synchronized. On the other hand, if a specific slave needs exclusion
from synchronization, the DBA would have to stop it manually, which
would terminate its dump thread.
`mysqladmin shutdown' is extended with a `--wait_for_all_slaves' option
which translates to the `SHUTDOWN WAIT FOR ALL SLAVES' SQL query
to enable the feature on the client side.
The patch also performs a small refactoring of the server shutdown
around close_connections() to introduce kill thread phases, of which
there are currently two.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/mysqld.cc | 119 | ||||
-rw-r--r-- | sql/repl_failsafe.cc | 12 | ||||
-rw-r--r-- | sql/sql_class.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.h | 28 | ||||
-rw-r--r-- | sql/sql_lex.h | 1 | ||||
-rw-r--r-- | sql/sql_parse.cc | 1 | ||||
-rw-r--r-- | sql/sql_repl.cc | 44 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 9 | ||||
-rw-r--r-- | sql/sql_yacc_ora.yy | 8 |
9 files changed, 173 insertions, 51 deletions
diff --git a/sql/mysqld.cc b/sql/mysqld.cc index bf1780ed082..fa9387ac51a 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -466,6 +466,7 @@ uint protocol_version; uint lower_case_table_names; ulong tc_heuristic_recover= 0; Atomic_counter<uint32_t> thread_count; +bool shutdown_wait_for_slaves; int32 slave_open_temp_tables; ulong thread_created; ulong back_log, connect_timeout, concurrency, server_id; @@ -1519,19 +1520,9 @@ static void end_ssl(); ** Code to end mysqld ****************************************************************************/ -static my_bool kill_all_threads(THD *thd, void *) +/* common callee of two shutdown phases */ +static void kill_thread(THD *thd) { - DBUG_PRINT("quit", ("Informing thread %ld that it's time to die", - (ulong) thd->thread_id)); - /* We skip slave threads on this first loop through. */ - if (thd->slave_thread) - return 0; - - if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0)) - return 0; - - thd->set_killed(KILL_SERVER_HARD); - MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_kill); if (thd->mysys_var) @@ -1557,16 +1548,73 @@ static my_bool kill_all_threads(THD *thd, void *) } mysql_mutex_unlock(&thd->LOCK_thd_kill); if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data); +} + + +/** + First shutdown everything but slave threads and binlog dump connections +*/ +static my_bool kill_thread_phase_1(THD *thd, void *) +{ + DBUG_PRINT("quit", ("Informing thread %ld that it's time to die", + (ulong) thd->thread_id)); + if (thd->slave_thread || thd->is_binlog_dump_thread()) + return 0; + + if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0)) + return 0; + + thd->set_killed(KILL_SERVER_HARD); + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + kill_thread(thd); return 0; } -static my_bool warn_threads_still_active(THD *thd, void *) +/** + Last shutdown binlog dump 
connections +*/ +static my_bool kill_thread_phase_2(THD *thd, void *) +{ + if (shutdown_wait_for_slaves) + { + thd->set_killed(KILL_SERVER); + } + else + { + thd->set_killed(KILL_SERVER_HARD); + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + } + kill_thread(thd); + return 0; +} + + +/* associated with the kill thread phase 1 */ +static my_bool warn_threads_active_after_phase_1(THD *thd, void *) +{ + if (!thd->is_binlog_dump_thread()) + sql_print_warning("%s: Thread %llu (user : '%s') did not exit\n", my_progname, + (ulonglong) thd->thread_id, + (thd->main_security_ctx.user ? + thd->main_security_ctx.user : "")); + return 0; +} + + +/* associated with the kill thread phase 2 */ +static my_bool warn_threads_active_after_phase_2(THD *thd, void *) { - sql_print_warning("%s: Thread %llu (user : '%s') did not exit\n", my_progname, - (ulonglong) thd->thread_id, - (thd->main_security_ctx.user ? - thd->main_security_ctx.user : "")); + mysql_mutex_lock(&thd->LOCK_thd_data); + // dump thread may not have yet (or already) current_linfo set + sql_print_warning("Dump thread %llu last sent to server %lu " + "binlog file:pos %s:%llu", + thd->thread_id, thd->variables.server_id, + thd->current_linfo ? + my_basename(thd->current_linfo->log_file_name) : "NULL", + thd->current_linfo ? 
thd->current_linfo->pos : 0); + mysql_mutex_unlock(&thd->LOCK_thd_data); + return 0; } @@ -1650,6 +1698,21 @@ void kill_mysql(THD *thd) { my_free(user); } + + DBUG_EXECUTE_IF("mysql_admin_shutdown_wait_for_slaves", + thd->lex->is_shutdown_wait_for_slaves= true;); + DBUG_EXECUTE_IF("simulate_delay_at_shutdown", + { + DBUG_ASSERT(binlog_dump_thread_count == 3); + const char act[]= + "now " + "SIGNAL greetings_from_kill_mysql"; + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act))); + };); + + if (thd->lex->is_shutdown_wait_for_slaves) + shutdown_wait_for_slaves= true; break_connect_loop(); } @@ -1693,7 +1756,7 @@ static void close_connections(void) This will give the threads some time to gracefully abort their statements and inform their clients that the server is about to die. */ - server_threads.iterate(kill_all_threads); + server_threads.iterate(kill_thread_phase_1); Events::deinit(); slave_prepare_for_shutdown(); @@ -1716,11 +1779,11 @@ static void close_connections(void) */ DBUG_PRINT("info", ("thread_count: %u", uint32_t(thread_count))); - for (int i= 0; thread_count && i < 1000; i++) + for (int i= 0; (thread_count - binlog_dump_thread_count) && i < 1000; i++) my_sleep(20000); if (global_system_variables.log_warnings) - server_threads.iterate(warn_threads_still_active); + server_threads.iterate(warn_threads_active_after_phase_1); #ifdef WITH_WSREP if (wsrep_inited == 1) @@ -1732,9 +1795,23 @@ static void close_connections(void) DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)", uint32_t(thread_count))); - while (thread_count) + while (thread_count - binlog_dump_thread_count) my_sleep(1000); + /* Kill phase 2 */ + server_threads.iterate(kill_thread_phase_2); + for (uint64 i= 0; thread_count; i++) + { + /* + This time the warnings are emitted within the loop to provide a + dynamic view on the shutdown status through the errorlog. 
+ */ + if (global_system_variables.log_warnings > 2 && i % 60000 == 0) + server_threads.iterate(warn_threads_active_after_phase_2); + my_sleep(1000); + } + /* End of kill phase 2 */ + DBUG_PRINT("quit",("close_connections thread")); DBUG_VOID_RETURN; } diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index f13ff6c9163..e7873b185c5 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -147,11 +147,11 @@ int THD::register_slave(uchar *packet, size_t packet_length) if (!(si->master_id= uint4korr(p))) si->master_id= global_system_variables.server_id; - binlog_dump_thread_count++; unregister_slave(); mysql_mutex_lock(&LOCK_thd_data); slave_info= si; mysql_mutex_unlock(&LOCK_thd_data); + binlog_dump_thread_count++; return 0; err: @@ -161,6 +161,16 @@ err: } +bool THD::is_binlog_dump_thread() +{ + mysql_mutex_lock(&LOCK_thd_data); + bool res= slave_info != NULL; + mysql_mutex_unlock(&LOCK_thd_data); + + return res; +} + + static my_bool show_slave_hosts_callback(THD *thd, Protocol *protocol) { my_bool res= FALSE; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index bbba5218989..e2bc6ef1d90 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -644,6 +644,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock) m_tmp_tables_locked(false) #ifdef HAVE_REPLICATION , + current_linfo(0), slave_info(0) #endif #ifdef WITH_WSREP @@ -751,7 +752,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock) progress.arena= 0; progress.report_to_client= 0; progress.max_counter= 0; - current_linfo = 0; slave_thread = 0; connection_name.str= 0; connection_name.length= 0; diff --git a/sql/sql_class.h b/sql/sql_class.h index 23c51067734..96f9e6643fe 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3194,12 +3194,6 @@ public: /** number of name_const() substitutions, see sp_head.cc:subst_spvars() */ uint query_name_consts; - /* - If we do a purge of binary logs, log index info of the threads - that are currently 
reading it needs to be adjusted. To do that - each thread that is using LOG_INFO needs to adjust the pointer to it - */ - LOG_INFO* current_linfo; NET* slave_net; // network connection from slave -> m. /* @@ -4813,10 +4807,20 @@ private: public: #ifdef HAVE_REPLICATION + /* + If we do a purge of binary logs, log index info of the threads + that are currently reading it needs to be adjusted. To do that + each thread that is using LOG_INFO needs to adjust the pointer to it + */ + LOG_INFO *current_linfo; Slave_info *slave_info; + void set_current_linfo(LOG_INFO *linfo); + void reset_current_linfo() { set_current_linfo(0); } + int register_slave(uchar *packet, size_t packet_length); void unregister_slave(); + bool is_binlog_dump_thread(); #endif inline ulong wsrep_binlog_format() const @@ -4975,18 +4979,6 @@ public: (THD_TRANS::DID_WAIT | THD_TRANS::CREATED_TEMP_TABLE | THD_TRANS::DROPPED_TEMP_TABLE | THD_TRANS::DID_DDL)); } - /* - Reset current_linfo - Setting current_linfo to 0 needs to be done with LOCK_thd_data to - ensure that adjust_linfo_offsets doesn't use a structure that may - be deleted. - */ - inline void reset_current_linfo() - { - mysql_mutex_lock(&LOCK_thd_data); - current_linfo= 0; - mysql_mutex_unlock(&LOCK_thd_data); - } uint get_net_wait_timeout() diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 61816bee803..8e983551dbe 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -3226,6 +3226,7 @@ public: /* The following is used by KILL */ killed_state kill_signal; killed_type kill_type; + bool is_shutdown_wait_for_slaves; /* This variable is used in post-parse stage to declare that sum-functions, or functions which have sense only if GROUP BY is present, are allowed. 
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 5a520c0df02..91f43fdbaa6 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2168,6 +2168,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, */ enum mysql_enum_shutdown_level level; level= (enum mysql_enum_shutdown_level) (uchar) packet[0]; + thd->lex->is_shutdown_wait_for_slaves= false; // "deferred" cleanup if (level == SHUTDOWN_DEFAULT) level= SHUTDOWN_WAIT_ALL_BUFFERS; // soon default will be configurable else if (level != SHUTDOWN_WAIT_ALL_BUFFERS) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 7fc3bb5926d..4c7a768b9ce 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -506,6 +506,22 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * DBUG_RETURN(ret); } + +/** + Set current_linfo + + Setting current_linfo needs to be done with LOCK_thd_data to ensure that + adjust_linfo_offsets doesn't use a structure that may be deleted. +*/ + +void THD::set_current_linfo(LOG_INFO *linfo) +{ + mysql_mutex_lock(&LOCK_thd_data); + current_linfo= linfo; + mysql_mutex_unlock(&LOCK_thd_data); +} + + /* Adjust the position pointer in the binary log file for all running slaves @@ -2125,9 +2141,8 @@ static int init_binlog_sender(binlog_send_info *info, // set current pos too linfo->pos= *pos; - // note: publish that we use file, before we open it - thd->current_linfo= linfo; + thd->set_current_linfo(linfo); if (check_start_offset(info, linfo->log_file_name, *pos)) return 1; @@ -2365,14 +2380,15 @@ static int send_format_descriptor_event(binlog_send_info *info, IO_CACHE *log, DBUG_RETURN(0); } -static bool should_stop(binlog_send_info *info) +static bool should_stop(binlog_send_info *info, bool kill_server_check= false) { return - info->net->error || - info->net->vio == NULL || - info->thd->killed || - info->error != 0 || - info->should_stop; + info->net->error || + info->net->vio == NULL || + (info->thd->killed && + (info->thd->killed != KILL_SERVER || 
kill_server_check)) || + info->error != 0 || + info->should_stop; } /** @@ -2393,7 +2409,7 @@ static int wait_new_events(binlog_send_info *info, /* in */ &stage_master_has_sent_all_binlog_to_slave, &old_stage); - while (!should_stop(info)) + while (!should_stop(info, true)) { *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename); if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0) @@ -2745,6 +2761,14 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, info->error= ER_UNKNOWN_ERROR; goto err; } + DBUG_EXECUTE_IF("simulate_delay_at_shutdown", + { + const char act[]= + "now " + "WAIT_FOR greetings_from_kill_mysql"; + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act))); + };); /* heartbeat_period from @master_heartbeat_period user variable @@ -3952,7 +3976,7 @@ bool mysql_show_binlog_events(THD* thd) goto err; } - thd->current_linfo= &linfo; + thd->set_current_linfo(&linfo); if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0) goto err; diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 970f5d50334..5c911928a9a 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -14717,6 +14717,15 @@ kill_expr: shutdown: SHUTDOWN { Lex->sql_command= SQLCOM_SHUTDOWN; } + shutdown_option {} + ; + +shutdown_option: + /* Empty */ { Lex->is_shutdown_wait_for_slaves= false; } + | WAIT_SYM FOR_SYM ALL SLAVES + { + Lex->is_shutdown_wait_for_slaves= true; + } ; /* change database */ diff --git a/sql/sql_yacc_ora.yy b/sql/sql_yacc_ora.yy index e709e7e3afa..b6ea6ba9970 100644 --- a/sql/sql_yacc_ora.yy +++ b/sql/sql_yacc_ora.yy @@ -14862,8 +14862,16 @@ kill_expr: shutdown: SHUTDOWN { Lex->sql_command= SQLCOM_SHUTDOWN; } + shutdown_option {} ; +shutdown_option: + /* Empty */ { Lex->is_shutdown_wait_for_slaves= false; } + | WAIT_SYM FOR_SYM ALL SLAVES + { + Lex->is_shutdown_wait_for_slaves= true; + } + ; /* change database */ use: |