diff options
author | William Schultz <william.schultz@mongodb.com> | 2020-05-08 19:13:58 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-09 03:29:15 +0000 |
commit | 2546fe1c22b0777ca68e604376900ea11f10ee3a (patch) | |
tree | 77d10794642158a7d75544ca42587ea5d6cac888 | |
parent | 276ac45d7cd76b17f20d2bbf8c7a4ab2d2795574 (diff) | |
download | mongo-2546fe1c22b0777ca68e604376900ea11f10ee3a.tar.gz |
SERVER-47949 Don't fetch or install a newer config via heartbeat while in drain mode
-rw-r--r-- | src/mongo/db/repl/optime.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp | 198 |
5 files changed, 239 insertions, 12 deletions
diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h index 9d045da574e..9cead491d25 100644 --- a/src/mongo/db/repl/optime.h +++ b/src/mongo/db/repl/optime.h @@ -56,7 +56,7 @@ public: static const char kTermFieldName[]; // The term of an OpTime generated by old protocol version. - static const long long kUninitializedTerm = -1; + static constexpr long long kUninitializedTerm = -1; // The initial term after the first time upgrading from protocol version 0. // diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f41c4145c61..9d1fc8502c5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3337,6 +3337,14 @@ Status ReplicationCoordinatorImpl::doReplSetReconfig(OperationContext* opCtx, } auto topCoordTerm = _topCoord->getTerm(); + if (!force) { + // For safety of reconfig, since we must commit a config in our own term before executing a + // reconfig, so we should never have a config in an older term. If the current config was + // installed via a force reconfig, we aren't concerned about this safety guarantee. + invariant(_rsConfig.getConfigTerm() == OpTime::kUninitializedTerm || + _rsConfig.getConfigTerm() == topCoordTerm); + } + auto configWriteConcern = _getConfigReplicationWriteConcern(); // Construct a fake OpTime that can be accepted but isn't used. OpTime fakeOpTime(Timestamp(1, 1), topCoordTerm); @@ -5157,18 +5165,27 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs } } else if (result.isOK() && response->getConfigVersionAndTerm() < args.getConfigVersionAndTerm()) { + logv2::DynamicAttributes attr; + attr.add("configTerm", args.getConfigTerm()); + attr.add("configVersion", args.getConfigVersion()); + attr.add("senderHost", senderHost); + + // If we are currently in drain mode, we won't allow installing newer configs, so we don't + // schedule a heartbeat to fetch one. We do allow force reconfigs to proceed even if we are + // in drain mode. + if (_memberState.primary() && !_readWriteAbility->canAcceptNonLocalWrites(lk) && + args.getConfigTerm() != OpTime::kUninitializedTerm) { + LOGV2(4794901, + "Not scheduling a heartbeat to fetch a newer config since we are in PRIMARY " + "state but cannot accept writes yet.", + attr); + } // Schedule a heartbeat to the sender to fetch the new config. // Only send this if the sender's config is newer. // We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat // will trigger reconfig, which cancels and reschedules all heartbeats. - if (args.hasSender()) { - LOGV2(21401, - "Scheduling heartbeat to fetch a newer config with term {configTerm} and " - "version {configVersion} from member: {senderHost}", - "Scheduling heartbeat to fetch a newer config", - "configTerm"_attr = args.getConfigTerm(), - "configVersion"_attr = args.getConfigVersion(), - "senderHost"_attr = senderHost); + else if (args.hasSender()) { + LOGV2(21401, "Scheduling heartbeat to fetch a newer config", attr); int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost); _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 8da6a96854b..b3dacb2982b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1162,7 +1162,7 @@ private: /** * Schedules a replica set config change. */ - void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig); + void _scheduleHeartbeatReconfig(WithLock lk, const ReplSetConfig& newConfig); /** * Method to write a configuration transmitted via heartbeat message to stable storage. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index aecbb54ccdd..4c7b35ca083 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -356,7 +356,7 @@ stdx::unique_lock<Latch> ReplicationCoordinatorImpl::_handleHeartbeatResponseAct break; case HeartbeatResponseAction::Reconfig: invariant(responseStatus.isOK()); - _scheduleHeartbeatReconfig_inlock(responseStatus.getValue().getConfig()); + _scheduleHeartbeatReconfig(lock, responseStatus.getValue().getConfig()); break; case HeartbeatResponseAction::StepDownSelf: invariant(action.getPrimaryConfigIndex() == _selfIndex); @@ -530,7 +530,8 @@ bool ReplicationCoordinatorImpl::_shouldStepDownOnReconfig(WithLock, !(myIndex.isOK() && newConfig.getMemberAt(myIndex.getValue()).isElectable()); } -void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig) { +void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, + const ReplSetConfig& newConfig) { if (_inShutdown) { return; } @@ -566,6 +567,17 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSet "Aborting reconfiguration request", "_rsConfigState"_attr = int(_rsConfigState)); } + + // Allow force reconfigs to proceed even if we are not a writable primary yet. + if (_memberState.primary() && !_readWriteAbility->canAcceptNonLocalWrites(lk) && + newConfig.getConfigTerm() != OpTime::kUninitializedTerm) { + LOGV2_FOR_HEARTBEATS( + 4794900, + 1, + "Not scheduling a heartbeat reconfig since we are in PRIMARY state but " + "cannot accept writes yet."); + return; + } _setConfigState_inlock(kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm()); diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index 4079a6d196b..2c2919e830b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -2037,6 +2037,204 @@ TEST_F(ReplCoordReconfigTest, ForceReconfigShouldThrowIfArbiterNodesHaveNewlyAdd getReplCoord()->processReplSetReconfig(opCtx.get(), args, &result)); } +TEST_F(ReplCoordTest, StepUpReconfigConcurrentWithHeartbeatReconfig) { + auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kReplication, + logv2::LogSeverity::Debug(2)}; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "term" << 0 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_EQUALS(getReplCoord()->getTerm(), 0); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)); + + // Win election but don't exit drain mode. + auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest(); + const auto opCtx = makeOperationContext(); + simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get()); + + // Receive a heartbeat that should NOT schedule a new heartbeat to fetch a newer config. + ReplSetHeartbeatArgsV1 hbArgs; + auto rsConfig = getReplCoord()->getConfig(); + hbArgs.setConfigVersion(3); // simulate a newer config version. + hbArgs.setConfigTerm(rsConfig.getConfigTerm()); + hbArgs.setSetName(rsConfig.getReplSetName()); + hbArgs.setSenderHost(HostAndPort("node2", 12345)); + hbArgs.setSenderId(2); + hbArgs.setTerm(0); + ASSERT(hbArgs.isInitialized()); + + ReplSetHeartbeatResponse response; + ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); + + // No requests should have been scheduled. + getNet()->enterNetwork(); + ASSERT_FALSE(getNet()->hasReadyRequests()); + getNet()->exitNetwork(); + + // Receive a heartbeat that schedules a new heartbeat to fetch a newer config. We simulate a + // newer config version and an uninitialized term, so that a heartbeat will be scheduled to + // fetch a new config. When we mock the heartbeat response below, we will respond with a + // non-force config, which is to test the case where the sending node installed a non force + // config after we scheduled a heartbeat to it to fetch a force config. For safety, the + // important aspect is that we don't accept/install configs during drain mode, even if we try to + // fetch them. + hbArgs.setConfigVersion(3); + hbArgs.setConfigTerm(OpTime::kUninitializedTerm); + hbArgs.setSetName(rsConfig.getReplSetName()); + hbArgs.setSenderHost(HostAndPort("node2", 12345)); + hbArgs.setSenderId(2); + hbArgs.setTerm(0); + ASSERT(hbArgs.isInitialized()); + + ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); + + // Schedule a response with a newer config. + auto newerConfigVersion = 3; + auto newerConfig = BSON("_id" + << "mySet" + << "version" << newerConfigVersion << "term" << 0 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))); + auto net = getNet(); + net->enterNetwork(); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + + ReplSetHeartbeatArgsV1 args; + ASSERT_OK(args.initialize(request.cmdObj)); + + startCapturingLogMessages(); + OpTime lastApplied(Timestamp(100, 1), 0); + ReplSetHeartbeatResponse hbResp; + ASSERT_OK(rsConfig.initialize(newerConfig)); + hbResp.setConfig(rsConfig); + hbResp.setSetName(rsConfig.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + hbResp.setConfigTerm(rsConfig.getConfigTerm()); + hbResp.setAppliedOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())}); + hbResp.setDurableOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())}); + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON())); + net->runReadyNetworkOperations(); + net->exitNetwork(); + stopCapturingLogMessages(); + + // Make sure the heartbeat reconfig has not been scheduled. + ASSERT_EQUALS(1, countTextFormatLogLinesContaining("Not scheduling a heartbeat reconfig")); + + // Let drain mode complete. + signalDrainComplete(opCtx.get()); + + // We should have moved to a new term in the election, and our config should have the same term. + ASSERT_EQUALS(getReplCoord()->getTerm(), 1); + ASSERT_EQUALS(getReplCoord()->getConfig().getConfigTerm(), 1); +} + +TEST_F(ReplCoordTest, StepUpReconfigConcurrentWithForceHeartbeatReconfig) { + auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kReplication, + logv2::LogSeverity::Debug(2)}; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "term" << 0 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_EQUALS(getReplCoord()->getTerm(), 0); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)); + + // Win election but don't exit drain mode. + auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest(); + const auto opCtx = makeOperationContext(); + simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get()); + + // Receive a heartbeat that schedules a new heartbeat to fetch a newer config. + ReplSetHeartbeatArgsV1 hbArgs; + auto rsConfig = getReplCoord()->getConfig(); + hbArgs.setConfigVersion(3); // simulate a newer config version. + hbArgs.setConfigTerm(OpTime::kUninitializedTerm); // force config. + hbArgs.setSetName(rsConfig.getReplSetName()); + hbArgs.setSenderHost(HostAndPort("node2", 12345)); + hbArgs.setSenderId(2); + hbArgs.setTerm(0); + ASSERT(hbArgs.isInitialized()); + + ReplSetHeartbeatResponse response; + ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); + + // Schedule a response with a newer config. + auto newerConfigVersion = 3; + auto newerConfig = + BSON("_id" + << "mySet" + << "version" << newerConfigVersion << "term" << OpTime::kUninitializedTerm << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))); + auto net = getNet(); + net->enterNetwork(); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + + ReplSetHeartbeatArgsV1 args; + ASSERT_OK(args.initialize(request.cmdObj)); + + OpTime lastApplied(Timestamp(100, 1), 0); + ReplSetHeartbeatResponse hbResp; + ASSERT_OK(rsConfig.initialize(newerConfig)); + hbResp.setConfig(rsConfig); + hbResp.setSetName(rsConfig.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + hbResp.setConfigTerm(rsConfig.getConfigTerm()); + hbResp.setAppliedOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())}); + hbResp.setDurableOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())}); + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON())); + net->exitNetwork(); + + { + // Prevent the heartbeat reconfig from completing. + FailPointEnableBlock fpb("blockHeartbeatReconfigFinish"); + + // Let the heartbeat reconfig begin. + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + // For force reconfigs, we do allow them to proceed even if we are in drain mode, so make + // sure it is in progress, stuck at the failpoint before completion. + fpb->waitForTimesEntered(1); + + // At this point the heartbeat reconfig should be in progress but blocked from completion by + // the failpoint. We now let drain mode complete. The step up reconfig should be interrupted + // by the in progress heartbeat reconfig. + signalDrainComplete(opCtx.get()); + } + + // The failpoint should be released now, allowing the heartbeat reconfig to complete. We run the + // clock forward so the re-scheduled heartbeat reconfig will complete. + net->enterNetwork(); + net->runUntil(net->now() + Milliseconds(100)); + net->exitNetwork(); + + // We should have moved to a new term in the election, but our config should have the term from + // the force config. + ASSERT_EQUALS(getReplCoord()->getTerm(), 1); + ASSERT_EQUALS(getReplCoord()->getConfig().getConfigTerm(), OpTime::kUninitializedTerm); +} + } // anonymous namespace } // namespace repl } // namespace mongo |