author     Tess Avitabile <tess.avitabile@mongodb.com>    2018-08-14 10:50:21 -0400
committer  Tess Avitabile <tess.avitabile@mongodb.com>    2018-08-22 10:41:55 -0400
commit     f5e7c8f3e81fe0cd34d4952ed2b547f3c29e06a4 (patch)
tree       f6db97b38a429e65abe042b367afcd79cb08ce1e /src/mongo/db
parent     76b46c9e1080dc2f80091f06591eb4775d66a340 (diff)
SERVER-33248 Allow choosing a sync source that we are up to date with if it has a higher lastOpCommitted
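The rule this change introduces: a node may select (or keep) a sync source that is not strictly ahead of it on lastApplied, provided the source is not behind it and, when the two are equal on lastApplied, the source knows of a higher lastOpCommitted. The patch expresses this as a lexicographic comparison of (lastApplied, lastOpCommitted) pairs via std::tie/std::tuple in chooseNewSyncSource, shouldChangeSyncSource, and checkRemoteOplogStart. The following is a minimal standalone sketch of that comparison, not taken from the patch; the OpTime struct here is a simplified stand-in for mongo::repl::OpTime.

#include <iostream>
#include <tuple>

// Simplified stand-in for mongo::repl::OpTime: ordered by (term, timestamp).
struct OpTime {
    long long term;
    unsigned long long ts;
    bool operator<(const OpTime& rhs) const {
        return std::tie(term, ts) < std::tie(rhs.term, rhs.ts);
    }
};

// A sync source candidate is acceptable when the pair (lastApplied, lastOpCommitted)
// is strictly greater than ours, compared lexicographically -- i.e. it may have the
// same lastApplied as long as it knows of a newer commit point.
bool candidateIsAcceptable(const OpTime& myLastFetched,
                           const OpTime& myLastOpCommitted,
                           const OpTime& candidateLastApplied,
                           const OpTime& candidateLastOpCommitted) {
    return std::tie(myLastFetched, myLastOpCommitted) <
        std::tie(candidateLastApplied, candidateLastOpCommitted);
}

int main() {
    OpTime applied{3, 100};
    OpTime committed{3, 90};
    OpTime newerCommitted{3, 95};
    // Same lastApplied, higher lastOpCommitted: acceptable after this change.
    std::cout << candidateIsAcceptable(applied, committed, applied, newerCommitted) << '\n';  // 1
    // Same lastApplied, same lastOpCommitted: still rejected (risk of a sync-source cycle).
    std::cout << candidateIsAcceptable(applied, committed, applied, committed) << '\n';  // 0
}

This mirrors, in simplified form, the new predicate in TopologyCoordinator::chooseNewSyncSource below, which rejects a candidate when std::tuple<OpTime, OpTime>(candidate lastApplied, candidate lastOpCommitted) <= std::tie(lastOpTimeFetched, _lastCommittedOpTime).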
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/member_data.cpp                             |   8
-rw-r--r--  src/mongo/db/repl/member_data.h                               |  11
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                           |  76
-rw-r--r--  src/mongo/db/repl/oplog_fetcher_test.cpp                      |  16
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp  |   9
-rw-r--r--  src/mongo/db/repl/topology_coordinator.cpp                    |  58
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h                      |   7
-rw-r--r--  src/mongo/db/repl/topology_coordinator_v1_test.cpp            | 196
8 files changed, 299 insertions, 82 deletions
diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp
index 9aed752d3d3..430e55424ce 100644
--- a/src/mongo/db/repl/member_data.cpp
+++ b/src/mongo/db/repl/member_data.cpp
@@ -45,7 +45,9 @@ MemberData::MemberData() : _health(-1), _authIssue(false), _configIndex(-1), _is
     _lastResponse.setAppliedOpTime(OpTime());
 }
 
-bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse) {
+bool MemberData::setUpValues(Date_t now,
+                             ReplSetHeartbeatResponse&& hbResponse,
+                             OpTime lastOpCommitted) {
     _health = 1;
     if (_upSince == Date_t()) {
         _upSince = now;
@@ -57,6 +59,10 @@ bool MemberData::setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse)
     _updatedSinceRestart = true;
     _lastHeartbeatMessage.clear();
 
+    if (!lastOpCommitted.isNull()) {
+        _lastOpCommitted = lastOpCommitted;
+    }
+
     if (!hbResponse.hasState()) {
         hbResponse.setState(MemberState::RS_UNKNOWN);
     }
diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h
index 2598a34f30d..219a7e9f06b 100644
--- a/src/mongo/db/repl/member_data.h
+++ b/src/mongo/db/repl/member_data.h
@@ -74,6 +74,9 @@ public:
     OpTime getHeartbeatDurableOpTime() const {
         return _lastResponse.hasDurableOpTime() ? _lastResponse.getDurableOpTime() : OpTime();
     }
+    OpTime getHeartbeatLastOpCommitted() const {
+        return _lastOpCommitted;
+    }
     int getConfigVersion() const {
         return _lastResponse.getConfigVersion();
     }
@@ -135,9 +138,10 @@ public:
 
     /**
      * Sets values in this object from the results of a successful heartbeat command.
+     * 'lastOpCommitted' should be extracted from the heartbeat metadata.
      * Returns whether or not the optimes advanced as a result of this heartbeat response.
      */
-    bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse);
+    bool setUpValues(Date_t now, ReplSetHeartbeatResponse&& hbResponse, OpTime lastOpCommitted);
 
     /**
      * Sets values in this object from the results of a erroring/failed heartbeat command.
@@ -256,6 +260,11 @@ private:
     // Last known OpTime that the replica has applied, whether journaled or unjournaled.
     OpTime _lastAppliedOpTime;
 
+    // OpTime of the most recently committed op of which the node was aware, extracted from the
+    // heartbeat metadata. Note that only arbiters should update their knowledge of the commit point
+    // from heartbeat data.
+    OpTime _lastOpCommitted;
+
     // TODO(russotto): Since memberData is kept in config order, _configIndex
     // and _isSelf may not be necessary.
     // Index of this member in the replica set configuration.
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 7a0ebb75d2e..e1e5a0beb63 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -110,8 +110,11 @@ BSONObj makeMetadataObject() {
  * Checks the first batch of results from query.
  * 'documents' are the first batch of results returned from tailing the remote oplog.
  * 'lastFetched' optime and hash should be consistent with the predicate in the query.
+ * 'lastOpCommitted' is the OpTime of the most recently committed op of which this node is aware.
  * 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for
  * compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'remoteLastOpCommitted' is the OpTime of the most recently committed op of which the sync source
+ * is aware.
  * 'requiredRBID' is a RollbackID received when we chose the sync source that we use here to
  * guarantee we have not rolled back since we confirmed the sync source had our minValid.
  * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is optional
@@ -120,14 +123,17 @@ BSONObj makeMetadataObject() {
  * oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point
  * as ours, but still cannot be behind ours.
  *
- * TODO (SERVER-27668): Make remoteLastOpApplied and remoteRBID non-optional in mongodb 3.8.
+ * TODO (SERVER-27668): Make remoteLastOpApplied, remoteLastOpCommitted, and remoteRBID
+ * non-optional.
  *
  * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
  * the remote oplog.
  */
 Status checkRemoteOplogStart(const Fetcher::Documents& documents,
                              OpTimeWithHash lastFetched,
+                             OpTime lastOpCommitted,
                              boost::optional<OpTime> remoteLastOpApplied,
+                             boost::optional<OpTime> remoteLastOpCommitted,
                              int requiredRBID,
                              boost::optional<int> remoteRBID,
                              bool requireFresherSyncSource) {
@@ -151,22 +157,11 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents,
         }
     }
 
-    // The SyncSourceResolver never checks that the sync source candidate is actually ahead of
-    // us. Rather than have it check there with an extra network roundtrip, we check here.
-    if (requireFresherSyncSource && remoteLastOpApplied &&
-        (*remoteLastOpApplied <= lastFetched.opTime)) {
-        return Status(ErrorCodes::InvalidSyncSource,
-                      str::stream() << "Sync source's last applied OpTime "
-                                    << remoteLastOpApplied->toString()
-                                    << " is not greater than our last fetched OpTime "
-                                    << lastFetched.opTime.toString()
-                                    << ". Choosing new sync source.");
-    } else if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
-        // In initial sync, the lastFetched OpTime will almost always equal the remoteLastOpApplied
-        // since we fetch the sync source's last applied OpTime to determine where to start our
-        // OplogFetcher. This is fine since no other node can sync off of an initial syncing node
-        // and thus cannot form a sync source cycle. To account for this, we must relax the
-        // constraint on our sync source being fresher.
+    // The sync source could be behind us if it rolled back after we selected it. We could have
+    // failed to detect the rollback if it occurred between sync source selection (when we check the
+    // candidate is ahead of us) and sync source resolution (when we got 'requiredRBID'). If the
+    // sync source is now behind us, choose a new sync source to prevent going into rollback.
+    if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
         return Status(ErrorCodes::InvalidSyncSource,
                       str::stream() << "Sync source's last applied OpTime "
                                     << remoteLastOpApplied->toString()
@@ -175,10 +170,36 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents,
                                     << ". Choosing new sync source.");
     }
 
-    // At this point we know that our sync source has our minValid and is ahead of us, so if our
+    // If 'requireFresherSyncSource' is true, we must check that the sync source's
+    // lastApplied/lastOpCommitted is ahead of us to prevent forming a cycle. Although we check for
+    // this condition in sync source selection, if an undetected rollback occurred between sync
+    // source selection and sync source resolution, this condition may no longer hold.
+    // 'requireFresherSyncSource' is false for initial sync, since no other node can sync off an
+    // initial syncing node, so we do not need to check for cycles. In addition, it would be
+    // problematic to check this condition for initial sync, since the 'lastFetched' OpTime will
+    // almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied
+    // OpTime to determine where to start our OplogFetcher.
+    if (requireFresherSyncSource && remoteLastOpApplied && remoteLastOpCommitted &&
+        std::tie(*remoteLastOpApplied, *remoteLastOpCommitted) <=
+            std::tie(lastFetched.opTime, lastOpCommitted)) {
+        return Status(ErrorCodes::InvalidSyncSource,
+                      str::stream()
+                          << "Sync source cannot be behind me, and if I am up-to-date with the "
+                             "sync source, it must have a higher lastOpCommitted. "
+                          << "My last fetched oplog optime: "
+                          << lastFetched.opTime.toString()
+                          << ", latest oplog optime of sync source: "
+                          << remoteLastOpApplied->toString()
+                          << ", my lastOpCommitted: "
+                          << lastOpCommitted.toString()
+                          << ", lastOpCommitted of sync source: "
+                          << remoteLastOpCommitted->toString());
+    }
+
+    // At this point we know that our sync source has our minValid and is not behind us, so if our
     // history diverges from our sync source's we should prefer its history and roll back ours.
-    // Since we checked for rollback and our sync source is ahead of us, an empty batch means that
+    // Since we checked for rollback and our sync source is not behind us, an empty batch means that
     // we have a higher timestamp on our last fetched OpTime than our sync source's last applied
     // OpTime, but a lower term. When this occurs, we must roll back our inconsistent oplog entry.
     if (documents.empty()) {
@@ -420,12 +441,17 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
     auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
     auto remoteLastApplied =
         oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
-    auto status = checkRemoteOplogStart(documents,
-                                        lastFetched,
-                                        remoteLastApplied,
-                                        _requiredRBID,
-                                        remoteRBID,
-                                        _requireFresherSyncSource);
+    auto remoteLastOpCommitted =
+        oqMetadata ? boost::make_optional(oqMetadata->getLastOpCommitted()) : boost::none;
+    auto status = checkRemoteOplogStart(
+        documents,
+        lastFetched,
+        _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().opTime,
+        remoteLastApplied,
+        remoteLastOpCommitted,
+        _requiredRBID,
+        remoteRBID,
+        _requireFresherSyncSource);
     if (!status.isOK()) {
         // Stop oplog fetcher and execute rollback if necessary.
         return status;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 8f03c6c50cb..ef2985b4d20 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -371,6 +371,22 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
 }
 
 TEST_F(OplogFetcherTest,
+       MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadButHasHigherLastOpCommitted) {
+    rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+    rpc::OplogQueryMetadata oqMetadata(remoteNewerOpTime, lastFetched.opTime, rbid, 2, 2);
+    BSONObjBuilder bob;
+    ASSERT_OK(replMetadata.writeToMetadata(&bob));
+    ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+    auto metadataObj = bob.obj();
+
+    auto entry = makeNoopOplogEntry(lastFetched);
+    auto shutdownState = processSingleBatch(
+        {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false);
+    ASSERT_OK(shutdownState->getStatus());
+    ASSERT(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(OplogFetcherTest,
        MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
     rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
     rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 2b06d004d0a..59f982a129c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -144,6 +144,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
     }
 
     ReplSetHeartbeatResponse hbResponse;
+    OpTime lastOpCommitted;
     BSONObj resp;
     if (responseStatus.isOK()) {
         resp = cbData.response.data;
@@ -172,9 +173,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
             replMetadata = responseStatus;
         }
         if (replMetadata.isOK()) {
+            lastOpCommitted = replMetadata.getValue().getLastOpCommitted();
+
             // Arbiters are the only nodes allowed to advance their commit point via heartbeats.
             if (_getMemberState_inlock().arbiter()) {
-                _advanceCommitPoint_inlock(replMetadata.getValue().getLastOpCommitted());
+                _advanceCommitPoint_inlock(lastOpCommitted);
             }
             // Asynchronous stepdown could happen, but it will wait for _mutex and execute
             // after this function, so we cannot and don't need to wait for it to finish.
@@ -204,8 +207,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
         hbStatusResponse = StatusWith<ReplSetHeartbeatResponse>(responseStatus);
     }
 
-    HeartbeatResponseAction action =
-        _topCoord->processHeartbeatResponse(now, networkTime, target, hbStatusResponse);
+    HeartbeatResponseAction action = _topCoord->processHeartbeatResponse(
+        now, networkTime, target, hbStatusResponse, lastOpCommitted);
 
     if (action.getAction() == HeartbeatResponseAction::NoAction && hbStatusResponse.isOK() &&
         hbStatusResponse.getValue().hasState() &&
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 0bc6fb7fcbf..a298f5e0ecc 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -377,13 +377,21 @@ HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
                 continue;
             }
         }
-        // only consider candidates that are ahead of where we are
-        if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
-            LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
-                   << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
-                   << ", latest oplog optime of sync candidate "
-                   << itMemberConfig.getHostAndPort() << ": "
-                   << it->getHeartbeatAppliedOpTime().toBSON();
+        // Do not select a candidate that is behind me. If I am up to date with the candidate,
+        // only select them if they have a higher lastOpCommitted.
+        if (std::tuple<OpTime, OpTime>(it->getHeartbeatAppliedOpTime(),
+                                       it->getHeartbeatLastOpCommitted()) <=
+            std::tie(lastOpTimeFetched, _lastCommittedOpTime)) {
+            LOG(1) << "Cannot select this sync source. Sync source cannot be behind me, and if "
+                      "I am up-to-date with the sync source, it must have a higher "
+                      "lastOpCommitted. "
+                   << "Sync candidate: " << itMemberConfig.getHostAndPort()
+                   << ", my last fetched oplog optime: " << lastOpTimeFetched.toBSON()
+                   << ", latest oplog optime of sync candidate: "
+                   << it->getHeartbeatAppliedOpTime().toBSON()
+                   << ", my lastOpCommitted: " << _lastCommittedOpTime
+                   << ", lastOpCommitted of sync candidate: "
+                   << it->getHeartbeatLastOpCommitted();
             continue;
         }
         // Candidate cannot be more latent than anything we've already considered.
@@ -684,7 +692,8 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
     Date_t now,
     Milliseconds networkRoundTripTime,
     const HostAndPort& target,
-    const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
+    const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
+    OpTime lastOpCommitted) {
     const MemberState originalState = getMemberState();
     PingStats& hbStats = _pings[target];
     invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
@@ -794,7 +803,7 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
     } else {
         ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
         LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId();
-        advancedOpTime = hbData.setUpValues(now, std::move(hbr));
+        advancedOpTime = hbData.setUpValues(now, std::move(hbr), lastOpCommitted);
     }
 
     HeartbeatResponseAction nextAction;
@@ -1349,7 +1358,8 @@ void TopologyCoordinator::setCurrentPrimary_forTest(int primaryIndex,
             hbResponse.setSyncingTo(HostAndPort());
             _memberData.at(primaryIndex)
                 .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
-                             std::move(hbResponse));
+                             std::move(hbResponse),
+                             _memberData.at(primaryIndex).getHeartbeatLastOpCommitted());
         }
         _currentPrimaryIndex = primaryIndex;
     }
@@ -1635,6 +1645,9 @@ void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) {
             const auto heartbeatDurableOpTime = memberData.getHeartbeatDurableOpTime();
             entry.append("heartbeatDurableOpTime", heartbeatDurableOpTime.toBSON());
 
+            const auto lastOpCommitted = memberData.getHeartbeatLastOpCommitted();
+            entry.append("heartbeatLastOpCommitted", lastOpCommitted.toBSON());
+
             if (_selfIndex >= 0) {
                 const int memberId = memberData.getMemberId();
                 invariant(memberId >= 0);
@@ -2489,18 +2502,25 @@ bool TopologyCoordinator::shouldChangeSyncSource(
     // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
     // ReplSetMetadata.
     OpTime currentSourceOpTime;
+    OpTime currentSourceLastOpCommitted;
     int syncSourceIndex = -1;
     int primaryIndex = -1;
     if (oqMetadata) {
         currentSourceOpTime =
             std::max(oqMetadata->getLastOpApplied(),
                      _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+        currentSourceLastOpCommitted =
+            std::max(oqMetadata->getLastOpCommitted(),
+                     _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted());
         syncSourceIndex = oqMetadata->getSyncSourceIndex();
         primaryIndex = oqMetadata->getPrimaryIndex();
     } else {
         currentSourceOpTime =
             std::max(replMetadata.getLastOpVisible(),
                      _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+        currentSourceLastOpCommitted =
+            std::max(replMetadata.getLastOpCommitted(),
+                     _memberData.at(currentSourceIndex).getHeartbeatLastOpCommitted());
         syncSourceIndex = replMetadata.getSyncSourceIndex();
         primaryIndex = replMetadata.getPrimaryIndex();
     }
@@ -2514,13 +2534,21 @@ bool TopologyCoordinator::shouldChangeSyncSource(
     // Change sync source if they are not ahead of us, and don't have a sync source,
     // unless they are primary.
     const OpTime myLastOpTime = getMyLastAppliedOpTime();
-    if (syncSourceIndex == -1 && currentSourceOpTime <= myLastOpTime &&
+    if (syncSourceIndex == -1 &&
+        std::tie(currentSourceOpTime, currentSourceLastOpCommitted) <=
+            std::tie(myLastOpTime, _lastCommittedOpTime) &&
         primaryIndex != currentSourceIndex) {
         std::stringstream logMessage;
-        logMessage << "Choosing new sync source because our current sync source, "
-                   << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
-                   << ") which is not ahead of ours (" << myLastOpTime
-                   << "), it does not have a sync source, and it's not the primary";
+
+        logMessage << "Choosing new sync source. Our current sync source is not primary and does "
+                      "not have a sync source, so we require that it is not behind us, and that if "
+                      "we are up-to-date with it, it has a higher lastOpCommitted. "
+                   << "Current sync source: " << currentSource.toString()
+                   << ", my last fetched oplog optime: " << myLastOpTime
+                   << ", latest oplog optime of sync source: " << currentSourceOpTime
+                   << ", my lastOpCommitted: " << _lastCommittedOpTime
+                   << ", lastOpCommitted of sync source: " << currentSourceLastOpCommitted;
+
         if (primaryIndex >= 0) {
             logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
         } else {
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 9416a4653a0..117bc784f75 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -391,8 +391,8 @@ public:
         Date_t now, const std::string& ourSetName, const HostAndPort& target);
 
     /**
-     * Processes a heartbeat response from "target" that arrived around "now", having
-     * spent "networkRoundTripTime" millis on the network.
+     * Processes a heartbeat response from "target" that arrived around "now" with "lastOpCommitted"
+     * in the metadata, having spent "networkRoundTripTime" millis on the network.
      *
      * Updates internal topology coordinator state, and returns instructions about what action
      * to take next.
@@ -414,7 +414,8 @@ public:
         Date_t now,
         Milliseconds networkRoundTripTime,
         const HostAndPort& target,
-        const StatusWith<ReplSetHeartbeatResponse>& hbResponse);
+        const StatusWith<ReplSetHeartbeatResponse>& hbResponse,
+        OpTime lastOpCommitted);
 
     /**
      * Returns whether or not at least 'numNodes' have reached the given opTime.
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 89a8af0d74e..be14cb543bb 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -169,9 +169,10 @@ protected:
     // Only set visibleOpTime, primaryIndex and syncSourceIndex
     ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime(),
                                         int primaryIndex = -1,
-                                        int syncSourceIndex = -1) {
+                                        int syncSourceIndex = -1,
+                                        OpTime committedOpTime = OpTime()) {
         return ReplSetMetadata(_topo->getTerm(),
-                               OpTime(),
+                               committedOpTime,
                                visibleOpTime,
                                _currentConfig.getConfigVersion(),
                                OID(),
@@ -183,8 +184,10 @@ protected:
     // Only set lastAppliedOpTime, primaryIndex and syncSourceIndex
     OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(),
                                               int primaryIndex = -1,
-                                              int syncSourceIndex = -1) {
-        return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
+                                              int syncSourceIndex = -1,
+                                              OpTime committedOpTime = OpTime()) {
+        return OplogQueryMetadata(
+            committedOpTime, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
     }
 
     HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
@@ -192,13 +195,15 @@ protected:
                                                MemberState memberState,
                                                const OpTime& electionTime,
                                                const OpTime& lastOpTimeSender,
-                                               const HostAndPort& syncingTo = HostAndPort()) {
+                                               const HostAndPort& syncingTo = HostAndPort(),
+                                               const OpTime& lastOpCommittedSender = OpTime()) {
         return _receiveHeartbeatHelper(Status::OK(),
                                        member,
                                        setName,
                                        memberState,
                                        electionTime.getTimestamp(),
                                        lastOpTimeSender,
+                                       lastOpCommittedSender,
                                        Milliseconds(1),
                                        syncingTo);
     }
@@ -216,6 +221,7 @@ protected:
                                        MemberState::RS_UNKNOWN,
                                        Timestamp(),
                                        OpTime(),
+                                       OpTime(),
                                        roundTripTime,
                                        HostAndPort());
     }
@@ -224,13 +230,15 @@ protected:
                                                 const std::string& setName,
                                                 MemberState memberState,
                                                 const OpTime& lastOpTimeSender,
-                                                Milliseconds roundTripTime = Milliseconds(1)) {
+                                                Milliseconds roundTripTime = Milliseconds(1),
+                                                const OpTime& lastOpCommittedSender = OpTime()) {
         return _receiveHeartbeatHelper(Status::OK(),
                                        member,
                                        setName,
                                        memberState,
                                        Timestamp(),
                                        lastOpTimeSender,
+                                       lastOpCommittedSender,
                                        roundTripTime,
                                        HostAndPort());
     }
@@ -242,6 +250,7 @@ private:
                                                   MemberState memberState,
                                                   Timestamp electionTime,
                                                   const OpTime& lastOpTimeSender,
+                                                  const OpTime& lastOpCommittedSender,
                                                   Milliseconds roundTripTime,
                                                   const HostAndPort& syncingTo) {
         ReplSetHeartbeatResponse hb;
@@ -259,7 +268,8 @@ private:
         getTopoCoord().prepareHeartbeatRequestV1(now(), setName, member);
         now() += roundTripTime;
 
-        return getTopoCoord().processHeartbeatResponse(now(), roundTripTime, member, hbResponse);
+        return getTopoCoord().processHeartbeatResponse(
+            now(), roundTripTime, member, hbResponse, lastOpCommittedSender);
     }
 
 private:
@@ -574,6 +584,80 @@ TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) {
     ASSERT(getTopoCoord().getSyncSourceAddress().empty());
 }
 
+TEST_F(TopoCoordTest, NodeCanChooseSyncSourceWithSameLastAppliedAndHigherLastOpCommitted) {
+    updateConfig(BSON("_id"
+                      << "rs0"
+                      << "version"
+                      << 1
+                      << "members"
+                      << BSON_ARRAY(BSON("_id" << 1 << "host"
+                                               << "hself")
+                                    << BSON("_id" << 10 << "host"
+                                                  << "h1"))),
+                 0);
+
+    setSelfMemberState(MemberState::RS_SECONDARY);
+    OpTime lastApplied = OpTime(Timestamp(100, 3), 3);
+    OpTime ourLastOpCommitted = OpTime(Timestamp(100, 1), 3);
+    OpTime lastOpCommittedSyncSource = OpTime(Timestamp(100, 2), 3);
+
+    heartbeatFromMember(HostAndPort("h1"),
+                        "rs0",
+                        MemberState::RS_SECONDARY,
+                        lastApplied,
+                        Milliseconds(100),
+                        lastOpCommittedSyncSource);
+
+    // Record 2nd round of pings to allow choosing a new sync source.
+    heartbeatFromMember(HostAndPort("h1"),
+                        "rs0",
+                        MemberState::RS_SECONDARY,
+                        lastApplied,
+                        Milliseconds(100),
+                        lastOpCommittedSyncSource);
+
+    ASSERT(getTopoCoord().advanceLastCommittedOpTime(ourLastOpCommitted));
+    getTopoCoord().chooseNewSyncSource(
+        now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+    ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress());
+}
+
+TEST_F(TopoCoordTest, NodeCannotChooseSyncSourceWithSameLastAppliedAndSameLastOpCommitted) {
+    updateConfig(BSON("_id"
+                      << "rs0"
+                      << "version"
+                      << 1
+                      << "members"
+                      << BSON_ARRAY(BSON("_id" << 1 << "host"
+                                               << "hself")
+                                    << BSON("_id" << 10 << "host"
+                                                  << "h1"))),
+                 0);
+
+    setSelfMemberState(MemberState::RS_SECONDARY);
+    OpTime lastApplied = OpTime(Timestamp(100, 3), 3);
+    OpTime lastOpCommitted = OpTime(Timestamp(100, 1), 3);
+
+    heartbeatFromMember(HostAndPort("h1"),
+                        "rs0",
+                        MemberState::RS_SECONDARY,
+                        lastApplied,
+                        Milliseconds(100),
+                        lastOpCommitted);
+
+    // Record 2nd round of pings to allow choosing a new sync source.
+    heartbeatFromMember(HostAndPort("h1"),
+                        "rs0",
+                        MemberState::RS_SECONDARY,
+                        lastApplied,
+                        Milliseconds(100),
+                        lastOpCommitted);
+
+    ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted));
+    getTopoCoord().chooseNewSyncSource(
+        now()++, lastApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+    ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+}
 TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
     updateConfig(BSON("_id"
@@ -1613,7 +1697,7 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
     HostAndPort member = HostAndPort("test0:1234");
     getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(1), setName, member);
     getTopoCoord().processHeartbeatResponse(
-        startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood);
+        startupTime + Milliseconds(2), Milliseconds(1), member, hbResponseGood, OpTime());
     getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(3), setName, member);
     Date_t timeoutTime =
         startupTime + Milliseconds(3) + ReplSetConfig::kDefaultHeartbeatTimeoutPeriod;
@@ -1622,12 +1706,12 @@ TEST_F(TopoCoordTest, ReplSetGetStatus) {
         StatusWith<ReplSetHeartbeatResponse>(Status(ErrorCodes::HostUnreachable, ""));
     getTopoCoord().processHeartbeatResponse(
-        timeoutTime, Milliseconds(5000), member, hbResponseDown);
+        timeoutTime, Milliseconds(5000), member, hbResponseDown, OpTime());
 
     member = HostAndPort("test1:1234");
     getTopoCoord().prepareHeartbeatRequestV1(startupTime + Milliseconds(2), setName, member);
     getTopoCoord().processHeartbeatResponse(
-        heartbeatTime, Milliseconds(4000), member, hbResponseGood);
+        heartbeatTime, Milliseconds(4000), member, hbResponseGood, OpTime());
     makeSelfPrimary(electionTime);
     getTopoCoord().setMyLastAppliedOpTime(oplogProgress, startupTime, false);
     getTopoCoord().setMyLastDurableOpTime(oplogDurable, startupTime, false);
@@ -1814,7 +1898,7 @@ TEST_F(TopoCoordTest, HeartbeatFrequencyShouldBeHalfElectionTimeoutWhenArbiter)
     std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest =
         getTopoCoord().prepareHeartbeatRequestV1(requestDate, "myset", target);
     auto action = getTopoCoord().processHeartbeatResponse(
-        requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>());
+        requestDate, Milliseconds(0), target, makeStatusWith<ReplSetHeartbeatResponse>(), OpTime());
     Date_t expected(now() + Milliseconds(2500));
     ASSERT_EQUALS(expected, action.getNextHeartbeatStartDate());
 }
@@ -3397,6 +3481,31 @@ TEST_F(HeartbeatResponseTestV1,
                                       now()));
     ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
         HostAndPort("host2"), makeReplSetMetadata(newerThanLastOpTimeApplied), boost::none, now()));
+
+    // If we are as up-to-date as this sync source, but it has a higher lastOpCommitted, we will not
+    // change sync sources.
+    OpTime lastOpCommitted = OpTime(Timestamp(100, 0), 0);
+    OpTime newerLastOpCommitted = OpTime(Timestamp(200, 0), 0);
+    ASSERT(getTopoCoord().advanceLastCommittedOpTime(lastOpCommitted));
+    nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+                                    "rs0",
+                                    MemberState::RS_SECONDARY,
+                                    election,
+                                    lastOpTimeApplied,
+                                    HostAndPort(),
+                                    newerLastOpCommitted);
+    ASSERT_NO_ACTION(nextAction.getAction());
+    ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+        HostAndPort("host2"),
+        makeReplSetMetadata(),
+        makeOplogQueryMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted),
+        now()));
+
+    ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+        HostAndPort("host2"),
+        makeReplSetMetadata(lastOpTimeApplied, -1, -1, newerLastOpCommitted),
+        boost::none,
+        now()));
 }
 
 TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) {
@@ -3653,7 +3762,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRep
     hb.setElectionTime(election.getTimestamp());
     StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
     HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse(
-        now()++, Milliseconds(0), HostAndPort("host3"), hbResponse);
+        now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime());
 
     // primary should not be set and we should perform NoAction in response
     ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
@@ -3708,7 +3817,7 @@ TEST_F(HeartbeatResponseTestV1, ReconfigBetweenHeartbeatRequestAndRepsonse) {
     StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
     getTopoCoord().setMyLastAppliedOpTime(lastOpTimeApplied, Date_t(), false);
     HeartbeatResponseAction action = getTopoCoord().processHeartbeatResponse(
-        now()++, Milliseconds(0), HostAndPort("host3"), hbResponse);
+        now()++, Milliseconds(0), HostAndPort("host3"), hbResponse, OpTime());
 
     // now primary should be host3, index 1, and we should perform NoAction in response
     ASSERT_EQUALS(1, getCurrentPrimaryIndex());
@@ -3884,13 +3993,15 @@ TEST_F(TopoCoordTest, FreshestNodeDoesCatchupTakeover) {
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host3:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
     hbResp.setAppliedOpTime(behindOptime);
     hbResp.setState(MemberState::RS_PRIMARY);
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host2:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
 
     getTopoCoord().updateTerm(1, Date_t());
     ASSERT_OK(getTopoCoord().becomeCandidateIfElectable(
@@ -3937,13 +4048,15 @@ TEST_F(TopoCoordTest, StaleNodeDoesntDoCatchupTakeover) {
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host3:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
     hbResp.setAppliedOpTime(behindOptime);
     hbResp.setState(MemberState::RS_PRIMARY);
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host2:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
 
     getTopoCoord().updateTerm(1, Date_t());
     Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -3993,12 +4106,14 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverHeartbeatSaysPrimaryCaughtUp) {
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host3:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
     hbResp.setState(MemberState::RS_PRIMARY);
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host2:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
 
     getTopoCoord().updateTerm(1, Date_t());
     Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -4051,13 +4166,15 @@ TEST_F(TopoCoordTest, NodeDoesntDoCatchupTakeoverIfTermNumbersSayPrimaryCaughtUp
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host3:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
     hbResp.setAppliedOpTime(behindOptime);
     hbResp.setState(MemberState::RS_PRIMARY);
     getTopoCoord().processHeartbeatResponse(firstRequestDate + Milliseconds(1000),
                                             Milliseconds(999),
                                             HostAndPort("host2:27017"),
-                                            StatusWith<ReplSetHeartbeatResponse>(hbResp));
+                                            StatusWith<ReplSetHeartbeatResponse>(hbResp),
+                                            OpTime());
 
     getTopoCoord().updateTerm(1, Date_t());
     Status result = getTopoCoord().becomeCandidateIfElectable(
@@ -5526,7 +5643,8 @@ TEST_F(HeartbeatResponseTestV1, NodeDoesNotRetryHeartbeatIfTheFirstFailureTakesT
                              // no retry allowed.
         Milliseconds(4990),  // Spent 4.99 of the 5 seconds in the network.
         target,
-        StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"));
+        StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+        OpTime());
 
     ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
     ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5626,8 +5744,12 @@ public:
         Date_t _upRequestDate = unittest::assertGet(dateFromISOString("2014-08-29T12:55Z"));
         std::pair<ReplSetHeartbeatArgsV1, Milliseconds> uppingRequest =
             getTopoCoord().prepareHeartbeatRequestV1(_upRequestDate, "rs0", _target);
-        HeartbeatResponseAction upAction = getTopoCoord().processHeartbeatResponse(
-            _upRequestDate, Milliseconds(0), _target, makeStatusWith<ReplSetHeartbeatResponse>());
+        HeartbeatResponseAction upAction =
+            getTopoCoord().processHeartbeatResponse(_upRequestDate,
+                                                    Milliseconds(0),
+                                                    _target,
+                                                    makeStatusWith<ReplSetHeartbeatResponse>(),
+                                                    OpTime());
         ASSERT_EQUALS(HeartbeatResponseAction::NoAction, upAction.getAction());
         ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
 
@@ -5646,8 +5768,8 @@ public:
             _firstRequestDate + Seconds(4),  // 4 seconds elapsed, retry allowed.
             Milliseconds(3990),              // Spent 3.99 of the 4 seconds in the network.
             _target,
-            StatusWith<ReplSetHeartbeatResponse>(
-                ErrorCodes::ExceededTimeLimit, "Took too long"));  // We've never applied anything.
+            StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+            OpTime());  // We've never applied anything.
         ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
         ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5704,8 +5826,8 @@ TEST_F(HeartbeatResponseTestOneRetryV1,
                              // no retry allowed.
         Milliseconds(1000),  // Spent 1 of the 1.01 seconds in the network.
         target(),
-        StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit,
-                                             "Took too long"));  // We've never applied anything.
+        StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::ExceededTimeLimit, "Took too long"),
+        OpTime());  // We've never applied anything.
 
     ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
     ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5725,7 +5847,8 @@ public:
                                 // could retry.
            Milliseconds(400),   // Spent 0.4 of the 0.5 seconds in the network.
            target(),
-           StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"));
+           StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"),
+           OpTime());
         ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
         ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
         // Because the first retry failed without timing out, we expect to retry immediately.
@@ -5772,7 +5895,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, NodeDoesNotRetryHeartbeatsAfterFailing
                             // could still retry.
        Milliseconds(100),   // Spent 0.1 of the 0.3 seconds in the network.
        target(),
-       StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"));
+       StatusWith<ReplSetHeartbeatResponse>(ErrorCodes::NodeNotFound, "Bad DNS?"),
+       OpTime());
     ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
     ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
     // Because this is the second retry, rather than retry again, we expect to wait for a quarter
@@ -5811,7 +5935,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
         getTopoCoord().processHeartbeatResponse(firstRequestDate() + Milliseconds(4500),
                                                 Milliseconds(400),
                                                 target(),
-                                                StatusWith<ReplSetHeartbeatResponse>(response));
+                                                StatusWith<ReplSetHeartbeatResponse>(response),
+                                                OpTime());
 
     ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
     ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5828,7 +5953,8 @@ TEST_F(HeartbeatResponseTestTwoRetriesV1, HeartbeatThreeNonconsecutiveFailures)
         firstRequestDate() + Milliseconds(7100),
         Milliseconds(400),
         target(),
-        StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""}));
+        StatusWith<ReplSetHeartbeatResponse>(Status{ErrorCodes::HostUnreachable, ""}),
+        OpTime());
 
     ASSERT_EQUALS(HeartbeatResponseAction::NoAction, action.getAction());
     ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
@@ -5904,7 +6030,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1, UpdateHeartbeatDataSameConfig) {
         now()++,            // Time is left.
         Milliseconds(400),  // Spent 0.4 of the 0.5 second in the network.
         HostAndPort("host2"),
-        StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse));
+        StatusWith<ReplSetHeartbeatResponse>(sameConfigResponse),
+        OpTime());
     stopCapturingLogMessages();
     ASSERT_NO_ACTION(action.getAction());
     ASSERT_EQUALS(1,
@@ -5929,7 +6056,8 @@ TEST_F(HeartbeatResponseHighVerbosityTestV1,
         now()++,            // Time is left.
         Milliseconds(400),  // Spent 0.4 of the 0.5 second in the network.
         HostAndPort("host5"),
-        StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse));
+        StatusWith<ReplSetHeartbeatResponse>(memberMissingResponse),
+        OpTime());
     stopCapturingLogMessages();
     ASSERT_NO_ACTION(action.getAction());
     ASSERT_EQUALS(1, countLogLinesContaining("Could not find host5:27017 in current config"));