diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 125 |
1 files changed, 91 insertions, 34 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index f7aab704c43..6788f422cbd 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1320,39 +1320,16 @@ handle_rpl_parallel_thread(void *arg) */ rpt->batch_free(); - for (;;) + if ((events= rpt->event_queue) != NULL) { - if ((events= rpt->event_queue) != NULL) - { - /* - Take next group of events from the replication pool. - This is faster than having to wakeup the pool manager thread to give - us a new event. - */ - rpt->dequeue1(events); - mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - goto more_events; - } - if (!rpt->pause_for_ftwrl || - (in_event_group && !group_rgi->parallel_entry->force_abort)) - break; /* - We are currently in the delicate process of pausing parallel - replication while FLUSH TABLES WITH READ LOCK is starting. We must - not de-allocate the thread (setting rpt->current_owner= NULL) until - rpl_unpause_after_ftwrl() has woken us up. + Take next group of events from the replication pool. + This is faster than having to wakeup the pool manager thread to give + us a new event. */ - mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry); + rpt->dequeue1(events); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - if (rpt->pause_for_ftwrl) - mysql_cond_wait(&rpt->current_entry->COND_parallel_entry, - &rpt->current_entry->LOCK_parallel_entry); - mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry); - mysql_mutex_lock(&rpt->LOCK_rpl_thread); - /* - Now loop to check again for more events available, since we released - and re-aquired the LOCK_rpl_thread mutex. - */ + goto more_events; } rpt->inuse_relaylog_refcount_update(); @@ -1379,11 +1356,35 @@ handle_rpl_parallel_thread(void *arg) } if (!in_event_group) { + /* If we are in a FLUSH TABLES FOR READ LOCK, wait for it */ + while (rpt->current_entry && rpt->pause_for_ftwrl) + { + /* + We are currently in the delicate process of pausing parallel + replication while FLUSH TABLES WITH READ LOCK is starting. We must + not de-allocate the thread (setting rpt->current_owner= NULL) until + rpl_unpause_after_ftwrl() has woken us up. + */ + rpl_parallel_entry *e= rpt->current_entry; + /* + Wait for rpl_unpause_after_ftwrl() to wake us up. + Note that rpl_pause_for_ftwrl() may wait for 'e->pause_sub_id' + to change. This should happen eventually in finish_event_group() + */ + mysql_mutex_lock(&e->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (rpt->pause_for_ftwrl) + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + } + rpt->current_owner= NULL; /* Tell wait_for_done() that we are done, if it is waiting. */ if (likely(rpt->current_entry) && unlikely(rpt->current_entry->force_abort)) mysql_cond_broadcast(&rpt->COND_rpl_thread_stop); + rpt->current_entry= NULL; if (!rpt->stop) rpt->pool->release_thread(rpt); @@ -1422,10 +1423,24 @@ dealloc_gco(group_commit_orderer *gco) my_free(gco); } +/** + Change thread count for global parallel worker threads + + @param pool parallel thread pool + @param new_count Number of threads to be in pool. 0 in shutdown + @param force Force thread count to new_count even if slave + threads are running + + By default we don't resize pool of there are running threads. + However during shutdown we will always do it. + This is needed as any_slave_sql_running() returns 1 during shutdown + as we don't want to access master_info while + Master_info_index::free_connections are running. +*/ static int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, - uint32 new_count) + uint32 new_count, bool force) { uint32 i; rpl_parallel_thread **old_list= NULL; @@ -1437,6 +1452,28 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, if ((res= pool_mark_busy(pool, current_thd))) return res; + /* Protect against parallel pool resizes */ + if (pool->count == new_count) + { + pool_mark_not_busy(pool); + return 0; + } + + /* + If we are about to delete pool, do an extra check that there are no new + slave threads running since we marked pool busy + */ + if (!new_count && !force) + { + if (any_slave_sql_running()) + { + DBUG_PRINT("warning", + ("SQL threads running while trying to reset parallel pool")); + pool_mark_not_busy(pool); + return 0; // Ok to not resize pool + } + } + /* Allocate the new list of threads up-front. That way, if we fail half-way, we only need to free whatever we managed @@ -1450,7 +1487,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, { my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) + new_count*sizeof(*rpt_array)))); - goto err;; + goto err; } for (i= 0; i < new_count; ++i) @@ -1575,12 +1612,26 @@ err: return 1; } +/* + Deactivate the parallel replication thread pool, if there are now no more + SQL threads running. +*/ + +int rpl_parallel_resize_pool_if_no_slaves(void) +{ + /* master_info_index is set to NULL on shutdown */ + if (opt_slave_parallel_threads > 0 && !any_slave_sql_running()) + return rpl_parallel_inactivate_pool(&global_rpl_thread_pool); + return 0; +} + int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) { if (!pool->count) - return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads); + return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads, + 0); return 0; } @@ -1588,7 +1639,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool) { - return rpl_parallel_change_thread_count(pool, 0); + return rpl_parallel_change_thread_count(pool, 0, 0); } @@ -1866,7 +1917,7 @@ rpl_parallel_thread_pool::destroy() { if (!inited) return; - rpl_parallel_change_thread_count(this, 0); + rpl_parallel_change_thread_count(this, 0, 1); mysql_mutex_destroy(&LOCK_rpl_thread_pool); mysql_cond_destroy(&COND_rpl_thread_pool); inited= false; @@ -1885,6 +1936,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, { rpl_parallel_thread *rpt; + DBUG_ASSERT(count > 0); mysql_mutex_lock(&LOCK_rpl_thread_pool); while (unlikely(busy) || !(rpt= free_list)) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); @@ -2113,6 +2165,11 @@ rpl_parallel::find(uint32 domain_id) return e; } +/** + Wait until all sql worker threads has stopped processing + + This is called when sql thread has been killed/stopped +*/ void rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) |