diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2015-10-12 08:07:25 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2015-10-14 05:07:49 -0400 |
commit | c860db7d39f8559054d1cebb83a6838accef8d94 (patch) | |
tree | b3110c22e4afcdf6313144797957c8f6c4efe929 | |
parent | 7d43b0dba28b4b8e0184579a75c3dddab9d86e1f (diff) | |
download | mongo-c860db7d39f8559054d1cebb83a6838accef8d94.tar.gz |
SERVER-20822 make sync source decisions based on ReplSetMetadata
19 files changed, 382 insertions, 221 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 96deab1683a..118a9e02046 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -426,6 +426,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& } const auto& queryResponse = result.getValue(); + bool syncSourceHasSyncSource = false; + OpTime sourcesLastOp; // Forward metadata (containing liveness information) to replication coordinator. bool receivedMetadata = @@ -443,6 +445,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { _replCoord->cancelAndRescheduleElectionTimeout(); } + syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; + sourcesLastOp = metadata.getLastOpVisible(); } const auto& documents = queryResponse.documents; @@ -542,7 +546,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& } // re-evaluate quality of sync target - if (_shouldChangeSyncSource(source)) { + if (_shouldChangeSyncSource(source, sourcesLastOp, syncSourceHasSyncSource)) { return; } @@ -562,7 +566,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& } } -bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource) { +bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource) { // is it even still around? if (getSyncTarget().empty() || syncSource.empty()) { return true; @@ -570,7 +576,8 @@ bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource) { // check other members: is any member's optime more than MaxSyncSourceLag seconds // ahead of the current sync source? - return _replCoord->shouldChangeSyncSource(syncSource); + return _replCoord->shouldChangeSyncSource( + syncSource, syncSourceLastOpTime, syncSourceHasSyncSource); } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 936a18c1209..56bedc5d8cf 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -191,8 +191,19 @@ private: const HostAndPort& source, stdx::function<DBClientBase*()> getConnection); - // Evaluate if the current sync target is still good - bool _shouldChangeSyncSource(const HostAndPort& syncSource); + /** + * Evaluate if the current sync source is still good. + * "syncSource" is the name of the current sync source, which will be used to look up the + * member's heartbeat data. + * "syncSourceLastOpTime" is the last OpTime the sync source has. This is passed in because the + * data stored from heartbeats could be too stale and would cause unnecessary sync source + * changes. + * "syncSourceHasSyncSource" indicates whether our sync source is currently syncing from another + * member. + */ + bool _shouldChangeSyncSource(const HostAndPort& syncSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource); // restart syncing void start(OperationContext* txn); diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 887a7a924bf..adff8d96782 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -76,7 +76,9 @@ public: void blacklistSyncSource(const HostAndPort& host, Date_t until) override { _blacklistedSource = host; } - bool shouldChangeSyncSource(const HostAndPort& currentSource) override { + bool shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& sourcesOpTime, + bool syncSourceHasSyncSource) override { return false; } HostAndPort _syncSource; @@ -112,8 +114,11 @@ public: void blacklistSyncSource(const HostAndPort& host, Date_t until) override { _syncSourceSelector->blacklistSyncSource(host, until); } - bool shouldChangeSyncSource(const HostAndPort& currentSource) override { - return _syncSourceSelector->shouldChangeSyncSource(currentSource); + bool shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& sourcesOpTime, + bool syncSourceHasSyncSource) override { + return _syncSourceSelector->shouldChangeSyncSource( + currentSource, sourcesOpTime, syncSourceHasSyncSource); } void scheduleNetworkResponse(const BSONObj& obj) { @@ -554,7 +559,9 @@ public: LockGuard lk(_mutex); _blacklistedSource = host; } - bool shouldChangeSyncSource(const HostAndPort& currentSource) override { + bool shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& sourcesOpTime, + bool syncSourceHasSyncSource) override { return false; } mutable stdx::mutex _mutex; @@ -680,7 +687,9 @@ public: return HostAndPort(); } void blacklistSyncSource(const HostAndPort& host, Date_t until) override {} - bool shouldChangeSyncSource(const HostAndPort& currentSource) override { + bool shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& sourcesOpTime, + bool syncSourceHasSyncSource) override { return false; } ReplicationExecutor* _exec; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 087a74ec523..20394ff090b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2679,22 +2679,31 @@ void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn) void ReplicationCoordinatorImpl::_shouldChangeSyncSource( const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& currentSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource, bool* shouldChange) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } - *shouldChange = - _topCoord->shouldChangeSyncSource(currentSource, getMyLastOptime(), _replExecutor.now()); + *shouldChange = _topCoord->shouldChangeSyncSource(currentSource, + getMyLastOptime(), + syncSourceLastOpTime, + syncSourceHasSyncSource, + _replExecutor.now()); } -bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource) { +bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource) { bool shouldChange(false); CBHStatus cbh = _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource, this, stdx::placeholders::_1, currentSource, + syncSourceLastOpTime, + syncSourceHasSyncSource, &shouldChange)); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { return false; diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 018cc7a8539..eb107186bdf 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -255,7 +255,9 @@ public: virtual void resetLastOpTimeFromOplog(OperationContext* txn) override; - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override; + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource) override; virtual OpTime getLastCommittedOpTime() const override; @@ -946,6 +948,8 @@ private: */ void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& currentSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource, bool* shouldChange); /** diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 32f85354f85..d0d57a626bd 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -338,12 +338,13 @@ TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeat) { auto test = [this](OpTime committedOpTime, OpTime expected) { // process heartbeat metadata directly StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName << BSON( - "lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "t" - << committedOpTime.getTerm()) << "lastOpVisible" - << BSON("ts" << committedOpTime.getTimestamp() << "t" - << committedOpTime.getTerm()) << "configVersion" << 1 - << "primaryIndex" << 1 << "term" << committedOpTime.getTerm()))); + rpc::kReplSetMetadataFieldName + << BSON("lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "t" + << committedOpTime.getTerm()) << "lastOpVisible" + << BSON("ts" << committedOpTime.getTimestamp() << "t" + << committedOpTime.getTerm()) << "configVersion" + << 1 << "primaryIndex" << 1 << "term" + << committedOpTime.getTerm() << "syncSourceIndex" << 1))); ASSERT_OK(metadata.getStatus()); getReplCoord()->processReplSetMetadata(metadata.getValue()); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 674f50bfc9d..bf4c680f29a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -2617,19 +2617,20 @@ TEST_F(ReplCoordTest, MetadataWrongConfigVersion) { // lower configVersion StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" - << 1 << "primaryIndex" << 2 << "term" << 2))); + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 1 + << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1))); getReplCoord()->processReplSetMetadata(metadata.getValue()); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // higher configVersion - StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" - << 100 << "primaryIndex" << 2 << "term" << 2))); + StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata( + BSON(rpc::kReplSetMetadataFieldName + << BSON("lastOpCommitted" + << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 100 + << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1))); getReplCoord()->processReplSetMetadata(metadata2.getValue()); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); } @@ -2660,20 +2661,20 @@ TEST_F(ReplCoordTest, MetadataUpdatesLastCommittedOpTime) { // higher OpTime, should change StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion" - << 2 << "primaryIndex" << 2 << "term" << 1))); + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion" << 2 + << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1))); getReplCoord()->processReplSetMetadata(metadata.getValue()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime()); // lower OpTime, should not change StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible" - << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion" - << 2 << "primaryIndex" << 2 << "term" << 1))); + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible" + << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion" << 2 + << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1))); getReplCoord()->processReplSetMetadata(metadata2.getValue()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); } @@ -2700,10 +2701,10 @@ TEST_F(ReplCoordTest, MetadataUpdatesTermAndPrimaryId) { // higher term, should change StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" - << 2 << "primaryIndex" << 2 << "term" << 3))); + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" << 2 + << "primaryIndex" << 2 << "term" << 3 << "syncSourceIndex" << 1))); getReplCoord()->processReplSetMetadata(metadata.getValue()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); @@ -2711,10 +2712,10 @@ TEST_F(ReplCoordTest, MetadataUpdatesTermAndPrimaryId) { // lower term, should not change StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" - << 2 << "primaryIndex" << 1 << "term" << 2))); + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" + << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2 + << "primaryIndex" << 1 << "term" << 2 << "syncSourceIndex" << 1))); getReplCoord()->processReplSetMetadata(metadata2.getValue()); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); @@ -2722,10 +2723,10 @@ TEST_F(ReplCoordTest, MetadataUpdatesTermAndPrimaryId) { // same term, should not change StatusWith<rpc::ReplSetMetadata> metadata3 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" - << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" - << 2 << "primaryIndex" << 1 << "term" << 3))); + rpc::kReplSetMetadataFieldName << BSON( + "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" + << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2 + << "primaryIndex" << 1 << "term" << 3 << "syncSourceIndex" << 1))); getReplCoord()->processReplSetMetadata(metadata3.getValue()); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 4750fa28602..b9e9bdbd26c 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -320,7 +320,9 @@ void ReplicationCoordinatorMock::resetLastOpTimeFromOplog(OperationContext* txn) invariant(false); } -bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource) { +bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource) { invariant(false); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 5fbbef2fc51..362ef8cba73 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -185,7 +185,9 @@ public: virtual void resetLastOpTimeFromOplog(OperationContext* txn); - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource); virtual OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index 71eccdb8a30..1155ef0be20 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -38,6 +38,8 @@ class Timestamp; namespace repl { +class OpTime; + /** * Manage list of viable and blocked sync sources that we can replicate from. */ @@ -64,10 +66,18 @@ public: virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0; /** - * Determines if a new sync source should be considered. - * currentSource: the current sync source + * Determines if a new sync source should be chosen, if a better candidate sync source is + * available. If the current sync source's last optime ("syncSourceLastOpTime" under + * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than + * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are + * running in ProtocolVersion 1, our current sync source is not primary, has no sync source + * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. + * + * "now" is used to skip over currently blacklisted sync sources. */ - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0; + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index b0ff312c2c9..89f8aca3c5f 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -153,14 +153,18 @@ public: /** * Determines if a new sync source should be chosen, if a better candidate sync source is - * available. If the current sync source's last optime is more than _maxSyncSourceLagSecs - * behind any syncable source, this function returns true. If our current sync source is not - * primary, has no sync source, and only has data up to "myLastOpTime", returns true. + * available. If the current sync source's last optime ("syncSourceLastOpTime" under + * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than + * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are + * running in ProtocolVersion 1, our current sync source is not primary, has no sync source + * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. * * "now" is used to skip over currently blacklisted sync sources. */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource, Date_t now) const = 0; /** diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 739c523b853..d7220c4fca4 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2300,6 +2300,8 @@ long long TopologyCoordinatorImpl::getTerm() { bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource, Date_t now) const { // Methodology: // If there exists a viable sync source member other than currentSource, whose oplog has @@ -2319,8 +2321,8 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS } invariant(currentSourceIndex != _selfIndex); - const auto& currentSourceHBData = _hbdata[currentSourceIndex]; - OpTime currentSourceOpTime = currentSourceHBData.getOpTime(); + OpTime currentSourceOpTime = + std::max(syncSourceLastOpTime, _hbdata[currentSourceIndex].getOpTime()); if (currentSourceOpTime.isNull()) { // Haven't received a heartbeat from the sync source yet, so can't tell if we should @@ -2328,8 +2330,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return false; } - if (currentSourceHBData.getSyncSource().empty() && currentSourceOpTime <= myLastOpTime && - currentSourceHBData.getState() != MemberState::RS_PRIMARY) { + if (_rsConfig.getProtocolVersion() == 1 && !syncSourceHasSyncSource && + currentSourceOpTime <= myLastOpTime && + _hbdata[currentSourceIndex].getState() != MemberState::RS_PRIMARY) { return true; } @@ -2359,11 +2362,13 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const { - *metadata = rpc::ReplSetMetadata(_term, - lastCommittedOpTime, - lastVisibleOpTime, - _rsConfig.getConfigVersion(), - _currentPrimaryIndex); + *metadata = + rpc::ReplSetMetadata(_term, + lastCommittedOpTime, + lastVisibleOpTime, + _rsConfig.getConfigVersion(), + _currentPrimaryIndex, + _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); } void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 49263136f49..9509285080c 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -160,6 +160,8 @@ public: virtual void clearSyncSourceBlacklist(); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, + const OpTime& syncSourceLastOpTime, + bool syncSourceHasSyncSource, Date_t now) const; virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); virtual void setElectionSleepUntil(Date_t newTime); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index 0301abe215d..73e3e794c4e 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -620,8 +620,10 @@ TEST_F(TopoCoordTest, ForceSyncSource) { getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), now())); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), now())); + ASSERT_TRUE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), OpTime(), false, now())); + ASSERT_TRUE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), OpTime(), false, now())); getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -4189,19 +4191,22 @@ TEST_F(HeartbeatResponseTest, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepso TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from // "host4" since "host4" is absent from the config - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host4"), OpTime(), OpTime(), false, now())); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceMemberHasYetToHeartbeat) { // In this test, the TopologyCoordinator should not tell us to change sync sources away from // "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime) // for "host2" - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); } -TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists) { - // In this test, the TopologyCoordinator should tell us to change sync sources away from - // "host2" and to "host3" since "host2" is more than maxSyncSourceLagSecs(30) behind "host3" +TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbeatButNotMetadata) { + // In this test, the TopologyCoordinator should not tell us to change sync sources away from + // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind + // "host3" according to metadata, not heartbeat data. OpTime election = OpTime(); OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0); // ahead by more than maxSyncSourceLagSecs (30) @@ -4211,7 +4216,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists) "rs0", MemberState::RS_SECONDARY, election, - lastOpTimeApplied, + fresherLastOpTimeApplied, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); @@ -4225,18 +4230,18 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists) // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); + ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } -TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsBlackListed) { +TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbeatButNotMetadata) { // In this test, the TopologyCoordinator should not tell us to change sync sources away from - // "host2" and to "host3" despite "host2" being more than maxSyncSourceLagSecs(30) behind - // "host3", since "host3" is blacklisted - // Then, confirm that unblacklisting only works if time has passed the blacklist time. + // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind + // "host3" according to heartbeat data, not metadata. OpTime election = OpTime(); - OpTime lastOpTimeApplied = OpTime(Timestamp(400, 0), 0); + OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0); // ahead by more than maxSyncSourceLagSecs (30) OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0); @@ -4255,83 +4260,90 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsBlackListed) fresherLastOpTimeApplied, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); - - // unblacklist with too early a time (node should remained blacklisted) - getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); - - // unblacklist and it should succeed - getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now())); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); + ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); } -TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceCurrentTargetNoLongerPrimary) { - // In this test, the TopologyCoordinator will tell us change our sync source away from "host2" - // when it is not ahead of us, unless it is PRIMARY or has a sync source of its own. +TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists) { + // In this test, the TopologyCoordinator should tell us to change sync sources away from + // "host2" and to "host3" since "host2" is more than maxSyncSourceLagSecs(30) behind "host3" OpTime election = OpTime(); - OpTime lastOpTimeApplied = OpTime(Timestamp(400, 0), 0); + OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0); + // ahead by more than maxSyncSourceLagSecs (30) + OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0); HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"), "rs0", - MemberState::RS_PRIMARY, + MemberState::RS_SECONDARY, election, lastOpTimeApplied, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - // Show we like host2 while it is primary. - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); - - // Show that we also like host2 while it has a sync source. - // Cannot use receiveUpHeartbeat helper because syncingTo must be set. - ReplSetHeartbeatResponse hb; - hb.setConfigVersion(1); - hb.setState(MemberState::RS_SECONDARY); - hb.setOpTime(lastOpTimeApplied); - hb.setElectionTime(election.getTimestamp()); - hb.setSyncingTo(HostAndPort("host2")); - - StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb); - getTopoCoord().prepareHeartbeatRequest(now(), "rs0", HostAndPort("host2")); - now() += Milliseconds(1); - ASSERT_NO_ACTION( - getTopoCoord() - .processHeartbeatResponse( - now(), Milliseconds(1), HostAndPort("host2"), hbResponse, lastOpTimeApplied) - .getAction()); - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); - - // Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress - // beyond our own. - nextAction = receiveUpHeartbeat(HostAndPort("host2"), + nextAction = receiveUpHeartbeat(HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, - lastOpTimeApplied, + fresherLastOpTimeApplied, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); - // But if it has some progress beyond our own, we still like it. - OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0); - nextAction = receiveUpHeartbeat(HostAndPort("host2"), + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); +} + +TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsBlackListed) { + // In this test, the TopologyCoordinator should not tell us to change sync sources away from + // "host2" and to "host3" despite "host2" being more than maxSyncSourceLagSecs(30) behind + // "host3", since "host3" is blacklisted + // Then, confirm that unblacklisting only works if time has passed the blacklist time. + OpTime election = OpTime(); + OpTime lastOpTimeApplied = OpTime(Timestamp(400, 0), 0); + // ahead by more than maxSyncSourceLagSecs (30) + OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0); + + HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"), + "rs0", + MemberState::RS_SECONDARY, + election, + lastOpTimeApplied, + lastOpTimeApplied); + ASSERT_NO_ACTION(nextAction.getAction()); + + nextAction = receiveUpHeartbeat(HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, - newerThanLastOpTimeApplied, - newerThanLastOpTimeApplied); + fresherLastOpTimeApplied, + lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); + getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); + + // set up complete, time for actual check + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); + + // unblacklist with too early a time (node should remained blacklisted) + getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); + + // unblacklist and it should succeed + getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); + startCapturingLogMessages(); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsDown) { @@ -4362,7 +4374,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsDown) { // set up complete, time for actual check nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsNotReadable) { @@ -4391,7 +4404,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsNotReadable) ASSERT_NO_ACTION(nextAction.getAction()); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildIndexes) { @@ -4430,7 +4444,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildInd ASSERT_NO_ACTION(nextAction.getAction()); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildIndexesNorDoWe) { @@ -4471,7 +4486,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildInd // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), false, now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 0e793a28bf9..24e904e265b 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -593,28 +593,18 @@ TEST_F(TopoCoordTest, ForceSyncSource) { 0); setSelfMemberState(MemberState::RS_SECONDARY); + OpTime oldOpTime = OpTime(Timestamp(1, 0), 0); + OpTime newOpTime = OpTime(Timestamp(2, 0), 0); // two rounds of heartbeat pings from each member - heartbeatFromMember(HostAndPort("h2"), - "rs0", - MemberState::RS_SECONDARY, - OpTime(Timestamp(1, 0), 0), - Milliseconds(300)); - heartbeatFromMember(HostAndPort("h2"), - "rs0", - MemberState::RS_SECONDARY, - OpTime(Timestamp(1, 0), 0), - Milliseconds(300)); - heartbeatFromMember(HostAndPort("h3"), - "rs0", - MemberState::RS_SECONDARY, - OpTime(Timestamp(2, 0), 0), - Milliseconds(100)); - heartbeatFromMember(HostAndPort("h3"), - "rs0", - MemberState::RS_SECONDARY, - OpTime(Timestamp(2, 0), 0), - Milliseconds(100)); + heartbeatFromMember( + HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, oldOpTime, Milliseconds(300)); + heartbeatFromMember( + HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, oldOpTime, Milliseconds(300)); + heartbeatFromMember( + HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, newOpTime, Milliseconds(100)); + heartbeatFromMember( + HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, newOpTime, Milliseconds(100)); // force should overrule other defaults getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); @@ -622,8 +612,10 @@ TEST_F(TopoCoordTest, ForceSyncSource) { getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), now())); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("h2"), OpTime(), oldOpTime, false, now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("h3"), OpTime(), newOpTime, false, now())); getTopoCoord().chooseNewSyncSource(now()++, Timestamp()); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -2291,7 +2283,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberDoesNotBuildI // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } @@ -2331,29 +2324,19 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceCurrentTargetNoLongerPrima lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); // Show we like host2 while it is primary. - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now())); // Show that we also like host2 while it has a sync source. - // Cannot use receiveUpHeartbeat helper because syncingTo must be set. - ReplSetHeartbeatResponse hb; - hb.setConfigVersion(1); - hb.setState(MemberState::RS_SECONDARY); - hb.setOpTime(lastOpTimeApplied); - hb.setElectionTime(election.getTimestamp()); - hb.setSyncingTo(HostAndPort("host2")); - - StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb); - - getTopoCoord().prepareHeartbeatRequestV1(now(), "rs0", HostAndPort("host2")); - now() += Milliseconds(1); - ASSERT_NO_ACTION( - getTopoCoord() - .processHeartbeatResponse( - now(), Milliseconds(1), HostAndPort("host2"), hbResponse, lastOpTimeApplied) - .getAction()); - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); + nextAction = receiveUpHeartbeat(HostAndPort("host2"), + "rs0", + MemberState::RS_SECONDARY, + election, + lastOpTimeApplied, + lastOpTimeApplied); + ASSERT_NO_ACTION(nextAction.getAction()); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, true, now())); // Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress // beyond our own. @@ -2364,7 +2347,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceCurrentTargetNoLongerPrima lastOpTimeApplied, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); + ASSERT(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now())); // But if it has some progress beyond our own, we still like it. OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0); @@ -2375,8 +2359,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceCurrentTargetNoLongerPrima newerThanLastOpTimeApplied, newerThanLastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), lastOpTimeApplied, newerThanLastOpTimeApplied, false, now())); } TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsDown) { @@ -2407,7 +2391,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsDown) { // set up complete, time for actual check nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); } TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsBlackListed) { @@ -2438,20 +2423,89 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsBlackListed getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } +TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbeatButNotMetadata) { + // In this test, the TopologyCoordinator should not tell us to change sync sources away from + // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind + // "host3" according to metadata, not heartbeat data. + OpTime election = OpTime(); + OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0); + // ahead by more than maxSyncSourceLagSecs (30) + OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0); + + HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"), + "rs0", + MemberState::RS_SECONDARY, + election, + fresherLastOpTimeApplied, + lastOpTimeApplied); + ASSERT_NO_ACTION(nextAction.getAction()); + + nextAction = receiveUpHeartbeat(HostAndPort("host3"), + "rs0", + MemberState::RS_SECONDARY, + election, + fresherLastOpTimeApplied, + lastOpTimeApplied); + ASSERT_NO_ACTION(nextAction.getAction()); + + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); +} + +TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbeatButNotMetadata) { + // In this test, the TopologyCoordinator should not tell us to change sync sources away from + // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind + // "host3" according to heartbeat data, not metadata. + OpTime election = OpTime(); + OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0); + // ahead by more than maxSyncSourceLagSecs (30) + OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0); + + HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"), + "rs0", + MemberState::RS_SECONDARY, + election, + lastOpTimeApplied, + lastOpTimeApplied); + ASSERT_NO_ACTION(nextAction.getAction()); + + nextAction = receiveUpHeartbeat(HostAndPort("host3"), + "rs0", + MemberState::RS_SECONDARY, + election, + fresherLastOpTimeApplied, + lastOpTimeApplied); + ASSERT_NO_ACTION(nextAction.getAction()); + + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source")); +} + TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherHappierMemberExists) { // In this test, the TopologyCoordinator should tell us to change sync sources away from // "host2" and to "host3" since "host2" is more than maxSyncSourceLagSecs(30) behind "host3" @@ -2478,22 +2532,24 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherHappierMemberExists // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source")); } TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceMemberHasYetToHeartbeat) { // In this test, the TopologyCoordinator should not tell us to change sync sources away from - // "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime) - // for "host2" - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + // "host2" since we do not use the member's heartbeatdata in pv1. + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), OpTime(), true, now())); } TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from // "host4" since "host4" is absent from the config - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host4"), OpTime(), OpTime(), true, now())); } TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepsonse) { @@ -3271,7 +3327,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberDoesNotBuildI ASSERT_NO_ACTION(nextAction.getAction()); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now())); } TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsNotReadable) { @@ -3300,7 +3357,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsNotReadable ASSERT_NO_ACTION(nextAction.getAction()); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now())); } class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 { diff --git a/src/mongo/rpc/metadata/repl_set_metadata.cpp b/src/mongo/rpc/metadata/repl_set_metadata.cpp index aadd520a683..5e9a1e89f77 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata.cpp +++ b/src/mongo/rpc/metadata/repl_set_metadata.cpp @@ -46,6 +46,7 @@ const char kLastOpCommittedFieldName[] = "lastOpCommitted"; const char kLastOpVisibleFieldName[] = "lastOpVisible"; const char kConfigVersionFieldName[] = "configVersion"; const char kPrimaryIndexFieldName[] = "primaryIndex"; +const char kSyncSourceIndexFieldName[] = "syncSourceIndex"; const char kTermFieldName[] = "term"; } // unnamed namespace @@ -58,12 +59,14 @@ ReplSetMetadata::ReplSetMetadata(long long term, OpTime committedOpTime, OpTime visibleOpTime, long long configVersion, - int currentPrimaryIndex) + int currentPrimaryIndex, + int currentSyncSourceIndex) : _lastOpCommitted(std::move(committedOpTime)), _lastOpVisible(std::move(visibleOpTime)), _currentTerm(term), _configVersion(configVersion), - _currentPrimaryIndex(currentPrimaryIndex) {} + _currentPrimaryIndex(currentPrimaryIndex), + _currentSyncSourceIndex(currentSyncSourceIndex) {} StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& metadataObj) { BSONElement replMetadataElement; @@ -84,6 +87,11 @@ StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& met if (!status.isOK()) return status; + long long syncSourceIndex; + status = bsonExtractIntegerField(replMetadataObj, kSyncSourceIndexFieldName, &syncSourceIndex); + if (!status.isOK()) + return status; + long long term; status = bsonExtractIntegerField(replMetadataObj, kTermFieldName, &term); if (!status.isOK()) @@ -99,7 +107,8 @@ StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& met if (!status.isOK()) return status; - return ReplSetMetadata(term, lastOpCommitted, lastOpVisible, configVersion, primaryIndex); + return ReplSetMetadata( + term, lastOpCommitted, lastOpVisible, configVersion, primaryIndex, syncSourceIndex); } Status ReplSetMetadata::writeToMetadata(BSONObjBuilder* builder) const { @@ -109,6 +118,7 @@ Status ReplSetMetadata::writeToMetadata(BSONObjBuilder* builder) const { _lastOpVisible.append(&replMetadataBuilder, kLastOpVisibleFieldName); replMetadataBuilder.append(kConfigVersionFieldName, _configVersion); replMetadataBuilder.append(kPrimaryIndexFieldName, _currentPrimaryIndex); + replMetadataBuilder.append(kSyncSourceIndexFieldName, _currentSyncSourceIndex); replMetadataBuilder.doneFast(); return Status::OK(); diff --git a/src/mongo/rpc/metadata/repl_set_metadata.h b/src/mongo/rpc/metadata/repl_set_metadata.h index 722f3befe3e..3e80afc1f9b 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata.h +++ b/src/mongo/rpc/metadata/repl_set_metadata.h @@ -55,16 +55,18 @@ public: repl::OpTime committedOpTime, repl::OpTime visibleOpTime, long long configVersion, - int currentPrimaryIndex); + int currentPrimaryIndex, + int currentSyncSourceIndex); /** * format: * { * term: 0, - * lastOpCommitted: {ts: Timestamp(0, 0), term: 0} - * lastOpVisible: {ts: Timestamp(0, 0), term: 0} + * lastOpCommitted: {ts: Timestamp(0, 0), term: 0}, + * lastOpVisible: {ts: Timestamp(0, 0), term: 0}, * configVersion: 0, - * primaryIndex: 0 + * primaryIndex: 0, + * syncSourceIndex: 0 * } */ static StatusWith<ReplSetMetadata> readFromMetadata(const BSONObj& doc); @@ -95,11 +97,19 @@ public: * Returns the index of the current primary from the perspective of the sender. * Returns kNoPrimary if there is no primary. */ - long long getPrimaryIndex() const { + int getPrimaryIndex() const { return _currentPrimaryIndex; } /** + * Returns the index of the sync source of the sender. + * Returns -1 if it has no sync source. + */ + int getSyncSourceIndex() const { + return _currentSyncSourceIndex; + } + + /** * Returns the current term from the perspective of the sender. */ long long getTerm() const { @@ -112,6 +122,7 @@ private: long long _currentTerm = -1; long long _configVersion = -1; int _currentPrimaryIndex = kNoPrimary; + int _currentSyncSourceIndex = -1; }; } // namespace rpc diff --git a/src/mongo/rpc/metadata/repl_set_metadata_test.cpp b/src/mongo/rpc/metadata/repl_set_metadata_test.cpp index 499bbeb0b23..68d073be8cb 100644 --- a/src/mongo/rpc/metadata/repl_set_metadata_test.cpp +++ b/src/mongo/rpc/metadata/repl_set_metadata_test.cpp @@ -39,7 +39,7 @@ using repl::OpTime; TEST(ReplResponseMetadataTest, Roundtrip) { OpTime opTime(Timestamp(1234, 100), 5); OpTime opTime2(Timestamp(7777, 100), 6); - ReplSetMetadata metadata(3, opTime, opTime2, 6, 12); + ReplSetMetadata metadata(3, opTime, opTime2, 6, 12, -1); ASSERT_EQ(opTime, metadata.getLastOpCommitted()); ASSERT_EQ(opTime2, metadata.getLastOpVisible()); @@ -47,13 +47,12 @@ TEST(ReplResponseMetadataTest, Roundtrip) { BSONObjBuilder builder; metadata.writeToMetadata(&builder); - BSONObj expectedObj( - BSON(kReplSetMetadataFieldName - << BSON("term" << 3 << "lastOpCommitted" - << BSON("ts" << opTime.getTimestamp() << "t" << opTime.getTerm()) - << "lastOpVisible" - << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm()) - << "configVersion" << 6 << "primaryIndex" << 12))); + BSONObj expectedObj(BSON( + kReplSetMetadataFieldName << BSON( + "term" << 3 << "lastOpCommitted" << BSON("ts" << opTime.getTimestamp() << "t" + << opTime.getTerm()) << "lastOpVisible" + << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm()) + << "configVersion" << 6 << "primaryIndex" << 12 << "syncSourceIndex" << -1))); BSONObj serializedObj = builder.obj(); ASSERT_EQ(expectedObj, serializedObj); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp index e46c2582ea3..caa2b4ccfb6 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp @@ -112,7 +112,7 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -175,7 +175,7 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -508,7 +508,7 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -1084,7 +1084,7 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) { checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); - ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2354,7 +2354,7 @@ TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm()); - ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2390,7 +2390,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2419,7 +2419,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30); + ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2444,7 +2444,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30); + ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2470,7 +2470,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeFindThenCmd) { request.metadata); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); @@ -2531,7 +2531,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) { ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); - ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30); + ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1); BSONObjBuilder builder; metadata.writeToMetadata(&builder); |