summary refs log tree commit diff
path: root/src/mongo
diff options
context:
space:
mode:
author Siyuan Zhou <siyuan.zhou@mongodb.com> 2016-05-20 22:08:27 -0400
committer Siyuan Zhou <siyuan.zhou@mongodb.com> 2016-05-27 17:37:55 -0400
commit 3dd8ba46bf2ce350b5e80b1b2b016a10007beb7b (patch)
tree c0f5896b4727eb7bdc06a5a8fbb9a49d33f1fc1a /src/mongo
parent 66a3866209039ab46274dfe27cf3d985e65d453c (diff)
download mongo-3dd8ba46bf2ce350b5e80b1b2b016a10007beb7b.tar.gz
SERVER-24222 Update current known primary from command metadata
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 11
-rw-r--r-- src/mongo/db/repl/data_replicator_external_state.h 3
-rw-r--r-- src/mongo/db/repl/data_replicator_external_state_impl.cpp 10
-rw-r--r-- src/mongo/db/repl/data_replicator_external_state_impl.h 3
-rw-r--r-- src/mongo/db/repl/data_replicator_external_state_mock.cpp 7
-rw-r--r-- src/mongo/db/repl/data_replicator_external_state_mock.h 3
-rw-r--r-- src/mongo/db/repl/data_replicator_test.cpp 15
-rw-r--r-- src/mongo/db/repl/oplog_fetcher.cpp 20
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 10
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h 3
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.cpp 3
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.h 3
-rw-r--r-- src/mongo/db/repl/sync_source_selector.h 7
-rw-r--r-- src/mongo/db/repl/topology_coordinator.h 3
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl.cpp 20
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl.h 3
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl_test.cpp 46
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp 65
-rw-r--r-- src/mongo/db/repl/update_position_args.h 2
-rw-r--r-- src/mongo/rpc/metadata/repl_set_metadata.h 4
20 files changed, 126 insertions, 115 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 71c8d28dbcd..c5a5b08a0e7 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -85,8 +85,7 @@ public:
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
BackgroundSync* bgsync);
bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
private:
BackgroundSync* _bgsync;
@@ -99,15 +98,13 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground
: DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState),
_bgsync(bgsync) {}
-bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) {
+bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
+ const HostAndPort& source, const rpc::ReplSetMetadata& metadata) {
if (_bgsync->shouldStopFetching()) {
return true;
}
- return DataReplicatorExternalStateImpl::shouldStopFetching(
- source, sourceOpTime, sourceHasSyncSource);
+ return DataReplicatorExternalStateImpl::shouldStopFetching(source, metadata);
}
/**
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index d19f46f9711..848af9c720e 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -77,8 +77,7 @@ public:
* metadata).
*/
virtual bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) = 0;
+ const rpc::ReplSetMetadata& metadata) = 0;
private:
/**
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 558f3faf700..87b5993b231 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -60,14 +60,12 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata
}
bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
// Re-evaluate quality of sync target.
- if (_replicationCoordinator->shouldChangeSyncSource(
- source, sourceOpTime, sourceHasSyncSource)) {
+ if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) {
LOG(1) << "Canceling oplog query because we have to choose a sync source. Current source: "
- << source << ", OpTime " << sourceOpTime
- << ", hasSyncSource:" << sourceHasSyncSource;
+ << source << ", OpTime " << metadata.getLastOpVisible()
+ << ", its sync source index:" << metadata.getSyncSourceIndex();
return true;
}
return false;
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 25a09e1d7db..e94dccdab32 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -51,8 +51,7 @@ public:
void processMetadata(const rpc::ReplSetMetadata& metadata) override;
bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
private:
StatusWith<OpTime> _multiApply(OperationContext* txn,
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 83a6e3fd83c..2ec80fad216 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -47,11 +47,10 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata
}
bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
lastSyncSourceChecked = source;
- syncSourceLastOpTime = sourceOpTime;
- syncSourceHasSyncSource = sourceHasSyncSource;
+ syncSourceLastOpTime = metadata.getLastOpVisible();
+ syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
return shouldStopFetchingResult;
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index 4705fb57bd3..6335552f4b8 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -48,8 +48,7 @@ public:
void processMetadata(const rpc::ReplSetMetadata& metadata) override;
bool shouldStopFetching(const HostAndPort& source,
- const OpTime& sourceOpTime,
- bool sourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
// Returned by getCurrentTermAndLastCommittedOpTime.
long long currentTerm = OpTime::kUninitializedTerm;
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 647b623e689..300100d4726 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -83,8 +83,7 @@ public:
_blacklistedSource = host;
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
+ const rpc::ReplSetMetadata& metadata) override {
return false;
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
@@ -125,10 +124,8 @@ public:
_syncSourceSelector->blacklistSyncSource(host, until);
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
- return _syncSourceSelector->shouldChangeSyncSource(
- currentSource, sourcesOpTime, syncSourceHasSyncSource);
+ const rpc::ReplSetMetadata& metadata) override {
+ return _syncSourceSelector->shouldChangeSyncSource(currentSource, metadata);
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched) override {
@@ -697,8 +694,7 @@ public:
_blacklistedSource = host;
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
+ const rpc::ReplSetMetadata& metadata) override {
return false;
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
@@ -829,8 +825,7 @@ public:
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& sourcesOpTime,
- bool syncSourceHasSyncSource) override {
+ const rpc::ReplSetMetadata& metadata) override {
return false;
}
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 1655a717210..ec02d16ce1a 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -281,8 +281,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
}
const auto& queryResponse = result.getValue();
- OpTime sourcesLastOpTime;
- bool syncSourceHasSyncSource = false;
+ rpc::ReplSetMetadata metadata;
// Forward metadata (containing liveness information) to data replicator external state.
bool receivedMetadata =
@@ -296,10 +295,8 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_onShutdown(metadataResult.getStatus());
return;
}
- auto metadata = metadataResult.getValue();
+ metadata = metadataResult.getValue();
_dataReplicatorExternalState->processMetadata(metadata);
- sourcesLastOpTime = metadata.getLastOpVisible();
- syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
}
const auto& documents = queryResponse.documents;
@@ -349,13 +346,14 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_lastFetched = opTimeWithHash;
}
- if (_dataReplicatorExternalState->shouldStopFetching(
- _fetcher.getSource(), sourcesLastOpTime, syncSourceHasSyncSource)) {
+ if (_dataReplicatorExternalState->shouldStopFetching(_fetcher.getSource(), metadata)) {
_onShutdown(Status(ErrorCodes::InvalidSyncSource,
- str::stream() << "sync source " << _fetcher.getSource().toString()
- << " (last optime: " << sourcesLastOpTime.toString()
- << "; has sync source: " << syncSourceHasSyncSource
- << ") is no longer valid"),
+ str::stream()
+ << "sync source " << _fetcher.getSource().toString()
+ << " (last optime: " << metadata.getLastOpVisible().toString()
+ << "; sync source index: " << metadata.getSyncSourceIndex()
+ << "; primary index: " << metadata.getPrimaryIndex()
+ << ") is no longer valid"),
opTimeWithHash);
return;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 172f10b8a08..c33326031b7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2893,14 +2893,10 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
}
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
LockGuard topoLock(_topoMutex);
- return _topCoord->shouldChangeSyncSource(currentSource,
- getMyLastAppliedOpTime(),
- syncSourceLastOpTime,
- syncSourceHasSyncSource,
- _replExecutor.now());
+ return _topCoord->shouldChangeSyncSource(
+ currentSource, getMyLastAppliedOpTime(), metadata, _replExecutor.now());
}
SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource(
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index d7ed62f89d7..53f0ffde4c3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -275,8 +275,7 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* txn) override;
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) override;
+ const rpc::ReplSetMetadata& metadata) override;
virtual SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched) override;
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 5384bf45d95..fc0f9866f55 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -357,8 +357,7 @@ void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* txn
}
bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) {
+ const rpc::ReplSetMetadata& metadata) {
invariant(false);
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 5fdf319306d..2b56a93c219 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -208,8 +208,7 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* txn);
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource);
+ const rpc::ReplSetMetadata& metadata);
virtual OpTime getLastCommittedOpTime() const;
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index 812944420a9..c3621c3e95d 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -68,17 +68,16 @@ public:
/**
* Determines if a new sync source should be chosen, if a better candidate sync source is
- * available. If the current sync source's last optime ("syncSourceLastOpTime" under
+ * available. If the current sync source's last optime (visibleOpTime of metadata under
* protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than
* _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
* running in ProtocolVersion 1, our current sync source is not primary, has no sync source
- * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
+ * and only has data up to "myLastOpTime", returns true.
*
* "now" is used to skip over currently blacklisted sync sources.
*/
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource) = 0;
+ const rpc::ReplSetMetadata& metadata) = 0;
/**
* Returns a SyncSourceResolverResponse containing the syncSource or a new MinValid boundry as
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 0fd9974def5..27242c393d8 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -165,8 +165,7 @@ public:
*/
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
+ const rpc::ReplSetMetadata& metadata,
Date_t now) const = 0;
/**
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 45f8e6280ad..1cfaee288d4 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2290,10 +2290,11 @@ long long TopologyCoordinatorImpl::getTerm() {
return _term;
}
+// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
+// replset. Passing metadata is unnecessary.
bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
+ const rpc::ReplSetMetadata& metadata,
Date_t now) const {
// Methodology:
// If there exists a viable sync source member other than currentSource, whose oplog has
@@ -2307,14 +2308,16 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return true;
}
- const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
- if (currentSourceIndex == -1) {
+ if (metadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
return true;
}
+
+ const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
+ invariant(currentSourceIndex != -1);
invariant(currentSourceIndex != _selfIndex);
OpTime currentSourceOpTime =
- std::max(syncSourceLastOpTime, _hbdata.at(currentSourceIndex).getAppliedOpTime());
+ std::max(metadata.getLastOpVisible(), _hbdata.at(currentSourceIndex).getAppliedOpTime());
if (currentSourceOpTime.isNull()) {
// Haven't received a heartbeat from the sync source yet, so can't tell if we should
@@ -2322,9 +2325,10 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return false;
}
- if (_rsConfig.getProtocolVersion() == 1 && !syncSourceHasSyncSource &&
- currentSourceOpTime <= myLastOpTime &&
- _hbdata.at(currentSourceIndex).getState() != MemberState::RS_PRIMARY) {
+ // Change sync source if they are not ahead of us, and don't have a sync source,
+ // unless they are primary.
+ if (_rsConfig.getProtocolVersion() == 1 && metadata.getSyncSourceIndex() == -1 &&
+ currentSourceOpTime <= myLastOpTime && metadata.getPrimaryIndex() != currentSourceIndex) {
return true;
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 74d7368d552..0cd56272565 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -156,8 +156,7 @@ public:
virtual void clearSyncSourceBlacklist();
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const OpTime& syncSourceLastOpTime,
- bool syncSourceHasSyncSource,
+ const rpc::ReplSetMetadata& metadata,
Date_t now) const;
virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
virtual void setElectionSleepUntil(Date_t newTime);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index b0e41018c32..6d1e9f08a97 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -51,6 +51,7 @@
ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION))
using std::unique_ptr;
+using mongo::rpc::ReplSetMetadata;
namespace mongo {
namespace repl {
@@ -146,6 +147,12 @@ protected:
_currentConfig = config;
}
+ // Make the metadata coming from sync source. Only set visibleOpTime.
+ ReplSetMetadata makeMetadata(OpTime opTime = OpTime()) {
+ return ReplSetMetadata(
+ _topo->getTerm(), OpTime(), opTime, _currentConfig.getConfigVersion(), OID(), -1, -1);
+ }
+
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
const std::string& setName,
MemberState memberState,
@@ -629,9 +636,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), OpTime(), false, now()));
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), makeMetadata(), now()));
ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), OpTime(), false, now()));
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now()));
getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -4909,9 +4916,10 @@ TEST_F(HeartbeatResponseTest, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepso
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
- // "host4" since "host4" is absent from the config
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host4"), OpTime(), OpTime(), false, now()));
+ // "host4" since "host4" is absent from the config of version 10.
+ ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
+ ASSERT_TRUE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now()));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatUs) {
@@ -4919,7 +4927,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatU
// "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime)
// for "host2"
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbeatButNotMetadata) {
@@ -4949,8 +4957,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ auto metadata = makeMetadata(lastOpTimeApplied);
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -4982,8 +4991,9 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsStaleByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now()));
+ auto metadata = makeMetadata(fresherLastOpTimeApplied);
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -5015,7 +5025,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenFresherMemberExists) {
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -5049,18 +5059,18 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhileFresherMemberIsBlack
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
// unblacklist with too early a time (node should remained blacklisted)
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
// unblacklist and it should succeed
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -5094,7 +5104,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsDown)
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) {
@@ -5124,7 +5134,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotRea
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotBuildIndexes) {
@@ -5164,7 +5174,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotB
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTest,
@@ -5207,7 +5217,7 @@ TEST_F(HeartbeatResponseTest,
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 79d6f7e6727..826905a860a 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -52,6 +52,7 @@
ASSERT_EQUALS(mongo::repl::HeartbeatResponseAction::NoAction, (EXPRESSION))
using std::unique_ptr;
+using mongo::rpc::ReplSetMetadata;
namespace mongo {
namespace repl {
@@ -148,6 +149,20 @@ protected:
_currentConfig = config;
}
+ // Make the metadata coming from sync source.
+ // Only set visibleOpTime, primaryIndex and syncSourceIndex
+ ReplSetMetadata makeMetadata(OpTime opTime = OpTime(),
+ int primaryIndex = -1,
+ int syncSourceIndex = -1) {
+ return ReplSetMetadata(_topo->getTerm(),
+ OpTime(),
+ opTime,
+ _currentConfig.getConfigVersion(),
+ OID(),
+ primaryIndex,
+ syncSourceIndex);
+ }
+
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
const std::string& setName,
MemberState memberState,
@@ -622,9 +637,9 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("h2"), OpTime(), oldOpTime, false, now()));
+ HostAndPort("h2"), OpTime(), makeMetadata(oldOpTime), now()));
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("h3"), OpTime(), newOpTime, false, now()));
+ HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now()));
getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -2696,7 +2711,7 @@ TEST_F(HeartbeatResponseTestV1,
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -2738,7 +2753,7 @@ TEST_F(HeartbeatResponseTestV1,
ASSERT_NO_ACTION(nextAction.getAction());
// Show we like host2 while it is primary.
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 1), now()));
// Show that we also like host2 while it has a sync source.
nextAction = receiveUpHeartbeat(HostAndPort("host2"),
@@ -2749,7 +2764,7 @@ TEST_F(HeartbeatResponseTestV1,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, true, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 2, 2), now()));
// Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress
// beyond our own.
@@ -2761,9 +2776,16 @@ TEST_F(HeartbeatResponseTestV1,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied), now()));
+
+ // Sometimes the heartbeat is stale and the metadata says it's the primary. Trust the metadata.
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */),
+ now()));
- // But if it has some progress beyond our own, we still like it.
+ // But if it is secondary and has some progress beyond our own, we still like it.
OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0);
nextAction = receiveUpHeartbeat(HostAndPort("host2"),
"rs0",
@@ -2773,7 +2795,7 @@ TEST_F(HeartbeatResponseTestV1,
newerThanLastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, newerThanLastOpTimeApplied, false, now()));
+ HostAndPort("host2"), lastOpTimeApplied, makeMetadata(newerThanLastOpTimeApplied), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) {
@@ -2805,7 +2827,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBlackListed) {
@@ -2837,18 +2859,18 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
// unblacklist with too early a time (node should remained blacklisted)
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
// unblacklist and it should succeed
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -2881,7 +2903,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -2914,7 +2936,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(fresherLastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
@@ -2946,7 +2968,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) {
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -2955,14 +2977,15 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
// "host2" since we do not use the member's heartbeatdata in pv1.
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), OpTime(), true, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
- // "host4" since "host4" is absent from the config
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host4"), OpTime(), OpTime(), true, now()));
+ // "host4" since "host4" is absent from the config of version 10.
+ ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
+ ASSERT_TRUE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now()));
}
// TODO(dannenberg) figure out what this is trying to test..
@@ -3815,7 +3838,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberDoesNo
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) {
@@ -3845,7 +3868,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotR
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now()));
+ HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
}
class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 {
diff --git a/src/mongo/db/repl/update_position_args.h b/src/mongo/db/repl/update_position_args.h
index bb4a94d8ffb..0acac33dd6f 100644
--- a/src/mongo/db/repl/update_position_args.h
+++ b/src/mongo/db/repl/update_position_args.h
@@ -40,7 +40,7 @@ class Status;
namespace repl {
/**
- * Arguments to the handshake command.
+ * Arguments to the update position command.
*/
class UpdatePositionArgs {
public:
diff --git a/src/mongo/rpc/metadata/repl_set_metadata.h b/src/mongo/rpc/metadata/repl_set_metadata.h
index 24022063a04..a73e3001015 100644
--- a/src/mongo/rpc/metadata/repl_set_metadata.h
+++ b/src/mongo/rpc/metadata/repl_set_metadata.h
@@ -134,8 +134,8 @@ public:
}
private:
- repl::OpTime _lastOpCommitted = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
- repl::OpTime _lastOpVisible = repl::OpTime(Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+ repl::OpTime _lastOpCommitted;
+ repl::OpTime _lastOpVisible;
long long _currentTerm = -1;
long long _configVersion = -1;
OID _replicaSetId;