summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-05-07 18:17:41 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-06-01 19:34:36 +0000
commit7f491a6b9f16fca3b919e2d185bdcba09dcb58e2 (patch)
treee1810a8c73040f58f836c5b9dd025f9775ebb7e9 /src
parent41cff016562656589bd3a3c114d6dfb31dd3271f (diff)
downloadmongo-7f491a6b9f16fca3b919e2d185bdcba09dcb58e2.tar.gz
SERVER-47946 End TopologyVersionObserver loop on replication shutdown
This patch also introduces new failpoints to better synchronize the TopologyVersionObserverTest.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/topology_version_observer.cpp59
-rw-r--r--src/mongo/db/repl/topology_version_observer.h12
-rw-r--r--src/mongo/db/repl/topology_version_observer_test.cpp71
4 files changed, 107 insertions, 37 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 31ec95bd5f4..70fdbe63ae5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -929,6 +929,8 @@ void ReplicationCoordinatorImpl::enterTerminalShutdown() {
}
bool ReplicationCoordinatorImpl::enterQuiesceModeIfSecondary() {
+ LOGV2_INFO(4794602, "Attempting to enter quiesce mode");
+
stdx::lock_guard lk(_mutex);
if (!_memberState.secondary()) {
diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp
index 7f86c1ae6b9..4821dd452f4 100644
--- a/src/mongo/db/repl/topology_version_observer.cpp
+++ b/src/mongo/db/repl/topology_version_observer.cpp
@@ -36,10 +36,14 @@
#include "mongo/db/operation_context.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
namespace repl {
+MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsInterruption);
+MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsShutdown);
+
void TopologyVersionObserver::init(ServiceContext* serviceContext,
ReplicationCoordinator* replCoordinator) noexcept {
LOGV2_INFO(40440, "Starting the TopologyVersionObserver");
@@ -116,9 +120,11 @@ std::string TopologyVersionObserver::toString() const {
}
void TopologyVersionObserver::_cacheIsMasterResponse(
- OperationContext* opCtx, boost::optional<TopologyVersion> topologyVersion) noexcept try {
+ OperationContext* opCtx, boost::optional<TopologyVersion> topologyVersion) try {
invariant(opCtx);
+ LOGV2_DEBUG(4794600, 3, "Waiting for a topology change");
+
{
auto cacheGuard = makeGuard([&] {
// If we're not dismissed, reset the _cache.
@@ -143,23 +149,22 @@ void TopologyVersionObserver::_cacheIsMasterResponse(
return;
}
+ LOGV2_DEBUG(4794601, 3, "Observed a topology change");
+
// We could be a PeriodicRunner::Job someday. For now, OperationContext::sleepFor() will serve
// the same purpose.
opCtx->sleepFor(kDelayMS);
-} catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
- LOGV2_DEBUG(40443,
- 1,
- "Observer was interrupted by {error}",
- "Observer was interrupted by an exception",
- "error"_attr = e.toString());
-} catch (DBException& e) {
- LOGV2_WARNING(40444,
- "Observer could not retrieve isMasterResponse: {error}",
- "Observer could not retrieve isMasterResponse",
- "error"_attr = e.toString());
+} catch (const DBException& e) {
+ if (ErrorCodes::isShutdownError(e)) {
+ // Rethrow if we've experienced shutdown.
+ throw;
+ }
+
+ LOGV2_WARNING(
+ 40444, "Observer could not retrieve isMasterResponse", "error"_attr = e.toString());
}
-void TopologyVersionObserver::_workerThreadBody() noexcept {
+void TopologyVersionObserver::_workerThreadBody() noexcept try {
invariant(_serviceContext);
ThreadClient tc(kTopologyVersionObserverName, _serviceContext);
@@ -203,22 +208,34 @@ void TopologyVersionObserver::_workerThreadBody() noexcept {
}
LOGV2_INFO(40447, "Stopped TopologyVersionObserver");
+
+ // Pause here to confirm that we do not depend upon shutdown() being invoked for
+ // isShutdown() to be true.
+ topologyVersionObserverExpectsShutdown.pauseWhileSet();
});
- stdx::unique_lock lk(_mutex);
while (!_shouldShutdown.load()) {
auto opCtxHandle = tc->makeOperationContext();
- // Set the _workerOpCtx to our newly formed opCtxHandle before we unlock.
- _workerOpCtx = opCtxHandle.get();
+ {
+ // Set the _workerOpCtx to our newly formed opCtxHandle before we unlock.
+ stdx::lock_guard lk(_mutex);
+ _workerOpCtx = opCtxHandle.get();
+ }
- lk.unlock();
- _cacheIsMasterResponse(opCtxHandle.get(), getTopologyVersion());
- lk.lock();
+ ON_BLOCK_EXIT([&] {
+ // We're done with our opCtxHandle, unset _workerOpCtx.
+ stdx::lock_guard lk(_mutex);
+ _workerOpCtx = nullptr;
+ });
+
+ // Pause here so that we can force there to be an opCtx to be interrupted.
+ topologyVersionObserverExpectsInterruption.pauseWhileSet();
- // We're done with our opCtxHandle, unset _workerOpCtx.
- _workerOpCtx = nullptr;
+ _cacheIsMasterResponse(opCtxHandle.get(), getTopologyVersion());
}
+} catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
+ LOGV2_DEBUG(40443, 3, "Observer thread stopped due to shutdown", "error"_attr = e.toString());
}
} // namespace repl
diff --git a/src/mongo/db/repl/topology_version_observer.h b/src/mongo/db/repl/topology_version_observer.h
index 66872d2a087..ff5c2e8564f 100644
--- a/src/mongo/db/repl/topology_version_observer.h
+++ b/src/mongo/db/repl/topology_version_observer.h
@@ -93,6 +93,16 @@ public:
std::string toString() const;
+ /**
+ * Returns true if this TopologyVersionObserver background thread has stopped.
+ *
+ * Note that this funtion only returns true after _thread has started and ended, thus implies
+ * that getCached() will never return a valid IsMasterResponse again.
+ */
+ bool isShutdown() const noexcept {
+ return _state.loadRelaxed() == State::kShutdown;
+ }
+
private:
enum class State {
kUninitialized,
@@ -100,7 +110,7 @@ private:
kShutdown,
};
- void _cacheIsMasterResponse(OperationContext*, boost::optional<TopologyVersion>) noexcept;
+ void _cacheIsMasterResponse(OperationContext*, boost::optional<TopologyVersion>);
void _workerThreadBody() noexcept;
diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp
index e9e03d52f42..a3ba3717eac 100644
--- a/src/mongo/db/repl/topology_version_observer_test.cpp
+++ b/src/mongo/db/repl/topology_version_observer_test.cpp
@@ -45,6 +45,7 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/fail_point.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -71,7 +72,7 @@ protected:
}
public:
- TopologyVersionObserverTest() {
+ virtual void setUp() {
auto configObj = getConfigObj();
assertStartSuccess(configObj, HostAndPort("node1", 12345));
ReplSetConfig config = assertMakeRSConfig(configObj);
@@ -92,8 +93,10 @@ public:
observer->init(serviceContext, replCoord);
}
- ~TopologyVersionObserverTest() {
+ virtual void tearDown() {
observer->shutdown();
+ ASSERT(observer->isShutdown());
+ observer.reset();
}
auto getObserverCache() {
@@ -172,24 +175,29 @@ TEST_F(TopologyVersionObserverTest, HandleDBException) {
// The client should not go out-of-scope as it is attached to the observer thread.
ASSERT(observerClient);
- ClockSource::StopWatch timer;
- constexpr auto maxWait = Seconds(10);
-
- // Kill the operation waiting on the `isMaster` future to make it throw
- bool wasAbleToKillOpCtx = false;
- while (!wasAbleToKillOpCtx) {
- if (timer.elapsed() > maxWait) {
- FAIL(str::stream() << "Timed out while waiting for the observer to create OpCtx.");
- }
-
+ auto tryKillOperation = [&] {
stdx::lock_guard clientLock(*observerClient);
+
if (auto opCtx = observerClient->getOperationContext()) {
observerClient->getServiceContext()->killOperation(clientLock, opCtx);
- wasAbleToKillOpCtx = true;
- continue;
+ return true;
}
- sleepFor(sleepTime);
+ return false;
+ };
+
+ {
+ // Set the failpoint here so that if there is no opCtx we catch the next one.
+ FailPointEnableBlock failBlock("topologyVersionObserverExpectsInterruption");
+
+ // Kill the operation waiting on the `isMaster` future to make it throw
+ if (!tryKillOperation()) {
+ // If we weren't able to kill, then block until there is an opCtx again.
+ failBlock->waitForTimesEntered(failBlock.initialTimesEntered() + 1);
+
+ // Try again to kill now that we've waited for the failpoint.
+ ASSERT(tryKillOperation()) << "Unable to acquire and kill observer OpCtx";
+ }
}
// Observer thread must handle the exception and fetch the most recent IMR
@@ -198,6 +206,39 @@ TEST_F(TopologyVersionObserverTest, HandleDBException) {
cachedResponse->getTopologyVersion()->getCounter());
}
+TEST_F(TopologyVersionObserverTest, HandleQuiesceMode) {
+ // Start out as a secondary to transition to quiesce mode easily.
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+
+ auto cachedResponse = getObserverCache();
+ ASSERT(cachedResponse);
+
+ // Set a failpoint so we can observe the background thread shutting down.
+ FailPointEnableBlock failBlock("topologyVersionObserverExpectsShutdown");
+
+ {
+ // Enter quiesce mode in the replication coordinator to make shutdown errors come from
+ // awaitIsMasterResponseFuture()/getIsMasterResponseFuture().
+ auto opCtx = makeOperationContext();
+ getReplCoord()->enterQuiesceModeIfSecondary();
+
+ getNet()->enterNetwork();
+ getNet()->advanceTime(getNet()->now() + sleepTime);
+ getNet()->exitNetwork();
+
+ ASSERT_THROWS_CODE(replCoord->getIsMasterResponseFuture({}, boost::none).get(opCtx.get()),
+ AssertionException,
+ ErrorCodes::ShutdownInProgress);
+ }
+
+ // Wait for the background thread to fully shutdown.
+ failBlock->waitForTimesEntered(failBlock.initialTimesEntered() + 1);
+
+ // In quiescence, the observer should be shutdown and have nothing in cache.
+ ASSERT(!observer->getCached());
+ ASSERT(observer->isShutdown());
+}
+
} // namespace
} // namespace repl
} // namespace mongo