summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc275
1 files changed, 260 insertions, 15 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 965480c57a3..0f8c69f6a68 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -287,6 +287,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 wait_count;
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+
if (!gco->installed)
{
group_commit_orderer *prev_gco= gco->prev_gco;
@@ -343,6 +345,214 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
}
+static void
+do_ftwrl_wait(rpl_group_info *rgi,
+ bool *did_enter_cond, PSI_stage_info *old_stage)
+{
+ THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ uint64 sub_id= rgi->gtid_sub_id;
+
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+
+ /*
+ If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this
+ transaction is later than transactions that have priority to complete
+ before FTWRL. If so, wait here so that FTWRL can proceed and complete
+ first.
+
+ (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes
+ this test false as required).
+ */
+ if (unlikely(sub_id > entry->pause_sub_id))
+ {
+ thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
+ &stage_waiting_for_ftwrl, old_stage);
+ *did_enter_cond= true;
+ do
+ {
+ if (entry->force_abort || rgi->worker_error)
+ break;
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
+ break;
+ }
+ mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
+ } while (sub_id > entry->pause_sub_id);
+
+ /*
+ We do not call EXIT_COND() here, as this will be done later by our
+ caller (since we set *did_enter_cond to true).
+ */
+ }
+
+ if (sub_id > entry->largest_started_sub_id)
+ entry->largest_started_sub_id= sub_id;
+}
+
+
+static int
+pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
+{
+ PSI_stage_info old_stage;
+ int res= 0;
+
+ /*
+ Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
+ READ LOCK work correctly, without incuring extra locking penalties in
+ normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
+ thread pool, and for this we need to make sure the pool will not go away
+ during the operation. The LOCK_rpl_thread_pool is not suitable for
+ this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
+ must be released before locking any LOCK_rpl_thread lock, or a deadlock
+ can occur.
+
+ So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
+ pool size changes with this condition wait.
+ */
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ if (thd)
+ thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
+ &stage_waiting_for_rpl_thread_pool, &old_stage);
+ while (pool->busy)
+ {
+ if (thd && thd->check_killed())
+ {
+ thd->send_kill_message();
+ res= 1;
+ break;
+ }
+ mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ }
+ if (!res)
+ pool->busy= true;
+ if (thd)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+
+ return res;
+}
+
+
+static void
+pool_mark_not_busy(rpl_parallel_thread_pool *pool)
+{
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ DBUG_ASSERT(pool->busy);
+ pool->busy= false;
+ mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+}
+
+
+void
+rpl_unpause_after_ftwrl(THD *thd)
+{
+ uint32 i;
+ rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
+
+ DBUG_ASSERT(pool->busy);
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ rpl_parallel_entry *e;
+ rpl_parallel_thread *rpt= pool->threads[i];
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (!rpt->current_owner)
+ {
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ continue;
+ }
+ e= rpt->current_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ rpt->pause_for_ftwrl = false;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ e->pause_sub_id= (uint64)ULONGLONG_MAX;
+ mysql_cond_broadcast(&e->COND_parallel_entry);
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ }
+
+ pool_mark_not_busy(pool);
+}
+
+
+/*
+ .
+
+ Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called.
+*/
+int
+rpl_pause_for_ftwrl(THD *thd)
+{
+ uint32 i;
+ rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
+ int err;
+
+ /*
+ While the count_pending_pause_for_ftwrl counter is non-zero, the pool
+ cannot be shutdown/resized, so threads are guaranteed to not disappear.
+
+ This is required to safely be able to access the individual threads below.
+ (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
+ as this can deadlock against release_thread()).
+ */
+ if ((err= pool_mark_busy(pool, thd)))
+ return err;
+
+ for (i= 0; i < pool->count; ++i)
+ {
+ PSI_stage_info old_stage;
+ rpl_parallel_entry *e;
+ rpl_parallel_thread *rpt= pool->threads[i];
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (!rpt->current_owner)
+ {
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ continue;
+ }
+ e= rpt->current_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ /*
+ Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not
+ de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl().
+ */
+ rpt->pause_for_ftwrl = true;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ ++e->need_sub_id_signal;
+ if (e->pause_sub_id == (uint64)ULONGLONG_MAX)
+ e->pause_sub_id= e->largest_started_sub_id;
+ thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
+ &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
+ while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
+ e->last_committed_sub_id < e->pause_sub_id &&
+ !err)
+ {
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ err= 1;
+ break;
+ }
+ mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
+ };
+ --e->need_sub_id_signal;
+ thd->EXIT_COND(&old_stage);
+ if (err)
+ break;
+ }
+
+ if (err)
+ rpl_unpause_after_ftwrl(thd);
+ return err;
+}
+
+
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
@@ -1037,17 +1247,40 @@ handle_rpl_parallel_thread(void *arg)
*/
rpt->batch_free();
- if ((events= rpt->event_queue) != NULL)
+ for (;;)
{
+ if ((events= rpt->event_queue) != NULL)
+ {
+ /*
+ Take next group of events from the replication pool.
+ This is faster than having to wakeup the pool manager thread to give
+ us a new event.
+ */
+ rpt->dequeue1(events);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ goto more_events;
+ }
+ if (!rpt->pause_for_ftwrl ||
+ (in_event_group && !group_rgi->parallel_entry->force_abort))
+ break;
/*
- Take next group of events from the replication pool.
- This is faster than having to wakeup the pool manager thread to give us
- a new event.
+ We are currently in the delicate process of pausing parallel
+ replication while FLUSH TABLES WITH READ LOCK is starting. We must
+ not de-allocate the thread (setting rpt->current_owner= NULL) until
+ rpl_unpause_after_ftwrl() has woken us up.
*/
- rpt->dequeue1(events);
+ mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- goto more_events;
+ mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
+ &rpt->current_entry->LOCK_parallel_entry);
+ mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ /*
+ Now loop to check again for more events available, since we released
+ and re-aquired the LOCK_rpl_thread mutex.
+ */
}
+
rpt->inuse_relaylog_refcount_update();
if (in_event_group && group_rgi->parallel_entry->force_abort)
@@ -1124,6 +1357,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
rpl_parallel_thread **new_list= NULL;
rpl_parallel_thread *new_free_list= NULL;
rpl_parallel_thread *rpt_array= NULL;
+ int res;
+
+ if ((res= pool_mark_busy(pool, current_thd)))
+ return res;
/*
Allocate the new list of threads up-front.
@@ -1172,7 +1409,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
for (i= 0; i < pool->count; ++i)
{
- rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
+ rpl_parallel_thread *rpt;
+
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ while ((rpt= pool->free_list) == NULL)
+ mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
+ pool->free_list= rpt->next;
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->stop= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -1222,9 +1466,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
}
- mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
- mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
- mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
+ pool_mark_not_busy(pool);
return 0;
@@ -1248,6 +1490,7 @@ err:
}
my_free(new_list);
}
+ pool_mark_not_busy(pool);
return 1;
}
@@ -1511,7 +1754,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : count(0), threads(0), free_list(0), inited(false)
+ : threads(0), free_list(0), count(0), inited(false), busy(false)
{
}
@@ -1519,9 +1762,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
int
rpl_parallel_thread_pool::init(uint32 size)
{
- count= 0;
threads= NULL;
free_list= NULL;
+ count= 0;
+ busy= false;
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
MY_MUTEX_INIT_SLOW);
@@ -1562,7 +1806,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
rpl_parallel_thread *rpt;
mysql_mutex_lock(&LOCK_rpl_thread_pool);
- while ((rpt= free_list) == NULL)
+ while (unlikely(busy) || !(rpt= free_list))
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
free_list= rpt->next;
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
@@ -1773,6 +2017,7 @@ rpl_parallel::find(uint32 domain_id)
e->rpl_thread_max= count;
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
+ e->pause_sub_id= (uint64)ULONGLONG_MAX;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -1974,7 +2219,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
- e->need_sub_id_signal= true;
+ ++e->need_sub_id_signal;
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
&stage_waiting_for_workers_idle, &old_stage);
while (e->current_sub_id > e->last_committed_sub_id)
@@ -1987,7 +2232,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
}
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
}
- e->need_sub_id_signal= false;
+ --e->need_sub_id_signal;
thd->EXIT_COND(&old_stage);
if (err)
return err;