summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc153
1 files changed, 128 insertions, 25 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 81597212c62..08cbf9acb6a 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -283,18 +283,27 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
-static bool slave_init_thread_running;
+static bool slave_background_thread_running;
+static bool slave_background_thread_stop;
+static bool slave_background_thread_gtid_loaded;
+
+struct slave_background_kill_t {
+ slave_background_kill_t *next;
+ THD *to_kill;
+} *slave_background_kill_list;
pthread_handler_t
-handle_slave_init(void *arg __attribute__((unused)))
+handle_slave_background(void *arg __attribute__((unused)))
{
THD *thd;
+ PSI_stage_info old_stage;
+ bool stop;
my_thread_init();
thd= new THD(next_thread_id());
thd->thread_stack= (char*) &thd; /* Set approximate stack start */
- thd->system_thread = SYSTEM_THREAD_SLAVE_INIT;
+ thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thread_safe_increment32(&service_thread_count);
thd->store_globals();
thd->security_ctx->skip_grants();
@@ -307,49 +316,137 @@ handle_slave_init(void *arg __attribute__((unused)))
rpl_gtid_slave_state_table_name.str,
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
- delete thd;
- thread_safe_decrement32(&service_thread_count);
- /* Signal run_slave_init_thread() that we are done */
+ mysql_mutex_lock(&LOCK_slave_background);
+ slave_background_thread_gtid_loaded= true;
+ mysql_cond_broadcast(&COND_slave_background);
- mysql_mutex_lock(&LOCK_start_thread);
- slave_init_thread_running= false;
- mysql_cond_broadcast(&COND_start_thread);
- mysql_mutex_unlock(&LOCK_start_thread);
+ THD_STAGE_INFO(thd, stage_slave_background_process_request);
+ do
+ {
+ slave_background_kill_t *kill_list;
+
+ thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
+ &stage_slave_background_wait_request,
+ &old_stage);
+ for (;;)
+ {
+ stop= abort_loop || thd->killed || slave_background_thread_stop;
+ kill_list= slave_background_kill_list;
+ if (stop || kill_list)
+ break;
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ }
+
+ slave_background_kill_list= NULL;
+ thd->EXIT_COND(&old_stage);
+
+ while (kill_list)
+ {
+ slave_background_kill_t *p = kill_list;
+ THD *to_kill= p->to_kill;
+ kill_list= p->next;
+
+ mysql_mutex_lock(&to_kill->LOCK_thd_data);
+ to_kill->awake(KILL_CONNECTION);
+ mysql_mutex_unlock(&to_kill->LOCK_thd_data);
+ mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
+ to_kill->rgi_slave->killed_for_retry=
+ rpl_group_info::RETRY_KILL_KILLED;
+ mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
+ mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
+ my_free(p);
+ }
+ mysql_mutex_lock(&LOCK_slave_background);
+ } while (!stop);
+
+ slave_background_thread_running= false;
+ mysql_cond_broadcast(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+
+ delete thd;
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
my_thread_end();
return 0;
}
+
+void
+slave_background_kill_request(THD *to_kill)
+{
+ if (to_kill->rgi_slave->killed_for_retry)
+ return; // Already deadlock killed.
+ slave_background_kill_t *p=
+ (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
+ if (p)
+ {
+ p->to_kill= to_kill;
+ to_kill->rgi_slave->killed_for_retry=
+ rpl_group_info::RETRY_KILL_PENDING;
+ mysql_mutex_lock(&LOCK_slave_background);
+ p->next= slave_background_kill_list;
+ slave_background_kill_list= p;
+ mysql_cond_signal(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+ }
+}
+
+
/*
- Start the slave init thread.
+ Start the slave background thread.
+
+ This thread is currently used for two purposes:
+
+ 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
+ from table requires valid THD, which is otherwise not available during
+ server init.
- This thread is used to load the GTID state from mysql.gtid_slave_pos at
- server start; reading from table requires valid THD, which is otherwise not
- available during server init.
+ 2. To kill worker thread transactions during parallel replication, when a
+ storage engine attempts to take an errorneous conflicting lock that would
+ cause a deadlock. Killing is done asynchroneously, as the kill may not
+ be safe within the context of a callback from inside storage engine
+ locking code.
*/
static int
-run_slave_init_thread()
+start_slave_background_thread()
{
pthread_t th;
- slave_init_thread_running= true;
- if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
- handle_slave_init, NULL))
+ slave_background_thread_running= true;
+ slave_background_thread_stop= false;
+ slave_background_thread_gtid_loaded= false;
+ if (mysql_thread_create(key_thread_slave_background,
+ &th, &connection_attrib, handle_slave_background,
+ NULL))
{
sql_print_error("Failed to create thread while initialising slave");
return 1;
}
- mysql_mutex_lock(&LOCK_start_thread);
- while (slave_init_thread_running)
- mysql_cond_wait(&COND_start_thread, &LOCK_start_thread);
- mysql_mutex_unlock(&LOCK_start_thread);
+ mysql_mutex_lock(&LOCK_slave_background);
+ while (!slave_background_thread_gtid_loaded)
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+
return 0;
}
+static void
+stop_slave_background_thread()
+{
+ mysql_mutex_lock(&LOCK_slave_background);
+ slave_background_thread_stop= true;
+ mysql_cond_broadcast(&COND_slave_background);
+ while (slave_background_thread_running)
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+}
+
+
/* Initialize slave structures */
int init_slave()
@@ -361,7 +458,7 @@ int init_slave()
init_slave_psi_keys();
#endif
- if (run_slave_init_thread())
+ if (start_slave_background_thread())
return 1;
if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
@@ -647,6 +744,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
mysql_mutex_unlock(log_lock);
}
if (opt_slave_parallel_threads > 0 &&
+ master_info_index &&// master_info_index is set to NULL on server shutdown
!master_info_index->any_slave_sql_running())
rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
@@ -1000,6 +1098,9 @@ void end_slave()
master_info_index= 0;
active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
+
+ stop_slave_background_thread();
+
global_rpl_thread_pool.destroy();
free_all_rpl_filters();
DBUG_VOID_RETURN;
@@ -4111,6 +4212,7 @@ connected:
}
DBUG_PRINT("info",("Starting reading binary log from master"));
+ thd->set_command(COM_SLAVE_IO);
while (!io_slave_killed(mi))
{
THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
@@ -4733,6 +4835,7 @@ pthread_handler_t handle_slave_sql(void *arg)
/* Read queries from the IO/THREAD until this thread is killed */
+ thd->set_command(COM_SLAVE_SQL);
while (!sql_slave_killed(serial_rgi))
{
THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log);
@@ -6203,9 +6306,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
#ifndef DBUG_OFF
mi->events_till_disconnect = disconnect_slave_event_count;
#endif
- ulong client_flag= 0;
+ ulong client_flag= CLIENT_REMEMBER_OPTIONS;
if (opt_slave_compressed_protocol)
- client_flag=CLIENT_COMPRESS; /* We will use compression */
+ client_flag|= CLIENT_COMPRESS; /* We will use compression */
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);