summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2016-11-21 18:05:53 -0500
committerSpencer T Brody <spencer@mongodb.com>2016-11-22 12:58:39 -0500
commit3c4e621c328975316ff0c60857203036b8e15b8c (patch)
tree350e399390965f14a7751a55520515bd801203ad
parent18ff331c446023d25cb349e757f063bdd83c9cf9 (diff)
downloadmongo-3c4e621c328975316ff0c60857203036b8e15b8c.tar.gz
SERVER-27149 Don't sync from nodes in an older term.
(cherry picked from commit 138402742f13b1cf85b021966eb27f2e33667cca)
-rw-r--r--src/mongo/db/repl/data_replicator.cpp3
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp6
-rw-r--r--src/mongo/db/repl/oplogreader.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/sync_source_resolver.cpp2
-rw-r--r--src/mongo/db/repl/sync_source_resolver_test.cpp8
-rw-r--r--src/mongo/db/repl/sync_source_selector.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp10
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h4
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp95
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp146
15 files changed, 170 insertions, 122 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 7eb266c2b9c..7761e56b076 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -1207,8 +1207,7 @@ void DataReplicator::_setState_inlock(const DataReplicatorState& newState) {
}
StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
- auto syncSource =
- _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp());
+ auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime);
if (syncSource.empty()) {
return Status{ErrorCodes::InvalidSyncSource,
str::stream() << "No valid sync source available. Our last fetched optime: "
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 2aaeb5a6074..3a451783597 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -85,7 +85,7 @@ class SyncSourceSelectorMock : public SyncSourceSelector {
public:
SyncSourceSelectorMock(const HostAndPort& syncSource) : _syncSource(syncSource) {}
void clearSyncSourceBlacklist() override {}
- HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
+ HostAndPort chooseNewSyncSource(const OpTime& ot) override {
HostAndPort result = _syncSource;
return result;
}
@@ -120,8 +120,8 @@ public:
void clearSyncSourceBlacklist() override {
_syncSourceSelector->clearSyncSourceBlacklist();
}
- HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
- return _syncSourceSelector->chooseNewSyncSource(ts);
+ HostAndPort chooseNewSyncSource(const OpTime& ot) override {
+ return _syncSourceSelector->chooseNewSyncSource(ot);
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_syncSourceSelector->blacklistSyncSource(host, until);
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index b7e6a06c7d2..92bc6648b8b 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -164,7 +164,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
invariant(conn() == NULL);
while (true) {
- HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched.getTimestamp());
+ HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched);
if (candidate.empty()) {
if (oldestOpTimeSeen == sentinel) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 851c91dac40..699bb2636ac 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -3037,15 +3037,15 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
return getReplicationMode() != modeNone;
}
-HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
+HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
LockGuard topoLock(_topoMutex);
HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
auto chainingPreference = isCatchingUp()
? TopologyCoordinator::ChainingPreference::kAllowChaining
: TopologyCoordinator::ChainingPreference::kUseConfiguration;
- HostAndPort newSyncSource = _topCoord->chooseNewSyncSource(
- _replExecutor.now(), lastTimestampFetched, chainingPreference);
+ HostAndPort newSyncSource =
+ _topCoord->chooseNewSyncSource(_replExecutor.now(), lastOpTimeFetched, chainingPreference);
stdx::lock_guard<stdx::mutex> lock(_mutex);
// If we lost our sync source, schedule new heartbeats immediately to update our knowledge
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 14b9571b7f4..bdbec4bac08 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -261,7 +261,7 @@ public:
virtual bool isReplEnabled() const override;
- virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) override;
+ virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) override;
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 22d2eed1f0d..d6e4679e78c 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -370,7 +370,7 @@ Status ReplicationCoordinatorMock::checkReplEnabledForCommand(BSONObjBuilder* re
return Status::OK();
}
-HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
+HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
return HostAndPort();
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 0a61d010a1a..d21c3c8e40d 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -205,7 +205,7 @@ public:
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
- virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched);
+ virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched);
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index dced97cef3a..39ec824ebac 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -144,7 +144,7 @@ bool SyncSourceResolver::_isShuttingDown() const {
StatusWith<HostAndPort> SyncSourceResolver::_chooseNewSyncSource() {
HostAndPort candidate;
try {
- candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched.getTimestamp());
+ candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched);
} catch (...) {
return exceptionToStatus();
}
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 4e94057031d..1022ef54e26 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -70,9 +70,9 @@ private:
class SyncSourceSelectorMock : public SyncSourceSelector {
public:
void clearSyncSourceBlacklist() override {}
- HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
+ HostAndPort chooseNewSyncSource(const OpTime& ot) override {
chooseNewSyncSourceHook();
- lastTimestampFetched = ts;
+ lastOpTimeFetched = ot;
return syncSource;
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
@@ -84,7 +84,7 @@ public:
}
HostAndPort syncSource = HostAndPort("host1", 1234);
- Timestamp lastTimestampFetched;
+ OpTime lastOpTimeFetched;
stdx::function<void()> chooseNewSyncSourceHook = []() {};
HostAndPort blacklistHost;
@@ -264,7 +264,7 @@ TEST_F(SyncSourceResolverTest,
// Resolver invokes callback with empty host and becomes inactive immediately.
ASSERT_FALSE(_resolver->isActive());
ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
- ASSERT_EQUALS(lastOpTimeFetched.getTimestamp(), _selector->lastTimestampFetched);
+ ASSERT_EQUALS(lastOpTimeFetched, _selector->lastOpTimeFetched);
// Cannot restart a completed resolver.
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _resolver->startup());
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index 131c3846e24..36cd7df3bf4 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -64,7 +64,7 @@ public:
/**
* Chooses a viable sync source, or, if none available, returns empty HostAndPort.
*/
- virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) = 0;
+ virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) = 0;
/**
* Blacklists choosing 'host' as a sync source until time 'until'.
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 3dbd739f9cd..8fe8fe13c34 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -138,7 +138,7 @@ public:
* Chooses and sets a new sync source, based on our current knowledge of the world.
*/
virtual HostAndPort chooseNewSyncSource(Date_t now,
- const Timestamp& lastTimestampFetched,
+ const OpTime& lastOpTimeFetched,
ChainingPreference chainingPreference) = 0;
/**
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 8062a122e5c..bc7029a7a4c 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -154,7 +154,7 @@ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
}
HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
- const Timestamp& lastTimestampFetched,
+ const OpTime& lastOpTimeFetched,
ChainingPreference chainingPreference) {
// If we are not a member of the current replica set configuration, no sync source is valid.
if (_selfIndex == -1) {
@@ -305,12 +305,12 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
}
}
// only consider candidates that are ahead of where we are
- if (it->getAppliedOpTime().getTimestamp() <= lastTimestampFetched) {
+ if (it->getAppliedOpTime() <= lastOpTimeFetched) {
LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
- << "My last fetched oplog timestamp: " << lastTimestampFetched.toBSON()
- << ", latest oplog timestamp of sync candidate "
+ << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
+ << ", latest oplog optime of sync candidate "
<< itMemberConfig.getHostAndPort() << ": "
- << it->getAppliedOpTime().getTimestamp().toBSON();
+ << it->getAppliedOpTime().toBSON();
continue;
}
// Candidate cannot be more latent than anything we've already considered.
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 9d57ffe0812..aac4c201e5b 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -151,8 +151,8 @@ public:
virtual UpdateTermResult updateTerm(long long term, Date_t now);
virtual void setForceSyncSourceIndex(int index);
virtual HostAndPort chooseNewSyncSource(Date_t now,
- const Timestamp& lastTimestampFetched,
- ChainingPreference chainingPreference);
+ const OpTime& lastOpTimeFetched,
+ ChainingPreference chainingPreference) override;
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now);
virtual void clearSyncSourceBlacklist();
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 52515127657..d0281db8217 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -240,7 +240,7 @@ private:
TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// if we do not have an index in the config, we should get an empty syncsource
HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_TRUE(newSyncSource.empty());
updateConfig(BSON("_id"
@@ -267,7 +267,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// Fail due to insufficient number of pings
newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
@@ -278,7 +278,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// Should choose h2, since it is furthest ahead
newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -286,34 +286,34 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 becomes an invalid candidate for sync source; should choose h2 again
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back in SECONDARY and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 goes down
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime());
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back up and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -358,7 +358,7 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) {
0);
setSelfMemberState(MemberState::RS_SECONDARY);
- Timestamp lastOpTimeWeApplied = Timestamp(100, 0);
+ OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 0);
heartbeatFromMember(HostAndPort("h1"),
"rs0",
@@ -533,7 +533,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
ASSERT_EQUALS(
HostAndPort(),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// Add primary
@@ -548,18 +548,20 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
// h3 is primary and should be chosen as the sync source when we are not in catch-up mode,
// despite being further away than h2 and the primary (h3) being behind our most recently
// applied optime.
- ASSERT_EQUALS(
- HostAndPort("h3"),
- getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as
// the sync source.
- ASSERT_EQUALS(
- HostAndPort("h2"),
- getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kAllowChaining));
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kAllowChaining));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// Become primary: should not choose self as sync source.
@@ -573,7 +575,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
ASSERT_EQUALS(
HostAndPort(),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
@@ -589,7 +591,7 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) {
HostAndPort h2("h2"), h3("h3");
Timestamp t1(1, 0), t5(5, 0), t10(10, 0);
- OpTime ot1(t1, 0), ot5(t5, 0);
+ OpTime ot1(t1, 0), ot5(t5, 0), ot10(t10, 0);
Milliseconds hbRTT100(100), hbRTT300(300);
// Two rounds of heartbeat pings from each member.
@@ -600,18 +602,18 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) {
// Should choose h3 as it is a voter
auto newSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(h3, newSource);
// Can't choose h2 as it is not a voter
newSource = getTopoCoord().chooseNewSyncSource(
- now()++, t10, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, ot10, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort(), newSource);
// Should choose h3 as it is a voter, and ahead
heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300);
newSource = getTopoCoord().chooseNewSyncSource(
- now()++, t1, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, ot1, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(h3, newSource);
}
@@ -657,7 +659,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) {
ASSERT_EQUALS(
HostAndPort("h2"),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// Become primary
@@ -668,7 +670,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) {
ASSERT_EQUALS(
HostAndPort("h2"),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
}
@@ -712,7 +714,7 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
// force should overrule other defaults
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
@@ -722,12 +724,12 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
ASSERT_TRUE(
getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now()));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// force should only work for one call to chooseNewSyncSource
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -770,19 +772,19 @@ TEST_F(TopoCoordTest, NodeDoesNotChooseBlacklistedSyncSourceUntilBlacklistingExp
Milliseconds(100));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime);
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
// Should choose second best choice now that h3 is blacklisted.
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// After time has passed, should go back to original sync source
getTopoCoord().chooseNewSyncSource(
- expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -828,19 +830,19 @@ TEST_F(TopoCoordTest, ChooseNoSyncSourceWhenPrimaryIsBlacklistedAndChainingIsDis
Milliseconds(100));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime);
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
// Can't choose any sync source now.
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// After time has passed, should go back to the primary
getTopoCoord().chooseNewSyncSource(
- expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
}
@@ -885,7 +887,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
ASSERT_EQUALS(
HostAndPort("h3"),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
// Good state setup done
@@ -894,7 +896,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::NetworkTimeout);
ASSERT_TRUE(getTopoCoord()
.chooseNewSyncSource(now()++,
- Timestamp(),
+ OpTime(),
TopologyCoordinator::ChainingPreference::kUseConfiguration)
.empty());
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
@@ -904,7 +906,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::Unauthorized);
ASSERT_TRUE(getTopoCoord()
.chooseNewSyncSource(now()++,
- Timestamp(),
+ OpTime(),
TopologyCoordinator::ChainingPreference::kUseConfiguration)
.empty());
ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s);
@@ -1280,9 +1282,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) {
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response.obj()["warning"].String());
- getTopoCoord().chooseNewSyncSource(now()++,
- ourOpTime.getTimestamp(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(
+ now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
}
@@ -1329,9 +1330,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) {
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
- getTopoCoord().chooseNewSyncSource(now()++,
- ourOpTime.getTimestamp(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(
+ now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress());
}
@@ -1380,7 +1380,7 @@ TEST_F(TopoCoordTest,
ASSERT_FALSE(responseObj.hasField("warning"));
receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime());
HostAndPort syncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h6"), syncSource);
}
@@ -1494,9 +1494,8 @@ TEST_F(TopoCoordTest,
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
ASSERT_FALSE(responseObj.hasField("prevSyncTarget"));
- getTopoCoord().chooseNewSyncSource(now()++,
- ourOpTime.getTimestamp(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(
+ now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
heartbeatFromMember(
@@ -4893,7 +4892,7 @@ TEST_F(PrepareHeartbeatResponseTest,
heartbeatFromMember(
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
// set up args
ReplSetHeartbeatArgs args;
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 851de44c9f8..3cb7f7ec038 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -251,7 +251,7 @@ private:
TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// if we do not have an index in the config, we should get an empty syncsource
HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_TRUE(newSyncSource.empty());
updateConfig(BSON("_id"
@@ -278,7 +278,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// Fail due to insufficient number of pings
newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
@@ -289,7 +289,7 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
// Should choose h2, since it is furthest ahead
newSyncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -297,34 +297,34 @@ TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 becomes an invalid candidate for sync source; should choose h2 again
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_RECOVERING, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back in SECONDARY and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// h3 goes down
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime());
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 back up and ahead
heartbeatFromMember(
HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(2, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -369,7 +369,7 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) {
0);
setSelfMemberState(MemberState::RS_SECONDARY);
- Timestamp lastOpTimeWeApplied = Timestamp(100, 0);
+ OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 0);
heartbeatFromMember(HostAndPort("h1"),
"rs0",
@@ -500,6 +500,57 @@ TEST_F(TopoCoordTest, NodeReturnsClosestValidSyncSourceAsSyncSource) {
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
+TEST_F(TopoCoordTest, NodeWontChooseSyncSourceFromOlderTerm) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "hself")
+ << BSON("_id" << 10 << "host"
+ << "h1")
+ << BSON("_id" << 20 << "host"
+ << "h2"))),
+ 0);
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ OpTime lastOpTimeWeApplied = OpTime(Timestamp(100, 0), 3);
+
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(200, 0), 3),
+ Milliseconds(200));
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(300, 0), 2), // old term
+ Milliseconds(100));
+
+ // Record 2nd round of pings to allow choosing a new sync source
+ heartbeatFromMember(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(200, 0), 3),
+ Milliseconds(200));
+ heartbeatFromMember(HostAndPort("h2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ OpTime(Timestamp(300, 0), 2), // old term
+ Milliseconds(100));
+
+ getTopoCoord().chooseNewSyncSource(
+ now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ ASSERT_EQUALS(HostAndPort("h1"), getTopoCoord().getSyncSourceAddress());
+
+ // h1 goes down; no sync source candidates remain
+ receiveDownHeartbeat(HostAndPort("h1"), "rs0", OpTime());
+ getTopoCoord().chooseNewSyncSource(
+ now()++, lastOpTimeWeApplied, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+}
+
TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
updateConfig(BSON("_id"
@@ -544,7 +595,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
ASSERT_EQUALS(
HostAndPort(),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// Add primary
@@ -559,18 +610,20 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
// h3 is primary and should be chosen as the sync source when we are not in catch-up mode,
// despite being further away than h2 and the primary (h3) being behind our most recently
// applied optime.
- ASSERT_EQUALS(
- HostAndPort("h3"),
- getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ ASSERT_EQUALS(HostAndPort("h3"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
// When we are in catch-up mode, the chainingAllowed setting is ignored. h2 should be chosen as
// the sync source.
- ASSERT_EQUALS(
- HostAndPort("h2"),
- getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(10, 0), TopologyCoordinator::ChainingPreference::kAllowChaining));
+ ASSERT_EQUALS(HostAndPort("h2"),
+ getTopoCoord().chooseNewSyncSource(
+ now()++,
+ OpTime(Timestamp(10, 0), 0),
+ TopologyCoordinator::ChainingPreference::kAllowChaining));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// Become primary: should not choose self as sync source.
@@ -584,7 +637,7 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenChainingIsDisallowed) {
ASSERT_EQUALS(
HostAndPort(),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
}
@@ -600,7 +653,7 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) {
HostAndPort h2("h2"), h3("h3");
Timestamp t1(1, 0), t5(5, 0), t10(10, 0);
- OpTime ot1(t1, 0), ot5(t5, 0);
+ OpTime ot1(t1, 0), ot5(t5, 0), ot10(t10, 0);
Milliseconds hbRTT100(100), hbRTT300(300);
// Two rounds of heartbeat pings from each member.
@@ -611,18 +664,18 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) {
// Should choose h3 as it is a voter
auto newSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(h3, newSource);
// Can't choose h2 as it is not a voter
newSource = getTopoCoord().chooseNewSyncSource(
- now()++, t10, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, ot10, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort(), newSource);
// Should choose h3 as it is a voter, and ahead
heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300);
newSource = getTopoCoord().chooseNewSyncSource(
- now()++, t1, TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, ot1, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(h3, newSource);
}
@@ -668,7 +721,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) {
ASSERT_EQUALS(
HostAndPort("h2"),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// Become primary
@@ -679,7 +732,7 @@ TEST_F(TopoCoordTest, ChooseSameSyncSourceEvenWhenPrimary) {
ASSERT_EQUALS(
HostAndPort("h2"),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
}
@@ -713,7 +766,7 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
// force should overrule other defaults
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
@@ -723,12 +776,12 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now()));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// force should only work for one call to chooseNewSyncSource
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -771,19 +824,19 @@ TEST_F(TopoCoordTest, NodeDoesNotChooseBlacklistedSyncSourceUntilBlacklistingExp
Milliseconds(100));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h3"), expireTime);
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
// Should choose second best choice now that h3 is blacklisted.
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// After time has passed, should go back to original sync source
getTopoCoord().chooseNewSyncSource(
- expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
}
@@ -829,19 +882,19 @@ TEST_F(TopoCoordTest, ChooseNoSyncSourceWhenPrimaryIsBlacklistedAndChainingIsDis
Milliseconds(100));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
Date_t expireTime = Date_t::fromMillisSinceEpoch(1000);
getTopoCoord().blacklistSyncSource(HostAndPort("h2"), expireTime);
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
// Can't choose any sync source now.
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// After time has passed, should go back to the primary
getTopoCoord().chooseNewSyncSource(
- expireTime, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ expireTime, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
}
@@ -886,7 +939,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
ASSERT_EQUALS(
HostAndPort("h3"),
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration));
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
// Good state setup done
@@ -895,7 +948,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::NetworkTimeout);
ASSERT_TRUE(getTopoCoord()
.chooseNewSyncSource(now()++,
- Timestamp(),
+ OpTime(),
TopologyCoordinator::ChainingPreference::kUseConfiguration)
.empty());
ASSERT_EQUALS(MemberState::RS_SECONDARY, getTopoCoord().getMemberState().s);
@@ -905,7 +958,7 @@ TEST_F(TopoCoordTest, NodeChangesToRecoveringWhenOnlyUnauthorizedNodesAreUp) {
receiveDownHeartbeat(HostAndPort("h3"), "rs0", OpTime(), ErrorCodes::Unauthorized);
ASSERT_TRUE(getTopoCoord()
.chooseNewSyncSource(now()++,
- Timestamp(),
+ OpTime(),
TopologyCoordinator::ChainingPreference::kUseConfiguration)
.empty());
ASSERT_EQUALS(MemberState::RS_RECOVERING, getTopoCoord().getMemberState().s);
@@ -1281,9 +1334,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAStaleNode) {
ASSERT_OK(result);
ASSERT_EQUALS("requested member \"h5:27017\" is more than 10 seconds behind us",
response.obj()["warning"].String());
- getTopoCoord().chooseNewSyncSource(now()++,
- ourOpTime.getTimestamp(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(
+ now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
}
@@ -1330,9 +1382,8 @@ TEST_F(TopoCoordTest, ChooseRequestedNodeWhenSyncFromRequestsAValidNode) {
ASSERT_OK(result);
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
- getTopoCoord().chooseNewSyncSource(now()++,
- ourOpTime.getTimestamp(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(
+ now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h6"), getTopoCoord().getSyncSourceAddress());
}
@@ -1381,7 +1432,7 @@ TEST_F(TopoCoordTest,
ASSERT_FALSE(responseObj.hasField("warning"));
receiveDownHeartbeat(HostAndPort("h6"), "rs0", OpTime());
HostAndPort syncSource = getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h6"), syncSource);
}
@@ -1495,9 +1546,8 @@ TEST_F(TopoCoordTest,
BSONObj responseObj = response.obj();
ASSERT_FALSE(responseObj.hasField("warning"));
ASSERT_FALSE(responseObj.hasField("prevSyncTarget"));
- getTopoCoord().chooseNewSyncSource(now()++,
- ourOpTime.getTimestamp(),
- TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ getTopoCoord().chooseNewSyncSource(
+ now()++, ourOpTime, TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h5"), getTopoCoord().getSyncSourceAddress());
heartbeatFromMember(
@@ -1960,7 +2010,7 @@ TEST_F(PrepareHeartbeatResponseV1Test,
heartbeatFromMember(
HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, OpTime(Timestamp(1, 0), 0));
getTopoCoord().chooseNewSyncSource(
- now()++, Timestamp(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
// set up args
ReplSetHeartbeatArgsV1 args;