diff options
Diffstat (limited to 'src/support/thread_group.c')
-rw-r--r-- | src/support/thread_group.c | 83 |
1 files changed, 56 insertions, 27 deletions
diff --git a/src/support/thread_group.c b/src/support/thread_group.c index a866d2d01c5..beb143e63e2 100644 --- a/src/support/thread_group.c +++ b/src/support/thread_group.c @@ -50,8 +50,7 @@ __thread_group_grow( { WT_THREAD *thread; - WT_ASSERT(session, - __wt_rwlock_islocked(session, group->lock)); + WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock)); /* * Any bounds checking is done by the caller so we know that @@ -72,20 +71,19 @@ __thread_group_grow( /* * __thread_group_shrink -- - * Decrease the number of running threads in the group, and free any + * Decrease the number of running threads in the group. Optionally free any * memory associated with slots larger than the new count. */ static int __thread_group_shrink(WT_SESSION_IMPL *session, - WT_THREAD_GROUP *group, uint32_t new_count) + WT_THREAD_GROUP *group, uint32_t new_count, bool free_thread) { WT_DECL_RET; WT_SESSION *wt_session; WT_THREAD *thread; uint32_t current_slot; - WT_ASSERT(session, - __wt_rwlock_islocked(session, group->lock)); + WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock)); for (current_slot = group->alloc; current_slot > new_count; ) { /* @@ -107,14 +105,15 @@ __thread_group_shrink(WT_SESSION_IMPL *session, WT_TRET(__wt_thread_join(session, thread->tid)); thread->tid = 0; } - - if (thread->session != NULL) { - wt_session = (WT_SESSION *)thread->session; - WT_TRET(wt_session->close(wt_session, NULL)); - thread->session = NULL; + if (free_thread) { + if (thread->session != NULL) { + wt_session = (WT_SESSION *)thread->session; + WT_TRET(wt_session->close(wt_session, NULL)); + thread->session = NULL; + } + __wt_free(session, thread); + group->threads[current_slot] = NULL; } - __wt_free(session, thread); - group->threads[current_slot] = NULL; } /* Update the thread group state to match our changes */ @@ -142,16 +141,19 @@ __thread_group_resize( WT_ASSERT(session, group->current_threads <= group->alloc && - __wt_rwlock_islocked(session, group->lock)); + __wt_rwlock_islocked(session, &group->lock)); if (new_min == group->min && new_max == group->max) return (0); + if (new_min > new_max) + return (EINVAL); + /* - * Coll shrink to reduce the number of thread structures and running + * Call shrink to reduce the number of thread structures and running * threads if required by the change in group size. */ - WT_RET(__thread_group_shrink(session, group, new_max)); + WT_RET(__thread_group_shrink(session, group, new_max, true)); /* * Only reallocate the thread array if it is the largest ever, since @@ -227,9 +229,9 @@ __wt_thread_group_resize( " from max: %" PRIu32 " -> %" PRIu32, (void *)group, group->min, new_min, group->max, new_max); - __wt_writelock(session, group->lock); + __wt_writelock(session, &group->lock); WT_TRET(__thread_group_resize(session, group, new_min, new_max, flags)); - __wt_writeunlock(session, group->lock); + __wt_writeunlock(session, &group->lock); return (ret); } @@ -255,17 +257,17 @@ __wt_thread_group_create( __wt_verbose(session, WT_VERB_THREAD_GROUP, "Creating thread group: %p", (void *)group); - WT_RET(__wt_rwlock_alloc(session, &group->lock, "Thread group")); + __wt_rwlock_init(session, &group->lock); WT_ERR(__wt_cond_alloc( session, "Thread group cond", false, &group->wait_cond)); cond_alloced = true; - __wt_writelock(session, group->lock); + __wt_writelock(session, &group->lock); group->run_func = run_func; group->name = name; WT_TRET(__thread_group_resize(session, group, min, max, flags)); - __wt_writeunlock(session, group->lock); + __wt_writeunlock(session, &group->lock); /* Cleanup on error to avoid leaking resources */ err: if (ret != 0) { @@ -288,10 +290,10 @@ __wt_thread_group_destroy(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group) __wt_verbose(session, WT_VERB_THREAD_GROUP, "Destroying thread group: %p", (void *)group); - WT_ASSERT(session, __wt_rwlock_islocked(session, group->lock)); + WT_ASSERT(session, __wt_rwlock_islocked(session, &group->lock)); /* Shut down all threads and free associated resources. */ - WT_TRET(__thread_group_shrink(session, group, 0)); + WT_TRET(__thread_group_shrink(session, group, 0, true)); __wt_free(session, group->threads); @@ -322,15 +324,42 @@ __wt_thread_group_start_one( return (0); if (wait) - __wt_writelock(session, group->lock); - else if (__wt_try_writelock(session, group->lock) != 0) - return (0); + __wt_writelock(session, &group->lock); + else + WT_RET(__wt_try_writelock(session, &group->lock)); /* Recheck the bounds now that we hold the lock */ if (group->current_threads < group->max) WT_TRET(__thread_group_grow( session, group, group->current_threads + 1)); - __wt_writeunlock(session, group->lock); + __wt_writeunlock(session, &group->lock); + + return (ret); +} + +/* + * __wt_thread_group_stop_one -- + * Stop one thread if possible. + */ +int +__wt_thread_group_stop_one( + WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, bool wait) +{ + WT_DECL_RET; + + if (group->current_threads <= group->min) + return (0); + + if (wait) + __wt_writelock(session, &group->lock); + else + WT_RET(__wt_try_writelock(session, &group->lock)); + + /* Recheck the bounds now that we hold the lock */ + if (group->current_threads > group->min) + WT_TRET(__thread_group_shrink( + session, group, group->current_threads - 1, false)); + __wt_writeunlock(session, &group->lock); return (ret); } |