summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/log_event.cc7
-rw-r--r--sql/rpl_parallel.cc33
-rw-r--r--sql/rpl_parallel.h3
-rw-r--r--sql/rpl_rli.cc3
-rw-r--r--sql/rpl_rli.h1
-rw-r--r--sql/slave.cc15
6 files changed, 52 insertions, 10 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index d0cfa799d3c..361efe4428e 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6649,7 +6649,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
int
Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
- Relay_log_info const *rli= rgi->rli;
+ Relay_log_info *rli= const_cast<Relay_log_info*>(rgi->rli);
int ret;
if (gl_flags & FLAG_IGN_GTIDS)
{
@@ -6669,10 +6669,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
char str_buf[128];
String str(str_buf, sizeof(str_buf), system_charset_info);
- const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str);
+ rli->until_gtid_pos.to_string(&str);
sql_print_information("Slave SQL thread stops because it reached its"
" UNTIL master_gtid_pos %s", str.c_ptr_safe());
- const_cast<Relay_log_info*>(rli)->abort_slave= true;
+ rli->abort_slave= true;
+ rli->stop_for_until= true;
}
return ret;
}
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index eab5b980c02..cceb6013d6c 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
rgi->is_error= true;
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
+ rgi->rli->stop_for_until= false;
mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
rgi->rli->relay_log.signal_update();
@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
void
-rpl_parallel::wait_for_done(THD *thd)
+rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
{
struct rpl_parallel_entry *e;
rpl_parallel_thread *rpt;
@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
started executing yet. So we set e->stop_count here and use it to
decide in the worker threads whether to continue executing an event
group or whether to skip it, when force_abort is set.
+
+ If we stop due to reaching the START SLAVE UNTIL condition, then we
+ need to continue executing any queued events up to that point.
*/
e->force_abort= true;
- e->stop_count= e->count_committing_event_groups;
+ e->stop_count= rli->stop_for_until ?
+ e->count_queued_event_groups : e->count_committing_event_groups;
mysql_mutex_unlock(&e->LOCK_parallel_entry);
for (j= 0; j < e->rpl_thread_max; ++j)
{
@@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd)
}
+/*
+ This function handles the case where the SQL driver thread reached the
+ START SLAVE UNTIL position; we stop queueing more events but continue
+ processing remaining, already queued events; then use executes manual
+ STOP SLAVE; then this function signals to worker threads that they
+ should stop the processing of any remaining queued events.
+*/
+void
+rpl_parallel::stop_during_until()
+{
+ struct rpl_parallel_entry *e;
+ uint32 i;
+
+ for (i= 0; i < domain_hash.records; ++i)
+ {
+ e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ if (e->force_abort)
+ e->stop_count= e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ }
+}
+
+
bool
rpl_parallel::workers_idle()
{
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 90649230f98..31a6a035dd8 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -222,7 +222,8 @@ struct rpl_parallel {
~rpl_parallel();
void reset();
rpl_parallel_entry *find(uint32 domain_id);
- void wait_for_done(THD *thd);
+ void wait_for_done(THD *thd, Relay_log_info *rli);
+ void stop_during_until();
bool workers_idle();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index ffe5e516069..26776bb46f3 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
- inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
+ inited(0), abort_slave(0), stop_for_until(0),
+ slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
m_flags(0)
{
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 0ba259b0efd..6db4ce5d61b 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -262,6 +262,7 @@ public:
*/
volatile bool inited;
volatile bool abort_slave;
+ volatile bool stop_for_until;
volatile uint slave_running;
/*
diff --git a/sql/slave.cc b/sql/slave.cc
index b081a3369f5..29515fb3821 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -615,7 +615,14 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{
DBUG_PRINT("info",("Terminating SQL thread"));
- mi->rli.abort_slave=1;
+ if (opt_slave_parallel_threads > 0 &&
+ mi->rli.abort_slave && mi->rli.stop_for_until)
+ {
+ mi->rli.stop_for_until= false;
+ mi->rli.parallel.stop_during_until();
+ }
+ else
+ mi->rli.abort_slave=1;
if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
&mi->rli.stop_cond,
&mi->rli.slave_running,
@@ -3414,6 +3421,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
message about error in query execution to be printed.
*/
rli->abort_slave= 1;
+ rli->stop_for_until= true;
mysql_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(1);
@@ -4356,6 +4364,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Seconds_Behind_Master grows. No big deal.
*/
rli->abort_slave = 0;
+ rli->stop_for_until= false;
mysql_mutex_unlock(&rli->run_lock);
mysql_cond_broadcast(&rli->start_cond);
@@ -4526,7 +4535,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done(thd);
+ rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */
{
@@ -4552,7 +4561,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
get the correct position printed.)
*/
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done(thd);
+ rli->parallel.wait_for_done(thd, rli);
/*
Some events set some playgrounds, which won't be cleared because thread