author    A. Jesse Jiryu Davis <jesse@mongodb.com>    2020-03-17 08:38:36 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-03-17 12:49:40 +0000
commit    8c1515929f34d41dbefbb9476e1dd893d523ad01
tree      a970214331dfaeeaa3ae4215a7bc7d88ede706c0
parent    2563d9862806d35e4d9964bbefd5b79fd5a79c91
SERVER-46381 Test concurrent stepdown and reconfig
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                    |   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                      |   1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp          |  18
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp  | 236
4 files changed, 256 insertions(+), 1 deletion(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index ab6ee4dec9c..9bfbc4940b5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -119,7 +119,7 @@ MONGO_FAIL_POINT_DEFINE(waitForIsMasterResponse);
// Will cause an isMaster request to hang as it starts waiting.
MONGO_FAIL_POINT_DEFINE(hangWhileWaitingForIsMasterResponse);
MONGO_FAIL_POINT_DEFINE(skipDurableTimestampUpdates);
-// Will cause a reconfig to hang after completing the config quorum check.
+// Skip sending heartbeats to pre-check that a quorum is available before a reconfig.
MONGO_FAIL_POINT_DEFINE(omitConfigQuorumCheck);
// Will cause signal drain complete to hang after reconfig
MONGO_FAIL_POINT_DEFINE(hangAfterReconfigOnDrainComplete);
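
For context, a failpoint declared with MONGO_FAIL_POINT_DEFINE is typically scoped inside a C++ unit test with FailPointEnableBlock from "mongo/util/fail_point.h", which is exactly how the new test below uses omitConfigQuorumCheck. A minimal sketch, illustrative only and not part of this commit (the helper name withQuorumCheckSkipped is hypothetical):

    #include "mongo/util/fail_point.h"

    // Run a reconfig-issuing callable with the heartbeat quorum pre-check skipped.
    // The failpoint is enabled only for the lifetime of the block and is disabled
    // again when the block goes out of scope.
    template <typename Func>
    void withQuorumCheckSkipped(Func&& issueReconfig) {
        mongo::FailPointEnableBlock omitConfigQuorumCheck("omitConfigQuorumCheck");
        issueReconfig();
    }
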
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 23aa2940b46..c836835136f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -441,6 +441,7 @@ public:
void cleanupStableOpTimeCandidates_forTest(std::set<OpTimeAndWallTime>* candidates,
OpTimeAndWallTime stableOpTime);
std::set<OpTimeAndWallTime> getStableOpTimeCandidates_forTest();
+ void handleHeartbeatResponse_forTest(BSONObj response, int targetIndex);
/**
* Non-blocking version of updateTerm.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 10c0bc1b0de..76e2367cf3a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -138,6 +138,24 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd
}));
}
+void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj response,
+ int targetIndex) {
+ CallbackHandle handle;
+ RemoteCommandRequest request;
+ request.target = _rsConfig.getMemberAt(targetIndex).getHostAndPort();
+ executor::TaskExecutor::ResponseStatus status(response, Milliseconds(100));
+ executor::TaskExecutor::RemoteCommandCallbackArgs cbData(
+ _replExecutor.get(), handle, request, status);
+
+ {
+ // Pretend we sent a request so that _untrackHeartbeatHandle_inlock succeeds.
+ stdx::unique_lock<Latch> lk(_mutex);
+ _trackHeartbeatHandle_inlock(handle);
+ }
+
+ _handleHeartbeatResponse(cbData, targetIndex);
+}
+
void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
stdx::unique_lock<Latch> lk(_mutex);
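
To illustrate how the new hook is meant to be driven, here is a minimal sketch of a test fabricating a heartbeat response and injecting it without any network request; it mirrors the sendHBResponse helper added in the test file below and assumes a test body where the coordinator has already been initialized as in that file's setUp(). The constants are arbitrary:

    // Build a heartbeat response claiming a higher term than the primary's.
    ReplSetHeartbeatResponse hbResp;
    hbResp.setSetName("mySet");
    hbResp.setState(MemberState::RS_SECONDARY);
    hbResp.setTerm(2);  // higher than the primary's current term
    hbResp.setConfigVersion(2);
    hbResp.setConfigTerm(1);
    hbResp.setAppliedOpTimeAndWallTime({OpTime(), Date_t()});
    hbResp.setDurableOpTimeAndWallTime({OpTime(), Date_t()});

    // Serialize it and hand it to the coordinator as if member index 1 had replied.
    BSONObjBuilder responseBuilder;
    responseBuilder.appendElements(hbResp.toBSON());
    getReplCoord()->handleHeartbeatResponse_forTest(responseBuilder.obj(), 1 /* targetIndex */);
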
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 30e703d75a6..c8bb9b6c08c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -39,12 +39,14 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
+#include "mongo/db/repl/task_runner_test_fixture.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/unittest/log_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
namespace repl {
@@ -991,6 +993,240 @@ TEST_F(ReplCoordHBV1Test, LastCommittedOpTimeOnlyUpdatesFromHeartbeatIfNotInStar
}
}
+/**
+ * Test a concurrent stepdown and reconfig. The stepdown is triggered by a heartbeat response with
+ * a higher term; the reconfig is triggered either by a heartbeat with a new config or by a user
+ * replSetReconfig command.
+ *
+ * In setUp, the replication coordinator is initialized so "self" is the primary of a 3-node set.
+ * The coordinator schedules heartbeats to the other nodes but this test doesn't respond to those
+ * heartbeats. Instead, it creates heartbeat responses that have no associated requests, and injects
+ * the responses via handleHeartbeatResponse_forTest.
+ *
+ * Each subclass of HBStepdownAndReconfigTest triggers some sequence of stepdown and reconfig steps.
+ * The exact sequences are nondeterministic, since we don't use failpoints or NetworkInterfaceMock
+ * to force a specific order.
+ *
+ * Tests assert that stepdown via heartbeat completed, and the tests that send the new config via
+ * heartbeat assert that the new config was stored. Tests that send the new config with the
+ * replSetReconfig command don't check that it was stored; if the stepdown finished first then the
+ * replSetReconfig was rejected with a NotMaster error.
+ */
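
The "either outcome is acceptable" contract described in the comment above shows up concretely in the reconfig-command path: the command either succeeds or fails with a NotMaster-class error, and only the stepdown itself is asserted unconditionally. A minimal sketch of that acceptance logic, illustrative only and not part of this commit (it mirrors startReconfigCommand() further down; coord, opCtx, args, and result are assumed to be set up as there):

    auto status = Status::OK();
    try {
        status = coord->processReplSetReconfig(opCtx.get(), args, &result);
    } catch (const DBException&) {
        status = exceptionToStatus();
    }
    if (!status.isOK()) {
        // The concurrent stepdown won the race; any failure other than a
        // NotMaster-class error would indicate a bug.
        ASSERT(ErrorCodes::isNotMasterError(status.code()));
    }
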
+class HBStepdownAndReconfigTest : public ReplCoordHBV1Test {
+protected:
+ void setUp() override;
+ void tearDown() override;
+ void sendHBResponse(int targetIndex,
+ long long term,
+ long long configVersion,
+ long long configTerm,
+ bool includeConfig);
+ void sendHBResponseWithNewConfig();
+ void sendHBResponseWithNewTerm();
+ Future<void> startReconfigCommand();
+ void assertSteppedDown();
+ void assertConfigStored();
+
+ const BSONObj _initialConfig = BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node0:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node1:12345"
+ << "_id" << 1)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 2))
+ << "protocolVersion" << 1);
+
+ OpTime _commitPoint = OpTime(Timestamp(100, 1), 0);
+ Date_t _wallTime = Date_t() + Seconds(100);
+ std::unique_ptr<ThreadPool> _threadPool;
+};
+
+void HBStepdownAndReconfigTest::setUp() {
+ ReplCoordHBV1Test::setUp();
+
+ // We need one thread to run processReplSetReconfig; use a pool for convenience.
+ _threadPool = std::make_unique<ThreadPool>(ThreadPool::Options());
+ _threadPool->startup();
+
+ assertStartSuccess(_initialConfig, HostAndPort("node0", 12345));
+
+
+ auto replCoord = getReplCoord();
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ replCoordSetMyLastAppliedOpTime(_commitPoint, _wallTime);
+ replCoordSetMyLastDurableOpTime(_commitPoint, _wallTime);
+ simulateSuccessfulV1Election();
+
+ // New term.
+ ASSERT_EQUALS(1, replCoord->getTerm());
+ _wallTime = _wallTime + Seconds(1);
+ _commitPoint = OpTime(Timestamp(200, 2), 1);
+
+ // To complete a reconfig from Config 1 to Config 2 requires:
+ // Oplog Commitment: last write in previous Config 0 is majority-committed.
+ // Config Replication: Config 2 gossipped by heartbeat response to majority of Config 2 members.
+ //
+ // Catch up all members to the same OpTime to ensure Oplog Commitment in all tests.
+ // In tests that require it, we ensure Config Replication with acknowledgeReconfigCommand().
+ for (auto i = 0; i < 3; ++i) {
+ ASSERT_OK(replCoord->setLastAppliedOptime_forTest(2, i, _commitPoint, _wallTime));
+ ASSERT_OK(replCoord->setLastDurableOptime_forTest(2, i, _commitPoint, _wallTime));
+ }
+
+ setMinimumLoggedSeverity(logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(2));
+}
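
The catch-up loop above is what satisfies the Oplog Commitment precondition for every test: with three members a majority is two, so advancing every member's applied and durable optimes to _commitPoint makes the earlier writes majority-committed under the initial config. The same idiom could be factored into a helper; a sketch, not part of this commit (the helper name catchUpAllMembers is hypothetical, and it assumes the same includes and repl namespace as the test file):

    // Mark every member as having applied and made durable the given optime,
    // against the initial config (version 2), so the reconfig's oplog
    // commitment check is satisfied.
    void catchUpAllMembers(ReplicationCoordinatorImpl* replCoord,
                           int numMembers,
                           const OpTime& opTime,
                           Date_t wallTime) {
        for (int memberIndex = 0; memberIndex < numMembers; ++memberIndex) {
            ASSERT_OK(replCoord->setLastAppliedOptime_forTest(2, memberIndex, opTime, wallTime));
            ASSERT_OK(replCoord->setLastDurableOptime_forTest(2, memberIndex, opTime, wallTime));
        }
    }
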
+
+void HBStepdownAndReconfigTest::tearDown() {
+ clearMinimumLoggedSeverity(logv2::LogComponent::kReplication);
+ _threadPool.reset();
+ ReplCoordHBV1Test::tearDown();
+}
+
+void HBStepdownAndReconfigTest::sendHBResponse(int targetIndex,
+ long long term,
+ long long configVersion,
+ long long configTerm,
+ bool includeConfig) {
+ auto replCoord = getReplCoord();
+ OpTime opTime(Timestamp(), 0);
+
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName("mySet");
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setTerm(term);
+ hbResp.setConfigVersion(configVersion);
+ hbResp.setConfigTerm(configTerm);
+ hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t()});
+ hbResp.setDurableOpTimeAndWallTime({opTime, Date_t()});
+
+ if (includeConfig) {
+ auto configDoc = MutableDocument(Document(_initialConfig));
+ configDoc["version"] = Value(configVersion);
+ ReplSetConfig newConfig;
+ ASSERT_OK(newConfig.initialize(configDoc.freeze().toBson()));
+ hbResp.setConfig(newConfig);
+ }
+
+ BSONObjBuilder responseBuilder;
+ responseBuilder.appendElements(hbResp.toBSON());
+ replCoord->handleHeartbeatResponse_forTest(responseBuilder.obj(), targetIndex);
+}
+
+void HBStepdownAndReconfigTest::sendHBResponseWithNewConfig() {
+ // Send a heartbeat response from a secondary, with newer config.
+ sendHBResponse(2 /* targetIndex */,
+ 1 /* term */,
+ 3 /* configVersion */,
+ 1 /* configTerm */,
+ true /* includeConfig */);
+}
+
+void HBStepdownAndReconfigTest::sendHBResponseWithNewTerm() {
+ // Send a heartbeat response from a secondary, with higher term.
+ sendHBResponse(1 /* targetIndex */,
+ 2 /* term */,
+ 2 /* configVersion */,
+ 1 /* configTerm */,
+ false /* includeConfig */);
+}
+
+Future<void> HBStepdownAndReconfigTest::startReconfigCommand() {
+ auto [promise, future] = makePromiseFuture<void>();
+
+ // Send a user replSetReconfig command.
+ auto coord = getReplCoord();
+ auto newConfig = MutableDocument(Document(_initialConfig));
+ newConfig["version"] = Value(3);
+ ReplicationCoordinator::ReplSetReconfigArgs args{
+ newConfig.freeze().toBson(), false /* force */
+ };
+
+ auto opCtx = ReplCoordHBV1Test::makeOperationContext();
+
+ _threadPool->schedule(
+ [promise = std::move(promise), coord, args, opCtx = std::move(opCtx)](Status) mutable {
+ // Avoid the need to respond to quorum-check heartbeats sent to the other two members.
+ // These heartbeats are sent *before* reconfiguring; they're distinct from the oplog
+ // commitment and config replication checks.
+ FailPointEnableBlock omitConfigQuorumCheck("omitConfigQuorumCheck");
+ BSONObjBuilder result;
+ auto status = Status::OK();
+ try {
+ // OK for processReplSetReconfig to return a NotMaster-like error, throw one, or succeed.
+ status = coord->processReplSetReconfig(opCtx.get(), args, &result);
+ } catch (const DBException&) {
+ status = exceptionToStatus();
+ }
+
+ if (!status.isOK()) {
+ ASSERT(ErrorCodes::isNotMasterError(status.code()));
+ LOGV2(463817,
+ "processReplSetReconfig threw expected error",
+ "errorCode"_attr = status.code(),
+ "message"_attr = status.reason());
+ }
+ promise.emplaceValue();
+ });
+
+ return std::move(future);
+}
+
+void HBStepdownAndReconfigTest::assertSteppedDown() {
+ LOGV2(463811, "Waiting for background jobs");
+ // The clock is mocked, so we don't actually sleep.
+ sleepFor(Seconds(1));
+
+ // Primary stepped down.
+ ASSERT_EQUALS(2, getReplCoord()->getTerm());
+ assertMemberState(MemberState::RS_SECONDARY);
+}
+
+void HBStepdownAndReconfigTest::assertConfigStored() {
+ LOGV2(463812, "Waiting for background jobs");
+ // The clock is mocked, so we don't actually sleep.
+ sleepFor(Seconds(1));
+
+ ASSERT_EQUALS(ConfigVersionAndTerm(3, 1),
+ getReplCoord()->getConfig().getConfigVersionAndTerm());
+}
+
+TEST_F(HBStepdownAndReconfigTest, HBStepdownThenHBReconfig) {
+ // A node has started to step down then learns about a new config via heartbeat.
+ sendHBResponseWithNewTerm();
+ sendHBResponseWithNewConfig();
+ assertSteppedDown();
+ assertConfigStored();
+}
+
+TEST_F(HBStepdownAndReconfigTest, HBReconfigThenHBStepdown) {
+ // A node has started to reconfig then learns about a new term via heartbeat.
+ sendHBResponseWithNewConfig();
+ sendHBResponseWithNewTerm();
+ assertSteppedDown();
+ assertConfigStored();
+}
+
+TEST_F(HBStepdownAndReconfigTest, HBStepdownThenReconfigCommand) {
+ // A node has started to step down then someone calls replSetReconfig.
+ sendHBResponseWithNewTerm();
+ auto future = startReconfigCommand();
+ future.get();
+ assertSteppedDown();
+}
+
+TEST_F(HBStepdownAndReconfigTest, ReconfigCommandThenHBStepdown) {
+ // Someone calls replSetReconfig then the node learns about a new term via heartbeat.
+ auto future = startReconfigCommand();
+ sendHBResponseWithNewTerm();
+ future.get();
+ assertSteppedDown();
+}
+
} // namespace
} // namespace repl
} // namespace mongo