summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorAndrew Morrow <acm@mongodb.com>2017-09-20 17:02:44 -0400
committerAndrew Morrow <acm@mongodb.com>2017-09-20 22:20:27 -0400
commitafb519394774f497ad345927adb052dd74a43d93 (patch)
tree1a2843679213f281a7a8d06f602d4c2a7e6f2c62 /src/mongo/transport
parent937becf51cad52450a373bbbee1e2c76009143c6 (diff)
downloadmongo-afb519394774f497ad345927adb052dd74a43d93.tar.gz
SERVER-30737 Use launchServiceWorkerThread to launch threads in ServiceExecutorAdaptive
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/service_entry_point_utils.cpp17
-rw-r--r--src/mongo/transport/service_executor_adaptive.cpp18
-rw-r--r--src/mongo/transport/service_executor_adaptive.h1
3 files changed, 23 insertions, 13 deletions
diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_entry_point_utils.cpp
index 16eadb64817..33df131f94a 100644
--- a/src/mongo/transport/service_entry_point_utils.cpp
+++ b/src/mongo/transport/service_entry_point_utils.cpp
@@ -59,24 +59,22 @@ void* runFunc(void* ctx) {
} // namespace
Status launchServiceWorkerThread(stdx::function<void()> task) {
- auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task));
try {
-#ifndef __linux__ // TODO: consider making this ifdef _WIN32
- stdx::thread(stdx::bind(runFunc, ctx.get())).detach();
- ctx.release();
+#if defined(_WIN32)
+ stdx::thread(std::move(task)).detach();
#else
pthread_attr_t attrs;
pthread_attr_init(&attrs);
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
- static const size_t STACK_SIZE =
+ static const size_t kStackSize =
1024 * 1024; // if we change this we need to update the warning
struct rlimit limits;
invariant(getrlimit(RLIMIT_STACK, &limits) == 0);
- if (limits.rlim_cur > STACK_SIZE) {
- size_t stackSizeToSet = STACK_SIZE;
+ if (limits.rlim_cur > kStackSize) {
+ size_t stackSizeToSet = kStackSize;
#if !__has_feature(address_sanitizer)
if (kDebugBuild)
stackSizeToSet /= 2;
@@ -90,8 +88,8 @@ Status launchServiceWorkerThread(stdx::function<void()> task) {
warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB";
}
-
pthread_t thread;
+ auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task));
int failed = pthread_create(&thread, &attrs, runFunc, ctx.get());
pthread_attr_destroy(&attrs);
@@ -101,8 +99,9 @@ Status launchServiceWorkerThread(stdx::function<void()> task) {
throw std::system_error(
std::make_error_code(std::errc::resource_unavailable_try_again));
}
+
ctx.release();
-#endif // __linux__
+#endif
} catch (...) {
return {ErrorCodes::InternalError, "failed to create service entry worker thread"};
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index dbde5067e43..67760c4cf46 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -35,6 +35,7 @@
#include <random>
#include "mongo/db/server_parameters.h"
+#include "mongo/transport/service_entry_point_utils.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"
@@ -357,7 +358,19 @@ void ServiceExecutorAdaptive::_startWorkerThread() {
_threadsPending.addAndFetch(1);
_threadsRunning.addAndFetch(1);
- it->thread = stdx::thread(&ServiceExecutorAdaptive::_workerThreadRoutine, this, num, it);
+
+ lk.unlock();
+
+ const auto launchResult =
+ launchServiceWorkerThread([this, num, it] { _workerThreadRoutine(num, it); });
+
+ if (!launchResult.isOK()) {
+ warning() << "Failed to launch new worker thread: " << launchResult;
+ lk.lock();
+ _threadsPending.subtractAndFetch(1);
+ _threadsRunning.subtractAndFetch(1);
+ _threads.erase(it);
+ }
}
Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const {
@@ -416,7 +429,7 @@ void ServiceExecutorAdaptive::_workerThreadRoutine(
setThreadName(threadName);
}
- log() << "Starting new database worker thread " << threadId;
+ log() << "Started new database worker thread " << threadId;
// Whether a thread is "pending" reflects whether its had a chance to do any useful work.
// When a thread is pending, it will only try to run one task through ASIO, and report back
@@ -433,7 +446,6 @@ void ServiceExecutorAdaptive::_workerThreadRoutine(
{
stdx::lock_guard<stdx::mutex> lk(_threadsMutex);
- state->thread.detach();
_threads.erase(state);
}
_deathCondition.notify_one();
diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h
index beca37276e0..2163d97f3e5 100644
--- a/src/mongo/transport/service_executor_adaptive.h
+++ b/src/mongo/transport/service_executor_adaptive.h
@@ -168,7 +168,6 @@ private:
TickSource::Tick executingCurRun;
CumulativeTickTimer executing;
int recursionDepth = 0;
- stdx::thread thread;
};
using ThreadList = stdx::list<ThreadState>;