summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Matthew Russotto <matthew.russotto@mongodb.com> 2022-07-11 12:00:36 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-07-16 01:12:22 +0000
commit: 610d9be50d6efa94a1012ca0af3afe6b67b56c19 (patch)
tree: 3764bd6c33c591b76384f8cd0470a1c173e1dc55
parent: 7466bae6d8080110fc9fd8a1913a17f3459d564d (diff)
download: mongo-610d9be50d6efa94a1012ca0af3afe6b67b56c19.tar.gz
SERVER-67954 Notify BackgroundSync when data relevant to sync source selection has changed
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 52
-rw-r--r-- src/mongo/db/repl/bgsync.h 21
-rw-r--r-- src/mongo/db/repl/heartbeat_response_action.cpp 4
-rw-r--r-- src/mongo/db/repl/heartbeat_response_action.h 20
-rw-r--r-- src/mongo/db/repl/member_data.cpp 8
-rw-r--r-- src/mongo/db/repl/member_data.h 33
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state.h 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp 7
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.h 1
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_mock.cpp 12
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_mock.h 9
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 17
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp 10
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp 152
-rw-r--r-- src/mongo/db/repl/topology_coordinator.cpp 7
15 files changed, 335 insertions, 23 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 0e5d3fa573b..730a5ecf6dd 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -187,6 +187,8 @@ void BackgroundSync::shutdown(OperationContext* opCtx) {
stdx::lock_guard<Latch> lock(_mutex);
setState(lock, ProducerState::Stopped);
+ // If we happen to be waiting for sync source data, stop.
+ _notifySyncSourceSelectionDataChanged(lock);
if (_syncSourceResolver) {
_syncSourceResolver->shutdown();
@@ -327,6 +329,11 @@ void BackgroundSync::_produce() {
lastOpTimeFetched,
OpTime(),
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
+ // It is possible for _syncSourceSelectionDataChanged to become true between when we release
+ // the lock at the end of this block and when the syncSourceResolver retrieves the relevant
+ // heartbeat data, which means if we don't get a sync source we won't sleep even though we
+ // used the relevant data. But that's OK because we'll only spin once.
+ _syncSourceSelectionDataChanged = false;
}
// This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls
// ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to
@@ -413,20 +420,18 @@ void BackgroundSync::_produce() {
source = _syncSourceHost;
}
// If our sync source has not changed, it is likely caused by our heartbeat data map being
- // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting
- // for our map to update.
+ // out of date. In that case we sleep for up to 1 second to reduce the amount we spin
+ // waiting for our map to update. If we are notified of a heartbeat data change, we will
+ // interrupt the wait early.
if (oldSource == source) {
long long sleepMS = _getRetrySleepMS();
LOGV2(21087,
- "Chose same sync source candidate as last time, {syncSource}. Sleeping for "
- "{sleepDurationMillis}ms to avoid immediately choosing a new sync source for the "
- "same reason as last time.",
"Chose same sync source candidate as last time. Sleeping to avoid immediately "
"choosing a new sync source for the same reason as last time",
"syncSource"_attr = source,
"sleepDurationMillis"_attr = sleepMS);
numTimesChoseSameSyncSource.increment(1);
- mongo::sleepmillis(sleepMS);
+ _waitForNewSyncSourceSelectionData(sleepMS);
} else {
LOGV2(21088,
"Changed sync source from {oldSyncSource} to {newSyncSource}",
@@ -449,12 +454,10 @@ void BackgroundSync::_produce() {
// No sync source found.
LOGV2_DEBUG(21090,
1,
- "Could not find a sync source. Sleeping for {sleepDurationMillis}ms before "
- "trying again.",
"Could not find a sync source. Sleeping before trying again",
"sleepDurationMillis"_attr = sleepMS);
numTimesCouldNotFindSyncSource.increment(1);
- mongo::sleepmillis(sleepMS);
+ _waitForNewSyncSourceSelectionData(sleepMS);
return;
}
@@ -863,6 +866,33 @@ void BackgroundSync::_fallBackOnRollbackViaRefetch(
rollback(opCtx, *localOplog, rollbackSource, requiredRBID, _replCoord, _replicationProcess);
}
+void BackgroundSync::notifySyncSourceSelectionDataChanged() {
+ stdx::lock_guard lock(_mutex);
+ _notifySyncSourceSelectionDataChanged(lock);
+}
+
+void BackgroundSync::_notifySyncSourceSelectionDataChanged(WithLock) {
+ if (!_syncSourceSelectionDataChanged) {
+ _syncSourceSelectionDataChanged = true;
+ _syncSourceSelectionDataCv.notify_one();
+ }
+}
+
+void BackgroundSync::_waitForNewSyncSourceSelectionData(long long waitTimeMillis) {
+ stdx::unique_lock<Latch> lock(_mutex);
+ if (_syncSourceSelectionDataCv.wait_for(
+ lock, stdx::chrono::milliseconds(waitTimeMillis), [this] {
+ return _syncSourceSelectionDataChanged || _inShutdown;
+ })) {
+ LOGV2_DEBUG(6795401,
+ 1,
+ "Sync source wait interrupted early",
+ "syncSourceSelectionDataChanged"_attr = _syncSourceSelectionDataChanged,
+ "inShutdown"_attr = _inShutdown,
+ "waitTimeMillis"_attr = waitTimeMillis);
+ }
+}
+
HostAndPort BackgroundSync::getSyncTarget() const {
stdx::unique_lock<Latch> lock(_mutex);
return _syncSourceHost;
@@ -875,12 +905,16 @@ void BackgroundSync::clearSyncTarget() {
"Resetting sync source to empty",
"previousSyncSource"_attr = _syncSourceHost);
_syncSourceHost = HostAndPort();
+ _notifySyncSourceSelectionDataChanged(lock);
}
void BackgroundSync::_stop(WithLock lock, bool resetLastFetchedOptime) {
setState(lock, ProducerState::Stopped);
LOGV2(21107, "Stopping replication producer");
+ // If we happen to be waiting for sync source data, stop.
+ _notifySyncSourceSelectionDataChanged(lock);
+
_syncSourceHost = HostAndPort();
if (resetLastFetchedOptime) {
invariant(_oplogApplier->getBuffer()->isEmpty());
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 9bfec5c753b..6b4882e6905 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -122,8 +122,10 @@ public:
*/
bool tooStale() const;
- // starts the sync target notifying thread
- void notifierThread();
+ /**
+ * Informs us that data relevant to sync source selection has changed.
+ */
+ void notifySyncSourceSelectionDataChanged();
HostAndPort getSyncTarget() const;
@@ -214,6 +216,14 @@ private:
long long _getRetrySleepMS();
+ // Waits for the given time, or until we are notified that relevant sync source selection data
+ // has changed. Takes _mutex, so don't call with _mutex held.
+ void _waitForNewSyncSourceSelectionData(long long waitTimeMillis);
+
+ // Internal version of notifySyncSourceSelectionDataChanged(), to be used by callers
+ // which already hold _mutex.
+ void _notifySyncSourceSelectionDataChanged(WithLock);
+
// This OplogApplier applies oplog entries fetched from the sync source.
OplogApplier* const _oplogApplier;
@@ -275,6 +285,13 @@ private:
// operations in the local oplog in order to bring this server to a consistent state relative
// to the sync source.
std::unique_ptr<RollbackImpl> _rollback; // (PR)
+
+ // A condition variable used to wake us when sync source selection data changes.
+ stdx::condition_variable _syncSourceSelectionDataCv; // (S)
+
+ // A flag which tells us if sync source selection data has changed since we called
+ // the syncSourceSelector
+ bool _syncSourceSelectionDataChanged = true; // (M)
};
diff --git a/src/mongo/db/repl/heartbeat_response_action.cpp b/src/mongo/db/repl/heartbeat_response_action.cpp
index 43c1abed4a0..284b34def3a 100644
--- a/src/mongo/db/repl/heartbeat_response_action.cpp
+++ b/src/mongo/db/repl/heartbeat_response_action.cpp
@@ -83,5 +83,9 @@ void HeartbeatResponseAction::setBecameElectable(bool becameElectable) {
_becameElectable = becameElectable;
}
+void HeartbeatResponseAction::setChangedMemberState(bool changedMemberState) {
+ _changedMemberState = changedMemberState;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/heartbeat_response_action.h b/src/mongo/db/repl/heartbeat_response_action.h
index 8e100d9966d..8d1310b90ab 100644
--- a/src/mongo/db/repl/heartbeat_response_action.h
+++ b/src/mongo/db/repl/heartbeat_response_action.h
@@ -112,6 +112,11 @@ public:
*/
void setBecameElectable(bool becameElectable);
+ /**
+ * Sets whether or not the member has changed member state since the last heartbeat response.
+ */
+ void setChangedMemberState(bool changedMemberState);
+
/**
* Gets the action type of this action.
*/
@@ -151,12 +156,27 @@ public:
return _becameElectable;
}
+ /**
+ * Returns true if the heartbeat response results in the member changing member state.
+ */
+ bool getChangedMemberState() const {
+ return _changedMemberState;
+ }
+
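+ /**
+ * Returns true if the heartbeat results in any significant change in member data: a member
+ * state change, an advanced opTime or updated config, or the member becoming electable.
+ */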
+ bool getChangedSignificantly() const {
+ return _changedMemberState || _advancedOpTimeOrUpdatedConfig || _becameElectable;
+ }
+
private:
Action _action;
int _primaryIndex;
Date_t _nextHeartbeatStartDate;
bool _advancedOpTimeOrUpdatedConfig = false;
bool _becameElectable = false;
+ bool _changedMemberState = false;
};
} // namespace repl
diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp
index 054e93c6145..59fa6f057aa 100644
--- a/src/mongo/db/repl/member_data.cpp
+++ b/src/mongo/db/repl/member_data.cpp
@@ -47,7 +47,8 @@ MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _is
_lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime());
}
-bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) {
+MemberData::HeartbeatChanges MemberData::setUpValues(Date_t now,
+ ReplSetHeartbeatResponse&& hbResponse) {
_health = 1;
if (_upSince == Date_t()) {
_upSince = now;
@@ -69,7 +70,8 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse)
hbResponse.setAppliedOpTimeAndWallTime(_lastResponse.getAppliedOpTimeAndWallTime());
}
// Log if the state changes
- if (_lastResponse.getState() != hbResponse.getState()) {
+ const bool memberStateChanged = _lastResponse.getState() != hbResponse.getState();
+ if (memberStateChanged) {
LOGV2(21215,
"Member {hostAndPort} is now in state {newState}",
"Member is in new state",
@@ -91,7 +93,7 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse)
_lastResponse = std::move(hbResponse);
- return (opTimeAdvanced || configChanged);
+ return {opTimeAdvanced, configChanged, memberStateChanged};
}
void MemberData::setDownValues(Date_t now, const std::string& heartbeatMessage) {
diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h
index 1903a1b448d..1b9bb68edb7 100644
--- a/src/mongo/db/repl/member_data.h
+++ b/src/mongo/db/repl/member_data.h
@@ -44,6 +44,31 @@ namespace repl {
**/
class MemberData {
public:
+ class HeartbeatChanges {
+ public:
+ HeartbeatChanges(bool opTimeAdvanced, bool configChanged, bool memberStateChanged)
+ : _opTimeAdvanced(opTimeAdvanced),
+ _configChanged(configChanged),
+ _memberStateChanged(memberStateChanged) {}
+
+ bool getOpTimeAdvanced() const {
+ return _opTimeAdvanced;
+ }
+
+ bool getConfigChanged() const {
+ return _configChanged;
+ }
+
+ bool getMemberStateChanged() const {
+ return _memberStateChanged;
+ }
+
+ private:
+ const bool _opTimeAdvanced;
+ const bool _configChanged;
+ const bool _memberStateChanged;
+ };
+
MemberData();
MemberState getState() const {
@@ -159,11 +184,11 @@ public:
}
/**
- * Sets values in this object from the results of a successful heartbeat command.
- * Returns true if the lastApplied/lastDurable values advanced or we've received a newer
- * config since the last heartbeat response.
+ * Sets values in this object from the results of a successful heartbeat command. Returns a
+ * value indicating whether the lastApplied/lastDurable values advanced, we've received a newer
+ * config, and/or the member state changed since the last heartbeat response.
*/
- bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse);
+ HeartbeatChanges setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse);
/**
* Sets values in this object from the results of a erroring/failed heartbeat command.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 9bf96582627..164dcd358c9 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -235,6 +235,11 @@ public:
virtual void startProducerIfStopped() = 0;
/**
+ * Notify interested parties that member data for other nodes has changed.
+ */
+ virtual void notifyOtherMemberDataChanged() = 0;
+
+ /**
* True if we have discovered that no sync source's oplog overlaps with ours.
*/
virtual bool tooStale() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 227e60b0444..d3d2ab66136 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -1030,6 +1030,13 @@ void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() {
}
}
+void ReplicationCoordinatorExternalStateImpl::notifyOtherMemberDataChanged() {
+ stdx::lock_guard<Latch> lk(_threadMutex);
+ if (_bgSync) {
+ _bgSync->notifySyncSourceSelectionDataChanged();
+ }
+}
+
bool ReplicationCoordinatorExternalStateImpl::tooStale() {
stdx::lock_guard<Latch> lk(_threadMutex);
if (_bgSync) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 9a1e448f636..9b85e38202d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -99,6 +99,7 @@ public:
virtual void signalApplierToChooseNewSyncSource();
virtual void stopProducer();
virtual void startProducerIfStopped();
+ void notifyOtherMemberDataChanged() final;
virtual bool tooStale();
void clearCommittedSnapshot() final;
void updateCommittedSnapshot(const OpTime& newCommitPoint) final;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 78f07512ac3..c2121897614 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -335,5 +335,17 @@ JournalListener* ReplicationCoordinatorExternalStateMock::getReplicationJournalL
MONGO_UNREACHABLE;
}
+void ReplicationCoordinatorExternalStateMock::notifyOtherMemberDataChanged() {
+ _otherMemberDataChanged = true;
+}
+
+void ReplicationCoordinatorExternalStateMock::clearOtherMemberDataChanged() {
+ _otherMemberDataChanged = false;
+}
+
+bool ReplicationCoordinatorExternalStateMock::getOtherMemberDataChanged() const {
+ return _otherMemberDataChanged;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index ecd0f072fed..2b3919c7310 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -90,6 +90,7 @@ public:
virtual void signalApplierToChooseNewSyncSource();
virtual void stopProducer();
virtual void startProducerIfStopped();
+ void notifyOtherMemberDataChanged() final;
virtual bool tooStale();
virtual void clearCommittedSnapshot();
virtual void updateCommittedSnapshot(const OpTime& newCommitPoint);
@@ -200,6 +201,13 @@ public:
virtual bool isShardPartOfShardedCluster(OperationContext* opCtx) const final;
+ /**
+ * Clear the _otherMemberDataChanged flag so we can check it later.
+ */
+ void clearOtherMemberDataChanged();
+
+ bool getOtherMemberDataChanged() const;
+
JournalListener* getReplicationJournalListener() final;
private:
@@ -221,6 +229,7 @@ private:
bool _threadsStarted;
bool _isReadCommittedSupported = true;
bool _areSnapshotsEnabled = true;
+ bool _otherMemberDataChanged = false;
OpTime _firstOpTimeOfMyTerm;
double _electionTimeoutOffsetLimitFraction = 0.15;
Timestamp _globalTimestamp;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 99db6940f3b..10ccfa11cdc 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -5061,10 +5061,21 @@ Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePosi
}
_updateStateAfterRemoteOpTimeUpdates(lock, maxRemoteOpTime);
- if (gotValidUpdate && !_getMemberState_inlock().primary()) {
+ if (gotValidUpdate) {
+ // If we become primary after the unlock below, the forwardSecondaryProgress will do nothing
+ // (slightly expensively). If we become secondary after the unlock below, BackgroundSync
+ // will take care of forwarding our progress by calling signalUpstreamUpdater() once we
+ // select a new sync source. So it's OK to depend on the stale value of wasPrimary here.
+ bool wasPrimary = _getMemberState_inlock().primary();
lock.unlock();
- // Must do this outside _mutex
- _externalState->forwardSecondaryProgress();
+ // maxRemoteOpTime is null here if we got valid updates but no downstream node had
+ // actually advanced any optime.
+ if (!maxRemoteOpTime.isNull())
+ _externalState->notifyOtherMemberDataChanged();
+ if (!wasPrimary) {
+ // Must do this outside _mutex
+ _externalState->forwardSecondaryProgress();
+ }
}
return status;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 1392cceb923..2051b421805 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -469,6 +469,11 @@ stdx::unique_lock<Latch> ReplicationCoordinatorImpl::_handleHeartbeatResponseAct
break;
}
}
+ if (action.getChangedSignificantly()) {
+ lock.unlock();
+ _externalState->notifyOtherMemberDataChanged();
+ lock.lock();
+ }
return lock;
}
@@ -1025,7 +1030,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
}
myIndex = StatusWith<int>(-1);
}
- const ReplSetConfig oldConfig = _rsConfig;
+ const bool contentChanged = !sameConfigContents(_rsConfig, newConfig);
// If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to
// the data structures inside of the TopologyCoordinator.
const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1;
@@ -1034,6 +1039,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
_setCurrentRSConfig(lk, opCtx.get(), newConfig, myIndexValue);
lk.unlock();
+ if (contentChanged) {
+ _externalState->notifyOtherMemberDataChanged();
+ }
_performPostMemberStateUpdateAction(action);
if (MONGO_unlikely(waitForPostActionCompleteInHbReconfig.shouldFail())) {
// Used in tests that wait for the post member state update action to complete.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index e619276b129..b01a5d51ea3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -1944,6 +1944,158 @@ TEST_F(ReplCoordHBV1Test, handleHeartbeatResponseForTestEnqueuesValidHandle) {
heartbeatReponseThread.join();
}
+TEST_F(ReplCoordHBV1Test, NotifiesExternalStateOfChangeOnlyWhenDataChanges) {
+ unittest::MinimumLoggedSeverityGuard replLogSeverityGuard{logv2::LogComponent::kReplication,
+ logv2::LogSeverity::Debug(3)};
+ // Ensure that the external state is notified of a member data change only when a heartbeat
+ // response actually changes the data stored for that member.
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "term" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1))
+ << "protocolVersion" << 1),
+ HostAndPort("node1", 12345));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_EQUALS(OpTime(), getReplCoord()->getLastCommittedOpTime());
+
+ auto config = getReplCoord()->getConfig();
+
+ auto net = getNet();
+ ReplSetHeartbeatResponse hbResp;
+ OpTimeAndWallTime appliedOpTimeAndWallTime = {OpTime({11, 1}, 1), Date_t::now()};
+ OpTimeAndWallTime durableOpTimeAndWallTime = {OpTime({10, 1}, 1), Date_t::now()};
+ hbResp.setConfigVersion(config.getConfigVersion());
+ hbResp.setConfigTerm(config.getConfigTerm());
+ hbResp.setSetName(config.getReplSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setElectable(false);
+ hbResp.setAppliedOpTimeAndWallTime(appliedOpTimeAndWallTime);
+ hbResp.setDurableOpTimeAndWallTime(durableOpTimeAndWallTime);
+ auto hbRespObj = hbResp.toBSON();
+ // First heartbeat, to set the stored data for the node.
+ {
+ net->enterNetwork();
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+ ASSERT_EQUALS(config.getMemberAt(1).getHostAndPort(), request.target);
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ }
+
+ // Second heartbeat, same as the first, should not trigger external notification.
+ getExternalState()->clearOtherMemberDataChanged();
+ {
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getHeartbeatInterval());
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_FALSE(getExternalState()->getOtherMemberDataChanged());
+ }
+
+ // Change electability, should signal data changed.
+ hbResp.setElectable(true);
+ hbRespObj = hbResp.toBSON();
+ getExternalState()->clearOtherMemberDataChanged();
+ {
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getHeartbeatInterval());
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged());
+ }
+
+ // Change applied optime, should signal data changed.
+ appliedOpTimeAndWallTime.opTime = OpTime({11, 2}, 1);
+ hbResp.setAppliedOpTimeAndWallTime(appliedOpTimeAndWallTime);
+ hbRespObj = hbResp.toBSON();
+ getExternalState()->clearOtherMemberDataChanged();
+ {
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getHeartbeatInterval());
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged());
+ }
+
+ // Change durable optime, should signal data changed.
+ durableOpTimeAndWallTime.opTime = OpTime({10, 2}, 1);
+ hbResp.setDurableOpTimeAndWallTime(durableOpTimeAndWallTime);
+ hbRespObj = hbResp.toBSON();
+ getExternalState()->clearOtherMemberDataChanged();
+ {
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getHeartbeatInterval());
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged());
+ }
+
+ // Change member state, should signal data changed.
+ hbResp.setState(MemberState::RS_PRIMARY);
+ hbRespObj = hbResp.toBSON();
+ getExternalState()->clearOtherMemberDataChanged();
+ {
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getHeartbeatInterval());
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged());
+ }
+
+ // Change nothing again, should see no change.
+ getExternalState()->clearOtherMemberDataChanged();
+ {
+ net->enterNetwork();
+ net->runUntil(net->now() + config.getHeartbeatInterval());
+ auto noi = net->getNextReadyRequest();
+ auto& request = noi->getRequest();
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_FALSE(getExternalState()->getOtherMemberDataChanged());
+ }
+}
/**
* Test a concurrent stepdown and reconfig. The stepdown is triggered by a heartbeat response
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 6f3daeb392a..4f7ebd4f350 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -1077,6 +1077,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
bool advancedOpTimeOrUpdatedConfig = false;
bool becameElectable = false;
+ bool changedMemberState = false;
if (!hbResponse.isOK()) {
if (isUnauthorized) {
hbData.setAuthIssue(now);
@@ -1105,7 +1106,10 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
"memberId"_attr = member.getId());
pingsInConfig++;
auto wasUnelectable = hbData.isUnelectable();
- advancedOpTimeOrUpdatedConfig = hbData.setUpValues(now, std::move(hbr));
+ auto hbChanges = hbData.setUpValues(now, std::move(hbr));
+ advancedOpTimeOrUpdatedConfig =
+ hbChanges.getOpTimeAdvanced() || hbChanges.getConfigChanged();
+ changedMemberState = hbChanges.getMemberStateChanged();
becameElectable = wasUnelectable && !hbData.isUnelectable();
}
@@ -1121,6 +1125,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
nextAction.setAdvancedOpTimeOrUpdatedConfig(advancedOpTimeOrUpdatedConfig);
nextAction.setBecameElectable(becameElectable);
+ nextAction.setChangedMemberState(changedMemberState);
return nextAction;
}