diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 153 |
1 files changed, 128 insertions, 25 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 81597212c62..08cbf9acb6a 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -283,18 +283,27 @@ static void init_slave_psi_keys(void) #endif /* HAVE_PSI_INTERFACE */ -static bool slave_init_thread_running; +static bool slave_background_thread_running; +static bool slave_background_thread_stop; +static bool slave_background_thread_gtid_loaded; + +struct slave_background_kill_t { + slave_background_kill_t *next; + THD *to_kill; +} *slave_background_kill_list; pthread_handler_t -handle_slave_init(void *arg __attribute__((unused))) +handle_slave_background(void *arg __attribute__((unused))) { THD *thd; + PSI_stage_info old_stage; + bool stop; my_thread_init(); thd= new THD(next_thread_id()); thd->thread_stack= (char*) &thd; /* Set approximate stack start */ - thd->system_thread = SYSTEM_THREAD_SLAVE_INIT; + thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; thread_safe_increment32(&service_thread_count); thd->store_globals(); thd->security_ctx->skip_grants(); @@ -307,49 +316,137 @@ handle_slave_init(void *arg __attribute__((unused))) rpl_gtid_slave_state_table_name.str, thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); - delete thd; - thread_safe_decrement32(&service_thread_count); - /* Signal run_slave_init_thread() that we are done */ + mysql_mutex_lock(&LOCK_slave_background); + slave_background_thread_gtid_loaded= true; + mysql_cond_broadcast(&COND_slave_background); - mysql_mutex_lock(&LOCK_start_thread); - slave_init_thread_running= false; - mysql_cond_broadcast(&COND_start_thread); - mysql_mutex_unlock(&LOCK_start_thread); + THD_STAGE_INFO(thd, stage_slave_background_process_request); + do + { + slave_background_kill_t *kill_list; + + thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background, + &stage_slave_background_wait_request, + &old_stage); + for (;;) + { + stop= abort_loop || thd->killed || slave_background_thread_stop; + kill_list= slave_background_kill_list; + if (stop || kill_list) + break; + mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); + } + + slave_background_kill_list= NULL; + thd->EXIT_COND(&old_stage); + + while (kill_list) + { + slave_background_kill_t *p = kill_list; + THD *to_kill= p->to_kill; + kill_list= p->next; + + mysql_mutex_lock(&to_kill->LOCK_thd_data); + to_kill->awake(KILL_CONNECTION); + mysql_mutex_unlock(&to_kill->LOCK_thd_data); + mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); + to_kill->rgi_slave->killed_for_retry= + rpl_group_info::RETRY_KILL_KILLED; + mysql_cond_broadcast(&to_kill->COND_wakeup_ready); + mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); + my_free(p); + } + mysql_mutex_lock(&LOCK_slave_background); + } while (!stop); + + slave_background_thread_running= false; + mysql_cond_broadcast(&COND_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); + + delete thd; + thread_safe_decrement32(&service_thread_count); + signal_thd_deleted(); my_thread_end(); return 0; } + +void +slave_background_kill_request(THD *to_kill) +{ + if (to_kill->rgi_slave->killed_for_retry) + return; // Already deadlock killed. + slave_background_kill_t *p= + (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME)); + if (p) + { + p->to_kill= to_kill; + to_kill->rgi_slave->killed_for_retry= + rpl_group_info::RETRY_KILL_PENDING; + mysql_mutex_lock(&LOCK_slave_background); + p->next= slave_background_kill_list; + slave_background_kill_list= p; + mysql_cond_signal(&COND_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); + } +} + + /* - Start the slave init thread. + Start the slave background thread. + + This thread is currently used for two purposes: + + 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading + from table requires valid THD, which is otherwise not available during + server init. - This thread is used to load the GTID state from mysql.gtid_slave_pos at - server start; reading from table requires valid THD, which is otherwise not - available during server init. + 2. To kill worker thread transactions during parallel replication, when a + storage engine attempts to take an errorneous conflicting lock that would + cause a deadlock. Killing is done asynchroneously, as the kill may not + be safe within the context of a callback from inside storage engine + locking code. */ static int -run_slave_init_thread() +start_slave_background_thread() { pthread_t th; - slave_init_thread_running= true; - if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib, - handle_slave_init, NULL)) + slave_background_thread_running= true; + slave_background_thread_stop= false; + slave_background_thread_gtid_loaded= false; + if (mysql_thread_create(key_thread_slave_background, + &th, &connection_attrib, handle_slave_background, + NULL)) { sql_print_error("Failed to create thread while initialising slave"); return 1; } - mysql_mutex_lock(&LOCK_start_thread); - while (slave_init_thread_running) - mysql_cond_wait(&COND_start_thread, &LOCK_start_thread); - mysql_mutex_unlock(&LOCK_start_thread); + mysql_mutex_lock(&LOCK_slave_background); + while (!slave_background_thread_gtid_loaded) + mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); + return 0; } +static void +stop_slave_background_thread() +{ + mysql_mutex_lock(&LOCK_slave_background); + slave_background_thread_stop= true; + mysql_cond_broadcast(&COND_slave_background); + while (slave_background_thread_running) + mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); +} + + /* Initialize slave structures */ int init_slave() @@ -361,7 +458,7 @@ int init_slave() init_slave_psi_keys(); #endif - if (run_slave_init_thread()) + if (start_slave_background_thread()) return 1; if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) @@ -647,6 +744,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) mysql_mutex_unlock(log_lock); } if (opt_slave_parallel_threads > 0 && + master_info_index &&// master_info_index is set to NULL on server shutdown !master_info_index->any_slave_sql_running()) rpl_parallel_inactivate_pool(&global_rpl_thread_pool); if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) @@ -1000,6 +1098,9 @@ void end_slave() master_info_index= 0; active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); + + stop_slave_background_thread(); + global_rpl_thread_pool.destroy(); free_all_rpl_filters(); DBUG_VOID_RETURN; @@ -4111,6 +4212,7 @@ connected: } DBUG_PRINT("info",("Starting reading binary log from master")); + thd->set_command(COM_SLAVE_IO); while (!io_slave_killed(mi)) { THD_STAGE_INFO(thd, stage_requesting_binlog_dump); @@ -4733,6 +4835,7 @@ pthread_handler_t handle_slave_sql(void *arg) /* Read queries from the IO/THREAD until this thread is killed */ + thd->set_command(COM_SLAVE_SQL); while (!sql_slave_killed(serial_rgi)) { THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log); @@ -6203,9 +6306,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, #ifndef DBUG_OFF mi->events_till_disconnect = disconnect_slave_event_count; #endif - ulong client_flag= 0; + ulong client_flag= CLIENT_REMEMBER_OPTIONS; if (opt_slave_compressed_protocol) - client_flag=CLIENT_COMPRESS; /* We will use compression */ + client_flag|= CLIENT_COMPRESS; /* We will use compression */ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); |