summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2020-06-01 18:47:00 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-11 20:23:51 +0000
commit9e38cbba7d3efefa59e25cb0411558591036d30b (patch)
treec7afeda3e077f9c669950dec5396d21821f83ad3
parentdfb31874bf10e55c39e615ef86290b68ba0f43b3 (diff)
downloadmongo-9e38cbba7d3efefa59e25cb0411558591036d30b.tar.gz
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
(cherry picked from commit 694ed4153b9d5424b5d169fea5c68f99d4dfb45a)
-rw-r--r--jstests/sharding/resume_change_stream.js6
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/exec/collection_scan.cpp34
-rw-r--r--src/mongo/db/exec/collection_scan.h5
-rw-r--r--src/mongo/db/exec/collection_scan_common.h7
-rw-r--r--src/mongo/db/pipeline/SConscript3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp209
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h98
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp750
-rw-r--r--src/mongo/db/pipeline/document_source_mock.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h2
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp3
-rw-r--r--src/mongo/db/query/get_executor.cpp3
-rw-r--r--src/mongo/db/query/planner_access.cpp2
-rw-r--r--src/mongo/db/query/query_planner.cpp3
-rw-r--r--src/mongo/db/query/query_planner_params.h6
-rw-r--r--src/mongo/db/query/query_solution.cpp1
-rw-r--r--src/mongo/db/query/query_solution.h3
-rw-r--r--src/mongo/db/query/stage_builder.cpp1
-rw-r--r--src/mongo/db/repl/oplog_entry.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp3
22 files changed, 677 insertions, 476 deletions
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
index 517253bfcc0..426b0b3e3d3 100644
--- a/jstests/sharding/resume_change_stream.js
+++ b/jstests/sharding/resume_change_stream.js
@@ -124,14 +124,14 @@ function testResume(mongosColl, collToWatch) {
db: mongosDB,
collName: collToWatch,
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}],
- expectedCode: ErrorCodes.ChangeStreamFatalError
+ expectedCode: ErrorCodes.ChangeStreamHistoryLost
});
ChangeStreamTest.assertChangeStreamThrowsCode({
db: mongosDB,
collName: collToWatch,
pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}],
- expectedCode: ErrorCodes.ChangeStreamFatalError
+ expectedCode: ErrorCodes.ChangeStreamHistoryLost
});
// Test that the change stream can't resume if the resume token *is* present in the oplog,
@@ -143,7 +143,7 @@ function testResume(mongosColl, collToWatch) {
db: mongosDB,
collName: collToWatch,
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
- expectedCode: ErrorCodes.ChangeStreamFatalError
+ expectedCode: ErrorCodes.ChangeStreamHistoryLost
});
// Drop the collection.
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index efab12a5d34..dedeac665f8 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -294,6 +294,7 @@ error_code("WaitForMajorityServiceEarlierOpTimeAvailable", 289)
error_code("NoQueryExecutionPlans", 291)
error_code("HierarchicalAcquisitionLevelViolation", 297)
error_code("PeriodicJobIsStopped", 310)
+error_code("OplogQueryMinTsMissing", 326)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 0b3f9b1c23c..a901384fb7c 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -69,6 +69,12 @@ CollectionScan::CollectionScan(OperationContext* opCtx,
_specificStats.maxTs = params.maxTs;
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
+ // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present.
+ if (params.assertMinTsHasNotFallenOffOplog) {
+ invariant(params.shouldTrackLatestOplogTimestamp);
+ invariant(params.minTs);
+ }
+
if (params.maxTs) {
_endConditionBSON = BSON("$gte" << *(params.maxTs));
_endCondition = stdx::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName,
@@ -141,19 +147,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
}
if (!record) {
- // We just hit EOF. If we are tailable and have already returned data, leave us in a
- // state to pick up where we left off on the next call to work(). Otherwise EOF is
- // permanent.
+ // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up
+ // where we left off on the next call to work(). Otherwise, the EOF is permanent.
if (_params.tailable && !_lastSeenId.isNull()) {
_cursor.reset();
} else {
_commonStats.isEOF = true;
}
-
return PlanStage::IS_EOF;
}
_lastSeenId = record->id;
+ if (_params.assertMinTsHasNotFallenOffOplog) {
+ assertMinTsHasNotFallenOffOplog(*record);
+ }
if (_params.shouldTrackLatestOplogTimestamp) {
auto status = setLatestOplogEntryTimestamp(*record);
if (!status.isOK()) {
@@ -184,6 +191,25 @@ Status CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
return Status::OK();
}
+void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) {
+ // If the first entry we see in the oplog is the replset initialization, then it doesn't matter
+ // if its timestamp is later than the specified minTs; no events earlier than the minTs can have
+ // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry
+ // is earlier than or equal to the minTs time.
+ auto swOplogEntry = repl::OplogEntry::parse(record.data.toBson());
+ invariant(_specificStats.docsTested == 0);
+ invariant(swOplogEntry.isOK());
+ auto oplogEntry = std::move(swOplogEntry.getValue());
+ const bool isNewRS =
+ oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) &&
+ oplogEntry.getOpType() == repl::OpTypeEnum::kNoop;
+ uassert(ErrorCodes::OplogQueryMinTsMissing,
+ "Specified minTs has already fallen off the oplog",
+ isNewRS || oplogEntry.getTimestamp() <= *_params.minTs);
+ // We don't need to check this assertion again after we've confirmed the first oplog event.
+ _params.assertMinTsHasNotFallenOffOplog = false;
+}
+
PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index ee50293a5c9..fb182168d99 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -96,6 +96,11 @@ private:
*/
Status setLatestOplogEntryTimestamp(const Record& record);
+ /**
+ * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog.
+ */
+ void assertMinTsHasNotFallenOffOplog(const Record& record);
+
// WorkingSet is not owned by us.
WorkingSet* _workingSet;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index b655872f6fa..2e6fafce451 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -45,6 +45,10 @@ struct CollectionScanParams {
// The RecordId to which we should seek to as the first document of the scan.
RecordId start;
+ // If present, the collection scan will seek directly to the RecordId of an oplog entry as
+ // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans.
+ boost::optional<Timestamp> minTs;
+
// If present, the collection scan will stop and return EOF the first time it sees a document
// that does not pass the filter and has 'ts' greater than 'maxTs'.
boost::optional<Timestamp> maxTs;
@@ -54,6 +58,9 @@ struct CollectionScanParams {
// Do we want the scan to be 'tailable'? Only meaningful if the collection is capped.
bool tailable = false;
+ // Should we assert that the specified minTS has not fallen off the oplog?
+ bool assertMinTsHasNotFallenOffOplog = false;
+
// Should we keep track of the timestamp of the latest oplog entry we've seen? This information
// is needed to merge cursors from the oplog in order of operation time when reading the oplog
// across a sharded cluster.
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index d11cd088fa1..73bfdbdb404 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -248,9 +248,12 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/catalog/catalog_impl',
+ '$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/db/storage/devnull/storage_devnull_core',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/query/router_exec_stage',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 9050b9990dd..012c16ffbaa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -383,7 +383,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
// to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'.
startFrom = tokenData.clusterTime;
if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) {
- resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData);
+ resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData);
} else {
resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData);
}
@@ -394,7 +394,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
"Only one type of resume option is allowed, but multiple were found.",
!resumeStage);
startFrom = *startAtOperationTime;
- resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
+ resumeStage = DocumentSourceCheckResumability::create(expCtx, *startFrom);
}
// There might not be a starting point if we're on mongos, otherwise we should either have a
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index cb7e1484d2c..46168e1ac9a 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -36,7 +36,7 @@ using boost::intrusive_ptr;
namespace mongo {
namespace {
-using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;
+using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus;
// Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
@@ -84,10 +84,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
} else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) {
// This could happen if the client provided a txnOpIndex of 0, yet the 0th document in the
// applyOps was irrelevant (meaning it was an operation on a collection or DB not being
- // watched). If we are looking for the resume token on a shard then this simply means that
- // the resume token may be on a different shard; otherwise, it indicates a corrupt token.
- uassert(50792, "Invalid resumeToken: txnOpIndex was skipped", expCtx->needsMerge);
- // We are running on a merging shard. Signal that we have read beyond the resume token.
+ // watched). Signal that we have read beyond the resume token.
return ResumeStatus::kSurpassedToken;
}
@@ -167,16 +164,9 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
}
} // namespace
-const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
- return "$_ensureResumeTokenPresent";
-}
-
-Value DocumentSourceEnsureResumeTokenPresent::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
- // would result in it being created twice.
- return Value();
-}
+DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSourceCheckResumability(expCtx, std::move(token)) {}
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx,
@@ -184,172 +174,125 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon
return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
}
-DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
+const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
+ return kStageName.rawData();
+}
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
pExpCtx->checkForInterrupt();
+ // If we have already verified the resume token is present, return the next doc immediately.
if (_resumeStatus == ResumeStatus::kSurpassedToken) {
- // We've already verified the resume token is present.
return pSource->getNext();
}
- // The incoming documents are sorted by resume token. We examine a range of documents that have
- // the same clusterTime as the client's resume token, until we either find (and swallow) a match
- // for the token or pass the point in the stream where it should have been.
+ auto nextInput = GetNextResult::makeEOF();
+
+ // If we are starting after an 'invalidate' and the invalidating command (e.g. collection drop)
+ // occurred at the same clusterTime on more than one shard, then we may see multiple identical
+ // resume tokens here. We swallow all of them until the resume status becomes kSurpassedToken.
while (_resumeStatus != ResumeStatus::kSurpassedToken) {
- auto nextInput = pSource->getNext();
+ // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This
+ // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token.
+ nextInput = DocumentSourceCheckResumability::getNext();
- // If there are no more results, return EOF. We will continue checking for the client's
- // resume token the next time the getNext method is called.
+ // If there are no more results, return EOF. We will continue checking for the resume token
+ // the next time the getNext method is called. If we hit EOF, then we cannot have surpassed
+ // the resume token on this iteration.
if (!nextInput.isAdvanced()) {
+ invariant(_resumeStatus != ResumeStatus::kSurpassedToken);
return nextInput;
}
- // Check the current event. If we found and swallowed the resume token, then the result will
- // be the first event in the stream which should be returned to the user. Otherwise, we keep
- // iterating the stream until we find an event matching the client's resume token.
- if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) {
- return *nextOutput;
- }
- }
- MONGO_UNREACHABLE;
-}
-boost::optional<DocumentSource::GetNextResult>
-DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken(
- const DocumentSource::GetNextResult& nextInput) {
- // We should only ever call this method when we have a new event to examine.
- invariant(nextInput.isAdvanced());
- auto resumeStatus =
- compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
- switch (resumeStatus) {
- case ResumeStatus::kCheckNextDoc:
- return boost::none;
- case ResumeStatus::kFoundToken:
- // We found the resume token. If we are starting after an 'invalidate' token and the
- // invalidating command (e.g. collection drop) occurred at the same clusterTime on
- // more than one shard, then we will see multiple identical 'invalidate' events
- // here. We should continue to swallow all of them to ensure that the new stream
- // begins after the collection drop, and that it is not immediately re-invalidated.
- if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) {
- _resumeStatus = ResumeStatus::kFoundToken;
- return boost::none;
- }
- // If the token is not an invalidate or if we are not running in a cluster, we mark
- // the stream as having surpassed the resume token, skip the current event since the
- // client has already seen it, and return the next event in the stream.
- _resumeStatus = ResumeStatus::kSurpassedToken;
- return pSource->getNext();
- case ResumeStatus::kSurpassedToken:
- // If we have surpassed the point in the stream where the resume token should have
- // been and we did not see the token itself, then this stream cannot be resumed.
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "cannot resume stream; the resume token was not found. "
- << nextInput.getDocument()["_id"].getDocument().toString(),
- _resumeStatus == ResumeStatus::kFoundToken);
- _resumeStatus = ResumeStatus::kSurpassedToken;
- return nextInput;
+ // When we reach here, we have either found the resume token or surpassed it.
+ invariant(_resumeStatus != ResumeStatus::kCheckNextDoc);
+
+ // If the resume status is kFoundToken, record the fact that we have seen the token. When we
+ // have surpassed the resume token, we will assert that we saw the token before doing so. We
+ // cannot simply assert once and then assume we have surpassed the token, because in certain
+ // cases we may see 1..N identical tokens and must swallow them all before proceeding.
+ _hasSeenResumeToken = (_hasSeenResumeToken || _resumeStatus == ResumeStatus::kFoundToken);
}
- MONGO_UNREACHABLE;
-}
-const char* DocumentSourceShardCheckResumability::getSourceName() const {
- return "$_checkShardResumability";
-}
+ // Assert that before surpassing the resume token, we observed the token itself in the stream.
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "cannot resume stream; the resume token was not found. "
+ << nextInput.getDocument()["_id"].getDocument().toString(),
+ _hasSeenResumeToken);
-Value DocumentSourceShardCheckResumability::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
- // would result in it being created twice.
- return Value();
+ // At this point, we have seen the token and swallowed it. Return the next event to the client.
+ invariant(_hasSeenResumeToken && _resumeStatus == ResumeStatus::kSurpassedToken);
+ return nextInput;
}
-intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
+DocumentSourceCheckResumability::DocumentSourceCheckResumability(
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
+
+intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
// We are resuming from a point in time, not an event. Seed the stage with a high water mark.
return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData());
}
-intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
+intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
- return new DocumentSourceShardCheckResumability(expCtx, std::move(token));
+ return new DocumentSourceCheckResumability(expCtx, std::move(token));
}
-DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
+const char* DocumentSourceCheckResumability::getSourceName() const {
+ return kStageName.rawData();
+}
-DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
+DocumentSource::GetNextResult DocumentSourceCheckResumability::getNext() {
pExpCtx->checkForInterrupt();
- if (_surpassedResumeToken)
+ if (_resumeStatus == ResumeStatus::kSurpassedToken) {
return pSource->getNext();
+ }
- while (!_surpassedResumeToken) {
- auto nextInput = pSource->getNext();
+ while (_resumeStatus != ResumeStatus::kSurpassedToken) {
+ // The underlying oplog scan will throw OplogQueryMinTsMissing if the minTs in the change
+ // stream filter has fallen off the oplog. Catch this and throw a more explanatory error.
+ auto nextInput = [this]() {
+ try {
+ return pSource->getNext();
+ } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
+ uasserted(ErrorCodes::ChangeStreamHistoryLost,
+ "Resume of change stream was not possible, as the resume point may no "
+ "longer be in the oplog.");
+ }
+ }();
- // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us
- // from continually returning EOF in cases where the resume point has fallen off the oplog.
+ // If we hit EOF, return it immediately.
if (!nextInput.isAdvanced()) {
- _assertOplogHasEnoughHistory(nextInput);
return nextInput;
}
+
// Determine whether the current event sorts before, equal to or after the resume token.
- auto resumeStatus =
+ _resumeStatus =
compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
- switch (resumeStatus) {
+ switch (_resumeStatus) {
case ResumeStatus::kCheckNextDoc:
// If the result was kCheckNextDoc, we are resumable but must swallow this event.
- _verifiedOplogHasEnoughHistory = true;
continue;
case ResumeStatus::kSurpassedToken:
- // In this case the resume token wasn't found; it must be on another shard. We must
- // examine the oplog to ensure that its history reaches back to before the resume
- // token, otherwise we may have missed events that fell off the oplog. If we can
- // resume, fall through into the following case and set _surpassedResumeToken.
- _assertOplogHasEnoughHistory(nextInput);
+ // In this case the resume token wasn't found; it may be on another shard. However,
+ // since the oplog scan did not throw, we know that we are resumable. Fall through
+ // into the following case and return the document.
case ResumeStatus::kFoundToken:
- // We found the actual token! Set _surpassedResumeToken and return the result.
- _surpassedResumeToken = true;
+ // We found the actual token! Return the doc so DSEnsureResumeTokenPresent sees it.
return nextInput;
}
}
MONGO_UNREACHABLE;
}
-void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory(
- const GetNextResult& nextInput) {
- // If we have already verified that this stream is resumable, return immediately.
- if (_verifiedOplogHasEnoughHistory) {
- return;
- }
- // Look up the first document in the oplog and compare it with the resume token's clusterTime.
- auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
- auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx);
- if (auto first = pipeline->getNext()) {
- auto firstOplogEntry = Value(*first);
- // If the first entry in the oplog is the replset initialization, then it doesn't matter
- // if its timestamp is later than the resume token. No events earlier than the token can
- // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that
- // the timestamp of the first oplog entry is earlier than that of the resume token.
- const bool isNewRS =
- Value::compare(firstOplogEntry["o"]["msg"], Value("initiating set"_sd), nullptr) == 0 &&
- Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0;
- uassert(ErrorCodes::ChangeStreamFatalError,
- "Resume of change stream was not possible, as the resume point may no longer be in "
- "the oplog. ",
- isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime);
- } else {
- // Very unusual case: the oplog is empty. We can always resume. However, it should never be
- // possible to have obtained a document that matched the filter if the oplog is empty.
- uassert(ErrorCodes::ChangeStreamFatalError,
- "Oplog was empty but found an event in the change stream pipeline. It should not "
- "be possible for this to happen",
- nextInput.isEOF());
- }
- _verifiedOplogHasEnoughHistory = true;
+Value DocumentSourceCheckResumability::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ // We only serialize this stage in the context of explain.
+ return explain ? Value(DOC(getSourceName()
+ << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument())))
+ : Value();
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 8c90a88b564..b70c6dd8228 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -39,30 +39,42 @@
namespace mongo {
/**
- * This checks for resumability on a single shard in the sharded case. The rules are
+ * This stage checks whether or not the oplog has enough history to resume the stream, and consumes
+ * all events up to the given resume point. It is deployed on all shards when resuming a stream on
+ * a sharded cluster, and is also used in the single-replicaset case when a stream is opened with
+ * startAtOperationTime or with a high-water-mark resume token. It defers to the COLLSCAN to check
+ * whether the first event (matching or non-matching) encountered in the oplog has a timestamp equal
+ * to or earlier than the minTs in the change stream filter. If not, the COLLSCAN will throw an
+ * assertion, which this stage catches and converts into a more comprehensible $changeStream
+ * specific exception. The rules are:
*
- * - If the first document in the pipeline for this shard has a matching timestamp, we can
- * always resume.
- * - If the oplog is empty, we can resume. An empty oplog is rare and can only occur
- * on a secondary that has just started up from a primary that has not taken a write.
- * In particular, an empty oplog cannot be the result of oplog truncation.
- * - If neither of the above is true, the least-recent document in the oplog must precede the resume
- * timestamp. If we do this check after seeing the first document in the pipeline in the shard, or
- * after seeing that there are no documents in the pipeline after the resume token in the shard,
- * we're guaranteed not to miss any documents.
+ * - If the first event seen in the oplog has the same timestamp as the requested resume token or
+ * startAtOperationTime, we can resume.
+ * - If the timestamp of the first event seen in the oplog is earlier than the requested resume
+ * token or startAtOperationTime, we can resume.
+ * - If the first entry in the oplog is a replica set initialization, then we can resume even if the
+ * token timestamp is earlier, since no events can have fallen off this oplog yet. This can happen
+ * in a sharded cluster when a new shard is added.
*
- * - Otherwise we cannot resume, as we do not know if this shard lost documents between the resume
- * token and the first matching document in the pipeline.
- *
- * This source need only run on a sharded collection. For unsharded collections,
- * DocumentSourceEnsureResumeTokenPresent is sufficient.
+ * - Otherwise we cannot resume, as we do not know if there were any events between the resume token
+ * and the first matching document in the oplog.
*/
-class DocumentSourceShardCheckResumability final : public DocumentSource {
+class DocumentSourceCheckResumability : public DocumentSource {
public:
- GetNextResult getNext() final;
- const char* getSourceName() const final;
+ static constexpr StringData kStageName = "$_internalCheckResumability"_sd;
+
+ // Used to record the results of comparing the token data extracted from documents in the
+ // resumed stream against the client's resume token.
+ enum class ResumeStatus {
+ kFoundToken, // The stream produced a document satisfying the client resume token.
+ kSurpassedToken, // The stream's latest document is more recent than the resume token.
+ kCheckNextDoc // The next document produced by the stream may contain the resume token.
+ };
+
+ GetNextResult getNext() override;
+ const char* getSourceName() const override;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
return {StreamType::kStreaming,
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
@@ -73,47 +85,38 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
return boost::none;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts);
- static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
-private:
+protected:
/**
- * Use the create static method to create a DocumentSourceShardCheckResumability.
+ * Use the create static method to create a DocumentSourceCheckResumability.
*/
- DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- ResumeTokenData token);
-
- void _assertOplogHasEnoughHistory(const GetNextResult& nextInput);
+ DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ ResumeTokenData token);
- ResumeTokenData _tokenFromClient;
- bool _verifiedOplogHasEnoughHistory = false;
- bool _surpassedResumeToken = false;
+ ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
+ const ResumeTokenData _tokenFromClient;
};
/**
* This stage is used internally for change streams to ensure that the resume token is in the
* stream. It is not intended to be created by the user.
*/
-class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource {
+class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability {
public:
- // Used to record the results of comparing the token data extracted from documents in the
- // resumed stream against the client's resume token.
- enum class ResumeStatus {
- kFoundToken, // The stream produced a document satisfying the client resume token.
- kSurpassedToken, // The stream's latest document is more recent than the resume token.
- kCheckNextDoc // The next document produced by the stream may contain the resume token.
- };
+ static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd;
- GetNextResult getNext() final;
+ GetNextResult getNext() override;
const char* getSourceName() const final;
StageConstraints constraints(Pipeline::SplitState) const final {
@@ -137,13 +140,11 @@ public:
logic.mergingStage = this;
// Also add logic to the shards to ensure that each shard has enough oplog history to resume
// the change stream.
- logic.shardsStage = DocumentSourceShardCheckResumability::create(pExpCtx, _tokenFromClient);
+ logic.shardsStage = DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient);
logic.inputSortPattern = change_stream_constants::kSortSpec;
return logic;
};
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
-
static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
@@ -154,15 +155,8 @@ private:
DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
ResumeTokenData token);
- /**
- * Check the given event to determine whether it matches the client's resume token. If so, we
- * swallow this event and return the next event in the stream. Otherwise, return boost::none.
- */
- boost::optional<DocumentSource::GetNextResult> _checkNextDocAndSwallowResumeToken(
- const DocumentSource::GetNextResult& nextInput);
-
- ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
- ResumeTokenData _tokenFromClient;
+ // Records whether we have observed the token in the resumed stream.
+ bool _hasSeenResumeToken = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index afe4758e056..fdf1bcbb14c 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -35,6 +35,11 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/catalog/collection_mock.h"
+#include "mongo/db/catalog/database_holder_mock.h"
+#include "mongo/db/catalog/database_impl.h"
+#include "mongo/db/exec/collection_scan.h"
+#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -44,55 +49,290 @@
#include "mongo/db/pipeline/stub_mongo_process_interface.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/devnull/devnull_kv_engine.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/uuid.h"
using boost::intrusive_ptr;
-using std::deque;
namespace mongo {
namespace {
+static constexpr StringData kOtherNs = "test.other.ns"_sd;
static constexpr StringData kTestNs = "test.ns"_sd;
+class ChangeStreamOplogCursorMock : public SeekableRecordCursor {
+public:
+ ChangeStreamOplogCursorMock(std::deque<Record>* records) : _records(records) {}
+
+ virtual ~ChangeStreamOplogCursorMock() {}
+
+ boost::optional<Record> next() override {
+ if (_records->empty()) {
+ return boost::none;
+ }
+ auto& next = _records->front();
+ _records->pop_front();
+ return next;
+ }
+
+ boost::optional<Record> seekExact(const RecordId& id) override {
+ return Record{};
+ }
+ void save() override {}
+ bool restore() override {
+ return true;
+ }
+ void detachFromOperationContext() override {}
+ void reattachToOperationContext(OperationContext* opCtx) override {}
+
+private:
+ std::deque<Record>* _records;
+};
+
+class ChangeStreamOplogCollectionMock : public CollectionMock {
+public:
+ ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) {
+ _recordStore =
+ _devNullEngine.getRecordStore(nullptr, NamespaceString::kRsOplogNamespace.ns(), "", {});
+ }
+
+ void push_back(Document doc) {
+ // Every entry we push into the oplog should have both 'ts' and 'ns' fields.
+ invariant(doc["ts"].getType() == BSONType::bsonTimestamp);
+ invariant(doc["ns"].getType() == BSONType::String);
+ // Events should always be added in ascending ts order.
+ auto lastTs =
+ _records.empty() ? Timestamp(0, 0) : _records.back().data.toBson()["ts"].timestamp();
+ invariant(ValueComparator().compare(Value(lastTs), doc["ts"]) <= 0);
+ // Fill out remaining required fields in the oplog entry.
+ MutableDocument mutableDoc{doc};
+ mutableDoc.setField("op", Value("n"_sd));
+ mutableDoc.setField("o", Value(Document{}));
+ mutableDoc.setField("wall",
+ Value(Date_t::fromMillisSinceEpoch(doc["ts"].getTimestamp().asLL())));
+ // Must remove _id since the oplog expects either no _id or an OID.
+ mutableDoc.remove("_id");
+ // Convert to owned BSON and create corresponding Records.
+ _data.push_back(mutableDoc.freeze().toBson());
+ Record record;
+ record.data = {_data.back().objdata(), _data.back().objsize()};
+ record.id = RecordId{static_cast<int64_t>(_data.size())};
+ _records.push_back(std::move(record));
+ }
+
+ std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
+ bool forward) const override {
+ return std::make_unique<ChangeStreamOplogCursorMock>(&_records);
+ }
+
+ const RecordStore* getRecordStore() const override {
+ return _recordStore.get();
+ }
+ RecordStore* getRecordStore() override {
+ return _recordStore.get();
+ }
+
+private:
+ // We retain the owned record queue here because cursors may be destroyed and recreated.
+ mutable std::deque<Record> _records;
+ std::deque<BSONObj> _data;
+
+ // These are no-op structures which are required by the CollectionScan.
+ std::unique_ptr<RecordStore> _recordStore;
+ DevNullKVEngine _devNullEngine;
+};
+
+/**
+ * The RequiresCollectionStageBase class attempts to obtain the current epoch of the database
+ * containing the collection to be scanned (in this case, the oplog). Here we provide a dummy
+ * DatabaseHolder which will always return a valid pointer to the _database member variable.
+ */
+class ChangeStreamDatabaseHolderMock : public DatabaseHolderMock {
+public:
+ ChangeStreamDatabaseHolderMock() : _database(std::make_unique<DatabaseImpl>("local"_sd, 0)){};
+
+ Database* getDb(OperationContext* opCtx, StringData ns) const override {
+ return _database.get();
+ }
+
+private:
+ std::unique_ptr<Database> _database;
+};
+
+/**
+ * Acts as an initial source for the change stream pipeline, taking the place of DSOplogMatch. This
+ * class maintains its own queue of documents added by each test, but also pushes each doc into an
+ * underlying ChangeStreamOplogCollectionMock. When getNext() is called, it retrieves the next
+ * document by pulling it from the mocked oplog collection via a CollectionScan, in order to test
+ * the latter's changestream-specific functionality. The reason this class keeps its own queue in
+ * addition to the ChangeStreamOplogCollectionMock is twofold:
+ *
+ * - The _id must be stripped from each document before it can be added to the mocked oplog, since
+ * the _id field of the test document is a resume token but oplog entries are only permitted to
+ * have OID _ids. We therefore have to restore the _id field of the document pulled from the
+ * CollectionScan before passing it into the pipeline.
+ *
+ * - The concept of GetNextResult::ReturnStatus::kPauseExecution does not exist in CollectionScan;
+ * NEED_TIME is somewhat analogous but cannot be artificially induced. For tests which exercise
+ * kPauseExecution, these events are stored only in the DocumentSourceChangeStreamMock queue
+ * with no corresponding entry in the ChangeStreamOplogCollectionMock queue.
+ */
+class DocumentSourceChangeStreamMock : public DocumentSourceMock {
+public:
+ DocumentSourceChangeStreamMock(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMock({}, expCtx) {
+ _filterExpr = BSON("ns" << kTestNs);
+ _filter = _parseAndNormalize(_filterExpr);
+ _params.assertMinTsHasNotFallenOffOplog = true;
+ _params.shouldTrackLatestOplogTimestamp = true;
+ _params.minTs = Timestamp(0, 0);
+ _params.tailable = true;
+ }
+
+ void setResumeToken(ResumeTokenData resumeToken) {
+ invariant(!_collScan);
+ _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime));
+ _filter = _parseAndNormalize(_filterExpr);
+ _params.minTs = resumeToken.clusterTime;
+ }
+
+ void push_back(GetNextResult&& result) {
+ // We should never push an explicit EOF onto the queue.
+ invariant(!result.isEOF());
+ // If there is a document supplied, add it to the mock collection.
+ if (result.isAdvanced()) {
+ _collection.push_back(result.getDocument());
+ }
+ // Both documents and pauses are stored in the DSMock queue.
+ DocumentSourceMock::push_back(std::move(result));
+ }
+
+ void push_back(const GetNextResult& result) {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isPermanentlyEOF() const {
+ return _collScan->getCommonStats()->isEOF;
+ }
+
+protected:
+ GetNextResult getNext() override {
+ // If this is the first call to getNext, we must create the COLLSCAN.
+ if (!_collScan) {
+ _collScan = std::make_unique<CollectionScan>(
+ pExpCtx->opCtx, &_collection, _params, &_ws, _filter.get());
+ // The first call to doWork will create the cursor and return NEED_TIME. But it won't
+ // actually scan any of the documents that are present in the mock cursor queue.
+ ASSERT_EQ(_collScan->doWork(nullptr), PlanStage::NEED_TIME);
+ ASSERT_EQ(_getNumDocsTested(), 0UL);
+ }
+ while (true) {
+ // If the next result is a pause, return it and don't collscan.
+ auto nextResult = DocumentSourceMock::getNext();
+ if (nextResult.isPaused()) {
+ return nextResult;
+ }
+ // Otherwise, retrieve the document via the CollectionScan stage.
+ auto id = WorkingSet::INVALID_ID;
+ switch (_collScan->doWork(&id)) {
+ case PlanStage::IS_EOF:
+ invariant(nextResult.isEOF());
+ return nextResult;
+ case PlanStage::ADVANCED: {
+ // We need to restore the _id field which was removed when we added this
+ // entry into the oplog. This is like a stripped-down DSCSTransform stage.
+ MutableDocument mutableDoc{Document{_ws.get(id)->obj.value()}};
+ mutableDoc["_id"] = nextResult.getDocument()["_id"];
+ return mutableDoc.freeze();
+ }
+ case PlanStage::NEED_TIME:
+ continue;
+ case PlanStage::NEED_YIELD:
+ case PlanStage::FAILURE:
+ MONGO_UNREACHABLE;
+ }
+ }
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ std::unique_ptr<MatchExpression> _parseAndNormalize(BSONObj filterExpr) {
+ auto filter = uassertStatusOK(MatchExpressionParser::parse(filterExpr, pExpCtx));
+ filter = MatchExpression::optimize(std::move(filter));
+ CanonicalQuery::sortTree(filter.get());
+ return filter;
+ }
+
+ size_t _getNumDocsTested() {
+ return static_cast<const CollectionScanStats*>(_collScan->getSpecificStats())->docsTested;
+ }
+
+ ChangeStreamOplogCollectionMock _collection;
+ std::unique_ptr<CollectionScan> _collScan;
+ CollectionScanParams _params;
+
+ std::unique_ptr<MatchExpression> _filter;
+ BSONObj _filterExpr;
+
+ WorkingSet _ws;
+};
+
class CheckResumeTokenTest : public AggregationContextFixture {
public:
- CheckResumeTokenTest() : _mock(DocumentSourceMock::createForTest()) {}
+ CheckResumeTokenTest() : _mock(make_intrusive<DocumentSourceChangeStreamMock>(getExpCtx())) {
+ DatabaseHolder::set(getServiceContext(),
+ std::make_unique<ChangeStreamDatabaseHolderMock>());
+ }
protected:
/**
* Pushes a document with a resume token corresponding to the given ResumeTokenData into the
- * mock queue.
+ * mock queue. This document will have an ns field that matches the test namespace, and will
+ * appear in the change stream pipeline if its timestamp is at or after the resume timestamp.
*/
- void addDocument(ResumeTokenData tokenData) {
- _mock->push_back(Document{{"_id", ResumeToken(std::move(tokenData)).toDocument()}});
+ void addOplogEntryOnTestNS(ResumeTokenData tokenData) {
+ _mock->push_back(Document{{"ns", kTestNs},
+ {"ts", tokenData.clusterTime},
+ {"_id", ResumeToken(std::move(tokenData)).toDocument()}});
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, version,
* txnOpIndex, docKey, and namespace into the mock queue.
*/
- void addDocument(
+ void addOplogEntryOnTestNS(
Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) {
- return addDocument({ts, version, txnOpIndex, uuid, Value(docKey)});
+ return addOplogEntryOnTestNS({ts, version, txnOpIndex, uuid, Value(docKey)});
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, version,
* txnOpIndex, docKey, and namespace into the mock queue.
*/
- void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
- addDocument(ts, 0, 0, docKey, uuid);
+ void addOplogEntryOnTestNS(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
+ addOplogEntryOnTestNS(ts, 0, 0, docKey, uuid);
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, _id string, and
* namespace into the mock queue.
*/
- void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) {
- addDocument(ts, 0, 0, Document{{"_id", id}}, uuid);
+ void addOplogEntryOnTestNS(Timestamp ts, std::string id, UUID uuid = testUuid()) {
+ addOplogEntryOnTestNS(ts, 0, 0, Document{{"_id", id}}, uuid);
}
+ /**
+ * Pushes a document that does not match the test namespace into the mock oplog. This will be
+ * examined by the oplog CollectionScan but will not produce an event in the pipeline.
+ */
+ void addOplogEntryOnOtherNS(Timestamp ts) {
+ _mock->push_back(Document{{"ns", kOtherNs}, {"ts", ts}});
+ }
+
+ /**
+ * Pushes a pause in execution into the pipeline queue.
+ */
void addPause() {
_mock->push_back(DocumentSource::GetNextResult::makePauseExecution());
}
@@ -100,10 +340,11 @@ protected:
/**
* Convenience method to create the class under test with a given ResumeTokenData.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
ResumeTokenData tokenData) {
auto checkResumeToken =
- DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), std::move(tokenData));
+ DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData);
+ _mock->setResumeToken(std::move(tokenData));
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
}
@@ -112,13 +353,13 @@ protected:
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts,
int version,
std::size_t txnOpIndex,
boost::optional<Document> docKey,
UUID uuid) {
- return createCheckResumeToken(
+ return createDSEnsureResumeTokenPresent(
{ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
}
@@ -126,18 +367,18 @@ protected:
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) {
- return createCheckResumeToken(ts, 0, 0, docKey, uuid);
+ return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid);
}
/**
* Convenience method to create the class under test with a given timestamp, _id string, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
Timestamp ts, StringData id, UUID uuid = testUuid()) {
- return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid);
+ return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid);
}
/**
@@ -149,28 +390,28 @@ protected:
return *uuid_gen;
}
- intrusive_ptr<DocumentSourceMock> _mock;
+ intrusive_ptr<DocumentSourceChangeStreamMock> _mock;
};
-class ShardCheckResumabilityTest : public CheckResumeTokenTest {
+class CheckResumabilityTest : public CheckResumeTokenTest {
protected:
- intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(
+ intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(
ResumeTokenData tokenData) {
- auto shardCheckResumability =
- DocumentSourceShardCheckResumability::create(getExpCtx(), tokenData);
- shardCheckResumability->setSource(_mock.get());
- return shardCheckResumability;
+ auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData);
+ _mock->setResumeToken(std::move(tokenData));
+ dsCheckResumability->setSource(_mock.get());
+ return dsCheckResumability;
}
- intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) {
- return createShardCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
+ intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) {
+ return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
}
};
TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
// We should not see the resume token.
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -178,9 +419,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
addPause();
- addDocument(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
// We see the pause we inserted, but not the resume token.
ASSERT_TRUE(checkResumeToken->getNext().isPaused());
@@ -191,10 +432,10 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp doc1Timestamp(100, 2);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
addPause();
- addDocument(doc1Timestamp, "2");
+ addOplogEntryOnTestNS(doc1Timestamp, "2");
// Pause added explicitly.
ASSERT_TRUE(checkResumeToken->getNext().isPaused());
@@ -209,13 +450,13 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
- addDocument(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0");
+ addOplogEntryOnTestNS(resumeTimestamp, "0");
Timestamp doc1Timestamp(100, 2);
Timestamp doc2Timestamp(101, 1);
- addDocument(doc1Timestamp, "1");
- addDocument(doc2Timestamp, "2");
+ addOplogEntryOnTestNS(doc1Timestamp, "1");
+ addOplogEntryOnTestNS(doc2Timestamp, "2");
auto result1 = checkResumeToken->getNext();
ASSERT_TRUE(result1.isAdvanced());
@@ -229,23 +470,28 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken)
}
TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) {
- Timestamp resumeTimestamp(100, 1);
+ // The first timestamp in the oplog precedes the resume token, and the second matches it...
+ Timestamp doc1Timestamp(100, 1);
+ Timestamp resumeTimestamp(100, 2);
+ Timestamp doc2Timestamp = resumeTimestamp;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
- Timestamp doc1Timestamp(100, 2);
- Timestamp doc2Timestamp(101, 1);
- addDocument(doc1Timestamp, "1");
- addDocument(doc2Timestamp, "2");
+ // ... but there's no entry in the oplog that matches the full token.
+ addOplogEntryOnTestNS(doc1Timestamp, "1");
+ addOplogEntryOnTestNS(doc2Timestamp, "2");
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
-TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) {
+TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierResumeToken) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1");
+
+ // Add an entry into the oplog with the same timestamp but a lower documentKey. We swallow it
+ // but don't throw - we haven't surpassed the token yet and still may see it in the next doc.
+ addOplogEntryOnTestNS(resumeTimestamp, "0");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -253,9 +499,9 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
Timestamp resumeTimestamp(100, 1);
auto resumeTokenUUID = UUID::gen();
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1", resumeTokenUUID);
auto otherUUID = UUID::gen();
- addDocument(resumeTimestamp, "1", otherUUID);
+ addOplogEntryOnTestNS(resumeTimestamp, "1", otherUUID);
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
}
@@ -266,9 +512,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "abc");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "abc");
// We must not see the following document.
- addDocument(resumeTimestamp, "ABC");
+ addOplogEntryOnTestNS(resumeTimestamp, "ABC");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -280,13 +526,14 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdM
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
Timestamp doc1Timestamp(100, 1);
- addDocument(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
Timestamp doc2Timestamp(100, 2);
Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}};
- addDocument(doc2Timestamp, doc2DocKey);
+ addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey);
// We should skip doc1 since it satisfies the resume token, and retrieve doc2.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -301,10 +548,11 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -319,10 +567,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen
getExpCtx()->inMongos = true;
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}});
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -335,10 +583,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject)
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, boost::none);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, boost::none);
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -351,11 +599,12 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) {
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}});
+ auto checkResumeToken =
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}});
- addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}});
- addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}});
+ addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}});
+ addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}});
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -380,20 +629,20 @@ TEST_F(CheckResumeTokenTest,
// Create the resume token using the higher-sorting UUID.
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]);
// Add two documents which have the same clusterTime but a lower UUID. One of the documents has
// a lower docKey than the resume token, the other has a higher docKey; this demonstrates that
// the UUID is the discriminating factor.
- addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]);
- addDocument(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]);
// Add a third document that matches the resume token.
- addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]);
// Add a fourth document with the same timestamp and UUID whose docKey sorts after the token.
auto expectedDocKey = Document{{"_id"_sd, 3}};
- addDocument(resumeTimestamp, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, expectedDocKey, uuids[1]);
// We should skip the first two docs, swallow the resume token, and return the fourth doc.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -418,13 +667,13 @@ TEST_F(CheckResumeTokenTest,
// Create the resume token using the lower-sorting UUID.
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]);
// Add a document which has the same clusterTime and a lower docKey but a higher UUID, followed
// by a document which matches the resume token. This is not possible in practice, but it serves
// to demonstrate that the resume attempt fails even when the resume token is present.
- addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]);
- addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]);
ASSERT_THROWS_CODE(
checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
@@ -448,18 +697,18 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn
invalidateToken.clusterTime = resumeTimestamp;
invalidateToken.uuid = uuids[0];
invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- auto checkResumeToken = createCheckResumeToken(invalidateToken);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Add three documents which each have the invalidate resume token. We expect to see this in the
// event that we are starting after an invalidate and the invalidating event occurred on several
// shards at the same clusterTime.
- addDocument(invalidateToken);
- addDocument(invalidateToken);
- addDocument(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
// Add a document representing an insert which recreated the collection after it was dropped.
auto expectedDocKey = Document{{"_id"_sd, 1}};
- addDocument(Timestamp{100, 2}, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(Timestamp{100, 2}, expectedDocKey, uuids[1]);
// DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
// and the next two invalidates, and return the insert event after the collection drop.
@@ -488,7 +737,7 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
invalidateToken.clusterTime = resumeTimestamp;
invalidateToken.uuid = uuids[0];
invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- auto checkResumeToken = createCheckResumeToken(invalidateToken);
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Create a second invalidate token with the same clusterTime but a different UUID.
auto unrelatedInvalidateToken = invalidateToken;
@@ -497,12 +746,12 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
// Add three documents which each have the invalidate resume token. We expect to see this in the
// event that we are starting after an invalidate and the invalidating event occurred on several
// shards at the same clusterTime.
- addDocument(invalidateToken);
- addDocument(invalidateToken);
- addDocument(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
+ addOplogEntryOnTestNS(invalidateToken);
// Add a fourth document which has the unrelated invalidate at the same clusterTime.
- addDocument(unrelatedInvalidateToken);
+ addOplogEntryOnTestNS(unrelatedInvalidateToken);
// DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it
// and the next two invalidates, but decline to swallow the unrelated invalidate.
@@ -513,39 +762,6 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
ASSERT_EQ(tokenFromFirstDocAfterResume, unrelatedInvalidateToken);
}
-TEST_F(CheckResumeTokenTest, ShouldSwallowOnlyFirstInvalidateForStartAfterInvalidateInReplSet) {
- Timestamp resumeTimestamp(100, 1);
-
- // We only swallow multiple invalidates when DSEnsureResumeTokenPresent is running on mongoS.
- // Set {inMongos:false} to verify that we do not swallow additional invalidates on a replica
- // set, since this should never occur.
- getExpCtx()->inMongos = false;
-
- // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
- // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
- // stream after the old stream is invalidated.
- ResumeTokenData invalidateToken;
- invalidateToken.clusterTime = resumeTimestamp;
- invalidateToken.uuid = testUuid();
- invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- auto checkResumeToken = createCheckResumeToken(invalidateToken);
-
- // Add three documents which each have the invalidate resume token.
- addDocument(invalidateToken);
- addDocument(invalidateToken);
- addDocument(invalidateToken);
-
- // DSEnsureResumeTokenPresent should confirm that the invalidate event is present and swallow
- // it. However, it should not swallow the subsequent two invalidates.
- for (size_t i = 0; i < 2; ++i) {
- const auto nextDocAfterResume = checkResumeToken->getNext();
- const auto tokenFromNextDocAfterResume =
- ResumeToken::parse(nextDocAfterResume.getDocument()["_id"].getDocument()).getData();
- ASSERT_EQ(tokenFromNextDocAfterResume, invalidateToken);
- }
- ASSERT(checkResumeToken->getNext().isEOF());
-}
-
TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
Timestamp resumeTimestamp(100, 1);
@@ -555,21 +771,21 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
std::sort(uuids.begin(), uuids.end());
auto checkResumeToken =
- createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]);
+ createDSEnsureResumeTokenPresent(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]);
// Add two documents which have the same clusterTime and version but a lower applyOps index. One
// of the documents has a lower uuid than the resume token, the other has a higher uuid; this
// demonstrates that the applyOps index is the discriminating factor.
- addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]);
- addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]);
// Add a third document that matches the resume token.
- addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]);
// Add a fourth document with the same timestamp and version whose applyOps sorts after the
// resume token.
auto expectedDocKey = Document{{"_id"_sd, 3}};
- addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]);
+ addOplogEntryOnTestNS(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]);
// We should skip the first two docs, swallow the resume token, and return the fourth doc.
const auto firstDocAfterResume = checkResumeToken->getNext();
@@ -586,277 +802,255 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) {
TEST_F(CheckResumeTokenTest, ShouldSucceedWithNoDocuments) {
Timestamp resumeTimestamp(100, 1);
- auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0");
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
-/**
- * A mock MongoProcessInterface which allows mocking a foreign pipeline.
- */
-class MockMongoInterface final : public StubMongoProcessInterface {
-public:
- MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
- : _mockResults(std::move(mockResults)) {}
-
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
- return false;
- }
-
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
-
- if (opts.optimize) {
- pipeline->optimizePipeline();
- }
-
- if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
- }
-
- return pipeline;
- }
-
- std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) final {
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
- PipelineDeleter(expCtx->opCtx));
- pipeline->addInitialSource(DocumentSourceMock::createForTest(_mockResults));
- return pipeline;
- }
-
-private:
- deque<DocumentSource::GetNextResult> _mockResults;
-};
-
-TEST_F(ShardCheckResumabilityTest,
- ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) {
+TEST_F(CheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(resumeTimestamp, "ID");
// We should see the resume token.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest,
- ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryAfterToken) {
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryEqualToToken) {
Timestamp resumeTimestamp(100, 1);
- Timestamp oplogTimestamp(100, 2);
+ Timestamp oplogTimestamp(100, 1);
- ResumeTokenData tokenData(
- resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(tokenData);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(resumeTimestamp, "ID");
// We should see the resume token.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) {
+TEST_F(CheckResumabilityTest, ShouldPermanentlyEofIfOplogIsEmpty) {
Timestamp resumeTimestamp(100, 1);
- ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- addDocument(resumeTimestamp, "ID");
- // We should see the resume token.
+ // As with other tailable cursors, starting a change stream on an empty capped collection will
+ // cause the cursor to immediately and permanently EOF. This should never happen in practice,
+ // since a replset member can only accept requests while in PRIMARY, SECONDARY or RECOVERING
+ // states, and there must be at least one entry in the oplog in order to reach those states.
+ auto shardCheckResumability = createDSCheckResumability(resumeTimestamp);
auto result = shardCheckResumability->getNext();
- ASSERT_TRUE(result.isAdvanced());
- auto& doc = result.getDocument();
- ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
+ ASSERT_TRUE(result.isEOF());
+ ASSERT_TRUE(_mock->isPermanentlyEOF());
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
-TEST_F(ShardCheckResumabilityTest,
- ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryEqualToToken) {
+ Timestamp oplogTimestamp(100, 1);
+ Timestamp resumeTimestamp(100, 1);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result = dsCheckResumability->getNext();
+ ASSERT_TRUE(result.isEOF());
+}
+
+TEST_F(CheckResumabilityTest, ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp oplogTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
ASSERT_THROWS_CODE(
- shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) {
+TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) {
Timestamp resumeTimestamp(100, 2);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryBeforeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp docTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
-TEST_F(ShardCheckResumabilityTest,
+TEST_F(CheckResumabilityTest,
+ ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryEqualToToken) {
+ Timestamp oplogTimestamp(100, 1);
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp docTimestamp(100, 3);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result = dsCheckResumability->getNext();
+ ASSERT_TRUE(result.isAdvanced());
+ auto& doc = result.getDocument();
+ ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
+}
+
+TEST_F(CheckResumabilityTest,
ShouldFailWithLaterDocumentsInPipelineAndEarliestOplogEntryAfterToken) {
Timestamp resumeTimestamp(100, 1);
Timestamp oplogTimestamp(100, 2);
Timestamp docTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ ASSERT_THROWS_CODE(
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
+}
+
+TEST_F(CheckResumabilityTest,
+ ShouldFailWithoutReadingLaterDocumentsInPipelineIfEarliestOplogEntryAfterToken) {
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp oplogTimestamp(100, 2);
+ Timestamp docTimestamp(100, 3);
+
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ // Confirm that there are two documents queued in the mock oplog.
+ ASSERT_EQ(_mock->size(), 2u);
ASSERT_THROWS_CODE(
- shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
+ dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost);
+ // Confirm that only the first document was read before the assertion was thrown.
+ ASSERT_EQ(_mock->size(), 1u);
}
-TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
+TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
Timestamp docTimestamp(100, 4);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
- mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result2 = shardCheckResumability->getNext();
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
Timestamp docTimestamp(100, 4);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- addDocument(docTimestamp, "ID");
- deque<DocumentSource::GetNextResult> mockOplog(
- {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ addOplogEntryOnTestNS(docTimestamp, "ID");
+
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
- auto result2 = shardCheckResumability->getNext();
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
+TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
Timestamp oplogTimestamp(100, 1);
Timestamp resumeTimestamp(100, 2);
Timestamp oplogFutureTimestamp(100, 3);
- auto shardCheckResumability = createShardCheckResumability(resumeTimestamp);
- deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result1 = shardCheckResumability->getNext();
+ auto dsCheckResumability = createDSCheckResumability(resumeTimestamp);
+ addOplogEntryOnOtherNS(oplogTimestamp);
+ auto result1 = dsCheckResumability->getNext();
ASSERT_TRUE(result1.isEOF());
- mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
- auto result2 = shardCheckResumability->getNext();
+ addOplogEntryOnOtherNS(oplogFutureTimestamp);
+ auto result2 = dsCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(token);
// Add 2 events at the same clusterTime as the resume token but whose docKey sort before it.
- addDocument(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "2");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "2");
// Add the resume token, plus one further event whose docKey sorts after the token.
- addDocument(resumeTimestamp, "3");
- addDocument(resumeTimestamp, "4");
+ addOplogEntryOnTestNS(resumeTimestamp, "3");
+ addOplogEntryOnTestNS(resumeTimestamp, "4");
// The first event we see should be the resume token...
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto doc = result.getDocument();
ASSERT_EQ(token, ResumeToken::parse(doc["_id"].getDocument()).getData());
// ... then the post-token event, and then finally EOF.
- result = shardCheckResumability->getNext();
+ result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
- Document postResumeTokenDoc{
- {"_id",
- ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
- .toDocument()}};
- ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc);
- ASSERT_TRUE(shardCheckResumability->getNext().isEOF());
+ auto postResumeTokenDoc =
+ ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
+ .toDocument();
+ ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc);
+ ASSERT_TRUE(dsCheckResumability->getNext().isEOF());
}
-TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
+TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
- auto shardCheckResumability = createShardCheckResumability(token);
- deque<DocumentSource::GetNextResult> mockOplog;
- getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog);
+ auto dsCheckResumability = createDSCheckResumability(token);
// Add 2 events at the same clusterTime as the resume token but whose docKey sort before it.
- addDocument(resumeTimestamp, "1");
- addDocument(resumeTimestamp, "2");
+ addOplogEntryOnTestNS(resumeTimestamp, "1");
+ addOplogEntryOnTestNS(resumeTimestamp, "2");
// Add one further event whose docKey sorts after the token.
- addDocument(resumeTimestamp, "4");
+ addOplogEntryOnTestNS(resumeTimestamp, "4");
// The first event we see should be the post-token event, followed by EOF.
- auto result = shardCheckResumability->getNext();
+ auto result = dsCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
- Document postResumeTokenDoc{
- {"_id",
- ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
- .toDocument()}};
- ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc);
- ASSERT_TRUE(shardCheckResumability->getNext().isEOF());
+ auto postResumeTokenDoc =
+ ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
+ .toDocument();
+ ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc);
+ ASSERT_TRUE(dsCheckResumability->getNext().isEOF());
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index 86e9ebda0ee..8cd2b7a6162 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -47,6 +47,10 @@ const char* DocumentSourceMock::getSourceName() const {
return "mock";
}
+const size_t DocumentSourceMock::size() const {
+ return _queue.size();
+}
+
intrusive_ptr<DocumentSourceMock> DocumentSourceMock::createForTest(Document doc) {
return new DocumentSourceMock({std::move(doc)});
}
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 738fc802cc4..e660dadab18 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -71,6 +71,8 @@ public:
const char* getSourceName() const override;
+ const size_t size() const;
+
void reattachToOperationContext(OperationContext* opCtx) {
isDetachedFromOpCtx = false;
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 1e93bd3d0c9..75d9948ba05 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -679,7 +679,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
+ QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
}
if (rewrittenGroupStage) {
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 642a48fbd5b..eb0f93d823c 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -722,11 +722,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
LOG(3) << "Using direct oplog seek";
params.start = *startLoc;
}
+ params.minTs = minTs;
params.maxTs = maxTs;
params.direction = CollectionScanParams::FORWARD;
params.tailable = cq->getQueryRequest().isTailable();
params.shouldTrackLatestOplogTimestamp =
plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ params.assertMinTsHasNotFallenOffOplog =
+ plannerOptions & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
params.shouldWaitForOplogVisibility =
shouldWaitForOplogVisibility(opCtx, collection, params.tailable);
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index 7065fa25e9b..a2d61a4fd1b 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -154,6 +154,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
csn->tailable = tailable;
csn->shouldTrackLatestOplogTimestamp =
params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+ csn->assertMinTsHasNotFallenOffOplog =
+ params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
csn->shouldWaitForOplogVisibility =
params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index 316044294ee..2e48896c24d 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -135,6 +135,9 @@ string optionString(size_t options) {
case QueryPlannerParams::STRICT_DISTINCT_ONLY:
ss << "STRICT_DISTINCT_ONLY ";
break;
+ case QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG:
+ ss << "ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG ";
+ break;
case QueryPlannerParams::ENUMERATE_OR_CHILDREN_LOCKSTEP:
ss << "ENUMERATE_OR_CHILDREN_LOCKSTEP ";
break;
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index f8848b2011e..1c3d86229a7 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -99,6 +99,10 @@ struct QueryPlannerParams {
// declaration of getExecutorDistinct() for more detail.
STRICT_DISTINCT_ONLY = 1 << 11,
+ // Set this on an oplog scan to uassert that the oplog has not already rolled over the
+ // minimum 'ts' timestamp specified in the query.
+ ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG = 1 << 12,
+
// Instruct the plan enumerator to enumerate contained $ors in a special order. $or
// enumeration can generate an exponential number of plans, and is therefore limited at some
// arbitrary cutoff controlled by a parameter. When this limit is hit, the order of
@@ -114,7 +118,7 @@ struct QueryPlannerParams {
// order, we would get assignments [a_b, a_b], [a_c, a_c], [a_c, a_b], then [a_b, a_c]. This
// is thought to be helpful in general, but particularly in cases where all children of the
// $or use the same fields and have the same indexes available, as in this example.
- ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 12,
+ ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 13,
};
// See Options enum above.
diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 3115496fce1..f29ba473d40 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -249,6 +249,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
copy->tailable = this->tailable;
copy->direction = this->direction;
copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
+ copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog;
copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;
return copy;
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index 44c63d4bfb6..1f8ee232517 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -327,6 +327,9 @@ struct CollectionScanNode : public QuerySolutionNode {
// across a sharded cluster.
bool shouldTrackLatestOplogTimestamp = false;
+ // Should we assert that the specified minTS has not fallen off the oplog?
+ bool assertMinTsHasNotFallenOffOplog = false;
+
int direction;
// Whether or not to wait for oplog visibility on oplog collection scans.
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index ec3e47a04a6..c14054eebd4 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -78,6 +78,7 @@ PlanStage* buildStages(OperationContext* opCtx,
CollectionScanParams params;
params.tailable = csn->tailable;
params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
+ params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog;
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index a672e73f333..5deaf149a73 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -40,6 +40,11 @@ namespace mongo {
namespace repl {
/**
+ * The first oplog entry is a no-op with this message in its "msg" field.
+ */
+constexpr auto kInitiatingSetMsg = "initiating set"_sd;
+
+/**
* A parsed DurableReplOperation along with information about the operation that should only exist
* in-memory.
*
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index f3d7d478f1f..dd414df0b28 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -407,8 +407,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
Lock::GlobalWrite globalWrite(opCtx);
WriteUnitOfWork wuow(opCtx);
Helpers::putSingleton(opCtx, configCollectionName, config);
- const auto msgObj = BSON("msg"
- << "initiating set");
+ const auto msgObj = BSON("msg" << kInitiatingSetMsg);
_service->getOpObserver()->onOpMessage(opCtx, msgObj);
wuow.commit();
// ReplSetTest assumes that immediately after the replSetInitiate