summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuis Osta <luis.osta@mongodb.com>2021-05-03 15:03:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-03 17:01:39 +0000
commit8146418360d299d9398696bbb1eac8deeb436f3a (patch)
treeeca7531d9e61e05529dbd3cf2fc9b92d9b4648df
parent44ca57b15d7b94da9e2201a9135982ded21be0ce (diff)
downloadmongo-8146418360d299d9398696bbb1eac8deeb436f3a.tar.gz
SERVER-54671 Ensure TopologyVersionObserver is always interrupted at shutdown
-rw-r--r--src/mongo/db/repl/topology_version_observer.cpp11
-rw-r--r--src/mongo/db/repl/topology_version_observer_test.cpp45
2 files changed, 55 insertions, 1 deletions
diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp
index 27fdbafadac..877466bf7f6 100644
--- a/src/mongo/db/repl/topology_version_observer.cpp
+++ b/src/mongo/db/repl/topology_version_observer.cpp
@@ -43,6 +43,8 @@ namespace repl {
MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsInterruption);
MONGO_FAIL_POINT_DEFINE(topologyVersionObserverExpectsShutdown);
+MONGO_FAIL_POINT_DEFINE(topologyVersionObserverBeforeCheckingForShutdown);
+MONGO_FAIL_POINT_DEFINE(topologyVersionObserverShutdownShouldWait);
void TopologyVersionObserver::init(ServiceContext* serviceContext,
ReplicationCoordinator* replCoordinator) noexcept {
@@ -87,6 +89,7 @@ void TopologyVersionObserver::shutdown() noexcept {
_serviceContext->killOperation(clientLk, _workerOpCtx, ErrorCodes::ShutdownInProgress);
}
+ topologyVersionObserverShutdownShouldWait.pauseWhileSet();
_cv.wait(lk, [&] { return _state.load() != State::kRunning; });
invariant(_state.load() == State::kShutdown);
@@ -197,6 +200,7 @@ void TopologyVersionObserver::_workerThreadBody() noexcept try {
{
stdx::lock_guard lk(_mutex);
invariant(_state.load() == State::kRunning);
+ invariant(_workerOpCtx == nullptr);
_state.store(State::kShutdown);
// Invalidate the cache as it is no longer updated
@@ -213,12 +217,17 @@ void TopologyVersionObserver::_workerThreadBody() noexcept try {
topologyVersionObserverExpectsShutdown.pauseWhileSet();
});
- while (!_shouldShutdown.load()) {
+ while (true) {
auto opCtxHandle = tc->makeOperationContext();
+ topologyVersionObserverBeforeCheckingForShutdown.pauseWhileSet();
{
// Set the _workerOpCtx to our newly formed opCtxHandle before we unlock.
stdx::lock_guard lk(_mutex);
+ // Checking `_shouldShutdown` under the lock is necessary to ensure the shutdown
+ // method can interrupt the new operation.
+ if (_shouldShutdown.load())
+ break;
_workerOpCtx = opCtxHandle.get();
}
diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp
index 58c41718428..6d7d54d57b8 100644
--- a/src/mongo/db/repl/topology_version_observer_test.cpp
+++ b/src/mongo/db/repl/topology_version_observer_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/topology_version_observer.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/clock_source.h"
@@ -239,6 +240,50 @@ TEST_F(TopologyVersionObserverTest, HandleQuiesceMode) {
ASSERT(observer->isShutdown());
}
+class TopologyVersionObserverInterruptedTest : public TopologyVersionObserverTest {
+public:
+ void setUp() override {
+ auto configObj = getConfigObj();
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ }
+
+ void tearDown() override {}
+};
+
+TEST_F(TopologyVersionObserverInterruptedTest, ShutdownAlwaysInterruptsWorkerOperation) {
+
+ std::unique_ptr<TopologyVersionObserver> observer;
+ unittest::Barrier b1(2), b2(2);
+ boost::optional<stdx::thread> observerThread;
+ boost::optional<stdx::thread> blockerThread;
+ {
+ FailPointEnableBlock workerFailBlock("topologyVersionObserverBeforeCheckingForShutdown");
+
+ observer = std::make_unique<TopologyVersionObserver>();
+ observer->init(getServiceContext(), getReplCoord());
+
+ workerFailBlock->waitForTimesEntered(workerFailBlock.initialTimesEntered() + 1);
+ blockerThread = stdx::thread([&] {
+ FailPointEnableBlock requestFailBlock("topologyVersionObserverExpectsInterruption");
+ b1.countDownAndWait();
+ // Keeps the failpoint enabled until it receives a signal from themain thread.
+ b2.countDownAndWait();
+ });
+ b1.countDownAndWait(); // Wait for blocker thread to enable thefailpoint
+ {
+ FailPointEnableBlock shutdownFailBlock("topologyVersionObserverShutdownShouldWait");
+ observerThread = stdx::thread([&] { observer->shutdown(); });
+
+ shutdownFailBlock->waitForTimesEntered(shutdownFailBlock.initialTimesEntered() + 1);
+ }
+ }
+ observerThread->join();
+ b2.countDownAndWait(); // Unblock the blocker thread so that it can join
+ blockerThread->join();
+
+ ASSERT(observer->isShutdown());
+}
+
} // namespace
} // namespace repl
} // namespace mongo