summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2015-10-12 08:07:25 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2015-10-14 05:07:49 -0400
commitc860db7d39f8559054d1cebb83a6838accef8d94 (patch)
treeb3110c22e4afcdf6313144797957c8f6c4efe929
parent7d43b0dba28b4b8e0184579a75c3dddab9d86e1f (diff)
downloadmongo-c860db7d39f8559054d1cebb83a6838accef8d94.tar.gz
SERVER-20822 make sync source decisions based on ReplSetMetadata
-rw-r--r--src/mongo/db/repl/bgsync.cpp13
-rw-r--r--src/mongo/db/repl/bgsync.h15
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp19
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp15
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp13
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp59
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h4
-rw-r--r--src/mongo/db/repl/sync_source_selector.h16
-rw-r--r--src/mongo/db/repl/topology_coordinator.h10
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp23
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp158
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp174
-rw-r--r--src/mongo/rpc/metadata/repl_set_metadata.cpp16
-rw-r--r--src/mongo/rpc/metadata/repl_set_metadata.h21
-rw-r--r--src/mongo/rpc/metadata/repl_set_metadata_test.cpp15
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp20
19 files changed, 382 insertions, 221 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 96deab1683a..118a9e02046 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -426,6 +426,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
}
const auto& queryResponse = result.getValue();
+ bool syncSourceHasSyncSource = false;
+ OpTime sourcesLastOp;
// Forward metadata (containing liveness information) to replication coordinator.
bool receivedMetadata =
@@ -443,6 +445,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
_replCoord->cancelAndRescheduleElectionTimeout();
}
+ syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
+ sourcesLastOp = metadata.getLastOpVisible();
}
const auto& documents = queryResponse.documents;
@@ -542,7 +546,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
}
// re-evaluate quality of sync target
- if (_shouldChangeSyncSource(source)) {
+ if (_shouldChangeSyncSource(source, sourcesLastOp, syncSourceHasSyncSource)) {
return;
}
@@ -562,7 +566,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
}
}
-bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource) {
+bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource) {
// is it even still around?
if (getSyncTarget().empty() || syncSource.empty()) {
return true;
@@ -570,7 +576,8 @@ bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource) {
// check other members: is any member's optime more than MaxSyncSourceLag seconds
// ahead of the current sync source?
- return _replCoord->shouldChangeSyncSource(syncSource);
+ return _replCoord->shouldChangeSyncSource(
+ syncSource, syncSourceLastOpTime, syncSourceHasSyncSource);
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 936a18c1209..56bedc5d8cf 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -191,8 +191,19 @@ private:
const HostAndPort& source,
stdx::function<DBClientBase*()> getConnection);
- // Evaluate if the current sync target is still good
- bool _shouldChangeSyncSource(const HostAndPort& syncSource);
+ /**
+ * Evaluate if the current sync source is still good.
+ * "syncSource" is the name of the current sync source, which will be used to look up the
+ * member's heartbeat data.
+ * "syncSourceLastOpTime" is the last OpTime the sync source has. This is passed in because the
+ * data stored from heartbeats could be too stale and would cause unnecessary sync source
+ * changes.
+ * "syncSourceHasSyncSource" indicates whether our sync source is currently syncing from another
+ * member.
+ */
+ bool _shouldChangeSyncSource(const HostAndPort& syncSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource);
// restart syncing
void start(OperationContext* txn);
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 887a7a924bf..adff8d96782 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -76,7 +76,9 @@ public:
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_blacklistedSource = host;
}
- bool shouldChangeSyncSource(const HostAndPort& currentSource) override {
+ bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& sourcesOpTime,
+ bool syncSourceHasSyncSource) override {
return false;
}
HostAndPort _syncSource;
@@ -112,8 +114,11 @@ public:
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_syncSourceSelector->blacklistSyncSource(host, until);
}
- bool shouldChangeSyncSource(const HostAndPort& currentSource) override {
- return _syncSourceSelector->shouldChangeSyncSource(currentSource);
+ bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& sourcesOpTime,
+ bool syncSourceHasSyncSource) override {
+ return _syncSourceSelector->shouldChangeSyncSource(
+ currentSource, sourcesOpTime, syncSourceHasSyncSource);
}
void scheduleNetworkResponse(const BSONObj& obj) {
@@ -554,7 +559,9 @@ public:
LockGuard lk(_mutex);
_blacklistedSource = host;
}
- bool shouldChangeSyncSource(const HostAndPort& currentSource) override {
+ bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& sourcesOpTime,
+ bool syncSourceHasSyncSource) override {
return false;
}
mutable stdx::mutex _mutex;
@@ -680,7 +687,9 @@ public:
return HostAndPort();
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {}
- bool shouldChangeSyncSource(const HostAndPort& currentSource) override {
+ bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& sourcesOpTime,
+ bool syncSourceHasSyncSource) override {
return false;
}
ReplicationExecutor* _exec;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 087a74ec523..20394ff090b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2679,22 +2679,31 @@ void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn)
void ReplicationCoordinatorImpl::_shouldChangeSyncSource(
const ReplicationExecutor::CallbackArgs& cbData,
const HostAndPort& currentSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource,
bool* shouldChange) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
- *shouldChange =
- _topCoord->shouldChangeSyncSource(currentSource, getMyLastOptime(), _replExecutor.now());
+ *shouldChange = _topCoord->shouldChangeSyncSource(currentSource,
+ getMyLastOptime(),
+ syncSourceLastOpTime,
+ syncSourceHasSyncSource,
+ _replExecutor.now());
}
-bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource) {
+bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource) {
bool shouldChange(false);
CBHStatus cbh =
_replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource,
this,
stdx::placeholders::_1,
currentSource,
+ syncSourceLastOpTime,
+ syncSourceHasSyncSource,
&shouldChange));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return false;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 018cc7a8539..eb107186bdf 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -255,7 +255,9 @@ public:
virtual void resetLastOpTimeFromOplog(OperationContext* txn) override;
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) override;
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource) override;
virtual OpTime getLastCommittedOpTime() const override;
@@ -946,6 +948,8 @@ private:
*/
void _shouldChangeSyncSource(const ReplicationExecutor::CallbackArgs& cbData,
const HostAndPort& currentSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource,
bool* shouldChange);
/**
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 32f85354f85..d0d57a626bd 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -338,12 +338,13 @@ TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeat) {
auto test = [this](OpTime committedOpTime, OpTime expected) {
// process heartbeat metadata directly
StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName << BSON(
- "lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "t"
- << committedOpTime.getTerm()) << "lastOpVisible"
- << BSON("ts" << committedOpTime.getTimestamp() << "t"
- << committedOpTime.getTerm()) << "configVersion" << 1
- << "primaryIndex" << 1 << "term" << committedOpTime.getTerm())));
+ rpc::kReplSetMetadataFieldName
+ << BSON("lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "t"
+ << committedOpTime.getTerm()) << "lastOpVisible"
+ << BSON("ts" << committedOpTime.getTimestamp() << "t"
+ << committedOpTime.getTerm()) << "configVersion"
+ << 1 << "primaryIndex" << 1 << "term"
+ << committedOpTime.getTerm() << "syncSourceIndex" << 1)));
ASSERT_OK(metadata.getStatus());
getReplCoord()->processReplSetMetadata(metadata.getValue());
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 674f50bfc9d..bf4c680f29a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -2617,19 +2617,20 @@ TEST_F(ReplCoordTest, MetadataWrongConfigVersion) {
// lower configVersion
StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion"
- << 1 << "primaryIndex" << 2 << "term" << 2)));
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 1
+ << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1)));
getReplCoord()->processReplSetMetadata(metadata.getValue());
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// higher configVersion
- StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion"
- << 100 << "primaryIndex" << 2 << "term" << 2)));
+ StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(
+ BSON(rpc::kReplSetMetadataFieldName
+ << BSON("lastOpCommitted"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 100
+ << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1)));
getReplCoord()->processReplSetMetadata(metadata2.getValue());
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
}
@@ -2660,20 +2661,20 @@ TEST_F(ReplCoordTest, MetadataUpdatesLastCommittedOpTime) {
// higher OpTime, should change
StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion"
- << 2 << "primaryIndex" << 2 << "term" << 1)));
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion" << 2
+ << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1)));
getReplCoord()->processReplSetMetadata(metadata.getValue());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime());
// lower OpTime, should not change
StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible"
- << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion"
- << 2 << "primaryIndex" << 2 << "term" << 1)));
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible"
+ << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion" << 2
+ << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1)));
getReplCoord()->processReplSetMetadata(metadata2.getValue());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
}
@@ -2700,10 +2701,10 @@ TEST_F(ReplCoordTest, MetadataUpdatesTermAndPrimaryId) {
// higher term, should change
StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion"
- << 2 << "primaryIndex" << 2 << "term" << 3)));
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" << 2
+ << "primaryIndex" << 2 << "term" << 3 << "syncSourceIndex" << 1)));
getReplCoord()->processReplSetMetadata(metadata.getValue());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
@@ -2711,10 +2712,10 @@ TEST_F(ReplCoordTest, MetadataUpdatesTermAndPrimaryId) {
// lower term, should not change
StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion"
- << 2 << "primaryIndex" << 1 << "term" << 2)));
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
+ << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2
+ << "primaryIndex" << 1 << "term" << 2 << "syncSourceIndex" << 1)));
getReplCoord()->processReplSetMetadata(metadata2.getValue());
ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
@@ -2722,10 +2723,10 @@ TEST_F(ReplCoordTest, MetadataUpdatesTermAndPrimaryId) {
// same term, should not change
StatusWith<rpc::ReplSetMetadata> metadata3 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
- << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion"
- << 2 << "primaryIndex" << 1 << "term" << 3)));
+ rpc::kReplSetMetadataFieldName << BSON(
+ "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
+ << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2
+ << "primaryIndex" << 1 << "term" << 3 << "syncSourceIndex" << 1)));
getReplCoord()->processReplSetMetadata(metadata3.getValue());
ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 4750fa28602..b9e9bdbd26c 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -320,7 +320,9 @@ void ReplicationCoordinatorMock::resetLastOpTimeFromOplog(OperationContext* txn)
invariant(false);
}
-bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource) {
+bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource) {
invariant(false);
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 5fbbef2fc51..362ef8cba73 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -185,7 +185,9 @@ public:
virtual void resetLastOpTimeFromOplog(OperationContext* txn);
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource);
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource);
virtual OpTime getLastCommittedOpTime() const;
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index 71eccdb8a30..1155ef0be20 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -38,6 +38,8 @@ class Timestamp;
namespace repl {
+class OpTime;
+
/**
* Manage list of viable and blocked sync sources that we can replicate from.
*/
@@ -64,10 +66,18 @@ public:
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0;
/**
- * Determines if a new sync source should be considered.
- * currentSource: the current sync source
+ * Determines if a new sync source should be chosen, if a better candidate sync source is
+ * available. If the current sync source's last optime ("syncSourceLastOpTime" under
+ * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than
+ * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
+ * running in ProtocolVersion 1, our current sync source is not primary, has no sync source
+ * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
+ *
+ * "now" is used to skip over currently blacklisted sync sources.
*/
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0;
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index b0ff312c2c9..89f8aca3c5f 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -153,14 +153,18 @@ public:
/**
* Determines if a new sync source should be chosen, if a better candidate sync source is
- * available. If the current sync source's last optime is more than _maxSyncSourceLagSecs
- * behind any syncable source, this function returns true. If our current sync source is not
- * primary, has no sync source, and only has data up to "myLastOpTime", returns true.
+ * available. If the current sync source's last optime ("syncSourceLastOpTime" under
+ * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than
+ * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
+ * running in ProtocolVersion 1, our current sync source is not primary, has no sync source
+ * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
*
* "now" is used to skip over currently blacklisted sync sources.
*/
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource,
Date_t now) const = 0;
/**
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 739c523b853..d7220c4fca4 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2300,6 +2300,8 @@ long long TopologyCoordinatorImpl::getTerm() {
bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource,
Date_t now) const {
// Methodology:
// If there exists a viable sync source member other than currentSource, whose oplog has
@@ -2319,8 +2321,8 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
}
invariant(currentSourceIndex != _selfIndex);
- const auto& currentSourceHBData = _hbdata[currentSourceIndex];
- OpTime currentSourceOpTime = currentSourceHBData.getOpTime();
+ OpTime currentSourceOpTime =
+ std::max(syncSourceLastOpTime, _hbdata[currentSourceIndex].getOpTime());
if (currentSourceOpTime.isNull()) {
// Haven't received a heartbeat from the sync source yet, so can't tell if we should
@@ -2328,8 +2330,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return false;
}
- if (currentSourceHBData.getSyncSource().empty() && currentSourceOpTime <= myLastOpTime &&
- currentSourceHBData.getState() != MemberState::RS_PRIMARY) {
+ if (_rsConfig.getProtocolVersion() == 1 && !syncSourceHasSyncSource &&
+ currentSourceOpTime <= myLastOpTime &&
+ _hbdata[currentSourceIndex].getState() != MemberState::RS_PRIMARY) {
return true;
}
@@ -2359,11 +2362,13 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
const OpTime& lastVisibleOpTime,
const OpTime& lastCommittedOpTime) const {
- *metadata = rpc::ReplSetMetadata(_term,
- lastCommittedOpTime,
- lastVisibleOpTime,
- _rsConfig.getConfigVersion(),
- _currentPrimaryIndex);
+ *metadata =
+ rpc::ReplSetMetadata(_term,
+ lastCommittedOpTime,
+ lastVisibleOpTime,
+ _rsConfig.getConfigVersion(),
+ _currentPrimaryIndex,
+ _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}
void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 49263136f49..9509285080c 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -160,6 +160,8 @@ public:
virtual void clearSyncSourceBlacklist();
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
+ const OpTime& syncSourceLastOpTime,
+ bool syncSourceHasSyncSource,
Date_t now) const;
virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
virtual void setElectionSleepUntil(Date_t newTime);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 0301abe215d..73e3e794c4e 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -620,8 +620,10 @@ TEST_F(TopoCoordTest, ForceSyncSource) {
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), now()));
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), now()));
+ ASSERT_TRUE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), OpTime(), false, now()));
+ ASSERT_TRUE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), OpTime(), false, now()));
getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -4189,19 +4191,22 @@ TEST_F(HeartbeatResponseTest, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepso
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host4" since "host4" is absent from the config
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host4"), OpTime(), OpTime(), false, now()));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceMemberHasYetToHeartbeat) {
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
// "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime)
// for "host2"
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
}
-TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists) {
- // In this test, the TopologyCoordinator should tell us to change sync sources away from
- // "host2" and to "host3" since "host2" is more than maxSyncSourceLagSecs(30) behind "host3"
+TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbeatButNotMetadata) {
+ // In this test, the TopologyCoordinator should not tell us to change sync sources away from
+ // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind
+ // "host3" according to metadata, not heartbeat data.
OpTime election = OpTime();
OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0);
// ahead by more than maxSyncSourceLagSecs (30)
@@ -4211,7 +4216,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists)
"rs0",
MemberState::RS_SECONDARY,
election,
- lastOpTimeApplied,
+ fresherLastOpTimeApplied,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
@@ -4225,18 +4230,18 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists)
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
+ ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
-TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsBlackListed) {
+TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbeatButNotMetadata) {
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
- // "host2" and to "host3" despite "host2" being more than maxSyncSourceLagSecs(30) behind
- // "host3", since "host3" is blacklisted
- // Then, confirm that unblacklisting only works if time has passed the blacklist time.
+ // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind
+ // "host3" according to heartbeat data, not metadata.
OpTime election = OpTime();
- OpTime lastOpTimeApplied = OpTime(Timestamp(400, 0), 0);
+ OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0);
// ahead by more than maxSyncSourceLagSecs (30)
OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0);
@@ -4255,83 +4260,90 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsBlackListed)
fresherLastOpTimeApplied,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
-
- // unblacklist with too early a time (node should remained blacklisted)
- getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
-
- // unblacklist and it should succeed
- getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now()));
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
+ ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
}
-TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceCurrentTargetNoLongerPrimary) {
- // In this test, the TopologyCoordinator will tell us change our sync source away from "host2"
- // when it is not ahead of us, unless it is PRIMARY or has a sync source of its own.
+TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherHappierMemberExists) {
+ // In this test, the TopologyCoordinator should tell us to change sync sources away from
+ // "host2" and to "host3" since "host2" is more than maxSyncSourceLagSecs(30) behind "host3"
OpTime election = OpTime();
- OpTime lastOpTimeApplied = OpTime(Timestamp(400, 0), 0);
+ OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0);
+ // ahead by more than maxSyncSourceLagSecs (30)
+ OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0);
HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"),
"rs0",
- MemberState::RS_PRIMARY,
+ MemberState::RS_SECONDARY,
election,
lastOpTimeApplied,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- // Show we like host2 while it is primary.
- ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
-
- // Show that we also like host2 while it has a sync source.
- // Cannot use receiveUpHeartbeat helper because syncingTo must be set.
- ReplSetHeartbeatResponse hb;
- hb.setConfigVersion(1);
- hb.setState(MemberState::RS_SECONDARY);
- hb.setOpTime(lastOpTimeApplied);
- hb.setElectionTime(election.getTimestamp());
- hb.setSyncingTo(HostAndPort("host2"));
-
- StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
- getTopoCoord().prepareHeartbeatRequest(now(), "rs0", HostAndPort("host2"));
- now() += Milliseconds(1);
- ASSERT_NO_ACTION(
- getTopoCoord()
- .processHeartbeatResponse(
- now(), Milliseconds(1), HostAndPort("host2"), hbResponse, lastOpTimeApplied)
- .getAction());
- ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
-
- // Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress
- // beyond our own.
- nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ nextAction = receiveUpHeartbeat(HostAndPort("host3"),
"rs0",
MemberState::RS_SECONDARY,
election,
- lastOpTimeApplied,
+ fresherLastOpTimeApplied,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
- // But if it has some progress beyond our own, we still like it.
- OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0);
- nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
+}
+
+TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsBlackListed) {
+ // In this test, the TopologyCoordinator should not tell us to change sync sources away from
+ // "host2" and to "host3" despite "host2" being more than maxSyncSourceLagSecs(30) behind
+ // "host3", since "host3" is blacklisted
+ // Then, confirm that unblacklisting only works if time has passed the blacklist time.
+ OpTime election = OpTime();
+ OpTime lastOpTimeApplied = OpTime(Timestamp(400, 0), 0);
+ // ahead by more than maxSyncSourceLagSecs (30)
+ OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0);
+
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ lastOpTimeApplied,
+ lastOpTimeApplied);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ nextAction = receiveUpHeartbeat(HostAndPort("host3"),
"rs0",
MemberState::RS_SECONDARY,
election,
- newerThanLastOpTimeApplied,
- newerThanLastOpTimeApplied);
+ fresherLastOpTimeApplied,
+ lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
+ getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
+
+ // set up complete, time for actual check
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+
+ // unblacklist with too early a time (node should remained blacklisted)
+ getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+
+ // unblacklist and it should succeed
+ getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
+ startCapturingLogMessages();
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsDown) {
@@ -4362,7 +4374,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsDown) {
// set up complete, time for actual check
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsNotReadable) {
@@ -4391,7 +4404,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberIsNotReadable)
ASSERT_NO_ACTION(nextAction.getAction());
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildIndexes) {
@@ -4430,7 +4444,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildInd
ASSERT_NO_ACTION(nextAction.getAction());
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildIndexesNorDoWe) {
@@ -4471,7 +4486,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceFresherMemberDoesNotBuildInd
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), false, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 0e793a28bf9..24e904e265b 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -593,28 +593,18 @@ TEST_F(TopoCoordTest, ForceSyncSource) {
0);
setSelfMemberState(MemberState::RS_SECONDARY);
+ OpTime oldOpTime = OpTime(Timestamp(1, 0), 0);
+ OpTime newOpTime = OpTime(Timestamp(2, 0), 0);
// two rounds of heartbeat pings from each member
- heartbeatFromMember(HostAndPort("h2"),
- "rs0",
- MemberState::RS_SECONDARY,
- OpTime(Timestamp(1, 0), 0),
- Milliseconds(300));
- heartbeatFromMember(HostAndPort("h2"),
- "rs0",
- MemberState::RS_SECONDARY,
- OpTime(Timestamp(1, 0), 0),
- Milliseconds(300));
- heartbeatFromMember(HostAndPort("h3"),
- "rs0",
- MemberState::RS_SECONDARY,
- OpTime(Timestamp(2, 0), 0),
- Milliseconds(100));
- heartbeatFromMember(HostAndPort("h3"),
- "rs0",
- MemberState::RS_SECONDARY,
- OpTime(Timestamp(2, 0), 0),
- Milliseconds(100));
+ heartbeatFromMember(
+ HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, oldOpTime, Milliseconds(300));
+ heartbeatFromMember(
+ HostAndPort("h2"), "rs0", MemberState::RS_SECONDARY, oldOpTime, Milliseconds(300));
+ heartbeatFromMember(
+ HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, newOpTime, Milliseconds(100));
+ heartbeatFromMember(
+ HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, newOpTime, Milliseconds(100));
// force should overrule other defaults
getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
@@ -622,8 +612,10 @@ TEST_F(TopoCoordTest, ForceSyncSource) {
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), now()));
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("h2"), OpTime(), oldOpTime, false, now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("h3"), OpTime(), newOpTime, false, now()));
getTopoCoord().chooseNewSyncSource(now()++, Timestamp());
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -2291,7 +2283,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberDoesNotBuildI
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
@@ -2331,29 +2324,19 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceCurrentTargetNoLongerPrima
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
// Show we like host2 while it is primary.
- ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now()));
// Show that we also like host2 while it has a sync source.
- // Cannot use receiveUpHeartbeat helper because syncingTo must be set.
- ReplSetHeartbeatResponse hb;
- hb.setConfigVersion(1);
- hb.setState(MemberState::RS_SECONDARY);
- hb.setOpTime(lastOpTimeApplied);
- hb.setElectionTime(election.getTimestamp());
- hb.setSyncingTo(HostAndPort("host2"));
-
- StatusWith<ReplSetHeartbeatResponse> hbResponse = StatusWith<ReplSetHeartbeatResponse>(hb);
-
- getTopoCoord().prepareHeartbeatRequestV1(now(), "rs0", HostAndPort("host2"));
- now() += Milliseconds(1);
- ASSERT_NO_ACTION(
- getTopoCoord()
- .processHeartbeatResponse(
- now(), Milliseconds(1), HostAndPort("host2"), hbResponse, lastOpTimeApplied)
- .getAction());
- ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
+ nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ lastOpTimeApplied,
+ lastOpTimeApplied);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, true, now()));
// Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress
// beyond our own.
@@ -2364,7 +2347,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceCurrentTargetNoLongerPrima
lastOpTimeApplied,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
+ ASSERT(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), lastOpTimeApplied, lastOpTimeApplied, false, now()));
// But if it has some progress beyond our own, we still like it.
OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0);
@@ -2375,8 +2359,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceCurrentTargetNoLongerPrima
newerThanLastOpTimeApplied,
newerThanLastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), lastOpTimeApplied, now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), lastOpTimeApplied, newerThanLastOpTimeApplied, false, now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsDown) {
@@ -2407,7 +2391,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsDown) {
// set up complete, time for actual check
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsBlackListed) {
@@ -2438,20 +2423,89 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsBlackListed
getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
// unblacklist with too early a time (node should remained blacklisted)
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
// unblacklist and it should succeed
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
+TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbeatButNotMetadata) {
+ // In this test, the TopologyCoordinator should not tell us to change sync sources away from
+ // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind
+ // "host3" according to metadata, not heartbeat data.
+ OpTime election = OpTime();
+ OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0);
+ // ahead by more than maxSyncSourceLagSecs (30)
+ OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0);
+
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ fresherLastOpTimeApplied,
+ lastOpTimeApplied);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ nextAction = receiveUpHeartbeat(HostAndPort("host3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ fresherLastOpTimeApplied,
+ lastOpTimeApplied);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
+}
+
+TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbeatButNotMetadata) {
+ // In this test, the TopologyCoordinator should not tell us to change sync sources away from
+ // "host2" and to "host3" since "host2" is only more than maxSyncSourceLagSecs(30) behind
+ // "host3" according to heartbeat data, not metadata.
+ OpTime election = OpTime();
+ OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0);
+ // ahead by more than maxSyncSourceLagSecs (30)
+ OpTime fresherLastOpTimeApplied = OpTime(Timestamp(3005, 0), 0);
+
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(HostAndPort("host2"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ lastOpTimeApplied,
+ lastOpTimeApplied);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ nextAction = receiveUpHeartbeat(HostAndPort("host3"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ fresherLastOpTimeApplied,
+ lastOpTimeApplied);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), fresherLastOpTimeApplied, false, now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(0, countLogLinesContaining("re-evaluating sync source"));
+}
+
TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherHappierMemberExists) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host2" and to "host3" since "host2" is more than maxSyncSourceLagSecs(30) behind "host3"
@@ -2478,22 +2532,24 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherHappierMemberExists
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, false, now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("re-evaluating sync source"));
}
TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceMemberHasYetToHeartbeat) {
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
- // "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime)
- // for "host2"
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ // "host2" since we do not use the member's heartbeatdata in pv1.
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), OpTime(), true, now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host4" since "host4" is absent from the config
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host4"), OpTime(), OpTime(), true, now()));
}
TEST_F(HeartbeatResponseTestV1, ReconfigNodeRemovedBetweenHeartbeatRequestAndRepsonse) {
@@ -3271,7 +3327,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberDoesNotBuildI
ASSERT_NO_ACTION(nextAction.getAction());
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsNotReadable) {
@@ -3300,7 +3357,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceFresherMemberIsNotReadable
ASSERT_NO_ACTION(nextAction.getAction());
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"), OpTime(), lastOpTimeApplied, true, now()));
}
class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 {
diff --git a/src/mongo/rpc/metadata/repl_set_metadata.cpp b/src/mongo/rpc/metadata/repl_set_metadata.cpp
index aadd520a683..5e9a1e89f77 100644
--- a/src/mongo/rpc/metadata/repl_set_metadata.cpp
+++ b/src/mongo/rpc/metadata/repl_set_metadata.cpp
@@ -46,6 +46,7 @@ const char kLastOpCommittedFieldName[] = "lastOpCommitted";
const char kLastOpVisibleFieldName[] = "lastOpVisible";
const char kConfigVersionFieldName[] = "configVersion";
const char kPrimaryIndexFieldName[] = "primaryIndex";
+const char kSyncSourceIndexFieldName[] = "syncSourceIndex";
const char kTermFieldName[] = "term";
} // unnamed namespace
@@ -58,12 +59,14 @@ ReplSetMetadata::ReplSetMetadata(long long term,
OpTime committedOpTime,
OpTime visibleOpTime,
long long configVersion,
- int currentPrimaryIndex)
+ int currentPrimaryIndex,
+ int currentSyncSourceIndex)
: _lastOpCommitted(std::move(committedOpTime)),
_lastOpVisible(std::move(visibleOpTime)),
_currentTerm(term),
_configVersion(configVersion),
- _currentPrimaryIndex(currentPrimaryIndex) {}
+ _currentPrimaryIndex(currentPrimaryIndex),
+ _currentSyncSourceIndex(currentSyncSourceIndex) {}
StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& metadataObj) {
BSONElement replMetadataElement;
@@ -84,6 +87,11 @@ StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& met
if (!status.isOK())
return status;
+ long long syncSourceIndex;
+ status = bsonExtractIntegerField(replMetadataObj, kSyncSourceIndexFieldName, &syncSourceIndex);
+ if (!status.isOK())
+ return status;
+
long long term;
status = bsonExtractIntegerField(replMetadataObj, kTermFieldName, &term);
if (!status.isOK())
@@ -99,7 +107,8 @@ StatusWith<ReplSetMetadata> ReplSetMetadata::readFromMetadata(const BSONObj& met
if (!status.isOK())
return status;
- return ReplSetMetadata(term, lastOpCommitted, lastOpVisible, configVersion, primaryIndex);
+ return ReplSetMetadata(
+ term, lastOpCommitted, lastOpVisible, configVersion, primaryIndex, syncSourceIndex);
}
Status ReplSetMetadata::writeToMetadata(BSONObjBuilder* builder) const {
@@ -109,6 +118,7 @@ Status ReplSetMetadata::writeToMetadata(BSONObjBuilder* builder) const {
_lastOpVisible.append(&replMetadataBuilder, kLastOpVisibleFieldName);
replMetadataBuilder.append(kConfigVersionFieldName, _configVersion);
replMetadataBuilder.append(kPrimaryIndexFieldName, _currentPrimaryIndex);
+ replMetadataBuilder.append(kSyncSourceIndexFieldName, _currentSyncSourceIndex);
replMetadataBuilder.doneFast();
return Status::OK();
diff --git a/src/mongo/rpc/metadata/repl_set_metadata.h b/src/mongo/rpc/metadata/repl_set_metadata.h
index 722f3befe3e..3e80afc1f9b 100644
--- a/src/mongo/rpc/metadata/repl_set_metadata.h
+++ b/src/mongo/rpc/metadata/repl_set_metadata.h
@@ -55,16 +55,18 @@ public:
repl::OpTime committedOpTime,
repl::OpTime visibleOpTime,
long long configVersion,
- int currentPrimaryIndex);
+ int currentPrimaryIndex,
+ int currentSyncSourceIndex);
/**
* format:
* {
* term: 0,
- * lastOpCommitted: {ts: Timestamp(0, 0), term: 0}
- * lastOpVisible: {ts: Timestamp(0, 0), term: 0}
+ * lastOpCommitted: {ts: Timestamp(0, 0), term: 0},
+ * lastOpVisible: {ts: Timestamp(0, 0), term: 0},
* configVersion: 0,
- * primaryIndex: 0
+ * primaryIndex: 0,
+ * syncSourceIndex: 0
* }
*/
static StatusWith<ReplSetMetadata> readFromMetadata(const BSONObj& doc);
@@ -95,11 +97,19 @@ public:
* Returns the index of the current primary from the perspective of the sender.
* Returns kNoPrimary if there is no primary.
*/
- long long getPrimaryIndex() const {
+ int getPrimaryIndex() const {
return _currentPrimaryIndex;
}
/**
+ * Returns the index of the sync source of the sender.
+ * Returns -1 if it has no sync source.
+ */
+ int getSyncSourceIndex() const {
+ return _currentSyncSourceIndex;
+ }
+
+ /**
* Returns the current term from the perspective of the sender.
*/
long long getTerm() const {
@@ -112,6 +122,7 @@ private:
long long _currentTerm = -1;
long long _configVersion = -1;
int _currentPrimaryIndex = kNoPrimary;
+ int _currentSyncSourceIndex = -1;
};
} // namespace rpc
diff --git a/src/mongo/rpc/metadata/repl_set_metadata_test.cpp b/src/mongo/rpc/metadata/repl_set_metadata_test.cpp
index 499bbeb0b23..68d073be8cb 100644
--- a/src/mongo/rpc/metadata/repl_set_metadata_test.cpp
+++ b/src/mongo/rpc/metadata/repl_set_metadata_test.cpp
@@ -39,7 +39,7 @@ using repl::OpTime;
TEST(ReplResponseMetadataTest, Roundtrip) {
OpTime opTime(Timestamp(1234, 100), 5);
OpTime opTime2(Timestamp(7777, 100), 6);
- ReplSetMetadata metadata(3, opTime, opTime2, 6, 12);
+ ReplSetMetadata metadata(3, opTime, opTime2, 6, 12, -1);
ASSERT_EQ(opTime, metadata.getLastOpCommitted());
ASSERT_EQ(opTime2, metadata.getLastOpVisible());
@@ -47,13 +47,12 @@ TEST(ReplResponseMetadataTest, Roundtrip) {
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
- BSONObj expectedObj(
- BSON(kReplSetMetadataFieldName
- << BSON("term" << 3 << "lastOpCommitted"
- << BSON("ts" << opTime.getTimestamp() << "t" << opTime.getTerm())
- << "lastOpVisible"
- << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm())
- << "configVersion" << 6 << "primaryIndex" << 12)));
+ BSONObj expectedObj(BSON(
+ kReplSetMetadataFieldName << BSON(
+ "term" << 3 << "lastOpCommitted" << BSON("ts" << opTime.getTimestamp() << "t"
+ << opTime.getTerm()) << "lastOpVisible"
+ << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm())
+ << "configVersion" << 6 << "primaryIndex" << 12 << "syncSourceIndex" << -1)));
BSONObj serializedObj = builder.obj();
ASSERT_EQ(expectedObj, serializedObj);
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
index e46c2582ea3..caa2b4ccfb6 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
@@ -112,7 +112,7 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) {
checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
- ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -175,7 +175,7 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) {
checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
- ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -508,7 +508,7 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) {
checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
- ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -1084,7 +1084,7 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) {
checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
- ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -2354,7 +2354,7 @@ TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) {
ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm());
- ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -2390,7 +2390,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) {
ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
- ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -2419,7 +2419,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) {
ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
- ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30);
+ ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -2444,7 +2444,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) {
ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
- ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30);
+ ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -2470,7 +2470,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeFindThenCmd) {
request.metadata);
checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
- ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);
@@ -2531,7 +2531,7 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) {
ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
- ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30);
+ ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, 30, -1);
BSONObjBuilder builder;
metadata.writeToMetadata(&builder);