summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSamyukta Lanka <samy.lanka@mongodb.com>2020-02-12 23:19:35 +0000
committerevergreen <evergreen@mongodb.com>2020-02-12 23:19:35 +0000
commit0a4a011397d7e399cbe93a2e2da60aec84997533 (patch)
treef204d0c765e9ef04d11e405706b227660af5366b /src/mongo/db
parent994fdd99bb6adb2cf9c7dd4061c2035188c2c8da (diff)
downloadmongo-0a4a011397d7e399cbe93a2e2da60aec84997533.tar.gz
SERVER-45470 Process metadata with each new batch in the new oplog fetcher
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp114
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h7
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp283
3 files changed, 365 insertions, 39 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 7f7e8c11d0d..5cd9e5c967c 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -248,6 +248,31 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata(
}
return oqMetadata;
}
+
+/**
+ * Parses the cursor's metadata response for the OplogQueryMetadata. If there is an error it returns
+ * it. If no OplogQueryMetadata is provided then it returns boost::none.
+ *
+ * OplogQueryMetadata is made optional for backwards compatibility.
+ * TODO SERVER-27668: Make this non-optional. When this stops being optional we can remove the
+ * duplicated fields in both metadata types and begin to always use OplogQueryMetadata's data.
+ */
+StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata(
+ const BSONObj& metadata) {
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata = boost::none;
+
+ bool receivedOplogQueryMetadata = metadata.hasElement(rpc::kOplogQueryMetadataFieldName);
+ if (receivedOplogQueryMetadata) {
+ auto metadataResult = rpc::OplogQueryMetadata::readFromMetadata(metadata);
+ if (!metadataResult.isOK()) {
+ return metadataResult.getStatus();
+ }
+
+ oqMetadata = boost::make_optional(metadataResult.getValue());
+ }
+
+ return oqMetadata;
+}
} // namespace
StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
@@ -708,6 +733,22 @@ Mutex* NewOplogFetcher::_getMutex() noexcept {
return &_mutex;
}
+std::string NewOplogFetcher::toString() {
+ stdx::lock_guard lock(_mutex);
+ str::stream output;
+ output << "OplogFetcher -";
+ output << " last optime fetched: " << _lastFetched.toString();
+ output << " source: " << _source.toString();
+ output << " namespace: " << _nss.toString();
+ output << " active: " << _isActive_inlock();
+ output << " shutting down?:" << _isShuttingDown_inlock();
+ output << " first batch: " << _firstBatch;
+ output << " initial find timeout: " << _getInitialFindMaxTime();
+ output << " retried find timeout: " << _getRetriedFindMaxTime();
+ output << " awaitData timeout: " << _awaitDataTimeout;
+ return output;
+}
+
OpTime NewOplogFetcher::getLastOpTimeFetched_forTest() const {
return _getLastOpTimeFetched();
}
@@ -809,6 +850,7 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call
return;
}
+ _setMetadataWriterAndReader();
_createNewCursor(true /* initialFind */);
while (true) {
@@ -874,7 +916,7 @@ Status NewOplogFetcher::_connect() {
uassertStatusOK(_conn->connect(_source, "OplogFetcher"));
uassertStatusOK(replAuthenticate(_conn.get())
.withContext(str::stream()
- << "OplogFecther failed to authenticate to "
+ << "OplogFetcher failed to authenticate to "
<< _source));
// Reset any state needed to track restarts on successful connection.
_oplogFetcherRestartDecision->fetchSuccessful(this);
@@ -890,6 +932,23 @@ Status NewOplogFetcher::_connect() {
return connectStatus;
}
+void NewOplogFetcher::_setMetadataWriterAndReader() {
+ invariant(_conn);
+
+ _conn->setRequestMetadataWriter([this](OperationContext* opCtx, BSONObjBuilder* metadataBob) {
+ *metadataBob << rpc::kReplSetMetadataFieldName << 1;
+ *metadataBob << rpc::kOplogQueryMetadataFieldName << 1;
+ metadataBob->appendElements(ReadPreferenceSetting::secondaryPreferredMetadata());
+ return Status::OK();
+ });
+
+ _conn->setReplyMetadataReader(
+ [this](OperationContext* opCtx, const BSONObj& metadataObj, StringData source) {
+ _metadataObj = metadataObj.getOwned();
+ return Status::OK();
+ });
+}
+
BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const {
BSONObjBuilder queryBob;
@@ -1039,15 +1098,21 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
LOG(2) << "oplog fetcher read 0 operations from remote oplog";
}
- // TODO SERVER-45470: parse metadata.
+ auto oqMetadataResult = parseOplogQueryMetadata(_metadataObj);
+ if (!oqMetadataResult.isOK()) {
+ error() << "invalid oplog query metadata from sync source " << _source << ": "
+ << oqMetadataResult.getStatus() << ": " << _metadataObj;
+ return oqMetadataResult.getStatus();
+ }
+ auto oqMetadata = oqMetadataResult.getValue();
// This lastFetched value is the last OpTime from the previous batch.
auto lastFetched = _getLastOpTimeFetched();
if (_firstBatch) {
- // TODO SERVER-45470: use metadata to populate remoteRBID and remoteLastApplied.
- auto remoteRBID = boost::none;
- auto remoteLastApplied = boost::none;
+ auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
+ auto remoteLastApplied =
+ oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
auto status = checkRemoteOplogStart(documents,
lastFetched,
remoteLastApplied,
@@ -1083,7 +1148,25 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
}
auto info = validateResult.getValue();
- // TODO SERVER-45470: Process replset metadata.
+ // Process replset metadata. It is important that this happen after we've validated the
+ // first batch, so we don't progress our knowledge of the commit point from a
+ // response that triggers a rollback.
+ rpc::ReplSetMetadata replSetMetadata;
+ bool receivedReplMetadata = _metadataObj.hasElement(rpc::kReplSetMetadataFieldName);
+ if (receivedReplMetadata) {
+ auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(_metadataObj);
+ if (!metadataResult.isOK()) {
+ error() << "invalid replication metadata from sync source " << _source << ": "
+ << metadataResult.getStatus() << ": " << _metadataObj;
+ return metadataResult.getStatus();
+ }
+ replSetMetadata = metadataResult.getValue();
+
+ // We will only ever have OplogQueryMetadata if we have ReplSetMetadata, so it is safe
+ // to call processMetadata() in this if block.
+ invariant(oqMetadata);
+ _dataReplicatorExternalState->processMetadata(replSetMetadata, *oqMetadata);
+ }
// Increment stats. We read all of the docs in the query.
opsReadStats.increment(info.networkDocumentCount);
@@ -1100,7 +1183,24 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// of this fetcher.
_startingPoint = StartingPoint::kSkipFirstDoc;
- // TODO SERVER-45470: check for invalid sync source.
+ if (_dataReplicatorExternalState->shouldStopFetching(_source, replSetMetadata, oqMetadata)) {
+ str::stream errMsg;
+ errMsg << "sync source " << _source.toString();
+ errMsg << " (config version: " << replSetMetadata.getConfigVersion();
+ // If OplogQueryMetadata was provided, its values were used to determine if we should
+ // stop fetching from this sync source.
+ if (oqMetadata) {
+ errMsg << "; last applied optime: " << oqMetadata->getLastOpApplied().toString();
+ errMsg << "; sync source index: " << oqMetadata->getSyncSourceIndex();
+ errMsg << "; primary index: " << oqMetadata->getPrimaryIndex();
+ } else {
+ errMsg << "; last visible optime: " << replSetMetadata.getLastOpVisible().toString();
+ errMsg << "; sync source index: " << replSetMetadata.getSyncSourceIndex();
+ errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex();
+ }
+ errMsg << ") is no longer valid";
+ return Status(ErrorCodes::InvalidSyncSource, errMsg);
+ }
// We have now processed the batch and should move forward our view of _lastFetched.
if (documents.size() > 0) {
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 0c64c44ede3..69af79c6c6f 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -391,7 +391,7 @@ public:
/**
* Prints out the status and settings of the oplog fetcher.
*/
- std::string toString() const;
+ std::string toString();
// ================== Test support API ===================
@@ -481,6 +481,11 @@ private:
Status _connect();
/**
+ * Sets the RequestMetadataWriter and ReplyMetadataReader on the connection.
+ */
+ void _setMetadataWriterAndReader();
+
+ /**
* Executes a `find` query on the sync source's oplog and establishes a tailable, awaitData,
* exhaust cursor.
*
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index ced49a76235..063f32f0762 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -1092,6 +1092,14 @@ bool blockedOnNetworkSoon(MockDBClientConnection* conn) {
return false;
}
+void validateMetadataRequest(OpMsg msg) {
+ ASSERT_EQUALS(1, msg.body.getIntField("$replData"));
+ ASSERT_EQUALS(1, msg.body.getIntField("$oplogQueryData"));
+ ASSERT_BSONOBJ_EQ(BSON("mode"
+ << "secondaryPreferred"),
+ msg.body.getObjectField("$readPreference"));
+}
+
void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) {
auto msg = mongo::OpMsg::parse(m);
ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find");
@@ -1106,7 +1114,8 @@ void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) {
<< "local"
<< "afterClusterTime" << Timestamp(0, 1)),
msg.body.getObjectField("readConcern"));
- // TODO SERVER-45470: Test the metadata sent.
+
+ validateMetadataRequest(msg);
}
void validateGetMoreCommand(Message m,
@@ -1135,6 +1144,8 @@ void validateGetMoreCommand(Message m,
} else {
ASSERT_FALSE(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported));
}
+
+ validateMetadataRequest(msg);
}
// Simulate a response to a single outgoing client request and return the client request. Use this
@@ -1180,7 +1191,9 @@ protected:
static const int rbid = 2;
static const int primaryIndex = 2;
static const int syncSourceIndex = 2;
+ static const rpc::OplogQueryMetadata oqMetadata;
static const rpc::OplogQueryMetadata staleOqMetadata;
+ static const rpc::ReplSetMetadata replSetMetadata;
// 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
@@ -1204,7 +1217,8 @@ protected:
std::unique_ptr<ShutdownState> processSingleBatch(const Message& response,
bool shouldShutdown = false,
- bool requireFresherSyncSource = true);
+ bool requireFresherSyncSource = true,
+ bool lastFetchedShouldAdvance = false);
/**
* Tests checkSyncSource result handling.
@@ -1226,12 +1240,19 @@ protected:
std::unique_ptr<MockRemoteDBServer> _mockServer;
};
-const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(124, 1), 2);
+const int NewOplogFetcherTest::rbid;
+const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2);
+const rpc::OplogQueryMetadata NewOplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata(
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, syncSourceIndex);
+
const OpTime NewOplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0);
const Date_t NewOplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs());
const rpc::OplogQueryMetadata NewOplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata(
{staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex);
+const rpc::ReplSetMetadata NewOplogFetcherTest::replSetMetadata =
+ rpc::ReplSetMetadata(1, OpTimeAndWallTime(), OpTime(), 1, OID(), primaryIndex, syncSourceIndex);
+
void NewOplogFetcherTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
launchExecutorThread();
@@ -1296,7 +1317,7 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer
source,
_createConfig(),
std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
- -1,
+ rbid,
requireFresherSyncSource,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
@@ -1312,7 +1333,10 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer
}
std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch(
- const Message& response, bool shouldShutdown, bool requireFresherSyncSource) {
+ const Message& response,
+ bool shouldShutdown,
+ bool requireFresherSyncSource,
+ bool lastFetchedShouldAdvance) {
auto shutdownState = std::make_unique<ShutdownState>();
// Create an oplog fetcher with no retries.
@@ -1335,6 +1359,10 @@ std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch(
}
oplogFetcher->join();
+ if (!lastFetchedShouldAdvance) {
+ ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
+ }
+
return shutdownState;
}
@@ -1350,8 +1378,7 @@ void NewOplogFetcherTest::testSyncSourceChecking(
dataReplicatorExternalState->shouldStopFetchingResult = true;
auto shutdownState =
- processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj),
- true /* shouldShutdown */);
+ processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj));
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus());
}
@@ -1520,7 +1547,7 @@ TEST_F(NewOplogFetcherTest,
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
// Update lastFetched before it is updated by getting the next batch.
lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
@@ -1655,7 +1682,7 @@ TEST_F(
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto conn = oplogFetcher->getDBClientConnection_forTest();
@@ -1692,6 +1719,166 @@ TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) {
failPoint->setMode(FailPoint::off);
}
+TEST_F(NewOplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) {
+ CursorId cursorId = 22LL;
+ auto entry = makeNoopOplogEntry(lastFetched);
+ auto metadataObj =
+ BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1));
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) {
+ CursorId cursorId = 22LL;
+ auto entry = makeNoopOplogEntry(lastFetched);
+ auto metadataObj =
+ BSON(rpc::kOplogQueryMetadataFieldName << BSON("invalid_oq_metadata_field" << 1));
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+}
+
+DEATH_TEST_F(NewOplogFetcherTest,
+ ValidMetadataInResponseWithoutOplogMetadataInvariants,
+ "Invariant failure oqMetadata") {
+ CursorId cursorId = 22LL;
+ auto entry = makeNoopOplogEntry(lastFetched);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none);
+
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj));
+}
+
+TEST_F(NewOplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
+ CursorId cursorId = 0LL;
+ auto entry = makeNoopOplogEntry(lastFetched);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
+
+ ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+ ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT_EQUALS(replSetMetadata.getPrimaryIndex(),
+ dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex());
+ ASSERT_EQUALS(oqMetadata.getPrimaryIndex(),
+ dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex());
+}
+
+TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) {
+ CursorId cursorId = 22LL;
+ auto entry = makeNoopOplogEntry(lastFetched);
+
+ rpc::OplogQueryMetadata oplogQueryMetadata(
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, primaryIndex, syncSourceIndex);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata);
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) {
+ CursorId cursorId = 22LL;
+ auto entry = makeNoopOplogEntry(staleOpTime);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata);
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) {
+ CursorId cursorId = 22LL;
+ auto entry = makeNoopOplogEntry(lastFetched);
+
+ rpc::OplogQueryMetadata oplogQueryMetadata(
+ {staleOpTime, staleWallTime}, lastFetched, rbid, primaryIndex, syncSourceIndex);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata);
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(NewOplogFetcherTest,
+ MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
+ CursorId cursorId = 22LL;
+ auto entry = makeNoopOplogEntry(staleOpTime);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata);
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj),
+ false /* shouldShutdown */,
+ false /* requireFresherSyncSource */)
+ ->getStatus());
+
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(NewOplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) {
+ // This tests the case where the sync source metadata is behind us but we get a document which
+ // is equal to us. Since that means the metadata is stale and can be ignored, we should accept
+ // this sync source.
+ CursorId cursorId = 0LL;
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata);
+ auto entry = makeNoopOplogEntry(lastFetched);
+
+ ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj),
+ false /* shouldShutdown */,
+ false /* requireFresherSyncSource */)
+ ->getStatus());
+
+ ASSERT(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(NewOplogFetcherTest,
+ MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
+ CursorId cursorId = 0LL;
+ rpc::OplogQueryMetadata oplogQueryMetadata(
+ {staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata);
+ auto entry = makeNoopOplogEntry(lastFetched);
+
+ ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj),
+ false /* shouldShutdown */,
+ false /* requireFresherSyncSource */)
+ ->getStatus());
+ ASSERT(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(NewOplogFetcherTest,
+ MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) {
+ CursorId cursorId = 22LL;
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none);
+ auto entry = makeNoopOplogEntry(Seconds(456));
+
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(NewOplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
+ CursorId cursorId = 22LL;
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
+ auto entry = makeNoopOplogEntry(Seconds(456));
+
+ ASSERT_EQUALS(
+ ErrorCodes::OplogStartMissing,
+ processSingleBatch(makeFirstBatch(cursorId, {entry}, {metadataObj}))->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+}
+
+TEST_F(NewOplogFetcherTest, EmptyMetadataIsNotProcessed) {
+ CursorId cursorId = 0LL;
+ auto entry = makeNoopOplogEntry(lastFetched);
+
+ ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, {}))->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+}
+
TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) {
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, processSingleBatch(Message())->getStatus());
}
@@ -1745,7 +1932,7 @@ TEST_F(NewOplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto firstBatch = {firstEntry, secondEntry};
// Update lastFetched before it is updated by getting the next batch.
@@ -1783,7 +1970,7 @@ TEST_F(NewOplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownO
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto firstBatch = {firstEntry, secondEntry};
// Update lastFetched before it is updated by getting the next batch.
@@ -1853,7 +2040,7 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto firstBatch = {firstEntry, secondEntry};
// Update lastFetched before it is updated by getting the next batch.
@@ -1997,7 +2184,7 @@ TEST_F(NewOplogFetcherTest, SuccessfulBatchResetsNumRestarts) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto firstBatch = {firstEntry, secondEntry};
// Update lastFetched before it is updated by getting the next batch.
@@ -2070,7 +2257,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto firstBatch = {firstEntry, secondEntry};
// Update lastFetched before it is updated by getting the next batch.
@@ -2149,7 +2336,7 @@ TEST_F(NewOplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatu
CursorId cursorId = 0LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto firstBatch = {firstEntry, secondEntry};
// Creating the cursor will succeed, but the oplog fetcher will shut down after receiving this
@@ -2181,7 +2368,7 @@ TEST_F(NewOplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissin
TEST_F(NewOplogFetcherTest,
MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) {
CursorId cursorId = 22LL;
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
ASSERT_EQUALS(
ErrorCodes::InvalidBSON,
processSingleBatch(makeFirstBatch(cursorId, {BSONObj()}, metadataObj))->getStatus());
@@ -2192,7 +2379,7 @@ TEST_F(
LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
}
@@ -2201,7 +2388,7 @@ TEST_F(NewOplogFetcherTest,
MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
ASSERT_EQUALS(
ErrorCodes::NoSuchKey,
processSingleBatch(makeFirstBatch(cursorId,
@@ -2215,7 +2402,7 @@ TEST_F(NewOplogFetcherTest,
TEST_F(NewOplogFetcherTest,
TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
CursorId cursorId = 22LL;
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
processSingleBatch(makeFirstBatch(cursorId,
{makeNoopOplogEntry(lastFetched),
@@ -2232,11 +2419,13 @@ TEST_F(NewOplogFetcherTest,
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto shutdownState = processSingleBatch(
makeFirstBatch(cursorId, {firstEntry, secondEntry, thirdEntry}, metadataObj),
- true /* shouldShutdown */);
+ true /* shouldShutdown */,
+ true /* requireFresherSyncSource */,
+ true /* lastFetchedShouldAdvance */);
ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]);
@@ -2291,7 +2480,7 @@ TEST_F(NewOplogFetcherTest,
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
// Update lastFetched before it is updated by getting the next batch.
lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
@@ -2366,7 +2555,7 @@ TEST_F(NewOplogFetcherTest,
auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
// Update lastFetched before it is updated by getting the next batch.
lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
@@ -2409,7 +2598,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocum
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
enqueueDocumentsFn = [](NewOplogFetcher::Documents::const_iterator,
NewOplogFetcher::Documents::const_iterator,
@@ -2422,6 +2611,38 @@ TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocum
ASSERT_EQ(Status(ErrorCodes::InternalError, "my custom error"), shutdownState->getStatus());
}
+TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
+ testSyncSourceChecking(boost::none, boost::none);
+
+ // Sync source optime and "hasSyncSource" are not available if the response does not
+ // contain metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
+
+TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) {
+ testSyncSourceChecking(replSetMetadata, oqMetadata);
+
+ // Sync source optime and "hasSyncSource" can be set if the respone contains metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
+
+TEST_F(NewOplogFetcherTest,
+ FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
+ rpc::OplogQueryMetadata oplogQueryMetadata(
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, -1);
+ testSyncSourceChecking(replSetMetadata, oplogQueryMetadata);
+
+ // Sync source "hasSyncSource" is derived from metadata.
+ ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
+ ASSERT_EQUALS(oplogQueryMetadata.getLastOpApplied(),
+ dataReplicatorExternalState->syncSourceLastOpTime);
+ ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
+}
+
TEST_F(NewOplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = BSON("o" << BSON("msg"
@@ -2646,7 +2867,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection)
CursorId cursorId = 0LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
// Allow the cursor re-initialization to succeed. But the OplogFetcher will shut down with an OK
// status after receiving this batch because the cursor id is 0.
@@ -2671,7 +2892,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) {
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
// Simulate closing the cursor and the OplogFetcher should exit with an OK status.
processSingleRequestResponse(conn, makeFirstBatch(0LL, {firstEntry}, metadataObj));
@@ -2711,7 +2932,7 @@ TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto conn = oplogFetcher->getDBClientConnection_forTest();
// First batch for the initial find command.
@@ -2757,17 +2978,17 @@ TEST_F(NewOplogFetcherTest, GetMoreEmptyBatch) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
auto conn = oplogFetcher->getDBClientConnection_forTest();
// Creating the cursor will succeed.
- auto m =
- processSingleRequestResponse(conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj));
+ auto m = processSingleRequestResponse(
+ conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj), true);
// Empty batch from first getMore.
processSingleRequestResponse(
- conn, makeSubsequentBatch(cursorId, {}, metadataObj, true /* moreToCome */));
+ conn, makeSubsequentBatch(cursorId, {}, metadataObj, true /* moreToCome */), true);
// Terminating empty batch from exhaust stream with cursorId 0.
processSingleExhaustResponse(oplogFetcher->getDBClientConnection_forTest(),