summaryrefslogtreecommitdiff
path: root/src/support/thread_group.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/support/thread_group.c')
-rw-r--r--src/support/thread_group.c258
1 files changed, 148 insertions, 110 deletions
diff --git a/src/support/thread_group.c b/src/support/thread_group.c
index 2b4b7ad4e61..59caaedf5cf 100644
--- a/src/support/thread_group.c
+++ b/src/support/thread_group.c
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2014-2016 MongoDB, Inc.
+ * Copyright (c) 2014-2017 MongoDB, Inc.
* Copyright (c) 2008-2014 WiredTiger, Inc.
* All rights reserved.
*
@@ -9,11 +9,11 @@
#include "wt_internal.h"
/*
- * __wt_thread_run --
+ * __thread_run --
* General wrapper for any thread.
*/
-WT_THREAD_RET
-__wt_thread_run(void *arg)
+static WT_THREAD_RET
+__thread_run(void *arg)
{
WT_DECL_RET;
WT_SESSION_IMPL *session;
@@ -22,7 +22,20 @@ __wt_thread_run(void *arg)
thread = (WT_THREAD*)arg;
session = thread->session;
- ret = thread->run_func(session, thread);
+ for (;;) {
+ if (!F_ISSET(thread, WT_THREAD_RUN))
+ break;
+ if (!F_ISSET(thread, WT_THREAD_ACTIVE))
+ __wt_cond_wait(session, thread->pause_cond,
+ WT_THREAD_PAUSE * WT_MILLION, thread->chk_func);
+ WT_ERR(thread->run_func(session, thread));
+ }
+
+ /*
+ * If a thread is stopping it may have subsystem cleanup to do.
+ */
+err: if (thread->stop_func != NULL)
+ ret = thread->stop_func(session, thread);
if (ret != 0 && F_ISSET(thread, WT_THREAD_PANIC_FAIL))
WT_PANIC_MSG(session, ret,
@@ -41,42 +54,13 @@ __wt_thread_run(void *arg)
}
/*
- * __thread_group_grow --
- * Increase the number of running threads in the group.
- */
-static int
-__thread_group_grow(
- WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_count)
-{
- WT_THREAD *thread;
-
- WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
-
- /*
- * Any bounds checking is done by the caller so we know that
- * there is space in the array for new threads.
- */
- while (group->current_threads < new_count) {
- thread = group->threads[group->current_threads++];
- __wt_verbose(session, WT_VERB_THREAD_GROUP,
- "Starting utility thread: %p:%" PRIu32,
- (void *)group, thread->id);
- F_SET(thread, WT_THREAD_RUN);
- WT_ASSERT(session, thread->session != NULL);
- WT_RET(__wt_thread_create(thread->session,
- &thread->tid, __wt_thread_run, thread));
- }
- return (0);
-}
-
-/*
* __thread_group_shrink --
- * Decrease the number of running threads in the group. Optionally free any
- * memory associated with slots larger than the new count.
+ * Decrease the number of threads in the group and free memory
+ * associated with slots larger than the new count.
*/
static int
-__thread_group_shrink(WT_SESSION_IMPL *session,
- WT_THREAD_GROUP *group, uint32_t new_count, bool free_thread)
+__thread_group_shrink(
+ WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t new_count)
{
WT_DECL_RET;
WT_SESSION *wt_session;
@@ -95,29 +79,47 @@ __thread_group_shrink(WT_SESSION_IMPL *session,
if (thread == NULL)
continue;
- /* Wake threads to ensure they notice the state change */
- if (thread->tid != 0) {
- __wt_verbose(session, WT_VERB_THREAD_GROUP,
- "Stopping utility thread: %p:%" PRIu32,
- (void *)group, thread->id);
- F_CLR(thread, WT_THREAD_RUN);
- __wt_cond_signal(session, group->wait_cond);
- WT_TRET(__wt_thread_join(session, thread->tid));
- thread->tid = 0;
- }
- if (free_thread) {
- if (thread->session != NULL) {
- wt_session = (WT_SESSION *)thread->session;
- WT_TRET(wt_session->close(wt_session, NULL));
- thread->session = NULL;
- }
- __wt_free(session, thread);
- group->threads[current_slot] = NULL;
- }
+ WT_ASSERT(session, thread->tid.created);
+ __wt_verbose(session, WT_VERB_THREAD_GROUP,
+ "Stopping utility thread: %p:%" PRIu32,
+ (void *)group, thread->id);
+ if (F_ISSET(thread, WT_THREAD_ACTIVE))
+ --group->current_threads;
+ F_CLR(thread, WT_THREAD_ACTIVE | WT_THREAD_RUN);
+ /*
+ * Signal the thread in case it is in a long timeout.
+ */
+ __wt_cond_signal(session, thread->pause_cond);
+ __wt_cond_signal(session, group->wait_cond);
+ }
+
+ /*
+ * We have to perform the join without holding the lock because
+ * the threads themselves may be waiting on the lock.
+ */
+ __wt_writeunlock(session, &group->lock);
+ for (current_slot = group->alloc; current_slot > new_count; ) {
+ thread = group->threads[--current_slot];
+
+ if (thread == NULL)
+ continue;
+ WT_TRET(__wt_thread_join(session, thread->tid));
+ __wt_cond_destroy(session, &thread->pause_cond);
+ }
+ __wt_writelock(session, &group->lock);
+ for (current_slot = group->alloc; current_slot > new_count; ) {
+ thread = group->threads[--current_slot];
+
+ if (thread == NULL)
+ continue;
+ WT_ASSERT(session, thread->session != NULL);
+ wt_session = (WT_SESSION *)thread->session;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ thread->session = NULL;
+ __wt_free(session, thread);
+ group->threads[current_slot] = NULL;
}
- /* Update the thread group state to match our changes */
- group->current_threads = current_slot;
return (ret);
}
@@ -132,13 +134,20 @@ __thread_group_resize(
{
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
+ WT_SESSION *wt_session;
WT_THREAD *thread;
size_t alloc;
uint32_t i, session_flags;
conn = S2C(session);
+ thread = NULL;
session_flags = 0;
+ __wt_verbose(session, WT_VERB_THREAD_GROUP,
+ "Resize thread group: %p, from min: %" PRIu32 " -> %" PRIu32
+ " from max: %" PRIu32 " -> %" PRIu32,
+ (void *)group, group->min, new_min, group->max, new_max);
+
WT_ASSERT(session,
group->current_threads <= group->alloc &&
__wt_rwlock_islocked(session, &group->lock));
@@ -153,7 +162,7 @@ __thread_group_resize(
* Call shrink to reduce the number of thread structures and running
* threads if required by the change in group size.
*/
- WT_RET(__thread_group_shrink(session, group, new_max, true));
+ WT_RET(__thread_group_shrink(session, group, new_max));
/*
* Only reallocate the thread array if it is the largest ever, since
@@ -187,30 +196,57 @@ __thread_group_resize(
if (LF_ISSET(WT_THREAD_PANIC_FAIL))
F_SET(thread, WT_THREAD_PANIC_FAIL);
thread->id = i;
+ thread->chk_func = group->chk_func;
thread->run_func = group->run_func;
+ thread->stop_func = group->stop_func;
+ WT_ERR(__wt_cond_alloc(
+ session, "Thread cond", &thread->pause_cond));
+
+ /*
+ * Start thread as inactive. We'll activate the needed
+ * number later.
+ */
+ __wt_verbose(session, WT_VERB_THREAD_GROUP,
+ "Starting utility thread: %p:%" PRIu32,
+ (void *)group, thread->id);
+ F_SET(thread, WT_THREAD_RUN);
+ WT_ERR(__wt_thread_create(thread->session,
+ &thread->tid, __thread_run, thread));
+
WT_ASSERT(session, group->threads[i] == NULL);
group->threads[i] = thread;
+ thread = NULL;
}
- if (group->current_threads < new_min)
- WT_ERR(__thread_group_grow(session, group, new_min));
+ group->max = new_max;
+ group->min = new_min;
+ while (group->current_threads < new_min)
+ __wt_thread_group_start_one(session, group, true);
+ return (0);
err: /*
+ * An error resizing a thread array is currently fatal, it should only
+ * happen in an out of memory situation. Do real cleanup just in case
+ * that changes in the future.
+ */
+ if (thread != NULL) {
+ if (thread->session != NULL) {
+ wt_session = (WT_SESSION *)thread->session;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ }
+ __wt_cond_destroy(session, &thread->pause_cond);
+ __wt_free(session, thread);
+ }
+
+ /*
* Update the thread group information even on failure to improve our
* chances of cleaning up properly.
*/
group->max = new_max;
group->min = new_min;
+ WT_TRET(__wt_thread_group_destroy(session, group));
- /*
- * An error resizing a thread array is fatal, it should only happen
- * in an out of memory situation.
- */
- if (ret != 0) {
- WT_TRET(__wt_thread_group_destroy(session, group));
- WT_PANIC_RET(session, ret, "Error while resizing thread group");
- }
- return (ret);
+ WT_PANIC_RET(session, ret, "Error while resizing thread group");
}
/*
@@ -224,11 +260,6 @@ __wt_thread_group_resize(
{
WT_DECL_RET;
- __wt_verbose(session, WT_VERB_THREAD_GROUP,
- "Resize thread group: %p, from min: %" PRIu32 " -> %" PRIu32
- " from max: %" PRIu32 " -> %" PRIu32,
- (void *)group, group->min, new_min, group->max, new_max);
-
__wt_writelock(session, &group->lock);
WT_TRET(__thread_group_resize(session, group, new_min, new_max, flags));
__wt_writeunlock(session, &group->lock);
@@ -244,7 +275,9 @@ int
__wt_thread_group_create(
WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, const char *name,
uint32_t min, uint32_t max, uint32_t flags,
- int (*run_func)(WT_SESSION_IMPL *session, WT_THREAD *context))
+ bool (*chk_func)(WT_SESSION_IMPL *session),
+ int (*run_func)(WT_SESSION_IMPL *session, WT_THREAD *context),
+ int (*stop_func)(WT_SESSION_IMPL *session, WT_THREAD *context))
{
WT_DECL_RET;
bool cond_alloced;
@@ -257,13 +290,15 @@ __wt_thread_group_create(
__wt_verbose(session, WT_VERB_THREAD_GROUP,
"Creating thread group: %p", (void *)group);
- __wt_rwlock_init(session, &group->lock);
+ WT_RET(__wt_rwlock_init(session, &group->lock));
WT_ERR(__wt_cond_alloc(
session, "thread group cond", &group->wait_cond));
cond_alloced = true;
__wt_writelock(session, &group->lock);
+ group->chk_func = chk_func;
group->run_func = run_func;
+ group->stop_func = stop_func;
group->name = name;
WT_TRET(__thread_group_resize(session, group, min, max, flags));
@@ -272,7 +307,7 @@ __wt_thread_group_create(
/* Cleanup on error to avoid leaking resources */
err: if (ret != 0) {
if (cond_alloced)
- WT_TRET(__wt_cond_destroy(session, &group->wait_cond));
+ __wt_cond_destroy(session, &group->wait_cond);
__wt_rwlock_destroy(session, &group->lock);
}
return (ret);
@@ -293,11 +328,11 @@ __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
/* Shut down all threads and free associated resources. */
- WT_TRET(__thread_group_shrink(session, group, 0, true));
+ WT_TRET(__thread_group_shrink(session, group, 0));
__wt_free(session, group->threads);
- WT_TRET(__wt_cond_destroy(session, &group->wait_cond));
+ __wt_cond_destroy(session, &group->wait_cond);
__wt_rwlock_destroy(session, &group->lock);
/*
@@ -314,52 +349,55 @@ __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
* __wt_thread_group_start_one --
* Start a new thread if possible.
*/
-int
+void
__wt_thread_group_start_one(
- WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool wait)
+ WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool is_locked)
{
- WT_DECL_RET;
+ WT_THREAD *thread;
if (group->current_threads >= group->max)
- return (0);
+ return;
- if (wait)
+ if (!is_locked)
__wt_writelock(session, &group->lock);
- else
- WT_RET(__wt_try_writelock(session, &group->lock));
/* Recheck the bounds now that we hold the lock */
- if (group->current_threads < group->max)
- WT_TRET(__thread_group_grow(
- session, group, group->current_threads + 1));
- __wt_writeunlock(session, &group->lock);
-
- return (ret);
+ if (group->current_threads < group->max) {
+ thread = group->threads[group->current_threads++];
+ WT_ASSERT(session, thread != NULL);
+ __wt_verbose(session, WT_VERB_THREAD_GROUP,
+ "Activating utility thread: %p:%" PRIu32,
+ (void *)group, thread->id);
+ WT_ASSERT(session, !F_ISSET(thread, WT_THREAD_ACTIVE));
+ F_SET(thread, WT_THREAD_ACTIVE);
+ __wt_cond_signal(session, thread->pause_cond);
+ }
+ if (!is_locked)
+ __wt_writeunlock(session, &group->lock);
}
/*
* __wt_thread_group_stop_one --
- * Stop one thread if possible.
+ * Pause one thread if possible.
*/
-int
-__wt_thread_group_stop_one(
- WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool wait)
+void
+__wt_thread_group_stop_one(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
{
- WT_DECL_RET;
+ WT_THREAD *thread;
if (group->current_threads <= group->min)
- return (0);
-
- if (wait)
- __wt_writelock(session, &group->lock);
- else
- WT_RET(__wt_try_writelock(session, &group->lock));
+ return;
+ __wt_writelock(session, &group->lock);
/* Recheck the bounds now that we hold the lock */
- if (group->current_threads > group->min)
- WT_TRET(__thread_group_shrink(
- session, group, group->current_threads - 1, false));
+ if (group->current_threads > group->min) {
+ thread = group->threads[--group->current_threads];
+ __wt_verbose(session, WT_VERB_THREAD_GROUP,
+ "Pausing utility thread: %p:%" PRIu32,
+ (void *)group, thread->id);
+ WT_ASSERT(session, F_ISSET(thread, WT_THREAD_ACTIVE));
+ F_CLR(thread, WT_THREAD_ACTIVE);
+ __wt_cond_signal(session, thread->pause_cond);
+ }
__wt_writeunlock(session, &group->lock);
-
- return (ret);
}