summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc160
1 files changed, 92 insertions, 68 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index e031a424ea6..fbdb78b5c5d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -233,16 +233,14 @@ void init_thread_mask(int* mask,Master_info* mi,bool inverse)
/*
- lock_slave_threads()
+ lock_slave_threads() against other threads doing STOP, START or RESET SLAVE
+
*/
-void lock_slave_threads(Master_info* mi)
+void Master_info::lock_slave_threads()
{
DBUG_ENTER("lock_slave_threads");
-
- //TODO: see if we can do this without dual mutex
- mysql_mutex_lock(&mi->run_lock);
- mysql_mutex_lock(&mi->rli.run_lock);
+ mysql_mutex_lock(&start_stop_lock);
DBUG_VOID_RETURN;
}
@@ -251,13 +249,10 @@ void lock_slave_threads(Master_info* mi)
unlock_slave_threads()
*/
-void unlock_slave_threads(Master_info* mi)
+void Master_info::unlock_slave_threads()
{
DBUG_ENTER("unlock_slave_threads");
-
- //TODO: see if we can do this without dual mutex
- mysql_mutex_unlock(&mi->rli.run_lock);
- mysql_mutex_unlock(&mi->run_lock);
+ mysql_mutex_unlock(&start_stop_lock);
DBUG_VOID_RETURN;
}
@@ -470,7 +465,6 @@ int init_slave()
accepted. However bootstrap may conflict with us if it does START SLAVE.
So it's safer to take the lock.
*/
- mysql_mutex_lock(&LOCK_active_mi);
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
goto err;
@@ -479,7 +473,6 @@ int init_slave()
if (!master_info_index || master_info_index->init_all_master_info())
{
sql_print_error("Failed to initialize multi master structures");
- mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(1);
}
if (!(active_mi= new Master_info(&default_master_connection_name,
@@ -524,13 +517,22 @@ int init_slave()
if (active_mi->host[0] && !opt_skip_slave_start)
{
- if (start_slave_threads(0, /* No active thd */
- 1 /* need mutex */,
- 0 /* no wait for start*/,
- active_mi,
- master_info_file,
- relay_log_info_file,
- SLAVE_IO | SLAVE_SQL))
+ int error;
+ THD *thd= new THD(next_thread_id());
+ thd->thread_stack= (char*) &thd;
+ thd->store_globals();
+
+ error= start_slave_threads(0, /* No active thd */
+ 1 /* need mutex */,
+ 1 /* wait for start*/,
+ active_mi,
+ master_info_file,
+ relay_log_info_file,
+ SLAVE_IO | SLAVE_SQL);
+
+ thd->reset_globals();
+ delete thd;
+ if (error)
{
sql_print_error("Failed to create slave threads");
goto err;
@@ -538,7 +540,6 @@ int init_slave()
}
end:
- mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(error);
err:
@@ -711,6 +712,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (!mi->inited)
DBUG_RETURN(0); /* successfully do nothing */
int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
+ int retval= 0;
mysql_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock();
@@ -730,24 +732,18 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
skip_lock)) &&
!force_all)
DBUG_RETURN(error);
+ retval= error;
mysql_mutex_lock(log_lock);
DBUG_PRINT("info",("Flushing relay-log info file."));
if (current_thd)
THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file);
- if (mi->rli.flush())
- DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
-
- if (my_sync(mi->rli.info_fd, MYF(MY_WME)))
- DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
+ if (mi->rli.flush() || my_sync(mi->rli.info_fd, MYF(MY_WME)))
+ retval= ER_ERROR_DURING_FLUSH_LOGS;
mysql_mutex_unlock(log_lock);
}
- if (opt_slave_parallel_threads > 0 &&
- master_info_index &&// master_info_index is set to NULL on server shutdown
- !master_info_index->any_slave_sql_running())
- rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
{
DBUG_PRINT("info",("Terminating IO thread"));
@@ -758,25 +754,26 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
skip_lock)) &&
!force_all)
DBUG_RETURN(error);
+ if (!retval)
+ retval= error;
mysql_mutex_lock(log_lock);
DBUG_PRINT("info",("Flushing relay log and master info file."));
if (current_thd)
THD_STAGE_INFO(current_thd, stage_flushing_relay_log_and_master_info_repository);
- if (flush_master_info(mi, TRUE, FALSE))
- DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
-
+ if (likely(mi->fd >= 0))
+ {
+ if (flush_master_info(mi, TRUE, FALSE) || my_sync(mi->fd, MYF(MY_WME)))
+ retval= ER_ERROR_DURING_FLUSH_LOGS;
+ }
if (mi->rli.relay_log.is_open() &&
my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME)))
- DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
-
- if (my_sync(mi->fd, MYF(MY_WME)))
- DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
+ retval= ER_ERROR_DURING_FLUSH_LOGS;
mysql_mutex_unlock(log_lock);
}
- DBUG_RETURN(0);
+ DBUG_RETURN(retval);
}
@@ -939,6 +936,15 @@ int start_slave_thread(
mysql_mutex_unlock(start_lock);
DBUG_RETURN(ER_SLAVE_THREAD);
}
+
+ /*
+ In the following loop we can't check for thd->killed as we have to
+ wait until THD structures for the slave thread are created
+ before we can return.
+ This should be ok as there is no major work done in the slave
+ threads before they signal that we can stop waiting.
+ */
+
if (start_cond && cond_lock) // caller has cond_lock
{
THD* thd = current_thd;
@@ -956,16 +962,9 @@ int start_slave_thread(
registered, we could otherwise go waiting though thd->killed is
set.
*/
- if (!thd->killed)
- mysql_cond_wait(start_cond, cond_lock);
+ mysql_cond_wait(start_cond, cond_lock);
thd->EXIT_COND(& saved_stage);
mysql_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
- if (thd->killed)
- {
- if (start_lock)
- mysql_mutex_unlock(start_lock);
- DBUG_RETURN(thd->killed_errno());
- }
}
}
if (start_lock)
@@ -1054,10 +1053,7 @@ int start_slave_threads(THD *thd,
mi);
if (!error && (thread_mask & SLAVE_SQL))
{
- if (opt_slave_parallel_threads > 0)
- error= rpl_parallel_activate_pool(&global_rpl_thread_pool);
- if (!error)
- error= start_slave_thread(
+ error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
key_thread_slave_sql,
#endif
@@ -1073,10 +1069,19 @@ int start_slave_threads(THD *thd,
/*
- Release slave threads at time of executing shutdown.
+ Kill slaves preparing for shutdown
+*/
- SYNOPSIS
- end_slave()
+void slave_prepare_for_shutdown()
+{
+ mysql_mutex_lock(&LOCK_active_mi);
+ master_info_index->free_connections();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ stop_slave_background_thread();
+}
+
+/*
+ Release slave threads at time of executing shutdown.
*/
void end_slave()
@@ -1094,7 +1099,10 @@ void end_slave()
startup parameter to the server was wrong.
*/
mysql_mutex_lock(&LOCK_active_mi);
- /* This will call terminate_slave_threads() on all connections */
+ /*
+ master_info_index should not have any threads anymore as they where
+ killed as part of slave_prepare_for_shutdown()
+ */
delete master_info_index;
master_info_index= 0;
active_mi= 0;
@@ -2878,7 +2886,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
mysql_mutex_lock(&mi->data_lock);
mysql_mutex_lock(&mi->rli.data_lock);
+ /* err_lock is to protect mi->last_error() */
mysql_mutex_lock(&mi->err_lock);
+ /* err_lock is to protect mi->rli.last_error() */
mysql_mutex_lock(&mi->rli.err_lock);
protocol->store(mi->host, &my_charset_bin);
protocol->store(mi->user, &my_charset_bin);
@@ -4884,6 +4894,16 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
pthread_detach_this_thread();
+
+ if (opt_slave_parallel_threads > 0 &&
+ rpl_parallel_activate_pool(&global_rpl_thread_pool))
+ {
+ mysql_cond_broadcast(&rli->start_cond);
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
+ "Failed during parallel slave pool activation");
+ goto err_during_init;
+ }
+
if (init_slave_thread(thd, mi, SLAVE_THD_SQL))
{
/*
@@ -5184,8 +5204,15 @@ pthread_handler_t handle_slave_sql(void *arg)
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
ulong domain_count;
+ my_bool save_log_all_errors= thd->log_all_errors;
+ /*
+ We don't need to check return value for rli->flush()
+ as any errors should be logged to stderr
+ */
+ thd->log_all_errors= 1;
rli->flush();
+ thd->log_all_errors= save_log_all_errors;
if (mi->using_parallel())
{
/*
@@ -5296,17 +5323,7 @@ err_during_init:
DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5););
mysql_mutex_unlock(&rli->run_lock); // tell the world we are done
- /*
- Deactivate the parallel replication thread pool, if there are now no more
- SQL threads running. Do this here, when we have released all locks, but
- while our THD (and current_thd) is still valid.
- */
- mysql_mutex_lock(&LOCK_active_mi);
- if (opt_slave_parallel_threads > 0 &&
- master_info_index &&// master_info_index is set to NULL on server shutdown
- !master_info_index->any_slave_sql_running())
- rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
- mysql_mutex_unlock(&LOCK_active_mi);
+ rpl_parallel_resize_pool_if_no_slaves();
/* TODO: Check if this lock is needed */
mysql_mutex_lock(&LOCK_thread_count);
@@ -6519,6 +6536,7 @@ err:
void end_relay_log_info(Relay_log_info* rli)
{
+ mysql_mutex_t *log_lock;
DBUG_ENTER("end_relay_log_info");
if (!rli->inited)
@@ -6536,8 +6554,11 @@ void end_relay_log_info(Relay_log_info* rli)
rli->cur_log_fd = -1;
}
rli->inited = 0;
+ log_lock= rli->relay_log.get_log_lock();
+ mysql_mutex_lock(log_lock);
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
+ mysql_mutex_unlock(log_lock);
/*
Delete the slave's temporary tables from memory.
In the future there will be other actions than this, to ensure persistance
@@ -6688,7 +6709,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
suppress_warnings= 0;
mi->report(ERROR_LEVEL, last_errno, NULL,
"error %s to master '%s@%s:%d'"
- " - retry-time: %d retries: %lu message: %s",
+ " - retry-time: %d maximum-retries: %lu message: %s",
(reconnect ? "reconnecting" : "connecting"),
mi->user, mi->host, mi->port,
mi->connect_retry, master_retry_count,
@@ -7181,9 +7202,12 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
}
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
- rli->flush();
+ if (rli->flush())
+ {
+ errmsg= "error flushing relay log";
+ goto err;
+ }
}
-
/*
Now we want to open this next log. To know if it's a hot log (the one
being written by the I/O thread now) or a cold log, we can use