diff options
-rw-r--r-- | src/mongo/db/repl/topology_version_observer.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer_test.cpp | 45 |
2 files changed, 55 insertions, 1 deletions
diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp index 27fdbafadac..877466bf7f6 100644 --- a/src/mongo/db/repl/topology_version_observer.cpp +++ b/src/mongo/db/repl/topology_version_observer.cpp @@ -43,6 +43,8 @@ namespace repl { MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsInterruption); MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsShutdown); +MONGO_FAIL_POINT_DEFINE(topologyVersionObserverBeforeCheckingForShutdown); +MONGO_FAIL_POINT_DEFINE(topologyVersionObserverShutdownShouldWait); void TopologyVersionObserver::init(ServiceContext* serviceContext, ReplicationCoordinator* replCoordinator) noexcept { @@ -87,6 +89,7 @@ void TopologyVersionObserver::shutdown() noexcept { _serviceContext->killOperation(clientLk, _workerOpCtx, ErrorCodes::ShutdownInProgress); } + topologyVersionObserverShutdownShouldWait.pauseWhileSet(); _cv.wait(lk, [&] { return _state.load() != State::kRunning; }); invariant(_state.load() == State::kShutdown); @@ -197,6 +200,7 @@ void TopologyVersionObserver::_workerThreadBody() noexcept try { { stdx::lock_guard lk(_mutex); invariant(_state.load() == State::kRunning); + invariant(_workerOpCtx == nullptr); _state.store(State::kShutdown); // Invalidate the cache as it is no longer updated @@ -213,12 +217,17 @@ void TopologyVersionObserver::_workerThreadBody() noexcept try { topologyVersionObserverExpectsShutdown.pauseWhileSet(); }); - while (!_shouldShutdown.load()) { + while (true) { auto opCtxHandle = tc->makeOperationContext(); + topologyVersionObserverBeforeCheckingForShutdown.pauseWhileSet(); { // Set the _workerOpCtx to our newly formed opCtxHandle before we unlock. stdx::lock_guard lk(_mutex); + // Checking `_shouldShutdown` under the lock is necessary to ensure the shutdown + // method can interrupt the new operation. + if (_shouldShutdown.load()) + break; _workerOpCtx = opCtxHandle.get(); } diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp index 58c41718428..6d7d54d57b8 100644 --- a/src/mongo/db/repl/topology_version_observer_test.cpp +++ b/src/mongo/db/repl/topology_version_observer_test.cpp @@ -42,6 +42,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/topology_version_observer.h" +#include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/clock_source.h" @@ -239,6 +240,50 @@ TEST_F(TopologyVersionObserverTest, HandleQuiesceMode) { ASSERT(observer->isShutdown()); } +class TopologyVersionObserverInterruptedTest : public TopologyVersionObserverTest { +public: + void setUp() override { + auto configObj = getConfigObj(); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + } + + void tearDown() override {} +}; + +TEST_F(TopologyVersionObserverInterruptedTest, ShutdownAlwaysInterruptsWorkerOperation) { + + std::unique_ptr<TopologyVersionObserver> observer; + unittest::Barrier b1(2), b2(2); + boost::optional<stdx::thread> observerThread; + boost::optional<stdx::thread> blockerThread; + { + FailPointEnableBlock workerFailBlock("topologyVersionObserverBeforeCheckingForShutdown"); + + observer = std::make_unique<TopologyVersionObserver>(); + observer->init(getServiceContext(), getReplCoord()); + + workerFailBlock->waitForTimesEntered(workerFailBlock.initialTimesEntered() + 1); + blockerThread = stdx::thread([&] { + FailPointEnableBlock requestFailBlock("topologyVersionObserverExpectsInterruption"); + b1.countDownAndWait(); + // Keeps the failpoint enabled until it receives a signal from themain thread. + b2.countDownAndWait(); + }); + b1.countDownAndWait(); // Wait for blocker thread to enable thefailpoint + { + FailPointEnableBlock shutdownFailBlock("topologyVersionObserverShutdownShouldWait"); + observerThread = stdx::thread([&] { observer->shutdown(); }); + + shutdownFailBlock->waitForTimesEntered(shutdownFailBlock.initialTimesEntered() + 1); + } + } + observerThread->join(); + b2.countDownAndWait(); // Unblock the blocker thread so that it can join + blockerThread->join(); + + ASSERT(observer->isShutdown()); +} + } // namespace } // namespace repl } // namespace mongo |