diff options
Diffstat (limited to 'src/support/thread_group.c')
-rw-r--r-- | src/support/thread_group.c | 258 |
1 files changed, 148 insertions, 110 deletions
diff --git a/src/support/thread_group.c b/src/support/thread_group.c index 2b4b7ad4e61..59caaedf5cf 100644 --- a/src/support/thread_group.c +++ b/src/support/thread_group.c @@ -1,5 +1,5 @@ /*- - * Copyright (c) 2014-2016 MongoDB, Inc. + * Copyright (c) 2014-2017 MongoDB, Inc. * Copyright (c) 2008-2014 WiredTiger, Inc. * All rights reserved. * @@ -9,11 +9,11 @@ #include "wt_internal.h" /* - * __wt_thread_run -- + * __thread_run -- * General wrapper for any thread. */ -WT_THREAD_RET -__wt_thread_run(void *arg) +static WT_THREAD_RET +__thread_run(void *arg) { WT_DECL_RET; WT_SESSION_IMPL *session; @@ -22,7 +22,20 @@ __wt_thread_run(void *arg) thread = (WT_THREAD*)arg; session = thread->session; - ret = thread->run_func(session, thread); + for (;;) { + if (!F_ISSET(thread, WT_THREAD_RUN)) + break; + if (!F_ISSET(thread, WT_THREAD_ACTIVE)) + __wt_cond_wait(session, thread->pause_cond, + WT_THREAD_PAUSE * WT_MILLION, thread->chk_func); + WT_ERR(thread->run_func(session, thread)); + } + + /* + * If a thread is stopping it may have subsystem cleanup to do. + */ +err: if (thread->stop_func != NULL) + ret = thread->stop_func(session, thread); if (ret != 0 && F_ISSET(thread, WT_THREAD_PANIC_FAIL)) WT_PANIC_MSG(session, ret, @@ -41,42 +54,13 @@ __wt_thread_run(void *arg) } /* - * __thread_group_grow -- - * Increase the number of running threads in the group. - */ -static int -__thread_group_grow( - WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_count) -{ - WT_THREAD *thread; - - WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock)); - - /* - * Any bounds checking is done by the caller so we know that - * there is space in the array for new threads. - */ - while (group->current_threads < new_count) { - thread = group->threads[group->current_threads++]; - __wt_verbose(session, WT_VERB_THREAD_GROUP, - "Starting utility thread: %p:%" PRIu32, - (void *)group, thread->id); - F_SET(thread, WT_THREAD_RUN); - WT_ASSERT(session, thread->session != NULL); - WT_RET(__wt_thread_create(thread->session, - &thread->tid, __wt_thread_run, thread)); - } - return (0); -} - -/* * __thread_group_shrink -- - * Decrease the number of running threads in the group. Optionally free any - * memory associated with slots larger than the new count. + * Decrease the number of threads in the group and free memory + * associated with slots larger than the new count. */ static int -__thread_group_shrink(WT_SESSION_IMPL *session, - WT_THREAD_GROUP *group, uint32_t new_count, bool free_thread) +__thread_group_shrink( + WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_count) { WT_DECL_RET; WT_SESSION *wt_session; @@ -95,29 +79,47 @@ __thread_group_shrink(WT_SESSION_IMPL *session, if (thread == NULL) continue; - /* Wake threads to ensure they notice the state change */ - if (thread->tid != 0) { - __wt_verbose(session, WT_VERB_THREAD_GROUP, - "Stopping utility thread: %p:%" PRIu32, - (void *)group, thread->id); - F_CLR(thread, WT_THREAD_RUN); - __wt_cond_signal(session, group->wait_cond); - WT_TRET(__wt_thread_join(session, thread->tid)); - thread->tid = 0; - } - if (free_thread) { - if (thread->session != NULL) { - wt_session = (WT_SESSION *)thread->session; - WT_TRET(wt_session->close(wt_session, NULL)); - thread->session = NULL; - } - __wt_free(session, thread); - group->threads[current_slot] = NULL; - } + WT_ASSERT(session, thread->tid.created); + __wt_verbose(session, WT_VERB_THREAD_GROUP, + "Stopping utility thread: %p:%" PRIu32, + (void *)group, thread->id); + if (F_ISSET(thread, WT_THREAD_ACTIVE)) + --group->current_threads; + F_CLR(thread, WT_THREAD_ACTIVE | WT_THREAD_RUN); + /* + * Signal the thread in case it is in a long timeout. + */ + __wt_cond_signal(session, thread->pause_cond); + __wt_cond_signal(session, group->wait_cond); + } + + /* + * We have to perform the join without holding the lock because + * the threads themselves may be waiting on the lock. + */ + __wt_writeunlock(session, &group->lock); + for (current_slot = group->alloc; current_slot > new_count; ) { + thread = group->threads[--current_slot]; + + if (thread == NULL) + continue; + WT_TRET(__wt_thread_join(session, thread->tid)); + __wt_cond_destroy(session, &thread->pause_cond); + } + __wt_writelock(session, &group->lock); + for (current_slot = group->alloc; current_slot > new_count; ) { + thread = group->threads[--current_slot]; + + if (thread == NULL) + continue; + WT_ASSERT(session, thread->session != NULL); + wt_session = (WT_SESSION *)thread->session; + WT_TRET(wt_session->close(wt_session, NULL)); + thread->session = NULL; + __wt_free(session, thread); + group->threads[current_slot] = NULL; } - /* Update the thread group state to match our changes */ - group->current_threads = current_slot; return (ret); } @@ -132,13 +134,20 @@ __thread_group_resize( { WT_CONNECTION_IMPL *conn; WT_DECL_RET; + WT_SESSION *wt_session; WT_THREAD *thread; size_t alloc; uint32_t i, session_flags; conn = S2C(session); + thread = NULL; session_flags = 0; + __wt_verbose(session, WT_VERB_THREAD_GROUP, + "Resize thread group: %p, from min: %" PRIu32 " -> %" PRIu32 + " from max: %" PRIu32 " -> %" PRIu32, + (void *)group, group->min, new_min, group->max, new_max); + WT_ASSERT(session, group->current_threads <= group->alloc && __wt_rwlock_islocked(session, &group->lock)); @@ -153,7 +162,7 @@ __thread_group_resize( * Call shrink to reduce the number of thread structures and running * threads if required by the change in group size. */ - WT_RET(__thread_group_shrink(session, group, new_max, true)); + WT_RET(__thread_group_shrink(session, group, new_max)); /* * Only reallocate the thread array if it is the largest ever, since @@ -187,30 +196,57 @@ __thread_group_resize( if (LF_ISSET(WT_THREAD_PANIC_FAIL)) F_SET(thread, WT_THREAD_PANIC_FAIL); thread->id = i; + thread->chk_func = group->chk_func; thread->run_func = group->run_func; + thread->stop_func = group->stop_func; + WT_ERR(__wt_cond_alloc( + session, "Thread cond", &thread->pause_cond)); + + /* + * Start thread as inactive. We'll activate the needed + * number later. + */ + __wt_verbose(session, WT_VERB_THREAD_GROUP, + "Starting utility thread: %p:%" PRIu32, + (void *)group, thread->id); + F_SET(thread, WT_THREAD_RUN); + WT_ERR(__wt_thread_create(thread->session, + &thread->tid, __thread_run, thread)); + WT_ASSERT(session, group->threads[i] == NULL); group->threads[i] = thread; + thread = NULL; } - if (group->current_threads < new_min) - WT_ERR(__thread_group_grow(session, group, new_min)); + group->max = new_max; + group->min = new_min; + while (group->current_threads < new_min) + __wt_thread_group_start_one(session, group, true); + return (0); err: /* + * An error resizing a thread array is currently fatal, it should only + * happen in an out of memory situation. Do real cleanup just in case + * that changes in the future. + */ + if (thread != NULL) { + if (thread->session != NULL) { + wt_session = (WT_SESSION *)thread->session; + WT_TRET(wt_session->close(wt_session, NULL)); + } + __wt_cond_destroy(session, &thread->pause_cond); + __wt_free(session, thread); + } + + /* * Update the thread group information even on failure to improve our * chances of cleaning up properly. */ group->max = new_max; group->min = new_min; + WT_TRET(__wt_thread_group_destroy(session, group)); - /* - * An error resizing a thread array is fatal, it should only happen - * in an out of memory situation. - */ - if (ret != 0) { - WT_TRET(__wt_thread_group_destroy(session, group)); - WT_PANIC_RET(session, ret, "Error while resizing thread group"); - } - return (ret); + WT_PANIC_RET(session, ret, "Error while resizing thread group"); } /* @@ -224,11 +260,6 @@ __wt_thread_group_resize( { WT_DECL_RET; - __wt_verbose(session, WT_VERB_THREAD_GROUP, - "Resize thread group: %p, from min: %" PRIu32 " -> %" PRIu32 - " from max: %" PRIu32 " -> %" PRIu32, - (void *)group, group->min, new_min, group->max, new_max); - __wt_writelock(session, &group->lock); WT_TRET(__thread_group_resize(session, group, new_min, new_max, flags)); __wt_writeunlock(session, &group->lock); @@ -244,7 +275,9 @@ int __wt_thread_group_create( WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, const char *name, uint32_t min, uint32_t max, uint32_t flags, - int (*run_func)(WT_SESSION_IMPL *session, WT_THREAD *context)) + bool (*chk_func)(WT_SESSION_IMPL *session), + int (*run_func)(WT_SESSION_IMPL *session, WT_THREAD *context), + int (*stop_func)(WT_SESSION_IMPL *session, WT_THREAD *context)) { WT_DECL_RET; bool cond_alloced; @@ -257,13 +290,15 @@ __wt_thread_group_create( __wt_verbose(session, WT_VERB_THREAD_GROUP, "Creating thread group: %p", (void *)group); - __wt_rwlock_init(session, &group->lock); + WT_RET(__wt_rwlock_init(session, &group->lock)); WT_ERR(__wt_cond_alloc( session, "thread group cond", &group->wait_cond)); cond_alloced = true; __wt_writelock(session, &group->lock); + group->chk_func = chk_func; group->run_func = run_func; + group->stop_func = stop_func; group->name = name; WT_TRET(__thread_group_resize(session, group, min, max, flags)); @@ -272,7 +307,7 @@ __wt_thread_group_create( /* Cleanup on error to avoid leaking resources */ err: if (ret != 0) { if (cond_alloced) - WT_TRET(__wt_cond_destroy(session, &group->wait_cond)); + __wt_cond_destroy(session, &group->wait_cond); __wt_rwlock_destroy(session, &group->lock); } return (ret); @@ -293,11 +328,11 @@ __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group) WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock)); /* Shut down all threads and free associated resources. */ - WT_TRET(__thread_group_shrink(session, group, 0, true)); + WT_TRET(__thread_group_shrink(session, group, 0)); __wt_free(session, group->threads); - WT_TRET(__wt_cond_destroy(session, &group->wait_cond)); + __wt_cond_destroy(session, &group->wait_cond); __wt_rwlock_destroy(session, &group->lock); /* @@ -314,52 +349,55 @@ __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group) * __wt_thread_group_start_one -- * Start a new thread if possible. */ -int +void __wt_thread_group_start_one( - WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool wait) + WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool is_locked) { - WT_DECL_RET; + WT_THREAD *thread; if (group->current_threads >= group->max) - return (0); + return; - if (wait) + if (!is_locked) __wt_writelock(session, &group->lock); - else - WT_RET(__wt_try_writelock(session, &group->lock)); /* Recheck the bounds now that we hold the lock */ - if (group->current_threads < group->max) - WT_TRET(__thread_group_grow( - session, group, group->current_threads + 1)); - __wt_writeunlock(session, &group->lock); - - return (ret); + if (group->current_threads < group->max) { + thread = group->threads[group->current_threads++]; + WT_ASSERT(session, thread != NULL); + __wt_verbose(session, WT_VERB_THREAD_GROUP, + "Activating utility thread: %p:%" PRIu32, + (void *)group, thread->id); + WT_ASSERT(session, !F_ISSET(thread, WT_THREAD_ACTIVE)); + F_SET(thread, WT_THREAD_ACTIVE); + __wt_cond_signal(session, thread->pause_cond); + } + if (!is_locked) + __wt_writeunlock(session, &group->lock); } /* * __wt_thread_group_stop_one -- - * Stop one thread if possible. + * Pause one thread if possible. */ -int -__wt_thread_group_stop_one( - WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool wait) +void +__wt_thread_group_stop_one(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group) { - WT_DECL_RET; + WT_THREAD *thread; if (group->current_threads <= group->min) - return (0); - - if (wait) - __wt_writelock(session, &group->lock); - else - WT_RET(__wt_try_writelock(session, &group->lock)); + return; + __wt_writelock(session, &group->lock); /* Recheck the bounds now that we hold the lock */ - if (group->current_threads > group->min) - WT_TRET(__thread_group_shrink( - session, group, group->current_threads - 1, false)); + if (group->current_threads > group->min) { + thread = group->threads[--group->current_threads]; + __wt_verbose(session, WT_VERB_THREAD_GROUP, + "Pausing utility thread: %p:%" PRIu32, + (void *)group, thread->id); + WT_ASSERT(session, F_ISSET(thread, WT_THREAD_ACTIVE)); + F_CLR(thread, WT_THREAD_ACTIVE); + __wt_cond_signal(session, thread->pause_cond); + } __wt_writeunlock(session, &group->lock); - - return (ret); } |