diff options
-rw-r--r-- | sql/log_event.cc | 7 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 33 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 3 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 3 | ||||
-rw-r--r-- | sql/rpl_rli.h | 1 | ||||
-rw-r--r-- | sql/slave.cc | 15 |
6 files changed, 52 insertions, 10 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index d0cfa799d3c..361efe4428e 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6649,7 +6649,7 @@ Gtid_list_log_event::write(IO_CACHE *file) int Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) { - Relay_log_info const *rli= rgi->rli; + Relay_log_info *rli= const_cast<Relay_log_info*>(rgi->rli); int ret; if (gl_flags & FLAG_IGN_GTIDS) { @@ -6669,10 +6669,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) { char str_buf[128]; String str(str_buf, sizeof(str_buf), system_charset_info); - const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str); + rli->until_gtid_pos.to_string(&str); sql_print_information("Slave SQL thread stops because it reached its" " UNTIL master_gtid_pos %s", str.c_ptr_safe()); - const_cast<Relay_log_info*>(rli)->abort_slave= true; + rli->abort_slave= true; + rli->stop_for_until= true; } return ret; } diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index eab5b980c02..cceb6013d6c 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) rgi->is_error= true; rgi->cleanup_context(thd, true); rgi->rli->abort_slave= true; + rgi->rli->stop_for_until= false; mysql_mutex_lock(rgi->rli->relay_log.get_log_lock()); mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock()); rgi->rli->relay_log.signal_update(); @@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id) void -rpl_parallel::wait_for_done(THD *thd) +rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) { struct rpl_parallel_entry *e; rpl_parallel_thread *rpt; @@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd) started executing yet. So we set e->stop_count here and use it to decide in the worker threads whether to continue executing an event group or whether to skip it, when force_abort is set. + + If we stop due to reaching the START SLAVE UNTIL condition, then we + need to continue executing any queued events up to that point. */ e->force_abort= true; - e->stop_count= e->count_committing_event_groups; + e->stop_count= rli->stop_for_until ? + e->count_queued_event_groups : e->count_committing_event_groups; mysql_mutex_unlock(&e->LOCK_parallel_entry); for (j= 0; j < e->rpl_thread_max; ++j) { @@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd) } +/* + This function handles the case where the SQL driver thread reached the + START SLAVE UNTIL position; we stop queueing more events but continue + processing remaining, already queued events; then use executes manual + STOP SLAVE; then this function signals to worker threads that they + should stop the processing of any remaining queued events. +*/ +void +rpl_parallel::stop_during_until() +{ + struct rpl_parallel_entry *e; + uint32 i; + + for (i= 0; i < domain_hash.records; ++i) + { + e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + if (e->force_abort) + e->stop_count= e->count_committing_event_groups; + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } +} + + bool rpl_parallel::workers_idle() { diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 90649230f98..31a6a035dd8 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -222,7 +222,8 @@ struct rpl_parallel { ~rpl_parallel(); void reset(); rpl_parallel_entry *find(uint32 domain_id); - void wait_for_done(THD *thd); + void wait_for_done(THD *thd, Relay_log_info *rli); + void stop_during_until(); bool workers_idle(); bool do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index ffe5e516069..26776bb46f3 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), - inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), + inited(0), abort_slave(0), stop_for_until(0), + slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), m_flags(0) { diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 0ba259b0efd..6db4ce5d61b 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -262,6 +262,7 @@ public: */ volatile bool inited; volatile bool abort_slave; + volatile bool stop_for_until; volatile uint slave_running; /* diff --git a/sql/slave.cc b/sql/slave.cc index b081a3369f5..29515fb3821 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -615,7 +615,14 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { DBUG_PRINT("info",("Terminating SQL thread")); - mi->rli.abort_slave=1; + if (opt_slave_parallel_threads > 0 && + mi->rli.abort_slave && mi->rli.stop_for_until) + { + mi->rli.stop_for_until= false; + mi->rli.parallel.stop_during_until(); + } + else + mi->rli.abort_slave=1; if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock, &mi->rli.stop_cond, &mi->rli.slave_running, @@ -3414,6 +3421,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, message about error in query execution to be printed. */ rli->abort_slave= 1; + rli->stop_for_until= true; mysql_mutex_unlock(&rli->data_lock); delete ev; DBUG_RETURN(1); @@ -4356,6 +4364,7 @@ pthread_handler_t handle_slave_sql(void *arg) Seconds_Behind_Master grows. No big deal. */ rli->abort_slave = 0; + rli->stop_for_until= false; mysql_mutex_unlock(&rli->run_lock); mysql_cond_broadcast(&rli->start_cond); @@ -4526,7 +4535,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, } if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(thd); + rli->parallel.wait_for_done(thd, rli); /* Thread stopped. Print the current replication position to the log */ { @@ -4552,7 +4561,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, get the correct position printed.) */ if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(thd); + rli->parallel.wait_for_done(thd, rli); /* Some events set some playgrounds, which won't be cleared because thread |