summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc277
1 files changed, 66 insertions, 211 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 9d4049c6452..40c09604745 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -64,6 +64,7 @@
#include "rpl_parallel.h"
#include "sql_show.h"
#include "semisync_slave.h"
+#include "sql_manager.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -362,10 +363,33 @@ end:
return err;
}
+static THD *new_bg_THD()
+{
+ THD *thd= new THD(next_thread_id());
+ thd->thread_stack= (char*) &thd;
+ thd->store_globals();
+ thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
+ thd->variables.wsrep_on= 0;
+ return thd;
+}
-static void
-handle_gtid_pos_auto_create_request(THD *thd, void *hton)
+static void bg_gtid_delete_pending(void *)
{
+ THD *thd= new_bg_THD();
+
+ rpl_slave_state::list_element *list;
+ list= rpl_global_gtid_slave_state->gtid_grab_pending_delete_list();
+ rpl_global_gtid_slave_state->gtid_delete_pending(thd, &list);
+ if (list)
+ rpl_global_gtid_slave_state->put_back_list(list);
+ delete thd;
+}
+
+static void bg_gtid_pos_auto_create(void *hton)
+{
+ THD *thd= NULL;
int UNINIT_VAR(err);
plugin_ref engine= NULL, *auto_engines;
rpl_slave_state::gtid_pos_table *entry;
@@ -377,7 +401,6 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton)
it.
*/
mysql_mutex_lock(&LOCK_global_system_variables);
- engine= NULL;
for (auto_engines= opt_gtid_pos_auto_plugins;
auto_engines && *auto_engines;
++auto_engines)
@@ -422,6 +445,7 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton)
table_name.str= loc_table_name.c_ptr_safe();
table_name.length= loc_table_name.length();
+ thd= new_bg_THD();
err= gtid_pos_table_creation(thd, engine, &table_name);
if (err)
{
@@ -449,46 +473,16 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton)
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
end:
+ delete thd;
if (engine)
plugin_unlock(NULL, engine);
}
-
-static bool slave_background_thread_running;
-static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded;
-static struct slave_background_kill_t {
- slave_background_kill_t *next;
- THD *to_kill;
-} *slave_background_kill_list;
-
-static struct slave_background_gtid_pos_create_t {
- slave_background_gtid_pos_create_t *next;
- void *hton;
-} *slave_background_gtid_pos_create_list;
-
-static volatile bool slave_background_gtid_pending_delete_flag;
-
-
-pthread_handler_t
-handle_slave_background(void *arg __attribute__((unused)))
+static void bg_rpl_load_gtid_slave_state(void *)
{
- THD *thd;
- PSI_stage_info old_stage;
- bool stop;
-
- my_thread_init();
- thd= new THD(next_thread_id());
- thd->thread_stack= (char*) &thd; /* Set approximate stack start */
- thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
- thd->store_globals();
- thd->security_ctx->skip_grants();
- thd->set_command(COM_DAEMON);
-#ifdef WITH_WSREP
- thd->variables.wsrep_on= 0;
-#endif
-
+ THD *thd= new_bg_THD();
thd_proc_info(thd, "Loading slave GTID position from table");
if (rpl_load_gtid_slave_state(thd))
sql_print_warning("Failed to load slave replication state from table "
@@ -497,207 +491,62 @@ handle_slave_background(void *arg __attribute__((unused)))
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
- mysql_mutex_lock(&LOCK_slave_background);
+ // hijacking global_rpl_thread_pool cond here - it's only once on startup
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
slave_background_thread_gtid_loaded= true;
- mysql_cond_broadcast(&COND_slave_background);
-
- THD_STAGE_INFO(thd, stage_slave_background_process_request);
- do
- {
- slave_background_kill_t *kill_list;
- slave_background_gtid_pos_create_t *create_list;
- bool pending_deletes;
-
- thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
- &stage_slave_background_wait_request,
- &old_stage);
- for (;;)
- {
- stop= thd->killed || slave_background_thread_stop;
- kill_list= slave_background_kill_list;
- create_list= slave_background_gtid_pos_create_list;
- pending_deletes= slave_background_gtid_pending_delete_flag;
- if (stop || kill_list || create_list || pending_deletes)
- break;
- mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
- }
-
- slave_background_kill_list= NULL;
- slave_background_gtid_pos_create_list= NULL;
- slave_background_gtid_pending_delete_flag= false;
- thd->EXIT_COND(&old_stage);
-
- while (kill_list)
- {
- slave_background_kill_t *p = kill_list;
- THD *to_kill= p->to_kill;
- kill_list= p->next;
-
- to_kill->awake(KILL_CONNECTION);
- mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
- to_kill->rgi_slave->killed_for_retry=
- rpl_group_info::RETRY_KILL_KILLED;
- mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
- mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
- my_free(p);
- }
-
- while (create_list)
- {
- slave_background_gtid_pos_create_t *next= create_list->next;
- void *hton= create_list->hton;
- handle_gtid_pos_auto_create_request(thd, hton);
- my_free(create_list);
- create_list= next;
- }
-
- if (pending_deletes)
- {
- rpl_slave_state::list_element *list;
-
- slave_background_gtid_pending_delete_flag= false;
- list= rpl_global_gtid_slave_state->gtid_grab_pending_delete_list();
- rpl_global_gtid_slave_state->gtid_delete_pending(thd, &list);
- if (list)
- rpl_global_gtid_slave_state->put_back_list(list);
- }
-
- mysql_mutex_lock(&LOCK_slave_background);
- } while (!stop);
-
- slave_background_thread_running= false;
- mysql_cond_broadcast(&COND_slave_background);
- mysql_mutex_unlock(&LOCK_slave_background);
-
+ mysql_cond_signal(&global_rpl_thread_pool.COND_rpl_thread_pool);
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
delete thd;
-
- my_thread_end();
- return 0;
}
+static void bg_slave_kill(void *victim)
+{
+ THD *to_kill= (THD *)victim;
+ to_kill->awake(KILL_CONNECTION);
+ mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
+ to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
+ mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
+ mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
+}
-
-void
-slave_background_kill_request(THD *to_kill)
+void slave_background_kill_request(THD *to_kill)
{
if (to_kill->rgi_slave->killed_for_retry)
return; // Already deadlock killed.
- slave_background_kill_t *p=
- (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
- if (p)
- {
- p->to_kill= to_kill;
- to_kill->rgi_slave->killed_for_retry=
- rpl_group_info::RETRY_KILL_PENDING;
- mysql_mutex_lock(&LOCK_slave_background);
- p->next= slave_background_kill_list;
- slave_background_kill_list= p;
- mysql_cond_signal(&COND_slave_background);
- mysql_mutex_unlock(&LOCK_slave_background);
- }
+ to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_PENDING;
+ mysql_manager_submit(bg_slave_kill, to_kill);
}
-
/*
This function must only be called from a slave SQL thread (or worker thread),
to ensure that the table_entry will not go away before we can lock the
LOCK_slave_state.
*/
-void
-slave_background_gtid_pos_create_request(
+void slave_background_gtid_pos_create_request(
rpl_slave_state::gtid_pos_table *table_entry)
{
- slave_background_gtid_pos_create_t *p;
-
if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
return;
- p= (slave_background_gtid_pos_create_t *)my_malloc(sizeof(*p), MYF(MY_WME));
- if (!p)
- return;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
{
- my_free(p);
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
return;
}
table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- p->hton= table_entry->table_hton;
- mysql_mutex_lock(&LOCK_slave_background);
- p->next= slave_background_gtid_pos_create_list;
- slave_background_gtid_pos_create_list= p;
- mysql_cond_signal(&COND_slave_background);
- mysql_mutex_unlock(&LOCK_slave_background);
+ mysql_manager_submit(bg_gtid_pos_auto_create, table_entry->table_hton);
}
/*
- Request the slave background thread to delete no longer used rows from the
+ Request the manager thread to delete no longer used rows from the
mysql.gtid_slave_pos* tables.
-
- This is called from time-critical rpl_slave_state::update(), so we avoid
- taking any locks here. This means we may race with the background thread
- to occasionally lose a signal. This is not a problem; any pending rows to
- be deleted will just be deleted a bit later as part of the next batch.
-*/
-void
-slave_background_gtid_pending_delete_request(void)
-{
- slave_background_gtid_pending_delete_flag= true;
- mysql_cond_signal(&COND_slave_background);
-}
-
-
-/*
- Start the slave background thread.
-
- This thread is currently used for two purposes:
-
- 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
- from table requires valid THD, which is otherwise not available during
- server init.
-
- 2. To kill worker thread transactions during parallel replication, when a
- storage engine attempts to take an errorneous conflicting lock that would
- cause a deadlock. Killing is done asynchroneously, as the kill may not
- be safe within the context of a callback from inside storage engine
- locking code.
*/
-static int
-start_slave_background_thread()
+void slave_background_gtid_pending_delete_request(void)
{
- pthread_t th;
-
- slave_background_thread_running= true;
- slave_background_thread_stop= false;
- slave_background_thread_gtid_loaded= false;
- if (mysql_thread_create(key_thread_slave_background,
- &th, &connection_attrib, handle_slave_background,
- NULL))
- {
- sql_print_error("Failed to create thread while initialising slave");
- return 1;
- }
- mysql_mutex_lock(&LOCK_slave_background);
- while (!slave_background_thread_gtid_loaded)
- mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
- mysql_mutex_unlock(&LOCK_slave_background);
-
- return 0;
-}
-
-
-static void
-stop_slave_background_thread()
-{
- mysql_mutex_lock(&LOCK_slave_background);
- slave_background_thread_stop= true;
- mysql_cond_broadcast(&COND_slave_background);
- while (slave_background_thread_running)
- mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
- mysql_mutex_unlock(&LOCK_slave_background);
+ mysql_manager_submit(bg_gtid_delete_pending, NULL);
}
@@ -712,12 +561,19 @@ int init_slave()
init_slave_psi_keys();
#endif
- if (start_slave_background_thread())
- return 1;
-
if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
return 1;
+ slave_background_thread_gtid_loaded= false;
+ mysql_manager_submit(bg_rpl_load_gtid_slave_state, NULL);
+
+ // hijacking global_rpl_thread_pool cond here - it's only once on startup
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ while (!slave_background_thread_gtid_loaded)
+ mysql_cond_wait(&global_rpl_thread_pool.COND_rpl_thread_pool,
+ &global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+
/*
This is called when mysqld starts. Before client connections are
accepted. However bootstrap may conflict with us if it does START SLAVE.
@@ -1446,7 +1302,6 @@ void slave_prepare_for_shutdown()
// It's safe to destruct worker pool now when
// all driver threads are gone.
global_rpl_thread_pool.deactivate();
- stop_slave_background_thread();
}
/*
@@ -1477,8 +1332,6 @@ void end_slave()
active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
- stop_slave_background_thread();
-
global_rpl_thread_pool.destroy();
free_all_rpl_filters();
DBUG_VOID_RETURN;
@@ -4737,10 +4590,7 @@ pthread_handler_t handle_slave_io(void *arg)
goto err;
}
-
-#ifdef WITH_WSREP
thd->variables.wsrep_on= 0;
-#endif
if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
|| repl_semisync_slave.slave_start(mi))
{
@@ -5060,8 +4910,11 @@ log space");
err:
// print the current replication position
if (mi->using_gtid == Master_info::USE_GTID_NO)
+ {
sql_print_information("Slave I/O thread exiting, read up to log '%s', "
"position %llu", IO_RPL_LOG_NAME, mi->master_log_pos);
+ sql_print_information("master was %s:%d", mi->host, mi->port);
+ }
else
{
StringBuffer<100> tmp;
@@ -5070,6 +4923,7 @@ err:
"position %llu; GTID position %s",
IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe());
+ sql_print_information("master was %s:%d", mi->host, mi->port);
}
repl_semisync_slave.slave_stop(mi);
thd->reset_query();
@@ -5672,6 +5526,7 @@ pthread_handler_t handle_slave_sql(void *arg)
sql_print_information("Slave SQL thread exiting, replication stopped in "
"log '%s' at position %llu%s", RPL_LOG_NAME,
rli->group_master_log_pos, tmp.c_ptr_safe());
+ sql_print_information("master was %s:%d", mi->host, mi->port);
}
#ifdef WITH_WSREP
wsrep_after_command_before_result(thd);