summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--storage/innobase/include/srv0srv.h25
-rw-r--r--storage/innobase/srv/srv0srv.cc198
-rw-r--r--storage/xtradb/include/srv0srv.h25
-rw-r--r--storage/xtradb/srv/srv0srv.cc198
4 files changed, 194 insertions, 252 deletions
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 6389efa7b84..9e016b90bf3 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, 2009, Google Inc.
Copyright (c) 2009, Percona Inc.
-Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
+Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -781,17 +781,12 @@ ulint
srv_get_task_queue_length(void);
/*===========================*/
-/*********************************************************************//**
-Releases threads of the type given from suspension in the thread table.
-NOTE! The server mutex has to be reserved by the caller!
-@return number of threads released: this may be less than n if not
-enough threads were suspended at the moment */
-UNIV_INTERN
-ulint
-srv_release_threads(
-/*================*/
- enum srv_thread_type type, /*!< in: thread type */
- ulint n); /*!< in: number of threads to release */
+/** Ensure that a given number of threads of the type given are running
+(or are already terminated).
+@param[in] type thread type
+@param[in] n number of threads that have to run */
+void
+srv_release_threads(enum srv_thread_type type, ulint n);
/**********************************************************************//**
Check whether any background thread are active. If so print which thread
@@ -802,12 +797,10 @@ const char*
srv_any_background_threads_are_active(void);
/*=======================================*/
-/**********************************************************************//**
-Wakeup the purge threads. */
+/** Wake up the purge threads. */
UNIV_INTERN
void
-srv_purge_wakeup(void);
-/*==================*/
+srv_purge_wakeup();
/** Status variables to be passed to MySQL */
struct export_var_t{
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index 4c58a3e84fb..eb82d7bcb2f 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, 2009 Google Inc.
Copyright (c) 2009, Percona Inc.
-Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
+Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -781,7 +781,6 @@ srv_suspend_thread_low(
/*===================*/
srv_slot_t* slot) /*!< in/out: thread slot */
{
-
ut_ad(!srv_read_only_mode);
ut_ad(srv_sys_mutex_own());
@@ -839,34 +838,71 @@ srv_suspend_thread(
return(sig_count);
}
-/*********************************************************************//**
-Releases threads of the type given from suspension in the thread table.
-NOTE! The server mutex has to be reserved by the caller!
-@return number of threads released: this may be less than n if not
- enough threads were suspended at the moment. */
-UNIV_INTERN
-ulint
-srv_release_threads(
-/*================*/
- srv_thread_type type, /*!< in: thread type */
- ulint n) /*!< in: number of threads to release */
+/** Resume the calling thread.
+@param[in,out] slot thread slot
+@param[in] sig_count signal count (if wait)
+@param[in] wait whether to wait for the event
+@param[in] timeout_usec timeout in microseconds (0=infinite)
+@return whether the wait timed out */
+static
+bool
+srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true,
+ ulint timeout_usec = 0)
+{
+ bool timeout;
+
+ ut_ad(!srv_read_only_mode);
+ ut_ad(slot->in_use);
+ ut_ad(slot->suspended);
+
+ if (!wait) {
+ timeout = false;
+ } else if (timeout_usec) {
+ timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low(
+ slot->event, timeout_usec, sig_count);
+ } else {
+ timeout = false;
+ os_event_wait_low(slot->event, sig_count);
+ }
+
+ srv_sys_mutex_enter();
+ ut_ad(slot->in_use);
+ ut_ad(slot->suspended);
+
+ slot->suspended = FALSE;
+ ++srv_sys->n_threads_active[slot->type];
+ srv_sys_mutex_exit();
+ return(timeout);
+}
+
+/** Ensure that a given number of threads of the type given are running
+(or are already terminated).
+@param[in] type thread type
+@param[in] n number of threads that have to run */
+void
+srv_release_threads(enum srv_thread_type type, ulint n)
{
- ulint i;
- ulint count = 0;
+ ulint running;
ut_ad(srv_thread_type_validate(type));
ut_ad(n > 0);
- srv_sys_mutex_enter();
+ do {
+ running = 0;
- for (i = 0; i < srv_sys->n_sys_threads; i++) {
- srv_slot_t* slot;
+ srv_sys_mutex_enter();
- slot = &srv_sys->sys_threads[i];
+ for (ulint i = 0; i < srv_sys->n_sys_threads; i++) {
+ srv_slot_t* slot = &srv_sys->sys_threads[i];
- if (slot->in_use
- && srv_slot_get_type(slot) == type
- && slot->suspended) {
+ if (!slot->in_use || srv_slot_get_type(slot) != type) {
+ continue;
+ } else if (!slot->suspended) {
+ if (++running >= n) {
+ break;
+ }
+ continue;
+ }
switch (type) {
case SRV_NONE:
@@ -896,21 +932,11 @@ srv_release_threads(
break;
}
- slot->suspended = FALSE;
-
- ++srv_sys->n_threads_active[type];
-
os_event_set(slot->event);
-
- if (++count == n) {
- break;
- }
}
- }
-
- srv_sys_mutex_exit();
- return(count);
+ srv_sys_mutex_exit();
+ } while (running && running < n);
}
/*********************************************************************//**
@@ -923,11 +949,8 @@ srv_free_slot(
{
srv_sys_mutex_enter();
- if (!slot->suspended) {
- /* Mark the thread as inactive. */
- srv_suspend_thread_low(slot);
- }
-
+ /* Mark the thread as inactive. */
+ srv_suspend_thread_low(slot);
/* Free the slot for reuse. */
ut_ad(slot->in_use);
slot->in_use = FALSE;
@@ -1965,15 +1988,7 @@ srv_active_wake_master_thread(void)
if (slot->in_use) {
ut_a(srv_slot_get_type(slot) == SRV_MASTER);
-
- if (slot->suspended) {
-
- slot->suspended = FALSE;
-
- ++srv_sys->n_threads_active[SRV_MASTER];
-
- os_event_set(slot->event);
- }
+ os_event_set(slot->event);
}
srv_sys_mutex_exit();
@@ -2439,7 +2454,7 @@ suspend_thread:
manual also mentions this string in several places. */
srv_main_thread_op_info = "waiting for server activity";
- os_event_wait(slot->event);
+ srv_resume_thread(slot);
if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
os_thread_exit(NULL);
@@ -2551,8 +2566,7 @@ DECLARE_THREAD(srv_worker_thread)(
do {
srv_suspend_thread(slot);
-
- os_event_wait(slot->event);
+ srv_resume_thread(slot);
if (srv_task_execute()) {
@@ -2688,8 +2702,6 @@ srv_purge_coordinator_suspend(
ib_int64_t sig_count = srv_suspend_thread(slot);
do {
- ulint ret;
-
rw_lock_x_lock(&purge_sys->latch);
purge_sys->running = false;
@@ -2698,32 +2710,11 @@ srv_purge_coordinator_suspend(
/* We don't wait right away on the the non-timed wait because
we want to signal the thread that wants to suspend purge. */
-
- if (stop) {
- os_event_wait_low(slot->event, sig_count);
- ret = 0;
- } else if (rseg_history_len <= trx_sys->rseg_history_len) {
- ret = os_event_wait_time_low(
- slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count);
- } else {
- /* We don't want to waste time waiting, if the
- history list increased by the time we got here,
- unless purge has been stopped. */
- ret = 0;
- }
-
- srv_sys_mutex_enter();
-
- /* The thread can be in state !suspended after the timeout
- but before this check if another thread sent a wakeup signal. */
-
- if (slot->suspended) {
- slot->suspended = FALSE;
- ++srv_sys->n_threads_active[slot->type];
- ut_a(srv_sys->n_threads_active[slot->type] == 1);
- }
-
- srv_sys_mutex_exit();
+ const bool wait = stop
+ || rseg_history_len <= trx_sys->rseg_history_len;
+ const bool timeout = srv_resume_thread(
+ slot, sig_count, wait,
+ stop ? 0 : SRV_PURGE_MAX_TIMEOUT);
sig_count = srv_suspend_thread(slot);
@@ -2735,6 +2726,19 @@ srv_purge_coordinator_suspend(
if (!stop) {
ut_a(purge_sys->n_stop == 0);
purge_sys->running = true;
+
+ if (timeout
+ && rseg_history_len == trx_sys->rseg_history_len
+ && trx_sys->rseg_history_len < 5000) {
+ /* No new records were added since the
+ wait started. Simply wait for new
+ records. The magic number 5000 is an
+ approximation for the case where we
+ have cached UNDO log records which
+ prevent truncate of the UNDO
+ segments. */
+ stop = true;
+ }
} else {
ut_a(purge_sys->n_stop > 0);
@@ -2743,33 +2747,9 @@ srv_purge_coordinator_suspend(
}
rw_lock_x_unlock(&purge_sys->latch);
-
- if (ret == OS_SYNC_TIME_EXCEEDED) {
-
- /* No new records added since wait started then simply
- wait for new records. The magic number 5000 is an
- approximation for the case where we have cached UNDO
- log records which prevent truncate of the UNDO
- segments. */
-
- if (rseg_history_len == trx_sys->rseg_history_len
- && trx_sys->rseg_history_len < 5000) {
-
- stop = true;
- }
- }
-
} while (stop);
- srv_sys_mutex_enter();
-
- if (slot->suspended) {
- slot->suspended = FALSE;
- ++srv_sys->n_threads_active[slot->type];
- ut_a(srv_sys->n_threads_active[slot->type] == 1);
- }
-
- srv_sys_mutex_exit();
+ srv_resume_thread(slot, 0, false);
}
/*********************************************************************//**
@@ -2822,8 +2802,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
srv_purge_coordinator_suspend(slot, rseg_history_len);
}
+ ut_ad(!slot->suspended);
+
if (srv_purge_should_exit(n_total_purged)) {
- ut_a(!slot->suspended);
break;
}
@@ -2934,12 +2915,10 @@ srv_get_task_queue_length(void)
return(n_tasks);
}
-/**********************************************************************//**
-Wakeup the purge threads. */
+/** Wake up the purge threads. */
UNIV_INTERN
void
-srv_purge_wakeup(void)
-/*==================*/
+srv_purge_wakeup()
{
ut_ad(!srv_read_only_mode);
@@ -2954,4 +2933,3 @@ srv_purge_wakeup(void)
}
}
}
-
diff --git a/storage/xtradb/include/srv0srv.h b/storage/xtradb/include/srv0srv.h
index 45c9cb80566..8f5ea21712c 100644
--- a/storage/xtradb/include/srv0srv.h
+++ b/storage/xtradb/include/srv0srv.h
@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, 2009, Google Inc.
Copyright (c) 2009, Percona Inc.
-Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
+Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -968,17 +968,12 @@ ulint
srv_get_task_queue_length(void);
/*===========================*/
-/*********************************************************************//**
-Releases threads of the type given from suspension in the thread table.
-NOTE! The server mutex has to be reserved by the caller!
-@return number of threads released: this may be less than n if not
-enough threads were suspended at the moment */
-UNIV_INTERN
-ulint
-srv_release_threads(
-/*================*/
- enum srv_thread_type type, /*!< in: thread type */
- ulint n); /*!< in: number of threads to release */
+/** Ensure that a given number of threads of the type given are running
+(or are already terminated).
+@param[in] type thread type
+@param[in] n number of threads that have to run */
+void
+srv_release_threads(enum srv_thread_type type, ulint n);
/**********************************************************************//**
Check whether any background thread are active. If so print which thread
@@ -989,12 +984,10 @@ const char*
srv_any_background_threads_are_active(void);
/*=======================================*/
-/**********************************************************************//**
-Wakeup the purge threads. */
+/** Wake up the purge threads. */
UNIV_INTERN
void
-srv_purge_wakeup(void);
-/*==================*/
+srv_purge_wakeup();
/** Status variables to be passed to MySQL */
struct export_var_t{
diff --git a/storage/xtradb/srv/srv0srv.cc b/storage/xtradb/srv/srv0srv.cc
index 5d043e4a57e..e5b2062b217 100644
--- a/storage/xtradb/srv/srv0srv.cc
+++ b/storage/xtradb/srv/srv0srv.cc
@@ -3,7 +3,7 @@
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, 2009 Google Inc.
Copyright (c) 2009, Percona Inc.
-Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved.
+Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -931,7 +931,6 @@ srv_suspend_thread_low(
/*===================*/
srv_slot_t* slot) /*!< in/out: thread slot */
{
-
ut_ad(!srv_read_only_mode);
ut_ad(srv_sys_mutex_own());
@@ -989,34 +988,71 @@ srv_suspend_thread(
return(sig_count);
}
-/*********************************************************************//**
-Releases threads of the type given from suspension in the thread table.
-NOTE! The server mutex has to be reserved by the caller!
-@return number of threads released: this may be less than n if not
- enough threads were suspended at the moment. */
-UNIV_INTERN
-ulint
-srv_release_threads(
-/*================*/
- srv_thread_type type, /*!< in: thread type */
- ulint n) /*!< in: number of threads to release */
+/** Resume the calling thread.
+@param[in,out] slot thread slot
+@param[in] sig_count signal count (if wait)
+@param[in] wait whether to wait for the event
+@param[in] timeout_usec timeout in microseconds (0=infinite)
+@return whether the wait timed out */
+static
+bool
+srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true,
+ ulint timeout_usec = 0)
{
- ulint i;
- ulint count = 0;
+ bool timeout;
+
+ ut_ad(!srv_read_only_mode);
+ ut_ad(slot->in_use);
+ ut_ad(slot->suspended);
+
+ if (!wait) {
+ timeout = false;
+ } else if (timeout_usec) {
+ timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low(
+ slot->event, timeout_usec, sig_count);
+ } else {
+ timeout = false;
+ os_event_wait_low(slot->event, sig_count);
+ }
+
+ srv_sys_mutex_enter();
+ ut_ad(slot->in_use);
+ ut_ad(slot->suspended);
+
+ slot->suspended = FALSE;
+ ++srv_sys->n_threads_active[slot->type];
+ srv_sys_mutex_exit();
+ return(timeout);
+}
+
+/** Ensure that a given number of threads of the type given are running
+(or are already terminated).
+@param[in] type thread type
+@param[in] n number of threads that have to run */
+void
+srv_release_threads(enum srv_thread_type type, ulint n)
+{
+ ulint running;
ut_ad(srv_thread_type_validate(type));
ut_ad(n > 0);
- srv_sys_mutex_enter();
+ do {
+ running = 0;
- for (i = 0; i < srv_sys->n_sys_threads; i++) {
- srv_slot_t* slot;
+ srv_sys_mutex_enter();
- slot = &srv_sys->sys_threads[i];
+ for (ulint i = 0; i < srv_sys->n_sys_threads; i++) {
+ srv_slot_t* slot = &srv_sys->sys_threads[i];
- if (slot->in_use
- && srv_slot_get_type(slot) == type
- && slot->suspended) {
+ if (!slot->in_use || srv_slot_get_type(slot) != type) {
+ continue;
+ } else if (!slot->suspended) {
+ if (++running >= n) {
+ break;
+ }
+ continue;
+ }
switch (type) {
case SRV_NONE:
@@ -1046,21 +1082,11 @@ srv_release_threads(
break;
}
- slot->suspended = FALSE;
-
- ++srv_sys->n_threads_active[type];
-
os_event_set(slot->event);
-
- if (++count == n) {
- break;
- }
}
- }
-
- srv_sys_mutex_exit();
- return(count);
+ srv_sys_mutex_exit();
+ } while (running && running < n);
}
/*********************************************************************//**
@@ -1073,11 +1099,8 @@ srv_free_slot(
{
srv_sys_mutex_enter();
- if (!slot->suspended) {
- /* Mark the thread as inactive. */
- srv_suspend_thread_low(slot);
- }
-
+ /* Mark the thread as inactive. */
+ srv_suspend_thread_low(slot);
/* Free the slot for reuse. */
ut_ad(slot->in_use);
slot->in_use = FALSE;
@@ -2581,15 +2604,7 @@ srv_active_wake_master_thread(void)
if (slot->in_use) {
ut_a(srv_slot_get_type(slot) == SRV_MASTER);
-
- if (slot->suspended) {
-
- slot->suspended = FALSE;
-
- ++srv_sys->n_threads_active[SRV_MASTER];
-
- os_event_set(slot->event);
- }
+ os_event_set(slot->event);
}
srv_sys_mutex_exit();
@@ -3110,7 +3125,7 @@ suspend_thread:
manual also mentions this string in several places. */
srv_main_thread_op_info = "waiting for server activity";
- os_event_wait(slot->event);
+ srv_resume_thread(slot);
if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
os_thread_exit(NULL);
@@ -3232,8 +3247,7 @@ DECLARE_THREAD(srv_worker_thread)(
do {
srv_suspend_thread(slot);
-
- os_event_wait(slot->event);
+ srv_resume_thread(slot);
srv_current_thread_priority = srv_purge_thread_priority;
@@ -3373,8 +3387,6 @@ srv_purge_coordinator_suspend(
ib_int64_t sig_count = srv_suspend_thread(slot);
do {
- ulint ret;
-
rw_lock_x_lock(&purge_sys->latch);
purge_sys->running = false;
@@ -3383,32 +3395,11 @@ srv_purge_coordinator_suspend(
/* We don't wait right away on the the non-timed wait because
we want to signal the thread that wants to suspend purge. */
-
- if (stop) {
- os_event_wait_low(slot->event, sig_count);
- ret = 0;
- } else if (rseg_history_len <= trx_sys->rseg_history_len) {
- ret = os_event_wait_time_low(
- slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count);
- } else {
- /* We don't want to waste time waiting, if the
- history list increased by the time we got here,
- unless purge has been stopped. */
- ret = 0;
- }
-
- srv_sys_mutex_enter();
-
- /* The thread can be in state !suspended after the timeout
- but before this check if another thread sent a wakeup signal. */
-
- if (slot->suspended) {
- slot->suspended = FALSE;
- ++srv_sys->n_threads_active[slot->type];
- ut_a(srv_sys->n_threads_active[slot->type] == 1);
- }
-
- srv_sys_mutex_exit();
+ const bool wait = stop
+ || rseg_history_len <= trx_sys->rseg_history_len;
+ const bool timeout = srv_resume_thread(
+ slot, sig_count, wait,
+ stop ? 0 : SRV_PURGE_MAX_TIMEOUT);
sig_count = srv_suspend_thread(slot);
@@ -3420,6 +3411,19 @@ srv_purge_coordinator_suspend(
if (!stop) {
ut_a(purge_sys->n_stop == 0);
purge_sys->running = true;
+
+ if (timeout
+ && rseg_history_len == trx_sys->rseg_history_len
+ && trx_sys->rseg_history_len < 5000) {
+ /* No new records were added since the
+ wait started. Simply wait for new
+ records. The magic number 5000 is an
+ approximation for the case where we
+ have cached UNDO log records which
+ prevent truncate of the UNDO
+ segments. */
+ stop = true;
+ }
} else {
ut_a(purge_sys->n_stop > 0);
@@ -3428,33 +3432,9 @@ srv_purge_coordinator_suspend(
}
rw_lock_x_unlock(&purge_sys->latch);
-
- if (ret == OS_SYNC_TIME_EXCEEDED) {
-
- /* No new records added since wait started then simply
- wait for new records. The magic number 5000 is an
- approximation for the case where we have cached UNDO
- log records which prevent truncate of the UNDO
- segments. */
-
- if (rseg_history_len == trx_sys->rseg_history_len
- && trx_sys->rseg_history_len < 5000) {
-
- stop = true;
- }
- }
-
} while (stop);
- srv_sys_mutex_enter();
-
- if (slot->suspended) {
- slot->suspended = FALSE;
- ++srv_sys->n_threads_active[slot->type];
- ut_a(srv_sys->n_threads_active[slot->type] == 1);
- }
-
- srv_sys_mutex_exit();
+ srv_resume_thread(slot, 0, false);
}
/*********************************************************************//**
@@ -3510,8 +3490,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)(
srv_purge_coordinator_suspend(slot, rseg_history_len);
}
+ ut_ad(!slot->suspended);
+
if (srv_purge_should_exit(n_total_purged)) {
- ut_a(!slot->suspended);
break;
}
@@ -3626,12 +3607,10 @@ srv_get_task_queue_length(void)
return(n_tasks);
}
-/**********************************************************************//**
-Wakeup the purge threads. */
+/** Wake up the purge threads. */
UNIV_INTERN
void
-srv_purge_wakeup(void)
-/*==================*/
+srv_purge_wakeup()
{
ut_ad(!srv_read_only_mode);
@@ -3646,4 +3625,3 @@ srv_purge_wakeup(void)
}
}
}
-