diff options
author | Andrew Morrow <acm@mongodb.com> | 2017-09-20 17:02:44 -0400 |
---|---|---|
committer | Andrew Morrow <acm@mongodb.com> | 2017-09-20 22:20:27 -0400 |
commit | afb519394774f497ad345927adb052dd74a43d93 (patch) | |
tree | 1a2843679213f281a7a8d06f602d4c2a7e6f2c62 /src/mongo/transport | |
parent | 937becf51cad52450a373bbbee1e2c76009143c6 (diff) | |
download | mongo-afb519394774f497ad345927adb052dd74a43d93.tar.gz |
SERVER-30737 Use launchServiceWorkerThread to launch threads in ServiceExecutorAdaptive
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/service_entry_point_utils.cpp | 17 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_adaptive.cpp | 18 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_adaptive.h | 1 |
3 files changed, 23 insertions, 13 deletions
diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_entry_point_utils.cpp index 16eadb64817..33df131f94a 100644 --- a/src/mongo/transport/service_entry_point_utils.cpp +++ b/src/mongo/transport/service_entry_point_utils.cpp @@ -59,24 +59,22 @@ void* runFunc(void* ctx) { } // namespace Status launchServiceWorkerThread(stdx::function<void()> task) { - auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task)); try { -#ifndef __linux__ // TODO: consider making this ifdef _WIN32 - stdx::thread(stdx::bind(runFunc, ctx.get())).detach(); - ctx.release(); +#if defined(_WIN32) + stdx::thread(std::move(task)).detach(); #else pthread_attr_t attrs; pthread_attr_init(&attrs); pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); - static const size_t STACK_SIZE = + static const size_t kStackSize = 1024 * 1024; // if we change this we need to update the warning struct rlimit limits; invariant(getrlimit(RLIMIT_STACK, &limits) == 0); - if (limits.rlim_cur > STACK_SIZE) { - size_t stackSizeToSet = STACK_SIZE; + if (limits.rlim_cur > kStackSize) { + size_t stackSizeToSet = kStackSize; #if !__has_feature(address_sanitizer) if (kDebugBuild) stackSizeToSet /= 2; @@ -90,8 +88,8 @@ Status launchServiceWorkerThread(stdx::function<void()> task) { warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB"; } - pthread_t thread; + auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task)); int failed = pthread_create(&thread, &attrs, runFunc, ctx.get()); pthread_attr_destroy(&attrs); @@ -101,8 +99,9 @@ Status launchServiceWorkerThread(stdx::function<void()> task) { throw std::system_error( std::make_error_code(std::errc::resource_unavailable_try_again)); } + ctx.release(); -#endif // __linux__ +#endif } catch (...) { return {ErrorCodes::InternalError, "failed to create service entry worker thread"}; diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp index dbde5067e43..67760c4cf46 100644 --- a/src/mongo/transport/service_executor_adaptive.cpp +++ b/src/mongo/transport/service_executor_adaptive.cpp @@ -35,6 +35,7 @@ #include <random> #include "mongo/db/server_parameters.h" +#include "mongo/transport/service_entry_point_utils.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/log.h" #include "mongo/util/processinfo.h" @@ -357,7 +358,19 @@ void ServiceExecutorAdaptive::_startWorkerThread() { _threadsPending.addAndFetch(1); _threadsRunning.addAndFetch(1); - it->thread = stdx::thread(&ServiceExecutorAdaptive::_workerThreadRoutine, this, num, it); + + lk.unlock(); + + const auto launchResult = + launchServiceWorkerThread([this, num, it] { _workerThreadRoutine(num, it); }); + + if (!launchResult.isOK()) { + warning() << "Failed to launch new worker thread: " << launchResult; + lk.lock(); + _threadsPending.subtractAndFetch(1); + _threadsRunning.subtractAndFetch(1); + _threads.erase(it); + } } Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const { @@ -416,7 +429,7 @@ void ServiceExecutorAdaptive::_workerThreadRoutine( setThreadName(threadName); } - log() << "Starting new database worker thread " << threadId; + log() << "Started new database worker thread " << threadId; // Whether a thread is "pending" reflects whether its had a chance to do any useful work. // When a thread is pending, it will only try to run one task through ASIO, and report back @@ -433,7 +446,6 @@ void ServiceExecutorAdaptive::_workerThreadRoutine( { stdx::lock_guard<stdx::mutex> lk(_threadsMutex); - state->thread.detach(); _threads.erase(state); } _deathCondition.notify_one(); diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h index beca37276e0..2163d97f3e5 100644 --- a/src/mongo/transport/service_executor_adaptive.h +++ b/src/mongo/transport/service_executor_adaptive.h @@ -168,7 +168,6 @@ private: TickSource::Tick executingCurRun; CumulativeTickTimer executing; int recursionDepth = 0; - stdx::thread thread; }; using ThreadList = stdx::list<ThreadState>; |