summaryrefslogtreecommitdiff
path: root/src/support/thread_group.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/support/thread_group.c')
-rw-r--r--src/support/thread_group.c83
1 files changed, 56 insertions, 27 deletions
diff --git a/src/support/thread_group.c b/src/support/thread_group.c
index a866d2d01c5..beb143e63e2 100644
--- a/src/support/thread_group.c
+++ b/src/support/thread_group.c
@@ -50,8 +50,7 @@ __thread_group_grow(
{
WT_THREAD *thread;
- WT_ASSERT(session,
- __wt_rwlock_islocked(session, group->lock));
+ WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
/*
* Any bounds checking is done by the caller so we know that
@@ -72,20 +71,19 @@ __thread_group_grow(
/*
* __thread_group_shrink --
- * Decrease the number of running threads in the group, and free any
+ * Decrease the number of running threads in the group. Optionally free any
* memory associated with slots larger than the new count.
*/
static int
__thread_group_shrink(WT_SESSION_IMPL *session,
- WT_THREAD_GROUP *group, uint32_t new_count)
+ WT_THREAD_GROUP *group, uint32_t new_count, bool free_thread)
{
WT_DECL_RET;
WT_SESSION *wt_session;
WT_THREAD *thread;
uint32_t current_slot;
- WT_ASSERT(session,
- __wt_rwlock_islocked(session, group->lock));
+ WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
for (current_slot = group->alloc; current_slot > new_count; ) {
/*
@@ -107,14 +105,15 @@ __thread_group_shrink(WT_SESSION_IMPL *session,
WT_TRET(__wt_thread_join(session, thread->tid));
thread->tid = 0;
}
-
- if (thread->session != NULL) {
- wt_session = (WT_SESSION *)thread->session;
- WT_TRET(wt_session->close(wt_session, NULL));
- thread->session = NULL;
+ if (free_thread) {
+ if (thread->session != NULL) {
+ wt_session = (WT_SESSION *)thread->session;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ thread->session = NULL;
+ }
+ __wt_free(session, thread);
+ group->threads[current_slot] = NULL;
}
- __wt_free(session, thread);
- group->threads[current_slot] = NULL;
}
/* Update the thread group state to match our changes */
@@ -142,16 +141,19 @@ __thread_group_resize(
WT_ASSERT(session,
group->current_threads <= group->alloc &&
- __wt_rwlock_islocked(session, group->lock));
+ __wt_rwlock_islocked(session, &group->lock));
if (new_min == group->min && new_max == group->max)
return (0);
+ if (new_min > new_max)
+ return (EINVAL);
+
/*
- * Coll shrink to reduce the number of thread structures and running
+ * Call shrink to reduce the number of thread structures and running
* threads if required by the change in group size.
*/
- WT_RET(__thread_group_shrink(session, group, new_max));
+ WT_RET(__thread_group_shrink(session, group, new_max, true));
/*
* Only reallocate the thread array if it is the largest ever, since
@@ -227,9 +229,9 @@ __wt_thread_group_resize(
" from max: %" PRIu32 " -> %" PRIu32,
(void *)group, group->min, new_min, group->max, new_max);
- __wt_writelock(session, group->lock);
+ __wt_writelock(session, &group->lock);
WT_TRET(__thread_group_resize(session, group, new_min, new_max, flags));
- __wt_writeunlock(session, group->lock);
+ __wt_writeunlock(session, &group->lock);
return (ret);
}
@@ -255,17 +257,17 @@ __wt_thread_group_create(
__wt_verbose(session, WT_VERB_THREAD_GROUP,
"Creating thread group: %p", (void *)group);
- WT_RET(__wt_rwlock_alloc(session, &group->lock, "Thread group"));
+ __wt_rwlock_init(session, &group->lock);
WT_ERR(__wt_cond_alloc(
session, "Thread group cond", false, &group->wait_cond));
cond_alloced = true;
- __wt_writelock(session, group->lock);
+ __wt_writelock(session, &group->lock);
group->run_func = run_func;
group->name = name;
WT_TRET(__thread_group_resize(session, group, min, max, flags));
- __wt_writeunlock(session, group->lock);
+ __wt_writeunlock(session, &group->lock);
/* Cleanup on error to avoid leaking resources */
err: if (ret != 0) {
@@ -288,10 +290,10 @@ __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group)
__wt_verbose(session, WT_VERB_THREAD_GROUP,
"Destroying thread group: %p", (void *)group);
- WT_ASSERT(session, __wt_rwlock_islocked(session, group->lock));
+ WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock));
/* Shut down all threads and free associated resources. */
- WT_TRET(__thread_group_shrink(session, group, 0));
+ WT_TRET(__thread_group_shrink(session, group, 0, true));
__wt_free(session, group->threads);
@@ -322,15 +324,42 @@ __wt_thread_group_start_one(
return (0);
if (wait)
- __wt_writelock(session, group->lock);
- else if (__wt_try_writelock(session, group->lock) != 0)
- return (0);
+ __wt_writelock(session, &group->lock);
+ else
+ WT_RET(__wt_try_writelock(session, &group->lock));
/* Recheck the bounds now that we hold the lock */
if (group->current_threads < group->max)
WT_TRET(__thread_group_grow(
session, group, group->current_threads + 1));
- __wt_writeunlock(session, group->lock);
+ __wt_writeunlock(session, &group->lock);
+
+ return (ret);
+}
+
+/*
+ * __wt_thread_group_stop_one --
+ * Stop one thread if possible.
+ */
+int
+__wt_thread_group_stop_one(
+ WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool wait)
+{
+ WT_DECL_RET;
+
+ if (group->current_threads <= group->min)
+ return (0);
+
+ if (wait)
+ __wt_writelock(session, &group->lock);
+ else
+ WT_RET(__wt_try_writelock(session, &group->lock));
+
+ /* Recheck the bounds now that we hold the lock */
+ if (group->current_threads > group->min)
+ WT_TRET(__thread_group_shrink(
+ session, group, group->current_threads - 1, false));
+ __wt_writeunlock(session, &group->lock);
return (ret);
}