diff options
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/heartbeat_response_action.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/heartbeat_response_action.h | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/member_data.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/member_data.h | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 152 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.cpp | 7 |
15 files changed, 335 insertions, 23 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 0e5d3fa573b..730a5ecf6dd 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -187,6 +187,8 @@ void BackgroundSync::shutdown(OperationContext* opCtx) { stdx::lock_guard<Latch> lock(_mutex); setState(lock, ProducerState::Stopped); + // If we happen to be waiting for sync source data, stop. + _notifySyncSourceSelectionDataChanged(lock); if (_syncSourceResolver) { _syncSourceResolver->shutdown(); @@ -327,6 +329,11 @@ void BackgroundSync::_produce() { lastOpTimeFetched, OpTime(), [&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; }); + // It is possible for _syncSourceSelectionDataChanged to become true between when we release + // the lock at the end of this block and when the syncSourceResolver retrieves the relevant + // heartbeat data, which means if we don't get a sync source we won't sleep even though we + // used the relevant data. But that's OK because we'll only spin once. + _syncSourceSelectionDataChanged = false; } // This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls // ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to @@ -413,20 +420,18 @@ void BackgroundSync::_produce() { source = _syncSourceHost; } // If our sync source has not changed, it is likely caused by our heartbeat data map being - // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting - // for our map to update. + // out of date. In that case we sleep for up to 1 second to reduce the amount we spin + // waiting for our map to update. If we are notified of heartbeat data change, we will + // interrupt the wait early. if (oldSource == source) { long long sleepMS = _getRetrySleepMS(); LOGV2(21087, - "Chose same sync source candidate as last time, {syncSource}. Sleeping for " - "{sleepDurationMillis}ms to avoid immediately choosing a new sync source for the " - "same reason as last time.", "Chose same sync source candidate as last time. Sleeping to avoid immediately " "choosing a new sync source for the same reason as last time", "syncSource"_attr = source, "sleepDurationMillis"_attr = sleepMS); numTimesChoseSameSyncSource.increment(1); - mongo::sleepmillis(sleepMS); + _waitForNewSyncSourceSelectionData(sleepMS); } else { LOGV2(21088, "Changed sync source from {oldSyncSource} to {newSyncSource}", @@ -449,12 +454,10 @@ void BackgroundSync::_produce() { // No sync source found. LOGV2_DEBUG(21090, 1, - "Could not find a sync source. Sleeping for {sleepDurationMillis}ms before " - "trying again.", "Could not find a sync source. Sleeping before trying again", "sleepDurationMillis"_attr = sleepMS); numTimesCouldNotFindSyncSource.increment(1); - mongo::sleepmillis(sleepMS); + _waitForNewSyncSourceSelectionData(sleepMS); return; } @@ -863,6 +866,33 @@ void BackgroundSync::_fallBackOnRollbackViaRefetch( rollback(opCtx, *localOplog, rollbackSource, requiredRBID, _replCoord, _replicationProcess); } +void BackgroundSync::notifySyncSourceSelectionDataChanged() { + stdx::lock_guard lock(_mutex); + _notifySyncSourceSelectionDataChanged(lock); +} + +void BackgroundSync::_notifySyncSourceSelectionDataChanged(WithLock) { + if (!_syncSourceSelectionDataChanged) { + _syncSourceSelectionDataChanged = true; + _syncSourceSelectionDataCv.notify_one(); + } +} + +void BackgroundSync::_waitForNewSyncSourceSelectionData(long long waitTimeMillis) { + stdx::unique_lock<Latch> lock(_mutex); + if (_syncSourceSelectionDataCv.wait_for( + lock, stdx::chrono::milliseconds(waitTimeMillis), [this] { + return _syncSourceSelectionDataChanged || _inShutdown; + })) { + LOGV2_DEBUG(6795401, + 1, + "Sync source wait interrupted early", + "syncSourceSelectionDataChanged"_attr = _syncSourceSelectionDataChanged, + "inShutdown"_attr = _inShutdown, + "waitTimeMillis"_attr = waitTimeMillis); + } +} + HostAndPort BackgroundSync::getSyncTarget() const { stdx::unique_lock<Latch> lock(_mutex); return _syncSourceHost; @@ -875,12 +905,16 @@ void BackgroundSync::clearSyncTarget() { "Resetting sync source to empty", "previousSyncSource"_attr = _syncSourceHost); _syncSourceHost = HostAndPort(); + _notifySyncSourceSelectionDataChanged(lock); } void BackgroundSync::_stop(WithLock lock, bool resetLastFetchedOptime) { setState(lock, ProducerState::Stopped); LOGV2(21107, "Stopping replication producer"); + // If we happen to be waiting for sync source data, stop. + _notifySyncSourceSelectionDataChanged(lock); + _syncSourceHost = HostAndPort(); if (resetLastFetchedOptime) { invariant(_oplogApplier->getBuffer()->isEmpty()); diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 9bfec5c753b..6b4882e6905 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -122,8 +122,10 @@ public: */ bool tooStale() const; - // starts the sync target notifying thread - void notifierThread(); + /** + * Informs us that data relevant to sync source selection has changed. + */ + void notifySyncSourceSelectionDataChanged(); HostAndPort getSyncTarget() const; @@ -214,6 +216,14 @@ private: long long _getRetrySleepMS(); + // Waits for the given time, or until we are notified that relevant sync source selection data + // has changed. Takes _mutex, so don't call with _mutex held. + void _waitForNewSyncSourceSelectionData(long long waitTimeMillis); + + // Internal version of notifySyncSourceSelectionDataChanged(), to be used by callers + // which already hold _mutex. + void _notifySyncSourceSelectionDataChanged(WithLock); + // This OplogApplier applies oplog entries fetched from the sync source. OplogApplier* const _oplogApplier; @@ -275,6 +285,13 @@ private: // operations in the local oplog in order to bring this server to a consistent state relative // to the sync source. std::unique_ptr<RollbackImpl> _rollback; // (PR) + + // A condition variable used to wake us when sync source selection data changes. + stdx::condition_variable _syncSourceSelectionDataCv; // (S) + + // A latch which tells us if sync source selection data has changed since we called + // the syncSourcSelector + bool _syncSourceSelectionDataChanged = true; // (M) }; diff --git a/src/mongo/db/repl/heartbeat_response_action.cpp b/src/mongo/db/repl/heartbeat_response_action.cpp index 43c1abed4a0..284b34def3a 100644 --- a/src/mongo/db/repl/heartbeat_response_action.cpp +++ b/src/mongo/db/repl/heartbeat_response_action.cpp @@ -83,5 +83,9 @@ void HeartbeatResponseAction::setBecameElectable(bool becameElectable) { _becameElectable = becameElectable; } +void HeartbeatResponseAction::setChangedMemberState(bool changedMemberState) { + _changedMemberState = changedMemberState; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/heartbeat_response_action.h b/src/mongo/db/repl/heartbeat_response_action.h index 8e100d9966d..8d1310b90ab 100644 --- a/src/mongo/db/repl/heartbeat_response_action.h +++ b/src/mongo/db/repl/heartbeat_response_action.h @@ -112,6 +112,11 @@ public: */ void setBecameElectable(bool becameElectable); + /* + * Sets whether or not the member has changed member state since the last heartbeat response. + */ + void setChangedMemberState(bool changedMemberState); + /** * Gets the action type of this action. */ @@ -151,12 +156,27 @@ public: return _becameElectable; } + /* + * Returns true if the heartbeat response results in the member changing member state. + */ + bool getChangedMemberState() const { + return _changedMemberState; + } + + /* + * Returns true if the heartbeat results in any significant change in member data. + */ + bool getChangedSignificantly() const { + return _changedMemberState || _advancedOpTimeOrUpdatedConfig || _becameElectable; + } + private: Action _action; int _primaryIndex; Date_t _nextHeartbeatStartDate; bool _advancedOpTimeOrUpdatedConfig = false; bool _becameElectable = false; + bool _changedMemberState = false; }; } // namespace repl diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp index 054e93c6145..59fa6f057aa 100644 --- a/src/mongo/db/repl/member_data.cpp +++ b/src/mongo/db/repl/member_data.cpp @@ -47,7 +47,8 @@ MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _is _lastResponse.setAppliedOpTimeAndWallTime(OpTimeAndWallTime()); } -bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) { +MemberData::HeartbeatChanges MemberData::setUpValues(Date_t now, + ReplSetHeartbeatResponse&& hbResponse) { _health = 1; if (_upSince == Date_t()) { _upSince = now; @@ -69,7 +70,8 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) hbResponse.setAppliedOpTimeAndWallTime(_lastResponse.getAppliedOpTimeAndWallTime()); } // Log if the state changes - if (_lastResponse.getState() != hbResponse.getState()) { + const bool memberStateChanged = _lastResponse.getState() != hbResponse.getState(); + if (memberStateChanged) { LOGV2(21215, "Member {hostAndPort} is now in state {newState}", "Member is in new state", @@ -91,7 +93,7 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) _lastResponse = std::move(hbResponse); - return (opTimeAdvanced || configChanged); + return {opTimeAdvanced, configChanged, memberStateChanged}; } void MemberData::setDownValues(Date_t now, const std::string& heartbeatMessage) { diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h index 1903a1b448d..1b9bb68edb7 100644 --- a/src/mongo/db/repl/member_data.h +++ b/src/mongo/db/repl/member_data.h @@ -44,6 +44,31 @@ namespace repl { **/ class MemberData { public: + class HeartbeatChanges { + public: + HeartbeatChanges(bool opTimeAdvanced, bool configChanged, bool memberStateChanged) + : _opTimeAdvanced(opTimeAdvanced), + _configChanged(configChanged), + _memberStateChanged(memberStateChanged) {} + + bool getOpTimeAdvanced() const { + return _opTimeAdvanced; + } + + bool getConfigChanged() const { + return _configChanged; + } + + bool getMemberStateChanged() const { + return _memberStateChanged; + } + + private: + const bool _opTimeAdvanced; + const bool _configChanged; + const bool _memberStateChanged; + }; + MemberData(); MemberState getState() const { @@ -159,11 +184,11 @@ public: } /** - * Sets values in this object from the results of a successful heartbeat command. - * Returns true if the lastApplied/lastDurable values advanced or we've received a newer - * config since the last heartbeat response. + * Sets values in this object from the results of a successful heartbeat command. Returns a + * value indicating whether the lastApplied/lastDurable values advanced, we've received a newer + * config, and/or the member state changed since the last heartbeat response. */ - bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse); + HeartbeatChanges setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse); /** * Sets values in this object from the results of a erroring/failed heartbeat command. diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 9bf96582627..164dcd358c9 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -235,6 +235,11 @@ public: virtual void startProducerIfStopped() = 0; /** + * Notify interested parties that member data for other nodes has changed. + */ + virtual void notifyOtherMemberDataChanged() = 0; + + /** * True if we have discovered that no sync source's oplog overlaps with ours. */ virtual bool tooStale() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 227e60b0444..d3d2ab66136 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -1030,6 +1030,13 @@ void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { } } +void ReplicationCoordinatorExternalStateImpl::notifyOtherMemberDataChanged() { + stdx::lock_guard<Latch> lk(_threadMutex); + if (_bgSync) { + _bgSync->notifySyncSourceSelectionDataChanged(); + } +} + bool ReplicationCoordinatorExternalStateImpl::tooStale() { stdx::lock_guard<Latch> lk(_threadMutex); if (_bgSync) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 9a1e448f636..9b85e38202d 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -99,6 +99,7 @@ public: virtual void signalApplierToChooseNewSyncSource(); virtual void stopProducer(); virtual void startProducerIfStopped(); + void notifyOtherMemberDataChanged() final; virtual bool tooStale(); void clearCommittedSnapshot() final; void updateCommittedSnapshot(const OpTime& newCommitPoint) final; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 78f07512ac3..c2121897614 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -335,5 +335,17 @@ JournalListener* ReplicationCoordinatorExternalStateMock::getReplicationJournalL MONGO_UNREACHABLE; } +void ReplicationCoordinatorExternalStateMock::notifyOtherMemberDataChanged() { + _otherMemberDataChanged = true; +} + +void ReplicationCoordinatorExternalStateMock::clearOtherMemberDataChanged() { + _otherMemberDataChanged = false; +} + +bool ReplicationCoordinatorExternalStateMock::getOtherMemberDataChanged() const { + return _otherMemberDataChanged; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index ecd0f072fed..2b3919c7310 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -90,6 +90,7 @@ public: virtual void signalApplierToChooseNewSyncSource(); virtual void stopProducer(); virtual void startProducerIfStopped(); + void notifyOtherMemberDataChanged() final; virtual bool tooStale(); virtual void clearCommittedSnapshot(); virtual void updateCommittedSnapshot(const OpTime& newCommitPoint); @@ -200,6 +201,13 @@ public: virtual bool isShardPartOfShardedCluster(OperationContext* opCtx) const final; + /** + * Clear the _otherMemberDataChanged flag so we can check it later. + */ + void clearOtherMemberDataChanged(); + + bool getOtherMemberDataChanged() const; + JournalListener* getReplicationJournalListener() final; private: @@ -221,6 +229,7 @@ private: bool _threadsStarted; bool _isReadCommittedSupported = true; bool _areSnapshotsEnabled = true; + bool _otherMemberDataChanged = false; OpTime _firstOpTimeOfMyTerm; double _electionTimeoutOffsetLimitFraction = 0.15; Timestamp _globalTimestamp; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 99db6940f3b..10ccfa11cdc 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -5061,10 +5061,21 @@ Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePosi } _updateStateAfterRemoteOpTimeUpdates(lock, maxRemoteOpTime); - if (gotValidUpdate && !_getMemberState_inlock().primary()) { + if (gotValidUpdate) { + // If we become primary after the unlock below, the forwardSecondaryProgress will do nothing + // (slightly expensively). If we become secondary after the unlock below, BackgroundSync + // will take care of forwarding our progress by calling signalUpstreamUpdater() once we + // select a new sync source. So it's OK to depend on the stale value of wasPrimary here. + bool wasPrimary = _getMemberState_inlock().primary(); lock.unlock(); - // Must do this outside _mutex - _externalState->forwardSecondaryProgress(); + // maxRemoteOpTime is null here if we got valid updates but no downstream node had + // actually advanced any optime. + if (!maxRemoteOpTime.isNull()) + _externalState->notifyOtherMemberDataChanged(); + if (!wasPrimary) { + // Must do this outside _mutex + _externalState->forwardSecondaryProgress(); + } } return status; } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 1392cceb923..2051b421805 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -469,6 +469,11 @@ stdx::unique_lock<Latch> ReplicationCoordinatorImpl::_handleHeartbeatResponseAct break; } } + if (action.getChangedSignificantly()) { + lock.unlock(); + _externalState->notifyOtherMemberDataChanged(); + lock.lock(); + } return lock; } @@ -1025,7 +1030,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( } myIndex = StatusWith<int>(-1); } - const ReplSetConfig oldConfig = _rsConfig; + const bool contentChanged = !sameConfigContents(_rsConfig, newConfig); // If we do not have an index, we should pass -1 as our index to avoid falsely adding ourself to // the data structures inside of the TopologyCoordinator. const int myIndexValue = myIndex.getStatus().isOK() ? myIndex.getValue() : -1; @@ -1034,6 +1039,9 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( _setCurrentRSConfig(lk, opCtx.get(), newConfig, myIndexValue); lk.unlock(); + if (contentChanged) { + _externalState->notifyOtherMemberDataChanged(); + } _performPostMemberStateUpdateAction(action); if (MONGO_unlikely(waitForPostActionCompleteInHbReconfig.shouldFail())) { // Used in tests that wait for the post member state update action to complete. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index e619276b129..b01a5d51ea3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -1944,6 +1944,158 @@ TEST_F(ReplCoordHBV1Test, handleHeartbeatResponseForTestEnqueuesValidHandle) { heartbeatReponseThread.join(); } +TEST_F(ReplCoordHBV1Test, NotifiesExternalStateOfChangeOnlyWhenDataChanges) { + unittest::MinimumLoggedSeverityGuard replLogSeverityGuard{logv2::LogComponent::kReplication, + logv2::LogSeverity::Debug(3)}; + // Ensure that the metadata is processed if it is contained in a heartbeat response. + assertStartSuccess(BSON("_id" + << "mySet" + << "term" << 1 << "version" << 2 << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0) + << BSON("host" + << "node2:12345" + << "_id" << 1)) + << "protocolVersion" << 1), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_EQUALS(OpTime(), getReplCoord()->getLastCommittedOpTime()); + + auto config = getReplCoord()->getConfig(); + + auto net = getNet(); + ReplSetHeartbeatResponse hbResp; + OpTimeAndWallTime appliedOpTimeAndWallTime = {OpTime({11, 1}, 1), Date_t::now()}; + OpTimeAndWallTime durableOpTimeAndWallTime = {OpTime({10, 1}, 1), Date_t::now()}; + hbResp.setConfigVersion(config.getConfigVersion()); + hbResp.setConfigTerm(config.getConfigTerm()); + hbResp.setSetName(config.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setElectable(false); + hbResp.setAppliedOpTimeAndWallTime(appliedOpTimeAndWallTime); + hbResp.setDurableOpTimeAndWallTime(durableOpTimeAndWallTime); + auto hbRespObj = hbResp.toBSON(); + // First heartbeat, to set the stored data for the node. + { + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + ASSERT_EQUALS(config.getMemberAt(1).getHostAndPort(), request.target); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + } + + // Second heartbeat, same as the first, should not trigger external notification. + getExternalState()->clearOtherMemberDataChanged(); + { + net->enterNetwork(); + net->runUntil(net->now() + config.getHeartbeatInterval()); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_FALSE(getExternalState()->getOtherMemberDataChanged()); + } + + // Change electability, should signal data changed. + hbResp.setElectable(true); + hbRespObj = hbResp.toBSON(); + getExternalState()->clearOtherMemberDataChanged(); + { + net->enterNetwork(); + net->runUntil(net->now() + config.getHeartbeatInterval()); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged()); + } + + // Change applied optime, should signal data changed. + appliedOpTimeAndWallTime.opTime = OpTime({11, 2}, 1); + hbResp.setAppliedOpTimeAndWallTime(appliedOpTimeAndWallTime); + hbRespObj = hbResp.toBSON(); + getExternalState()->clearOtherMemberDataChanged(); + { + net->enterNetwork(); + net->runUntil(net->now() + config.getHeartbeatInterval()); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged()); + } + + // Change durable optime, should signal data changed. + durableOpTimeAndWallTime.opTime = OpTime({10, 2}, 1); + hbResp.setDurableOpTimeAndWallTime(durableOpTimeAndWallTime); + hbRespObj = hbResp.toBSON(); + getExternalState()->clearOtherMemberDataChanged(); + { + net->enterNetwork(); + net->runUntil(net->now() + config.getHeartbeatInterval()); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged()); + } + + // Change member state, should signal data changed. + hbResp.setState(MemberState::RS_PRIMARY); + hbRespObj = hbResp.toBSON(); + getExternalState()->clearOtherMemberDataChanged(); + { + net->enterNetwork(); + net->runUntil(net->now() + config.getHeartbeatInterval()); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_TRUE(getExternalState()->getOtherMemberDataChanged()); + } + + // Change nothing again, should see no change. + getExternalState()->clearOtherMemberDataChanged(); + { + net->enterNetwork(); + net->runUntil(net->now() + config.getHeartbeatInterval()); + auto noi = net->getNextReadyRequest(); + auto& request = noi->getRequest(); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbRespObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_FALSE(getExternalState()->getOtherMemberDataChanged()); + } +} /** * Test a concurrent stepdown and reconfig. The stepdown is triggered by a heartbeat response diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 6f3daeb392a..4f7ebd4f350 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -1077,6 +1077,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( const MemberConfig member = _rsConfig.getMemberAt(memberIndex); bool advancedOpTimeOrUpdatedConfig = false; bool becameElectable = false; + bool changedMemberState = false; if (!hbResponse.isOK()) { if (isUnauthorized) { hbData.setAuthIssue(now); @@ -1105,7 +1106,10 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( "memberId"_attr = member.getId()); pingsInConfig++; auto wasUnelectable = hbData.isUnelectable(); - advancedOpTimeOrUpdatedConfig = hbData.setUpValues(now, std::move(hbr)); + auto hbChanges = hbData.setUpValues(now, std::move(hbr)); + advancedOpTimeOrUpdatedConfig = + hbChanges.getOpTimeAdvanced() || hbChanges.getConfigChanged(); + changedMemberState = hbChanges.getMemberStateChanged(); becameElectable = wasUnelectable && !hbData.isUnelectable(); } @@ -1121,6 +1125,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); nextAction.setAdvancedOpTimeOrUpdatedConfig(advancedOpTimeOrUpdatedConfig); nextAction.setBecameElectable(becameElectable); + nextAction.setChangedMemberState(changedMemberState); return nextAction; } |