diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-05-07 18:17:41 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-06-01 19:34:36 +0000 |
commit | 7f491a6b9f16fca3b919e2d185bdcba09dcb58e2 (patch) | |
tree | e1810a8c73040f58f836c5b9dd025f9775ebb7e9 /src | |
parent | 41cff016562656589bd3a3c114d6dfb31dd3271f (diff) | |
download | mongo-7f491a6b9f16fca3b919e2d185bdcba09dcb58e2.tar.gz |
SERVER-47946 End TopologyVersionObserver loop on replication shutdown
This patch also introduces new failpoints to better synchronize the
TopologyVersionObserverTest.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer_test.cpp | 71 |
4 files changed, 107 insertions, 37 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 31ec95bd5f4..70fdbe63ae5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -929,6 +929,8 @@ void ReplicationCoordinatorImpl::enterTerminalShutdown() { } bool ReplicationCoordinatorImpl::enterQuiesceModeIfSecondary() { + LOGV2_INFO(4794602, "Attempting to enter quiesce mode"); + stdx::lock_guard lk(_mutex); if (!_memberState.secondary()) { diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp index 7f86c1ae6b9..4821dd452f4 100644 --- a/src/mongo/db/repl/topology_version_observer.cpp +++ b/src/mongo/db/repl/topology_version_observer.cpp @@ -36,10 +36,14 @@ #include "mongo/db/operation_context.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" +#include "mongo/util/fail_point.h" namespace mongo { namespace repl { +MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsInterruption); +MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsShutdown); + void TopologyVersionObserver::init(ServiceContext* serviceContext, ReplicationCoordinator* replCoordinator) noexcept { LOGV2_INFO(40440, "Starting the TopologyVersionObserver"); @@ -116,9 +120,11 @@ std::string TopologyVersionObserver::toString() const { } void TopologyVersionObserver::_cacheIsMasterResponse( - OperationContext* opCtx, boost::optional<TopologyVersion> topologyVersion) noexcept try { + OperationContext* opCtx, boost::optional<TopologyVersion> topologyVersion) try { invariant(opCtx); + LOGV2_DEBUG(4794600, 3, "Waiting for a topology change"); + { auto cacheGuard = makeGuard([&] { // If we're not dismissed, reset the _cache. @@ -143,23 +149,22 @@ void TopologyVersionObserver::_cacheIsMasterResponse( return; } + LOGV2_DEBUG(4794601, 3, "Observed a topology change"); + // We could be a PeriodicRunner::Job someday. 
For now, OperationContext::sleepFor() will serve // the same purpose. opCtx->sleepFor(kDelayMS); -} catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) { - LOGV2_DEBUG(40443, - 1, - "Observer was interrupted by {error}", - "Observer was interrupted by an exception", - "error"_attr = e.toString()); -} catch (DBException& e) { - LOGV2_WARNING(40444, - "Observer could not retrieve isMasterResponse: {error}", - "Observer could not retrieve isMasterResponse", - "error"_attr = e.toString()); +} catch (const DBException& e) { + if (ErrorCodes::isShutdownError(e)) { + // Rethrow if we've experienced shutdown. + throw; + } + + LOGV2_WARNING( + 40444, "Observer could not retrieve isMasterResponse", "error"_attr = e.toString()); } -void TopologyVersionObserver::_workerThreadBody() noexcept { +void TopologyVersionObserver::_workerThreadBody() noexcept try { invariant(_serviceContext); ThreadClient tc(kTopologyVersionObserverName, _serviceContext); @@ -203,22 +208,34 @@ void TopologyVersionObserver::_workerThreadBody() noexcept { } LOGV2_INFO(40447, "Stopped TopologyVersionObserver"); + + // Pause here to confirm that we do not depend upon shutdown() being invoked for + // isShutdown() to be true. + topologyVersionObserverExpectsShutdown.pauseWhileSet(); }); - stdx::unique_lock lk(_mutex); while (!_shouldShutdown.load()) { auto opCtxHandle = tc->makeOperationContext(); - // Set the _workerOpCtx to our newly formed opCtxHandle before we unlock. - _workerOpCtx = opCtxHandle.get(); + { + // Set the _workerOpCtx to our newly formed opCtxHandle before we unlock. + stdx::lock_guard lk(_mutex); + _workerOpCtx = opCtxHandle.get(); + } - lk.unlock(); - _cacheIsMasterResponse(opCtxHandle.get(), getTopologyVersion()); - lk.lock(); + ON_BLOCK_EXIT([&] { + // We're done with our opCtxHandle, unset _workerOpCtx. + stdx::lock_guard lk(_mutex); + _workerOpCtx = nullptr; + }); + + // Pause here so that we can force there to be an opCtx to be interrupted. 
+ topologyVersionObserverExpectsInterruption.pauseWhileSet(); - // We're done with our opCtxHandle, unset _workerOpCtx. - _workerOpCtx = nullptr; + _cacheIsMasterResponse(opCtxHandle.get(), getTopologyVersion()); } +} catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) { + LOGV2_DEBUG(40443, 3, "Observer thread stopped due to shutdown", "error"_attr = e.toString()); } } // namespace repl diff --git a/src/mongo/db/repl/topology_version_observer.h b/src/mongo/db/repl/topology_version_observer.h index 66872d2a087..ff5c2e8564f 100644 --- a/src/mongo/db/repl/topology_version_observer.h +++ b/src/mongo/db/repl/topology_version_observer.h @@ -93,6 +93,16 @@ public: std::string toString() const; + /** + * Returns true if this TopologyVersionObserver background thread has stopped. + * + * Note that this function only returns true after _thread has started and ended, thus implies + * that getCached() will never return a valid IsMasterResponse again. + */ + bool isShutdown() const noexcept { + return _state.loadRelaxed() == State::kShutdown; + } + private: enum class State { kUninitialized, @@ -100,7 +110,7 @@ private: kShutdown, }; - void _cacheIsMasterResponse(OperationContext*, boost::optional<TopologyVersion>) noexcept; + void _cacheIsMasterResponse(OperationContext*, boost::optional<TopologyVersion>); void _workerThreadBody() noexcept; diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp index e9e03d52f42..a3ba3717eac 100644 --- a/src/mongo/db/repl/topology_version_observer_test.cpp +++ b/src/mongo/db/repl/topology_version_observer_test.cpp @@ -45,6 +45,7 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/clock_source.h" +#include "mongo/util/fail_point.h" #include "mongo/util/time_support.h" namespace mongo { @@ -71,7 +72,7 @@ protected: } public: - TopologyVersionObserverTest() { + virtual void setUp() { auto configObj = getConfigObj(); 
assertStartSuccess(configObj, HostAndPort("node1", 12345)); ReplSetConfig config = assertMakeRSConfig(configObj); @@ -92,8 +93,10 @@ public: observer->init(serviceContext, replCoord); } - ~TopologyVersionObserverTest() { + virtual void tearDown() { observer->shutdown(); + ASSERT(observer->isShutdown()); + observer.reset(); } auto getObserverCache() { @@ -172,24 +175,29 @@ TEST_F(TopologyVersionObserverTest, HandleDBException) { // The client should not go out-of-scope as it is attached to the observer thread. ASSERT(observerClient); - ClockSource::StopWatch timer; - constexpr auto maxWait = Seconds(10); - - // Kill the operation waiting on the `isMaster` future to make it throw - bool wasAbleToKillOpCtx = false; - while (!wasAbleToKillOpCtx) { - if (timer.elapsed() > maxWait) { - FAIL(str::stream() << "Timed out while waiting for the observer to create OpCtx."); - } - + auto tryKillOperation = [&] { stdx::lock_guard clientLock(*observerClient); + if (auto opCtx = observerClient->getOperationContext()) { observerClient->getServiceContext()->killOperation(clientLock, opCtx); - wasAbleToKillOpCtx = true; - continue; + return true; } - sleepFor(sleepTime); + return false; + }; + + { + // Set the failpoint here so that if there is no opCtx we catch the next one. + FailPointEnableBlock failBlock("topologyVersionObserverExpectsInterruption"); + + // Kill the operation waiting on the `isMaster` future to make it throw + if (!tryKillOperation()) { + // If we weren't able to kill, then block until there is an opCtx again. + failBlock->waitForTimesEntered(failBlock.initialTimesEntered() + 1); + + // Try again to kill now that we've waited for the failpoint. 
+ ASSERT(tryKillOperation()) << "Unable to acquire and kill observer OpCtx"; + } } // Observer thread must handle the exception and fetch the most recent IMR @@ -198,6 +206,39 @@ TEST_F(TopologyVersionObserverTest, HandleDBException) { cachedResponse->getTopologyVersion()->getCounter()); } +TEST_F(TopologyVersionObserverTest, HandleQuiesceMode) { + // Start out as a secondary to transition to quiesce mode easily. + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + auto cachedResponse = getObserverCache(); + ASSERT(cachedResponse); + + // Set a failpoint so we can observe the background thread shutting down. + FailPointEnableBlock failBlock("topologyVersionObserverExpectsShutdown"); + + { + // Enter quiesce mode in the replication coordinator to make shutdown errors come from + // awaitIsMasterResponseFuture()/getIsMasterResponseFuture(). + auto opCtx = makeOperationContext(); + getReplCoord()->enterQuiesceModeIfSecondary(); + + getNet()->enterNetwork(); + getNet()->advanceTime(getNet()->now() + sleepTime); + getNet()->exitNetwork(); + + ASSERT_THROWS_CODE(replCoord->getIsMasterResponseFuture({}, boost::none).get(opCtx.get()), + AssertionException, + ErrorCodes::ShutdownInProgress); + } + + // Wait for the background thread to fully shutdown. + failBlock->waitForTimesEntered(failBlock.initialTimesEntered() + 1); + + // In quiescence, the observer should be shutdown and have nothing in cache. + ASSERT(!observer->getCached()); + ASSERT(observer->isShutdown()); +} + } // namespace } // namespace repl } // namespace mongo |