diff options
-rw-r--r-- | storage/innobase/include/srv0srv.h | 25 | ||||
-rw-r--r-- | storage/innobase/srv/srv0srv.cc | 198 | ||||
-rw-r--r-- | storage/xtradb/include/srv0srv.h | 25 | ||||
-rw-r--r-- | storage/xtradb/srv/srv0srv.cc | 198 |
4 files changed, 194 insertions, 252 deletions
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index 6389efa7b84..9e016b90bf3 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -3,7 +3,7 @@ Copyright (c) 1995, 2016, Oracle and/or its affiliates. All rights reserved. Copyright (c) 2008, 2009, Google Inc. Copyright (c) 2009, Percona Inc. -Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved. +Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -781,17 +781,12 @@ ulint srv_get_task_queue_length(void); /*===========================*/ -/*********************************************************************//** -Releases threads of the type given from suspension in the thread table. -NOTE! The server mutex has to be reserved by the caller! -@return number of threads released: this may be less than n if not -enough threads were suspended at the moment */ -UNIV_INTERN -ulint -srv_release_threads( -/*================*/ - enum srv_thread_type type, /*!< in: thread type */ - ulint n); /*!< in: number of threads to release */ +/** Ensure that a given number of threads of the type given are running +(or are already terminated). +@param[in] type thread type +@param[in] n number of threads that have to run */ +void +srv_release_threads(enum srv_thread_type type, ulint n); /**********************************************************************//** Check whether any background thread are active. If so print which thread @@ -802,12 +797,10 @@ const char* srv_any_background_threads_are_active(void); /*=======================================*/ -/**********************************************************************//** -Wakeup the purge threads. */ +/** Wake up the purge threads. */ UNIV_INTERN void -srv_purge_wakeup(void); -/*==================*/ +srv_purge_wakeup(); /** Status variables to be passed to MySQL */ struct export_var_t{ diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index 4c58a3e84fb..eb82d7bcb2f 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -3,7 +3,7 @@ Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2008, 2009 Google Inc. Copyright (c) 2009, Percona Inc. -Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved. +Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -781,7 +781,6 @@ srv_suspend_thread_low( /*===================*/ srv_slot_t* slot) /*!< in/out: thread slot */ { - ut_ad(!srv_read_only_mode); ut_ad(srv_sys_mutex_own()); @@ -839,34 +838,71 @@ srv_suspend_thread( return(sig_count); } -/*********************************************************************//** -Releases threads of the type given from suspension in the thread table. -NOTE! The server mutex has to be reserved by the caller! -@return number of threads released: this may be less than n if not - enough threads were suspended at the moment. */ -UNIV_INTERN -ulint -srv_release_threads( -/*================*/ - srv_thread_type type, /*!< in: thread type */ - ulint n) /*!< in: number of threads to release */ +/** Resume the calling thread. +@param[in,out] slot thread slot +@param[in] sig_count signal count (if wait) +@param[in] wait whether to wait for the event +@param[in] timeout_usec timeout in microseconds (0=infinite) +@return whether the wait timed out */ +static +bool +srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true, + ulint timeout_usec = 0) +{ + bool timeout; + + ut_ad(!srv_read_only_mode); + ut_ad(slot->in_use); + ut_ad(slot->suspended); + + if (!wait) { + timeout = false; + } else if (timeout_usec) { + timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low( + slot->event, timeout_usec, sig_count); + } else { + timeout = false; + os_event_wait_low(slot->event, sig_count); + } + + srv_sys_mutex_enter(); + ut_ad(slot->in_use); + ut_ad(slot->suspended); + + slot->suspended = FALSE; + ++srv_sys->n_threads_active[slot->type]; + srv_sys_mutex_exit(); + return(timeout); +} + +/** Ensure that a given number of threads of the type given are running +(or are already terminated). +@param[in] type thread type +@param[in] n number of threads that have to run */ +void +srv_release_threads(enum srv_thread_type type, ulint n) { - ulint i; - ulint count = 0; + ulint running; ut_ad(srv_thread_type_validate(type)); ut_ad(n > 0); - srv_sys_mutex_enter(); + do { + running = 0; - for (i = 0; i < srv_sys->n_sys_threads; i++) { - srv_slot_t* slot; + srv_sys_mutex_enter(); - slot = &srv_sys->sys_threads[i]; + for (ulint i = 0; i < srv_sys->n_sys_threads; i++) { + srv_slot_t* slot = &srv_sys->sys_threads[i]; - if (slot->in_use - && srv_slot_get_type(slot) == type - && slot->suspended) { + if (!slot->in_use || srv_slot_get_type(slot) != type) { + continue; + } else if (!slot->suspended) { + if (++running >= n) { + break; + } + continue; + } switch (type) { case SRV_NONE: @@ -896,21 +932,11 @@ srv_release_threads( break; } - slot->suspended = FALSE; - - ++srv_sys->n_threads_active[type]; - os_event_set(slot->event); - - if (++count == n) { - break; - } } - } - - srv_sys_mutex_exit(); - return(count); + srv_sys_mutex_exit(); + } while (running && running < n); } /*********************************************************************//** @@ -923,11 +949,8 @@ srv_free_slot( { srv_sys_mutex_enter(); - if (!slot->suspended) { - /* Mark the thread as inactive. */ - srv_suspend_thread_low(slot); - } - + /* Mark the thread as inactive. */ + srv_suspend_thread_low(slot); /* Free the slot for reuse. */ ut_ad(slot->in_use); slot->in_use = FALSE; @@ -1965,15 +1988,7 @@ srv_active_wake_master_thread(void) if (slot->in_use) { ut_a(srv_slot_get_type(slot) == SRV_MASTER); - - if (slot->suspended) { - - slot->suspended = FALSE; - - ++srv_sys->n_threads_active[SRV_MASTER]; - - os_event_set(slot->event); - } + os_event_set(slot->event); } srv_sys_mutex_exit(); @@ -2439,7 +2454,7 @@ suspend_thread: manual also mentions this string in several places. */ srv_main_thread_op_info = "waiting for server activity"; - os_event_wait(slot->event); + srv_resume_thread(slot); if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { os_thread_exit(NULL); @@ -2551,8 +2566,7 @@ DECLARE_THREAD(srv_worker_thread)( do { srv_suspend_thread(slot); - - os_event_wait(slot->event); + srv_resume_thread(slot); if (srv_task_execute()) { @@ -2688,8 +2702,6 @@ srv_purge_coordinator_suspend( ib_int64_t sig_count = srv_suspend_thread(slot); do { - ulint ret; - rw_lock_x_lock(&purge_sys->latch); purge_sys->running = false; @@ -2698,32 +2710,11 @@ srv_purge_coordinator_suspend( /* We don't wait right away on the the non-timed wait because we want to signal the thread that wants to suspend purge. */ - - if (stop) { - os_event_wait_low(slot->event, sig_count); - ret = 0; - } else if (rseg_history_len <= trx_sys->rseg_history_len) { - ret = os_event_wait_time_low( - slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count); - } else { - /* We don't want to waste time waiting, if the - history list increased by the time we got here, - unless purge has been stopped. */ - ret = 0; - } - - srv_sys_mutex_enter(); - - /* The thread can be in state !suspended after the timeout - but before this check if another thread sent a wakeup signal. */ - - if (slot->suspended) { - slot->suspended = FALSE; - ++srv_sys->n_threads_active[slot->type]; - ut_a(srv_sys->n_threads_active[slot->type] == 1); - } - - srv_sys_mutex_exit(); + const bool wait = stop + || rseg_history_len <= trx_sys->rseg_history_len; + const bool timeout = srv_resume_thread( + slot, sig_count, wait, + stop ? 0 : SRV_PURGE_MAX_TIMEOUT); sig_count = srv_suspend_thread(slot); @@ -2735,6 +2726,19 @@ srv_purge_coordinator_suspend( if (!stop) { ut_a(purge_sys->n_stop == 0); purge_sys->running = true; + + if (timeout + && rseg_history_len == trx_sys->rseg_history_len + && trx_sys->rseg_history_len < 5000) { + /* No new records were added since the + wait started. Simply wait for new + records. The magic number 5000 is an + approximation for the case where we + have cached UNDO log records which + prevent truncate of the UNDO + segments. */ + stop = true; + } } else { ut_a(purge_sys->n_stop > 0); @@ -2743,33 +2747,9 @@ srv_purge_coordinator_suspend( } rw_lock_x_unlock(&purge_sys->latch); - - if (ret == OS_SYNC_TIME_EXCEEDED) { - - /* No new records added since wait started then simply - wait for new records. The magic number 5000 is an - approximation for the case where we have cached UNDO - log records which prevent truncate of the UNDO - segments. */ - - if (rseg_history_len == trx_sys->rseg_history_len - && trx_sys->rseg_history_len < 5000) { - - stop = true; - } - } - } while (stop); - srv_sys_mutex_enter(); - - if (slot->suspended) { - slot->suspended = FALSE; - ++srv_sys->n_threads_active[slot->type]; - ut_a(srv_sys->n_threads_active[slot->type] == 1); - } - - srv_sys_mutex_exit(); + srv_resume_thread(slot, 0, false); } /*********************************************************************//** @@ -2822,8 +2802,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( srv_purge_coordinator_suspend(slot, rseg_history_len); } + ut_ad(!slot->suspended); + if (srv_purge_should_exit(n_total_purged)) { - ut_a(!slot->suspended); break; } @@ -2934,12 +2915,10 @@ srv_get_task_queue_length(void) return(n_tasks); } -/**********************************************************************//** -Wakeup the purge threads. */ +/** Wake up the purge threads. */ UNIV_INTERN void -srv_purge_wakeup(void) -/*==================*/ +srv_purge_wakeup() { ut_ad(!srv_read_only_mode); @@ -2954,4 +2933,3 @@ srv_purge_wakeup(void) } } } - diff --git a/storage/xtradb/include/srv0srv.h b/storage/xtradb/include/srv0srv.h index 45c9cb80566..8f5ea21712c 100644 --- a/storage/xtradb/include/srv0srv.h +++ b/storage/xtradb/include/srv0srv.h @@ -3,7 +3,7 @@ Copyright (c) 1995, 2016, Oracle and/or its affiliates. All rights reserved. Copyright (c) 2008, 2009, Google Inc. Copyright (c) 2009, Percona Inc. -Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved. +Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -968,17 +968,12 @@ ulint srv_get_task_queue_length(void); /*===========================*/ -/*********************************************************************//** -Releases threads of the type given from suspension in the thread table. -NOTE! The server mutex has to be reserved by the caller! -@return number of threads released: this may be less than n if not -enough threads were suspended at the moment */ -UNIV_INTERN -ulint -srv_release_threads( -/*================*/ - enum srv_thread_type type, /*!< in: thread type */ - ulint n); /*!< in: number of threads to release */ +/** Ensure that a given number of threads of the type given are running +(or are already terminated). +@param[in] type thread type +@param[in] n number of threads that have to run */ +void +srv_release_threads(enum srv_thread_type type, ulint n); /**********************************************************************//** Check whether any background thread are active. If so print which thread @@ -989,12 +984,10 @@ const char* srv_any_background_threads_are_active(void); /*=======================================*/ -/**********************************************************************//** -Wakeup the purge threads. */ +/** Wake up the purge threads. */ UNIV_INTERN void -srv_purge_wakeup(void); -/*==================*/ +srv_purge_wakeup(); /** Status variables to be passed to MySQL */ struct export_var_t{ diff --git a/storage/xtradb/srv/srv0srv.cc b/storage/xtradb/srv/srv0srv.cc index 5d043e4a57e..e5b2062b217 100644 --- a/storage/xtradb/srv/srv0srv.cc +++ b/storage/xtradb/srv/srv0srv.cc @@ -3,7 +3,7 @@ Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2008, 2009 Google Inc. Copyright (c) 2009, Percona Inc. -Copyright (c) 2013, 2014, SkySQL Ab. All Rights Reserved. +Copyright (c) 2013, 2017, MariaDB Corporation Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -931,7 +931,6 @@ srv_suspend_thread_low( /*===================*/ srv_slot_t* slot) /*!< in/out: thread slot */ { - ut_ad(!srv_read_only_mode); ut_ad(srv_sys_mutex_own()); @@ -989,34 +988,71 @@ srv_suspend_thread( return(sig_count); } -/*********************************************************************//** -Releases threads of the type given from suspension in the thread table. -NOTE! The server mutex has to be reserved by the caller! -@return number of threads released: this may be less than n if not - enough threads were suspended at the moment. */ -UNIV_INTERN -ulint -srv_release_threads( -/*================*/ - srv_thread_type type, /*!< in: thread type */ - ulint n) /*!< in: number of threads to release */ +/** Resume the calling thread. +@param[in,out] slot thread slot +@param[in] sig_count signal count (if wait) +@param[in] wait whether to wait for the event +@param[in] timeout_usec timeout in microseconds (0=infinite) +@return whether the wait timed out */ +static +bool +srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true, + ulint timeout_usec = 0) { - ulint i; - ulint count = 0; + bool timeout; + + ut_ad(!srv_read_only_mode); + ut_ad(slot->in_use); + ut_ad(slot->suspended); + + if (!wait) { + timeout = false; + } else if (timeout_usec) { + timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low( + slot->event, timeout_usec, sig_count); + } else { + timeout = false; + os_event_wait_low(slot->event, sig_count); + } + + srv_sys_mutex_enter(); + ut_ad(slot->in_use); + ut_ad(slot->suspended); + + slot->suspended = FALSE; + ++srv_sys->n_threads_active[slot->type]; + srv_sys_mutex_exit(); + return(timeout); +} + +/** Ensure that a given number of threads of the type given are running +(or are already terminated). +@param[in] type thread type +@param[in] n number of threads that have to run */ +void +srv_release_threads(enum srv_thread_type type, ulint n) +{ + ulint running; ut_ad(srv_thread_type_validate(type)); ut_ad(n > 0); - srv_sys_mutex_enter(); + do { + running = 0; - for (i = 0; i < srv_sys->n_sys_threads; i++) { - srv_slot_t* slot; + srv_sys_mutex_enter(); - slot = &srv_sys->sys_threads[i]; + for (ulint i = 0; i < srv_sys->n_sys_threads; i++) { + srv_slot_t* slot = &srv_sys->sys_threads[i]; - if (slot->in_use - && srv_slot_get_type(slot) == type - && slot->suspended) { + if (!slot->in_use || srv_slot_get_type(slot) != type) { + continue; + } else if (!slot->suspended) { + if (++running >= n) { + break; + } + continue; + } switch (type) { case SRV_NONE: @@ -1046,21 +1082,11 @@ srv_release_threads( break; } - slot->suspended = FALSE; - - ++srv_sys->n_threads_active[type]; - os_event_set(slot->event); - - if (++count == n) { - break; - } } - } - - srv_sys_mutex_exit(); - return(count); + srv_sys_mutex_exit(); + } while (running && running < n); } /*********************************************************************//** @@ -1073,11 +1099,8 @@ srv_free_slot( { srv_sys_mutex_enter(); - if (!slot->suspended) { - /* Mark the thread as inactive. */ - srv_suspend_thread_low(slot); - } - + /* Mark the thread as inactive. */ + srv_suspend_thread_low(slot); /* Free the slot for reuse. */ ut_ad(slot->in_use); slot->in_use = FALSE; @@ -2581,15 +2604,7 @@ srv_active_wake_master_thread(void) if (slot->in_use) { ut_a(srv_slot_get_type(slot) == SRV_MASTER); - - if (slot->suspended) { - - slot->suspended = FALSE; - - ++srv_sys->n_threads_active[SRV_MASTER]; - - os_event_set(slot->event); - } + os_event_set(slot->event); } srv_sys_mutex_exit(); @@ -3110,7 +3125,7 @@ suspend_thread: manual also mentions this string in several places. */ srv_main_thread_op_info = "waiting for server activity"; - os_event_wait(slot->event); + srv_resume_thread(slot); if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { os_thread_exit(NULL); @@ -3232,8 +3247,7 @@ DECLARE_THREAD(srv_worker_thread)( do { srv_suspend_thread(slot); - - os_event_wait(slot->event); + srv_resume_thread(slot); srv_current_thread_priority = srv_purge_thread_priority; @@ -3373,8 +3387,6 @@ srv_purge_coordinator_suspend( ib_int64_t sig_count = srv_suspend_thread(slot); do { - ulint ret; - rw_lock_x_lock(&purge_sys->latch); purge_sys->running = false; @@ -3383,32 +3395,11 @@ srv_purge_coordinator_suspend( /* We don't wait right away on the the non-timed wait because we want to signal the thread that wants to suspend purge. */ - - if (stop) { - os_event_wait_low(slot->event, sig_count); - ret = 0; - } else if (rseg_history_len <= trx_sys->rseg_history_len) { - ret = os_event_wait_time_low( - slot->event, SRV_PURGE_MAX_TIMEOUT, sig_count); - } else { - /* We don't want to waste time waiting, if the - history list increased by the time we got here, - unless purge has been stopped. */ - ret = 0; - } - - srv_sys_mutex_enter(); - - /* The thread can be in state !suspended after the timeout - but before this check if another thread sent a wakeup signal. */ - - if (slot->suspended) { - slot->suspended = FALSE; - ++srv_sys->n_threads_active[slot->type]; - ut_a(srv_sys->n_threads_active[slot->type] == 1); - } - - srv_sys_mutex_exit(); + const bool wait = stop + || rseg_history_len <= trx_sys->rseg_history_len; + const bool timeout = srv_resume_thread( + slot, sig_count, wait, + stop ? 0 : SRV_PURGE_MAX_TIMEOUT); sig_count = srv_suspend_thread(slot); @@ -3420,6 +3411,19 @@ srv_purge_coordinator_suspend( if (!stop) { ut_a(purge_sys->n_stop == 0); purge_sys->running = true; + + if (timeout + && rseg_history_len == trx_sys->rseg_history_len + && trx_sys->rseg_history_len < 5000) { + /* No new records were added since the + wait started. Simply wait for new + records. The magic number 5000 is an + approximation for the case where we + have cached UNDO log records which + prevent truncate of the UNDO + segments. */ + stop = true; + } } else { ut_a(purge_sys->n_stop > 0); @@ -3428,33 +3432,9 @@ srv_purge_coordinator_suspend( } rw_lock_x_unlock(&purge_sys->latch); - - if (ret == OS_SYNC_TIME_EXCEEDED) { - - /* No new records added since wait started then simply - wait for new records. The magic number 5000 is an - approximation for the case where we have cached UNDO - log records which prevent truncate of the UNDO - segments. */ - - if (rseg_history_len == trx_sys->rseg_history_len - && trx_sys->rseg_history_len < 5000) { - - stop = true; - } - } - } while (stop); - srv_sys_mutex_enter(); - - if (slot->suspended) { - slot->suspended = FALSE; - ++srv_sys->n_threads_active[slot->type]; - ut_a(srv_sys->n_threads_active[slot->type] == 1); - } - - srv_sys_mutex_exit(); + srv_resume_thread(slot, 0, false); } /*********************************************************************//** @@ -3510,8 +3490,9 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( srv_purge_coordinator_suspend(slot, rseg_history_len); } + ut_ad(!slot->suspended); + if (srv_purge_should_exit(n_total_purged)) { - ut_a(!slot->suspended); break; } @@ -3626,12 +3607,10 @@ srv_get_task_queue_length(void) return(n_tasks); } -/**********************************************************************//** -Wakeup the purge threads. */ +/** Wake up the purge threads. */ UNIV_INTERN void -srv_purge_wakeup(void) -/*==================*/ +srv_purge_wakeup() { ut_ad(!srv_read_only_mode); @@ -3646,4 +3625,3 @@ srv_purge_wakeup(void) } } } - |