diff options
author | Ryan Timmons <ryan.timmons@mongodb.com> | 2020-02-12 17:54:40 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-12 23:06:59 +0000 |
commit | 994fdd99bb6adb2cf9c7dd4061c2035188c2c8da (patch) | |
tree | f145122195ba7977144a6b7e8ae35d14d5f43c5b /src | |
parent | 4085fa11f08ba092294304346b3d07f6e998b878 (diff) | |
download | mongo-994fdd99bb6adb2cf9c7dd4061c2035188c2c8da.tar.gz |
SERVER-29030 Announce new primary via heartbeat requests
Diffstat (limited to 'src')
7 files changed, 140 insertions, 17 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index 3faf2b20fa5..8fad659a585 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -92,6 +92,8 @@ std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const { if (isInitialConfig) { hbArgs.setCheckEmpty(); } + // hbArgs allows (but doesn't require) us to pass the current primary id as an optimization, + // but it is not readily available within QuorumChecker. hbArgs.setSenderHost(myConfig.getHostAndPort()); hbArgs.setSenderId(myConfig.getId().getData()); hbArgs.setTerm(_term); diff --git a/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp b/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp index 57f85aa58f9..d1dd5528338 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp @@ -49,6 +49,7 @@ const std::string kSenderHostFieldName = "from"; const std::string kSenderIdFieldName = "fromId"; const std::string kSetNameFieldName = "replSetHeartbeat"; const std::string kTermFieldName = "term"; +const std::string kPrimaryIdFieldName = "primaryId"; const std::string kLegalHeartbeatFieldNames[] = {kCheckEmptyFieldName, kConfigVersionFieldName, @@ -57,7 +58,8 @@ const std::string kLegalHeartbeatFieldNames[] = {kCheckEmptyFieldName, kSenderHostFieldName, kSenderIdFieldName, kSetNameFieldName, - kTermFieldName}; + kTermFieldName, + kPrimaryIdFieldName}; } // namespace @@ -110,6 +112,12 @@ Status ReplSetHeartbeatArgsV1::initialize(const BSONObj& argsObj) { _hasSender = true; } + // If sender is version < 4.4, argsObj won't have the primaryId field, + // but we still parse and allow it whenever it is present. + status = bsonExtractIntegerFieldWithDefault(argsObj, kPrimaryIdFieldName, -1, &_primaryId); + if (!status.isOK()) + return status; + status = bsonExtractIntegerField(argsObj, kTermFieldName, &_term); if (!status.isOK()) return status; @@ -159,6 +167,10 @@ void ReplSetHeartbeatArgsV1::setCheckEmpty() { _checkEmpty = true; } +void ReplSetHeartbeatArgsV1::setPrimaryId(long long primaryId) { + _primaryId = primaryId; +} + BSONObj ReplSetHeartbeatArgsV1::toBSON(bool omitConfigTerm) const { invariant(isInitialized()); BSONObjBuilder builder; @@ -186,6 +198,11 @@ void ReplSetHeartbeatArgsV1::addToBSON(BSONObjBuilder* builder, bool omitConfigT builder->append(kSenderHostFieldName, _hasSender ? _senderHost.toString() : ""); builder->appendIntOrLL(kSenderIdFieldName, _senderId); builder->appendIntOrLL(kTermFieldName, _term); + + if (serverGlobalParams.featureCompatibility.isVersion( + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44)) { + builder->append(kPrimaryIdFieldName, _primaryId); + } } } // namespace repl diff --git a/src/mongo/db/repl/repl_set_heartbeat_args_v1.h b/src/mongo/db/repl/repl_set_heartbeat_args_v1.h index f183a22072a..400eb421fd0 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_args_v1.h +++ b/src/mongo/db/repl/repl_set_heartbeat_args_v1.h @@ -115,6 +115,13 @@ public: } /** + * Gets the id of the node the sender believes to be primary or -1 if it is not known. + */ + long long getPrimaryId() const { + return _primaryId; + } + + /** * Returns whether or not the sender is checking for emptiness. */ bool hasCheckEmpty() const { @@ -145,6 +152,7 @@ public: void setSenderHost(const HostAndPort& newVal); void setSetName(const std::string& newVal); void setTerm(long long newVal); + void setPrimaryId(long long primaryId); void setCheckEmpty(); /** @@ -163,6 +171,7 @@ private: long long _heartbeatVersion = -1; long long _senderId = -1; long long _term = -1; + long long _primaryId = -1; bool _checkEmpty = false; bool _hasSender = false; bool _hasHeartbeatVersion = false; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f1db12180a2..eedaeb364c6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -323,7 +323,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _replicationProcess(replicationProcess), _storage(storage), _random(prngSeed) { - _termShadow.store(OpTime::kUninitializedTerm); invariant(_service); @@ -1744,7 +1743,6 @@ BSONObj ReplicationCoordinatorImpl::_getReplicationProgress(WithLock wl) const { SharedSemiFuture<void> ReplicationCoordinatorImpl::_startWaitingForReplication( WithLock wl, const OpTime& opTime, const WriteConcernOptions& writeConcern) { - const Mode replMode = getReplicationMode(); if (replMode == modeNone) { // no replication check needed (validated above) @@ -1835,7 +1833,6 @@ void ReplicationCoordinatorImpl::updateAndLogStateTransitionMetrics( const ReplicationCoordinator::OpsKillingStateTransitionEnum stateTransition, const size_t numOpsKilled, const size_t numOpsRunning) const { - // Clear the current metrics before setting. userOpsKilled.decrement(userOpsKilled.get()); userOpsRunning.decrement(userOpsRunning.get()); @@ -2142,7 +2139,6 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, const bool force, const Milliseconds& waitTime, const Milliseconds& stepdownTime) { - const Date_t startTime = _replExecutor->now(); const Date_t stepDownUntil = startTime + stepdownTime; const Date_t waitUntil = startTime + waitTime; @@ -2536,7 +2532,6 @@ StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionComm Status ReplicationCoordinatorImpl::processReplSetGetStatus( BSONObjBuilder* response, ReplSetGetStatusResponseStyle responseStyle) { - BSONObj initialSyncProgress; if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) { std::shared_ptr<InitialSyncer> initialSyncerCopy; @@ -3774,7 +3769,6 @@ boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::_chooseStableOpTi WithLock lk, const std::set<OpTimeAndWallTime>& candidates, OpTimeAndWallTime maximumStableOpTime) { - // No optime candidates. if (candidates.empty()) { return boost::none; @@ -4031,7 +4025,6 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( OperationContext* opCtx, const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response) { - auto termStatus = updateTerm(opCtx, args.getTerm()); if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm) return termStatus; @@ -4089,7 +4082,6 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj, const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const { - bool hasReplSetMetadata = metadataRequestObj.hasField(rpc::kReplSetMetadataFieldName); bool hasOplogQueryMetadata = metadataRequestObj.hasField(rpc::kOplogQueryMetadataFieldName); // Don't take any locks if we do not need to. @@ -4177,6 +4169,18 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost); _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now); } + } else if (result.isOK() && args.getPrimaryId() >= 0 && + (!response->hasPrimaryId() || response->getPrimaryId() != args.getPrimaryId())) { + // Restart heartbeats if the sender thinks the primary is different from what we think. + // And if the sender itself is the primary. + if (args.hasSender() && args.getSenderId() == args.getPrimaryId()) { + log() << "Restarting heartbeats to learn of new primary. Sender has primaryId " + << args.getPrimaryId() << " and we have " + << (response->hasPrimaryId() + ? (str::stream() << "primaryId " << response->getPrimaryId()) + : std::string("no primaryId")); + _restartHeartbeats_inlock(); + } } return result; } @@ -4229,7 +4233,6 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock( long long term, TopologyCoordinator::UpdateTermResult* updateTermResult) { - auto now = _replExecutor->now(); TopologyCoordinator::UpdateTermResult localUpdateTermResult = _topCoord->updateTerm(term, now); if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm) { @@ -4420,7 +4423,6 @@ CallbackFn ReplicationCoordinatorImpl::_wrapAsCallbackFn(const std::function<voi } Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) { - auto reason = skipDryRun ? StartElectionReasonEnum::kStepUpRequestSkipDryRun : StartElectionReasonEnum::kStepUpRequest; _startElectSelfIfEligibleV1(reason); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 5532d72f701..a606491ca24 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -56,9 +56,11 @@ using executor::RemoteCommandResponse; class ReplCoordHBV1Test : public ReplCoordTest { protected: void assertMemberState(MemberState expected, std::string msg = ""); - ReplSetHeartbeatResponse receiveHeartbeatFrom(const ReplSetConfig& rsConfig, - int sourceId, - const HostAndPort& source); + ReplSetHeartbeatResponse receiveHeartbeatFrom( + const ReplSetConfig& rsConfig, + int sourceId, + const HostAndPort& source, + const boost::optional<int> currentPrimaryId = boost::none); }; void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) { @@ -67,9 +69,11 @@ void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::strin << " but found " << actual.toString() << " - " << msg; } -ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom(const ReplSetConfig& rsConfig, - int sourceId, - const HostAndPort& source) { +ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom( + const ReplSetConfig& rsConfig, + int sourceId, + const HostAndPort& source, + const boost::optional<int> currentPrimaryId) { ReplSetHeartbeatArgsV1 hbArgs; hbArgs.setConfigVersion(rsConfig.getConfigVersion()); hbArgs.setConfigTerm(rsConfig.getConfigTerm()); @@ -77,6 +81,9 @@ ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom(const ReplSetCo hbArgs.setSenderHost(source); hbArgs.setSenderId(sourceId); hbArgs.setTerm(1); + if (currentPrimaryId) { + hbArgs.setPrimaryId(*currentPrimaryId); + } ASSERT(hbArgs.isInitialized()); ReplSetHeartbeatResponse response; @@ -153,6 +160,85 @@ TEST_F(ReplCoordHBV1Test, ASSERT_TRUE(getExternalState()->threadsStarted()); } +TEST_F(ReplCoordHBV1Test, + SecondaryReceivesHeartbeatRequestFromPrimaryWithDifferentPrimaryIdRestartsHeartbeats) { + setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + auto replConfigBson = BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + + assertStartSuccess(replConfigBson, HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + enterNetwork(); + // Ignore the first 2 messages. + for (int j = 0; j < 2; ++j) { + const auto noi = getNet()->getNextReadyRequest(); + noi->getRequest(); + getNet()->blackHole(noi); + } + exitNetwork(); + + // We're a secondary and we receive a request from node3 saying it's the primary. + receiveHeartbeatFrom(getReplCoord()->getConfig(), 3, HostAndPort("node3", 12345), 3); + + enterNetwork(); + for (const auto& host : {"node2", "node3"}) { + const auto noi = getNet()->getNextReadyRequest(); + // 'request' represents the request sent from self(node1) back to node3 + const RemoteCommandRequest& request = noi->getRequest(); + ReplSetHeartbeatArgsV1 args; + ASSERT_OK(args.initialize(request.cmdObj)); + ASSERT_EQ(request.target, HostAndPort(host, 12345)); + ASSERT_EQ(args.getPrimaryId(), -1); + } + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); +} + +TEST_F( + ReplCoordHBV1Test, + SecondaryReceivesHeartbeatRequestFromSecondaryWithDifferentPrimaryIdDoesNotRestartHeartbeats) { + setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + auto replConfigBson = BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + + assertStartSuccess(replConfigBson, HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + enterNetwork(); + // Ignore the first 2 messages. + for (int j = 0; j < 2; ++j) { + const auto noi = getNet()->getNextReadyRequest(); + noi->getRequest(); + getNet()->blackHole(noi); + } + exitNetwork(); + + // Node 2 thinks 3 is the primary. We don't restart heartbeats for that. + receiveHeartbeatFrom(getReplCoord()->getConfig(), 2, HostAndPort("node3", 12345), 3); + + { + enterNetwork(); + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); + } +} + + class ReplCoordHBV1ReconfigTest : public ReplCoordHBV1Test { public: void setUp() { diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index f773b0d03ff..f2528b216d2 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -336,6 +336,9 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e ReplSetHeartbeatArgsV1 hbArgs; Status status = hbArgs.initialize(request.cmdObj); if (status.isOK()) { + if (replCoord->getMemberState().primary()) { + ASSERT_EQ(hbArgs.getPrimaryId(), replCoord->getMyId()); + } ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index a4e4ab8f9e9..96ea370cb9f 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -679,6 +679,10 @@ std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinator::prepareHear hbArgs.setSetName(_rsConfig.getReplSetName()); hbArgs.setConfigVersion(_rsConfig.getConfigVersion()); hbArgs.setConfigTerm(_rsConfig.getConfigTerm()); + if (_currentPrimaryIndex >= 0) { + // Send primary member id if one exists. + hbArgs.setPrimaryId(_memberData.at(_currentPrimaryIndex).getMemberId().getData()); + } if (_selfIndex >= 0) { const MemberConfig& me = _selfConfig(); hbArgs.setSenderId(me.getId().getData()); |