summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--storage/innobase/include/srv0srv.h22
-rw-r--r--storage/innobase/srv/srv0srv.cc196
2 files changed, 97 insertions, 121 deletions
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index fee6ceaca4c..90c3284f95a 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -888,22 +888,16 @@ ulint
srv_get_task_queue_length(void);
/*===========================*/
-/*********************************************************************//**
-Releases threads of the type given from suspension in the thread table.
-NOTE! The server mutex has to be reserved by the caller!
-@return number of threads released: this may be less than n if not
-enough threads were suspended at the moment */
-ulint
-srv_release_threads(
-/*================*/
- enum srv_thread_type type, /*!< in: thread type */
- ulint n); /*!< in: number of threads to release */
+/** Ensure that a given number of threads of the type given are running
+(or are already terminated).
+@param[in] type thread type
+@param[in] n number of threads that have to run */
+void
+srv_release_threads(enum srv_thread_type type, ulint n);
-/**********************************************************************//**
-Wakeup the purge threads. */
+/** Wakeup the purge threads. */
void
-srv_purge_wakeup(void);
-/*==================*/
+srv_purge_wakeup();
/** Check if tablespace is being truncated.
(Ignore system-tablespace as we don't re-create the tablespace
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index 0fd5ff7afaf..6fab9b12a69 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -854,7 +854,6 @@ srv_suspend_thread_low(
/*===================*/
srv_slot_t* slot) /*!< in/out: thread slot */
{
-
ut_ad(!srv_read_only_mode);
ut_ad(srv_sys_mutex_own());
@@ -912,33 +911,73 @@ srv_suspend_thread(
return(sig_count);
}
-/*********************************************************************//**
-Releases threads of the type given from suspension in the thread table.
-NOTE! The server mutex has to be reserved by the caller!
-@return number of threads released: this may be less than n if not
- enough threads were suspended at the moment. */
-ulint
-srv_release_threads(
-/*================*/
- srv_thread_type type, /*!< in: thread type */
- ulint n) /*!< in: number of threads to release */
+/** Resume the calling thread.
+@param[in,out] slot thread slot
+@param[in] sig_count signal count (if wait)
+@param[in] wait whether to wait for the event
+@param[in] timeout_usec timeout in microseconds (0=infinite)
+@return whether the wait timed out */
+static
+bool
+srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true,
+ ulint timeout_usec = 0)
{
- ulint i;
- ulint count = 0;
+ bool timeout;
+
+ ut_ad(!srv_read_only_mode);
+ ut_ad(slot->in_use);
+ ut_ad(slot->suspended);
+
+ if (!wait) {
+ timeout = false;
+ } else if (timeout_usec) {
+ timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low(
+ slot->event, timeout_usec, sig_count);
+ } else {
+ timeout = false;
+ os_event_wait_low(slot->event, sig_count);
+ }
+
+ srv_sys_mutex_enter();
+ ut_ad(slot->in_use);
+ ut_ad(slot->suspended);
+
+ slot->suspended = FALSE;
+ ++srv_sys->n_threads_active[slot->type];
+ srv_sys_mutex_exit();
+ return(timeout);
+}
+
+/** Ensure that a given number of threads of the type given are running
+(or are already terminated).
+@param[in] type thread type
+@param[in] n number of threads that have to run */
+void
+srv_release_threads(enum srv_thread_type type, ulint n)
+{
+ ulint running;
ut_ad(srv_thread_type_validate(type));
ut_ad(n > 0);
- srv_sys_mutex_enter();
+ do {
+ srv_sys_mutex_enter();
- for (i = 0; i < srv_sys->n_sys_threads; i++) {
- srv_slot_t* slot;
+ running = 0;
- slot = &srv_sys->sys_threads[i];
+ for (ulint i = 0; i < srv_sys->n_sys_threads; i++) {
+ srv_slot_t* slot;
- if (slot->in_use
- && srv_slot_get_type(slot) == type
- && slot->suspended) {
+ slot = &srv_sys->sys_threads[i];
+
+ if (!slot->in_use || srv_slot_get_type(slot) != type) {
+ continue;
+ } else if (!slot->suspended) {
+ if (++running >= n) {
+ break;
+ }
+ continue;
+ }
switch (type) {
case SRV_NONE:
@@ -968,21 +1007,11 @@ srv_release_threads(
break;
}
- slot->suspended = FALSE;
-
- ++srv_sys->n_threads_active[type];
-
os_event_set(slot->event);
-
- if (++count == n) {
- break;
- }
}
- }
-
- srv_sys_mutex_exit();
- return(count);
+ srv_sys_mutex_exit();
+ } while (running && running < n);
}
/*********************************************************************//**
@@ -995,11 +1024,8 @@ srv_free_slot(
{
srv_sys_mutex_enter();
- if (!slot->suspended) {
- /* Mark the thread as inactive. */
- srv_suspend_thread_low(slot);
- }
-
+ /* Mark the thread as inactive. */
+ srv_suspend_thread_low(slot);
/* Free the slot for reuse. */
ut_ad(slot->in_use);
slot->in_use = FALSE;
@@ -2046,15 +2072,7 @@ srv_active_wake_master_thread_low()
if (slot->in_use) {
ut_a(srv_slot_get_type(slot) == SRV_MASTER);
-
- if (slot->suspended) {
-
- slot->suspended = FALSE;
-
- ++srv_sys->n_threads_active[SRV_MASTER];
-
- os_event_set(slot->event);
- }
+ os_event_set(slot->event);
}
srv_sys_mutex_exit();
@@ -2572,7 +2590,7 @@ suspend_thread:
manual also mentions this string in several places. */
srv_main_thread_op_info = "waiting for server activity";
- os_event_wait(slot->event);
+ srv_resume_thread(slot);
if (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
goto loop;
@@ -2689,8 +2707,7 @@ DECLARE_THREAD(srv_worker_thread)(
do {
srv_suspend_thread(slot);
-
- os_event_wait(slot->event);
+ srv_resume_thread(slot);
if (srv_task_execute()) {
@@ -2835,8 +2852,6 @@ srv_purge_coordinator_suspend(
int64_t sig_count = srv_suspend_thread(slot);
do {
- ulint ret;
-
rw_lock_x_lock(&purge_sys->latch);
purge_sys->running = false;
@@ -2845,32 +2860,11 @@ srv_purge_coordinator_suspend(
/* We don't wait right away on the the non-timed wait because
we want to signal the thread that wants to suspend purge. */
-
- if (stop) {
- os_event_wait_low(slot->event, sig_count);
- ret = 0;
- } else if (rseg_history_len <= trx_sys->rseg_history_len) {
- ret = os_event_wait_time_low(
- slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count);
- } else {
- /* We don't want to waste time waiting, if the
- history list increased by the time we got here,
- unless purge has been stopped. */
- ret = 0;
- }
-
- srv_sys_mutex_enter();
-
- /* The thread can be in state !suspended after the timeout
- but before this check if another thread sent a wakeup signal. */
-
- if (slot->suspended) {
- slot->suspended = FALSE;
- ++srv_sys->n_threads_active[slot->type];
- ut_a(srv_sys->n_threads_active[slot->type] == 1);
- }
-
- srv_sys_mutex_exit();
+ const bool wait = stop
+ || rseg_history_len <= trx_sys->rseg_history_len;
+ const bool timeout = srv_resume_thread(
+ slot, sig_count, wait,
+ stop ? 0 : SRV_PURGE_MAX_TIMEOUT);
sig_count = srv_suspend_thread(slot);
@@ -2882,6 +2876,19 @@ srv_purge_coordinator_suspend(
if (!stop) {
ut_a(purge_sys->n_stop == 0);
purge_sys->running = true;
+
+ if (timeout
+ && rseg_history_len == trx_sys->rseg_history_len
+ && trx_sys->rseg_history_len < 5000) {
+ /* No new records were added since the
+ wait started. Simply wait for new
+ records. The magic number 5000 is an
+ approximation for the case where we
+ have cached UNDO log records which
+ prevent truncate of the UNDO
+ segments. */
+ stop = true;
+ }
} else {
ut_a(purge_sys->n_stop > 0);
@@ -2890,33 +2897,9 @@ srv_purge_coordinator_suspend(
}
rw_lock_x_unlock(&purge_sys->latch);
-
- if (ret == OS_SYNC_TIME_EXCEEDED) {
-
- /* No new records added since wait started then simply
- wait for new records. The magic number 5000 is an
- approximation for the case where we have cached UNDO
- log records which prevent truncate of the UNDO
- segments. */
-
- if (rseg_history_len == trx_sys->rseg_history_len
- && trx_sys->rseg_history_len < 5000) {
-
- stop = true;
- }
- }
-
} while (stop && !thd_kill_level(thd));
- srv_sys_mutex_enter();
-
- if (slot->suspended) {
- slot->suspended = FALSE;
- ++srv_sys->n_threads_active[slot->type];
- ut_a(srv_sys->n_threads_active[slot->type] == 1);
- }
-
- srv_sys_mutex_exit();
+ srv_resume_thread(slot, 0, false);
}
/*********************************************************************//**
@@ -2971,8 +2954,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
srv_purge_coordinator_suspend(thd, slot, rseg_history_len);
}
+ ut_ad(!slot->suspended);
+
if (srv_purge_should_exit(thd, n_total_purged)) {
- ut_a(!slot->suspended);
break;
}
@@ -3092,11 +3076,9 @@ srv_get_task_queue_length(void)
return(n_tasks);
}
-/**********************************************************************//**
-Wakeup the purge threads. */
+/** Wake up the purge threads. */
void
-srv_purge_wakeup(void)
-/*==================*/
+srv_purge_wakeup()
{
ut_ad(!srv_read_only_mode);