summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author: Ben Caimano <ben.caimano@10gen.com> 2020-12-07 20:52:45 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-12-10 18:43:34 +0000
commit: 059ac99e325648c29a4203a17782f5891c59f5ad (patch)
tree: 2117fc6caa4704fd2667f9de970607ad5f5624e7 /src/mongo
parent: 4b8f297ca8da46eab9fd668a2c3f4df70724c2ac (diff)
download: mongo-059ac99e325648c29a4203a17782f5891c59f5ad.tar.gz
SERVER-53281 Make ServiceExecutorFixed join more reliably
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/transport/service_executor_fixed.cpp 142
-rw-r--r-- src/mongo/transport/service_executor_fixed.h 5
-rw-r--r-- src/mongo/transport/service_executor_test.cpp 96
3 files changed, 139 insertions, 104 deletions
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index b999e59072d..36af3056231 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -56,16 +56,23 @@ constexpr auto kClientsInTotal = "clientsInTotal"_sd;
constexpr auto kClientsRunning = "clientsRunning"_sd;
constexpr auto kClientsWaiting = "clientsWaitingForData"_sd;
-const auto getServiceExecutorFixed =
- ServiceContext::declareDecoration<std::shared_ptr<ServiceExecutorFixed>>();
+struct Handle {
+ ~Handle() {
+ if (ptr) {
+ ptr->join();
+ }
+ }
+
+ std::shared_ptr<ServiceExecutorFixed> ptr;
+};
+const auto getHandle = ServiceContext::declareDecoration<Handle>();
const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
"ServiceExecutorFixed", [](ServiceContext* ctx) {
auto limits = ThreadPool::Limits{};
limits.minThreads = 0;
limits.maxThreads = fixedServiceExecutorThreadLimit;
- getServiceExecutorFixed(ctx) =
- std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits));
+ getHandle(ctx).ptr = std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits));
}};
} // namespace
@@ -134,28 +141,28 @@ ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limi
}
ServiceExecutorFixed::~ServiceExecutorFixed() {
- switch (_state) {
- case State::kNotStarted:
- return;
- case State::kRunning: {
- // We should not be running while in this destructor.
- MONGO_UNREACHABLE;
- }
- case State::kStopping:
- case State::kStopped: {
- // We can go ahead and attempt to join our thread pool.
- } break;
- default: { MONGO_UNREACHABLE; }
- }
+ join();
+}
+void ServiceExecutorFixed::join() noexcept {
LOGV2_DEBUG(4910502,
kDiagnosticLogLevel,
- "Shutting down pool for fixed thread-pool service executor",
+ "Joining fixed thread-pool service executor",
"name"_attr = _options.poolName);
- // We only can desturct when we have joined all of our tasks and canceled all of our sessions.
- // This thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are
- // stiil blocking. If so, we block until they finish here.
+ {
+ auto lk = stdx::unique_lock(_mutex);
+ _beginShutdown(lk);
+
+ _shutdownCondition.wait(lk, [this]() { return _state == State::kStopped; });
+ if (std::exchange(_isJoined, true)) {
+ return;
+ }
+ }
+
+ // We can only join when we have joined all of our tasks and canceled all of our sessions. This
+ // thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are still
+ // blocking. If so, we block until they finish here.
_threadPool->shutdown();
_threadPool->join();
@@ -198,7 +205,11 @@ Status ServiceExecutorFixed::start() {
}
auto tl = _svcCtx->getTransportLayer();
- invariant(tl);
+ if (!tl) {
+ // For some tests, we do not have a TransportLayer.
+ invariant(TestingProctor::instance().isEnabled());
+ return Status::OK();
+ }
auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
invariant(reactor);
@@ -221,9 +232,9 @@ Status ServiceExecutorFixed::start() {
}
ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorFixed(ctx);
- invariant(ref);
- return ref.get();
+ auto& handle = getHandle(ctx);
+ invariant(handle.ptr);
+ return handle.ptr.get();
}
Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
@@ -234,59 +245,58 @@ Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
{
auto lk = stdx::unique_lock(_mutex);
-
- switch (_state) {
- case State::kNotStarted:
- case State::kRunning: {
- _state = State::kStopping;
-
- for (auto& waiter : _waiters) {
- // Cancel any session we own.
- waiter.session->cancelAsyncOperations();
- }
-
- // There may not be outstanding threads, check for shutdown now.
- _checkForShutdown(lk);
-
- if (_state == State::kStopped) {
- // We were able to become stopped immediately.
- return Status::OK();
- }
- } break;
- case State::kStopping: {
- // Just nead to wait it out.
- } break;
- case State::kStopped: {
- // Totally done.
- return Status::OK();
- } break;
- default: { MONGO_UNREACHABLE; }
+ _beginShutdown(lk);
+
+ // There is a world where we are able to simply do a timed wait upon a future chain.
+ // However, that world likely requires an OperationContext available through shutdown.
+ if (!_shutdownCondition.wait_for(
+ lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) {
+ return Status(ErrorCodes::ExceededTimeLimit,
+ "Failed to shutdown all executor threads within the time limit");
}
}
LOGV2_DEBUG(4910504,
kDiagnosticLogLevel,
- "Waiting for shutdown of fixed thread-pool service executor",
+ "Shutdown fixed thread-pool service executor",
"name"_attr = _options.poolName);
- // There is a world where we are able to simply do a timed wait upon a future chain. However,
- // that world likely requires an OperationContext available through shutdown.
- auto lk = stdx::unique_lock(_mutex);
- if (!_shutdownCondition.wait_for(
- lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) {
- return Status(ErrorCodes::ExceededTimeLimit,
- "Failed to shutdown all executor threads within the time limit");
- }
-
return Status::OK();
}
+void ServiceExecutorFixed::_beginShutdown(WithLock lk) {
+ switch (_state) {
+ case State::kNotStarted: {
+ invariant(_waiters.empty());
+ invariant(_tasksLeft() == 0);
+ _state = State::kStopped;
+ } break;
+ case State::kRunning: {
+ _state = State::kStopping;
+
+ for (auto& waiter : _waiters) {
+ // Cancel any session we own.
+ waiter.session->cancelAsyncOperations();
+ }
+
+ // There may not be outstanding threads, check for shutdown now.
+ _checkForShutdown(lk);
+ } break;
+ case State::kStopping: {
+ // Just need to wait it out.
+ } break;
+ case State::kStopped: {
+ // Totally done.
+ } break;
+ default: { MONGO_UNREACHABLE; }
+ }
+}
+
void ServiceExecutorFixed::_checkForShutdown(WithLock) {
if (_state == State::kRunning) {
// We're actively running.
return;
}
- invariant(_state != State::kNotStarted);
if (!_waiters.empty()) {
// We still have some in wait.
@@ -321,7 +331,11 @@ void ServiceExecutorFixed::_checkForShutdown(WithLock) {
}
auto tl = _svcCtx->getTransportLayer();
- invariant(tl);
+ if (!tl) {
+ // For some tests, we do not have a TransportLayer.
+ invariant(TestingProctor::instance().isEnabled());
+ return;
+ }
auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
invariant(reactor);
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 543edc2e8a7..96f46f6f163 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -70,6 +70,8 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
+ void join() noexcept;
+
Status scheduleTask(Task task, ScheduleFlags flags) override;
void schedule(OutOfLineExecutor::Task task) override {
_schedule(std::move(task));
@@ -96,8 +98,8 @@ private:
// Maintains the execution state (e.g., recursion depth) for executor threads
class ExecutorThreadContext;
-private:
void _checkForShutdown(WithLock);
+ void _beginShutdown(WithLock);
void _schedule(OutOfLineExecutor::Task task) noexcept;
auto _threadsRunning() const {
@@ -152,6 +154,7 @@ private:
* State transition diagram: kNotStarted ---> kRunning ---> kStopping ---> kStopped
*/
enum State { kNotStarted, kRunning, kStopping, kStopped } _state = kNotStarted;
+ bool _isJoined = false;
ThreadPool::Options _options;
std::shared_ptr<ThreadPool> _threadPool;
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 46deb4d00fd..6193a97d01d 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -169,38 +169,39 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) {
scheduleBasicTask(executor.get(), false);
}
-class ServiceExecutorFixedFixture : public ServiceContextTest {
+class ServiceExecutorFixedFixture : public unittest::Test {
public:
static constexpr auto kNumExecutorThreads = 2;
class ServiceExecutorHandle {
public:
- static constexpr int kNone = 0b00;
- static constexpr int kStartExecutor = 0b01;
- static constexpr int kSkipShutdown = 0b10;
-
- ServiceExecutorHandle() = delete;
ServiceExecutorHandle(const ServiceExecutorHandle&) = delete;
ServiceExecutorHandle(ServiceExecutorHandle&&) = delete;
-
- explicit ServiceExecutorHandle(int flags = kNone) : _skipShutdown(flags & kSkipShutdown) {
+ ServiceExecutorHandle() {
ThreadPool::Limits limits;
limits.minThreads = limits.maxThreads = kNumExecutorThreads;
_executor = std::make_shared<ServiceExecutorFixed>(std::move(limits));
-
- if (flags & kStartExecutor) {
- ASSERT_OK(_executor->start());
- }
}
~ServiceExecutorHandle() {
- if (_skipShutdown) {
- LOGV2(4987901, "Skipped shutting down the executor");
- } else {
- invariant(_executor->shutdown(kShutdownTime));
+ join();
+ }
+
+ void start() {
+ ASSERT_OK(_executor->start());
+ }
+
+ void join() {
+ if (auto exec = std::exchange(_executor, {})) {
+ ASSERT_OK(exec->shutdown(kShutdownTime));
+ exec->join();
}
}
+ void joinWithoutShutdown() {
+ _executor->join();
+ }
+
std::shared_ptr<ServiceExecutorFixed> operator->() noexcept {
return _executor;
}
@@ -210,27 +211,38 @@ public:
}
private:
- const bool _skipShutdown;
std::shared_ptr<ServiceExecutorFixed> _executor;
};
};
TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) {
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kSkipShutdown);
+ auto executorHandle = ServiceExecutorHandle();
ASSERT_NOT_OK(executorHandle->scheduleTask([] {}, ServiceExecutor::kEmptyFlags));
}
-DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") {
- FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart");
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor |
- ServiceExecutorHandle::kSkipShutdown);
- // The following ensures `executorHandle` holds the only reference to the service executor, thus
- // returning from this block would trigger destruction of the executor.
- failpoint->waitForTimesEntered(kNumExecutorThreads);
+TEST_F(ServiceExecutorFixedFixture, JoinWorksBeforeShutdown) {
+ auto executorHandle = ServiceExecutorHandle();
+
+ {
+ FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart");
+ executorHandle.start();
+
+ // Wait until all kNumExecutorThreads executor threads have entered the failpoint, so the
+ // executor is fully started before it is joined below without an explicit shutdown.
+ failpoint->waitForTimesEntered(kNumExecutorThreads);
+ }
+
+ // Explicitly skip shutdown.
+ executorHandle.joinWithoutShutdown();
+
+ // Poke the executor to make sure it is shut down.
+ ASSERT_NOT_OK(executorHandle->scheduleTask([] {}, ServiceExecutor::kEmptyFlags));
}
TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) {
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+ auto executorHandle = ServiceExecutorHandle();
+ executorHandle.start();
+
auto barrier = std::make_shared<unittest::Barrier>(2);
ASSERT_OK(executorHandle->scheduleTask([barrier]() mutable { barrier->countDownAndWait(); },
ServiceExecutor::kEmptyFlags));
@@ -238,7 +250,9 @@ TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) {
}
TEST_F(ServiceExecutorFixedFixture, RecursiveTask) {
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+ auto executorHandle = ServiceExecutorHandle();
+ executorHandle.start();
+
auto barrier = std::make_shared<unittest::Barrier>(2);
std::function<void()> recursiveTask;
@@ -258,7 +272,9 @@ TEST_F(ServiceExecutorFixedFixture, RecursiveTask) {
}
TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) {
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+ auto executorHandle = ServiceExecutorHandle();
+ executorHandle.start();
+
auto barrier = std::make_shared<unittest::Barrier>(2);
AtomicWord<int> tasksToSchedule{fixedServiceExecutorRecursionLimit.load() * 3};
@@ -283,12 +299,14 @@ TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) {
}
TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) {
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+ auto executorHandle = ServiceExecutorHandle();
+ executorHandle.start();
+
auto invoked = std::make_shared<SharedPromise<void>>();
auto mayReturn = std::make_shared<SharedPromise<void>>();
ASSERT_OK(executorHandle->scheduleTask(
- [executor = *executorHandle, invoked, mayReturn]() mutable {
+ [invoked, mayReturn]() mutable {
invoked->emplaceValue();
mayReturn->getFuture().get();
},
@@ -302,14 +320,14 @@ TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) {
}
TEST_F(ServiceExecutorFixedFixture, ScheduleSucceedsBeforeShutdown) {
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+ auto executorHandle = ServiceExecutorHandle();
+ executorHandle.start();
auto thread = stdx::thread();
auto barrier = std::make_shared<unittest::Barrier>(2);
{
FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask");
-
// The executor accepts the work, but hasn't used the underlying pool yet.
thread = stdx::thread([&] {
ASSERT_OK(executorHandle->scheduleTask([&, barrier] { barrier->countDownAndWait(); },
@@ -331,7 +349,8 @@ TEST_F(ServiceExecutorFixedFixture, ScheduleSucceedsBeforeShutdown) {
}
TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+ auto executorHandle = ServiceExecutorHandle();
+ executorHandle.start();
ASSERT_OK(executorHandle->shutdown(kShutdownTime));
ASSERT_NOT_OK(
@@ -342,7 +361,8 @@ TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) {
auto tl = std::make_unique<TransportLayerMock>();
auto session = tl->createSession();
- ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+ auto executorHandle = ServiceExecutorHandle();
+ executorHandle.start();
const auto mainThreadId = stdx::this_thread::get_id();
AtomicWord<bool> ranOnDataAvailable{false};
@@ -361,14 +381,12 @@ TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) {
}
TEST_F(ServiceExecutorFixedFixture, StartAndShutdownAreDeterministic) {
-
- std::unique_ptr<ServiceExecutorHandle> handle;
+ auto handle = ServiceExecutorHandle();
// Ensure starting the executor results in spawning the specified number of executor threads.
{
FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart");
- handle = std::make_unique<ServiceExecutorHandle>(ServiceExecutorHandle::kNone);
- ASSERT_OK((*handle)->start());
+ handle.start();
failpoint->waitForTimesEntered(kNumExecutorThreads);
}
@@ -379,7 +397,7 @@ TEST_F(ServiceExecutorFixedFixture, StartAndShutdownAreDeterministic) {
// Ensure all executor threads return after receiving the shutdown signal.
{
FailPointEnableBlock failpoint("hangBeforeServiceExecutorFixedLastExecutorThreadReturns");
- shutdownThread = stdx::thread{[handle = std::move(handle)]() mutable { handle.reset(); }};
+ shutdownThread = stdx::thread{[&]() mutable { handle.join(); }};
failpoint->waitForTimesEntered(1);
}
shutdownThread.join();