author     Bernard Gorman <bernard.gorman@gmail.com>  2020-06-01 18:47:00 +0100
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-08-27 13:09:38 +0000
commit     694ed4153b9d5424b5d169fea5c68f99d4dfb45a (patch)
tree       b3cffb5dce360007663e53a2aba68a77f89fdf86 /src
parent     1d3972cea1ae1a35e398fe61882cd455d78c01d1 (diff)
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
Diffstat (limited to 'src')
 src/mongo/base/error_codes.yml                                    |   1
 src/mongo/db/catalog/collection_mock.h                            |   2
 src/mongo/db/exec/collection_scan.cpp                             |  32
 src/mongo/db/exec/collection_scan.h                               |   5
 src/mongo/db/exec/collection_scan_common.h                        |   3
 src/mongo/db/pipeline/SConscript                                  |   1
 src/mongo/db/pipeline/document_source_change_stream.cpp           |   4
 src/mongo/db/pipeline/document_source_check_resume_token.cpp      | 212
 src/mongo/db/pipeline/document_source_check_resume_token.h        |  93
 src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 704
 src/mongo/db/pipeline/document_source_mock.cpp                    |   4
 src/mongo/db/pipeline/document_source_mock.h                      |   2
 src/mongo/db/pipeline/pipeline_d.cpp                              |   3
 src/mongo/db/query/classic_stage_builder.cpp                      |   1
 src/mongo/db/query/planner_access.cpp                             |   2
 src/mongo/db/query/query_planner.cpp                              |   3
 src/mongo/db/query/query_planner_params.h                         |   4
 src/mongo/db/query/query_solution.cpp                             |   1
 src/mongo/db/query/query_solution.h                               |   3
 19 files changed, 623 insertions, 457 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 2ee434d2c08..fe34bf4f7d8 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -395,6 +395,7 @@ error_codes:
- {code: 325, name: TenantMigrationAborted}
+ - {code: 326, name: OplogQueryMinTsMissing}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index 831d11d57f3..1466108d5ff 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -38,7 +38,7 @@ namespace mongo {
/**
* This class comprises a mock Collection for use by CollectionCatalog unit tests.
*/
-class CollectionMock final : public Collection {
+class CollectionMock : public Collection {
public:
CollectionMock(const NamespaceString& ns)
: CollectionMock(ns, std::unique_ptr<IndexCatalog>()) {}
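Dropping the 'final' qualifier is what allows the change-stream tests later in this patch to derive from CollectionMock. A minimal sketch of the pattern, assuming a hypothetical MyRecordCursorMock that replays a test-owned record queue:

    class MyOplogCollectionMock : public CollectionMock {
    public:
        MyOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) {}

        // Serve records from a test-controlled queue instead of real storage.
        std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
                                                        bool forward) const override {
            return std::make_unique<MyRecordCursorMock>(&_records);
        }

    private:
        mutable std::deque<Record> _records;  // owned by the test
    };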
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 00547983fc8..ffabd1886b4 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -79,6 +79,12 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
}
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
+    // We should never set 'assertMinTsHasNotFallenOffOplog' unless 'minTs' is also present.
+ if (params.assertMinTsHasNotFallenOffOplog) {
+ invariant(params.shouldTrackLatestOplogTimestamp);
+ invariant(params.minTs);
+ }
+
if (params.resumeAfterRecordId) {
// The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we
// only support in the forward direction.
@@ -187,19 +193,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
if (!record) {
- // We just hit EOF. If we are tailable and have already returned data, leave us in a
- // state to pick up where we left off on the next call to work(). Otherwise EOF is
- // permanent.
+ // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up
+ // where we left off on the next call to work(). Otherwise, the EOF is permanent.
if (_params.tailable && !_lastSeenId.isNull()) {
_cursor.reset();
} else {
_commonStats.isEOF = true;
}
-
return PlanStage::IS_EOF;
}
_lastSeenId = record->id;
+ if (_params.assertMinTsHasNotFallenOffOplog) {
+ assertMinTsHasNotFallenOffOplog(*record);
+ }
if (_params.shouldTrackLatestOplogTimestamp) {
setLatestOplogEntryTimestamp(*record);
}
@@ -223,6 +230,23 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
_latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp());
}
+void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) {
+ // If the first entry we see in the oplog is the replset initialization, then it doesn't matter
+ // if its timestamp is later than the specified minTs; no events earlier than the minTs can have
+ // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry
+ // is earlier than or equal to the minTs time.
+ auto oplogEntry = invariantStatusOK(repl::OplogEntry::parse(record.data.toBson()));
+ invariant(_specificStats.docsTested == 0);
+ const bool isNewRS =
+ oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) &&
+ oplogEntry.getOpType() == repl::OpTypeEnum::kNoop;
+ uassert(ErrorCodes::OplogQueryMinTsMissing,
+ "Specified minTs has already fallen off the oplog",
+ isNewRS || oplogEntry.getTimestamp() <= *_params.minTs);
+ // We don't need to check this assertion again after we've confirmed the first oplog event.
+ _params.assertMinTsHasNotFallenOffOplog = false;
+}
+
PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {
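The new assertMinTsHasNotFallenOffOplog() distinguishes two cases: a freshly-initiated replica set whose first oplog entry is the "initiating set" no-op, and a normal oplog whose oldest entry must be no newer than 'minTs'. A standalone sketch of the same predicate, assuming 'firstEntry' is the oldest entry returned by a forward oplog scan ('op', 'o' and 'ts' are the standard oplog entry fields):

    // Hypothetical helper mirroring the uassert above.
    bool minTsStillInOplog(const BSONObj& firstEntry, Timestamp minTs) {
        // A brand-new replica set's oplog begins with a no-op "initiating set"
        // entry; nothing can have been truncated from such an oplog yet.
        const bool isNewRS = firstEntry["op"].String() == "n" &&
            firstEntry["o"].Obj().binaryEqual(BSON("msg" << "initiating set"));
        // Otherwise the oldest surviving entry must be no newer than minTs.
        return isNewRS || firstEntry["ts"].timestamp() <= minTs;
    }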
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index 74d9d8ddb29..3d79915b0fe 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -100,6 +100,11 @@ private:
*/
void setLatestOplogEntryTimestamp(const Record& record);
+ /**
+ * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog.
+ */
+ void assertMinTsHasNotFallenOffOplog(const Record& record);
+
// WorkingSet is not owned by us.
WorkingSet* _workingSet;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index 659f6659eb1..aa0c790d1fd 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -66,6 +66,9 @@ struct CollectionScanParams {
// Do we want the scan to be 'tailable'? Only meaningful if the collection is capped.
bool tailable = false;
+    // Should we assert that the specified minTs has not fallen off the oplog?
+ bool assertMinTsHasNotFallenOffOplog = false;
+
// Should we keep track of the timestamp of the latest oplog entry we've seen? This information
// is needed to merge cursors from the oplog in order of operation time when reading the oplog
// across a sharded cluster.
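Putting the new flag in context: a resumed change-stream oplog scan would populate these parameters roughly as below. This is a hedged sketch; the real wiring lives in pipeline_d.cpp, and 'resumeTs' stands in for the resume token's clusterTime:

    CollectionScanParams params;
    params.tailable = true;                         // oplog scans are tailable
    params.shouldTrackLatestOplogTimestamp = true;  // needed to merge shard cursors by optime
    params.minTs = resumeTs;                        // scan begins at the resume point
    params.assertMinTsHasNotFallenOffOplog = true;  // throw OplogQueryMinTsMissing if truncated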
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 509369c31ae..4798bc2d247 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -416,6 +416,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/service_context_d',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
+ '$BUILD_DIR/mongo/db/storage/devnull/storage_devnull_core',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/mongo/s/query/router_exec_stage',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 9405edb87a2..1d4480cb4ba 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -384,7 +384,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
// to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'.
startFrom = tokenData.clusterTime;
if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) {
- resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData);
+ resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData);
} else {
resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData);
}
@@ -396,7 +396,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
"Only one type of resume option is allowed, but multiple were found.",
!resumeStage);
startFrom = *startAtOperationTime;
- resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
+ resumeStage = DocumentSourceCheckResumability::create(expCtx, *startFrom);
}
// We can only run on a replica set, or through mongoS. Confirm that this is the case.
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 8d023d1ece4..2930854ca2f 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -37,7 +37,7 @@ using boost::intrusive_ptr;
namespace mongo {
namespace {
-using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;
+using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus;
// Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
@@ -85,10 +85,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
} else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) {
// This could happen if the client provided a txnOpIndex of 0, yet the 0th document in the
// applyOps was irrelevant (meaning it was an operation on a collection or DB not being
- // watched). If we are looking for the resume token on a shard then this simply means that
- // the resume token may be on a different shard; otherwise, it indicates a corrupt token.
- uassert(50792, "Invalid resumeToken: txnOpIndex was skipped", expCtx->needsMerge);
- // We are running on a merging shard. Signal that we have read beyond the resume token.
+ // watched). Signal that we have read beyond the resume token.
return ResumeStatus::kSurpassedToken;
}
@@ -169,17 +166,9 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
}
} // namespace
-const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
- return kStageName.rawData();
-}
-
-Value DocumentSourceEnsureResumeTokenPresent::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // We only serialize this stage in the context of explain.
- return explain
- ? Value(DOC(kStageName << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument())))
- : Value();
-}
+DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSourceCheckResumability(expCtx, std::move(token)) {}
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx,
@@ -187,170 +176,121 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon
return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
}
-DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {}
+const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
+ return kStageName.rawData();
+}
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext() {
+ // If we have already verified the resume token is present, return the next doc immediately.
if (_resumeStatus == ResumeStatus::kSurpassedToken) {
- // We've already verified the resume token is present.
return pSource->getNext();
}
- // The incoming documents are sorted by resume token. We examine a range of documents that have
- // the same clusterTime as the client's resume token, until we either find (and swallow) a match
- // for the token or pass the point in the stream where it should have been.
+ auto nextInput = GetNextResult::makeEOF();
+
+ // If we are starting after an 'invalidate' and the invalidating command (e.g. collection drop)
+ // occurred at the same clusterTime on more than one shard, then we may see multiple identical
+ // resume tokens here. We swallow all of them until the resume status becomes kSurpassedToken.
while (_resumeStatus != ResumeStatus::kSurpassedToken) {
- auto nextInput = pSource->getNext();
+ // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This
+ // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token.
+ nextInput = DocumentSourceCheckResumability::doGetNext();
- // If there are no more results, return EOF. We will continue checking for the client's
- // resume token the next time the getNext method is called.
+ // If there are no more results, return EOF. We will continue checking for the resume token
+ // the next time the getNext method is called. If we hit EOF, then we cannot have surpassed
+ // the resume token on this iteration.
if (!nextInput.isAdvanced()) {
+ invariant(_resumeStatus != ResumeStatus::kSurpassedToken);
return nextInput;
}
- // Check the current event. If we found and swallowed the resume token, then the result will
- // be the first event in the stream which should be returned to the user. Otherwise, we keep
- // iterating the stream until we find an event matching the client's resume token.
- if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) {
- return *nextOutput;
- }
- }
- MONGO_UNREACHABLE;
-}
-boost::optional<DocumentSource::GetNextResult>
-DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken(
- const DocumentSource::GetNextResult& nextInput) {
- // We should only ever call this method when we have a new event to examine.
- invariant(nextInput.isAdvanced());
- auto resumeStatus =
- compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
- switch (resumeStatus) {
- case ResumeStatus::kCheckNextDoc:
- return boost::none;
- case ResumeStatus::kFoundToken:
- // We found the resume token. If we are starting after an 'invalidate' token and the
- // invalidating command (e.g. collection drop) occurred at the same clusterTime on
- // more than one shard, then we will see multiple identical 'invalidate' events
- // here. We should continue to swallow all of them to ensure that the new stream
- // begins after the collection drop, and that it is not immediately re-invalidated.
- if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) {
- _resumeStatus = ResumeStatus::kFoundToken;
- return boost::none;
- }
- // If the token is not an invalidate or if we are not running in a cluster, we mark
- // the stream as having surpassed the resume token, skip the current event since the
- // client has already seen it, and return the next event in the stream.
- _resumeStatus = ResumeStatus::kSurpassedToken;
- return pSource->getNext();
- case ResumeStatus::kSurpassedToken:
- // If we have surpassed the point in the stream where the resume token should have
- // been and we did not see the token itself, then this stream cannot be resumed.
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "cannot resume stream; the resume token was not found. "
- << nextInput.getDocument()["_id"].getDocument().toString(),
- _resumeStatus == ResumeStatus::kFoundToken);
- _resumeStatus = ResumeStatus::kSurpassedToken;
- return nextInput;
+ // When we reach here, we have either found the resume token or surpassed it.
+ invariant(_resumeStatus != ResumeStatus::kCheckNextDoc);
+
+ // If the resume status is kFoundToken, record the fact that we have seen the token. When we
+ // have surpassed the resume token, we will assert that we saw the token before doing so. We
+ // cannot simply assert once and then assume we have surpassed the token, because in certain
+ // cases we may see 1..N identical tokens and must swallow them all before proceeding.
+ _hasSeenResumeToken = (_hasSeenResumeToken || _resumeStatus == ResumeStatus::kFoundToken);
}
- MONGO_UNREACHABLE;
-}
-const char* DocumentSourceShardCheckResumability::getSourceName() const {
- return kStageName.rawData();
-}
+ // Assert that before surpassing the resume token, we observed the token itself in the stream.
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "cannot resume stream; the resume token was not found. "
+ << nextInput.getDocument()["_id"].getDocument().toString(),
+ _hasSeenResumeToken);
-Value DocumentSourceShardCheckResumability::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // We only serialize this stage in the context of explain.
- return explain
- ? Value(DOC(kStageName << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument())))
- : Value();
+ // At this point, we have seen the token and swallowed it. Return the next event to the client.
+ invariant(_hasSeenResumeToken && _resumeStatus == ResumeStatus::kSurpassedToken);
+ return nextInput;
}
-intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
+DocumentSourceCheckResumability::DocumentSourceCheckResumability(
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSource(getSourceName(), expCtx), _tokenFromClient(std::move(token)) {}
+
+intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
// We are resuming from a point in time, not an event. Seed the stage with a high water mark.
return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData());
}
-intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
+intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
- return new DocumentSourceShardCheckResumability(expCtx, std::move(token));
+ return new DocumentSourceCheckResumability(expCtx, std::move(token));
}
-DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {}
+const char* DocumentSourceCheckResumability::getSourceName() const {
+ return kStageName.rawData();
+}
-DocumentSource::GetNextResult DocumentSourceShardCheckResumability::doGetNext() {
- if (_surpassedResumeToken)
+DocumentSource::GetNextResult DocumentSourceCheckResumability::doGetNext() {
+ if (_resumeStatus == ResumeStatus::kSurpassedToken) {
return pSource->getNext();
+ }
- while (!_surpassedResumeToken) {
- auto nextInput = pSource->getNext();
+ while (_resumeStatus != ResumeStatus::kSurpassedToken) {
+ // The underlying oplog scan will throw OplogQueryMinTsMissing if the minTs in the change
+ // stream filter has fallen off the oplog. Catch this and throw a more explanatory error.
+ auto nextInput = [this]() {
+ try {
+ return pSource->getNext();
+ } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
+ uasserted(ErrorCodes::ChangeStreamHistoryLost,
+ "Resume of change stream was not possible, as the resume point may no "
+ "longer be in the oplog.");
+ }
+ }();
- // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us
- // from continually returning EOF in cases where the resume point has fallen off the oplog.
+ // If we hit EOF, return it immediately.
if (!nextInput.isAdvanced()) {
- _assertOplogHasEnoughHistory(nextInput);
return nextInput;
}
+
// Determine whether the current event sorts before, equal to or after the resume token.
- auto resumeStatus =
+ _resumeStatus =
compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
- switch (resumeStatus) {
+ switch (_resumeStatus) {
case ResumeStatus::kCheckNextDoc:
// If the result was kCheckNextDoc, we are resumable but must swallow this event.
- _verifiedOplogHasEnoughHistory = true;
continue;
case ResumeStatus::kSurpassedToken:
- // In this case the resume token wasn't found; it must be on another shard. We must
- // examine the oplog to ensure that its history reaches back to before the resume
- // token, otherwise we may have missed events that fell off the oplog. If we can
- // resume, fall through into the following case and set _surpassedResumeToken.
- _assertOplogHasEnoughHistory(nextInput);
+ // In this case the resume token wasn't found; it may be on another shard. However,
+ // since the oplog scan did not throw, we know that we are resumable. Fall through
+ // into the following case and return the document.
case ResumeStatus::kFoundToken:
- // We found the actual token! Set _surpassedResumeToken and return the result.
- _surpassedResumeToken = true;
+ // We found the actual token! Return the doc so DSEnsureResumeTokenPresent sees it.
return nextInput;
}
}
MONGO_UNREACHABLE;
}
-void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory(
- const GetNextResult& nextInput) {
- // If we have already verified that this stream is resumable, return immediately.
- if (_verifiedOplogHasEnoughHistory) {
- return;
- }
- // Look up the first document in the oplog and compare it with the resume token's clusterTime.
- auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
- auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline = Pipeline::makePipeline({matchSpec}, firstEntryExpCtx);
- if (auto first = pipeline->getNext()) {
- auto firstOplogEntry = Value(*first);
- // If the first entry in the oplog is the replset initialization, then it doesn't matter
- // if its timestamp is later than the resume token. No events earlier than the token can
- // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that
- // the timestamp of the first oplog entry is earlier than that of the resume token.
- using repl::kInitiatingSetMsg;
- const bool isNewRS =
- Value::compare(firstOplogEntry["o"]["msg"], Value(kInitiatingSetMsg), nullptr) == 0 &&
- Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0;
- uassert(ErrorCodes::ChangeStreamHistoryLost,
- "Resume of change stream was not possible, as the resume point may no longer be in "
- "the oplog. ",
- isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime);
- } else {
- // Very unusual case: the oplog is empty. We can always resume. However, it should never be
- // possible to have obtained a document that matched the filter if the oplog is empty.
- uassert(ErrorCodes::ChangeStreamFatalError,
- "Oplog was empty but found an event in the change stream pipeline. It should not "
- "be possible for this to happen",
- nextInput.isEOF());
- }
- _verifiedOplogHasEnoughHistory = true;
+Value DocumentSourceCheckResumability::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ // We only serialize this stage in the context of explain.
+ return explain ? Value(DOC(getSourceName()
+ << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument())))
+ : Value();
}
} // namespace mongo
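The net effect of the rewritten doGetNext() is a cleaner error contract: a truncated oplog now surfaces immediately as ChangeStreamHistoryLost rather than as a stream that silently resumes past lost events. A minimal caller-side sketch (hypothetical; 'stage' is any DocumentSourceCheckResumability whose source scans the oplog):

    try {
        auto next = stage->getNext();
    } catch (const ExceptionFor<ErrorCodes::ChangeStreamHistoryLost>&) {
        // The resume point predates the oldest surviving oplog entry; this
        // token is unusable and the client must open a new change stream.
    }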
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 0e50658f89d..c3e19a4a450 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -39,31 +39,41 @@
namespace mongo {
/**
- * This checks for resumability on a single shard in the sharded case. The rules are
+ * This stage checks whether the oplog has enough history to resume the stream, and consumes all
+ * events up to the given resume point. It is deployed on all shards when resuming a stream on a
+ * sharded cluster, and is also used in the single-replica-set case when a stream is opened with
+ * startAtOperationTime or with a high-water-mark resume token. It defers to the COLLSCAN to check
+ * whether the first event (matching or non-matching) encountered in the oplog has a timestamp equal
+ * to or earlier than the minTs in the change stream filter. If not, the COLLSCAN will throw an
+ * assertion, which this stage catches and converts into a more comprehensible $changeStream-specific
+ * exception. The rules are:
*
- * - If the first document in the pipeline for this shard has a matching timestamp, we can
- * always resume.
- * - If the oplog is empty, we can resume. An empty oplog is rare and can only occur
- * on a secondary that has just started up from a primary that has not taken a write.
- * In particular, an empty oplog cannot be the result of oplog truncation.
- * - If neither of the above is true, the least-recent document in the oplog must precede the resume
- * timestamp. If we do this check after seeing the first document in the pipeline in the shard, or
- * after seeing that there are no documents in the pipeline after the resume token in the shard,
- * we're guaranteed not to miss any documents.
+ * - If the first event seen in the oplog has the same timestamp as the requested resume token or
+ * startAtOperationTime, we can resume.
+ * - If the timestamp of the first event seen in the oplog is earlier than the requested resume
+ * token or startAtOperationTime, we can resume.
+ * - If the first entry in the oplog is a replica set initialization, then we can resume even if the
+ *   token timestamp is earlier than that entry, since no events can have fallen off this oplog yet.
+ *   This can happen in a sharded cluster when a new shard is added.
*
- * - Otherwise we cannot resume, as we do not know if this shard lost documents between the resume
- * token and the first matching document in the pipeline.
- *
- * This source need only run on a sharded collection. For unsharded collections,
- * DocumentSourceEnsureResumeTokenPresent is sufficient.
+ * - Otherwise we cannot resume, as we do not know if there were any events between the resume token
+ * and the first matching document in the oplog.
*/
-class DocumentSourceShardCheckResumability final : public DocumentSource {
+class DocumentSourceCheckResumability : public DocumentSource {
public:
- static constexpr StringData kStageName = "$_internalCheckShardResumability"_sd;
+ static constexpr StringData kStageName = "$_internalCheckResumability"_sd;
- const char* getSourceName() const final;
+ // Used to record the results of comparing the token data extracted from documents in the
+ // resumed stream against the client's resume token.
+ enum class ResumeStatus {
+ kFoundToken, // The stream produced a document satisfying the client resume token.
+ kSurpassedToken, // The stream's latest document is more recent than the resume token.
+ kCheckNextDoc // The next document produced by the stream may contain the resume token.
+ };
+
+ const char* getSourceName() const override;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
return {StreamType::kStreaming,
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
@@ -75,48 +85,38 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
return boost::none;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
- static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts);
- static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
-private:
+protected:
/**
- * Use the create static method to create a DocumentSourceShardCheckResumability.
+ * Use the create static method to create a DocumentSourceCheckResumability.
*/
- DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- ResumeTokenData token);
+ DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ ResumeTokenData token);
- GetNextResult doGetNext() final;
-
- void _assertOplogHasEnoughHistory(const GetNextResult& nextInput);
+ GetNextResult doGetNext() override;
+ ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
const ResumeTokenData _tokenFromClient;
- bool _verifiedOplogHasEnoughHistory = false;
- bool _surpassedResumeToken = false;
};
/**
* This stage is used internally for change streams to ensure that the resume token is in the
* stream. It is not intended to be created by the user.
*/
-class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource {
+class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability {
public:
static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd;
- // Used to record the results of comparing the token data extracted from documents in the
- // resumed stream against the client's resume token.
- enum class ResumeStatus {
- kFoundToken, // The stream produced a document satisfying the client resume token.
- kSurpassedToken, // The stream's latest document is more recent than the resume token.
- kCheckNextDoc // The next document produced by the stream may contain the resume token.
- };
const char* getSourceName() const final;
@@ -142,13 +142,11 @@ public:
logic.mergingStage = this;
// Also add logic to the shards to ensure that each shard has enough oplog history to resume
// the change stream.
- logic.shardsStage = DocumentSourceShardCheckResumability::create(pExpCtx, _tokenFromClient);
+ logic.shardsStage = DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient);
logic.inputSortPattern = change_stream_constants::kSortSpec;
return logic;
};
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
@@ -161,15 +159,8 @@ private:
GetNextResult doGetNext() final;
- /**
- * Check the given event to determine whether it matches the client's resume token. If so, we
- * swallow this event and return the next event in the stream. Otherwise, return boost::none.
- */
- boost::optional<DocumentSource::GetNextResult> _checkNextDocAndSwallowResumeToken(
- const DocumentSource::GetNextResult& nextInput);
-
- ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
- const ResumeTokenData _tokenFromClient;
+ // Records whether we have observed the token in the resumed stream.
+ bool _hasSeenResumeToken = false;
};
} // namespace mongo
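For orientation, the refactored hierarchy declared above is (a sketch, not additional code):

    // DocumentSource
    //   +-- DocumentSourceCheckResumability ("$_internalCheckResumability")
    //         Consumes events up to the resume point; relies on the oplog COLLSCAN
    //         to throw OplogQueryMinTsMissing if the resume point has been truncated.
    //         +-- DocumentSourceEnsureResumeTokenPresent ("$_internalEnsureResumeTokenPresent")
    //               Additionally requires that the exact token be observed, swallowing
    //               it (and any identical duplicates) before returning events.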
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index e6d5f4b92da..e6d3cda6a24 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -35,7 +35,10 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/catalog/collection_mock.h"
+#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -44,54 +47,261 @@
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/devnull/devnull_kv_engine.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/uuid.h"
using boost::intrusive_ptr;
-using std::deque;
namespace mongo {
namespace {
+static constexpr StringData kOtherNs = "test.other.ns"_sd;
static constexpr StringData kTestNs = "test.ns"_sd;
+class ChangeStreamOplogCursorMock : public SeekableRecordCursor {
+public:
+ ChangeStreamOplogCursorMock(std::deque<Record>* records) : _records(records) {}
+
+ virtual ~ChangeStreamOplogCursorMock() {}
+
+ boost::optional<Record> next() override {
+ if (_records->empty()) {
+ return boost::none;
+ }
+ auto& next = _records->front();
+ _records->pop_front();
+ return next;
+ }
+
+ boost::optional<Record> seekExact(const RecordId& id) override {
+ return Record{};
+ }
+ void save() override {}
+ bool restore() override {
+ return true;
+ }
+ void detachFromOperationContext() override {}
+ void reattachToOperationContext(OperationContext* opCtx) override {}
+
+private:
+ std::deque<Record>* _records;
+};
+
+class ChangeStreamOplogCollectionMock : public CollectionMock {
+public:
+ ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) {
+ _recordStore =
+ _devNullEngine.getRecordStore(nullptr, NamespaceString::kRsOplogNamespace.ns(), "", {});
+ }
+
+ void push_back(Document doc) {
+ // Every entry we push into the oplog should have both 'ts' and 'ns' fields.
+ invariant(doc["ts"].getType() == BSONType::bsonTimestamp);
+ invariant(doc["ns"].getType() == BSONType::String);
+ // Events should always be added in ascending ts order.
+ auto lastTs =
+ _records.empty() ? Timestamp(0, 0) : _records.back().data.toBson()["ts"].timestamp();
+ invariant(ValueComparator().compare(Value(lastTs), doc["ts"]) <= 0);
+ // Fill out remaining required fields in the oplog entry.
+ MutableDocument mutableDoc{doc};
+ mutableDoc.setField("op", Value("n"_sd));
+ mutableDoc.setField("o", Value(Document{}));
+ mutableDoc.setField("wall",
+ Value(Date_t::fromMillisSinceEpoch(doc["ts"].getTimestamp().asLL())));
+ // Must remove _id since the oplog expects either no _id or an OID.
+ mutableDoc.remove("_id");
+ // Convert to owned BSON and create corresponding Records.
+ _data.push_back(mutableDoc.freeze().toBson());
+ Record record;
+ record.data = {_data.back().objdata(), _data.back().objsize()};
+ record.id = RecordId{static_cast<int64_t>(_data.size())};
+ _records.push_back(std::move(record));
+ }
+
+ std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
+ bool forward) const override {
+ return std::make_unique<ChangeStreamOplogCursorMock>(&_records);
+ }
+
+ const RecordStore* getRecordStore() const override {
+ return _recordStore.get();
+ }
+ RecordStore* getRecordStore() override {
+ return _recordStore.get();
+ }
+
+private:
+ // We retain the owned record queue here because cursors may be destroyed and recreated.
+ mutable std::deque<Record> _records;
+ std::deque<BSONObj> _data;
+
+ // These are no-op structures which are required by the CollectionScan.
+ std::unique_ptr<RecordStore> _recordStore;
+ DevNullKVEngine _devNullEngine;
+};
+
+/**
+ * Acts as an initial source for the change stream pipeline, taking the place of DSOplogMatch. This
+ * class maintains its own queue of documents added by each test, but also pushes each doc into an
+ * underlying ChangeStreamOplogCollectionMock. When doGetNext() is called, it retrieves the next
+ * document by pulling it from the mocked oplog collection via a CollectionScan, in order to test
+ * the latter's changestream-specific functionality. The reason this class keeps its own queue in
+ * addition to the ChangeStreamOplogCollectionMock is twofold:
+ *
+ * - The _id must be stripped from each document before it can be added to the mocked oplog, since
+ * the _id field of the test document is a resume token but oplog entries are only permitted to
+ * have OID _ids. We therefore have to restore the _id field of the document pulled from the
+ * CollectionScan before passing it into the pipeline.
+ *
+ * - The concept of GetNextResult::ReturnStatus::kPauseExecution does not exist in CollectionScan;
+ * NEED_TIME is somewhat analogous but cannot be artificially induced. For tests which exercise
+ * kPauseExecution, these events are stored only in the DocumentSourceChangeStreamMock queue
+ * with no corresponding entry in the ChangeStreamOplogCollectionMock queue.
+ */
+class DocumentSourceChangeStreamMock : public DocumentSourceMock {
+public:
+ DocumentSourceChangeStreamMock(const boost::intrusive_ptr<ExpressionContextForTest>& expCtx)
+ : DocumentSourceMock({}, expCtx) {
+ _filterExpr = BSON("ns" << kTestNs);
+ _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
+ _params.assertMinTsHasNotFallenOffOplog = true;
+ _params.shouldTrackLatestOplogTimestamp = true;
+ _params.minTs = Timestamp(0, 0);
+ _params.tailable = true;
+ }
+
+ void setResumeToken(ResumeTokenData resumeToken) {
+ invariant(!_collScan);
+ _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime));
+ _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
+ _params.minTs = resumeToken.clusterTime;
+ }
+
+ void push_back(GetNextResult&& result) {
+ // We should never push an explicit EOF onto the queue.
+ invariant(!result.isEOF());
+ // If there is a document supplied, add it to the mock collection.
+ if (result.isAdvanced()) {
+ _collection.push_back(result.getDocument());
+ }
+ // Both documents and pauses are stored in the DSMock queue.
+ DocumentSourceMock::push_back(std::move(result));
+ }
+
+ void push_back(const GetNextResult& result) {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isPermanentlyEOF() const {
+ return _collScan->getCommonStats()->isEOF;
+ }
+
+protected:
+ GetNextResult doGetNext() override {
+ // If this is the first call to doGetNext, we must create the COLLSCAN.
+ if (!_collScan) {
+ _collScan = std::make_unique<CollectionScan>(
+ pExpCtx.get(), &_collection, _params, &_ws, _filter.get());
+ // The first call to doWork will create the cursor and return NEED_TIME. But it won't
+ // actually scan any of the documents that are present in the mock cursor queue.
+ ASSERT_EQ(_collScan->doWork(nullptr), PlanStage::NEED_TIME);
+ ASSERT_EQ(_getNumDocsTested(), 0);
+ }
+ while (true) {
+ // If the next result is a pause, return it and don't collscan.
+ auto nextResult = DocumentSourceMock::doGetNext();
+ if (nextResult.isPaused()) {
+ return nextResult;
+ }
+ // Otherwise, retrieve the document via the CollectionScan stage.
+ auto id = WorkingSet::INVALID_ID;
+ switch (_collScan->doWork(&id)) {
+ case PlanStage::IS_EOF:
+ invariant(nextResult.isEOF());
+ return nextResult;
+ case PlanStage::ADVANCED: {
+ // We need to restore the _id field which was removed when we added this
+ // entry into the oplog. This is like a stripped-down DSCSTransform stage.
+ MutableDocument mutableDoc{_ws.get(id)->doc.value()};
+ mutableDoc["_id"] = nextResult.getDocument()["_id"];
+ return mutableDoc.freeze();
+ }
+ case PlanStage::NEED_TIME:
+ continue;
+ case PlanStage::NEED_YIELD:
+ MONGO_UNREACHABLE;
+ }
+ }
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ size_t _getNumDocsTested() {
+ return static_cast<const CollectionScanStats*>(_collScan->getSpecificStats())->docsTested;
+ }
+
+ ChangeStreamOplogCollectionMock _collection;
+ std::unique_ptr<CollectionScan> _collScan;
+ CollectionScanParams _params;
+
+ std::unique_ptr<MatchExpression> _filter;
+ BSONObj _filterExpr;
+
+ WorkingSet _ws;
+};
+
class CheckResumeTokenTest : public AggregationContextFixture {
public:
- CheckResumeTokenTest() : _mock(DocumentSourceMock::createForTest(getExpCtx())) {}
+ CheckResumeTokenTest() : _mock(make_intrusive<DocumentSourceChangeStreamMock>(getExpCtx())) {}
protected:
/**
* Pushes a document with a resume token corresponding to the given ResumeTokenData into the
- * mock queue.
+ * mock queue. This document will have an ns field that matches the test namespace, and will
+ * appear in the change stream pipeline if its timestamp is at or after the resume timestamp.
*/
- void addDocument(ResumeTokenData tokenData) {
- _mock->push_back(Document{{"_id", ResumeToken(std::move(tokenData)).toDocument()}});
+ void addOplogEntryOnTestNS(ResumeTokenData tokenData) {
+ _mock->push_back(Document{{"ns", kTestNs},
+ {"ts", tokenData.clusterTime},
+ {"_id", ResumeToken(std::move(tokenData)).toDocument()}});
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, version,
* txnOpIndex, docKey, and namespace into the mock queue.
*/
- void addDocument(
+ void addOplogEntryOnTestNS(
Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) {
- return addDocument({ts, version, txnOpIndex, uuid, Value(docKey)});
+ return addOplogEntryOnTestNS({ts, version, txnOpIndex, uuid, Value(docKey)});
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, version,
* txnOpIndex, docKey, and namespace into the mock queue.
*/
- void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
- addDocument(ts, 0, 0, docKey, uuid);
+ void addOplogEntryOnTestNS(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
+ addOplogEntryOnTestNS(ts, 0, 0, docKey, uuid);
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, _id string, and
* namespace into the mock queue.
*/
- void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) {
- addDocument(ts, 0, 0, Document{{"_id", id}}, uuid);
+ void addOplogEntryOnTestNS(Timestamp ts, std::string id, UUID uuid = testUuid()) {
+ addOplogEntryOnTestNS(ts, 0, 0, Document{{"_id", id}}, uuid);
+ }
+
+ /**
+ * Pushes a document that does not match the test namespace into the mock oplog. This will be
+ * examined by the oplog CollectionScan but will not produce an event in the pipeline.
+ */
+ void addOplogEntryOnOtherNS(Timestamp ts) {
+ _mock->push_back(Document{{"ns", kOtherNs}, {"ts", ts}});
}
+ /**
+ * Pushes a pause in execution into the pipeline queue.
+ */
void addPause() {
_mock->push_back(DocumentSource::GetNextResult::makePauseExecution());
}
@@ -99,10 +309,11 @@ protected:
/**
* Convenience method to create the class under test with a given ResumeTokenData.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
ResumeTokenData tokenData) {
auto checkResumeToken =
- DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), std::move(tokenData));
+ DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData);
+ _mock->setResumeToken(std::move(tokenData));
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
}
@@ -111,13 +322,13 @@ protected:
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts,
int version,
std::size_t txnOpIndex,
boost::optional<Document> docKey,
UUID uuid) {
- return createCheckResumeToken(
+ return createDSEnsureResumeTokenPresent(
{ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
}
@@ -125,18 +336,18 @@ protected:
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) {
- return createCheckResumeToken(ts, 0, 0, docKey, uuid);
+ return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid);
}
/**
* Convenience method to create the class under test with a given timestamp, _id string, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts, StringData id, UUID uuid = testUuid()) {
- return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid);
+ return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid);
}
/**
@@ -148,28 +359,28 @@ protected:
return *uuid_gen;
}
- intrusive_ptr<DocumentSourceMock> _mock;
+ intrusive_ptr<DocumentSourceChangeStreamMock> _mock;
};
-class ShardCheckResumabilityTest : public CheckResumeTokenTest {
+class CheckResumabilityTest : public CheckResumeTokenTest {
protected:
- intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(
+ intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(
ResumeTokenData tokenData) {
- auto shardCheckResumability =
- DocumentSourceShardCheckResumability::create(getExpCtx(), tokenData);
- shardCheckResumability->setSource(_mock.get());
- return shardCheckResumability;
+ auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData);
+ _mock->setResumeToken(std::move(tokenData));
+ dsCheckResumability->setSource(_mock.get());
+ return dsCheckResumability;
}
- intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) {
- return createShardCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
+ intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) {
+ return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
}
};
TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
// We should not see the resume token.
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -177,9 +388,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
addPause();
- addDocument(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
// We see the pause we inserted, but not the resume token.
ASSERT_TRUE(checkResumeToken->getNext().isPaused());
@@ -190,10 +401,10 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp doc1Timestamp(100, 2);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
addPause();
- addDocument(doc1Timestamp, "2");
+ addOplogEntryOnTestNS(doc1Timestamp, "2");
// Pause added explicitly.
ASSERT_TRUE(checkResumeToken->getNext().isPaused());
@@ -208,13 +419,13 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
- addDocument(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0");
+ addOplogEntryOnTestNS(resumeTimestamp, "0");
Timestamp doc1Timestamp(100, 2);
Timestamp doc2Timestamp(101, 1);
- addDocument(doc1Timestamp, "1");
- addDocument(doc2Timestamp, "2");
+ addOplogEntryOnTestNS(doc1Timestamp, "1");
+ addOplogEntryOnTestNS(doc2Timestamp, "2");
auto result1 = checkResumeToken->getNext();
ASSERT_TRUE(result1.isAdvanced());
@@ -228,23 +439,28 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken)
}
TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) {
- Timestamp resumeTimestamp(100, 1);
+ // The first timestamp in the oplog precedes the resume token, and the second matches it...
+ Timestamp doc1Timestamp(100, 1);
+ Timestamp resumeTimestamp(100, 2);
+ Timestamp doc2Timestamp = resumeTimestamp;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
- Timestamp doc1Timestamp(100, 2);
- Timestamp doc2Timestamp(101, 1);
- addDocument(doc1Timestamp, "1");
- addDocument(doc2Timestamp, "2");
+ // ... but there's no entry in the oplog that matches the full token.
+ addOplogEntryOnTestNS(doc1Timestamp, "1");
+ addOplogEntryOnTestNS(doc2Timestamp, "2");
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
-TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) {
+TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+
+ // Add an entry into the oplog with the same timestamp but a lower documentKey. We swallow it
+ // but don't throw - we haven't surpassed the token yet and still may see it in the next doc.
+ addOplogEntryOnTestNS(resumeTimestamp, "0");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -252,9 +468,9 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
Timestamp resumeTimestamp(100, 1);
auto resumeTokenUUID = UUID::gen();
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1", resumeTokenUUID);
auto otherUUID = UUID::gen();
- addDocument(resumeTimestamp, "1", otherUUID);
+ addOplogEntryOnTestNS(resumeTimestamp, "1", otherUUID);
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
@@ -265,9 +481,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "abc");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "abc");
// We must not see the following document.
- addDocument(resumeTimestamp, "ABC");
+ addOplogEntryOnTestNS(resumeTimestamp, "ABC");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -279,13 +495,14 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdM
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
Timestamp doc1Timestamp(100, 1);
- addDocument(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
Timestamp doc2Timestamp(100, 2);
Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}};
- addDocument(doc2Timestamp, doc2DocKey);
+ addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey);
// We should skip doc1 since it satisfies the resume token, and retrieve doc2.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -300,10 +517,11 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -318,10 +536,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen
getExpCtx()->inMongos = true;
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}});
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -334,10 +552,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject)
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, boost::none);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, boost::none);
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -350,11 +568,12 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) {
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -379,20 +598,20 @@ TEST_F(CheckResumeTokenTest,
// Create the resume token using the higher-sorting UUID.
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]);
// Add two documents which have the same clusterTime but a lower UUID. One of the documents has
// a lower docKey than the resume token, the other has a higher docKey; this demonstrates that
// the UUID is the discriminating factor.
- addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]);
- addDocument(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]);
// Add a third document that matches the resume token.
- addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]);
// Add a fourth document with the same timestamp and UUID whose docKey sorts after the token.
auto expectedDocKey = Document{{"_id"_sd, 3}};
- addDocument(resumeTimestamp, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, expectedDocKey, uuids[1]);
// We should skip the first two docs, swallow the resume token, and return the fourth doc.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -417,13 +636,13 @@ TEST_F(CheckResumeTokenTest,
// Create the resume token using the lower-sorting UUID.
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]);
// Add a document which has the same clusterTime and a lower docKey but a higher UUID, followed
// by a document which matches the resume token. This is not possible in practice, but it serves
// to demonstrate that the resume attempt fails even when the resume token is present.
- addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]);
- addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]);
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -447,18 +666,18 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn
invalidateToken.clusterTime = resumeTimestamp;
invalidateToken.uuid = uuids[0];
invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- auto checkResumeToken = createCheckResumeToken(invalidateToken);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Add three documents which each have the invalidate resume token. We expect to see this in the
// event that we are starting after an invalidate and the invalidating event occurred on several
// shards at the same clusterTime.
- addDocument(invalidateToken);
- addDocument(invalidateToken);
- addDocument(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
// Add a document representing an insert which recreated the collection after it was dropped.
auto expectedDocKey = Document{{"_id"_sd, 1}};
- addDocument(Timestamp{100, 2}, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(Timestamp{100, 2}, expectedDocKey, uuids[1]);
// DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
// and the next two invalidates, and return the insert event after the collection drop.
@@ -487,7 +706,7 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
invalidateToken.clusterTime = resumeTimestamp;
invalidateToken.uuid = uuids[0];
invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- auto checkResumeToken = createCheckResumeToken(invalidateToken);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Create a second invalidate token with the same clusterTime but a different UUID.
auto unrelatedInvalidateToken = invalidateToken;
@@ -496,12 +715,12 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
// Add three documents which each have the invalidate resume token. We expect to see this in the
// event that we are starting after an invalidate and the invalidating event occurred on several
// shards at the same clusterTime.
- addDocument(invalidateToken);
- addDocument(invalidateToken);
- addDocument(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
// Add a fourth document which has the unrelated invalidate at the same clusterTime.
- addDocument(unrelatedInvalidateToken);
+ addOplogEntryOnTestNS(unrelatedInvalidateToken);
// DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
// and the next two invalidates, but decline to swallow the unrelated invalidate.
@@ -512,39 +731,6 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
ASSERT_EQ(tokenFromFirstDocAfterResume, unrelatedInvalidateToken);
}
-TEST_F(CheckResumeTokenTest, ShouldSwallowOnlyFirstInvalidateForStartAfterInvalidateInReplSet) {
- Timestamp resumeTimestamp(100, 1);
-
- // We only swallow multiple invalidates when DSEnsureResumeTokenPresent is running on mongoS.
- // Set {inMongos:false} to verify that we do not swallow additional invalidates on a replica
- // set, since this should never occur.
- getExpCtx()->inMongos = false;
-
- // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
- // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
- // stream after the old stream is invalidated.
- ResumeTokenData invalidateToken;
- invalidateToken.clusterTime = resumeTimestamp;
- invalidateToken.uuid = testUuid();
- invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- auto checkResumeToken = createCheckResumeToken(invalidateToken);
-
- // Add three documents which each have the invalidate resume token.
- addDocument(invalidateToken);
- addDocument(invalidateToken);
- addDocument(invalidateToken);
-
- // DSEnsureResumeTokenPresent should confirm that the invalidate event is present and swallow
- // it. However, it should not swallow the subsequent two invalidates.
- for (size_t i = 0; i < 2; ++i) {
- const auto nextDocAfterResume = checkResumeToken->getNext();
- const auto tokenFromNextDocAfterResume =
- ResumeToken::parse(nextDocAfterResume.getDocument()["_id"].getDocument()).getData();
- ASSERT_EQ(tokenFromNextDocAfterResume, invalidateToken);
- }
- ASSERT(checkResumeToken->getNext().isEOF());
-}
-
TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
Timestamp resumeTimestamp(100, 1);
@@ -554,21 +740,21 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
std::sort(uuids.begin(), uuids.end());
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]);
// Add two documents which have the same clusterTime and version but a lower applyOps index. One
// of the documents has a lower uuid than the resume token, the other has a higher uuid; this
// demonstrates that the applyOps index is the discriminating factor.
- addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]);
- addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]);
// Add a third document that matches the resume token.
- addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]);
// Add a fourth document with the same timestamp and version whose applyOps sorts after the
// resume token.
auto expectedDocKey = Document{{"_id"_sd, 3}};
- addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]);
// We should skip the first two docs, swallow the resume token, and return the fourth doc.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -585,261 +771,255 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithNoDocuments) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
-/**
- * A mock MongoProcessInterface which allows mocking a foreign pipeline.
- */
-class MockMongoInterface final : public StubMongoProcessInterface {
-public:
- MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
- : _mockResults(std::move(mockResults)) {}
-
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
- return false;
- }
-
- std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* ownedPipeline, bool localCursorOnly) final {
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
- ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx));
- pipeline->addInitialSource(
- DocumentSourceMock::createForTest(_mockResults, pipeline->getContext()));
- return pipeline;
- }
-
-private:
- deque<DocumentSource::GetNextResult> _mockResults;
-};
-
-TEST_F(ShardCheckResumabilityTest,
- ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) {
+TEST_F(CheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(resumeTimestamp, "ID");
// We should see the resume token.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest,
- ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryAfterToken) {
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryEqualToToken) {
Timestamp resumeTimestamp(100, 1);
- Timestamp oplogTimestamp(100, 2);
+ Timestamp oplogTimestamp(100, 1);
- ResumeTokenData tokenData(
- resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(tokenData);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(resumeTimestamp, "ID");
// We should see the resume token.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) {
+TEST_F(CheckResumabilityTest, ShouldPermanentlyEofIfOplogIsEmpty) {
Timestamp resumeTimestamp(100, 1);
- ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
- // We should see the resume token.
+ // As with other tailable cursors, starting a change stream on an empty capped collection will
+ // cause the cursor to immediately and permanently EOF. This should never happen in practice,
+ // since a replset member can only accept requests while in PRIMARY, SECONDARY or RECOVERING
+ // states, and there must be at least one entry in the oplog in order to reach those states.
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
- ASSERT_TRUE(result.isAdvanced());
- auto& doc = result.getDocument();
- ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
+ ASSERT_TRUE(result.isEOF());
+ ASSERT_TRUE(_mock->isPermanentlyEOF());
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result = dsCheckResumability->getNext();
+ ASSERT_TRUE(result.isEOF());
+}
+
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryEqualToToken) {
+ Timestamp oplogTimestamp(100, 1);
+ Timestamp resumeTimestamp(100, 1);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
-TEST_F(ShardCheckResumabilityTest,
- ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
+TEST_F(CheckResumabilityTest, ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp oplogTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
ASSERT_THROWS_CODE(
- shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) {
+TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) {
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp docTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryEqualToToken) {
+ Timestamp oplogTimestamp(100, 1);
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp docTimestamp(100, 3);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result = dsCheckResumability->getNext();
+ ASSERT_TRUE(result.isAdvanced());
+ auto& doc = result.getDocument();
+ ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
+}
+
+TEST_F(CheckResumabilityTest,
ShouldFailWithLaterDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp oplogTimestamp(100, 2);
Timestamp docTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
ASSERT_THROWS_CODE(
- shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
}
-TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
+TEST_F(CheckResumabilityTest,
+ ShouldFailWithoutReadingLaterDocumentsInPipelineIfEarliestOplogEntryAfterToken) {
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp oplogTimestamp(100, 2);
+ Timestamp docTimestamp(100, 3);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ // Confirm that there are two documents queued in the mock oplog.
+ ASSERT_EQ(_mock->size(), 2);
+ ASSERT_THROWS_CODE(
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
+ // Confirm that only the first document was read before the assertion was thrown.
+ ASSERT_EQ(_mock->size(), 1);
+}
+
+TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
Timestamp docTimestamp(100, 4);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
- mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result2 = shardCheckResumability->getNext();
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
Timestamp docTimestamp(100, 4);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog(
- {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
- auto result2 = shardCheckResumability->getNext();
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
+TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isEOF());
- mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result2 = shardCheckResumability->getNext();
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(token);
// Add 2 events at the same clusterTime as the resume token but whose docKeys sort before it.
- addDocument(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "2");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "2");
// Add the resume token, plus one further event whose docKey sorts after the token.
- addDocument(resumeTimestamp, "3");
- addDocument(resumeTimestamp, "4");
+ addOplogEntryOnTestNS(resumeTimestamp, "3");
+ addOplogEntryOnTestNS(resumeTimestamp, "4");
// The first event we see should be the resume token...
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto doc = result.getDocument();
ASSERT_EQ(token, ResumeToken::parse(doc["_id"].getDocument()).getData());
// ... then the post-token event, and then finally EOF.
- result = shardCheckResumability->getNext();
+ result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
- Document postResumeTokenDoc{
- {"_id",
- ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
- .toDocument()}};
- ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc);
- ASSERT_TRUE(shardCheckResumability->getNext().isEOF());
+ auto postResumeTokenDoc =
+ ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
+ .toDocument();
+ ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc);
+ ASSERT_TRUE(dsCheckResumability->getNext().isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(token);
// Add 2 events at the same clusterTime as the resume token but whose docKeys sort before it.
- addDocument(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "2");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "2");
// Add one further event whose docKey sorts after the token.
- addDocument(resumeTimestamp, "4");
+ addOplogEntryOnTestNS(resumeTimestamp, "4");
// The first event we see should be the post-token event, followed by EOF.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
- Document postResumeTokenDoc{
- {"_id",
- ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
- .toDocument()}};
- ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc);
- ASSERT_TRUE(shardCheckResumability->getNext().isEOF());
+ auto postResumeTokenDoc =
+ ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
+ .toDocument();
+ ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc);
+ ASSERT_TRUE(dsCheckResumability->getNext().isEOF());
}
} // namespace
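Note on the test idiom above: the removed MockMongoInterface routed a separately-mocked oplog into the stage via attachCursorSourceToPipeline, whereas the rewritten tests queue both kinds of entries on the single DocumentSourceMock feeding the stage. The sketch below shows one plausible shape for the addOplogEntryOnTestNS/addOplogEntryOnOtherNS helpers; the '_mock' member name matches the tests above, but the push_back-style appender is an assumption for illustration, not the fixture's actual signature.

    // Sketch only, not the fixture's real implementation. Assumes '_mock' is the
    // DocumentSourceMock feeding the stage and exposes a push_back-style appender.
    void addOplogEntryOnTestNS(Timestamp ts, Document docKey) {
        // Events on the test namespace carry a full resume token in '_id', so the
        // stage under test can compare them against the client's token.
        _mock->push_back(Document{
            {"_id", ResumeToken({ts, 0, 0, testUuid(), Value(docKey)}).toDocument()}});
    }
    void addOplogEntryOnOtherNS(Timestamp ts) {
        // Entries on an unrelated namespace stand in for the first entry of the
        // oplog; the resumability check only inspects their timestamp.
        _mock->push_back(Document{{"ts", ts}});
    }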
diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index bdc3c5c7e2d..ae9e0ef4f63 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -55,6 +55,10 @@ const char* DocumentSourceMock::getSourceName() const {
return "mock";
}
+size_t DocumentSourceMock::size() const {
+ return _queue.size();
+}
+
boost::intrusive_ptr<DocumentSourceMock> DocumentSourceMock::createForTest(
Document doc, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new DocumentSourceMock({std::move(doc)}, expCtx);
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index a538e3b9086..b845df5ac92 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -70,6 +70,8 @@ public:
const char* getSourceName() const override;
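+ // Returns the number of results still queued in this mock source.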
+ size_t size() const;
+
void reattachToOperationContext(OperationContext* opCtx) {
isDetachedFromOpCtx = false;
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index adec324007d..647380368db 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -655,7 +655,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
+ QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
}
// If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The
diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp
index ad10da0c88c..6390a25dde6 100644
--- a/src/mongo/db/query/classic_stage_builder.cpp
+++ b/src/mongo/db/query/classic_stage_builder.cpp
@@ -76,6 +76,7 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r
CollectionScanParams params;
params.tailable = csn->tailable;
params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
+ params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog;
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index a0f98e7734a..db4779c2500 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -217,6 +217,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
csn->tailable = tailable;
csn->shouldTrackLatestOplogTimestamp =
params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ csn->assertMinTsHasNotFallenOffOplog =
+ params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
csn->shouldWaitForOplogVisibility =
params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index dc8bc35ef64..e5eebac0cd2 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -173,6 +173,9 @@ string optionString(size_t options) {
case QueryPlannerParams::PRESERVE_RECORD_ID:
ss << "PRESERVE_RECORD_ID ";
break;
+ case QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG:
+ ss << "ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG ";
+ break;
case QueryPlannerParams::DEFAULT:
MONGO_UNREACHABLE;
break;
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index 4c6b0400f8e..3c2cc59c24e 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -97,6 +97,10 @@ struct QueryPlannerParams {
// ids. In some cases, record ids can be discarded as an optimization when they will not be
// consumed downstream.
PRESERVE_RECORD_ID = 1 << 10,
+
+ // Set this on an oplog scan to uassert that the oplog has not already rolled over the
+ // minimum 'ts' timestamp specified in the query.
+ ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG = 1 << 11,
};
// See Options enum above.
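Because planner options are bit flags, the new option composes with existing ones via bitwise OR and is read back with bitwise AND, exactly as the pipeline_d.cpp and planner_access.cpp hunks above do. A minimal sketch using only enumerators from this patch:

    size_t plannerOpts = QueryPlannerParams::DEFAULT;
    plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
                    QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
    // Evaluates to true once the flag is set.
    bool assertMinTs =
        plannerOpts & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;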
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 0e62859febf..0e6bdda0b91 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -212,6 +212,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
copy->tailable = this->tailable;
copy->direction = this->direction;
copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
+ copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog;
copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;
return copy;
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index cdb038cd61e..0020b6f8442 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -435,6 +435,9 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet {
// across a sharded cluster.
bool shouldTrackLatestOplogTimestamp = false;
+ // Should we assert that the specified minTs has not fallen off the oplog?
+ bool assertMinTsHasNotFallenOffOplog = false;
+
int direction{1};
// Whether or not to wait for oplog visibility on oplog collection scans.
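Taken together, the planner-side hunks thread the new assertion flag through three layers before it reaches the collection scan stage. A condensed view of the flow, using only the assignments introduced in this patch:

    // pipeline_d.cpp: change streams request the oplog check.
    plannerOpts |= QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
    // planner_access.cpp: the option is recorded on the CollectionScanNode.
    csn->assertMinTsHasNotFallenOffOplog =
        params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
    // classic_stage_builder.cpp: the node's flag is copied into the
    // CollectionScanParams consumed by the execution stage.
    params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog;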