summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2015-10-22 11:18:34 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2015-11-13 14:02:15 +0100
commitba02550166eb39c0375a6422ecaa4731421250b6 (patch)
treecdc1a7f94a64fb0250883dcf50bcd85e5c91d6fb /sql/rpl_parallel.cc
parent6d96fab7dd9ce6f384eebc9b73a9128a0b0ca1aa (diff)
downloadmariadb-git-ba02550166eb39c0375a6422ecaa4731421250b6.tar.gz
MDEV-7818: Deadlock occurring with parallel replication and FTWRL
Problem is that FLUSH TABLES WITH READ LOCK first blocks threads from starting new commits, then waits for running commits to complete. But in-order parallel replication needs commits to happen in a particular order, so this can easily deadlock. To fix this problem, this patch introduces a way to temporarily pause the parallel replication worker threads. Before starting FTWRL, we let all worker threads complete in-progress transactions, and then wait. Then we proceed to take the global read lock. Once the lock is obtained, we unpause the worker threads. Now commits are blocked from starting by the global read lock, so the deadlock will no longer occur.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc275
1 files changed, 260 insertions, 15 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 965480c57a3..0f8c69f6a68 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -287,6 +287,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 wait_count;
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+
if (!gco->installed)
{
group_commit_orderer *prev_gco= gco->prev_gco;
@@ -343,6 +345,214 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
}
+static void
+do_ftwrl_wait(rpl_group_info *rgi,
+ bool *did_enter_cond, PSI_stage_info *old_stage)
+{
+ THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ uint64 sub_id= rgi->gtid_sub_id;
+
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+
+ /*
+ If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
+ transaction is later than transactions that have priority to complete
+ before FTWRL. If so, wait here so that FTWRL can proceed and complete
+ first.
+
+ (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
+ this test false as required).
+ */
+ if (unlikely(sub_id > entry->pause_sub_id))
+ {
+ thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
+ &stage_waiting_for_ftwrl, old_stage);
+ *did_enter_cond= true;
+ do
+ {
+ if (entry->force_abort || rgi->worker_error)
+ break;
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
+ break;
+ }
+ mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
+ } while (sub_id > entry->pause_sub_id);
+
+ /*
+ We do not call EXIT_COND() here, as this will be done later by our
+ caller (since we set *did_enter_cond to true).
+ */
+ }
+
+ if (sub_id > entry->largest_started_sub_id)
+ entry->largest_started_sub_id= sub_id;
+}
+
+
+static int
+pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
+{
+ PSI_stage_info old_stage;
+ int res= 0;
+
+ /*
+ Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
+ READ LOCK work correctly, without incuring extra locking penalties in
+ normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
+ thread pool, and for this we need to make sure the pool will not go away
+ during the operation. The LOCK_rpl_thread_pool is not suitable for
+ this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
+ must be released before locking any LOCK_rpl_thread lock, or a deadlock
+ can occur.
+
+ So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
+ pool size changes with this condition wait.
+ */
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ if (thd)
+ thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
+ &stage_waiting_for_rpl_thread_pool, &old_stage);
+ while (pool->busy)
+ {
+ if (thd && thd->check_killed())
+ {
+ thd->send_kill_message();
+ res= 1;
+ break;
+ }
+ mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ }
+ if (!res)
+ pool->busy= true;
+ if (thd)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+
+ return res;
+}
+
+
+static void
+pool_mark_not_busy(rpl_parallel_thread_pool *pool)
+{
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ DBUG_ASSERT(pool->busy);
+ pool->busy= false;
+ mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+}
+
+
+void
+rpl_unpause_after_ftwrl(THD *thd)
+{
+ uint32 i;
+ rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
+
+ DBUG_ASSERT(pool->busy);
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ rpl_parallel_entry *e;
+ rpl_parallel_thread *rpt= pool->threads[i];
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (!rpt->current_owner)
+ {
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ continue;
+ }
+ e= rpt->current_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ rpt->pause_for_ftwrl = false;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ e->pause_sub_id= (uint64)ULONGLONG_MAX;
+ mysql_cond_broadcast(&e->COND_parallel_entry);
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ }
+
+ pool_mark_not_busy(pool);
+}
+
+
+/*
+ .
+
+ Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called.
+*/
+int
+rpl_pause_for_ftwrl(THD *thd)
+{
+ uint32 i;
+ rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
+ int err;
+
+ /*
+ While the count_pending_pause_for_ftwrl counter is non-zero, the pool
+ cannot be shutdown/resized, so threads are guaranteed to not disappear.
+
+ This is required to safely be able to access the individual threads below.
+ (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
+ as this can deadlock against release_thread()).
+ */
+ if ((err= pool_mark_busy(pool, thd)))
+ return err;
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ PSI_stage_info old_stage;
+ rpl_parallel_entry *e;
+ rpl_parallel_thread *rpt= pool->threads[i];
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (!rpt->current_owner)
+ {
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ continue;
+ }
+ e= rpt->current_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ /*
+ Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
+ de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
+ */
+ rpt->pause_for_ftwrl = true;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ ++e->need_sub_id_signal;
+ if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
+ e->pause_sub_id= e->largest_started_sub_id;
+ thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
+ &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
+ while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
+ e->last_committed_sub_id < e->pause_sub_id &&
+ !err)
+ {
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ err= 1;
+ break;
+ }
+ mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
+ };
+ --e->need_sub_id_signal;
+ thd->EXIT_COND(&old_stage);
+ if (err)
+ break;
+ }
+
+ if (err)
+ rpl_unpause_after_ftwrl(thd);
+ return err;
+}
+
+
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
@@ -1037,17 +1247,40 @@ handle_rpl_parallel_thread(void *arg)
*/
rpt->batch_free();
- if ((events= rpt->event_queue) != NULL)
+ for (;;)
{
+ if ((events= rpt->event_queue) != NULL)
+ {
+ /*
+ Take next group of events from the replication pool.
+ This is faster than having to wakeup the pool manager thread to give
+ us a new event.
+ */
+ rpt->dequeue1(events);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ goto more_events;
+ }
+ if (!rpt->pause_for_ftwrl ||
+ (in_event_group && !group_rgi->parallel_entry->force_abort))
+ break;
/*
- Take next group of events from the replication pool.
- This is faster than having to wakeup the pool manager thread to give us
- a new event.
+ We are currently in the delicate process of pausing parallel
+ replication while FLUSH TABLES WITH READ LOCK is starting. We must
+ not de-allocate the thread (setting rpt->current_owner= NULL) until
+ rpl_unpause_after_ftwrl() has woken us up.
*/
- rpt->dequeue1(events);
+ mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- goto more_events;
+ mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
+ &rpt->current_entry->LOCK_parallel_entry);
+ mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ /*
+ Now loop to check again for more events available, since we released
+ and re-aquired the LOCK_rpl_thread mutex.
+ */
}
+
rpt->inuse_relaylog_refcount_update();
if (in_event_group && group_rgi->parallel_entry->force_abort)
@@ -1124,6 +1357,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
rpl_parallel_thread **new_list= NULL;
rpl_parallel_thread *new_free_list= NULL;
rpl_parallel_thread *rpt_array= NULL;
+ int res;
+
+ if ((res= pool_mark_busy(pool, current_thd)))
+ return res;
/*
Allocate the new list of threads up-front.
@@ -1172,7 +1409,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
for (i= 0; i < pool->count; ++i)
{
- rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
+ rpl_parallel_thread *rpt;
+
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ while ((rpt= pool->free_list) == NULL)
+ mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ pool->free_list= rpt->next;
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->stop= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -1222,9 +1466,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
}
- mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
- mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
- mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+ pool_mark_not_busy(pool);
return 0;
@@ -1248,6 +1490,7 @@ err:
}
my_free(new_list);
}
+ pool_mark_not_busy(pool);
return 1;
}
@@ -1511,7 +1754,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : count(0), threads(0), free_list(0), inited(false)
+ : threads(0), free_list(0), count(0), inited(false), busy(false)
{
}
@@ -1519,9 +1762,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
int
rpl_parallel_thread_pool::init(uint32 size)
{
- count= 0;
threads= NULL;
free_list= NULL;
+ count= 0;
+ busy= false;
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
MY_MUTEX_INIT_SLOW);
@@ -1562,7 +1806,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
rpl_parallel_thread *rpt;
mysql_mutex_lock(&LOCK_rpl_thread_pool);
- while ((rpt= free_list) == NULL)
+ while (unlikely(busy) || !(rpt= free_list))
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
free_list= rpt->next;
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
@@ -1773,6 +2017,7 @@ rpl_parallel::find(uint32 domain_id)
e->rpl_thread_max= count;
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
+ e->pause_sub_id= (uint64)ULONGLONG_MAX;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -1974,7 +2219,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
- e->need_sub_id_signal= true;
+ ++e->need_sub_id_signal;
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
&stage_waiting_for_workers_idle, &old_stage);
while (e->current_sub_id > e->last_committed_sub_id)
@@ -1987,7 +2232,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
}
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
}
- e->need_sub_id_signal= false;
+ --e->need_sub_id_signal;
thd->EXIT_COND(&old_stage);
if (err)
return err;