diff options
-rw-r--r-- | src/mongo/transport/service_executor_fixed.cpp | 16 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.h | 16 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 29 |
3 files changed, 51 insertions, 10 deletions
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index 1cd8c213afe..b9b5424872e 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -42,6 +42,8 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(hangBeforeSchedulingServiceExecutorFixedTask); +MONGO_FAIL_POINT_DEFINE(hangAfterServiceExecutorFixedExecutorThreadsStart); +MONGO_FAIL_POINT_DEFINE(hangBeforeServiceExecutorFixedLastExecutorThreadReturns); namespace transport { namespace { @@ -187,5 +189,19 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const { return _executorContext->getRecursionDepth(); } +ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext( + std::weak_ptr<ServiceExecutorFixed> serviceExecutor) + : _executor(std::move(serviceExecutor)) { + _adjustRunningExecutorThreads(1); + hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet(); +} + +ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() { + if (auto threadsRunning = _adjustRunningExecutorThreads(-1); + threadsRunning.has_value() && threadsRunning.value() == 0) { + hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet(); + } +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index 4e6f3b4f9cc..a68994d9fb3 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -29,6 +29,7 @@ #pragma once +#include <boost/optional.hpp> #include <memory> #include "mongo/base/status.h" @@ -80,18 +81,12 @@ private: // Maintains the execution state (e.g., recursion depth) for executor threads class ExecutorThreadContext { public: - ExecutorThreadContext(std::weak_ptr<ServiceExecutorFixed> serviceExecutor) - : _executor(std::move(serviceExecutor)) { - _adjustRunningExecutorThreads(1); - } + ExecutorThreadContext(std::weak_ptr<ServiceExecutorFixed> serviceExecutor); + ~ExecutorThreadContext(); ExecutorThreadContext(ExecutorThreadContext&&) = delete; ExecutorThreadContext(const ExecutorThreadContext&) = delete; - ~ExecutorThreadContext() { - _adjustRunningExecutorThreads(-1); - } - void run(ServiceExecutor::Task task) { // Yield here to improve concurrency, especially when there are more executor threads // than CPU cores. @@ -106,10 +101,11 @@ private: } private: - void _adjustRunningExecutorThreads(int adjustment) { + boost::optional<int> _adjustRunningExecutorThreads(int adjustment) { if (auto executor = _executor.lock()) { - executor->_numRunningExecutorThreads.fetchAndAdd(adjustment); + return executor->_numRunningExecutorThreads.addAndFetch(adjustment); } + return boost::none; } int _recursionDepth = 0; diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 747c18532da..4af83003930 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -221,8 +221,12 @@ TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) { } DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") { + FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart"); ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor | ServiceExecutorHandle::kSkipShutdown); + // The following ensures `executorHandle` holds the only reference to the service executor, thus + // returning from this block would trigger destruction of the executor. + failpoint->waitForTimesEntered(kNumExecutorThreads); } TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) { @@ -369,5 +373,30 @@ TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) { ASSERT(ranOnDataAvailable.load()); } +TEST_F(ServiceExecutorFixedFixture, StartAndShutdownAreDeterministic) { + + std::unique_ptr<ServiceExecutorHandle> handle; + + // Ensure starting the executor results in spawning the specified number of executor threads. + { + FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart"); + handle = std::make_unique<ServiceExecutorHandle>(ServiceExecutorHandle::kNone); + ASSERT_OK((*handle)->start()); + failpoint->waitForTimesEntered(kNumExecutorThreads); + } + + // Since destroying ServiceExecutorFixed is blocking, spawn a thread to issue the destruction + // off of the main execution path. + stdx::thread shutdownThread; + + // Ensure all executor threads return after receiving the shutdown signal. + { + FailPointEnableBlock failpoint("hangBeforeServiceExecutorFixedLastExecutorThreadReturns"); + shutdownThread = stdx::thread{[handle = std::move(handle)]() mutable { handle.reset(); }}; + failpoint->waitForTimesEntered(1); + } + shutdownThread.join(); +} + } // namespace } // namespace mongo |