author     William Schultz <william.schultz@mongodb.com>  2020-05-08 19:13:58 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-05-09 03:29:15 +0000
commit     2546fe1c22b0777ca68e604376900ea11f10ee3a (patch)
tree       77d10794642158a7d75544ca42587ea5d6cac888
parent     276ac45d7cd76b17f20d2bbf8c7a4ab2d2795574 (diff)
download   mongo-2546fe1c22b0777ca68e604376900ea11f10ee3a.tar.gz
SERVER-47949 Don't fetch or install a newer config via heartbeat while in drain mode
-rw-r--r--  src/mongo/db/repl/optime.h                                         2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                33
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp      16
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp 198
5 files changed, 239 insertions, 12 deletions
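
The patch applies one guard in two places, the heartbeat-processing path and the heartbeat-reconfig path: while the node is in drain mode (it has won an election and reports PRIMARY, but cannot yet accept non-local writes), a newer config arriving via heartbeat is neither fetched nor installed unless it comes from a force reconfig. A minimal sketch of the condition, with shouldDeferNewerConfig as a hypothetical helper name (the patch inlines the check at both call sites):

    // Sketch only; the patch inlines this condition rather than factoring it out.
    bool shouldDeferNewerConfig(const MemberState& memberState,
                                bool canAcceptNonLocalWrites,
                                long long newConfigTerm) {
        // Drain mode: PRIMARY, but not yet accepting non-local writes. Defer
        // non-force configs; a force config carries an uninitialized (-1)
        // config term and is allowed through.
        return memberState.primary() && !canAcceptNonLocalWrites &&
               newConfigTerm != OpTime::kUninitializedTerm;
    }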
diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h
index 9d045da574e..9cead491d25 100644
--- a/src/mongo/db/repl/optime.h
+++ b/src/mongo/db/repl/optime.h
@@ -56,7 +56,7 @@ public:
static const char kTermFieldName[];
// The term of an OpTime generated by old protocol version.
- static const long long kUninitializedTerm = -1;
+ static constexpr long long kUninitializedTerm = -1;
// The initial term after the first time upgrading from protocol version 0.
//
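
A hedged aside on this one-liner: under C++17 a constexpr static data member is implicitly inline, so kUninitializedTerm no longer needs an out-of-line definition when it is ODR-used, for example when bound by const reference inside a test assertion macro; that is presumably what the new tests below require. A minimal illustration with a hypothetical OpTimeLike struct:

    struct OpTimeLike {
        // Implicitly inline in C++17; no separate definition needed.
        static constexpr long long kUninitializedTerm = -1;
    };
    // ODR-use by reference links fine with constexpr; a plain `static const`
    // member would need an out-of-line definition for this to link.
    const long long& term = OpTimeLike::kUninitializedTerm;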
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f41c4145c61..9d1fc8502c5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -3337,6 +3337,14 @@ Status ReplicationCoordinatorImpl::doReplSetReconfig(OperationContext* opCtx,
}
auto topCoordTerm = _topCoord->getTerm();
+ if (!force) {
+ // For reconfig safety, we must commit a config in our own term before executing a
+ // reconfig, so we should never have a config in an older term. If the current config was
+ // installed via a force reconfig, we aren't concerned about this safety guarantee.
+ invariant(_rsConfig.getConfigTerm() == OpTime::kUninitializedTerm ||
+ _rsConfig.getConfigTerm() == topCoordTerm);
+ }
+
auto configWriteConcern = _getConfigReplicationWriteConcern();
// Construct a fake OpTime that can be accepted but isn't used.
OpTime fakeOpTime(Timestamp(1, 1), topCoordTerm);
@@ -5157,18 +5165,27 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
}
} else if (result.isOK() &&
response->getConfigVersionAndTerm() < args.getConfigVersionAndTerm()) {
+ logv2::DynamicAttributes attr;
+ attr.add("configTerm", args.getConfigTerm());
+ attr.add("configVersion", args.getConfigVersion());
+ attr.add("senderHost", senderHost);
+
+ // If we are currently in drain mode, we won't allow installing newer configs, so we don't
+ // schedule a heartbeat to fetch one. We do allow force reconfigs to proceed even if we are
+ // in drain mode.
+ if (_memberState.primary() && !_readWriteAbility->canAcceptNonLocalWrites(lk) &&
+ args.getConfigTerm() != OpTime::kUninitializedTerm) {
+ LOGV2(4794901,
+ "Not scheduling a heartbeat to fetch a newer config since we are in PRIMARY "
+ "state but cannot accept writes yet.",
+ attr);
+ }
// Schedule a heartbeat to the sender to fetch the new config.
// Only send this if the sender's config is newer.
// We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
// will trigger reconfig, which cancels and reschedules all heartbeats.
- if (args.hasSender()) {
- LOGV2(21401,
- "Scheduling heartbeat to fetch a newer config with term {configTerm} and "
- "version {configVersion} from member: {senderHost}",
- "Scheduling heartbeat to fetch a newer config",
- "configTerm"_attr = args.getConfigTerm(),
- "configVersion"_attr = args.getConfigVersion(),
- "senderHost"_attr = senderHost);
+ else if (args.hasSender()) {
+ LOGV2(21401, "Scheduling heartbeat to fetch a newer config", attr);
int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
_scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
}
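
The logging change in this hunk builds the attribute set once in a logv2::DynamicAttributes object so the same configTerm/configVersion/senderHost attributes attach to whichever branch logs. The pattern, mirroring the calls shown above:

    logv2::DynamicAttributes attr;
    attr.add("configTerm", configTerm);    // named attributes, bound at runtime
    attr.add("configVersion", configVersion);
    attr.add("senderHost", senderHost);
    LOGV2(21401, "Scheduling heartbeat to fetch a newer config", attr);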
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 8da6a96854b..b3dacb2982b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1162,7 +1162,7 @@ private:
/**
* Schedules a replica set config change.
*/
- void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig);
+ void _scheduleHeartbeatReconfig(WithLock lk, const ReplSetConfig& newConfig);
/**
* Method to write a configuration transmitted via heartbeat message to stable storage.
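
The renamed signature swaps the _inlock suffix convention for an explicit WithLock parameter, which encodes "caller must hold the mutex" in the type system rather than in the name. A minimal sketch of the idiom, with Widget as a hypothetical class:

    #include "mongo/util/concurrency/with_lock.h"

    class Widget {
        Mutex _mutex = MONGO_MAKE_LATCH("Widget::_mutex");
        int _counter = 0;

        // Callable only with evidence that the lock is held; WithLock converts
        // implicitly from a held lock guard.
        void _bump(WithLock) {
            ++_counter;
        }

    public:
        void bump() {
            stdx::lock_guard<Latch> lk(_mutex);
            _bump(lk);  // the guard converts to WithLock
        }
    };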
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index aecbb54ccdd..4c7b35ca083 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -356,7 +356,7 @@ stdx::unique_lock<Latch> ReplicationCoordinatorImpl::_handleHeartbeatResponseAct
break;
case HeartbeatResponseAction::Reconfig:
invariant(responseStatus.isOK());
- _scheduleHeartbeatReconfig_inlock(responseStatus.getValue().getConfig());
+ _scheduleHeartbeatReconfig(lock, responseStatus.getValue().getConfig());
break;
case HeartbeatResponseAction::StepDownSelf:
invariant(action.getPrimaryConfigIndex() == _selfIndex);
@@ -530,7 +530,8 @@ bool ReplicationCoordinatorImpl::_shouldStepDownOnReconfig(WithLock,
!(myIndex.isOK() && newConfig.getMemberAt(myIndex.getValue()).isElectable());
}
-void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig) {
+void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
+ const ReplSetConfig& newConfig) {
if (_inShutdown) {
return;
}
@@ -566,6 +567,17 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSet
"Aborting reconfiguration request",
"_rsConfigState"_attr = int(_rsConfigState));
}
+
+ // While in drain mode (PRIMARY but not yet able to accept writes), do not install a newer
+ // config via heartbeat. Force reconfigs, identified by an uninitialized config term, are
+ // allowed to proceed even if we are not a writable primary yet.
+ if (_memberState.primary() && !_readWriteAbility->canAcceptNonLocalWrites(lk) &&
+ newConfig.getConfigTerm() != OpTime::kUninitializedTerm) {
+ LOGV2_FOR_HEARTBEATS(
+ 4794900,
+ 1,
+ "Not scheduling a heartbeat reconfig since we are in PRIMARY state but "
+ "cannot accept writes yet.");
+ return;
+ }
_setConfigState_inlock(kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm());
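
The invariant above orders configs by their version-and-term pair, and a force config carries an uninitialized term. A rough paraphrase of that ordering, under the assumption (worth verifying against ConfigVersionAndTerm in repl_set_config.h) that terms are ignored whenever either side is uninitialized:

    #include <tuple>

    // Paraphrased ordering sketch, not the real implementation.
    bool isConfigNewer(long long lhsVersion, long long lhsTerm,
                       long long rhsVersion, long long rhsTerm) {
        // Force reconfigs produce an uninitialized (-1) term; if either side is
        // force, fall back to comparing versions alone.
        if (lhsTerm == OpTime::kUninitializedTerm || rhsTerm == OpTime::kUninitializedTerm) {
            return lhsVersion > rhsVersion;
        }
        // Otherwise order by term first, then version.
        return std::tie(lhsTerm, lhsVersion) > std::tie(rhsTerm, rhsVersion);
    }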
diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
index 4079a6d196b..2c2919e830b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
@@ -2037,6 +2037,204 @@ TEST_F(ReplCoordReconfigTest, ForceReconfigShouldThrowIfArbiterNodesHaveNewlyAdd
getReplCoord()->processReplSetReconfig(opCtx.get(), args, &result));
}
+TEST_F(ReplCoordTest, StepUpReconfigConcurrentWithHeartbeatReconfig) {
+ auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kReplication,
+ logv2::LogSeverity::Debug(2)};
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "term" << 0 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345"))),
+ HostAndPort("node1", 12345));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_EQUALS(getReplCoord()->getTerm(), 0);
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
+
+ // Win election but don't exit drain mode.
+ auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
+ const auto opCtx = makeOperationContext();
+ simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get());
+
+ // Receive a heartbeat that should NOT schedule a new heartbeat to fetch a newer config.
+ ReplSetHeartbeatArgsV1 hbArgs;
+ auto rsConfig = getReplCoord()->getConfig();
+ hbArgs.setConfigVersion(3); // simulate a newer config version.
+ hbArgs.setConfigTerm(rsConfig.getConfigTerm());
+ hbArgs.setSetName(rsConfig.getReplSetName());
+ hbArgs.setSenderHost(HostAndPort("node2", 12345));
+ hbArgs.setSenderId(2);
+ hbArgs.setTerm(0);
+ ASSERT(hbArgs.isInitialized());
+
+ ReplSetHeartbeatResponse response;
+ ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
+
+ // No requests should have been scheduled.
+ getNet()->enterNetwork();
+ ASSERT_FALSE(getNet()->hasReadyRequests());
+ getNet()->exitNetwork();
+
+ // Receive a heartbeat that schedules a new heartbeat to fetch a newer config. We simulate a
+ // newer config version and an uninitialized term, so that a heartbeat will be scheduled to
+ // fetch a new config. When we mock the heartbeat response below, we respond with a non-force
+ // config, to cover the case where the sending node installed a non-force config after we
+ // scheduled a heartbeat to it to fetch a force config. The important safety property is that
+ // we don't accept/install configs during drain mode, even if we try to fetch them.
+ hbArgs.setConfigVersion(3);
+ hbArgs.setConfigTerm(OpTime::kUninitializedTerm);
+ hbArgs.setSetName(rsConfig.getReplSetName());
+ hbArgs.setSenderHost(HostAndPort("node2", 12345));
+ hbArgs.setSenderId(2);
+ hbArgs.setTerm(0);
+ ASSERT(hbArgs.isInitialized());
+
+ ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
+
+ // Schedule a response with a newer config.
+ auto newerConfigVersion = 3;
+ auto newerConfig = BSON("_id"
+ << "mySet"
+ << "version" << newerConfigVersion << "term" << 0 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")));
+ auto net = getNet();
+ net->enterNetwork();
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+
+ ReplSetHeartbeatArgsV1 args;
+ ASSERT_OK(args.initialize(request.cmdObj));
+
+ startCapturingLogMessages();
+ OpTime lastApplied(Timestamp(100, 1), 0);
+ ReplSetHeartbeatResponse hbResp;
+ ASSERT_OK(rsConfig.initialize(newerConfig));
+ hbResp.setConfig(rsConfig);
+ hbResp.setSetName(rsConfig.getReplSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(rsConfig.getConfigVersion());
+ hbResp.setConfigTerm(rsConfig.getConfigTerm());
+ hbResp.setAppliedOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
+ hbResp.setDurableOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ stopCapturingLogMessages();
+
+ // Make sure the heartbeat reconfig has not been scheduled.
+ ASSERT_EQUALS(1, countTextFormatLogLinesContaining("Not scheduling a heartbeat reconfig"));
+
+ // Let drain mode complete.
+ signalDrainComplete(opCtx.get());
+
+ // We should have moved to a new term in the election, and our config should have the same term.
+ ASSERT_EQUALS(getReplCoord()->getTerm(), 1);
+ ASSERT_EQUALS(getReplCoord()->getConfig().getConfigTerm(), 1);
+}
+
+TEST_F(ReplCoordTest, StepUpReconfigConcurrentWithForceHeartbeatReconfig) {
+ auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kReplication,
+ logv2::LogSeverity::Debug(2)};
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "term" << 0 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345"))),
+ HostAndPort("node1", 12345));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_EQUALS(getReplCoord()->getTerm(), 0);
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
+
+ // Win election but don't exit drain mode.
+ auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
+ const auto opCtx = makeOperationContext();
+ simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get());
+
+ // Receive a heartbeat that schedules a new heartbeat to fetch a newer config.
+ ReplSetHeartbeatArgsV1 hbArgs;
+ auto rsConfig = getReplCoord()->getConfig();
+ hbArgs.setConfigVersion(3); // simulate a newer config version.
+ hbArgs.setConfigTerm(OpTime::kUninitializedTerm); // force config.
+ hbArgs.setSetName(rsConfig.getReplSetName());
+ hbArgs.setSenderHost(HostAndPort("node2", 12345));
+ hbArgs.setSenderId(2);
+ hbArgs.setTerm(0);
+ ASSERT(hbArgs.isInitialized());
+
+ ReplSetHeartbeatResponse response;
+ ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
+
+ // Schedule a response with a newer config.
+ auto newerConfigVersion = 3;
+ auto newerConfig =
+ BSON("_id"
+ << "mySet"
+ << "version" << newerConfigVersion << "term" << OpTime::kUninitializedTerm << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")));
+ auto net = getNet();
+ net->enterNetwork();
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+
+ ReplSetHeartbeatArgsV1 args;
+ ASSERT_OK(args.initialize(request.cmdObj));
+
+ OpTime lastApplied(Timestamp(100, 1), 0);
+ ReplSetHeartbeatResponse hbResp;
+ ASSERT_OK(rsConfig.initialize(newerConfig));
+ hbResp.setConfig(rsConfig);
+ hbResp.setSetName(rsConfig.getReplSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(rsConfig.getConfigVersion());
+ hbResp.setConfigTerm(rsConfig.getConfigTerm());
+ hbResp.setAppliedOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
+ hbResp.setDurableOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
+ net->exitNetwork();
+
+ {
+ // Prevent the heartbeat reconfig from completing.
+ FailPointEnableBlock fpb("blockHeartbeatReconfigFinish");
+
+ // Let the heartbeat reconfig begin.
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ // Force reconfigs are allowed to proceed even in drain mode, so make sure this one is in
+ // progress, blocked at the failpoint before it can complete.
+ fpb->waitForTimesEntered(1);
+
+ // At this point the heartbeat reconfig should be in progress but blocked from completion by
+ // the failpoint. We now let drain mode complete. The step-up reconfig should be interrupted
+ // by the in-progress heartbeat reconfig.
+ signalDrainComplete(opCtx.get());
+ }
+
+ // The failpoint should be released now, allowing the heartbeat reconfig to complete. We run the
+ // clock forward so the re-scheduled heartbeat reconfig will complete.
+ net->enterNetwork();
+ net->runUntil(net->now() + Milliseconds(100));
+ net->exitNetwork();
+
+ // We should have moved to a new term in the election, but our config should have the term from
+ // the force config.
+ ASSERT_EQUALS(getReplCoord()->getTerm(), 1);
+ ASSERT_EQUALS(getReplCoord()->getConfig().getConfigTerm(), OpTime::kUninitializedTerm);
+}
+
} // anonymous namespace
} // namespace repl
} // namespace mongo
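
A note on the test mechanics: the second test pins the heartbeat reconfig mid-flight with FailPointEnableBlock and only then lets drain mode finish, so the step-up reconfig is guaranteed to race with an in-progress heartbeat reconfig. The failpoint idiom, sketched with a placeholder failpoint name and trigger:

    {
        FailPointEnableBlock fpb("someFailPoint");  // enabled for this scope
        startWorkThatHitsTheFailPoint();            // hypothetical trigger
        fpb->waitForTimesEntered(1);                // block until the code reaches it
        // ... exercise the system while the work is parked ...
    }  // failpoint disabled here; the parked work resumes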