diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2020-06-01 18:47:00 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-10-11 20:23:51 +0000 |
commit | 9e38cbba7d3efefa59e25cb0411558591036d30b (patch) | |
tree | c7afeda3e077f9c669950dec5396d21821f83ad3 | |
parent | dfb31874bf10e55c39e615ef86290b68ba0f43b3 (diff) | |
download | mongo-9e38cbba7d3efefa59e25cb0411558591036d30b.tar.gz |
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
(cherry picked from commit 694ed4153b9d5424b5d169fea5c68f99d4dfb45a)
22 files changed, 677 insertions, 476 deletions
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js index 517253bfcc0..426b0b3e3d3 100644 --- a/jstests/sharding/resume_change_stream.js +++ b/jstests/sharding/resume_change_stream.js @@ -124,14 +124,14 @@ function testResume(mongosColl, collToWatch) { db: mongosDB, collName: collToWatch, pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}], - expectedCode: ErrorCodes.ChangeStreamFatalError + expectedCode: ErrorCodes.ChangeStreamHistoryLost }); ChangeStreamTest.assertChangeStreamThrowsCode({ db: mongosDB, collName: collToWatch, pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}], - expectedCode: ErrorCodes.ChangeStreamFatalError + expectedCode: ErrorCodes.ChangeStreamHistoryLost }); // Test that the change stream can't resume if the resume token *is* present in the oplog, @@ -143,7 +143,7 @@ function testResume(mongosColl, collToWatch) { db: mongosDB, collName: collToWatch, pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}], - expectedCode: ErrorCodes.ChangeStreamFatalError + expectedCode: ErrorCodes.ChangeStreamHistoryLost }); // Drop the collection. diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index efab12a5d34..dedeac665f8 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -294,6 +294,7 @@ error_code("WaitForMajorityServiceEarlierOpTimeAvailable", 289) error_code("NoQueryExecutionPlans", 291) error_code("HierarchicalAcquisitionLevelViolation", 297) error_code("PeriodicJobIsStopped", 310) +error_code("OplogQueryMinTsMissing", 326) # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 0b3f9b1c23c..a901384fb7c 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -69,6 +69,12 @@ CollectionScan::CollectionScan(OperationContext* opCtx, _specificStats.maxTs = params.maxTs; invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); + // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. + if (params.assertMinTsHasNotFallenOffOplog) { + invariant(params.shouldTrackLatestOplogTimestamp); + invariant(params.minTs); + } + if (params.maxTs) { _endConditionBSON = BSON("$gte" << *(params.maxTs)); _endCondition = stdx::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName, @@ -141,19 +147,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } if (!record) { - // We just hit EOF. If we are tailable and have already returned data, leave us in a - // state to pick up where we left off on the next call to work(). Otherwise EOF is - // permanent. + // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up + // where we left off on the next call to work(). Otherwise, the EOF is permanent. if (_params.tailable && !_lastSeenId.isNull()) { _cursor.reset(); } else { _commonStats.isEOF = true; } - return PlanStage::IS_EOF; } _lastSeenId = record->id; + if (_params.assertMinTsHasNotFallenOffOplog) { + assertMinTsHasNotFallenOffOplog(*record); + } if (_params.shouldTrackLatestOplogTimestamp) { auto status = setLatestOplogEntryTimestamp(*record); if (!status.isOK()) { @@ -184,6 +191,25 @@ Status CollectionScan::setLatestOplogEntryTimestamp(const Record& record) { return Status::OK(); } +void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) { + // If the first entry we see in the oplog is the replset initialization, then it doesn't matter + // if its timestamp is later than the specified minTs; no events earlier than the minTs can have + // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry + // is earlier than or equal to the minTs time. + auto swOplogEntry = repl::OplogEntry::parse(record.data.toBson()); + invariant(_specificStats.docsTested == 0); + invariant(swOplogEntry.isOK()); + auto oplogEntry = std::move(swOplogEntry.getValue()); + const bool isNewRS = + oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) && + oplogEntry.getOpType() == repl::OpTypeEnum::kNoop; + uassert(ErrorCodes::OplogQueryMinTsMissing, + "Specified minTs has already fallen off the oplog", + isNewRS || oplogEntry.getTimestamp() <= *_params.minTs); + // We don't need to check this assertion again after we've confirmed the first oplog event. + _params.assertMinTsHasNotFallenOffOplog = false; +} + PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index ee50293a5c9..fb182168d99 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -96,6 +96,11 @@ private: */ Status setLatestOplogEntryTimestamp(const Record& record); + /** + * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog. + */ + void assertMinTsHasNotFallenOffOplog(const Record& record); + // WorkingSet is not owned by us. WorkingSet* _workingSet; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index b655872f6fa..2e6fafce451 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -45,6 +45,10 @@ struct CollectionScanParams { // The RecordId to which we should seek to as the first document of the scan. RecordId start; + // If present, the collection scan will seek directly to the RecordId of an oplog entry as + // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans. + boost::optional<Timestamp> minTs; + // If present, the collection scan will stop and return EOF the first time it sees a document // that does not pass the filter and has 'ts' greater than 'maxTs'. boost::optional<Timestamp> maxTs; @@ -54,6 +58,9 @@ struct CollectionScanParams { // Do we want the scan to be 'tailable'? Only meaningful if the collection is capped. bool tailable = false; + // Should we assert that the specified minTS has not fallen off the oplog? + bool assertMinTsHasNotFallenOffOplog = false; + // Should we keep track of the timestamp of the latest oplog entry we've seen? This information // is needed to merge cursors from the oplog in order of operation time when reading the oplog // across a sharded cluster. diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index d11cd088fa1..73bfdbdb404 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -248,9 +248,12 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/catalog/catalog_impl', + '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/repl/oplog_entry', '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/storage/devnull/storage_devnull_core', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/query/router_exec_stage', '$BUILD_DIR/mongo/s/sharding_router_test_fixture', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 9050b9990dd..012c16ffbaa 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -383,7 +383,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression // to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'. startFrom = tokenData.clusterTime; if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) { - resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData); + resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData); } else { resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData); } @@ -394,7 +394,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression "Only one type of resume option is allowed, but multiple were found.", !resumeStage); startFrom = *startAtOperationTime; - resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); + resumeStage = DocumentSourceCheckResumability::create(expCtx, *startFrom); } // There might not be a starting point if we're on mongos, otherwise we should either have a diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index cb7e1484d2c..46168e1ac9a 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -36,7 +36,7 @@ using boost::intrusive_ptr; namespace mongo { namespace { -using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus; +using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus; // Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies // the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token, @@ -84,10 +84,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) { // This could happen if the client provided a txnOpIndex of 0, yet the 0th document in the // applyOps was irrelevant (meaning it was an operation on a collection or DB not being - // watched). If we are looking for the resume token on a shard then this simply means that - // the resume token may be on a different shard; otherwise, it indicates a corrupt token. - uassert(50792, "Invalid resumeToken: txnOpIndex was skipped", expCtx->needsMerge); - // We are running on a merging shard. Signal that we have read beyond the resume token. + // watched). Signal that we have read beyond the resume token. return ResumeStatus::kSurpassedToken; } @@ -167,16 +164,9 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } } // namespace -const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { - return "$_ensureResumeTokenPresent"; -} - -Value DocumentSourceEnsureResumeTokenPresent::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - // This stage is created by the DocumentSourceChangeStream stage, so serializing it here - // would result in it being created twice. - return Value(); -} +DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSourceCheckResumability(expCtx, std::move(token)) {} intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx, @@ -184,172 +174,125 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token)); } -DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} +const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { + return kStageName.rawData(); +} DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() { pExpCtx->checkForInterrupt(); + // If we have already verified the resume token is present, return the next doc immediately. if (_resumeStatus == ResumeStatus::kSurpassedToken) { - // We've already verified the resume token is present. return pSource->getNext(); } - // The incoming documents are sorted by resume token. We examine a range of documents that have - // the same clusterTime as the client's resume token, until we either find (and swallow) a match - // for the token or pass the point in the stream where it should have been. + auto nextInput = GetNextResult::makeEOF(); + + // If we are starting after an 'invalidate' and the invalidating command (e.g. collection drop) + // occurred at the same clusterTime on more than one shard, then we may see multiple identical + // resume tokens here. We swallow all of them until the resume status becomes kSurpassedToken. while (_resumeStatus != ResumeStatus::kSurpassedToken) { - auto nextInput = pSource->getNext(); + // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This + // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token. + nextInput = DocumentSourceCheckResumability::getNext(); - // If there are no more results, return EOF. We will continue checking for the client's - // resume token the next time the getNext method is called. + // If there are no more results, return EOF. We will continue checking for the resume token + // the next time the getNext method is called. If we hit EOF, then we cannot have surpassed + // the resume token on this iteration. if (!nextInput.isAdvanced()) { + invariant(_resumeStatus != ResumeStatus::kSurpassedToken); return nextInput; } - // Check the current event. If we found and swallowed the resume token, then the result will - // be the first event in the stream which should be returned to the user. Otherwise, we keep - // iterating the stream until we find an event matching the client's resume token. - if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) { - return *nextOutput; - } - } - MONGO_UNREACHABLE; -} -boost::optional<DocumentSource::GetNextResult> -DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken( - const DocumentSource::GetNextResult& nextInput) { - // We should only ever call this method when we have a new event to examine. - invariant(nextInput.isAdvanced()); - auto resumeStatus = - compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient); - switch (resumeStatus) { - case ResumeStatus::kCheckNextDoc: - return boost::none; - case ResumeStatus::kFoundToken: - // We found the resume token. If we are starting after an 'invalidate' token and the - // invalidating command (e.g. collection drop) occurred at the same clusterTime on - // more than one shard, then we will see multiple identical 'invalidate' events - // here. We should continue to swallow all of them to ensure that the new stream - // begins after the collection drop, and that it is not immediately re-invalidated. - if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) { - _resumeStatus = ResumeStatus::kFoundToken; - return boost::none; - } - // If the token is not an invalidate or if we are not running in a cluster, we mark - // the stream as having surpassed the resume token, skip the current event since the - // client has already seen it, and return the next event in the stream. - _resumeStatus = ResumeStatus::kSurpassedToken; - return pSource->getNext(); - case ResumeStatus::kSurpassedToken: - // If we have surpassed the point in the stream where the resume token should have - // been and we did not see the token itself, then this stream cannot be resumed. - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "cannot resume stream; the resume token was not found. " - << nextInput.getDocument()["_id"].getDocument().toString(), - _resumeStatus == ResumeStatus::kFoundToken); - _resumeStatus = ResumeStatus::kSurpassedToken; - return nextInput; + // When we reach here, we have either found the resume token or surpassed it. + invariant(_resumeStatus != ResumeStatus::kCheckNextDoc); + + // If the resume status is kFoundToken, record the fact that we have seen the token. When we + // have surpassed the resume token, we will assert that we saw the token before doing so. We + // cannot simply assert once and then assume we have surpassed the token, because in certain + // cases we may see 1..N identical tokens and must swallow them all before proceeding. + _hasSeenResumeToken = (_hasSeenResumeToken || _resumeStatus == ResumeStatus::kFoundToken); } - MONGO_UNREACHABLE; -} -const char* DocumentSourceShardCheckResumability::getSourceName() const { - return "$_checkShardResumability"; -} + // Assert that before surpassing the resume token, we observed the token itself in the stream. + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "cannot resume stream; the resume token was not found. " + << nextInput.getDocument()["_id"].getDocument().toString(), + _hasSeenResumeToken); -Value DocumentSourceShardCheckResumability::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - // This stage is created by the DocumentSourceChangeStream stage, so serializing it here - // would result in it being created twice. - return Value(); + // At this point, we have seen the token and swallowed it. Return the next event to the client. + invariant(_hasSeenResumeToken && _resumeStatus == ResumeStatus::kSurpassedToken); + return nextInput; } -intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( +DocumentSourceCheckResumability::DocumentSourceCheckResumability( + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} + +intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) { // We are resuming from a point in time, not an event. Seed the stage with a high water mark. return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData()); } -intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( +intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) { - return new DocumentSourceShardCheckResumability(expCtx, std::move(token)); + return new DocumentSourceCheckResumability(expCtx, std::move(token)); } -DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} +const char* DocumentSourceCheckResumability::getSourceName() const { + return kStageName.rawData(); +} -DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { +DocumentSource::GetNextResult DocumentSourceCheckResumability::getNext() { pExpCtx->checkForInterrupt(); - if (_surpassedResumeToken) + if (_resumeStatus == ResumeStatus::kSurpassedToken) { return pSource->getNext(); + } - while (!_surpassedResumeToken) { - auto nextInput = pSource->getNext(); + while (_resumeStatus != ResumeStatus::kSurpassedToken) { + // The underlying oplog scan will throw OplogQueryMinTsMissing if the minTs in the change + // stream filter has fallen off the oplog. Catch this and throw a more explanatory error. + auto nextInput = [this]() { + try { + return pSource->getNext(); + } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) { + uasserted(ErrorCodes::ChangeStreamHistoryLost, + "Resume of change stream was not possible, as the resume point may no " + "longer be in the oplog."); + } + }(); - // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us - // from continually returning EOF in cases where the resume point has fallen off the oplog. + // If we hit EOF, return it immediately. if (!nextInput.isAdvanced()) { - _assertOplogHasEnoughHistory(nextInput); return nextInput; } + // Determine whether the current event sorts before, equal to or after the resume token. - auto resumeStatus = + _resumeStatus = compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient); - switch (resumeStatus) { + switch (_resumeStatus) { case ResumeStatus::kCheckNextDoc: // If the result was kCheckNextDoc, we are resumable but must swallow this event. - _verifiedOplogHasEnoughHistory = true; continue; case ResumeStatus::kSurpassedToken: - // In this case the resume token wasn't found; it must be on another shard. We must - // examine the oplog to ensure that its history reaches back to before the resume - // token, otherwise we may have missed events that fell off the oplog. If we can - // resume, fall through into the following case and set _surpassedResumeToken. - _assertOplogHasEnoughHistory(nextInput); + // In this case the resume token wasn't found; it may be on another shard. However, + // since the oplog scan did not throw, we know that we are resumable. Fall through + // into the following case and return the document. case ResumeStatus::kFoundToken: - // We found the actual token! Set _surpassedResumeToken and return the result. - _surpassedResumeToken = true; + // We found the actual token! Return the doc so DSEnsureResumeTokenPresent sees it. return nextInput; } } MONGO_UNREACHABLE; } -void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory( - const GetNextResult& nextInput) { - // If we have already verified that this stream is resumable, return immediately. - if (_verifiedOplogHasEnoughHistory) { - return; - } - // Look up the first document in the oplog and compare it with the resume token's clusterTime. - auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); - auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx); - if (auto first = pipeline->getNext()) { - auto firstOplogEntry = Value(*first); - // If the first entry in the oplog is the replset initialization, then it doesn't matter - // if its timestamp is later than the resume token. No events earlier than the token can - // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that - // the timestamp of the first oplog entry is earlier than that of the resume token. - const bool isNewRS = - Value::compare(firstOplogEntry["o"]["msg"], Value("initiating set"_sd), nullptr) == 0 && - Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0; - uassert(ErrorCodes::ChangeStreamFatalError, - "Resume of change stream was not possible, as the resume point may no longer be in " - "the oplog. ", - isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime); - } else { - // Very unusual case: the oplog is empty. We can always resume. However, it should never be - // possible to have obtained a document that matched the filter if the oplog is empty. - uassert(ErrorCodes::ChangeStreamFatalError, - "Oplog was empty but found an event in the change stream pipeline. It should not " - "be possible for this to happen", - nextInput.isEOF()); - } - _verifiedOplogHasEnoughHistory = true; +Value DocumentSourceCheckResumability::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + // We only serialize this stage in the context of explain. + return explain ? Value(DOC(getSourceName() + << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument()))) + : Value(); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 8c90a88b564..b70c6dd8228 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -39,30 +39,42 @@ namespace mongo { /** - * This checks for resumability on a single shard in the sharded case. The rules are + * This stage checks whether or not the oplog has enough history to resume the stream, and consumes + * all events up to the given resume point. It is deployed on all shards when resuming a stream on + * a sharded cluster, and is also used in the single-replicaset case when a stream is opened with + * startAtOperationTime or with a high-water-mark resume token. It defers to the COLLSCAN to check + * whether the first event (matching or non-matching) encountered in the oplog has a timestamp equal + * to or earlier than the minTs in the change stream filter. If not, the COLLSCAN will throw an + * assertion, which this stage catches and converts into a more comprehensible $changeStream + * specific exception. The rules are: * - * - If the first document in the pipeline for this shard has a matching timestamp, we can - * always resume. - * - If the oplog is empty, we can resume. An empty oplog is rare and can only occur - * on a secondary that has just started up from a primary that has not taken a write. - * In particular, an empty oplog cannot be the result of oplog truncation. - * - If neither of the above is true, the least-recent document in the oplog must precede the resume - * timestamp. If we do this check after seeing the first document in the pipeline in the shard, or - * after seeing that there are no documents in the pipeline after the resume token in the shard, - * we're guaranteed not to miss any documents. + * - If the first event seen in the oplog has the same timestamp as the requested resume token or + * startAtOperationTime, we can resume. + * - If the timestamp of the first event seen in the oplog is earlier than the requested resume + * token or startAtOperationTime, we can resume. + * - If the first entry in the oplog is a replica set initialization, then we can resume even if the + * token timestamp is earlier, since no events can have fallen off this oplog yet. This can happen + * in a sharded cluster when a new shard is added. * - * - Otherwise we cannot resume, as we do not know if this shard lost documents between the resume - * token and the first matching document in the pipeline. - * - * This source need only run on a sharded collection. For unsharded collections, - * DocumentSourceEnsureResumeTokenPresent is sufficient. + * - Otherwise we cannot resume, as we do not know if there were any events between the resume token + * and the first matching document in the oplog. */ -class DocumentSourceShardCheckResumability final : public DocumentSource { +class DocumentSourceCheckResumability : public DocumentSource { public: - GetNextResult getNext() final; - const char* getSourceName() const final; + static constexpr StringData kStageName = "$_internalCheckResumability"_sd; + + // Used to record the results of comparing the token data extracted from documents in the + // resumed stream against the client's resume token. + enum class ResumeStatus { + kFoundToken, // The stream produced a document satisfying the client resume token. + kSurpassedToken, // The stream's latest document is more recent than the resume token. + kCheckNextDoc // The next document produced by the stream may contain the resume token. + }; + + GetNextResult getNext() override; + const char* getSourceName() const override; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { + StageConstraints constraints(Pipeline::SplitState pipeState) const override { return {StreamType::kStreaming, PositionRequirement::kNone, HostTypeRequirement::kAnyShard, @@ -73,47 +85,38 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { return boost::none; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts); - static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); -private: +protected: /** - * Use the create static method to create a DocumentSourceShardCheckResumability. + * Use the create static method to create a DocumentSourceCheckResumability. */ - DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, - ResumeTokenData token); - - void _assertOplogHasEnoughHistory(const GetNextResult& nextInput); + DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, + ResumeTokenData token); - ResumeTokenData _tokenFromClient; - bool _verifiedOplogHasEnoughHistory = false; - bool _surpassedResumeToken = false; + ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; + const ResumeTokenData _tokenFromClient; }; /** * This stage is used internally for change streams to ensure that the resume token is in the * stream. It is not intended to be created by the user. */ -class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource { +class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability { public: - // Used to record the results of comparing the token data extracted from documents in the - // resumed stream against the client's resume token. - enum class ResumeStatus { - kFoundToken, // The stream produced a document satisfying the client resume token. - kSurpassedToken, // The stream's latest document is more recent than the resume token. - kCheckNextDoc // The next document produced by the stream may contain the resume token. - }; + static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd; - GetNextResult getNext() final; + GetNextResult getNext() override; const char* getSourceName() const final; StageConstraints constraints(Pipeline::SplitState) const final { @@ -137,13 +140,11 @@ public: logic.mergingStage = this; // Also add logic to the shards to ensure that each shard has enough oplog history to resume // the change stream. - logic.shardsStage = DocumentSourceShardCheckResumability::create(pExpCtx, _tokenFromClient); + logic.shardsStage = DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient); logic.inputSortPattern = change_stream_constants::kSortSpec; return logic; }; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); @@ -154,15 +155,8 @@ private: DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); - /** - * Check the given event to determine whether it matches the client's resume token. If so, we - * swallow this event and return the next event in the stream. Otherwise, return boost::none. - */ - boost::optional<DocumentSource::GetNextResult> _checkNextDocAndSwallowResumeToken( - const DocumentSource::GetNextResult& nextInput); - - ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; - ResumeTokenData _tokenFromClient; + // Records whether we have observed the token in the resumed stream. + bool _hasSeenResumeToken = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index afe4758e056..fdf1bcbb14c 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -35,6 +35,11 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/catalog/collection_mock.h" +#include "mongo/db/catalog/database_holder_mock.h" +#include "mongo/db/catalog/database_impl.h" +#include "mongo/db/exec/collection_scan.h" +#include "mongo/db/exec/plan_stats.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -44,55 +49,290 @@ #include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/devnull/devnull_kv_engine.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/uuid.h" using boost::intrusive_ptr; -using std::deque; namespace mongo { namespace { +static constexpr StringData kOtherNs = "test.other.ns"_sd; static constexpr StringData kTestNs = "test.ns"_sd; +class ChangeStreamOplogCursorMock : public SeekableRecordCursor { +public: + ChangeStreamOplogCursorMock(std::deque<Record>* records) : _records(records) {} + + virtual ~ChangeStreamOplogCursorMock() {} + + boost::optional<Record> next() override { + if (_records->empty()) { + return boost::none; + } + auto& next = _records->front(); + _records->pop_front(); + return next; + } + + boost::optional<Record> seekExact(const RecordId& id) override { + return Record{}; + } + void save() override {} + bool restore() override { + return true; + } + void detachFromOperationContext() override {} + void reattachToOperationContext(OperationContext* opCtx) override {} + +private: + std::deque<Record>* _records; +}; + +class ChangeStreamOplogCollectionMock : public CollectionMock { +public: + ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) { + _recordStore = + _devNullEngine.getRecordStore(nullptr, NamespaceString::kRsOplogNamespace.ns(), "", {}); + } + + void push_back(Document doc) { + // Every entry we push into the oplog should have both 'ts' and 'ns' fields. + invariant(doc["ts"].getType() == BSONType::bsonTimestamp); + invariant(doc["ns"].getType() == BSONType::String); + // Events should always be added in ascending ts order. + auto lastTs = + _records.empty() ? Timestamp(0, 0) : _records.back().data.toBson()["ts"].timestamp(); + invariant(ValueComparator().compare(Value(lastTs), doc["ts"]) <= 0); + // Fill out remaining required fields in the oplog entry. + MutableDocument mutableDoc{doc}; + mutableDoc.setField("op", Value("n"_sd)); + mutableDoc.setField("o", Value(Document{})); + mutableDoc.setField("wall", + Value(Date_t::fromMillisSinceEpoch(doc["ts"].getTimestamp().asLL()))); + // Must remove _id since the oplog expects either no _id or an OID. + mutableDoc.remove("_id"); + // Convert to owned BSON and create corresponding Records. + _data.push_back(mutableDoc.freeze().toBson()); + Record record; + record.data = {_data.back().objdata(), _data.back().objsize()}; + record.id = RecordId{static_cast<int64_t>(_data.size())}; + _records.push_back(std::move(record)); + } + + std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx, + bool forward) const override { + return std::make_unique<ChangeStreamOplogCursorMock>(&_records); + } + + const RecordStore* getRecordStore() const override { + return _recordStore.get(); + } + RecordStore* getRecordStore() override { + return _recordStore.get(); + } + +private: + // We retain the owned record queue here because cursors may be destroyed and recreated. + mutable std::deque<Record> _records; + std::deque<BSONObj> _data; + + // These are no-op structures which are required by the CollectionScan. + std::unique_ptr<RecordStore> _recordStore; + DevNullKVEngine _devNullEngine; +}; + +/** + * The RequiresCollectionStageBase class attempts to obtain the current epoch of the database + * containing the collection to be scanned (in this case, the oplog). Here we provide a dummy + * DatabaseHolder which will always return a valid pointer to the _database member variable. + */ +class ChangeStreamDatabaseHolderMock : public DatabaseHolderMock { +public: + ChangeStreamDatabaseHolderMock() : _database(std::make_unique<DatabaseImpl>("local"_sd, 0)){}; + + Database* getDb(OperationContext* opCtx, StringData ns) const override { + return _database.get(); + } + +private: + std::unique_ptr<Database> _database; +}; + +/** + * Acts as an initial source for the change stream pipeline, taking the place of DSOplogMatch. This + * class maintains its own queue of documents added by each test, but also pushes each doc into an + * underlying ChangeStreamOplogCollectionMock. When getNext() is called, it retrieves the next + * document by pulling it from the mocked oplog collection via a CollectionScan, in order to test + * the latter's changestream-specific functionality. The reason this class keeps its own queue in + * addition to the ChangeStreamOplogCollectionMock is twofold: + * + * - The _id must be stripped from each document before it can be added to the mocked oplog, since + * the _id field of the test document is a resume token but oplog entries are only permitted to + * have OID _ids. We therefore have to restore the _id field of the document pulled from the + * CollectionScan before passing it into the pipeline. + * + * - The concept of GetNextResult::ReturnStatus::kPauseExecution does not exist in CollectionScan; + * NEED_TIME is somewhat analogous but cannot be artificially induced. For tests which exercise + * kPauseExecution, these events are stored only in the DocumentSourceChangeStreamMock queue + * with no corresponding entry in the ChangeStreamOplogCollectionMock queue. + */ +class DocumentSourceChangeStreamMock : public DocumentSourceMock { +public: + DocumentSourceChangeStreamMock(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMock({}, expCtx) { + _filterExpr = BSON("ns" << kTestNs); + _filter = _parseAndNormalize(_filterExpr); + _params.assertMinTsHasNotFallenOffOplog = true; + _params.shouldTrackLatestOplogTimestamp = true; + _params.minTs = Timestamp(0, 0); + _params.tailable = true; + } + + void setResumeToken(ResumeTokenData resumeToken) { + invariant(!_collScan); + _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime)); + _filter = _parseAndNormalize(_filterExpr); + _params.minTs = resumeToken.clusterTime; + } + + void push_back(GetNextResult&& result) { + // We should never push an explicit EOF onto the queue. + invariant(!result.isEOF()); + // If there is a document supplied, add it to the mock collection. + if (result.isAdvanced()) { + _collection.push_back(result.getDocument()); + } + // Both documents and pauses are stored in the DSMock queue. + DocumentSourceMock::push_back(std::move(result)); + } + + void push_back(const GetNextResult& result) { + MONGO_UNREACHABLE; + } + + bool isPermanentlyEOF() const { + return _collScan->getCommonStats()->isEOF; + } + +protected: + GetNextResult getNext() override { + // If this is the first call to getNext, we must create the COLLSCAN. + if (!_collScan) { + _collScan = std::make_unique<CollectionScan>( + pExpCtx->opCtx, &_collection, _params, &_ws, _filter.get()); + // The first call to doWork will create the cursor and return NEED_TIME. But it won't + // actually scan any of the documents that are present in the mock cursor queue. + ASSERT_EQ(_collScan->doWork(nullptr), PlanStage::NEED_TIME); + ASSERT_EQ(_getNumDocsTested(), 0UL); + } + while (true) { + // If the next result is a pause, return it and don't collscan. + auto nextResult = DocumentSourceMock::getNext(); + if (nextResult.isPaused()) { + return nextResult; + } + // Otherwise, retrieve the document via the CollectionScan stage. + auto id = WorkingSet::INVALID_ID; + switch (_collScan->doWork(&id)) { + case PlanStage::IS_EOF: + invariant(nextResult.isEOF()); + return nextResult; + case PlanStage::ADVANCED: { + // We need to restore the _id field which was removed when we added this + // entry into the oplog. This is like a stripped-down DSCSTransform stage. + MutableDocument mutableDoc{Document{_ws.get(id)->obj.value()}}; + mutableDoc["_id"] = nextResult.getDocument()["_id"]; + return mutableDoc.freeze(); + } + case PlanStage::NEED_TIME: + continue; + case PlanStage::NEED_YIELD: + case PlanStage::FAILURE: + MONGO_UNREACHABLE; + } + } + MONGO_UNREACHABLE; + } + +private: + std::unique_ptr<MatchExpression> _parseAndNormalize(BSONObj filterExpr) { + auto filter = uassertStatusOK(MatchExpressionParser::parse(filterExpr, pExpCtx)); + filter = MatchExpression::optimize(std::move(filter)); + CanonicalQuery::sortTree(filter.get()); + return filter; + } + + size_t _getNumDocsTested() { + return static_cast<const CollectionScanStats*>(_collScan->getSpecificStats())->docsTested; + } + + ChangeStreamOplogCollectionMock _collection; + std::unique_ptr<CollectionScan> _collScan; + CollectionScanParams _params; + + std::unique_ptr<MatchExpression> _filter; + BSONObj _filterExpr; + + WorkingSet _ws; +}; + class CheckResumeTokenTest : public AggregationContextFixture { public: - CheckResumeTokenTest() : _mock(DocumentSourceMock::createForTest()) {} + CheckResumeTokenTest() : _mock(make_intrusive<DocumentSourceChangeStreamMock>(getExpCtx())) { + DatabaseHolder::set(getServiceContext(), + std::make_unique<ChangeStreamDatabaseHolderMock>()); + } protected: /** * Pushes a document with a resume token corresponding to the given ResumeTokenData into the - * mock queue. + * mock queue. This document will have an ns field that matches the test namespace, and will + * appear in the change stream pipeline if its timestamp is at or after the resume timestamp. */ - void addDocument(ResumeTokenData tokenData) { - _mock->push_back(Document{{"_id", ResumeToken(std::move(tokenData)).toDocument()}}); + void addOplogEntryOnTestNS(ResumeTokenData tokenData) { + _mock->push_back(Document{{"ns", kTestNs}, + {"ts", tokenData.clusterTime}, + {"_id", ResumeToken(std::move(tokenData)).toDocument()}}); } /** * Pushes a document with a resume token corresponding to the given timestamp, version, * txnOpIndex, docKey, and namespace into the mock queue. */ - void addDocument( + void addOplogEntryOnTestNS( Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) { - return addDocument({ts, version, txnOpIndex, uuid, Value(docKey)}); + return addOplogEntryOnTestNS({ts, version, txnOpIndex, uuid, Value(docKey)}); } /** * Pushes a document with a resume token corresponding to the given timestamp, version, * txnOpIndex, docKey, and namespace into the mock queue. */ - void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) { - addDocument(ts, 0, 0, docKey, uuid); + void addOplogEntryOnTestNS(Timestamp ts, Document docKey, UUID uuid = testUuid()) { + addOplogEntryOnTestNS(ts, 0, 0, docKey, uuid); } /** * Pushes a document with a resume token corresponding to the given timestamp, _id string, and * namespace into the mock queue. */ - void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) { - addDocument(ts, 0, 0, Document{{"_id", id}}, uuid); + void addOplogEntryOnTestNS(Timestamp ts, std::string id, UUID uuid = testUuid()) { + addOplogEntryOnTestNS(ts, 0, 0, Document{{"_id", id}}, uuid); } + /** + * Pushes a document that does not match the test namespace into the mock oplog. This will be + * examined by the oplog CollectionScan but will not produce an event in the pipeline. + */ + void addOplogEntryOnOtherNS(Timestamp ts) { + _mock->push_back(Document{{"ns", kOtherNs}, {"ts", ts}}); + } + + /** + * Pushes a pause in execution into the pipeline queue. + */ void addPause() { _mock->push_back(DocumentSource::GetNextResult::makePauseExecution()); } @@ -100,10 +340,11 @@ protected: /** * Convenience method to create the class under test with a given ResumeTokenData. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( ResumeTokenData tokenData) { auto checkResumeToken = - DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), std::move(tokenData)); + DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData); + _mock->setResumeToken(std::move(tokenData)); checkResumeToken->setSource(_mock.get()); return checkResumeToken; } @@ -112,13 +353,13 @@ protected: * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, int version, std::size_t txnOpIndex, boost::optional<Document> docKey, UUID uuid) { - return createCheckResumeToken( + return createDSEnsureResumeTokenPresent( {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()}); } @@ -126,18 +367,18 @@ protected: * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) { - return createCheckResumeToken(ts, 0, 0, docKey, uuid); + return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid); } /** * Convenience method to create the class under test with a given timestamp, _id string, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, StringData id, UUID uuid = testUuid()) { - return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid); + return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid); } /** @@ -149,28 +390,28 @@ protected: return *uuid_gen; } - intrusive_ptr<DocumentSourceMock> _mock; + intrusive_ptr<DocumentSourceChangeStreamMock> _mock; }; -class ShardCheckResumabilityTest : public CheckResumeTokenTest { +class CheckResumabilityTest : public CheckResumeTokenTest { protected: - intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability( + intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability( ResumeTokenData tokenData) { - auto shardCheckResumability = - DocumentSourceShardCheckResumability::create(getExpCtx(), tokenData); - shardCheckResumability->setSource(_mock.get()); - return shardCheckResumability; + auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData); + _mock->setResumeToken(std::move(tokenData)); + dsCheckResumability->setSource(_mock.get()); + return dsCheckResumability; } - intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) { - return createShardCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); + intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) { + return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); } }; TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); // We should not see the resume token. ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -178,9 +419,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); addPause(); - addDocument(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); // We see the pause we inserted, but not the resume token. ASSERT_TRUE(checkResumeToken->getNext().isPaused()); @@ -191,10 +432,10 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { Timestamp resumeTimestamp(100, 1); Timestamp doc1Timestamp(100, 2); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); addPause(); - addDocument(doc1Timestamp, "2"); + addOplogEntryOnTestNS(doc1Timestamp, "2"); // Pause added explicitly. ASSERT_TRUE(checkResumeToken->getNext().isPaused()); @@ -209,13 +450,13 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); - addDocument(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0"); + addOplogEntryOnTestNS(resumeTimestamp, "0"); Timestamp doc1Timestamp(100, 2); Timestamp doc2Timestamp(101, 1); - addDocument(doc1Timestamp, "1"); - addDocument(doc2Timestamp, "2"); + addOplogEntryOnTestNS(doc1Timestamp, "1"); + addOplogEntryOnTestNS(doc2Timestamp, "2"); auto result1 = checkResumeToken->getNext(); ASSERT_TRUE(result1.isAdvanced()); @@ -229,23 +470,28 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) } TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) { - Timestamp resumeTimestamp(100, 1); + // The first timestamp in the oplog precedes the resume token, and the second matches it... + Timestamp doc1Timestamp(100, 1); + Timestamp resumeTimestamp(100, 2); + Timestamp doc2Timestamp = resumeTimestamp; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); - Timestamp doc1Timestamp(100, 2); - Timestamp doc2Timestamp(101, 1); - addDocument(doc1Timestamp, "1"); - addDocument(doc2Timestamp, "2"); + // ... but there's no entry in the oplog that matches the full token. + addOplogEntryOnTestNS(doc1Timestamp, "1"); + addOplogEntryOnTestNS(doc2Timestamp, "2"); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } -TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) { +TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + + // Add an entry into the oplog with the same timestamp but a lower documentKey. We swallow it + // but don't throw - we haven't surpassed the token yet and still may see it in the next doc. + addOplogEntryOnTestNS(resumeTimestamp, "0"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -253,9 +499,9 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) { Timestamp resumeTimestamp(100, 1); auto resumeTokenUUID = UUID::gen(); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1", resumeTokenUUID); auto otherUUID = UUID::gen(); - addDocument(resumeTimestamp, "1", otherUUID); + addOplogEntryOnTestNS(resumeTimestamp, "1", otherUUID); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } @@ -266,9 +512,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "abc"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "abc"); // We must not see the following document. - addDocument(resumeTimestamp, "ABC"); + addOplogEntryOnTestNS(resumeTimestamp, "ABC"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -280,13 +526,14 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdM Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}); Timestamp doc1Timestamp(100, 1); - addDocument(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}}); Timestamp doc2Timestamp(100, 2); Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}}; - addDocument(doc2Timestamp, doc2DocKey); + addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey); // We should skip doc1 since it satisfies the resume token, and retrieve doc2. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -301,10 +548,11 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -319,10 +567,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen getExpCtx()->inMongos = true; auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}}); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -335,10 +583,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject) Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, boost::none); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, boost::none); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -351,11 +599,12 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) { Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -380,20 +629,20 @@ TEST_F(CheckResumeTokenTest, // Create the resume token using the higher-sorting UUID. auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]); // Add two documents which have the same clusterTime but a lower UUID. One of the documents has // a lower docKey than the resume token, the other has a higher docKey; this demonstrates that // the UUID is the discriminating factor. - addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]); - addDocument(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]); // Add a third document that matches the resume token. - addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]); // Add a fourth document with the same timestamp and UUID whose docKey sorts after the token. auto expectedDocKey = Document{{"_id"_sd, 3}}; - addDocument(resumeTimestamp, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, expectedDocKey, uuids[1]); // We should skip the first two docs, swallow the resume token, and return the fourth doc. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -418,13 +667,13 @@ TEST_F(CheckResumeTokenTest, // Create the resume token using the lower-sorting UUID. auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]); // Add a document which has the same clusterTime and a lower docKey but a higher UUID, followed // by a document which matches the resume token. This is not possible in practice, but it serves // to demonstrate that the resume attempt fails even when the resume token is present. - addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]); - addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -448,18 +697,18 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn invalidateToken.clusterTime = resumeTimestamp; invalidateToken.uuid = uuids[0]; invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - auto checkResumeToken = createCheckResumeToken(invalidateToken); + auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Add three documents which each have the invalidate resume token. We expect to see this in the // event that we are starting after an invalidate and the invalidating event occurred on several // shards at the same clusterTime. - addDocument(invalidateToken); - addDocument(invalidateToken); - addDocument(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); // Add a document representing an insert which recreated the collection after it was dropped. auto expectedDocKey = Document{{"_id"_sd, 1}}; - addDocument(Timestamp{100, 2}, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(Timestamp{100, 2}, expectedDocKey, uuids[1]); // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it // and the next two invalidates, and return the insert event after the collection drop. @@ -488,7 +737,7 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv invalidateToken.clusterTime = resumeTimestamp; invalidateToken.uuid = uuids[0]; invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - auto checkResumeToken = createCheckResumeToken(invalidateToken); + auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Create a second invalidate token with the same clusterTime but a different UUID. auto unrelatedInvalidateToken = invalidateToken; @@ -497,12 +746,12 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv // Add three documents which each have the invalidate resume token. We expect to see this in the // event that we are starting after an invalidate and the invalidating event occurred on several // shards at the same clusterTime. - addDocument(invalidateToken); - addDocument(invalidateToken); - addDocument(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); // Add a fourth document which has the unrelated invalidate at the same clusterTime. - addDocument(unrelatedInvalidateToken); + addOplogEntryOnTestNS(unrelatedInvalidateToken); // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it // and the next two invalidates, but decline to swallow the unrelated invalidate. @@ -513,39 +762,6 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv ASSERT_EQ(tokenFromFirstDocAfterResume, unrelatedInvalidateToken); } -TEST_F(CheckResumeTokenTest, ShouldSwallowOnlyFirstInvalidateForStartAfterInvalidateInReplSet) { - Timestamp resumeTimestamp(100, 1); - - // We only swallow multiple invalidates when DSEnsureResumeTokenPresent is running on mongoS. - // Set {inMongos:false} to verify that we do not swallow additional invalidates on a replica - // set, since this should never occur. - getExpCtx()->inMongos = false; - - // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A - // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new - // stream after the old stream is invalidated. - ResumeTokenData invalidateToken; - invalidateToken.clusterTime = resumeTimestamp; - invalidateToken.uuid = testUuid(); - invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - auto checkResumeToken = createCheckResumeToken(invalidateToken); - - // Add three documents which each have the invalidate resume token. - addDocument(invalidateToken); - addDocument(invalidateToken); - addDocument(invalidateToken); - - // DSEnsureResumeTokenPresent should confirm that the invalidate event is present and swallow - // it. However, it should not swallow the subsequent two invalidates. - for (size_t i = 0; i < 2; ++i) { - const auto nextDocAfterResume = checkResumeToken->getNext(); - const auto tokenFromNextDocAfterResume = - ResumeToken::parse(nextDocAfterResume.getDocument()["_id"].getDocument()).getData(); - ASSERT_EQ(tokenFromNextDocAfterResume, invalidateToken); - } - ASSERT(checkResumeToken->getNext().isEOF()); -} - TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { Timestamp resumeTimestamp(100, 1); @@ -555,21 +771,21 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { std::sort(uuids.begin(), uuids.end()); auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]); + createDSEnsureResumeTokenPresent(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]); // Add two documents which have the same clusterTime and version but a lower applyOps index. One // of the documents has a lower uuid than the resume token, the other has a higher uuid; this // demonstrates that the applyOps index is the discriminating factor. - addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]); - addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]); // Add a third document that matches the resume token. - addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]); // Add a fourth document with the same timestamp and version whose applyOps sorts after the // resume token. auto expectedDocKey = Document{{"_id"_sd, 3}}; - addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]); // We should skip the first two docs, swallow the resume token, and return the fourth doc. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -586,277 +802,255 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithNoDocuments) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } -/** - * A mock MongoProcessInterface which allows mocking a foreign pipeline. - */ -class MockMongoInterface final : public StubMongoProcessInterface { -public: - MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) - : _mockResults(std::move(mockResults)) {} - - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { - return false; - } - - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); - - if (opts.optimize) { - pipeline->optimizePipeline(); - } - - if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); - } - - return pipeline; - } - - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) final { - std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, - PipelineDeleter(expCtx->opCtx)); - pipeline->addInitialSource(DocumentSourceMock::createForTest(_mockResults)); - return pipeline; - } - -private: - deque<DocumentSource::GetNextResult> _mockResults; -}; - -TEST_F(ShardCheckResumabilityTest, - ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) { +TEST_F(CheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(resumeTimestamp, "ID"); // We should see the resume token. - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, - ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryAfterToken) { +TEST_F(CheckResumabilityTest, + ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryEqualToToken) { Timestamp resumeTimestamp(100, 1); - Timestamp oplogTimestamp(100, 2); + Timestamp oplogTimestamp(100, 1); - ResumeTokenData tokenData( - resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}})); - auto shardCheckResumability = createShardCheckResumability(tokenData); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(resumeTimestamp, "ID"); // We should see the resume token. - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) { +TEST_F(CheckResumabilityTest, ShouldPermanentlyEofIfOplogIsEmpty) { Timestamp resumeTimestamp(100, 1); - ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); - // We should see the resume token. + // As with other tailable cursors, starting a change stream on an empty capped collection will + // cause the cursor to immediately and permanently EOF. This should never happen in practice, + // since a replset member can only accept requests while in PRIMARY, SECONDARY or RECOVERING + // states, and there must be at least one entry in the oplog in order to reach those states. + auto shardCheckResumability = createDSCheckResumability(resumeTimestamp); auto result = shardCheckResumability->getNext(); - ASSERT_TRUE(result.isAdvanced()); - auto& doc = result.getDocument(); - ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); + ASSERT_TRUE(result.isEOF()); + ASSERT_TRUE(_mock->isPermanentlyEOF()); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } -TEST_F(ShardCheckResumabilityTest, - ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) { +TEST_F(CheckResumabilityTest, + ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryEqualToToken) { + Timestamp oplogTimestamp(100, 1); + Timestamp resumeTimestamp(100, 1); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result = dsCheckResumability->getNext(); + ASSERT_TRUE(result.isEOF()); +} + +TEST_F(CheckResumabilityTest, ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) { Timestamp resumeTimestamp(100, 1); Timestamp oplogTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); ASSERT_THROWS_CODE( - shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) { +TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) { Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp docTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, + ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryEqualToToken) { + Timestamp oplogTimestamp(100, 1); + Timestamp resumeTimestamp(100, 1); + Timestamp docTimestamp(100, 3); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result = dsCheckResumability->getNext(); + ASSERT_TRUE(result.isAdvanced()); + auto& doc = result.getDocument(); + ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); +} + +TEST_F(CheckResumabilityTest, ShouldFailWithLaterDocumentsInPipelineAndEarliestOplogEntryAfterToken) { Timestamp resumeTimestamp(100, 1); Timestamp oplogTimestamp(100, 2); Timestamp docTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + ASSERT_THROWS_CODE( + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); +} + +TEST_F(CheckResumabilityTest, + ShouldFailWithoutReadingLaterDocumentsInPipelineIfEarliestOplogEntryAfterToken) { + Timestamp resumeTimestamp(100, 1); + Timestamp oplogTimestamp(100, 2); + Timestamp docTimestamp(100, 3); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + // Confirm that there are two documents queued in the mock oplog. + ASSERT_EQ(_mock->size(), 2u); ASSERT_THROWS_CODE( - shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); + // Confirm that only the first document was read before the assertion was thrown. + ASSERT_EQ(_mock->size(), 1u); } -TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { +TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); Timestamp docTimestamp(100, 4); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); - mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result2 = shardCheckResumability->getNext(); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); Timestamp docTimestamp(100, 4); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog( - {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); - auto result2 = shardCheckResumability->getNext(); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { +TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isEOF()); - mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result2 = shardCheckResumability->getNext(); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(token); // Add 2 events at the same clusterTime as the resume token but whose docKey sort before it. - addDocument(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "2"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "2"); // Add the resume token, plus one further event whose docKey sorts after the token. - addDocument(resumeTimestamp, "3"); - addDocument(resumeTimestamp, "4"); + addOplogEntryOnTestNS(resumeTimestamp, "3"); + addOplogEntryOnTestNS(resumeTimestamp, "4"); // The first event we see should be the resume token... - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto doc = result.getDocument(); ASSERT_EQ(token, ResumeToken::parse(doc["_id"].getDocument()).getData()); // ... then the post-token event, and then finally EOF. - result = shardCheckResumability->getNext(); + result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); - Document postResumeTokenDoc{ - {"_id", - ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) - .toDocument()}}; - ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc); - ASSERT_TRUE(shardCheckResumability->getNext().isEOF()); + auto postResumeTokenDoc = + ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) + .toDocument(); + ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc); + ASSERT_TRUE(dsCheckResumability->getNext().isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(token); // Add 2 events at the same clusterTime as the resume token but whose docKey sort before it. - addDocument(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "2"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "2"); // Add one further event whose docKey sorts after the token. - addDocument(resumeTimestamp, "4"); + addOplogEntryOnTestNS(resumeTimestamp, "4"); // The first event we see should be the post-token event, followed by EOF. - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); - Document postResumeTokenDoc{ - {"_id", - ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) - .toDocument()}}; - ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc); - ASSERT_TRUE(shardCheckResumability->getNext().isEOF()); + auto postResumeTokenDoc = + ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) + .toDocument(); + ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc); + ASSERT_TRUE(dsCheckResumability->getNext().isEOF()); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp index 86e9ebda0ee..8cd2b7a6162 100644 --- a/src/mongo/db/pipeline/document_source_mock.cpp +++ b/src/mongo/db/pipeline/document_source_mock.cpp @@ -47,6 +47,10 @@ const char* DocumentSourceMock::getSourceName() const { return "mock"; } +const size_t DocumentSourceMock::size() const { + return _queue.size(); +} + intrusive_ptr<DocumentSourceMock> DocumentSourceMock::createForTest(Document doc) { return new DocumentSourceMock({std::move(doc)}); } diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 738fc802cc4..e660dadab18 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -71,6 +71,8 @@ public: const char* getSourceName() const override; + const size_t size() const; + void reattachToOperationContext(OperationContext* opCtx) { isDetachedFromOpCtx = false; } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 1e93bd3d0c9..75d9948ba05 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -679,7 +679,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) { invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData); - plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS | + QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); } if (rewrittenGroupStage) { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 642a48fbd5b..eb0f93d823c 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -722,11 +722,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( LOG(3) << "Using direct oplog seek"; params.start = *startLoc; } + params.minTs = minTs; params.maxTs = maxTs; params.direction = CollectionScanParams::FORWARD; params.tailable = cq->getQueryRequest().isTailable(); params.shouldTrackLatestOplogTimestamp = plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + params.assertMinTsHasNotFallenOffOplog = + plannerOptions & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG; params.shouldWaitForOplogVisibility = shouldWaitForOplogVisibility(opCtx, collection, params.tailable); diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index 7065fa25e9b..a2d61a4fd1b 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -154,6 +154,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( csn->tailable = tailable; csn->shouldTrackLatestOplogTimestamp = params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + csn->assertMinTsHasNotFallenOffOplog = + params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG; csn->shouldWaitForOplogVisibility = params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE; diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp index 316044294ee..2e48896c24d 100644 --- a/src/mongo/db/query/query_planner.cpp +++ b/src/mongo/db/query/query_planner.cpp @@ -135,6 +135,9 @@ string optionString(size_t options) { case QueryPlannerParams::STRICT_DISTINCT_ONLY: ss << "STRICT_DISTINCT_ONLY "; break; + case QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG: + ss << "ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG "; + break; case QueryPlannerParams::ENUMERATE_OR_CHILDREN_LOCKSTEP: ss << "ENUMERATE_OR_CHILDREN_LOCKSTEP "; break; diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index f8848b2011e..1c3d86229a7 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -99,6 +99,10 @@ struct QueryPlannerParams { // declaration of getExecutorDistinct() for more detail. STRICT_DISTINCT_ONLY = 1 << 11, + // Set this on an oplog scan to uassert that the oplog has not already rolled over the + // minimum 'ts' timestamp specified in the query. + ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG = 1 << 12, + // Instruct the plan enumerator to enumerate contained $ors in a special order. $or // enumeration can generate an exponential number of plans, and is therefore limited at some // arbitrary cutoff controlled by a parameter. When this limit is hit, the order of @@ -114,7 +118,7 @@ struct QueryPlannerParams { // order, we would get assignments [a_b, a_b], [a_c, a_c], [a_c, a_b], then [a_b, a_c]. This // is thought to be helpful in general, but particularly in cases where all children of the // $or use the same fields and have the same indexes available, as in this example. - ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 12, + ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 13, }; // See Options enum above. diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp index 3115496fce1..f29ba473d40 100644 --- a/src/mongo/db/query/query_solution.cpp +++ b/src/mongo/db/query/query_solution.cpp @@ -249,6 +249,7 @@ QuerySolutionNode* CollectionScanNode::clone() const { copy->tailable = this->tailable; copy->direction = this->direction; copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp; + copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog; copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility; return copy; diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index 44c63d4bfb6..1f8ee232517 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -327,6 +327,9 @@ struct CollectionScanNode : public QuerySolutionNode { // across a sharded cluster. bool shouldTrackLatestOplogTimestamp = false; + // Should we assert that the specified minTS has not fallen off the oplog? + bool assertMinTsHasNotFallenOffOplog = false; + int direction; // Whether or not to wait for oplog visibility on oplog collection scans. diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index ec3e47a04a6..c14054eebd4 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -78,6 +78,7 @@ PlanStage* buildStages(OperationContext* opCtx, CollectionScanParams params; params.tailable = csn->tailable; params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp; + params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog; params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD : CollectionScanParams::BACKWARD; params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility; diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index a672e73f333..5deaf149a73 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -40,6 +40,11 @@ namespace mongo { namespace repl { /** + * The first oplog entry is a no-op with this message in its "msg" field. + */ +constexpr auto kInitiatingSetMsg = "initiating set"_sd; + +/** * A parsed DurableReplOperation along with information about the operation that should only exist * in-memory. * diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index f3d7d478f1f..dd414df0b28 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -407,8 +407,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati Lock::GlobalWrite globalWrite(opCtx); WriteUnitOfWork wuow(opCtx); Helpers::putSingleton(opCtx, configCollectionName, config); - const auto msgObj = BSON("msg" - << "initiating set"); + const auto msgObj = BSON("msg" << kInitiatingSetMsg); _service->getOpObserver()->onOpMessage(opCtx, msgObj); wuow.commit(); // ReplSetTest assumes that immediately after the replSetInitiate |