diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 287 |
1 files changed, 68 insertions, 219 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index cfefe5746d4..1da030084ef 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -67,6 +67,7 @@ Master_info_index *master_info_index; #include "rpl_parallel.h" #include "sql_show.h" #include "semisync_slave.h" +#include "sql_manager.h" #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -364,10 +365,33 @@ end: return err; } +static THD *new_bg_THD() +{ + THD *thd= new THD(next_thread_id()); + thd->thread_stack= (char*) &thd; + thd->store_globals(); + thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; + thd->security_ctx->skip_grants(); + thd->set_command(COM_DAEMON); + thd->variables.wsrep_on= 0; + return thd; +} -static void -handle_gtid_pos_auto_create_request(THD *thd, void *hton) +static void bg_gtid_delete_pending(void *) { + THD *thd= new_bg_THD(); + + rpl_slave_state::list_element *list; + list= rpl_global_gtid_slave_state->gtid_grab_pending_delete_list(); + rpl_global_gtid_slave_state->gtid_delete_pending(thd, &list); + if (list) + rpl_global_gtid_slave_state->put_back_list(list); + delete thd; +} + +static void bg_gtid_pos_auto_create(void *hton) +{ + THD *thd= NULL; int UNINIT_VAR(err); plugin_ref engine= NULL, *auto_engines; rpl_slave_state::gtid_pos_table *entry; @@ -379,7 +403,6 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton) it. */ mysql_mutex_lock(&LOCK_global_system_variables); - engine= NULL; for (auto_engines= opt_gtid_pos_auto_plugins; auto_engines && *auto_engines; ++auto_engines) @@ -424,6 +447,7 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton) table_name.str= loc_table_name.c_ptr_safe(); table_name.length= loc_table_name.length(); + thd= new_bg_THD(); err= gtid_pos_table_creation(thd, engine, &table_name); if (err) { @@ -451,47 +475,17 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton) mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); end: + delete thd; if (engine) plugin_unlock(NULL, engine); } - -static bool slave_background_thread_running; -static bool slave_background_thread_stop; static bool slave_background_thread_gtid_loaded; -static struct slave_background_kill_t { - slave_background_kill_t *next; - THD *to_kill; -} *slave_background_kill_list; - -static struct slave_background_gtid_pos_create_t { - slave_background_gtid_pos_create_t *next; - void *hton; -} *slave_background_gtid_pos_create_list; - -static volatile bool slave_background_gtid_pending_delete_flag; - - -pthread_handler_t -handle_slave_background(void *arg __attribute__((unused))) +static void bg_rpl_load_gtid_slave_state(void *) { - THD *thd; - PSI_stage_info old_stage; - bool stop; - - my_thread_init(); - thd= new THD(next_thread_id()); - thd->thread_stack= (char*) &thd; /* Set approximate stack start */ - thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; - thd->store_globals(); - thd->security_ctx->skip_grants(); - thd->set_command(COM_DAEMON); -#ifdef WITH_WSREP - thd->variables.wsrep_on= 0; -#endif + THD *thd= new_bg_THD(); thd->set_psi(PSI_CALL_get_thread()); - thd_proc_info(thd, "Loading slave GTID position from table"); if (rpl_load_gtid_slave_state(thd)) sql_print_warning("Failed to load slave replication state from table " @@ -500,207 +494,62 @@ handle_slave_background(void *arg __attribute__((unused))) thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); - mysql_mutex_lock(&LOCK_slave_background); + // hijacking global_rpl_thread_pool cond here - it's only once on startup + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); slave_background_thread_gtid_loaded= true; - mysql_cond_broadcast(&COND_slave_background); - - THD_STAGE_INFO(thd, stage_slave_background_process_request); - do - { - slave_background_kill_t *kill_list; - slave_background_gtid_pos_create_t *create_list; - bool pending_deletes; - - thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background, - &stage_slave_background_wait_request, - &old_stage); - for (;;) - { - stop= thd->killed || slave_background_thread_stop; - kill_list= slave_background_kill_list; - create_list= slave_background_gtid_pos_create_list; - pending_deletes= slave_background_gtid_pending_delete_flag; - if (stop || kill_list || create_list || pending_deletes) - break; - mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); - } - - slave_background_kill_list= NULL; - slave_background_gtid_pos_create_list= NULL; - slave_background_gtid_pending_delete_flag= false; - thd->EXIT_COND(&old_stage); - - while (kill_list) - { - slave_background_kill_t *p = kill_list; - THD *to_kill= p->to_kill; - kill_list= p->next; - - to_kill->awake(KILL_CONNECTION); - mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); - to_kill->rgi_slave->killed_for_retry= - rpl_group_info::RETRY_KILL_KILLED; - mysql_cond_broadcast(&to_kill->COND_wakeup_ready); - mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); - my_free(p); - } - - while (create_list) - { - slave_background_gtid_pos_create_t *next= create_list->next; - void *hton= create_list->hton; - handle_gtid_pos_auto_create_request(thd, hton); - my_free(create_list); - create_list= next; - } - - if (pending_deletes) - { - rpl_slave_state::list_element *list; - - slave_background_gtid_pending_delete_flag= false; - list= rpl_global_gtid_slave_state->gtid_grab_pending_delete_list(); - rpl_global_gtid_slave_state->gtid_delete_pending(thd, &list); - if (list) - rpl_global_gtid_slave_state->put_back_list(list); - } - - mysql_mutex_lock(&LOCK_slave_background); - } while (!stop); - - slave_background_thread_running= false; - mysql_cond_broadcast(&COND_slave_background); - mysql_mutex_unlock(&LOCK_slave_background); - + mysql_cond_signal(&global_rpl_thread_pool.COND_rpl_thread_pool); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); delete thd; - - my_thread_end(); - return 0; } +static void bg_slave_kill(void *victim) +{ + THD *to_kill= (THD *)victim; + to_kill->awake(KILL_CONNECTION); + mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); + to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED; + mysql_cond_broadcast(&to_kill->COND_wakeup_ready); + mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); +} - -void -slave_background_kill_request(THD *to_kill) +void slave_background_kill_request(THD *to_kill) { if (to_kill->rgi_slave->killed_for_retry) return; // Already deadlock killed. - slave_background_kill_t *p= - (slave_background_kill_t *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*p), MYF(MY_WME)); - if (p) - { - p->to_kill= to_kill; - to_kill->rgi_slave->killed_for_retry= - rpl_group_info::RETRY_KILL_PENDING; - mysql_mutex_lock(&LOCK_slave_background); - p->next= slave_background_kill_list; - slave_background_kill_list= p; - mysql_cond_signal(&COND_slave_background); - mysql_mutex_unlock(&LOCK_slave_background); - } + to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_PENDING; + mysql_manager_submit(bg_slave_kill, to_kill); } - /* This function must only be called from a slave SQL thread (or worker thread), to ensure that the table_entry will not go away before we can lock the LOCK_slave_state. */ -void -slave_background_gtid_pos_create_request( +void slave_background_gtid_pos_create_request( rpl_slave_state::gtid_pos_table *table_entry) { - slave_background_gtid_pos_create_t *p; - if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) return; - p= (slave_background_gtid_pos_create_t *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*p), MYF(MY_WME)); - if (!p) - return; mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) { - my_free(p); mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); return; } table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED; mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); - p->hton= table_entry->table_hton; - mysql_mutex_lock(&LOCK_slave_background); - p->next= slave_background_gtid_pos_create_list; - slave_background_gtid_pos_create_list= p; - mysql_cond_signal(&COND_slave_background); - mysql_mutex_unlock(&LOCK_slave_background); + mysql_manager_submit(bg_gtid_pos_auto_create, table_entry->table_hton); } /* - Request the slave background thread to delete no longer used rows from the + Request the manager thread to delete no longer used rows from the mysql.gtid_slave_pos* tables. - - This is called from time-critical rpl_slave_state::update(), so we avoid - taking any locks here. This means we may race with the background thread - to occasionally lose a signal. This is not a problem; any pending rows to - be deleted will just be deleted a bit later as part of the next batch. */ -void -slave_background_gtid_pending_delete_request(void) +void slave_background_gtid_pending_delete_request(void) { - slave_background_gtid_pending_delete_flag= true; - mysql_cond_signal(&COND_slave_background); -} - - -/* - Start the slave background thread. - - This thread is currently used for two purposes: - - 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading - from table requires valid THD, which is otherwise not available during - server init. - - 2. To kill worker thread transactions during parallel replication, when a - storage engine attempts to take an errorneous conflicting lock that would - cause a deadlock. Killing is done asynchroneously, as the kill may not - be safe within the context of a callback from inside storage engine - locking code. -*/ -static int -start_slave_background_thread() -{ - pthread_t th; - - slave_background_thread_running= true; - slave_background_thread_stop= false; - slave_background_thread_gtid_loaded= false; - if (mysql_thread_create(key_thread_slave_background, - &th, &connection_attrib, handle_slave_background, - NULL)) - { - sql_print_error("Failed to create thread while initialising slave"); - return 1; - } - mysql_mutex_lock(&LOCK_slave_background); - while (!slave_background_thread_gtid_loaded) - mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); - mysql_mutex_unlock(&LOCK_slave_background); - - return 0; -} - - -static void -stop_slave_background_thread() -{ - mysql_mutex_lock(&LOCK_slave_background); - slave_background_thread_stop= true; - mysql_cond_broadcast(&COND_slave_background); - while (slave_background_thread_running) - mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); - mysql_mutex_unlock(&LOCK_slave_background); + mysql_manager_submit(bg_gtid_delete_pending, NULL); } @@ -715,12 +564,19 @@ int init_slave() init_slave_psi_keys(); #endif - if (start_slave_background_thread()) - return 1; - if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) return 1; + slave_background_thread_gtid_loaded= false; + mysql_manager_submit(bg_rpl_load_gtid_slave_state, NULL); + + // hijacking global_rpl_thread_pool cond here - it's only once on startup + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + while (!slave_background_thread_gtid_loaded) + mysql_cond_wait(&global_rpl_thread_pool.COND_rpl_thread_pool, + &global_rpl_thread_pool.LOCK_rpl_thread_pool); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + /* This is called when mysqld starts. Before client connections are accepted. However bootstrap may conflict with us if it does START SLAVE. @@ -1216,12 +1072,8 @@ terminate_slave_thread(THD *thd, int error __attribute__((unused)); DBUG_PRINT("loop", ("killing slave thread")); -#ifdef WITH_WSREP - /* awake_no_mutex() requires LOCK_thd_data to be locked if wsrep - is enabled */ - if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); -#endif /* WITH_WSREP */ mysql_mutex_lock(&thd->LOCK_thd_kill); + mysql_mutex_lock(&thd->LOCK_thd_data); #ifndef DONT_USE_THR_ALARM /* Error codes from pthread_kill are: @@ -1234,9 +1086,7 @@ terminate_slave_thread(THD *thd, thd->awake_no_mutex(NOT_KILLED); mysql_mutex_unlock(&thd->LOCK_thd_kill); -#ifdef WITH_WSREP - if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data); -#endif /* WITH_WSREP */ + mysql_mutex_unlock(&thd->LOCK_thd_data); /* There is a small chance that slave thread might miss the first @@ -1449,7 +1299,6 @@ void slave_prepare_for_shutdown() // It's safe to destruct worker pool now when // all driver threads are gone. global_rpl_thread_pool.deactivate(); - stop_slave_background_thread(); } /* @@ -1480,8 +1329,6 @@ void end_slave() active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); - stop_slave_background_thread(); - global_rpl_thread_pool.destroy(); free_all_rpl_filters(); DBUG_VOID_RETURN; @@ -4783,10 +4630,7 @@ pthread_handler_t handle_slave_io(void *arg) goto err; } - -#ifdef WITH_WSREP thd->variables.wsrep_on= 0; -#endif if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) || repl_semisync_slave.slave_start(mi)) { @@ -5106,8 +4950,11 @@ log space"); err: // print the current replication position if (mi->using_gtid == Master_info::USE_GTID_NO) + { sql_print_information("Slave I/O thread exiting, read up to log '%s', " "position %llu", IO_RPL_LOG_NAME, mi->master_log_pos); + sql_print_information("master was %s:%d", mi->host, mi->port); + } else { StringBuffer<100> tmp; @@ -5116,6 +4963,7 @@ err: "position %llu; GTID position %s", IO_RPL_LOG_NAME, mi->master_log_pos, tmp.c_ptr_safe()); + sql_print_information("master was %s:%d", mi->host, mi->port); } repl_semisync_slave.slave_stop(mi); thd->reset_query(); @@ -5720,6 +5568,7 @@ pthread_handler_t handle_slave_sql(void *arg) sql_print_information("Slave SQL thread exiting, replication stopped in " "log '%s' at position %llu%s", RPL_LOG_NAME, rli->group_master_log_pos, tmp.c_ptr_safe()); + sql_print_information("master was %s:%d", mi->host, mi->port); } #ifdef WITH_WSREP wsrep_after_command_before_result(thd); |