diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 275 |
1 files changed, 260 insertions, 15 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 965480c57a3..0f8c69f6a68 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -287,6 +287,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_count; + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + if (!gco->installed) { group_commit_orderer *prev_gco= gco->prev_gco; @@ -343,6 +345,214 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, } +static void +do_ftwrl_wait(rpl_group_info *rgi, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 sub_id= rgi->gtid_sub_id; + + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + + /* + If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this + transaction is later than transactions that have priority to complete + before FTWRL. If so, wait here so that FTWRL can proceed and complete + first. + + (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes + this test false as required). + */ + if (unlikely(sub_id > entry->pause_sub_id)) + { + thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry, + &stage_waiting_for_ftwrl, old_stage); + *did_enter_cond= true; + do + { + if (entry->force_abort || rgi->worker_error) + break; + if (thd->check_killed()) + { + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + break; + } + mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry); + } while (sub_id > entry->pause_sub_id); + + /* + We do not call EXIT_COND() here, as this will be done later by our + caller (since we set *did_enter_cond to true). + */ + } + + if (sub_id > entry->largest_started_sub_id) + entry->largest_started_sub_id= sub_id; +} + + +static int +pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd) +{ + PSI_stage_info old_stage; + int res= 0; + + /* + Wait here while the queue is busy. This is done to make FLUSH TABLES WITH + READ LOCK work correctly, without incuring extra locking penalties in + normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the + thread pool, and for this we need to make sure the pool will not go away + during the operation. The LOCK_rpl_thread_pool is not suitable for + this. It is taken by release_thread() while holding LOCK_rpl_thread; so it + must be released before locking any LOCK_rpl_thread lock, or a deadlock + can occur. + + So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and + pool size changes with this condition wait. + */ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + if (thd) + thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool, + &stage_waiting_for_rpl_thread_pool, &old_stage); + while (pool->busy) + { + if (thd && thd->check_killed()) + { + thd->send_kill_message(); + res= 1; + break; + } + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + } + if (!res) + pool->busy= true; + if (thd) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + + return res; +} + + +static void +pool_mark_not_busy(rpl_parallel_thread_pool *pool) +{ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + DBUG_ASSERT(pool->busy); + pool->busy= false; + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); +} + + +void +rpl_unpause_after_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + + DBUG_ASSERT(pool->busy); + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + rpt->pause_for_ftwrl = false; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + e->pause_sub_id= (uint64)ULONGLONG_MAX; + mysql_cond_broadcast(&e->COND_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } + + pool_mark_not_busy(pool); +} + + +/* + . + + Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called. +*/ +int +rpl_pause_for_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + int err; + + /* + While the count_pending_pause_for_ftwrl counter is non-zero, the pool + cannot be shutdown/resized, so threads are guaranteed to not disappear. + + This is required to safely be able to access the individual threads below. + (We cannot lock an individual thread while holding LOCK_rpl_thread_pool, + as this can deadlock against release_thread()). + */ + if ((err= pool_mark_busy(pool, thd))) + return err; + + for (i= 0; i < pool->count; ++i) + { + PSI_stage_info old_stage; + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + /* + Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not + de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl(). + */ + rpt->pause_for_ftwrl = true; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + ++e->need_sub_id_signal; + if (e->pause_sub_id == (uint64)ULONGLONG_MAX) + e->pause_sub_id= e->largest_started_sub_id; + thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, + &stage_waiting_for_ftwrl_threads_to_pause, &old_stage); + while (e->pause_sub_id < (uint64)ULONGLONG_MAX && + e->last_committed_sub_id < e->pause_sub_id && + !err) + { + if (thd->check_killed()) + { + thd->send_kill_message(); + err= 1; + break; + } + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + }; + --e->need_sub_id_signal; + thd->EXIT_COND(&old_stage); + if (err) + break; + } + + if (err) + rpl_unpause_after_ftwrl(thd); + return err; +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -1037,17 +1247,40 @@ handle_rpl_parallel_thread(void *arg) */ rpt->batch_free(); - if ((events= rpt->event_queue) != NULL) + for (;;) { + if ((events= rpt->event_queue) != NULL) + { + /* + Take next group of events from the replication pool. + This is faster than having to wakeup the pool manager thread to give + us a new event. + */ + rpt->dequeue1(events); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + goto more_events; + } + if (!rpt->pause_for_ftwrl || + (in_event_group && !group_rgi->parallel_entry->force_abort)) + break; /* - Take next group of events from the replication pool. - This is faster than having to wakeup the pool manager thread to give us - a new event. + We are currently in the delicate process of pausing parallel + replication while FLUSH TABLES WITH READ LOCK is starting. We must + not de-allocate the thread (setting rpt->current_owner= NULL) until + rpl_unpause_after_ftwrl() has woken us up. */ - rpt->dequeue1(events); + mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - goto more_events; + mysql_cond_wait(&rpt->current_entry->COND_parallel_entry, + &rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + /* + Now loop to check again for more events available, since we released + and re-aquired the LOCK_rpl_thread mutex. + */ } + rpt->inuse_relaylog_refcount_update(); if (in_event_group && group_rgi->parallel_entry->force_abort) @@ -1124,6 +1357,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, rpl_parallel_thread **new_list= NULL; rpl_parallel_thread *new_free_list= NULL; rpl_parallel_thread *rpt_array= NULL; + int res; + + if ((res= pool_mark_busy(pool, current_thd))) + return res; /* Allocate the new list of threads up-front. @@ -1172,7 +1409,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, */ for (i= 0; i < pool->count; ++i) { - rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); + rpl_parallel_thread *rpt; + + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + while ((rpt= pool->free_list) == NULL) + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + pool->free_list= rpt->next; + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->stop= true; mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -1222,9 +1466,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); } - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - mysql_cond_broadcast(&pool->COND_rpl_thread_pool); - mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + pool_mark_not_busy(pool); return 0; @@ -1248,6 +1490,7 @@ err: } my_free(new_list); } + pool_mark_not_busy(pool); return 1; } @@ -1511,7 +1754,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : count(0), threads(0), free_list(0), inited(false) + : threads(0), free_list(0), count(0), inited(false), busy(false) { } @@ -1519,9 +1762,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool() int rpl_parallel_thread_pool::init(uint32 size) { - count= 0; threads= NULL; free_list= NULL; + count= 0; + busy= false; mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, MY_MUTEX_INIT_SLOW); @@ -1562,7 +1806,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, rpl_parallel_thread *rpt; mysql_mutex_lock(&LOCK_rpl_thread_pool); - while ((rpt= free_list) == NULL) + while (unlikely(busy) || !(rpt= free_list)) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); free_list= rpt->next; mysql_mutex_unlock(&LOCK_rpl_thread_pool); @@ -1773,6 +2017,7 @@ rpl_parallel::find(uint32 domain_id) e->rpl_thread_max= count; e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; + e->pause_sub_id= (uint64)ULONGLONG_MAX; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -1974,7 +2219,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); mysql_mutex_lock(&e->LOCK_parallel_entry); - e->need_sub_id_signal= true; + ++e->need_sub_id_signal; thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, &stage_waiting_for_workers_idle, &old_stage); while (e->current_sub_id > e->last_committed_sub_id) @@ -1987,7 +2232,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) } mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); } - e->need_sub_id_signal= false; + --e->need_sub_id_signal; thd->EXIT_COND(&old_stage); if (err) return err; |