diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-11-21 18:05:53 -0500 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-11-22 12:58:39 -0500 |
commit | 3c4e621c328975316ff0c60857203036b8e15b8c (patch) | |
tree | 350e399390965f14a7751a55520515bd801203ad | |
parent | 18ff331c446023d25cb349e757f063bdd83c9cf9 (diff) | |
download | mongo-3c4e621c328975316ff0c60857203036b8e15b8c.tar.gz |
SERVER-27149 Don't sync from nodes in an older term.
(cherry picked from commit 138402742f13b1cf85b021966eb27f2e33667cca)
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_selector.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_test.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp | 146 |
15 files changed, 170 insertions, 122 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 7eb266c2b9c..7761e56b076 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -1207,8 +1207,7 @@ void DataReplicator::_setState_inlock(const DataReplicatorState& newState) { } StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() { - auto syncSource = - _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp()); + auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime); if (syncSource.empty()) { return Status{ErrorCodes::InvalidSyncSource, str::stream() << "No valid sync source available. Our last fetched optime: " diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 2aaeb5a6074..3a451783597 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -85,7 +85,7 @@ class SyncSourceSelectorMock : public SyncSourceSelector { public: SyncSourceSelectorMock(const HostAndPort& syncSource) : _syncSource(syncSource) {} void clearSyncSourceBlacklist() override {} - HostAndPort chooseNewSyncSource(const Timestamp& ts) override { + HostAndPort chooseNewSyncSource(const OpTime& ot) override { HostAndPort result = _syncSource; return result; } @@ -120,8 +120,8 @@ public: void clearSyncSourceBlacklist() override { _syncSourceSelector->clearSyncSourceBlacklist(); } - HostAndPort chooseNewSyncSource(const Timestamp& ts) override { - return _syncSourceSelector->chooseNewSyncSource(ts); + HostAndPort chooseNewSyncSource(const OpTime& ot) override { + return _syncSourceSelector->chooseNewSyncSource(ot); } void blacklistSyncSource(const HostAndPort& host, Date_t until) override { _syncSourceSelector->blacklistSyncSource(host, until); diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index b7e6a06c7d2..92bc6648b8b 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -164,7 +164,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn, invariant(conn() == NULL); while (true) { - HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched.getTimestamp()); + HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched); if (candidate.empty()) { if (oldestOpTimeSeen == sentinel) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 851c91dac40..699bb2636ac 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3037,15 +3037,15 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const { return getReplicationMode() != modeNone; } -HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) { +HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) { LockGuard topoLock(_topoMutex); HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); auto chainingPreference = isCatchingUp() ? TopologyCoordinator::ChainingPreference::kAllowChaining : TopologyCoordinator::ChainingPreference::kUseConfiguration; - HostAndPort newSyncSource = _topCoord->chooseNewSyncSource( - _replExecutor.now(), lastTimestampFetched, chainingPreference); + HostAndPort newSyncSource = + _topCoord->chooseNewSyncSource(_replExecutor.now(), lastOpTimeFetched, chainingPreference); stdx::lock_guard<stdx::mutex> lock(_mutex); // If we lost our sync source, schedule new heartbeats immediately to update our knowledge diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 14b9571b7f4..bdbec4bac08 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -261,7 +261,7 @@ public: virtual bool isReplEnabled() const override; - virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) override; + virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) override; virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 22d2eed1f0d..d6e4679e78c 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -370,7 +370,7 @@ Status ReplicationCoordinatorMock::checkReplEnabledForCommand(BSONObjBuilder* re return Status::OK(); } -HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const Timestamp& lastTimestampFetched) { +HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const OpTime& lastOpTimeFetched) { return HostAndPort(); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 0a61d010a1a..d21c3c8e40d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -205,7 +205,7 @@ public: virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); - virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched); + virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched); virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index dced97cef3a..39ec824ebac 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -144,7 +144,7 @@ bool SyncSourceResolver::_isShuttingDown() const { StatusWith<HostAndPort> SyncSourceResolver::_chooseNewSyncSource() { HostAndPort candidate; try { - candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched.getTimestamp()); + candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched); } catch (...) { return exceptionToStatus(); } diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index 4e94057031d..1022ef54e26 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -70,9 +70,9 @@ private: class SyncSourceSelectorMock : public SyncSourceSelector { public: void clearSyncSourceBlacklist() override {} - HostAndPort chooseNewSyncSource(const Timestamp& ts) override { + HostAndPort chooseNewSyncSource(const OpTime& ot) override { chooseNewSyncSourceHook(); - lastTimestampFetched = ts; + lastOpTimeFetched = ot; return syncSource; } void blacklistSyncSource(const HostAndPort& host, Date_t until) override { @@ -84,7 +84,7 @@ public: } HostAndPort syncSource = HostAndPort("host1", 1234); - Timestamp lastTimestampFetched; + OpTime lastOpTimeFetched; stdx::function<void()> chooseNewSyncSourceHook = []() {}; HostAndPort blacklistHost; @@ -264,7 +264,7 @@ TEST_F(SyncSourceResolverTest, // Resolver invokes callback with empty host and becomes inactive immediately. ASSERT_FALSE(_resolver->isActive()); ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus)); - ASSERT_EQUALS(lastOpTimeFetched.getTimestamp(), _selector->lastTimestampFetched); + ASSERT_EQUALS(lastOpTimeFetched, _selector->lastOpTimeFetched); // Cannot restart a completed resolver. ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _resolver->startup()); diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index 131c3846e24..36cd7df3bf4 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -64,7 +64,7 @@ public: /** * Chooses a viable sync source, or, if none available, returns empty HostAndPort. */ - virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) = 0; + virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) = 0; /** * Blacklists choosing 'host' as a sync source until time 'until'. diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 3dbd739f9cd..8fe8fe13c34 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -138,7 +138,7 @@ public: * Chooses and sets a new sync source, based on our current knowledge of the world. */ virtual HostAndPort chooseNewSyncSource(Date_t now, - const Timestamp& lastTimestampFetched, + const OpTime& lastOpTimeFetched, ChainingPreference chainingPreference) = 0; /** diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 8062a122e5c..bc7029a7a4c 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -154,7 +154,7 @@ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { } HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, - const Timestamp& lastTimestampFetched, + const OpTime& lastOpTimeFetched, ChainingPreference chainingPreference) { // If we are not a member of the current replica set configuration, no sync source is valid. if (_selfIndex == -1) { @@ -305,12 +305,12 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, } } // only consider candidates that are ahead of where we are - if (it->getAppliedOpTime().getTimestamp() <= lastTimestampFetched) { + if (it->getAppliedOpTime() <= lastOpTimeFetched) { LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. " - << "My last fetched oplog timestamp: " << lastTimestampFetched.toBSON() - << ", latest oplog timestamp of sync candidate " + << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON() + << ", latest oplog optime of sync candidate " << itMemberConfig.getHostAndPort() << ": " - << it->getAppliedOpTime().getTimestamp().toBSON(); + << it->getAppliedOpTime().toBSON(); continue; } // Candidate cannot be more latent than anything we've already considered. diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 9d57ffe0812..aac4c201e5b 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -151,8 +151,8 @@ public: virtual UpdateTermResult updateTerm(long long term, Date_t now); virtual void setForceSyncSourceIndex(int index); virtual HostAndPort chooseNewSyncSource(Date_t now, - const Timestamp& lastTimestampFetched, - ChainingPreference chainingPreference); + const OpTime& lastOpTimeFetched, + ChainingPreference chainingPreference) override; virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now); virtual void clearSyncSourceBlacklist(); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index 52515127657..d0281db8217 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -240,7 +240,7 @@ private: TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // if we do not have an index in the config, we should get an empty syncsource HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_TRUE(newSyncSource.empty()); updateConfig(BSON("_id" @@ -267,7 +267,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // Fail due to insufficient number of pings newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); @@ -278,7 +278,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // Should choose h2, since it is furthest ahead newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -286,34 +286,34 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 becomes an invalid candidate for sync source; should choose h2 again heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back in SECONDARY and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 goes down receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime()); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back up and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -358,7 +358,7 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) { 0); setSelfMemberState(MemberState::RS_SECONDARY); - Timestamp lastOpTimeWeApplied = Timestamp(100, 0); + OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 0); heartbeatFromMember(HostAndPort("h1"), "rs0", @@ -533,7 +533,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { ASSERT_EQUALS( HostAndPort(), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // Add primary @@ -548,18 +548,20 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { // h3 is primary and should be chosen as the sync source when we are not in catch-up mode, // despite being further away than h2 and the primary (h3) being behind our most recently // applied optime. - ASSERT_EQUALS( - HostAndPort("h3"), - getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as // the sync source. - ASSERT_EQUALS( - HostAndPort("h2"), - getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kAllowChaining)); + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kAllowChaining)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // Become primary: should not choose self as sync source. @@ -573,7 +575,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { ASSERT_EQUALS( HostAndPort(), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); } @@ -589,7 +591,7 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) { HostAndPort h2("h2"), h3("h3"); Timestamp t1(1, 0), t5(5, 0), t10(10, 0); - OpTime ot1(t1, 0), ot5(t5, 0); + OpTime ot1(t1, 0), ot5(t5, 0), ot10(t10, 0); Milliseconds hbRTT100(100), hbRTT300(300); // Two rounds of heartbeat pings from each member. @@ -600,18 +602,18 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) { // Should choose h3 as it is a voter auto newSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(h3, newSource); // Can't choose h2 as it is not a voter newSource = getTopoCoord().chooseNewSyncSource( - now()++, t10, TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, ot10, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort(), newSource); // Should choose h3 as it is a voter, and ahead heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300); newSource = getTopoCoord().chooseNewSyncSource( - now()++, t1, TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, ot1, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(h3, newSource); } @@ -657,7 +659,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) { ASSERT_EQUALS( HostAndPort("h2"), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // Become primary @@ -668,7 +670,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) { ASSERT_EQUALS( HostAndPort("h2"), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); } @@ -712,7 +714,7 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc // force should overrule other defaults getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true @@ -722,12 +724,12 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc ASSERT_TRUE( getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now())); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // force should only work for one call to chooseNewSyncSource getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -770,19 +772,19 @@ TEST_F(TopoCoordTest, NodeDoesNotChooseBlacklistedSyncSourceUntilBlacklistingExp Milliseconds(100)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); // Should choose second best choice now that h3 is blacklisted. ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // After time has passed, should go back to original sync source getTopoCoord().chooseNewSyncSource( - expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -828,19 +830,19 @@ TEST_F(TopoCoordTest, ChooseNoSyncSourceWhenPrimaryIsBlacklistedAndChainingIsDis Milliseconds(100)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); // Can't choose any sync source now. ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // After time has passed, should go back to the primary getTopoCoord().chooseNewSyncSource( - expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); } @@ -885,7 +887,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { ASSERT_EQUALS( HostAndPort("h3"), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); // Good state setup done @@ -894,7 +896,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::NetworkTimeout); ASSERT_TRUE(getTopoCoord() .chooseNewSyncSource(now()++, - Timestamp(), + OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration) .empty()); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); @@ -904,7 +906,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::Unauthorized); ASSERT_TRUE(getTopoCoord() .chooseNewSyncSource(now()++, - Timestamp(), + OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration) .empty()); ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s); @@ -1280,9 +1282,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) { ASSERT_OK(result); ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us", response.obj()["warning"].String()); - getTopoCoord().chooseNewSyncSource(now()++, - ourOpTime.getTimestamp(), - TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource( + now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); } @@ -1329,9 +1330,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) { ASSERT_OK(result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); - getTopoCoord().chooseNewSyncSource(now()++, - ourOpTime.getTimestamp(), - TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource( + now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress()); } @@ -1380,7 +1380,7 @@ TEST_F(TopoCoordTest, ASSERT_FALSE(responseObj.hasField("warning")); receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime()); HostAndPort syncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h6"), syncSource); } @@ -1494,9 +1494,8 @@ TEST_F(TopoCoordTest, BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); ASSERT_FALSE(responseObj.hasField("prevSyncTarget")); - getTopoCoord().chooseNewSyncSource(now()++, - ourOpTime.getTimestamp(), - TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource( + now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); heartbeatFromMember( @@ -4893,7 +4892,7 @@ TEST_F(PrepareHeartbeatResponseTest, heartbeatFromMember( HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); // set up args ReplSetHeartbeatArgs args; diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 851de44c9f8..3cb7f7ec038 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -251,7 +251,7 @@ private: TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // if we do not have an index in the config, we should get an empty syncsource HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_TRUE(newSyncSource.empty()); updateConfig(BSON("_id" @@ -278,7 +278,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // Fail due to insufficient number of pings newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); @@ -289,7 +289,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { // Should choose h2, since it is furthest ahead newSyncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -297,34 +297,34 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 becomes an invalid candidate for sync source; should choose h2 again heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back in SECONDARY and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // h3 goes down receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime()); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 back up and ahead heartbeatFromMember( HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -369,7 +369,7 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) { 0); setSelfMemberState(MemberState::RS_SECONDARY); - Timestamp lastOpTimeWeApplied = Timestamp(100, 0); + OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 0); heartbeatFromMember(HostAndPort("h1"), "rs0", @@ -500,6 +500,57 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) { ASSERT(getTopoCoord().getSyncSourceAddress().empty()); } +TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) { + updateConfig(BSON("_id" + << "rs0" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "hself") + << BSON("_id" << 10 << "host" + << "h1") + << BSON("_id" << 20 << "host" + << "h2"))), + 0); + + setSelfMemberState(MemberState::RS_SECONDARY); + OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 3); + + heartbeatFromMember(HostAndPort("h1"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(200, 0), 3), + Milliseconds(200)); + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(300, 0), 2), // old term + Milliseconds(100)); + + // Record 2nd round of pings to allow choosing a new sync source + heartbeatFromMember(HostAndPort("h1"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(200, 0), 3), + Milliseconds(200)); + heartbeatFromMember(HostAndPort("h2"), + "rs0", + MemberState::RS_SECONDARY, + OpTime(Timestamp(300, 0), 2), // old term + Milliseconds(100)); + + getTopoCoord().chooseNewSyncSource( + now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress()); + + // h1 goes down; no sync source candidates remain + receiveDownHeartbeat(HostAndPort("h1"), "rs0", OpTime()); + getTopoCoord().chooseNewSyncSource( + now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration); + ASSERT(getTopoCoord().getSyncSourceAddress().empty()); +} + TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { updateConfig(BSON("_id" @@ -544,7 +595,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { ASSERT_EQUALS( HostAndPort(), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // Add primary @@ -559,18 +610,20 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { // h3 is primary and should be chosen as the sync source when we are not in catch-up mode, // despite being further away than h2 and the primary (h3) being behind our most recently // applied optime. - ASSERT_EQUALS( - HostAndPort("h3"), - getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + ASSERT_EQUALS(HostAndPort("h3"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); // When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as // the sync source. - ASSERT_EQUALS( - HostAndPort("h2"), - getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kAllowChaining)); + ASSERT_EQUALS(HostAndPort("h2"), + getTopoCoord().chooseNewSyncSource( + now()++, + OpTime(Timestamp(10, 0), 0), + TopologyCoordinator::ChainingPreference::kAllowChaining)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // Become primary: should not choose self as sync source. @@ -584,7 +637,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) { ASSERT_EQUALS( HostAndPort(), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); } @@ -600,7 +653,7 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) { HostAndPort h2("h2"), h3("h3"); Timestamp t1(1, 0), t5(5, 0), t10(10, 0); - OpTime ot1(t1, 0), ot5(t5, 0); + OpTime ot1(t1, 0), ot5(t5, 0), ot10(t10, 0); Milliseconds hbRTT100(100), hbRTT300(300); // Two rounds of heartbeat pings from each member. @@ -611,18 +664,18 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) { // Should choose h3 as it is a voter auto newSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(h3, newSource); // Can't choose h2 as it is not a voter newSource = getTopoCoord().chooseNewSyncSource( - now()++, t10, TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, ot10, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort(), newSource); // Should choose h3 as it is a voter, and ahead heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300); newSource = getTopoCoord().chooseNewSyncSource( - now()++, t1, TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, ot1, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(h3, newSource); } @@ -668,7 +721,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) { ASSERT_EQUALS( HostAndPort("h2"), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // Become primary @@ -679,7 +732,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) { ASSERT_EQUALS( HostAndPort("h2"), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); } @@ -713,7 +766,7 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc // force should overrule other defaults getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true @@ -723,12 +776,12 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now())); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // force should only work for one call to chooseNewSyncSource getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -771,19 +824,19 @@ TEST_F(TopoCoordTest, NodeDoesNotChooseBlacklistedSyncSourceUntilBlacklistingExp Milliseconds(100)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); // Should choose second best choice now that h3 is blacklisted. ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // After time has passed, should go back to original sync source getTopoCoord().chooseNewSyncSource( - expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); } @@ -829,19 +882,19 @@ TEST_F(TopoCoordTest, ChooseNoSyncSourceWhenPrimaryIsBlacklistedAndChainingIsDis Milliseconds(100)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); Date_t expireTime = Date_t::fromMillisSinceEpoch(1000); getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); // Can't choose any sync source now. ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // After time has passed, should go back to the primary getTopoCoord().chooseNewSyncSource( - expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); } @@ -886,7 +939,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { ASSERT_EQUALS( HostAndPort("h3"), getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration)); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); // Good state setup done @@ -895,7 +948,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::NetworkTimeout); ASSERT_TRUE(getTopoCoord() .chooseNewSyncSource(now()++, - Timestamp(), + OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration) .empty()); ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s); @@ -905,7 +958,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) { receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::Unauthorized); ASSERT_TRUE(getTopoCoord() .chooseNewSyncSource(now()++, - Timestamp(), + OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration) .empty()); ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s); @@ -1281,9 +1334,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) { ASSERT_OK(result); ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us", response.obj()["warning"].String()); - getTopoCoord().chooseNewSyncSource(now()++, - ourOpTime.getTimestamp(), - TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource( + now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); } @@ -1330,9 +1382,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) { ASSERT_OK(result); BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); - getTopoCoord().chooseNewSyncSource(now()++, - ourOpTime.getTimestamp(), - TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource( + now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress()); } @@ -1381,7 +1432,7 @@ TEST_F(TopoCoordTest, ASSERT_FALSE(responseObj.hasField("warning")); receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime()); HostAndPort syncSource = getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h6"), syncSource); } @@ -1495,9 +1546,8 @@ TEST_F(TopoCoordTest, BSONObj responseObj = response.obj(); ASSERT_FALSE(responseObj.hasField("warning")); ASSERT_FALSE(responseObj.hasField("prevSyncTarget")); - getTopoCoord().chooseNewSyncSource(now()++, - ourOpTime.getTimestamp(), - TopologyCoordinator::ChainingPreference::kUseConfiguration); + getTopoCoord().chooseNewSyncSource( + now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress()); heartbeatFromMember( @@ -1960,7 +2010,7 @@ TEST_F(PrepareHeartbeatResponseV1Test, heartbeatFromMember( HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0)); getTopoCoord().chooseNewSyncSource( - now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration); + now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); // set up args ReplSetHeartbeatArgsV1 args; |