summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-02-15 11:49:36 -0500
committerJudah Schvimer <judah@mongodb.com>2017-02-15 11:49:36 -0500
commit7284884f3d8f3cf1d1489579180c2637efcc42b2 (patch)
tree7e2749c6940232f2a70a182e6856bfb0c97f2454 /src/mongo/db
parentf6006942e76377c9434a61e76a7803eb83430591 (diff)
downloadmongo-7284884f3d8f3cf1d1489579180c2637efcc42b2.tar.gz
SERVER-27543 Process OplogQueryMetadata with backwards and forwards compatibility
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/bgsync.cpp9
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h11
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp45
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h6
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp4
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_initial_sync.h3
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp26
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h9
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp5
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp77
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp124
-rw-r--r--src/mongo/db/repl/replication_coordinator.h13
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp26
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h23
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp39
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp82
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp11
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h8
-rw-r--r--src/mongo/db/repl/replset_commands.cpp2
-rw-r--r--src/mongo/db/repl/sync_source_selector.h19
-rw-r--r--src/mongo/db/repl/sync_source_selector_mock.cpp6
-rw-r--r--src/mongo/db/repl/sync_source_selector_mock.h4
-rw-r--r--src/mongo/db/repl/topology_coordinator.h5
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp41
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h3
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp85
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp251
28 files changed, 657 insertions, 288 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index a4943f3f8a4..1874e73474b 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -80,7 +80,8 @@ public:
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
BackgroundSync* bgsync);
bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) override;
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
private:
BackgroundSync* _bgsync;
@@ -94,12 +95,14 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground
_bgsync(bgsync) {}
bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
- const HostAndPort& source, const rpc::ReplSetMetadata& metadata) {
+ const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
if (_bgsync->shouldStopFetching()) {
return true;
}
- return DataReplicatorExternalStateImpl::shouldStopFetching(source, metadata);
+ return DataReplicatorExternalStateImpl::shouldStopFetching(source, replMetadata, oqMetadata);
}
size_t getSize(const BSONObj& o) {
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index f0ae73e61ec..44f34ee208c 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -35,6 +35,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/db/repl/replica_set_config.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -88,16 +89,22 @@ public:
/**
* Forwards the parsed metadata in the query results to the replication system.
+ *
+ * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8.
*/
- virtual void processMetadata(const rpc::ReplSetMetadata& metadata) = 0;
+ virtual void processMetadata(const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0;
/**
* Evaluates quality of sync source. Accepts the current sync source; the last optime on this
* sync source (from metadata); and whether this sync source has a sync source (also from
* metadata).
+ *
+ * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8.
*/
virtual bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) = 0;
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0;
/**
* This function creates an oplog buffer of the type specified at server startup.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 238f4717aa6..ab166f3bdc2 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -60,21 +60,46 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp
return {_replicationCoordinator->getTerm(), _replicationCoordinator->getLastCommittedOpTime()};
}
-void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& metadata) {
- _replicationCoordinator->processReplSetMetadata(metadata, true /*advance the commit point*/);
- if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
+void DataReplicatorExternalStateImpl::processMetadata(
+ const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
+ OpTime newCommitPoint;
+ // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
+ // ReplSetMetadata.
+ if (oqMetadata) {
+ newCommitPoint = oqMetadata->getLastOpCommitted();
+ } else {
+ newCommitPoint = replMetadata.getLastOpCommitted();
+ }
+ _replicationCoordinator->advanceCommitPoint(newCommitPoint);
+
+ _replicationCoordinator->processReplSetMetadata(replMetadata);
+
+ if ((oqMetadata && (oqMetadata->getPrimaryIndex() != rpc::OplogQueryMetadata::kNoPrimary)) ||
+ (replMetadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary)) {
_replicationCoordinator->cancelAndRescheduleElectionTimeout();
}
}
-bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) {
+bool DataReplicatorExternalStateImpl::shouldStopFetching(
+ const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
// Re-evaluate quality of sync target.
- if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) {
- log()
- << "Canceling oplog query because we have to choose a new sync source. Current source: "
- << source << ", OpTime " << metadata.getLastOpVisible()
- << ", its sync source index:" << metadata.getSyncSourceIndex();
+ if (_replicationCoordinator->shouldChangeSyncSource(source, replMetadata, oqMetadata)) {
+ // If OplogQueryMetadata was provided, its values were used to determine if we should
+ // change sync sources.
+ if (oqMetadata) {
+ log() << "Canceling oplog query due to OplogQueryMetadata. We have to choose a new "
+ "sync source. Current source: "
+ << source << ", OpTime " << oqMetadata->getLastOpApplied()
+ << ", its sync source index:" << oqMetadata->getSyncSourceIndex();
+
+ } else {
+ log() << "Canceling oplog query due to ReplSetMetadata. We have to choose a new sync "
+ "source. Current source: "
+ << source << ", OpTime " << replMetadata.getLastOpVisible()
+ << ", its sync source index:" << replMetadata.getSyncSourceIndex();
+ }
return true;
}
return false;
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 19a08b6c702..d4ce61119a6 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -52,10 +52,12 @@ public:
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
- void processMetadata(const rpc::ReplSetMetadata& metadata) override;
+ void processMetadata(const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) override;
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override;
diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
index 1337b170f32..8cb6a42293c 100644
--- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
@@ -39,8 +39,8 @@ DataReplicatorExternalStateInitialSync::DataReplicatorExternalStateInitialSync(
: DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState) {
}
-bool DataReplicatorExternalStateInitialSync::shouldStopFetching(const HostAndPort&,
- const rpc::ReplSetMetadata&) {
+bool DataReplicatorExternalStateInitialSync::shouldStopFetching(
+ const HostAndPort&, const rpc::ReplSetMetadata&, boost::optional<rpc::OplogQueryMetadata>) {
// Since initial sync does not allow for sync source changes, it should not check if there are
// better sync sources. If there is a problem on the sync source, it will manifest itself in the
diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
index 6ee2ce75fe4..e36fa710f47 100644
--- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
+++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
@@ -44,7 +44,8 @@ public:
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) override;
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
};
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 45ec02b0ad2..eeed8c240f3 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -53,16 +53,30 @@ OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOp
return {currentTerm, lastCommittedOpTime};
}
-void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata& metadata) {
- metadataProcessed = metadata;
+void DataReplicatorExternalStateMock::processMetadata(
+ const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
+ replMetadataProcessed = replMetadata;
+ if (oqMetadata) {
+ oqMetadataProcessed = oqMetadata.get();
+ }
metadataWasProcessed = true;
}
-bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) {
+bool DataReplicatorExternalStateMock::shouldStopFetching(
+ const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
lastSyncSourceChecked = source;
- syncSourceLastOpTime = metadata.getLastOpVisible();
- syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
+
+ // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
+ // ReplSetMetadata.
+ if (oqMetadata) {
+ syncSourceLastOpTime = oqMetadata->getLastOpApplied();
+ syncSourceHasSyncSource = oqMetadata->getSyncSourceIndex() != -1;
+ } else {
+ syncSourceLastOpTime = replMetadata.getLastOpVisible();
+ syncSourceHasSyncSource = replMetadata.getSyncSourceIndex() != -1;
+ }
return shouldStopFetchingResult;
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index 423e8f401dc..1e32647a518 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -49,10 +49,12 @@ public:
OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override;
- void processMetadata(const rpc::ReplSetMetadata& metadata) override;
+ void processMetadata(const rpc::ReplSetMetadata& metadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) override;
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override;
@@ -71,7 +73,8 @@ public:
OpTime lastCommittedOpTime;
// Set by processMetadata.
- rpc::ReplSetMetadata metadataProcessed;
+ rpc::ReplSetMetadata replMetadataProcessed;
+ rpc::OplogQueryMetadata oqMetadataProcessed;
bool metadataWasProcessed = false;
// Set by shouldStopFetching.
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 4f841f2f2b7..790dfede040 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -171,8 +171,9 @@ public:
_syncSourceSelector->blacklistSyncSource(host, until);
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& metadata) override {
- return _syncSourceSelector->shouldChangeSyncSource(currentSource, metadata);
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override {
+ return _syncSourceSelector->shouldChangeSyncSource(currentSource, replMetadata, oqMetadata);
}
void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) {
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index df08e1b2174..5378689027e 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -184,6 +184,30 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash
return Status::OK();
}
+/**
+ * Parses a QueryResponse for the OplogQueryMetadata. If there is an error it returns it. If
+ * no OplogQueryMetadata is provided then it returns boost::none.
+ *
+ * OplogQueryMetadata is made optional for backwards compatibility.
+ * TODO (SERVER-27668): Make this non-optional in mongodb 3.8. When this stops being optional
+ * we can remove the duplicated fields in both metadata types and begin to always use
+ * OplogQueryMetadata's data.
+ */
+StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata(
+ Fetcher::QueryResponse queryResponse) {
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata = boost::none;
+ bool receivedOplogQueryMetadata =
+ queryResponse.otherFields.metadata.hasElement(rpc::kOplogQueryMetadataFieldName);
+ if (receivedOplogQueryMetadata) {
+ const auto& metadataObj = queryResponse.otherFields.metadata;
+ auto metadataResult = rpc::OplogQueryMetadata::readFromMetadata(metadataObj);
+ if (!metadataResult.isOK()) {
+ return metadataResult.getStatus();
+ }
+ oqMetadata = boost::make_optional<rpc::OplogQueryMetadata>(metadataResult.getValue());
+ }
+ return oqMetadata;
+}
} // namespace
StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
@@ -445,6 +469,15 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
auto opTimeWithHash = getLastOpTimeWithHashFetched();
+ auto oqMetadataResult = parseOplogQueryMetadata(queryResponse);
+ if (!oqMetadataResult.isOK()) {
+ error() << "invalid oplog query metadata from sync source " << _fetcher->getSource() << ": "
+ << oqMetadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
+ _finishCallback(oqMetadataResult.getStatus());
+ return;
+ }
+ auto oqMetadata = oqMetadataResult.getValue();
+
// Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
if (queryResponse.first) {
auto status = checkRemoteOplogStart(documents, opTimeWithHash);
@@ -471,10 +504,10 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
// Process replset metadata. It is important that this happen after we've validated the
// first batch, so we don't progress our knowledge of the commit point from a
// response that triggers a rollback.
- rpc::ReplSetMetadata metadata;
- bool receivedMetadata =
+ rpc::ReplSetMetadata replSetMetadata;
+ bool receivedReplMetadata =
queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
- if (receivedMetadata) {
+ if (receivedReplMetadata) {
const auto& metadataObj = queryResponse.otherFields.metadata;
auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
if (!metadataResult.isOK()) {
@@ -483,8 +516,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_finishCallback(metadataResult.getStatus());
return;
}
- metadata = metadataResult.getValue();
- _dataReplicatorExternalState->processMetadata(metadata);
+ replSetMetadata = metadataResult.getValue();
+
+ // We will only ever have OplogQueryMetadata if we have ReplSetMetadata, so it is safe
+ // to call processMetadata() in this if block.
+ _dataReplicatorExternalState->processMetadata(replSetMetadata, oqMetadata);
}
// Increment stats. We read all of the docs in the query.
@@ -511,19 +547,24 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
_lastFetched = opTimeWithHash;
}
- if (_dataReplicatorExternalState->shouldStopFetching(_fetcher->getSource(), metadata)) {
- _finishCallback(Status(ErrorCodes::InvalidSyncSource,
- str::stream() << "sync source " << _fetcher->getSource().toString()
- << " (last visible optime: "
- << metadata.getLastOpVisible().toString()
- << "; config version: "
- << metadata.getConfigVersion()
- << "; sync source index: "
- << metadata.getSyncSourceIndex()
- << "; primary index: "
- << metadata.getPrimaryIndex()
- << ") is no longer valid"),
- opTimeWithHash);
+ if (_dataReplicatorExternalState->shouldStopFetching(
+ _fetcher->getSource(), replSetMetadata, oqMetadata)) {
+ str::stream errMsg;
+ errMsg << "sync source " << _fetcher->getSource().toString();
+ errMsg << " (config version: " << replSetMetadata.getConfigVersion();
+ // If OplogQueryMetadata was provided, its values were used to determine if we should
+ // stop fetching from this sync source.
+ if (oqMetadata) {
+ errMsg << "; last applied optime: " << oqMetadata->getLastOpApplied().toString();
+ errMsg << "; sync source index: " << oqMetadata->getSyncSourceIndex();
+ errMsg << "; primary index: " << oqMetadata->getPrimaryIndex();
+ } else {
+ errMsg << "; last visible optime: " << replSetMetadata.getLastOpVisible().toString();
+ errMsg << "; sync source index: " << replSetMetadata.getSyncSourceIndex();
+ errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex();
+ }
+ errMsg << ") is no longer valid";
+ _finishCallback(Status(ErrorCodes::InvalidSyncSource, errMsg), opTimeWithHash);
return;
}
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 5c6db18013e..a94dac92122 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -97,7 +97,8 @@ protected:
/**
* Tests checkSyncSource result handling.
*/
- void testSyncSourceChecking(rpc::ReplSetMetadata* metadata);
+ void testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata,
+ rpc::OplogQueryMetadata* oqMetadata);
/**
* Tests handling of two batches of operations returned from query.
@@ -389,7 +390,7 @@ TEST_F(
_checkDefaultCommandObjectFields(cmdObj);
}
-TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocolVersion1) {
+TEST_F(OplogFetcherTest, MetadataObjectContainsMetadataFieldsUnderProtocolVersion1) {
auto metadataObj = OplogFetcher(&getExecutor(),
lastFetched,
source,
@@ -537,7 +538,7 @@ BSONObj makeCursorResponse(CursorId cursorId,
return bob.obj();
}
-TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) {
+TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) {
auto shutdownState = processSingleBatch(
{makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)),
@@ -546,7 +547,17 @@ TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) {
ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
}
-TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadataFn) {
+TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) {
+ auto shutdownState = processSingleBatch(
+ {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ BSON(rpc::kOplogQueryMetadataFieldName << BSON("invalid_oq_metadata_field" << 1)),
+ Milliseconds(0)});
+
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
+}
+
+TEST_F(OplogFetcherTest,
+ ValidMetadataInResponseWithoutOplogMetadataShouldBeForwardedToProcessMetadataFn) {
rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2);
BSONObjBuilder bob;
ASSERT_OK(metadata.writeToMetadata(&bob));
@@ -557,10 +568,30 @@ TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadat
->getStatus());
ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT_EQUALS(metadata.getPrimaryIndex(),
- dataReplicatorExternalState->metadataProcessed.getPrimaryIndex());
+ dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex());
+ ASSERT_EQUALS(-1, dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex());
}
-TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
+TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+ ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ metadataObj,
+ Milliseconds(0)})
+ ->getStatus());
+ ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT_EQUALS(replMetadata.getPrimaryIndex(),
+ dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex());
+ ASSERT_EQUALS(oqMetadata.getPrimaryIndex(),
+ dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex());
+}
+
+TEST_F(OplogFetcherTest,
+ MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) {
rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2);
BSONObjBuilder bob;
ASSERT_OK(metadata.writeToMetadata(&bob));
@@ -574,6 +605,30 @@ TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
}
+TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
+ processSingleBatch(
+ {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
+ metadataObj,
+ Milliseconds(0)})
+ ->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) {
+ ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ rpc::makeEmptyMetadata(),
+ Milliseconds(0)})
+ ->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+}
+
TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) {
ASSERT_EQUALS(ErrorCodes::RemoteOplogStale,
processSingleBatch(makeCursorResponse(0, {}))->getStatus());
@@ -674,18 +729,21 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error"));
}
-void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) {
+void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata,
+ rpc::OplogQueryMetadata* oqMetadata) {
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
- BSONObj metadataObj;
- if (metadata) {
- BSONObjBuilder bob;
- ASSERT_OK(metadata->writeToMetadata(&bob));
- metadataObj = bob.obj();
+ BSONObjBuilder bob;
+ if (replMetadata) {
+ ASSERT_OK(replMetadata->writeToMetadata(&bob));
}
+ if (oqMetadata) {
+ ASSERT_OK(oqMetadata->writeToMetadata(&bob));
+ }
+ BSONObj metadataObj = bob.obj();
dataReplicatorExternalState->shouldStopFetchingResult = true;
@@ -703,7 +761,7 @@ void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) {
}
TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
- testSyncSourceChecking(nullptr);
+ testSyncSourceChecking(nullptr, nullptr);
// Sync source optime and "hasSyncSource" are not available if the response does not
// contain metadata.
@@ -712,7 +770,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetche
ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
-TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher) {
+TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithReplSetMetadataStopsTheOplogFetcher) {
rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(),
{{Seconds(10000), 0}, 1},
{{Seconds(20000), 0}, 1},
@@ -721,7 +779,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher)
2,
2);
- testSyncSourceChecking(&metadata);
+ testSyncSourceChecking(&metadata, nullptr);
// Sync source optime and "hasSyncSource" can be set if the respone contains metadata.
ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
@@ -729,8 +787,21 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher)
ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
+TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) {
+ rpc::ReplSetMetadata replMetadata(
+ lastFetched.opTime.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata({{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, 2);
+
+ testSyncSourceChecking(&replMetadata, &oqMetadata);
+
+ // Sync source optime and "hasSyncSource" can be set if the respone contains metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
+
TEST_F(OplogFetcherTest,
- FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
+ FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceInReplSetMetadataStopsTheOplogFetcher) {
rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(),
{{Seconds(10000), 0}, 1},
{{Seconds(20000), 0}, 1},
@@ -739,7 +810,7 @@ TEST_F(OplogFetcherTest,
2,
-1);
- testSyncSourceChecking(&metadata);
+ testSyncSourceChecking(&metadata, nullptr);
// Sync source "hasSyncSource" is derived from metadata.
ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
@@ -747,6 +818,25 @@ TEST_F(OplogFetcherTest,
ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
+TEST_F(OplogFetcherTest,
+ FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
+ rpc::ReplSetMetadata replMetadata(lastFetched.opTime.getTerm(),
+ {{Seconds(10000), 0}, 1},
+ {{Seconds(20000), 0}, 1},
+ 1,
+ OID::gen(),
+ 2,
+ 2);
+ rpc::OplogQueryMetadata oqMetadata(
+ {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, -1);
+
+ testSyncSourceChecking(&replMetadata, &oqMetadata);
+
+ // Sync source "hasSyncSource" is derived from metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionProtocol) {
ShutdownState shutdownState;
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index a9d55d42259..a840b8e99ed 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -481,11 +481,16 @@ public:
/**
* Processes the ReplSetMetadata returned from a command run against another
* replica set member and so long as the config version in the metadata matches the replica set
- * config version this node currently has, updates the current term and optionally updates
- * this node's notion of the commit point.
+ * config version this node currently has, updates the current term.
+ *
+ * This does NOT update this node's notion of the commit point.
+ */
+ virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) = 0;
+
+ /**
+ * This updates the node's notion of the commit point.
*/
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
- bool advanceCommitPoint) = 0;
+ virtual void advanceCommitPoint(const OpTime& committedOptime) = 0;
/**
* Elections under protocol version 1 are triggered by a timer.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 0716601d6e7..623f5251977 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2126,13 +2126,12 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result)
result->append("config", _rsConfig.toBSON());
}
-void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
- bool advanceCommitPoint) {
+void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
EventHandle evh;
{
LockGuard topoLock(_topoMutex);
- evh = _processReplSetMetadata_incallback(replMetadata, advanceCommitPoint);
+ evh = _processReplSetMetadata_incallback(replMetadata);
}
if (evh) {
@@ -2146,13 +2145,10 @@ void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() {
}
EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback(
- const rpc::ReplSetMetadata& replMetadata, bool advanceCommitPoint) {
+ const rpc::ReplSetMetadata& replMetadata) {
if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
return EventHandle();
}
- if (advanceCommitPoint) {
- _setLastCommittedOpTime(replMetadata.getLastOpCommitted());
- }
return _updateTerm_incallback(replMetadata.getTerm());
}
@@ -3133,11 +3129,13 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
_externalState->setGlobalTimestamp(txn->getServiceContext(), lastOpTime.getTimestamp());
}
-bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& metadata) {
+bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
LockGuard topoLock(_topoMutex);
return _topCoord->shouldChangeSyncSource(
- currentSource, getMyLastAppliedOpTime(), metadata, _replExecutor.now());
+ currentSource, getMyLastAppliedOpTime(), replMetadata, oqMetadata, _replExecutor.now());
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
@@ -3168,15 +3166,15 @@ void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
// need the majority to have this OpTime
OpTime committedOpTime =
votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
- _setLastCommittedOpTime_inlock(committedOpTime);
+ _advanceCommitPoint_inlock(committedOpTime);
}
-void ReplicationCoordinatorImpl::_setLastCommittedOpTime(const OpTime& committedOpTime) {
+void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _setLastCommittedOpTime_inlock(committedOpTime);
+ _advanceCommitPoint_inlock(committedOpTime);
}
-void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& committedOpTime) {
+void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) {
if (committedOpTime == _lastCommittedOpTime) {
return; // Hasn't changed, so ignore it.
} else if (committedOpTime < _lastCommittedOpTime) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index c481b30cdc6..9144e88b6b8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -200,8 +200,9 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
- bool advanceCommitPoint) override;
+ virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
+
+ virtual void advanceCommitPoint(const OpTime& committedOpTime) override;
virtual void cancelAndRescheduleElectionTimeout() override;
@@ -262,8 +263,10 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* txn) override;
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& metadata) override;
+ virtual bool shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
virtual OpTime getLastCommittedOpTime() const override;
@@ -590,8 +593,7 @@ private:
* Updates the last committed OpTime to be "committedOpTime" if it is more recent than the
* current last committed OpTime.
*/
- void _setLastCommittedOpTime(const OpTime& committedOpTime);
- void _setLastCommittedOpTime_inlock(const OpTime& committedOpTime);
+ void _advanceCommitPoint_inlock(const OpTime& committedOpTime);
/**
* Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication.
@@ -984,12 +986,13 @@ private:
/**
* Callback that processes the ReplSetMetadata returned from a command run against another
* replica set member and so long as the config version in the metadata matches the replica set
- * config version this node currently has, updates the current term and optionally updates
- * this node's notion of the commit point.
+ * config version this node currently has, updates the current term.
+ *
+ * This does NOT update this node's notion of the commit point.
+ *
* Returns the finish event which is invalid if the process has already finished.
*/
- EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata,
- bool advanceCommitPoint);
+ EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata);
/**
* Prepares a metadata object for ReplSetMetadata.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 6c344de33cb..4c85e836bbb 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -162,11 +162,13 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
replMetadata = responseStatus;
}
if (replMetadata.isOK()) {
+ // Arbiters are the only nodes allowed to advance their commit point via heartbeats.
+ if (getMemberState().arbiter()) {
+ advanceCommitPoint(replMetadata.getValue().getLastOpCommitted());
+ }
// Asynchronous stepdown could happen, but it will wait for _topoMutex and execute
// after this function, so we cannot and don't need to wait for it to finish.
- // Arbiters are the only nodes allowed to advance their commit point via heartbeats.
- bool advanceCommitPoint = getMemberState().arbiter();
- _processReplSetMetadata_incallback(replMetadata.getValue(), advanceCommitPoint);
+ _processReplSetMetadata_incallback(replMetadata.getValue());
}
}
const Date_t now = _replExecutor.now();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 0ebaa43cc77..1cc20369998 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -341,45 +341,6 @@ TEST_F(ReplCoordHBV1Test,
assertMemberState(MemberState::RS_RECOVERING, "0");
}
-TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeatMetadata) {
- // Tests that an arbiter will update its committed optime from the heartbeat metadata
- assertStartSuccess(fromjson("{_id:'mySet', version:1, protocolVersion:1, members:["
- "{_id:1, host:'node1:12345', arbiterOnly:true}, "
- "{_id:2, host:'node2:12345'}]}"),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_ARBITER));
-
- // calls processReplSetMetadata with the "committed" optime and verifies that the arbiter sets
- // its current optime to 'expected'
- auto test = [this](OpTime committedOpTime, OpTime expected) {
- // process heartbeat metadata directly
- StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(
- BSON(rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "t"
- << committedOpTime.getTerm())
- << "lastOpVisible"
- << BSON("ts" << committedOpTime.getTimestamp() << "t"
- << committedOpTime.getTerm())
- << "configVersion"
- << 1
- << "primaryIndex"
- << 1
- << "term"
- << committedOpTime.getTerm()
- << "syncSourceIndex"
- << 1)));
- ASSERT_OK(metadata.getStatus());
- getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
-
- ASSERT_EQ(getReplCoord()->getMyLastAppliedOpTime().getTimestamp(), expected.getTimestamp());
- };
-
- OpTime committedOpTime{Timestamp{10, 10}, 10};
- test(committedOpTime, committedOpTime);
- OpTime olderOpTime{Timestamp{2, 2}, 9};
- test(olderOpTime, committedOpTime);
-}
-
TEST_F(ReplCoordHBV1Test, IgnoreTheContentsOfMetadataWhenItsReplicaSetIdDoesNotMatchOurs) {
// Tests that a secondary node will not update its committed optime from the heartbeat metadata
// if the replica set ID is inconsistent with the existing configuration.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index bb9e9a4535d..9e8a92a531f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -3837,8 +3837,8 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc
<< 2
<< "syncSourceIndex"
<< 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
- ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
+ getReplCoord()->processReplSetMetadata(metadata.getValue());
+ ASSERT_EQUALS(0, getReplCoord()->getTerm());
// higher configVersion
StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
@@ -3853,11 +3853,11 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc
<< 2
<< "syncSourceIndex"
<< 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
- ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue());
+ ASSERT_EQUALS(0, getReplCoord()->getTerm());
}
-TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMetadataIsNewer) {
+TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeIsNewer) {
// Ensure that LastCommittedOpTime updates when a newer OpTime comes in via ReplSetMetadata,
// but not if the OpTime is older than the current LastCommittedOpTime.
assertStartSuccess(BSON("_id"
@@ -3887,42 +3887,19 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet
ASSERT_EQUALS(1, getReplCoord()->getTerm());
OpTime time(Timestamp(10, 0), 1);
+ OpTime oldTime(Timestamp(9, 0), 1);
getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1));
// higher OpTime, should change
- StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible"
- << BSON("ts" << Timestamp(10, 0) << "t" << 1)
- << "configVersion"
- << 2
- << "primaryIndex"
- << 2
- << "term"
- << 1
- << "syncSourceIndex"
- << 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
- ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
- ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime());
+ getReplCoord()->advanceCommitPoint(time);
+ ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime());
+ ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime());
// lower OpTime, should not change
- StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
- rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible"
- << BSON("ts" << Timestamp(9, 0) << "t" << 1)
- << "configVersion"
- << 2
- << "primaryIndex"
- << 2
- << "term"
- << 1
- << "syncSourceIndex"
- << 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
- ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
+ getReplCoord()->advanceCommitPoint(oldTime);
+ ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime());
+ ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime());
}
-
TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurrentPrimaryIndex) {
// Ensure that the term is updated if and only if the term is greater than our current term.
// Ensure that currentPrimaryIndex is never altered by ReplSetMetadata.
@@ -3964,10 +3941,10 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
<< 3
<< "syncSourceIndex"
<< 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
- ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime());
+ getReplCoord()->processReplSetMetadata(metadata.getValue());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
+ ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// lower term, should not change
StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
@@ -3982,10 +3959,10 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
<< 2
<< "syncSourceIndex"
<< 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
- ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
+ ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// same term, should not change
StatusWith<rpc::ReplSetMetadata> metadata3 = rpc::ReplSetMetadata::readFromMetadata(BSON(
@@ -4000,10 +3977,10 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
<< 3
<< "syncSourceIndex"
<< 1)));
- getReplCoord()->processReplSetMetadata(metadata3.getValue(), true);
- ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
+ getReplCoord()->processReplSetMetadata(metadata3.getValue());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
+ ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
}
TEST_F(ReplCoordTest,
@@ -4095,23 +4072,10 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
HostAndPort("node1", 12345));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- // Update committed optime with ReplSetMetadata.
OpTime optime1{Timestamp(10, 0), 3};
OpTime optime2{Timestamp(11, 2), 5};
- StatusWith<rpc::ReplSetMetadata> metadataToProcess = rpc::ReplSetMetadata::readFromMetadata(
- BSON(rpc::kReplSetMetadataFieldName
- << BSON("lastOpCommitted" << optime1.toBSON() << "lastOpVisible" << optime1.toBSON()
- << "configVersion"
- << 2
- << "primaryIndex"
- << 2
- << "term"
- << 3
- << "syncSourceIndex"
- << 1)));
- getReplCoord()->processReplSetMetadata(metadataToProcess.getValue(), true);
-
+ getReplCoord()->advanceCommitPoint(optime1);
getReplCoord()->setMyLastAppliedOpTime(optime2);
// Get current rbid to check against.
@@ -4141,7 +4105,7 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
ASSERT_EQ(replMetadata.getValue().getLastOpCommitted(), optime1);
ASSERT_EQ(replMetadata.getValue().getLastOpVisible(), OpTime());
ASSERT_EQ(replMetadata.getValue().getConfigVersion(), 2);
- ASSERT_EQ(replMetadata.getValue().getTerm(), 3);
+ ASSERT_EQ(replMetadata.getValue().getTerm(), 0);
ASSERT_EQ(replMetadata.getValue().getSyncSourceIndex(), -1);
ASSERT_EQ(replMetadata.getValue().getPrimaryIndex(), -1);
}
@@ -4884,7 +4848,8 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) {
// Set last committed optime via metadata.
rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1);
- getReplCoord()->processReplSetMetadata(syncSourceMetadata, true);
+ getReplCoord()->processReplSetMetadata(syncSourceMetadata);
+ getReplCoord()->advanceCommitPoint(optime);
getReplCoord()->createSnapshot(txn.get(), optime, SnapshotName(1));
BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand(
@@ -4892,6 +4857,9 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) {
auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd));
ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm());
ASSERT_EQUALS(metadata.getLastOpVisible(), optime);
+
+ auto oqMetadataStatus = rpc::OplogQueryMetadata::readFromMetadata(cmd);
+ ASSERT_EQUALS(oqMetadataStatus.getStatus(), ErrorCodes::NoSuchKey);
}
TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNodesDown) {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 2a4432c2590..fd41930ce28 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -250,8 +250,9 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result)
// TODO
}
-void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
- bool advanceCommitPoint) {}
+void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {}
+
+void ReplicationCoordinatorMock::advanceCommitPoint(const OpTime& committedOptime) {}
void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {}
@@ -382,8 +383,10 @@ void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* txn
invariant(false);
}
-bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& metadata) {
+bool ReplicationCoordinatorMock::shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
invariant(false);
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 9825883ecbf..b211007a8dd 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -155,8 +155,9 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result);
- void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
- bool advanceCommitPoint) override;
+ virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
+
+ virtual void advanceCommitPoint(const OpTime& committedOptime) override;
virtual void cancelAndRescheduleElectionTimeout() override;
@@ -213,7 +214,8 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* txn);
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& metadata);
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata);
virtual OpTime getLastCommittedOpTime() const;
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index 6303888dde1..c2793e42764 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -640,7 +640,7 @@ public:
// New style update position command has metadata, which may inform the
// upstream of a higher term.
auto metadata = metadataResult.getValue();
- replCoord->processReplSetMetadata(metadata, false /*don't advance the commit point*/);
+ replCoord->processReplSetMetadata(metadata);
}
// In the case of an update from a member with an invalid replica set config,
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index 36cd7df3bf4..0c9fbc82d45 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -39,6 +40,7 @@ class Timestamp;
namespace rpc {
class ReplSetMetadata;
+class OplogQueryMetadata;
}
namespace repl {
@@ -73,16 +75,21 @@ public:
/**
* Determines if a new sync source should be chosen, if a better candidate sync source is
- * available. If the current sync source's last optime (visibleOpTime of metadata under
- * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than
- * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
- * running in ProtocolVersion 1, our current sync source is not primary, has no sync source
- * and only has data up to "myLastOpTime", returns true.
+ * available. If the current sync source's last optime (visibleOpTime or appliedOpTime of
+ * metadata under protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion
+ * 0) is more than _maxSyncSourceLagSecs behind any syncable source, this function returns true.
+ * If we are running in ProtocolVersion 1, our current sync source is not primary, has no sync
+ * source and only has data up to "myLastOpTime", returns true.
*
* "now" is used to skip over currently blacklisted sync sources.
+ *
+ * OplogQueryMetadata is optional for compatibility with 3.4 servers that do not know to
+ * send OplogQueryMetadata.
+ * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8.
*/
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& metadata) = 0;
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_source_selector_mock.cpp b/src/mongo/db/repl/sync_source_selector_mock.cpp
index 0b697a2e234..df8e953c3cf 100644
--- a/src/mongo/db/repl/sync_source_selector_mock.cpp
+++ b/src/mongo/db/repl/sync_source_selector_mock.cpp
@@ -55,8 +55,10 @@ void SyncSourceSelectorMock::setChooseNewSyncSourceHook_forTest(
_chooseNewSyncSourceHook = hook;
}
-bool SyncSourceSelectorMock::shouldChangeSyncSource(const HostAndPort&,
- const rpc::ReplSetMetadata&) {
+bool SyncSourceSelectorMock::shouldChangeSyncSource(
+ const HostAndPort&,
+ const rpc::ReplSetMetadata&,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
return false;
}
diff --git a/src/mongo/db/repl/sync_source_selector_mock.h b/src/mongo/db/repl/sync_source_selector_mock.h
index b53dfb6dd49..551eec9b846 100644
--- a/src/mongo/db/repl/sync_source_selector_mock.h
+++ b/src/mongo/db/repl/sync_source_selector_mock.h
@@ -51,7 +51,9 @@ public:
void clearSyncSourceBlacklist() override;
HostAndPort chooseNewSyncSource(const OpTime& ot) override;
void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
- bool shouldChangeSyncSource(const HostAndPort&, const rpc::ReplSetMetadata&) override;
+ bool shouldChangeSyncSource(const HostAndPort&,
+ const rpc::ReplSetMetadata&,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;
/**
* Sets a function that will be run every time chooseNewSyncSource() is called.
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 7dcf76164ec..0761b800b38 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -166,10 +166,13 @@ public:
* ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
*
* "now" is used to skip over currently blacklisted sync sources.
+ *
+ * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8.
*/
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const rpc::ReplSetMetadata& metadata,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata,
Date_t now) const = 0;
/**
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index f4e9c01cea7..79dba110db4 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2431,10 +2431,12 @@ long long TopologyCoordinatorImpl::getTerm() {
// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
// replset. Passing metadata is unnecessary.
-bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
- const OpTime& myLastOpTime,
- const rpc::ReplSetMetadata& metadata,
- Date_t now) const {
+bool TopologyCoordinatorImpl::shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const OpTime& myLastOpTime,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata,
+ Date_t now) const {
// Methodology:
// If there exists a viable sync source member other than currentSource, whose oplog has
// reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's,
@@ -2451,9 +2453,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
}
if (_rsConfig.getProtocolVersion() == 1 &&
- metadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
+ replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
log() << "Choosing new sync source because the config version supplied by " << currentSource
- << ", " << metadata.getConfigVersion() << ", does not match ours, "
+ << ", " << replMetadata.getConfigVersion() << ", does not match ours, "
<< _rsConfig.getConfigVersion();
return true;
}
@@ -2468,8 +2470,22 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
invariant(currentSourceIndex != _selfIndex);
- OpTime currentSourceOpTime =
- std::max(metadata.getLastOpVisible(), _hbdata.at(currentSourceIndex).getAppliedOpTime());
+ // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
+ // ReplSetMetadata.
+ OpTime currentSourceOpTime;
+ int syncSourceIndex = -1;
+ int primaryIndex = -1;
+ if (oqMetadata) {
+ currentSourceOpTime = std::max(oqMetadata->getLastOpApplied(),
+ _hbdata.at(currentSourceIndex).getAppliedOpTime());
+ syncSourceIndex = oqMetadata->getSyncSourceIndex();
+ primaryIndex = oqMetadata->getPrimaryIndex();
+ } else {
+ currentSourceOpTime = std::max(replMetadata.getLastOpVisible(),
+ _hbdata.at(currentSourceIndex).getAppliedOpTime());
+ syncSourceIndex = replMetadata.getSyncSourceIndex();
+ primaryIndex = replMetadata.getPrimaryIndex();
+ }
if (currentSourceOpTime.isNull()) {
// Haven't received a heartbeat from the sync source yet, so can't tell if we should
@@ -2479,16 +2495,15 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
// Change sync source if they are not ahead of us, and don't have a sync source,
// unless they are primary.
- if (_rsConfig.getProtocolVersion() == 1 && metadata.getSyncSourceIndex() == -1 &&
- currentSourceOpTime <= myLastOpTime && metadata.getPrimaryIndex() != currentSourceIndex) {
+ if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 &&
+ currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) {
std::stringstream logMessage;
logMessage << "Choosing new sync source because our current sync source, "
<< currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
<< ") which is not ahead of ours (" << myLastOpTime
<< "), it does not have a sync source, and it's not the primary";
- if (metadata.getPrimaryIndex() >= 0) {
- logMessage << " (" << _rsConfig.getMemberAt(metadata.getPrimaryIndex()).getHostAndPort()
- << " is)";
+ if (primaryIndex >= 0) {
+ logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
} else {
logMessage << " (sync source does not know the primary)";
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index f57d945465e..96708c90b48 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -158,7 +158,8 @@ public:
virtual void clearSyncSourceBlacklist();
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& myLastOpTime,
- const rpc::ReplSetMetadata& metadata,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata,
Date_t now) const;
virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
virtual void setElectionSleepUntil(Date_t newTime);
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index b61f8907b41..3519dd5e8d5 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/server_options.h"
#include "mongo/logger/logger.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -52,6 +53,7 @@
using std::unique_ptr;
using mongo::rpc::ReplSetMetadata;
+using mongo::rpc::OplogQueryMetadata;
namespace mongo {
namespace repl {
@@ -148,11 +150,22 @@ protected:
}
// Make the metadata coming from sync source. Only set visibleOpTime.
- ReplSetMetadata makeMetadata(OpTime opTime = OpTime()) {
- return ReplSetMetadata(
- _topo->getTerm(), OpTime(), opTime, _currentConfig.getConfigVersion(), OID(), -1, -1);
+ ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime()) {
+ return ReplSetMetadata(_topo->getTerm(),
+ OpTime(),
+ visibleOpTime,
+ _currentConfig.getConfigVersion(),
+ OID(),
+ -1,
+ -1);
}
+ // Make the metadata coming from sync source. Only set lastAppliedOpTime.
+ OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime()) {
+ return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, -1, -1);
+ }
+
+
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
const std::string& setName,
MemberState memberState,
@@ -719,10 +732,10 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
- ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), makeMetadata(), now()));
- ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("h2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("h3"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
getTopoCoord().chooseNewSyncSource(
now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -5349,8 +5362,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host4" since "host4" is absent from the config of version 10.
ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
- ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host4"), OpTime(), metadata, makeOplogQueryMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatUs) {
@@ -5358,7 +5371,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatU
// "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime)
// for "host2"
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbeatButNotMetadata) {
@@ -5388,9 +5401,21 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- auto metadata = makeMetadata(lastOpTimeApplied);
- ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
}
@@ -5422,9 +5447,23 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsStaleByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- auto metadata = makeMetadata(fresherLastOpTimeApplied);
ASSERT_FALSE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now()));
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(fresherLastOpTimeApplied),
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(fresherLastOpTimeApplied),
+ boost::none,
+ now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
}
@@ -5456,7 +5495,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenFresherMemberExists) {
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
}
@@ -5490,18 +5529,18 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhileFresherMemberIsBlack
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
// unblacklist with too early a time (node should remained blacklisted)
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
// unblacklist and it should succeed
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
}
@@ -5535,7 +5574,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsDown)
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) {
@@ -5565,7 +5604,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotRea
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
}
TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotBuildIndexes) {
@@ -5610,7 +5649,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotB
// set up complete, time for actual check
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
}
TEST_F(HeartbeatResponseTest,
@@ -5661,7 +5700,7 @@ TEST_F(HeartbeatResponseTest,
// set up complete, time for actual check
startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 216bbceabc7..a171f7ca89e 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/server_options.h"
#include "mongo/logger/logger.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -53,6 +54,7 @@
using std::unique_ptr;
using mongo::rpc::ReplSetMetadata;
+using mongo::rpc::OplogQueryMetadata;
namespace mongo {
namespace repl {
@@ -149,20 +151,28 @@ protected:
_currentConfig = config;
}
- // Make the metadata coming from sync source.
+ // Make the ReplSetMetadata coming from sync source.
// Only set visibleOpTime, primaryIndex and syncSourceIndex
- ReplSetMetadata makeMetadata(OpTime opTime = OpTime(),
- int primaryIndex = -1,
- int syncSourceIndex = -1) {
+ ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime(),
+ int primaryIndex = -1,
+ int syncSourceIndex = -1) {
return ReplSetMetadata(_topo->getTerm(),
OpTime(),
- opTime,
+ visibleOpTime,
_currentConfig.getConfigVersion(),
OID(),
primaryIndex,
syncSourceIndex);
}
+ // Make the OplogQueryMetadata coming from sync source.
+ // Only set lastAppliedOpTime, primaryIndex and syncSourceIndex
+ OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(),
+ int primaryIndex = -1,
+ int syncSourceIndex = -1) {
+ return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
+ }
+
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
const std::string& setName,
MemberState memberState,
@@ -771,10 +781,20 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc
getTopoCoord().setForceSyncSourceIndex(1);
// force should cause shouldChangeSyncSource() to return true
// even if the currentSource is the force target
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(oldOpTime),
+ now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(newOpTime),
+ now()));
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("h2"), OpTime(), makeMetadata(oldOpTime), now()));
+ HostAndPort("h2"), OpTime(), makeReplSetMetadata(oldOpTime), boost::none, now()));
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now()));
+ HostAndPort("h3"), OpTime(), makeReplSetMetadata(newOpTime), boost::none, now()));
getTopoCoord().chooseNewSyncSource(
now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
@@ -3229,8 +3249,21 @@ TEST_F(HeartbeatResponseTestV1,
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
}
@@ -3271,8 +3304,16 @@ TEST_F(HeartbeatResponseTestV1,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
// Show we like host2 while it is primary.
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 1), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied, 1),
+ now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(lastOpTimeApplied, 1),
+ boost::none,
+ now()));
// Show that we also like host2 while it has a sync source.
nextAction = receiveUpHeartbeat(HostAndPort("host2"),
@@ -3282,8 +3323,17 @@ TEST_F(HeartbeatResponseTestV1,
lastOpTimeApplied,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 2, 2), now()));
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied, 2, 2),
+ now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(lastOpTimeApplied, 2, 2),
+ boost::none,
+ now()));
// Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress
// beyond our own.
@@ -3294,14 +3344,30 @@ TEST_F(HeartbeatResponseTestV1,
lastOpTimeApplied,
lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied), now()));
+ ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
// Sometimes the heartbeat is stale and the metadata says it's the primary. Trust the metadata.
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
lastOpTimeApplied,
- makeMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(
+ lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */),
+ now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */),
+ boost::none,
now()));
// But if it is secondary and has some progress beyond our own, we still like it.
@@ -3313,8 +3379,18 @@ TEST_F(HeartbeatResponseTestV1,
newerThanLastOpTimeApplied,
newerThanLastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), lastOpTimeApplied, makeMetadata(newerThanLastOpTimeApplied), now()));
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(newerThanLastOpTimeApplied),
+ now()));
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ lastOpTimeApplied,
+ makeReplSetMetadata(newerThanLastOpTimeApplied),
+ boost::none,
+ now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) {
@@ -3345,8 +3421,17 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown
// set up complete, time for actual check
nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied);
ASSERT_NO_ACTION(nextAction.getAction());
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBlackListed) {
@@ -3377,19 +3462,47 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla
getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
// unblacklist with too early a time (node should remained blacklisted)
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90));
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
// unblacklist and it should succeed
getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100));
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+
+ startCapturingLogMessages();
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
}
@@ -3421,8 +3534,20 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
+
+ startCapturingLogMessages();
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
}
@@ -3454,8 +3579,22 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbea
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(fresherLastOpTimeApplied), now()));
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(fresherLastOpTimeApplied),
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_FALSE(
+ getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(fresherLastOpTimeApplied),
+ boost::none,
+ now()));
stopCapturingLogMessages();
ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source"));
}
@@ -3486,8 +3625,21 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) {
// set up complete, time for actual check
startCapturingLogMessages();
- ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
}
@@ -3496,15 +3648,15 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
// "host2" since we do not use the member's heartbeatdata in pv1.
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(), now()));
+ HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host4" since "host4" is absent from the config of version 10.
- ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
- ASSERT_TRUE(
- getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now()));
+ ReplSetMetadata replMetadata(0, OpTime(), OpTime(), 10, OID(), -1, -1);
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host4"), OpTime(), replMetadata, makeOplogQueryMetadata(), now()));
}
// TODO(dannenberg) figure out what this is trying to test..
@@ -4433,8 +4585,17 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberDoesNo
ASSERT_NO_ACTION(nextAction.getAction());
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+ // set up complete, time for actual check
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) {
@@ -4463,8 +4624,18 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotR
ASSERT_NO_ACTION(nextAction.getAction());
// set up complete, time for actual check
- ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
- HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now()));
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(lastOpTimeApplied),
+ now()));
+
+ // set up complete, time for actual check
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"),
+ OpTime(),
+ makeReplSetMetadata(lastOpTimeApplied),
+ boost::none,
+ now()));
}
class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 {