From 912735588386424712a1525da1574a4554bf1787 Mon Sep 17 00:00:00 2001 From: William Schultz Date: Mon, 18 May 2020 09:22:41 -0400 Subject: SERVER-29030 Announce new primary via heartbeat requests --- .../election_candidate_and_participant_metrics.js | 9 ++ .../db/repl/check_quorum_for_config_change.cpp | 2 + src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp | 17 +++ src/mongo/db/repl/repl_set_heartbeat_args_v1.h | 11 ++ src/mongo/db/repl/replication_coordinator_impl.cpp | 19 +++ ...lication_coordinator_impl_heartbeat_v1_test.cpp | 132 +++++++++++++++++++-- .../repl/replication_coordinator_test_fixture.cpp | 3 + src/mongo/db/repl/topology_coordinator.cpp | 5 +- 8 files changed, 190 insertions(+), 8 deletions(-) diff --git a/jstests/replsets/election_candidate_and_participant_metrics.js b/jstests/replsets/election_candidate_and_participant_metrics.js index 7eb5103052d..cdd3499b8dd 100644 --- a/jstests/replsets/election_candidate_and_participant_metrics.js +++ b/jstests/replsets/election_candidate_and_participant_metrics.js @@ -9,6 +9,7 @@ "use strict"; load("jstests/replsets/libs/election_handoff.js"); +load("jstests/replsets/rslib.js"); const testName = jsTestName(); const numNodes = 2; @@ -213,10 +214,18 @@ assert.eq(newPrimaryElectionParticipantMetrics.priorityAtElection, 1); assert.commandWorked(originalPrimary.adminCommand( {configureFailPoint: "voteYesInDryRunButNoInRealElection", mode: "alwaysOn"})); +// The new primary might still be processing the reconfig via heartbeat from the original primary's +// reconfig on step up. Wait for config replication first so it doesn't interfere with the step up +// on the new primary below. +waitForConfigReplication(originalPrimary); + // Attempt to step up the new primary a second time. Due to the failpoint, the current primary // should vote no, and as a result the election should fail. assert.commandWorked(newPrimary.adminCommand({replSetFreeze: 0})); +// Make sure the step up failed and for the right reason. 
assert.commandFailedWithCode(newPrimary.adminCommand({replSetStepUp: 1}), ErrorCodes.CommandFailed); +assert( + checkLog.checkContainsOnce(newPrimary, "Not becoming primary, we received insufficient votes")); originalPrimaryReplSetGetStatus = assert.commandWorked(originalPrimary.adminCommand({replSetGetStatus: 1})); diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index 40ae42d86f4..7ae0dbea169 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -93,6 +93,8 @@ std::vector QuorumChecker::getRequests() const { if (isInitialConfig) { hbArgs.setCheckEmpty(); } + // hbArgs allows (but doesn't require) us to pass the current primary id as an optimization, + // but it is not readily available within QuorumChecker. hbArgs.setSenderHost(myConfig.getHostAndPort()); hbArgs.setSenderId(myConfig.getId().getData()); hbArgs.setTerm(_term); diff --git a/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp b/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp index da65e24f829..92d0dd799c6 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp @@ -49,6 +49,7 @@ const std::string kSenderHostFieldName = "from"; const std::string kSenderIdFieldName = "fromId"; const std::string kSetNameFieldName = "replSetHeartbeat"; const std::string kTermFieldName = "term"; +const std::string kPrimaryIdFieldName = "primaryId"; } // namespace Status ReplSetHeartbeatArgsV1::initialize(const BSONObj& argsObj) { @@ -96,6 +97,13 @@ Status ReplSetHeartbeatArgsV1::initialize(const BSONObj& argsObj) { _hasSender = true; } + // If sender is in an older version, the request object may not have the 'primaryId' field, but + // we still parse and allow it whenever it is present. 
+ status = bsonExtractIntegerFieldWithDefault( + argsObj, kPrimaryIdFieldName, kEmptyPrimaryId, &_primaryId); + if (!status.isOK()) + return status; + status = bsonExtractIntegerField(argsObj, kTermFieldName, &_term); if (!status.isOK()) return status; @@ -145,6 +153,10 @@ void ReplSetHeartbeatArgsV1::setCheckEmpty() { _checkEmpty = true; } +void ReplSetHeartbeatArgsV1::setPrimaryId(long long primaryId) { + _primaryId = primaryId; +} + BSONObj ReplSetHeartbeatArgsV1::toBSON() const { invariant(isInitialized()); BSONObjBuilder builder; @@ -165,6 +177,11 @@ void ReplSetHeartbeatArgsV1::addToBSON(BSONObjBuilder* builder) const { builder->append(kSenderHostFieldName, _hasSender ? _senderHost.toString() : ""); builder->appendIntOrLL(kSenderIdFieldName, _senderId); builder->appendIntOrLL(kTermFieldName, _term); + + if (serverGlobalParams.featureCompatibility.isVersion( + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo46)) { + builder->append(kPrimaryIdFieldName, _primaryId); + } } } // namespace repl diff --git a/src/mongo/db/repl/repl_set_heartbeat_args_v1.h b/src/mongo/db/repl/repl_set_heartbeat_args_v1.h index 9b6123b2169..4687ff13c78 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_args_v1.h +++ b/src/mongo/db/repl/repl_set_heartbeat_args_v1.h @@ -114,6 +114,13 @@ public: return _term; } + /** + * Gets the id of the node the sender believes to be primary or -1 if it is not known. + */ + long long getPrimaryId() const { + return _primaryId; + } + /** * Returns whether or not the sender is checking for emptiness. 
*/ @@ -145,6 +152,7 @@ public: void setSenderHost(const HostAndPort& newVal); void setSetName(StringData newVal); void setTerm(long long newVal); + void setPrimaryId(long long primaryId); void setCheckEmpty(); /** @@ -157,12 +165,15 @@ public: void addToBSON(BSONObjBuilder* builder) const; private: + static const long long kEmptyPrimaryId = -1; + // look at the body of the isInitialized() function to see which fields are mandatory long long _configVersion = -1; long long _configTerm = OpTime::kUninitializedTerm; long long _heartbeatVersion = -1; long long _senderId = -1; long long _term = -1; + long long _primaryId = kEmptyPrimaryId; bool _checkEmpty = false; bool _hasSender = false; bool _hasHeartbeatVersion = false; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 4c52652bfcc..4b56ca6026e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -5204,6 +5204,25 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost); _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now); } + } else if (result.isOK() && args.getPrimaryId() >= 0 && + (!response->hasPrimaryId() || response->getPrimaryId() != args.getPrimaryId())) { + // If the sender thinks the primary is different from what we think and if the sender itself + // is the primary, then we want to update our view of primary by immediately sending out a + // new round of heartbeats, whose responses should inform us of the new primary. We only do + // this if the term of the heartbeat is greater than or equal to our own, to prevent + // updating our view to a stale primary. + if (args.hasSender() && args.getSenderId() == args.getPrimaryId() && + args.getTerm() >= _topCoord->getTerm()) { + std::string myPrimaryId = + (response->hasPrimaryId() ? 
(str::stream() << response->getPrimaryId()) + : std::string("none")); + LOGV2(2903000, + "Restarting heartbeats after learning of a new primary", + "myPrimaryId"_attr = myPrimaryId, + "senderAndPrimaryId"_attr = args.getPrimaryId(), + "senderTerm"_attr = args.getTerm()); + _restartHeartbeats_inlock(); + } } return result; } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 9160e037da7..277b8537863 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -80,9 +80,12 @@ TEST(ReplSetHeartbeatArgs, AcceptsUnknownField) { class ReplCoordHBV1Test : public ReplCoordTest { protected: void assertMemberState(MemberState expected, std::string msg = ""); - ReplSetHeartbeatResponse receiveHeartbeatFrom(const ReplSetConfig& rsConfig, - int sourceId, - const HostAndPort& source); + ReplSetHeartbeatResponse receiveHeartbeatFrom( + const ReplSetConfig& rsConfig, + int sourceId, + const HostAndPort& source, + const int term = 1, + const boost::optional currentPrimaryId = boost::none); }; void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) { @@ -91,16 +94,22 @@ void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::strin << " but found " << actual.toString() << " - " << msg; } -ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom(const ReplSetConfig& rsConfig, - int sourceId, - const HostAndPort& source) { +ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom( + const ReplSetConfig& rsConfig, + int sourceId, + const HostAndPort& source, + const int term, + const boost::optional currentPrimaryId) { ReplSetHeartbeatArgsV1 hbArgs; hbArgs.setConfigVersion(rsConfig.getConfigVersion()); hbArgs.setConfigTerm(rsConfig.getConfigTerm()); hbArgs.setSetName(rsConfig.getReplSetName()); 
hbArgs.setSenderHost(source); hbArgs.setSenderId(sourceId); - hbArgs.setTerm(1); + hbArgs.setTerm(term); + if (currentPrimaryId) { + hbArgs.setPrimaryId(*currentPrimaryId); + } ASSERT(hbArgs.isInitialized()); ReplSetHeartbeatResponse response; @@ -177,6 +186,115 @@ TEST_F(ReplCoordHBV1Test, ASSERT_TRUE(getExternalState()->threadsStarted()); } +TEST_F(ReplCoordHBV1Test, + SecondaryReceivesHeartbeatRequestFromPrimaryWithDifferentPrimaryIdRestartsHeartbeats) { + auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{ + logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)}; + + auto replConfigBson = BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + + assertStartSuccess(replConfigBson, HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + getReplCoord()->updateTerm_forTest(1, nullptr); + ASSERT_EQ(getReplCoord()->getTerm(), 1); + + enterNetwork(); + // Ignore the first 2 messages. + for (int j = 0; j < 2; ++j) { + const auto noi = getNet()->getNextReadyRequest(); + noi->getRequest(); + getNet()->blackHole(noi); + } + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); + + // We're a secondary and we receive a request from node3 saying it's the primary. 
+ receiveHeartbeatFrom(getReplCoord()->getConfig(), + 3 /* senderId */, + HostAndPort("node3", 12345), + 1 /* term */, + 3 /* currentPrimaryId */); + + enterNetwork(); + std::set expectedHosts{"node2", "node3"}; + std::set actualHosts; + for (size_t i = 0; i < expectedHosts.size(); i++) { + const auto noi = getNet()->getNextReadyRequest(); + // 'request' is a heartbeat request sent from self (node1) to node2 or node3 + const RemoteCommandRequest& request = noi->getRequest(); + ReplSetHeartbeatArgsV1 args; + ASSERT_OK(args.initialize(request.cmdObj)); + actualHosts.insert(request.target.host()); + ASSERT_EQ(args.getPrimaryId(), -1); + // We don't need to respond. + getNet()->blackHole(noi); + } + ASSERT(expectedHosts == actualHosts); + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); + + // Heartbeat in a stale term shouldn't re-schedule heartbeats. + receiveHeartbeatFrom(getReplCoord()->getConfig(), + 3 /* senderId */, + HostAndPort("node3", 12345), + 0 /* term */, + 3 /* currentPrimaryId */); + enterNetwork(); + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); +} + +TEST_F( + ReplCoordHBV1Test, + SecondaryReceivesHeartbeatRequestFromSecondaryWithDifferentPrimaryIdDoesNotRestartHeartbeats) { + auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{ + logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)}; + auto replConfigBson = BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + + assertStartSuccess(replConfigBson, HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_EQ(getReplCoord()->getTerm(), 0); + + enterNetwork(); + // Ignore the first 2 messages. 
+ for (int j = 0; j < 2; ++j) { + const auto noi = getNet()->getNextReadyRequest(); + noi->getRequest(); + getNet()->blackHole(noi); + } + exitNetwork(); + + // Node 2 thinks 3 is the primary. We don't restart heartbeats for that. + receiveHeartbeatFrom(getReplCoord()->getConfig(), + 2 /* senderId */, + HostAndPort("node3", 12345), + 0 /* term */, + 3 /* currentPrimaryId */); + + { + enterNetwork(); + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); + } +} + class ReplCoordHBV1ReconfigTest : public ReplCoordHBV1Test { public: void setUp() { diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 78be2ece8b1..f2147923ed9 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -359,6 +359,9 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e ReplSetHeartbeatArgsV1 hbArgs; Status status = hbArgs.initialize(request.cmdObj); if (status.isOK()) { + if (replCoord->getMemberState().primary()) { + ASSERT_EQ(hbArgs.getPrimaryId(), replCoord->getMyId()); + } ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 8605c2bcf27..bbe49703822 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -868,7 +868,10 @@ std::pair TopologyCoordinator::prepareHear if (_rsConfig.getConfigTerm() != OpTime::kUninitializedTerm) { hbArgs.setConfigTerm(_rsConfig.getConfigTerm()); } - + if (_currentPrimaryIndex >= 0) { + // Send primary member id if one exists. + hbArgs.setPrimaryId(_memberData.at(_currentPrimaryIndex).getMemberId().getData()); + } if (_selfIndex >= 0) { const MemberConfig& me = _selfConfig(); hbArgs.setSenderId(me.getId().getData()); -- cgit v1.2.1