diff options
-rw-r--r-- | storage/innobase/include/srv0srv.h | 22 | ||||
-rw-r--r-- | storage/innobase/srv/srv0srv.cc | 196 |
2 files changed, 97 insertions, 121 deletions
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index fee6ceaca4c..90c3284f95a 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -888,22 +888,16 @@ ulint srv_get_task_queue_length(void); /*===========================*/ -/*********************************************************************//** -Releases threads of the type given from suspension in the thread table. -NOTE! The server mutex has to be reserved by the caller! -@return number of threads released: this may be less than n if not -enough threads were suspended at the moment */ -ulint -srv_release_threads( -/*================*/ - enum srv_thread_type type, /*!< in: thread type */ - ulint n); /*!< in: number of threads to release */ +/** Ensure that a given number of threads of the type given are running +(or are already terminated). +@param[in] type thread type +@param[in] n number of threads that have to run */ +void +srv_release_threads(enum srv_thread_type type, ulint n); -/**********************************************************************//** -Wakeup the purge threads. */ +/** Wakeup the purge threads. */ void -srv_purge_wakeup(void); -/*==================*/ +srv_purge_wakeup(); /** Check if tablespace is being truncated. (Ignore system-tablespace as we don't re-create the tablespace diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index 0fd5ff7afaf..6fab9b12a69 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -854,7 +854,6 @@ srv_suspend_thread_low( /*===================*/ srv_slot_t* slot) /*!< in/out: thread slot */ { - ut_ad(!srv_read_only_mode); ut_ad(srv_sys_mutex_own()); @@ -912,33 +911,73 @@ srv_suspend_thread( return(sig_count); } -/*********************************************************************//** -Releases threads of the type given from suspension in the thread table. -NOTE! The server mutex has to be reserved by the caller! -@return number of threads released: this may be less than n if not - enough threads were suspended at the moment. */ -ulint -srv_release_threads( -/*================*/ - srv_thread_type type, /*!< in: thread type */ - ulint n) /*!< in: number of threads to release */ +/** Resume the calling thread. +@param[in,out] slot thread slot +@param[in] sig_count signal count (if wait) +@param[in] wait whether to wait for the event +@param[in] timeout_usec timeout in microseconds (0=infinite) +@return whether the wait timed out */ +static +bool +srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true, + ulint timeout_usec = 0) { - ulint i; - ulint count = 0; + bool timeout; + + ut_ad(!srv_read_only_mode); + ut_ad(slot->in_use); + ut_ad(slot->suspended); + + if (!wait) { + timeout = false; + } else if (timeout_usec) { + timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low( + slot->event, timeout_usec, sig_count); + } else { + timeout = false; + os_event_wait_low(slot->event, sig_count); + } + + srv_sys_mutex_enter(); + ut_ad(slot->in_use); + ut_ad(slot->suspended); + + slot->suspended = FALSE; + ++srv_sys->n_threads_active[slot->type]; + srv_sys_mutex_exit(); + return(timeout); +} + +/** Ensure that a given number of threads of the type given are running +(or are already terminated). +@param[in] type thread type +@param[in] n number of threads that have to run */ +void +srv_release_threads(enum srv_thread_type type, ulint n) +{ + ulint running; ut_ad(srv_thread_type_validate(type)); ut_ad(n > 0); - srv_sys_mutex_enter(); + do { + srv_sys_mutex_enter(); - for (i = 0; i < srv_sys->n_sys_threads; i++) { - srv_slot_t* slot; + running = 0; - slot = &srv_sys->sys_threads[i]; + for (ulint i = 0; i < srv_sys->n_sys_threads; i++) { + srv_slot_t* slot; - if (slot->in_use - && srv_slot_get_type(slot) == type - && slot->suspended) { + slot = &srv_sys->sys_threads[i]; + + if (!slot->in_use || srv_slot_get_type(slot) != type) { + continue; + } else if (!slot->suspended) { + if (++running >= n) { + break; + } + continue; + } switch (type) { case SRV_NONE: @@ -968,21 +1007,11 @@ srv_release_threads( break; } - slot->suspended = FALSE; - - ++srv_sys->n_threads_active[type]; - os_event_set(slot->event); - - if (++count == n) { - break; - } } - } - - srv_sys_mutex_exit(); - return(count); + srv_sys_mutex_exit(); + } while (running && running < n); } /*********************************************************************//** @@ -995,11 +1024,8 @@ srv_free_slot( { srv_sys_mutex_enter(); - if (!slot->suspended) { - /* Mark the thread as inactive. */ - srv_suspend_thread_low(slot); - } - + /* Mark the thread as inactive. */ + srv_suspend_thread_low(slot); /* Free the slot for reuse. */ ut_ad(slot->in_use); slot->in_use = FALSE; @@ -2046,15 +2072,7 @@ srv_active_wake_master_thread_low() if (slot->in_use) { ut_a(srv_slot_get_type(slot) == SRV_MASTER); - - if (slot->suspended) { - - slot->suspended = FALSE; - - ++srv_sys->n_threads_active[SRV_MASTER]; - - os_event_set(slot->event); - } + os_event_set(slot->event); } srv_sys_mutex_exit(); @@ -2572,7 +2590,7 @@ suspend_thread: manual also mentions this string in several places. */ srv_main_thread_op_info = "waiting for server activity"; - os_event_wait(slot->event); + srv_resume_thread(slot); if (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) { goto loop; @@ -2689,8 +2707,7 @@ DECLARE_THREAD(srv_worker_thread)( do { srv_suspend_thread(slot); - - os_event_wait(slot->event); + srv_resume_thread(slot); if (srv_task_execute()) { @@ -2835,8 +2852,6 @@ srv_purge_coordinator_suspend( int64_t sig_count = srv_suspend_thread(slot); do { - ulint ret; - rw_lock_x_lock(&purge_sys->latch); purge_sys->running = false; @@ -2845,32 +2860,11 @@ srv_purge_coordinator_suspend( /* We don't wait right away on the the non-timed wait because we want to signal the thread that wants to suspend purge. */ - - if (stop) { - os_event_wait_low(slot->event, sig_count); - ret = 0; - } else if (rseg_history_len <= trx_sys->rseg_history_len) { - ret = os_event_wait_time_low( - slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count); - } else { - /* We don't want to waste time waiting, if the - history list increased by the time we got here, - unless purge has been stopped. */ - ret = 0; - } - - srv_sys_mutex_enter(); - - /* The thread can be in state !suspended after the timeout - but before this check if another thread sent a wakeup signal. */ - - if (slot->suspended) { - slot->suspended = FALSE; - ++srv_sys->n_threads_active[slot->type]; - ut_a(srv_sys->n_threads_active[slot->type] == 1); - } - - srv_sys_mutex_exit(); + const bool wait = stop + || rseg_history_len <= trx_sys->rseg_history_len; + const bool timeout = srv_resume_thread( + slot, sig_count, wait, + stop ? 0 : SRV_PURGE_MAX_TIMEOUT); sig_count = srv_suspend_thread(slot); @@ -2882,6 +2876,19 @@ srv_purge_coordinator_suspend( if (!stop) { ut_a(purge_sys->n_stop == 0); purge_sys->running = true; + + if (timeout + && rseg_history_len == trx_sys->rseg_history_len + && trx_sys->rseg_history_len < 5000) { + /* No new records were added since the + wait started. Simply wait for new + records. The magic number 5000 is an + approximation for the case where we + have cached UNDO log records which + prevent truncate of the UNDO + segments. */ + stop = true; + } } else { ut_a(purge_sys->n_stop > 0); @@ -2890,33 +2897,9 @@ srv_purge_coordinator_suspend( } rw_lock_x_unlock(&purge_sys->latch); - - if (ret == OS_SYNC_TIME_EXCEEDED) { - - /* No new records added since wait started then simply - wait for new records. The magic number 5000 is an - approximation for the case where we have cached UNDO - log records which prevent truncate of the UNDO - segments. */ - - if (rseg_history_len == trx_sys->rseg_history_len - && trx_sys->rseg_history_len < 5000) { - - stop = true; - } - } - } while (stop && !thd_kill_level(thd)); - srv_sys_mutex_enter(); - - if (slot->suspended) { - slot->suspended = FALSE; - ++srv_sys->n_threads_active[slot->type]; - ut_a(srv_sys->n_threads_active[slot->type] == 1); - } - - srv_sys_mutex_exit(); + srv_resume_thread(slot, 0, false); } /*********************************************************************//** @@ -2971,8 +2954,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( srv_purge_coordinator_suspend(thd, slot, rseg_history_len); } + ut_ad(!slot->suspended); + if (srv_purge_should_exit(thd, n_total_purged)) { - ut_a(!slot->suspended); break; } @@ -3092,11 +3076,9 @@ srv_get_task_queue_length(void) return(n_tasks); } -/**********************************************************************//** -Wakeup the purge threads. */ +/** Wake up the purge threads. */ void -srv_purge_wakeup(void) -/*==================*/ +srv_purge_wakeup() { ut_ad(!srv_read_only_mode); |