summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2015-11-13 14:24:40 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2015-11-13 14:24:40 +0100
commit8f2e05f41cf3b92bdb4c409e542ef126d5fe8f95 (patch)
tree701b5599a3c9478e125535580e294f8b627f2834 /sql
parent2828c2be554b62646fc990ac28b4aef20cd9b9d2 (diff)
parentba02550166eb39c0375a6422ecaa4731421250b6 (diff)
downloadmariadb-git-8f2e05f41cf3b92bdb4c409e542ef126d5fe8f95.tar.gz
Merge branch 'mdev7818-4' into 10.1
Conflicts: mysql-test/suite/perfschema/r/stage_mdl_global.result sql/rpl_rli.cc sql/sql_parse.cc
Diffstat (limited to 'sql')
-rw-r--r--sql/mysqld.cc3
-rw-r--r--sql/mysqld.h3
-rw-r--r--sql/rpl_parallel.cc416
-rw-r--r--sql/rpl_parallel.h38
-rw-r--r--sql/rpl_rli.cc13
-rw-r--r--sql/rpl_rli.h7
-rw-r--r--sql/slave.cc7
-rw-r--r--sql/sql_parse.cc13
8 files changed, 415 insertions, 85 deletions
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 932bb0c20ba..f9a3c2a0b3a 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -10117,6 +10117,9 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for
PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0};
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0};
+PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read lock", 0};
+PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for worker threads to pause for global read lock", 0};
+PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replication worker thread pool is busy", 0};
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 29164c7629e..081d89c48a4 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -486,6 +486,9 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit;
extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_waiting_for_workers_idle;
+extern PSI_stage_info stage_waiting_for_ftwrl;
+extern PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause;
+extern PSI_stage_info stage_waiting_for_rpl_thread_pool;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index d5020dd4cba..526c3ec7a92 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -44,6 +44,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
+ if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
+ (ev->when == 0)))
+ rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
mysql_mutex_lock(&rli->data_lock);
/* Mutex will be released in apply_event_and_update_pos(). */
err= apply_event_and_update_pos(ev, thd, rgi, rpt);
@@ -271,6 +274,284 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
}
+/*
+ Do not start parallel execution of this event group until all prior groups
+ have reached the commit phase that are not safe to run in parallel with.
+*/
+static bool
+do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
+ bool *did_enter_cond, PSI_stage_info *old_stage)
+{
+ THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ uint64 wait_count;
+
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+
+ if (!gco->installed)
+ {
+ group_commit_orderer *prev_gco= gco->prev_gco;
+ if (prev_gco)
+ {
+ prev_gco->last_sub_id= gco->prior_sub_id;
+ prev_gco->next_gco= gco;
+ }
+ gco->installed= true;
+ }
+ wait_count= gco->wait_count;
+ if (wait_count > entry->count_committing_event_groups)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ thd->ENTER_COND(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry,
+ &stage_waiting_for_prior_transaction_to_start_commit,
+ old_stage);
+ *did_enter_cond= true;
+ do
+ {
+ if (thd->check_killed() && !rgi->worker_error)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
+ thd->send_kill_message();
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
+ /*
+ Even though we were killed, we need to continue waiting for the
+ prior event groups to signal that we can continue. Otherwise we
+ mess up the accounting for ordering. However, now that we have
+ marked the error, events will just be skipped rather than
+ executed, and things will progress quickly towards stop.
+ */
+ }
+ mysql_cond_wait(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry);
+ } while (wait_count > entry->count_committing_event_groups);
+ }
+
+ if (entry->force_abort && wait_count > entry->stop_count)
+ {
+ /*
+ We are stopping (STOP SLAVE), and this event group is beyond the point
+ where we can safely stop. So return a flag that will cause us to skip,
+ rather than execute, the following events.
+ */
+ return true;
+ }
+ else
+ return false;
+}
+
+
+static void
+do_ftwrl_wait(rpl_group_info *rgi,
+ bool *did_enter_cond, PSI_stage_info *old_stage)
+{
+ THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ uint64 sub_id= rgi->gtid_sub_id;
+
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+
+ /*
+ If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
+ transaction is later than transactions that have priority to complete
+ before FTWRL. If so, wait here so that FTWRL can proceed and complete
+ first.
+
+ (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
+ this test false as required).
+ */
+ if (unlikely(sub_id > entry->pause_sub_id))
+ {
+ thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
+ &stage_waiting_for_ftwrl, old_stage);
+ *did_enter_cond= true;
+ do
+ {
+ if (entry->force_abort || rgi->worker_error)
+ break;
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
+ break;
+ }
+ mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
+ } while (sub_id > entry->pause_sub_id);
+
+ /*
+ We do not call EXIT_COND() here, as this will be done later by our
+ caller (since we set *did_enter_cond to true).
+ */
+ }
+
+ if (sub_id > entry->largest_started_sub_id)
+ entry->largest_started_sub_id= sub_id;
+}
+
+
+static int
+pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
+{
+ PSI_stage_info old_stage;
+ int res= 0;
+
+ /*
+ Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
+ READ LOCK work correctly, without incuring extra locking penalties in
+ normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
+ thread pool, and for this we need to make sure the pool will not go away
+ during the operation. The LOCK_rpl_thread_pool is not suitable for
+ this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
+ must be released before locking any LOCK_rpl_thread lock, or a deadlock
+ can occur.
+
+ So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
+ pool size changes with this condition wait.
+ */
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ if (thd)
+ thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
+ &stage_waiting_for_rpl_thread_pool, &old_stage);
+ while (pool->busy)
+ {
+ if (thd && thd->check_killed())
+ {
+ thd->send_kill_message();
+ res= 1;
+ break;
+ }
+ mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ }
+ if (!res)
+ pool->busy= true;
+ if (thd)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+
+ return res;
+}
+
+
+static void
+pool_mark_not_busy(rpl_parallel_thread_pool *pool)
+{
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ DBUG_ASSERT(pool->busy);
+ pool->busy= false;
+ mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+}
+
+
+void
+rpl_unpause_after_ftwrl(THD *thd)
+{
+ uint32 i;
+ rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
+
+ DBUG_ASSERT(pool->busy);
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ rpl_parallel_entry *e;
+ rpl_parallel_thread *rpt= pool->threads[i];
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (!rpt->current_owner)
+ {
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ continue;
+ }
+ e= rpt->current_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ rpt->pause_for_ftwrl = false;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ e->pause_sub_id= (uint64)ULONGLONG_MAX;
+ mysql_cond_broadcast(&e->COND_parallel_entry);
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ }
+
+ pool_mark_not_busy(pool);
+}
+
+
+/*
+ .
+
+ Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called.
+*/
+int
+rpl_pause_for_ftwrl(THD *thd)
+{
+ uint32 i;
+ rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
+ int err;
+
+ /*
+ While the count_pending_pause_for_ftwrl counter is non-zero, the pool
+ cannot be shutdown/resized, so threads are guaranteed to not disappear.
+
+ This is required to safely be able to access the individual threads below.
+ (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
+ as this can deadlock against release_thread()).
+ */
+ if ((err= pool_mark_busy(pool, thd)))
+ return err;
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ PSI_stage_info old_stage;
+ rpl_parallel_entry *e;
+ rpl_parallel_thread *rpt= pool->threads[i];
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (!rpt->current_owner)
+ {
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ continue;
+ }
+ e= rpt->current_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ /*
+ Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
+ de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
+ */
+ rpt->pause_for_ftwrl = true;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ ++e->need_sub_id_signal;
+ if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
+ e->pause_sub_id= e->largest_started_sub_id;
+ thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
+ &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
+ while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
+ e->last_committed_sub_id < e->pause_sub_id &&
+ !err)
+ {
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ err= 1;
+ break;
+ }
+ mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
+ };
+ --e->need_sub_id_signal;
+ thd->EXIT_COND(&old_stage);
+ if (err)
+ break;
+ }
+
+ if (err)
+ rpl_unpause_after_ftwrl(thd);
+ return err;
+}
+
+
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
@@ -792,7 +1073,6 @@ handle_rpl_parallel_thread(void *arg)
{
bool did_enter_cond= false;
PSI_stage_info old_stage;
- uint64 wait_count;
DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", {
if (rgi->current_gtid.domain_id == 0 &&
@@ -831,72 +1111,19 @@ handle_rpl_parallel_thread(void *arg)
event_gtid_sub_id= rgi->gtid_sub_id;
rgi->thd= thd;
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage);
+
+ if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
+ skip_event_group= true;
+ if (likely(!skip_event_group))
+ do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
+
/*
Register ourself to wait for the previous commit, if we need to do
such registration _and_ that previous commit has not already
occured.
-
- Also do not start parallel execution of this event group until all
- prior groups have reached the commit phase that are not safe to run
- in parallel with.
*/
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (!gco->installed)
- {
- group_commit_orderer *prev_gco= gco->prev_gco;
- if (prev_gco)
- {
- prev_gco->last_sub_id= gco->prior_sub_id;
- prev_gco->next_gco= gco;
- }
- gco->installed= true;
- }
- wait_count= gco->wait_count;
- if (wait_count > entry->count_committing_event_groups)
- {
- DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
- thd->ENTER_COND(&gco->COND_group_commit_orderer,
- &entry->LOCK_parallel_entry,
- &stage_waiting_for_prior_transaction_to_start_commit,
- &old_stage);
- did_enter_cond= true;
- do
- {
- if (thd->check_killed() && !rgi->worker_error)
- {
- DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
- thd->clear_error();
- thd->get_stmt_da()->reset_diagnostics_area();
- thd->send_kill_message();
- slave_output_error_info(rgi, thd);
- signal_error_to_sql_driver_thread(thd, rgi, 1);
- /*
- Even though we were killed, we need to continue waiting for the
- prior event groups to signal that we can continue. Otherwise we
- mess up the accounting for ordering. However, now that we have
- marked the error, events will just be skipped rather than
- executed, and things will progress quickly towards stop.
- */
- }
- mysql_cond_wait(&gco->COND_group_commit_orderer,
- &entry->LOCK_parallel_entry);
- } while (wait_count > entry->count_committing_event_groups);
- }
-
- if (entry->force_abort && wait_count > entry->stop_count)
- {
- /*
- We are stopping (STOP SLAVE), and this event group is beyond the
- point where we can safely stop. So set a flag that will cause us
- to skip, rather than execute, the following events.
- */
- skip_event_group= true;
- }
- else
- skip_event_group= false;
-
- if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
- skip_event_group= true;
register_wait_for_prior_event_group_commit(rgi, entry);
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
@@ -1060,17 +1287,40 @@ handle_rpl_parallel_thread(void *arg)
*/
rpt->batch_free();
- if ((events= rpt->event_queue) != NULL)
+ for (;;)
{
+ if ((events= rpt->event_queue) != NULL)
+ {
+ /*
+ Take next group of events from the replication pool.
+ This is faster than having to wakeup the pool manager thread to give
+ us a new event.
+ */
+ rpt->dequeue1(events);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ goto more_events;
+ }
+ if (!rpt->pause_for_ftwrl ||
+ (in_event_group && !group_rgi->parallel_entry->force_abort))
+ break;
/*
- Take next group of events from the replication pool.
- This is faster than having to wakeup the pool manager thread to give us
- a new event.
+ We are currently in the delicate process of pausing parallel
+ replication while FLUSH TABLES WITH READ LOCK is starting. We must
+ not de-allocate the thread (setting rpt->current_owner= NULL) until
+ rpl_unpause_after_ftwrl() has woken us up.
*/
- rpt->dequeue1(events);
+ mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- goto more_events;
+ mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
+ &rpt->current_entry->LOCK_parallel_entry);
+ mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ /*
+ Now loop to check again for more events available, since we released
+ and re-aquired the LOCK_rpl_thread mutex.
+ */
}
+
rpt->inuse_relaylog_refcount_update();
if (in_event_group && group_rgi->parallel_entry->force_abort)
@@ -1148,6 +1398,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
rpl_parallel_thread **new_list= NULL;
rpl_parallel_thread *new_free_list= NULL;
rpl_parallel_thread *rpt_array= NULL;
+ int res;
+
+ if ((res= pool_mark_busy(pool, current_thd)))
+ return res;
/*
Allocate the new list of threads up-front.
@@ -1196,7 +1450,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
for (i= 0; i < pool->count; ++i)
{
- rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
+ rpl_parallel_thread *rpt;
+
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ while ((rpt= pool->free_list) == NULL)
+ mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ pool->free_list= rpt->next;
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->stop= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -1250,9 +1511,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
}
- mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
- mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
- mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+ pool_mark_not_busy(pool);
return 0;
@@ -1276,6 +1535,7 @@ err:
}
my_free(new_list);
}
+ pool_mark_not_busy(pool);
return 1;
}
@@ -1538,7 +1798,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : count(0), threads(0), free_list(0), inited(false)
+ : threads(0), free_list(0), count(0), inited(false), busy(false)
{
}
@@ -1546,9 +1806,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
int
rpl_parallel_thread_pool::init(uint32 size)
{
- count= 0;
threads= NULL;
free_list= NULL;
+ count= 0;
+ busy= false;
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
MY_MUTEX_INIT_SLOW);
@@ -1589,7 +1850,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
rpl_parallel_thread *rpt;
mysql_mutex_lock(&LOCK_rpl_thread_pool);
- while ((rpt= free_list) == NULL)
+ while (unlikely(busy) || !(rpt= free_list))
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
free_list= rpt->next;
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
@@ -1800,6 +2061,7 @@ rpl_parallel::find(uint32 domain_id)
e->rpl_thread_max= count;
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
+ e->pause_sub_id= (uint64)ULONGLONG_MAX;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -2001,7 +2263,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
- e->need_sub_id_signal= true;
+ ++e->need_sub_id_signal;
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
&stage_waiting_for_workers_idle, &old_stage);
while (e->current_sub_id > e->last_committed_sub_id)
@@ -2014,7 +2276,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
}
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
}
- e->need_sub_id_signal= false;
+ --e->need_sub_id_signal;
thd->EXIT_COND(&old_stage);
if (err)
return err;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 0b6c0b460d0..6c1a650429b 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -93,6 +93,7 @@ struct rpl_parallel_thread {
bool delay_start;
bool running;
bool stop;
+ bool pause_for_ftwrl;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
mysql_cond_t COND_rpl_thread_queue;
@@ -222,12 +223,18 @@ struct rpl_parallel_thread {
struct rpl_parallel_thread_pool {
- uint32 count;
struct rpl_parallel_thread **threads;
struct rpl_parallel_thread *free_list;
mysql_mutex_t LOCK_rpl_thread_pool;
mysql_cond_t COND_rpl_thread_pool;
+ uint32 count;
bool inited;
+ /*
+ While FTWRL runs, this counter is incremented to make SQL thread or
+ STOP/START slave not try to start new activity while that operation
+ is in progress.
+ */
+ bool busy;
rpl_parallel_thread_pool();
int init(uint32 size);
@@ -242,6 +249,12 @@ struct rpl_parallel_entry {
mysql_mutex_t LOCK_parallel_entry;
mysql_cond_t COND_parallel_entry;
uint32 domain_id;
+ /*
+ Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
+ that they are waiting, so that finish_event_group knows to signal them
+ when last_committed_sub_id is increased.
+ */
+ uint32 need_sub_id_signal;
uint64 last_commit_id;
bool active;
/*
@@ -251,12 +264,6 @@ struct rpl_parallel_entry {
*/
bool force_abort;
/*
- Set in wait_for_workers_idle() to show that it is waiting, so that
- finish_event_group knows to signal it when last_committed_sub_id is
- increased.
- */
- bool need_sub_id_signal;
- /*
At STOP SLAVE (force_abort=true), we do not want to process all events in
the queue (which could unnecessarily delay stop, if a lot of events happen
to be queued). The stop_count provides a safe point at which to stop, so
@@ -296,6 +303,15 @@ struct rpl_parallel_entry {
queued for execution by a worker thread.
*/
uint64 current_sub_id;
+ /*
+ The largest sub_id that has started its transaction. Protected by
+ LOCK_parallel_entry.
+
+ (Transactions can start out-of-order, so this value signifies that no
+ transactions with larger sub_id have started, but not necessarily that all
+ transactions with smaller sub_id have started).
+ */
+ uint64 largest_started_sub_id;
rpl_group_info *current_group_info;
/*
If we get an error in some event group, we set the sub_id of that event
@@ -305,6 +321,12 @@ struct rpl_parallel_entry {
The value is ULONGLONG_MAX when no error occured.
*/
uint64 stop_on_error_sub_id;
+ /*
+ During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
+ this value must not start, but wait until the global read lock is released.
+ The value is set to ULONGLONG_MAX when no FTWRL is pending.
+ */
+ uint64 pause_sub_id;
/* Total count of event groups queued so far. */
uint64 count_queued_event_groups;
/*
@@ -345,5 +367,7 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
+extern int rpl_pause_for_ftwrl(THD *thd);
+extern void rpl_unpause_after_ftwrl(THD *thd);
#endif /* RPL_PARALLEL_H */
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 7453f57e64c..7bdd9c1861f 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -998,6 +998,18 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
else if (group_master_log_pos < log_pos)
group_master_log_pos= log_pos;
}
+
+ /*
+ In the parallel case, we only update the Seconds_Behind_Master at the
+ end of a transaction. In the non-parallel case, the value is updated as
+ soon as an event is read from the relay log; however this would be too
+ confusing for the user, seeing the slave reported as up-to-date when
+ potentially thousands of events are still queued up for worker threads
+ waiting for execution.
+ */
+ if (rgi->last_master_timestamp &&
+ rgi->last_master_timestamp > last_master_timestamp)
+ last_master_timestamp= rgi->last_master_timestamp;
}
else
{
@@ -1614,6 +1626,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
long_find_row_note_printed= false;
did_mark_start_commit= false;
gtid_ev_flags2= 0;
+ last_master_timestamp = 0;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
commit_orderer.reinit();
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 377db469276..23ab1664a7b 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -669,6 +669,13 @@ struct rpl_group_info
char gtid_info_buf[5+10+1+10+1+20+1];
/*
+ The timestamp, from the master, of the commit event.
+ Used to do delayed update of rli->last_master_timestamp, for getting
+ reasonable values out of Seconds_Behind_Master in SHOW SLAVE STATUS.
+ */
+ time_t last_master_timestamp;
+
+ /*
Information to be able to re-try an event group in case of a deadlock or
other temporary error.
*/
diff --git a/sql/slave.cc b/sql/slave.cc
index fed171bcc4c..235b26f81ae 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3578,8 +3578,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
If it is an artificial event, or a relay log event (IO thread generated
event) or ev->when is set to 0, we don't update the
last_master_timestamp.
+
+ In parallel replication, we might queue a large number of events, and
+ the user might be surprised to see a claim that the slave is up to date
+ long before those queued events are actually executed.
*/
- if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
+ if (!rli->mi->using_parallel() &&
+ !(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
{
rli->last_master_timestamp= ev->when + (time_t) ev->exec_time;
DBUG_ASSERT(rli->last_master_timestamp >= 0);
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index d6e5082bae6..b277f64ee2a 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -4811,6 +4811,17 @@ end_with_restore_list:
}
#endif /* WITH_WSREP*/
+ if (lex->type & REFRESH_READ_LOCK)
+ {
+ /*
+ We need to pause any parallel replication slave workers during FLUSH
+ TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as
+ worker threads eun run in arbitrary order but need to commit in a
+ specific given order.
+ */
+ if (rpl_pause_for_ftwrl(thd))
+ goto error;
+ }
/*
reload_acl_and_cache() will tell us if we are allowed to write to the
binlog or not.
@@ -4841,6 +4852,8 @@ end_with_restore_list:
if (!res)
my_ok(thd);
}
+ if (lex->type & REFRESH_READ_LOCK)
+ rpl_unpause_after_ftwrl(thd);
break;
}