summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc145
1 files changed, 122 insertions, 23 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index f0613d048a9..8385d8f81a1 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -283,18 +283,27 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
-static bool slave_init_thread_running;
+static bool slave_background_thread_running;
+static bool slave_background_thread_stop;
+static bool slave_background_thread_gtid_loaded;
+
+struct slave_background_kill_t {
+ slave_background_kill_t *next;
+ THD *to_kill;
+} *slave_background_kill_list;
pthread_handler_t
-handle_slave_init(void *arg __attribute__((unused)))
+handle_slave_background(void *arg __attribute__((unused)))
{
THD *thd;
+ PSI_stage_info old_stage;
+ bool stop;
my_thread_init();
thd= new THD(next_thread_id());
thd->thread_stack= (char*) &thd; /* Set approximate stack start */
- thd->system_thread = SYSTEM_THREAD_SLAVE_INIT;
+ thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thread_safe_increment32(&service_thread_count);
thd->store_globals();
thd->security_ctx->skip_grants();
@@ -307,49 +316,136 @@ handle_slave_init(void *arg __attribute__((unused)))
rpl_gtid_slave_state_table_name.str,
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
- delete thd;
- thread_safe_decrement32(&service_thread_count);
- /* Signal run_slave_init_thread() that we are done */
+ mysql_mutex_lock(&LOCK_slave_background);
+ slave_background_thread_gtid_loaded= true;
+ mysql_cond_broadcast(&COND_slave_background);
- mysql_mutex_lock(&LOCK_start_thread);
- slave_init_thread_running= false;
- mysql_cond_broadcast(&COND_start_thread);
- mysql_mutex_unlock(&LOCK_start_thread);
+ THD_STAGE_INFO(thd, stage_slave_background_process_request);
+ do
+ {
+ slave_background_kill_t *kill_list;
+
+ thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
+ &stage_slave_background_wait_request,
+ &old_stage);
+ for (;;)
+ {
+ stop= abort_loop || thd->killed || slave_background_thread_stop;
+ kill_list= slave_background_kill_list;
+ if (stop || kill_list)
+ break;
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ }
+
+ slave_background_kill_list= NULL;
+ thd->EXIT_COND(&old_stage);
+
+ while (kill_list)
+ {
+ slave_background_kill_t *p = kill_list;
+ THD *to_kill= p->to_kill;
+ kill_list= p->next;
+
+ mysql_mutex_lock(&to_kill->LOCK_thd_data);
+ to_kill->awake(KILL_CONNECTION);
+ mysql_mutex_unlock(&to_kill->LOCK_thd_data);
+ mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
+ to_kill->rgi_slave->killed_for_retry=
+ rpl_group_info::RETRY_KILL_KILLED;
+ mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
+ mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
+ my_free(p);
+ }
+ mysql_mutex_lock(&LOCK_slave_background);
+ } while (!stop);
+
+ slave_background_thread_running= false;
+ mysql_cond_broadcast(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+
+ delete thd;
+ thread_safe_decrement32(&service_thread_count);
my_thread_end();
return 0;
}
+
+void
+slave_background_kill_request(THD *to_kill)
+{
+ if (to_kill->rgi_slave->killed_for_retry)
+ return; // Already deadlock killed.
+ slave_background_kill_t *p=
+ (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
+ if (p)
+ {
+ p->to_kill= to_kill;
+ to_kill->rgi_slave->killed_for_retry=
+ rpl_group_info::RETRY_KILL_PENDING;
+ mysql_mutex_lock(&LOCK_slave_background);
+ p->next= slave_background_kill_list;
+ slave_background_kill_list= p;
+ mysql_cond_signal(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+ }
+}
+
+
/*
- Start the slave init thread.
+ Start the slave background thread.
+
+ This thread is currently used for two purposes:
+
+ 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
+ from table requires valid THD, which is otherwise not available during
+ server init.
- This thread is used to load the GTID state from mysql.gtid_slave_pos at
- server start; reading from table requires valid THD, which is otherwise not
- available during server init.
+ 2. To kill worker thread transactions during parallel replication, when a
+ storage engine attempts to take an errorneous conflicting lock that would
+ cause a deadlock. Killing is done asynchroneously, as the kill may not
+ be safe within the context of a callback from inside storage engine
+ locking code.
*/
static int
-run_slave_init_thread()
+start_slave_background_thread()
{
pthread_t th;
- slave_init_thread_running= true;
- if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
- handle_slave_init, NULL))
+ slave_background_thread_running= true;
+ slave_background_thread_stop= false;
+ slave_background_thread_gtid_loaded= false;
+ if (mysql_thread_create(key_thread_slave_background,
+ &th, &connection_attrib, handle_slave_background,
+ NULL))
{
sql_print_error("Failed to create thread while initialising slave");
return 1;
}
- mysql_mutex_lock(&LOCK_start_thread);
- while (slave_init_thread_running)
- mysql_cond_wait(&COND_start_thread, &LOCK_start_thread);
- mysql_mutex_unlock(&LOCK_start_thread);
+ mysql_mutex_lock(&LOCK_slave_background);
+ while (!slave_background_thread_gtid_loaded)
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+
return 0;
}
+static void
+stop_slave_background_thread()
+{
+ mysql_mutex_lock(&LOCK_slave_background);
+ slave_background_thread_stop= true;
+ mysql_cond_broadcast(&COND_slave_background);
+ while (slave_background_thread_running)
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+}
+
+
/* Initialize slave structures */
int init_slave()
@@ -361,7 +457,7 @@ int init_slave()
init_slave_psi_keys();
#endif
- if (run_slave_init_thread())
+ if (start_slave_background_thread())
return 1;
if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
@@ -1001,6 +1097,9 @@ void end_slave()
master_info_index= 0;
active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
+
+ stop_slave_background_thread();
+
global_rpl_thread_pool.destroy();
free_all_rpl_filters();
DBUG_VOID_RETURN;