summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Ryan Timmons <ryan.timmons@mongodb.com> 2020-02-12 17:54:40 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-02-12 23:06:59 +0000
commit: 994fdd99bb6adb2cf9c7dd4061c2035188c2c8da (patch)
tree: f145122195ba7977144a6b7e8ae35d14d5f43c5b /src
parent: 4085fa11f08ba092294304346b3d07f6e998b878 (diff)
download: mongo-994fdd99bb6adb2cf9c7dd4061c2035188c2c8da.tar.gz
SERVER-29030 Announce new primary via heartbeat requests
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/repl/check_quorum_for_config_change.cpp | 2
-rw-r--r-- src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp | 19
-rw-r--r-- src/mongo/db/repl/repl_set_heartbeat_args_v1.h | 9
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 22
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 98
-rw-r--r-- src/mongo/db/repl/replication_coordinator_test_fixture.cpp | 3
-rw-r--r-- src/mongo/db/repl/topology_coordinator.cpp | 4
7 files changed, 140 insertions, 17 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index 3faf2b20fa5..8fad659a585 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -92,6 +92,8 @@ std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const {
if (isInitialConfig) {
hbArgs.setCheckEmpty();
}
+ // hbArgs allows (but doesn't require) us to pass the current primary id as an optimization,
+ // but it is not readily available within QuorumChecker.
hbArgs.setSenderHost(myConfig.getHostAndPort());
hbArgs.setSenderId(myConfig.getId().getData());
hbArgs.setTerm(_term);
diff --git a/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp b/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp
index 57f85aa58f9..d1dd5528338 100644
--- a/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp
+++ b/src/mongo/db/repl/repl_set_heartbeat_args_v1.cpp
@@ -49,6 +49,7 @@ const std::string kSenderHostFieldName = "from";
const std::string kSenderIdFieldName = "fromId";
const std::string kSetNameFieldName = "replSetHeartbeat";
const std::string kTermFieldName = "term";
+const std::string kPrimaryIdFieldName = "primaryId";
const std::string kLegalHeartbeatFieldNames[] = {kCheckEmptyFieldName,
kConfigVersionFieldName,
@@ -57,7 +58,8 @@ const std::string kLegalHeartbeatFieldNames[] = {kCheckEmptyFieldName,
kSenderHostFieldName,
kSenderIdFieldName,
kSetNameFieldName,
- kTermFieldName};
+ kTermFieldName,
+ kPrimaryIdFieldName};
} // namespace
@@ -110,6 +112,12 @@ Status ReplSetHeartbeatArgsV1::initialize(const BSONObj& argsObj) {
_hasSender = true;
}
+ // If sender is version < 4.4, argsObj won't have the primaryId field,
+ // but we still parse and allow it whenever it is present.
+ status = bsonExtractIntegerFieldWithDefault(argsObj, kPrimaryIdFieldName, -1, &_primaryId);
+ if (!status.isOK())
+ return status;
+
status = bsonExtractIntegerField(argsObj, kTermFieldName, &_term);
if (!status.isOK())
return status;
@@ -159,6 +167,10 @@ void ReplSetHeartbeatArgsV1::setCheckEmpty() {
_checkEmpty = true;
}
+void ReplSetHeartbeatArgsV1::setPrimaryId(long long primaryId) {
+ _primaryId = primaryId;
+}
+
BSONObj ReplSetHeartbeatArgsV1::toBSON(bool omitConfigTerm) const {
invariant(isInitialized());
BSONObjBuilder builder;
@@ -186,6 +198,11 @@ void ReplSetHeartbeatArgsV1::addToBSON(BSONObjBuilder* builder, bool omitConfigT
builder->append(kSenderHostFieldName, _hasSender ? _senderHost.toString() : "");
builder->appendIntOrLL(kSenderIdFieldName, _senderId);
builder->appendIntOrLL(kTermFieldName, _term);
+
+ if (serverGlobalParams.featureCompatibility.isVersion(
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44)) {
+ builder->append(kPrimaryIdFieldName, _primaryId);
+ }
}
} // namespace repl
diff --git a/src/mongo/db/repl/repl_set_heartbeat_args_v1.h b/src/mongo/db/repl/repl_set_heartbeat_args_v1.h
index f183a22072a..400eb421fd0 100644
--- a/src/mongo/db/repl/repl_set_heartbeat_args_v1.h
+++ b/src/mongo/db/repl/repl_set_heartbeat_args_v1.h
@@ -115,6 +115,13 @@ public:
}
/**
+ * Gets the id of the node the sender believes to be primary or -1 if it is not known.
+ */
+ long long getPrimaryId() const {
+ return _primaryId;
+ }
+
+ /**
* Returns whether or not the sender is checking for emptiness.
*/
bool hasCheckEmpty() const {
@@ -145,6 +152,7 @@ public:
void setSenderHost(const HostAndPort& newVal);
void setSetName(const std::string& newVal);
void setTerm(long long newVal);
+ void setPrimaryId(long long primaryId);
void setCheckEmpty();
/**
@@ -163,6 +171,7 @@ private:
long long _heartbeatVersion = -1;
long long _senderId = -1;
long long _term = -1;
+ long long _primaryId = -1;
bool _checkEmpty = false;
bool _hasSender = false;
bool _hasHeartbeatVersion = false;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f1db12180a2..eedaeb364c6 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -323,7 +323,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_replicationProcess(replicationProcess),
_storage(storage),
_random(prngSeed) {
-
_termShadow.store(OpTime::kUninitializedTerm);
invariant(_service);
@@ -1744,7 +1743,6 @@ BSONObj ReplicationCoordinatorImpl::_getReplicationProgress(WithLock wl) const {
SharedSemiFuture<void> ReplicationCoordinatorImpl::_startWaitingForReplication(
WithLock wl, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
-
const Mode replMode = getReplicationMode();
if (replMode == modeNone) {
// no replication check needed (validated above)
@@ -1835,7 +1833,6 @@ void ReplicationCoordinatorImpl::updateAndLogStateTransitionMetrics(
const ReplicationCoordinator::OpsKillingStateTransitionEnum stateTransition,
const size_t numOpsKilled,
const size_t numOpsRunning) const {
-
// Clear the current metrics before setting.
userOpsKilled.decrement(userOpsKilled.get());
userOpsRunning.decrement(userOpsRunning.get());
@@ -2142,7 +2139,6 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
const bool force,
const Milliseconds& waitTime,
const Milliseconds& stepdownTime) {
-
const Date_t startTime = _replExecutor->now();
const Date_t stepDownUntil = startTime + stepdownTime;
const Date_t waitUntil = startTime + waitTime;
@@ -2536,7 +2532,6 @@ StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionComm
Status ReplicationCoordinatorImpl::processReplSetGetStatus(
BSONObjBuilder* response, ReplSetGetStatusResponseStyle responseStyle) {
-
BSONObj initialSyncProgress;
if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) {
std::shared_ptr<InitialSyncer> initialSyncerCopy;
@@ -3774,7 +3769,6 @@ boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::_chooseStableOpTi
WithLock lk,
const std::set<OpTimeAndWallTime>& candidates,
OpTimeAndWallTime maximumStableOpTime) {
-
// No optime candidates.
if (candidates.empty()) {
return boost::none;
@@ -4031,7 +4025,6 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
OperationContext* opCtx,
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response) {
-
auto termStatus = updateTerm(opCtx, args.getTerm());
if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm)
return termStatus;
@@ -4089,7 +4082,6 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj,
const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const {
-
bool hasReplSetMetadata = metadataRequestObj.hasField(rpc::kReplSetMetadataFieldName);
bool hasOplogQueryMetadata = metadataRequestObj.hasField(rpc::kOplogQueryMetadataFieldName);
// Don't take any locks if we do not need to.
@@ -4177,6 +4169,18 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
_scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
}
+ } else if (result.isOK() && args.getPrimaryId() >= 0 &&
+ (!response->hasPrimaryId() || response->getPrimaryId() != args.getPrimaryId())) {
+ // Restart heartbeats when the sender's view of the primary differs from ours, but only if
+ // the sender is itself that primary (its own id equals the primaryId it reports).
+ if (args.hasSender() && args.getSenderId() == args.getPrimaryId()) {
+ log() << "Restarting heartbeats to learn of new primary. Sender has primaryId "
+ << args.getPrimaryId() << " and we have "
+ << (response->hasPrimaryId()
+ ? (str::stream() << "primaryId " << response->getPrimaryId())
+ : std::string("no primaryId"));
+ _restartHeartbeats_inlock();
+ }
}
return result;
}
@@ -4229,7 +4233,6 @@ Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long
EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock(
long long term, TopologyCoordinator::UpdateTermResult* updateTermResult) {
-
auto now = _replExecutor->now();
TopologyCoordinator::UpdateTermResult localUpdateTermResult = _topCoord->updateTerm(term, now);
if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm) {
@@ -4420,7 +4423,6 @@ CallbackFn ReplicationCoordinatorImpl::_wrapAsCallbackFn(const std::function<voi
}
Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) {
-
auto reason = skipDryRun ? StartElectionReasonEnum::kStepUpRequestSkipDryRun
: StartElectionReasonEnum::kStepUpRequest;
_startElectSelfIfEligibleV1(reason);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 5532d72f701..a606491ca24 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -56,9 +56,11 @@ using executor::RemoteCommandResponse;
class ReplCoordHBV1Test : public ReplCoordTest {
protected:
void assertMemberState(MemberState expected, std::string msg = "");
- ReplSetHeartbeatResponse receiveHeartbeatFrom(const ReplSetConfig& rsConfig,
- int sourceId,
- const HostAndPort& source);
+ ReplSetHeartbeatResponse receiveHeartbeatFrom(
+ const ReplSetConfig& rsConfig,
+ int sourceId,
+ const HostAndPort& source,
+ const boost::optional<int> currentPrimaryId = boost::none);
};
void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) {
@@ -67,9 +69,11 @@ void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::strin
<< " but found " << actual.toString() << " - " << msg;
}
-ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom(const ReplSetConfig& rsConfig,
- int sourceId,
- const HostAndPort& source) {
+ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom(
+ const ReplSetConfig& rsConfig,
+ int sourceId,
+ const HostAndPort& source,
+ const boost::optional<int> currentPrimaryId) {
ReplSetHeartbeatArgsV1 hbArgs;
hbArgs.setConfigVersion(rsConfig.getConfigVersion());
hbArgs.setConfigTerm(rsConfig.getConfigTerm());
@@ -77,6 +81,9 @@ ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom(const ReplSetCo
hbArgs.setSenderHost(source);
hbArgs.setSenderId(sourceId);
hbArgs.setTerm(1);
+ if (currentPrimaryId) {
+ hbArgs.setPrimaryId(*currentPrimaryId);
+ }
ASSERT(hbArgs.isInitialized());
ReplSetHeartbeatResponse response;
@@ -153,6 +160,85 @@ TEST_F(ReplCoordHBV1Test,
ASSERT_TRUE(getExternalState()->threadsStarted());
}
+TEST_F(ReplCoordHBV1Test,
+ SecondaryReceivesHeartbeatRequestFromPrimaryWithDifferentPrimaryIdRestartsHeartbeats) {
+ setMinimumLoggedSeverity(logger::LogSeverity::Debug(3));
+ auto replConfigBson = BSON("_id"
+ << "mySet"
+ << "protocolVersion" << 1 << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345")));
+
+ assertStartSuccess(replConfigBson, HostAndPort("node1", 12345));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+
+ enterNetwork();
+ // Ignore the first 2 messages.
+ for (int j = 0; j < 2; ++j) {
+ const auto noi = getNet()->getNextReadyRequest();
+ noi->getRequest();
+ getNet()->blackHole(noi);
+ }
+ exitNetwork();
+
+ // We're a secondary and we receive a request from node3 saying it's the primary.
+ receiveHeartbeatFrom(getReplCoord()->getConfig(), 3, HostAndPort("node3", 12345), 3);
+
+ enterNetwork();
+ for (const auto& host : {"node2", "node3"}) {
+ const auto noi = getNet()->getNextReadyRequest();
+ // 'request' is the request sent from self (node1) to 'host' (node2, then node3).
+ const RemoteCommandRequest& request = noi->getRequest();
+ ReplSetHeartbeatArgsV1 args;
+ ASSERT_OK(args.initialize(request.cmdObj));
+ ASSERT_EQ(request.target, HostAndPort(host, 12345));
+ ASSERT_EQ(args.getPrimaryId(), -1);
+ }
+ ASSERT_FALSE(getNet()->hasReadyRequests());
+ exitNetwork();
+}
+
+TEST_F(
+ ReplCoordHBV1Test,
+ SecondaryReceivesHeartbeatRequestFromSecondaryWithDifferentPrimaryIdDoesNotRestartHeartbeats) {
+ setMinimumLoggedSeverity(logger::LogSeverity::Debug(3));
+ auto replConfigBson = BSON("_id"
+ << "mySet"
+ << "protocolVersion" << 1 << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345")));
+
+ assertStartSuccess(replConfigBson, HostAndPort("node1", 12345));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+
+ enterNetwork();
+ // Ignore the first 2 messages.
+ for (int j = 0; j < 2; ++j) {
+ const auto noi = getNet()->getNextReadyRequest();
+ noi->getRequest();
+ getNet()->blackHole(noi);
+ }
+ exitNetwork();
+
+ // The sender (id 2) reports node 3 as primary but is not that primary, so no restart occurs.
+ receiveHeartbeatFrom(getReplCoord()->getConfig(), 2, HostAndPort("node3", 12345), 3);
+
+ {
+ enterNetwork();
+ ASSERT_FALSE(getNet()->hasReadyRequests());
+ exitNetwork();
+ }
+}
+
+
class ReplCoordHBV1ReconfigTest : public ReplCoordHBV1Test {
public:
void setUp() {
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index f773b0d03ff..f2528b216d2 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -336,6 +336,9 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e
ReplSetHeartbeatArgsV1 hbArgs;
Status status = hbArgs.initialize(request.cmdObj);
if (status.isOK()) {
+ if (replCoord->getMemberState().primary()) {
+ ASSERT_EQ(hbArgs.getPrimaryId(), replCoord->getMyId());
+ }
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index a4e4ab8f9e9..96ea370cb9f 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -679,6 +679,10 @@ std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinator::prepareHear
hbArgs.setSetName(_rsConfig.getReplSetName());
hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
hbArgs.setConfigTerm(_rsConfig.getConfigTerm());
+ if (_currentPrimaryIndex >= 0) {
+ // Send primary member id if one exists.
+ hbArgs.setPrimaryId(_memberData.at(_currentPrimaryIndex).getMemberId().getData());
+ }
if (_selfIndex >= 0) {
const MemberConfig& me = _selfConfig();
hbArgs.setSenderId(me.getId().getData());