diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-12-07 20:52:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-10 18:43:34 +0000 |
commit | 059ac99e325648c29a4203a17782f5891c59f5ad (patch) | |
tree | 2117fc6caa4704fd2667f9de970607ad5f5624e7 /src/mongo/transport | |
parent | 4b8f297ca8da46eab9fd668a2c3f4df70724c2ac (diff) | |
download | mongo-059ac99e325648c29a4203a17782f5891c59f5ad.tar.gz |
SERVER-53281 Make ServiceExecutorFixed join more reliably
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/service_executor_fixed.cpp | 142 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.h | 5 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 96 |
3 files changed, 139 insertions, 104 deletions
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index b999e59072d..36af3056231 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -56,16 +56,23 @@ constexpr auto kClientsInTotal = "clientsInTotal"_sd; constexpr auto kClientsRunning = "clientsRunning"_sd; constexpr auto kClientsWaiting = "clientsWaitingForData"_sd; -const auto getServiceExecutorFixed = - ServiceContext::declareDecoration<std::shared_ptr<ServiceExecutorFixed>>(); +struct Handle { + ~Handle() { + if (ptr) { + ptr->join(); + } + } + + std::shared_ptr<ServiceExecutorFixed> ptr; +}; +const auto getHandle = ServiceContext::declareDecoration<Handle>(); const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{ "ServiceExecutorFixed", [](ServiceContext* ctx) { auto limits = ThreadPool::Limits{}; limits.minThreads = 0; limits.maxThreads = fixedServiceExecutorThreadLimit; - getServiceExecutorFixed(ctx) = - std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits)); + getHandle(ctx).ptr = std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits)); }}; } // namespace @@ -134,28 +141,28 @@ ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limi } ServiceExecutorFixed::~ServiceExecutorFixed() { - switch (_state) { - case State::kNotStarted: - return; - case State::kRunning: { - // We should not be running while in this destructor. - MONGO_UNREACHABLE; - } - case State::kStopping: - case State::kStopped: { - // We can go ahead and attempt to join our thread pool. 
- } break; - default: { MONGO_UNREACHABLE; } - } + join(); +} +void ServiceExecutorFixed::join() noexcept { LOGV2_DEBUG(4910502, kDiagnosticLogLevel, - "Shutting down pool for fixed thread-pool service executor", + "Joining fixed thread-pool service executor", "name"_attr = _options.poolName); - // We only can desturct when we have joined all of our tasks and canceled all of our sessions. - // This thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are - // stiil blocking. If so, we block until they finish here. + { + auto lk = stdx::unique_lock(_mutex); + _beginShutdown(lk); + + _shutdownCondition.wait(lk, [this]() { return _state == State::kStopped; }); + if (std::exchange(_isJoined, true)) { + return; + } + } + + // We only can join when we have joined all of our tasks and canceled all of our sessions. This + // thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are stiil + // blocking. If so, we block until they finish here. _threadPool->shutdown(); _threadPool->join(); @@ -198,7 +205,11 @@ Status ServiceExecutorFixed::start() { } auto tl = _svcCtx->getTransportLayer(); - invariant(tl); + if (!tl) { + // For some tests, we do not have a TransportLayer. 
+ invariant(TestingProctor::instance().isEnabled()); + return Status::OK(); + } auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress); invariant(reactor); @@ -221,9 +232,9 @@ Status ServiceExecutorFixed::start() { } ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) { - auto& ref = getServiceExecutorFixed(ctx); - invariant(ref); - return ref.get(); + auto& handle = getHandle(ctx); + invariant(handle.ptr); + return handle.ptr.get(); } Status ServiceExecutorFixed::shutdown(Milliseconds timeout) { @@ -234,59 +245,58 @@ Status ServiceExecutorFixed::shutdown(Milliseconds timeout) { { auto lk = stdx::unique_lock(_mutex); - - switch (_state) { - case State::kNotStarted: - case State::kRunning: { - _state = State::kStopping; - - for (auto& waiter : _waiters) { - // Cancel any session we own. - waiter.session->cancelAsyncOperations(); - } - - // There may not be outstanding threads, check for shutdown now. - _checkForShutdown(lk); - - if (_state == State::kStopped) { - // We were able to become stopped immediately. - return Status::OK(); - } - } break; - case State::kStopping: { - // Just need to wait it out. - } break; - case State::kStopped: { - // Totally done. - return Status::OK(); - } break; - default: { MONGO_UNREACHABLE; } + _beginShutdown(lk); + + // There is a world where we are able to simply do a timed wait upon a future chain. + // However, that world likely requires an OperationContext available through shutdown. + if (!_shutdownCondition.wait_for( + lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) { + return Status(ErrorCodes::ExceededTimeLimit, + "Failed to shutdown all executor threads within the time limit"); } } LOGV2_DEBUG(4910504, kDiagnosticLogLevel, - "Waiting for shutdown of fixed thread-pool service executor", + "Shutdown fixed thread-pool service executor", "name"_attr = _options.poolName); - // There is a world where we are able to simply do a timed wait upon a future chain. 
However, - // that world likely requires an OperationContext available through shutdown. - auto lk = stdx::unique_lock(_mutex); - if (!_shutdownCondition.wait_for( - lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) { - return Status(ErrorCodes::ExceededTimeLimit, - "Failed to shutdown all executor threads within the time limit"); - } - return Status::OK(); } +void ServiceExecutorFixed::_beginShutdown(WithLock lk) { + switch (_state) { + case State::kNotStarted: { + invariant(_waiters.empty()); + invariant(_tasksLeft() == 0); + _state = State::kStopped; + } break; + case State::kRunning: { + _state = State::kStopping; + + for (auto& waiter : _waiters) { + // Cancel any session we own. + waiter.session->cancelAsyncOperations(); + } + + // There may not be outstanding threads, check for shutdown now. + _checkForShutdown(lk); + } break; + case State::kStopping: { + // Just nead to wait it out. + } break; + case State::kStopped: { + // Totally done. + } break; + default: { MONGO_UNREACHABLE; } + } +} + void ServiceExecutorFixed::_checkForShutdown(WithLock) { if (_state == State::kRunning) { // We're actively running. return; } - invariant(_state != State::kNotStarted); if (!_waiters.empty()) { // We still have some in wait. @@ -321,7 +331,11 @@ void ServiceExecutorFixed::_checkForShutdown(WithLock) { } auto tl = _svcCtx->getTransportLayer(); - invariant(tl); + if (!tl) { + // For some tests, we do not have a TransportLayer. 
+ invariant(TestingProctor::instance().isEnabled()); + return; + } auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress); invariant(reactor); diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index 543edc2e8a7..96f46f6f163 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -70,6 +70,8 @@ public: Status start() override; Status shutdown(Milliseconds timeout) override; + void join() noexcept; + Status scheduleTask(Task task, ScheduleFlags flags) override; void schedule(OutOfLineExecutor::Task task) override { _schedule(std::move(task)); @@ -96,8 +98,8 @@ private: // Maintains the execution state (e.g., recursion depth) for executor threads class ExecutorThreadContext; -private: void _checkForShutdown(WithLock); + void _beginShutdown(WithLock); void _schedule(OutOfLineExecutor::Task task) noexcept; auto _threadsRunning() const { @@ -152,6 +154,7 @@ private: * State transition diagram: kNotStarted ---> kRunning ---> kStopping ---> kStopped */ enum State { kNotStarted, kRunning, kStopping, kStopped } _state = kNotStarted; + bool _isJoined = false; ThreadPool::Options _options; std::shared_ptr<ThreadPool> _threadPool; diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 46deb4d00fd..6193a97d01d 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -169,38 +169,39 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) { scheduleBasicTask(executor.get(), false); } -class ServiceExecutorFixedFixture : public ServiceContextTest { +class ServiceExecutorFixedFixture : public unittest::Test { public: static constexpr auto kNumExecutorThreads = 2; class ServiceExecutorHandle { public: - static constexpr int kNone = 0b00; - static constexpr int kStartExecutor = 0b01; - static constexpr int kSkipShutdown = 0b10; - - 
ServiceExecutorHandle() = delete; ServiceExecutorHandle(const ServiceExecutorHandle&) = delete; ServiceExecutorHandle(ServiceExecutorHandle&&) = delete; - - explicit ServiceExecutorHandle(int flags = kNone) : _skipShutdown(flags & kSkipShutdown) { + ServiceExecutorHandle() { ThreadPool::Limits limits; limits.minThreads = limits.maxThreads = kNumExecutorThreads; _executor = std::make_shared<ServiceExecutorFixed>(std::move(limits)); - - if (flags & kStartExecutor) { - ASSERT_OK(_executor->start()); - } } ~ServiceExecutorHandle() { - if (_skipShutdown) { - LOGV2(4987901, "Skipped shutting down the executor"); - } else { - invariant(_executor->shutdown(kShutdownTime)); + join(); + } + + void start() { + ASSERT_OK(_executor->start()); + } + + void join() { + if (auto exec = std::exchange(_executor, {})) { + ASSERT_OK(exec->shutdown(kShutdownTime)); + exec->join(); } } + void joinWithoutShutdown() { + _executor->join(); + } + std::shared_ptr<ServiceExecutorFixed> operator->() noexcept { return _executor; } @@ -210,27 +211,38 @@ public: } private: - const bool _skipShutdown; std::shared_ptr<ServiceExecutorFixed> _executor; }; }; TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) { - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kSkipShutdown); + auto executorHandle = ServiceExecutorHandle(); ASSERT_NOT_OK(executorHandle->scheduleTask([] {}, ServiceExecutor::kEmptyFlags)); } -DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") { - FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart"); - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor | - ServiceExecutorHandle::kSkipShutdown); - // The following ensures `executorHandle` holds the only reference to the service executor, thus - // returning from this block would trigger destruction of the executor. 
- failpoint->waitForTimesEntered(kNumExecutorThreads); +TEST_F(ServiceExecutorFixedFixture, JoinWorksBeforeShutdown) { + auto executorHandle = ServiceExecutorHandle(); + + { + FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart"); + executorHandle.start(); + + // The following ensures `executorHandle` holds the only reference to the service executor, + // thus returning from this block would trigger destruction of the executor. + failpoint->waitForTimesEntered(kNumExecutorThreads); + } + + // Explicitly skip shutdown. + executorHandle.joinWithoutShutdown(); + + // Poke the executor to make sure it is shutdown. + ASSERT_NOT_OK(executorHandle->scheduleTask([] {}, ServiceExecutor::kEmptyFlags)); } TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) { - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor); + auto executorHandle = ServiceExecutorHandle(); + executorHandle.start(); + auto barrier = std::make_shared<unittest::Barrier>(2); ASSERT_OK(executorHandle->scheduleTask([barrier]() mutable { barrier->countDownAndWait(); }, ServiceExecutor::kEmptyFlags)); @@ -238,7 +250,9 @@ TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) { } TEST_F(ServiceExecutorFixedFixture, RecursiveTask) { - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor); + auto executorHandle = ServiceExecutorHandle(); + executorHandle.start(); + auto barrier = std::make_shared<unittest::Barrier>(2); std::function<void()> recursiveTask; @@ -258,7 +272,9 @@ TEST_F(ServiceExecutorFixedFixture, RecursiveTask) { } TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) { - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor); + auto executorHandle = ServiceExecutorHandle(); + executorHandle.start(); + auto barrier = std::make_shared<unittest::Barrier>(2); AtomicWord<int> tasksToSchedule{fixedServiceExecutorRecursionLimit.load() * 3}; @@ -283,12 +299,14 @@ TEST_F(ServiceExecutorFixedFixture, 
FlattenRecursiveScheduledTasks) { } TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) { - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor); + auto executorHandle = ServiceExecutorHandle(); + executorHandle.start(); + auto invoked = std::make_shared<SharedPromise<void>>(); auto mayReturn = std::make_shared<SharedPromise<void>>(); ASSERT_OK(executorHandle->scheduleTask( - [executor = *executorHandle, invoked, mayReturn]() mutable { + [invoked, mayReturn]() mutable { invoked->emplaceValue(); mayReturn->getFuture().get(); }, @@ -302,14 +320,14 @@ TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) { } TEST_F(ServiceExecutorFixedFixture, ScheduleSucceedsBeforeShutdown) { - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor); + auto executorHandle = ServiceExecutorHandle(); + executorHandle.start(); auto thread = stdx::thread(); auto barrier = std::make_shared<unittest::Barrier>(2); { FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask"); - // The executor accepts the work, but hasn't used the underlying pool yet. 
thread = stdx::thread([&] { ASSERT_OK(executorHandle->scheduleTask([&, barrier] { barrier->countDownAndWait(); }, @@ -331,7 +349,8 @@ TEST_F(ServiceExecutorFixedFixture, ScheduleSucceedsBeforeShutdown) { } TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) { - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor); + auto executorHandle = ServiceExecutorHandle(); + executorHandle.start(); ASSERT_OK(executorHandle->shutdown(kShutdownTime)); ASSERT_NOT_OK( @@ -342,7 +361,8 @@ TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) { auto tl = std::make_unique<TransportLayerMock>(); auto session = tl->createSession(); - ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor); + auto executorHandle = ServiceExecutorHandle(); + executorHandle.start(); const auto mainThreadId = stdx::this_thread::get_id(); AtomicWord<bool> ranOnDataAvailable{false}; @@ -361,14 +381,12 @@ TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) { } TEST_F(ServiceExecutorFixedFixture, StartAndShutdownAreDeterministic) { - - std::unique_ptr<ServiceExecutorHandle> handle; + auto handle = ServiceExecutorHandle(); // Ensure starting the executor results in spawning the specified number of executor threads. { FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart"); - handle = std::make_unique<ServiceExecutorHandle>(ServiceExecutorHandle::kNone); - ASSERT_OK((*handle)->start()); + handle.start(); failpoint->waitForTimesEntered(kNumExecutorThreads); } @@ -379,7 +397,7 @@ TEST_F(ServiceExecutorFixedFixture, StartAndShutdownAreDeterministic) { // Ensure all executor threads return after receiving the shutdown signal. 
{ FailPointEnableBlock failpoint("hangBeforeServiceExecutorFixedLastExecutorThreadReturns"); - shutdownThread = stdx::thread{[handle = std::move(handle)]() mutable { handle.reset(); }}; + shutdownThread = stdx::thread{[&]() mutable { handle.join(); }}; failpoint->waitForTimesEntered(1); } shutdownThread.join(); |