author | Bernard Gorman <bernard.gorman@gmail.com> | 2020-06-01 18:47:00 +0100
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-27 13:09:38 +0000
commit | 694ed4153b9d5424b5d169fea5c68f99d4dfb45a (patch)
tree | b3cffb5dce360007663e53a2aba68a77f89fdf86 /src
parent | 1d3972cea1ae1a35e398fe61882cd455d78c01d1 (diff)
download | mongo-694ed4153b9d5424b5d169fea5c68f99d4dfb45a.tar.gz
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
Diffstat (limited to 'src')
19 files changed, 623 insertions, 457 deletions
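In outline, the patch makes the oplog collection scan verify up front that the first oplog entry it sees does not post-date the requested minTs (unless that entry is the replica-set-initialization no-op), raising OplogQueryMinTsMissing otherwise; DocumentSourceCheckResumability then converts that error into ChangeStreamHistoryLost. Below is a minimal, standalone sketch of that rule only; the Timestamp and FirstOplogEntry types and the free function here are illustrative stand-ins, not code from this tree.

// Minimal, self-contained sketch of the resumability rule this patch adds.
// The types below are stand-ins for the real mongo classes.
#include <cstdint>
#include <iostream>
#include <stdexcept>

struct Timestamp {
    uint32_t secs;
    uint32_t inc;
    bool operator<=(const Timestamp& other) const {
        return secs < other.secs || (secs == other.secs && inc <= other.inc);
    }
};

struct FirstOplogEntry {
    Timestamp ts;
    bool isReplSetInitialization;  // the {op: "n", o: {msg: "initiating set"}} no-op
};

// Mirrors the intent of CollectionScan::assertMinTsHasNotFallenOffOplog: the first entry
// observed in the oplog must either be the replset-initialization no-op, or must not
// post-date the requested minTs; otherwise the resume point may have been truncated away.
void assertMinTsHasNotFallenOffOplog(const FirstOplogEntry& first, const Timestamp& minTs) {
    if (first.isReplSetInitialization || first.ts <= minTs) {
        return;  // Safe to resume: no event earlier than minTs can have fallen off this oplog.
    }
    // In the patch this is uasserted as ErrorCodes::OplogQueryMinTsMissing, which
    // DocumentSourceCheckResumability catches and rethrows as ChangeStreamHistoryLost.
    throw std::runtime_error("Specified minTs has already fallen off the oplog");
}

int main() {
    assertMinTsHasNotFallenOffOplog({{90, 1}, false}, {100, 1});  // OK: oldest entry precedes minTs.
    assertMinTsHasNotFallenOffOplog({{120, 1}, true}, {100, 1});  // OK: freshly initiated oplog.
    try {
        assertMinTsHasNotFallenOffOplog({{120, 1}, false}, {100, 1});  // History has been lost.
    } catch (const std::runtime_error& ex) {
        std::cout << ex.what() << '\n';
    }
    return 0;
}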
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 2ee434d2c08..fe34bf4f7d8 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -395,6 +395,7 @@ error_codes: - {code: 325, name: TenantMigrationAborted} + - {code: 326, name: OplogQueryMinTsMissing} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index 831d11d57f3..1466108d5ff 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -38,7 +38,7 @@ namespace mongo { /** * This class comprises a mock Collection for use by CollectionCatalog unit tests. */ -class CollectionMock final : public Collection { +class CollectionMock : public Collection { public: CollectionMock(const NamespaceString& ns) : CollectionMock(ns, std::unique_ptr<IndexCatalog>()) {} diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 00547983fc8..ffabd1886b4 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -79,6 +79,12 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, } invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); + // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. + if (params.assertMinTsHasNotFallenOffOplog) { + invariant(params.shouldTrackLatestOplogTimestamp); + invariant(params.minTs); + } + if (params.resumeAfterRecordId) { // The 'resumeAfterRecordId' parameter is used for resumable collection scans, which we // only support in the forward direction. @@ -187,19 +193,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } if (!record) { - // We just hit EOF. If we are tailable and have already returned data, leave us in a - // state to pick up where we left off on the next call to work(). Otherwise EOF is - // permanent. + // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up + // where we left off on the next call to work(). Otherwise, the EOF is permanent. if (_params.tailable && !_lastSeenId.isNull()) { _cursor.reset(); } else { _commonStats.isEOF = true; } - return PlanStage::IS_EOF; } _lastSeenId = record->id; + if (_params.assertMinTsHasNotFallenOffOplog) { + assertMinTsHasNotFallenOffOplog(*record); + } if (_params.shouldTrackLatestOplogTimestamp) { setLatestOplogEntryTimestamp(*record); } @@ -223,6 +230,23 @@ void CollectionScan::setLatestOplogEntryTimestamp(const Record& record) { _latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp()); } +void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) { + // If the first entry we see in the oplog is the replset initialization, then it doesn't matter + // if its timestamp is later than the specified minTs; no events earlier than the minTs can have + // fallen off this oplog. Otherwise, verify that the timestamp of the first observed oplog entry + // is earlier than or equal to the minTs time. 
+ auto oplogEntry = invariantStatusOK(repl::OplogEntry::parse(record.data.toBson())); + invariant(_specificStats.docsTested == 0); + const bool isNewRS = + oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) && + oplogEntry.getOpType() == repl::OpTypeEnum::kNoop; + uassert(ErrorCodes::OplogQueryMinTsMissing, + "Specified minTs has already fallen off the oplog", + isNewRS || oplogEntry.getTimestamp() <= *_params.minTs); + // We don't need to check this assertion again after we've confirmed the first oplog event. + _params.assertMinTsHasNotFallenOffOplog = false; +} + PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 74d9d8ddb29..3d79915b0fe 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -100,6 +100,11 @@ private: */ void setLatestOplogEntryTimestamp(const Record& record); + /** + * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog. + */ + void assertMinTsHasNotFallenOffOplog(const Record& record); + // WorkingSet is not owned by us. WorkingSet* _workingSet; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index 659f6659eb1..aa0c790d1fd 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -66,6 +66,9 @@ struct CollectionScanParams { // Do we want the scan to be 'tailable'? Only meaningful if the collection is capped. bool tailable = false; + // Should we assert that the specified minTS has not fallen off the oplog? + bool assertMinTsHasNotFallenOffOplog = false; + // Should we keep track of the timestamp of the latest oplog entry we've seen? This information // is needed to merge cursors from the oplog in order of operation time when reading the oplog // across a sharded cluster. diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 509369c31ae..4798bc2d247 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -416,6 +416,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/service_context_d', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/db/service_context_test_fixture', + '$BUILD_DIR/mongo/db/storage/devnull/storage_devnull_core', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/mongo/s/query/router_exec_stage', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 9405edb87a2..1d4480cb4ba 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -384,7 +384,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression // to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'. 
startFrom = tokenData.clusterTime; if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) { - resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData); + resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData); } else { resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData); } @@ -396,7 +396,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression "Only one type of resume option is allowed, but multiple were found.", !resumeStage); startFrom = *startAtOperationTime; - resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); + resumeStage = DocumentSourceCheckResumability::create(expCtx, *startFrom); } // We can only run on a replica set, or through mongoS. Confirm that this is the case. diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 8d023d1ece4..2930854ca2f 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -37,7 +37,7 @@ using boost::intrusive_ptr; namespace mongo { namespace { -using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus; +using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus; // Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies // the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token, @@ -85,10 +85,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } else if (tokenDataFromResumedStream.txnOpIndex > tokenDataFromClient.txnOpIndex) { // This could happen if the client provided a txnOpIndex of 0, yet the 0th document in the // applyOps was irrelevant (meaning it was an operation on a collection or DB not being - // watched). If we are looking for the resume token on a shard then this simply means that - // the resume token may be on a different shard; otherwise, it indicates a corrupt token. - uassert(50792, "Invalid resumeToken: txnOpIndex was skipped", expCtx->needsMerge); - // We are running on a merging shard. Signal that we have read beyond the resume token. + // watched). Signal that we have read beyond the resume token. return ResumeStatus::kSurpassedToken; } @@ -169,17 +166,9 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } } // namespace -const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { - return kStageName.rawData(); -} - -Value DocumentSourceEnsureResumeTokenPresent::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - // We only serialize this stage in the context of explain. - return explain - ? 
Value(DOC(kStageName << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument()))) - : Value(); -} +DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSourceCheckResumability(expCtx, std::move(token)) {} intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx, @@ -187,170 +176,121 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token)); } -DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {} +const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { + return kStageName.rawData(); +} DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext() { + // If we have already verified the resume token is present, return the next doc immediately. if (_resumeStatus == ResumeStatus::kSurpassedToken) { - // We've already verified the resume token is present. return pSource->getNext(); } - // The incoming documents are sorted by resume token. We examine a range of documents that have - // the same clusterTime as the client's resume token, until we either find (and swallow) a match - // for the token or pass the point in the stream where it should have been. + auto nextInput = GetNextResult::makeEOF(); + + // If we are starting after an 'invalidate' and the invalidating command (e.g. collection drop) + // occurred at the same clusterTime on more than one shard, then we may see multiple identical + // resume tokens here. We swallow all of them until the resume status becomes kSurpassedToken. while (_resumeStatus != ResumeStatus::kSurpassedToken) { - auto nextInput = pSource->getNext(); + // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This + // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token. + nextInput = DocumentSourceCheckResumability::doGetNext(); - // If there are no more results, return EOF. We will continue checking for the client's - // resume token the next time the getNext method is called. + // If there are no more results, return EOF. We will continue checking for the resume token + // the next time the getNext method is called. If we hit EOF, then we cannot have surpassed + // the resume token on this iteration. if (!nextInput.isAdvanced()) { + invariant(_resumeStatus != ResumeStatus::kSurpassedToken); return nextInput; } - // Check the current event. If we found and swallowed the resume token, then the result will - // be the first event in the stream which should be returned to the user. Otherwise, we keep - // iterating the stream until we find an event matching the client's resume token. - if (auto nextOutput = _checkNextDocAndSwallowResumeToken(nextInput)) { - return *nextOutput; - } - } - MONGO_UNREACHABLE; -} -boost::optional<DocumentSource::GetNextResult> -DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken( - const DocumentSource::GetNextResult& nextInput) { - // We should only ever call this method when we have a new event to examine. 
- invariant(nextInput.isAdvanced()); - auto resumeStatus = - compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient); - switch (resumeStatus) { - case ResumeStatus::kCheckNextDoc: - return boost::none; - case ResumeStatus::kFoundToken: - // We found the resume token. If we are starting after an 'invalidate' token and the - // invalidating command (e.g. collection drop) occurred at the same clusterTime on - // more than one shard, then we will see multiple identical 'invalidate' events - // here. We should continue to swallow all of them to ensure that the new stream - // begins after the collection drop, and that it is not immediately re-invalidated. - if (pExpCtx->inMongos && _tokenFromClient.fromInvalidate) { - _resumeStatus = ResumeStatus::kFoundToken; - return boost::none; - } - // If the token is not an invalidate or if we are not running in a cluster, we mark - // the stream as having surpassed the resume token, skip the current event since the - // client has already seen it, and return the next event in the stream. - _resumeStatus = ResumeStatus::kSurpassedToken; - return pSource->getNext(); - case ResumeStatus::kSurpassedToken: - // If we have surpassed the point in the stream where the resume token should have - // been and we did not see the token itself, then this stream cannot be resumed. - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "cannot resume stream; the resume token was not found. " - << nextInput.getDocument()["_id"].getDocument().toString(), - _resumeStatus == ResumeStatus::kFoundToken); - _resumeStatus = ResumeStatus::kSurpassedToken; - return nextInput; + // When we reach here, we have either found the resume token or surpassed it. + invariant(_resumeStatus != ResumeStatus::kCheckNextDoc); + + // If the resume status is kFoundToken, record the fact that we have seen the token. When we + // have surpassed the resume token, we will assert that we saw the token before doing so. We + // cannot simply assert once and then assume we have surpassed the token, because in certain + // cases we may see 1..N identical tokens and must swallow them all before proceeding. + _hasSeenResumeToken = (_hasSeenResumeToken || _resumeStatus == ResumeStatus::kFoundToken); } - MONGO_UNREACHABLE; -} -const char* DocumentSourceShardCheckResumability::getSourceName() const { - return kStageName.rawData(); -} + // Assert that before surpassing the resume token, we observed the token itself in the stream. + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "cannot resume stream; the resume token was not found. " + << nextInput.getDocument()["_id"].getDocument().toString(), + _hasSeenResumeToken); -Value DocumentSourceShardCheckResumability::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - // We only serialize this stage in the context of explain. - return explain - ? Value(DOC(kStageName << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument()))) - : Value(); + // At this point, we have seen the token and swallowed it. Return the next event to the client. 
+ invariant(_hasSeenResumeToken && _resumeStatus == ResumeStatus::kSurpassedToken); + return nextInput; } -intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( +DocumentSourceCheckResumability::DocumentSourceCheckResumability( + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSource(getSourceName(), expCtx), _tokenFromClient(std::move(token)) {} + +intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) { // We are resuming from a point in time, not an event. Seed the stage with a high water mark. return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData()); } -intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( +intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) { - return new DocumentSourceShardCheckResumability(expCtx, std::move(token)); + return new DocumentSourceCheckResumability(expCtx, std::move(token)); } -DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {} +const char* DocumentSourceCheckResumability::getSourceName() const { + return kStageName.rawData(); +} -DocumentSource::GetNextResult DocumentSourceShardCheckResumability::doGetNext() { - if (_surpassedResumeToken) +DocumentSource::GetNextResult DocumentSourceCheckResumability::doGetNext() { + if (_resumeStatus == ResumeStatus::kSurpassedToken) { return pSource->getNext(); + } - while (!_surpassedResumeToken) { - auto nextInput = pSource->getNext(); + while (_resumeStatus != ResumeStatus::kSurpassedToken) { + // The underlying oplog scan will throw OplogQueryMinTsMissing if the minTs in the change + // stream filter has fallen off the oplog. Catch this and throw a more explanatory error. + auto nextInput = [this]() { + try { + return pSource->getNext(); + } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) { + uasserted(ErrorCodes::ChangeStreamHistoryLost, + "Resume of change stream was not possible, as the resume point may no " + "longer be in the oplog."); + } + }(); - // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us - // from continually returning EOF in cases where the resume point has fallen off the oplog. + // If we hit EOF, return it immediately. if (!nextInput.isAdvanced()) { - _assertOplogHasEnoughHistory(nextInput); return nextInput; } + // Determine whether the current event sorts before, equal to or after the resume token. - auto resumeStatus = + _resumeStatus = compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient); - switch (resumeStatus) { + switch (_resumeStatus) { case ResumeStatus::kCheckNextDoc: // If the result was kCheckNextDoc, we are resumable but must swallow this event. - _verifiedOplogHasEnoughHistory = true; continue; case ResumeStatus::kSurpassedToken: - // In this case the resume token wasn't found; it must be on another shard. We must - // examine the oplog to ensure that its history reaches back to before the resume - // token, otherwise we may have missed events that fell off the oplog. If we can - // resume, fall through into the following case and set _surpassedResumeToken. 
- _assertOplogHasEnoughHistory(nextInput); + // In this case the resume token wasn't found; it may be on another shard. However, + // since the oplog scan did not throw, we know that we are resumable. Fall through + // into the following case and return the document. case ResumeStatus::kFoundToken: - // We found the actual token! Set _surpassedResumeToken and return the result. - _surpassedResumeToken = true; + // We found the actual token! Return the doc so DSEnsureResumeTokenPresent sees it. return nextInput; } } MONGO_UNREACHABLE; } -void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory( - const GetNextResult& nextInput) { - // If we have already verified that this stream is resumable, return immediately. - if (_verifiedOplogHasEnoughHistory) { - return; - } - // Look up the first document in the oplog and compare it with the resume token's clusterTime. - auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); - auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = Pipeline::makePipeline({matchSpec}, firstEntryExpCtx); - if (auto first = pipeline->getNext()) { - auto firstOplogEntry = Value(*first); - // If the first entry in the oplog is the replset initialization, then it doesn't matter - // if its timestamp is later than the resume token. No events earlier than the token can - // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that - // the timestamp of the first oplog entry is earlier than that of the resume token. - using repl::kInitiatingSetMsg; - const bool isNewRS = - Value::compare(firstOplogEntry["o"]["msg"], Value(kInitiatingSetMsg), nullptr) == 0 && - Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0; - uassert(ErrorCodes::ChangeStreamHistoryLost, - "Resume of change stream was not possible, as the resume point may no longer be in " - "the oplog. ", - isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime); - } else { - // Very unusual case: the oplog is empty. We can always resume. However, it should never be - // possible to have obtained a document that matched the filter if the oplog is empty. - uassert(ErrorCodes::ChangeStreamFatalError, - "Oplog was empty but found an event in the change stream pipeline. It should not " - "be possible for this to happen", - nextInput.isEOF()); - } - _verifiedOplogHasEnoughHistory = true; +Value DocumentSourceCheckResumability::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + // We only serialize this stage in the context of explain. + return explain ? Value(DOC(getSourceName() + << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument()))) + : Value(); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 0e50658f89d..c3e19a4a450 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -39,31 +39,41 @@ namespace mongo { /** - * This checks for resumability on a single shard in the sharded case. The rules are + * This stage checks whether or not the oplog has enough history to resume the stream, and consumes + * all events up to the given resume point. It is deployed on all shards when resuming a stream on + * a sharded cluster, and is also used in the single-replicaset case when a stream is opened with + * startAtOperationTime or with a high-water-mark resume token. 
It defers to the COLLSCAN to check + * whether the first event (matching or non-matching) encountered in the oplog has a timestamp equal + * to or earlier than the minTs in the change stream filter. If not, the COLLSCAN will throw an + * assertion, which this stage catches and converts into a more comprehensible $changeStream + * specific exception. The rules are: * - * - If the first document in the pipeline for this shard has a matching timestamp, we can - * always resume. - * - If the oplog is empty, we can resume. An empty oplog is rare and can only occur - * on a secondary that has just started up from a primary that has not taken a write. - * In particular, an empty oplog cannot be the result of oplog truncation. - * - If neither of the above is true, the least-recent document in the oplog must precede the resume - * timestamp. If we do this check after seeing the first document in the pipeline in the shard, or - * after seeing that there are no documents in the pipeline after the resume token in the shard, - * we're guaranteed not to miss any documents. + * - If the first event seen in the oplog has the same timestamp as the requested resume token or + * startAtOperationTime, we can resume. + * - If the timestamp of the first event seen in the oplog is earlier than the requested resume + * token or startAtOperationTime, we can resume. + * - If the first entry in the oplog is a replica set initialization, then we can resume even if the + * token timestamp is earlier, since no events can have fallen off this oplog yet. This can happen + * in a sharded cluster when a new shard is added. * - * - Otherwise we cannot resume, as we do not know if this shard lost documents between the resume - * token and the first matching document in the pipeline. - * - * This source need only run on a sharded collection. For unsharded collections, - * DocumentSourceEnsureResumeTokenPresent is sufficient. + * - Otherwise we cannot resume, as we do not know if there were any events between the resume token + * and the first matching document in the oplog. */ -class DocumentSourceShardCheckResumability final : public DocumentSource { +class DocumentSourceCheckResumability : public DocumentSource { public: - static constexpr StringData kStageName = "$_internalCheckShardResumability"_sd; + static constexpr StringData kStageName = "$_internalCheckResumability"_sd; - const char* getSourceName() const final; + // Used to record the results of comparing the token data extracted from documents in the + // resumed stream against the client's resume token. + enum class ResumeStatus { + kFoundToken, // The stream produced a document satisfying the client resume token. + kSurpassedToken, // The stream's latest document is more recent than the resume token. + kCheckNextDoc // The next document produced by the stream may contain the resume token. 
+ }; + + const char* getSourceName() const override; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { + StageConstraints constraints(Pipeline::SplitState pipeState) const override { return {StreamType::kStreaming, PositionRequirement::kNone, HostTypeRequirement::kAnyShard, @@ -75,48 +85,38 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { return boost::none; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; - static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts); - static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); -private: +protected: /** - * Use the create static method to create a DocumentSourceShardCheckResumability. + * Use the create static method to create a DocumentSourceCheckResumability. */ - DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, - ResumeTokenData token); + DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, + ResumeTokenData token); - GetNextResult doGetNext() final; - - void _assertOplogHasEnoughHistory(const GetNextResult& nextInput); + GetNextResult doGetNext() override; + ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; const ResumeTokenData _tokenFromClient; - bool _verifiedOplogHasEnoughHistory = false; - bool _surpassedResumeToken = false; }; /** * This stage is used internally for change streams to ensure that the resume token is in the * stream. It is not intended to be created by the user. */ -class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource { +class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability { public: static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd; - // Used to record the results of comparing the token data extracted from documents in the - // resumed stream against the client's resume token. - enum class ResumeStatus { - kFoundToken, // The stream produced a document satisfying the client resume token. - kSurpassedToken, // The stream's latest document is more recent than the resume token. - kCheckNextDoc // The next document produced by the stream may contain the resume token. - }; const char* getSourceName() const final; @@ -142,13 +142,11 @@ public: logic.mergingStage = this; // Also add logic to the shards to ensure that each shard has enough oplog history to resume // the change stream. 
- logic.shardsStage = DocumentSourceShardCheckResumability::create(pExpCtx, _tokenFromClient); + logic.shardsStage = DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient); logic.inputSortPattern = change_stream_constants::kSortSpec; return logic; }; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; - static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); @@ -161,15 +159,8 @@ private: GetNextResult doGetNext() final; - /** - * Check the given event to determine whether it matches the client's resume token. If so, we - * swallow this event and return the next event in the stream. Otherwise, return boost::none. - */ - boost::optional<DocumentSource::GetNextResult> _checkNextDocAndSwallowResumeToken( - const DocumentSource::GetNextResult& nextInput); - - ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; - const ResumeTokenData _tokenFromClient; + // Records whether we have observed the token in the resumed stream. + bool _hasSeenResumeToken = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index e6d5f4b92da..e6d3cda6a24 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -35,7 +35,10 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/catalog/collection_mock.h" +#include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/exec/plan_stats.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -44,54 +47,261 @@ #include "mongo/db/pipeline/resume_token.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/devnull/devnull_kv_engine.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/uuid.h" using boost::intrusive_ptr; -using std::deque; namespace mongo { namespace { +static constexpr StringData kOtherNs = "test.other.ns"_sd; static constexpr StringData kTestNs = "test.ns"_sd; +class ChangeStreamOplogCursorMock : public SeekableRecordCursor { +public: + ChangeStreamOplogCursorMock(std::deque<Record>* records) : _records(records) {} + + virtual ~ChangeStreamOplogCursorMock() {} + + boost::optional<Record> next() override { + if (_records->empty()) { + return boost::none; + } + auto& next = _records->front(); + _records->pop_front(); + return next; + } + + boost::optional<Record> seekExact(const RecordId& id) override { + return Record{}; + } + void save() override {} + bool restore() override { + return true; + } + void detachFromOperationContext() override {} + void reattachToOperationContext(OperationContext* opCtx) override {} + +private: + std::deque<Record>* _records; +}; + +class ChangeStreamOplogCollectionMock : public CollectionMock { +public: + ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) { + _recordStore = + _devNullEngine.getRecordStore(nullptr, NamespaceString::kRsOplogNamespace.ns(), "", {}); + } + + void push_back(Document doc) { + // Every entry we push into the oplog 
should have both 'ts' and 'ns' fields. + invariant(doc["ts"].getType() == BSONType::bsonTimestamp); + invariant(doc["ns"].getType() == BSONType::String); + // Events should always be added in ascending ts order. + auto lastTs = + _records.empty() ? Timestamp(0, 0) : _records.back().data.toBson()["ts"].timestamp(); + invariant(ValueComparator().compare(Value(lastTs), doc["ts"]) <= 0); + // Fill out remaining required fields in the oplog entry. + MutableDocument mutableDoc{doc}; + mutableDoc.setField("op", Value("n"_sd)); + mutableDoc.setField("o", Value(Document{})); + mutableDoc.setField("wall", + Value(Date_t::fromMillisSinceEpoch(doc["ts"].getTimestamp().asLL()))); + // Must remove _id since the oplog expects either no _id or an OID. + mutableDoc.remove("_id"); + // Convert to owned BSON and create corresponding Records. + _data.push_back(mutableDoc.freeze().toBson()); + Record record; + record.data = {_data.back().objdata(), _data.back().objsize()}; + record.id = RecordId{static_cast<int64_t>(_data.size())}; + _records.push_back(std::move(record)); + } + + std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx, + bool forward) const override { + return std::make_unique<ChangeStreamOplogCursorMock>(&_records); + } + + const RecordStore* getRecordStore() const override { + return _recordStore.get(); + } + RecordStore* getRecordStore() override { + return _recordStore.get(); + } + +private: + // We retain the owned record queue here because cursors may be destroyed and recreated. + mutable std::deque<Record> _records; + std::deque<BSONObj> _data; + + // These are no-op structures which are required by the CollectionScan. + std::unique_ptr<RecordStore> _recordStore; + DevNullKVEngine _devNullEngine; +}; + +/** + * Acts as an initial source for the change stream pipeline, taking the place of DSOplogMatch. This + * class maintains its own queue of documents added by each test, but also pushes each doc into an + * underlying ChangeStreamOplogCollectionMock. When doGetNext() is called, it retrieves the next + * document by pulling it from the mocked oplog collection via a CollectionScan, in order to test + * the latter's changestream-specific functionality. The reason this class keeps its own queue in + * addition to the ChangeStreamOplogCollectionMock is twofold: + * + * - The _id must be stripped from each document before it can be added to the mocked oplog, since + * the _id field of the test document is a resume token but oplog entries are only permitted to + * have OID _ids. We therefore have to restore the _id field of the document pulled from the + * CollectionScan before passing it into the pipeline. + * + * - The concept of GetNextResult::ReturnStatus::kPauseExecution does not exist in CollectionScan; + * NEED_TIME is somewhat analogous but cannot be artificially induced. For tests which exercise + * kPauseExecution, these events are stored only in the DocumentSourceChangeStreamMock queue + * with no corresponding entry in the ChangeStreamOplogCollectionMock queue. 
+ */ +class DocumentSourceChangeStreamMock : public DocumentSourceMock { +public: + DocumentSourceChangeStreamMock(const boost::intrusive_ptr<ExpressionContextForTest>& expCtx) + : DocumentSourceMock({}, expCtx) { + _filterExpr = BSON("ns" << kTestNs); + _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx); + _params.assertMinTsHasNotFallenOffOplog = true; + _params.shouldTrackLatestOplogTimestamp = true; + _params.minTs = Timestamp(0, 0); + _params.tailable = true; + } + + void setResumeToken(ResumeTokenData resumeToken) { + invariant(!_collScan); + _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime)); + _filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx); + _params.minTs = resumeToken.clusterTime; + } + + void push_back(GetNextResult&& result) { + // We should never push an explicit EOF onto the queue. + invariant(!result.isEOF()); + // If there is a document supplied, add it to the mock collection. + if (result.isAdvanced()) { + _collection.push_back(result.getDocument()); + } + // Both documents and pauses are stored in the DSMock queue. + DocumentSourceMock::push_back(std::move(result)); + } + + void push_back(const GetNextResult& result) { + MONGO_UNREACHABLE; + } + + bool isPermanentlyEOF() const { + return _collScan->getCommonStats()->isEOF; + } + +protected: + GetNextResult doGetNext() override { + // If this is the first call to doGetNext, we must create the COLLSCAN. + if (!_collScan) { + _collScan = std::make_unique<CollectionScan>( + pExpCtx.get(), &_collection, _params, &_ws, _filter.get()); + // The first call to doWork will create the cursor and return NEED_TIME. But it won't + // actually scan any of the documents that are present in the mock cursor queue. + ASSERT_EQ(_collScan->doWork(nullptr), PlanStage::NEED_TIME); + ASSERT_EQ(_getNumDocsTested(), 0); + } + while (true) { + // If the next result is a pause, return it and don't collscan. + auto nextResult = DocumentSourceMock::doGetNext(); + if (nextResult.isPaused()) { + return nextResult; + } + // Otherwise, retrieve the document via the CollectionScan stage. + auto id = WorkingSet::INVALID_ID; + switch (_collScan->doWork(&id)) { + case PlanStage::IS_EOF: + invariant(nextResult.isEOF()); + return nextResult; + case PlanStage::ADVANCED: { + // We need to restore the _id field which was removed when we added this + // entry into the oplog. This is like a stripped-down DSCSTransform stage. + MutableDocument mutableDoc{_ws.get(id)->doc.value()}; + mutableDoc["_id"] = nextResult.getDocument()["_id"]; + return mutableDoc.freeze(); + } + case PlanStage::NEED_TIME: + continue; + case PlanStage::NEED_YIELD: + MONGO_UNREACHABLE; + } + } + MONGO_UNREACHABLE; + } + +private: + size_t _getNumDocsTested() { + return static_cast<const CollectionScanStats*>(_collScan->getSpecificStats())->docsTested; + } + + ChangeStreamOplogCollectionMock _collection; + std::unique_ptr<CollectionScan> _collScan; + CollectionScanParams _params; + + std::unique_ptr<MatchExpression> _filter; + BSONObj _filterExpr; + + WorkingSet _ws; +}; + class CheckResumeTokenTest : public AggregationContextFixture { public: - CheckResumeTokenTest() : _mock(DocumentSourceMock::createForTest(getExpCtx())) {} + CheckResumeTokenTest() : _mock(make_intrusive<DocumentSourceChangeStreamMock>(getExpCtx())) {} protected: /** * Pushes a document with a resume token corresponding to the given ResumeTokenData into the - * mock queue. + * mock queue. 
This document will have an ns field that matches the test namespace, and will + * appear in the change stream pipeline if its timestamp is at or after the resume timestamp. */ - void addDocument(ResumeTokenData tokenData) { - _mock->push_back(Document{{"_id", ResumeToken(std::move(tokenData)).toDocument()}}); + void addOplogEntryOnTestNS(ResumeTokenData tokenData) { + _mock->push_back(Document{{"ns", kTestNs}, + {"ts", tokenData.clusterTime}, + {"_id", ResumeToken(std::move(tokenData)).toDocument()}}); } /** * Pushes a document with a resume token corresponding to the given timestamp, version, * txnOpIndex, docKey, and namespace into the mock queue. */ - void addDocument( + void addOplogEntryOnTestNS( Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) { - return addDocument({ts, version, txnOpIndex, uuid, Value(docKey)}); + return addOplogEntryOnTestNS({ts, version, txnOpIndex, uuid, Value(docKey)}); } /** * Pushes a document with a resume token corresponding to the given timestamp, version, * txnOpIndex, docKey, and namespace into the mock queue. */ - void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) { - addDocument(ts, 0, 0, docKey, uuid); + void addOplogEntryOnTestNS(Timestamp ts, Document docKey, UUID uuid = testUuid()) { + addOplogEntryOnTestNS(ts, 0, 0, docKey, uuid); } /** * Pushes a document with a resume token corresponding to the given timestamp, _id string, and * namespace into the mock queue. */ - void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) { - addDocument(ts, 0, 0, Document{{"_id", id}}, uuid); + void addOplogEntryOnTestNS(Timestamp ts, std::string id, UUID uuid = testUuid()) { + addOplogEntryOnTestNS(ts, 0, 0, Document{{"_id", id}}, uuid); + } + + /** + * Pushes a document that does not match the test namespace into the mock oplog. This will be + * examined by the oplog CollectionScan but will not produce an event in the pipeline. + */ + void addOplogEntryOnOtherNS(Timestamp ts) { + _mock->push_back(Document{{"ns", kOtherNs}, {"ts", ts}}); } + /** + * Pushes a pause in execution into the pipeline queue. + */ void addPause() { _mock->push_back(DocumentSource::GetNextResult::makePauseExecution()); } @@ -99,10 +309,11 @@ protected: /** * Convenience method to create the class under test with a given ResumeTokenData. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( ResumeTokenData tokenData) { auto checkResumeToken = - DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), std::move(tokenData)); + DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData); + _mock->setResumeToken(std::move(tokenData)); checkResumeToken->setSource(_mock.get()); return checkResumeToken; } @@ -111,13 +322,13 @@ protected: * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, int version, std::size_t txnOpIndex, boost::optional<Document> docKey, UUID uuid) { - return createCheckResumeToken( + return createDSEnsureResumeTokenPresent( {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()}); } @@ -125,18 +336,18 @@ protected: * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. 
*/ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) { - return createCheckResumeToken(ts, 0, 0, docKey, uuid); + return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid); } /** * Convenience method to create the class under test with a given timestamp, _id string, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, StringData id, UUID uuid = testUuid()) { - return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid); + return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid); } /** @@ -148,28 +359,28 @@ protected: return *uuid_gen; } - intrusive_ptr<DocumentSourceMock> _mock; + intrusive_ptr<DocumentSourceChangeStreamMock> _mock; }; -class ShardCheckResumabilityTest : public CheckResumeTokenTest { +class CheckResumabilityTest : public CheckResumeTokenTest { protected: - intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability( + intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability( ResumeTokenData tokenData) { - auto shardCheckResumability = - DocumentSourceShardCheckResumability::create(getExpCtx(), tokenData); - shardCheckResumability->setSource(_mock.get()); - return shardCheckResumability; + auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData); + _mock->setResumeToken(std::move(tokenData)); + dsCheckResumability->setSource(_mock.get()); + return dsCheckResumability; } - intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) { - return createShardCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); + intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) { + return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); } }; TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); // We should not see the resume token. ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -177,9 +388,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); addPause(); - addDocument(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); // We see the pause we inserted, but not the resume token. 
ASSERT_TRUE(checkResumeToken->getNext().isPaused()); @@ -190,10 +401,10 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { Timestamp resumeTimestamp(100, 1); Timestamp doc1Timestamp(100, 2); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); addPause(); - addDocument(doc1Timestamp, "2"); + addOplogEntryOnTestNS(doc1Timestamp, "2"); // Pause added explicitly. ASSERT_TRUE(checkResumeToken->getNext().isPaused()); @@ -208,13 +419,13 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); - addDocument(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0"); + addOplogEntryOnTestNS(resumeTimestamp, "0"); Timestamp doc1Timestamp(100, 2); Timestamp doc2Timestamp(101, 1); - addDocument(doc1Timestamp, "1"); - addDocument(doc2Timestamp, "2"); + addOplogEntryOnTestNS(doc1Timestamp, "1"); + addOplogEntryOnTestNS(doc2Timestamp, "2"); auto result1 = checkResumeToken->getNext(); ASSERT_TRUE(result1.isAdvanced()); @@ -228,23 +439,28 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) } TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) { - Timestamp resumeTimestamp(100, 1); + // The first timestamp in the oplog precedes the resume token, and the second matches it... + Timestamp doc1Timestamp(100, 1); + Timestamp resumeTimestamp(100, 2); + Timestamp doc2Timestamp = resumeTimestamp; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); - Timestamp doc1Timestamp(100, 2); - Timestamp doc2Timestamp(101, 1); - addDocument(doc1Timestamp, "1"); - addDocument(doc2Timestamp, "2"); + // ... but there's no entry in the oplog that matches the full token. + addOplogEntryOnTestNS(doc1Timestamp, "1"); + addOplogEntryOnTestNS(doc2Timestamp, "2"); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } -TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) { +TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + + // Add an entry into the oplog with the same timestamp but a lower documentKey. We swallow it + // but don't throw - we haven't surpassed the token yet and still may see it in the next doc. 
+ addOplogEntryOnTestNS(resumeTimestamp, "0"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -252,9 +468,9 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) { Timestamp resumeTimestamp(100, 1); auto resumeTokenUUID = UUID::gen(); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1", resumeTokenUUID); auto otherUUID = UUID::gen(); - addDocument(resumeTimestamp, "1", otherUUID); + addOplogEntryOnTestNS(resumeTimestamp, "1", otherUUID); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } @@ -265,9 +481,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "abc"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "abc"); // We must not see the following document. - addDocument(resumeTimestamp, "ABC"); + addOplogEntryOnTestNS(resumeTimestamp, "ABC"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -279,13 +495,14 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdM Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}); Timestamp doc1Timestamp(100, 1); - addDocument(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}}); Timestamp doc2Timestamp(100, 2); Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}}; - addDocument(doc2Timestamp, doc2DocKey); + addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey); // We should skip doc1 since it satisfies the resume token, and retrieve doc2. 
const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -300,10 +517,11 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -318,10 +536,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen getExpCtx()->inMongos = true; auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}}); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -334,10 +552,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject) Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, boost::none); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, boost::none); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -350,11 +568,12 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) { Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -379,20 +598,20 @@ TEST_F(CheckResumeTokenTest, // Create the resume token using the higher-sorting UUID. 
auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]); // Add two documents which have the same clusterTime but a lower UUID. One of the documents has // a lower docKey than the resume token, the other has a higher docKey; this demonstrates that // the UUID is the discriminating factor. - addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]); - addDocument(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]); // Add a third document that matches the resume token. - addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]); // Add a fourth document with the same timestamp and UUID whose docKey sorts after the token. auto expectedDocKey = Document{{"_id"_sd, 3}}; - addDocument(resumeTimestamp, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, expectedDocKey, uuids[1]); // We should skip the first two docs, swallow the resume token, and return the fourth doc. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -417,13 +636,13 @@ TEST_F(CheckResumeTokenTest, // Create the resume token using the lower-sorting UUID. auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]); // Add a document which has the same clusterTime and a lower docKey but a higher UUID, followed // by a document which matches the resume token. This is not possible in practice, but it serves // to demonstrate that the resume attempt fails even when the resume token is present. - addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]); - addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -447,18 +666,18 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn invalidateToken.clusterTime = resumeTimestamp; invalidateToken.uuid = uuids[0]; invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - auto checkResumeToken = createCheckResumeToken(invalidateToken); + auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Add three documents which each have the invalidate resume token. We expect to see this in the // event that we are starting after an invalidate and the invalidating event occurred on several // shards at the same clusterTime. - addDocument(invalidateToken); - addDocument(invalidateToken); - addDocument(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); // Add a document representing an insert which recreated the collection after it was dropped. auto expectedDocKey = Document{{"_id"_sd, 1}}; - addDocument(Timestamp{100, 2}, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(Timestamp{100, 2}, expectedDocKey, uuids[1]); // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it // and the next two invalidates, and return the insert event after the collection drop. 
@@ -487,7 +706,7 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv invalidateToken.clusterTime = resumeTimestamp; invalidateToken.uuid = uuids[0]; invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - auto checkResumeToken = createCheckResumeToken(invalidateToken); + auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Create a second invalidate token with the same clusterTime but a different UUID. auto unrelatedInvalidateToken = invalidateToken; @@ -496,12 +715,12 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv // Add three documents which each have the invalidate resume token. We expect to see this in the // event that we are starting after an invalidate and the invalidating event occurred on several // shards at the same clusterTime. - addDocument(invalidateToken); - addDocument(invalidateToken); - addDocument(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); + addOplogEntryOnTestNS(invalidateToken); // Add a fourth document which has the unrelated invalidate at the same clusterTime. - addDocument(unrelatedInvalidateToken); + addOplogEntryOnTestNS(unrelatedInvalidateToken); // DSEnsureResumeTokenPresent should confirm that the invalidate event is present, swallow it // and the next two invalidates, but decline to swallow the unrelated invalidate. @@ -512,39 +731,6 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv ASSERT_EQ(tokenFromFirstDocAfterResume, unrelatedInvalidateToken); } -TEST_F(CheckResumeTokenTest, ShouldSwallowOnlyFirstInvalidateForStartAfterInvalidateInReplSet) { - Timestamp resumeTimestamp(100, 1); - - // We only swallow multiple invalidates when DSEnsureResumeTokenPresent is running on mongoS. - // Set {inMongos:false} to verify that we do not swallow additional invalidates on a replica - // set, since this should never occur. - getExpCtx()->inMongos = false; - - // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A - // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new - // stream after the old stream is invalidated. - ResumeTokenData invalidateToken; - invalidateToken.clusterTime = resumeTimestamp; - invalidateToken.uuid = testUuid(); - invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - auto checkResumeToken = createCheckResumeToken(invalidateToken); - - // Add three documents which each have the invalidate resume token. - addDocument(invalidateToken); - addDocument(invalidateToken); - addDocument(invalidateToken); - - // DSEnsureResumeTokenPresent should confirm that the invalidate event is present and swallow - // it. However, it should not swallow the subsequent two invalidates. 
- for (size_t i = 0; i < 2; ++i) { - const auto nextDocAfterResume = checkResumeToken->getNext(); - const auto tokenFromNextDocAfterResume = - ResumeToken::parse(nextDocAfterResume.getDocument()["_id"].getDocument()).getData(); - ASSERT_EQ(tokenFromNextDocAfterResume, invalidateToken); - } - ASSERT(checkResumeToken->getNext().isEOF()); -} - TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { Timestamp resumeTimestamp(100, 1); @@ -554,21 +740,21 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { std::sort(uuids.begin(), uuids.end()); auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]); + createDSEnsureResumeTokenPresent(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]); // Add two documents which have the same clusterTime and version but a lower applyOps index. One // of the documents has a lower uuid than the resume token, the other has a higher uuid; this // demonstrates that the applyOps index is the discriminating factor. - addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]); - addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]); // Add a third document that matches the resume token. - addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]); // Add a fourth document with the same timestamp and version whose applyOps sorts after the // resume token. auto expectedDocKey = Document{{"_id"_sd, 3}}; - addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]); // We should skip the first two docs, swallow the resume token, and return the fourth doc. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -585,261 +771,255 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithNoDocuments) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } -/** - * A mock MongoProcessInterface which allows mocking a foreign pipeline. 
- */ -class MockMongoInterface final : public StubMongoProcessInterface { -public: - MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) - : _mockResults(std::move(mockResults)) {} - - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { - return false; - } - - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* ownedPipeline, bool localCursorOnly) final { - std::unique_ptr<Pipeline, PipelineDeleter> pipeline( - ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx)); - pipeline->addInitialSource( - DocumentSourceMock::createForTest(_mockResults, pipeline->getContext())); - return pipeline; - } - -private: - deque<DocumentSource::GetNextResult> _mockResults; -}; - -TEST_F(ShardCheckResumabilityTest, - ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) { +TEST_F(CheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(resumeTimestamp, "ID"); // We should see the resume token. - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, - ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryAfterToken) { +TEST_F(CheckResumabilityTest, + ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryEqualToToken) { Timestamp resumeTimestamp(100, 1); - Timestamp oplogTimestamp(100, 2); + Timestamp oplogTimestamp(100, 1); - ResumeTokenData tokenData( - resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}})); - auto shardCheckResumability = createShardCheckResumability(tokenData); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(resumeTimestamp, "ID"); // We should see the resume token. - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) { +TEST_F(CheckResumabilityTest, ShouldPermanentlyEofIfOplogIsEmpty) { Timestamp resumeTimestamp(100, 1); - ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); - // We should see the resume token. 
+ // As with other tailable cursors, starting a change stream on an empty capped collection will + // cause the cursor to immediately and permanently EOF. This should never happen in practice, + // since a replset member can only accept requests while in PRIMARY, SECONDARY or RECOVERING + // states, and there must be at least one entry in the oplog in order to reach those states. + auto shardCheckResumability = createDSCheckResumability(resumeTimestamp); auto result = shardCheckResumability->getNext(); - ASSERT_TRUE(result.isAdvanced()); - auto& doc = result.getDocument(); - ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); + ASSERT_TRUE(result.isEOF()); + ASSERT_TRUE(_mock->isPermanentlyEOF()); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result = dsCheckResumability->getNext(); + ASSERT_TRUE(result.isEOF()); +} + +TEST_F(CheckResumabilityTest, + ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryEqualToToken) { + Timestamp oplogTimestamp(100, 1); + Timestamp resumeTimestamp(100, 1); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } -TEST_F(ShardCheckResumabilityTest, - ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) { +TEST_F(CheckResumabilityTest, ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) { Timestamp resumeTimestamp(100, 1); Timestamp oplogTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); ASSERT_THROWS_CODE( - shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) { +TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) { Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); 
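    // The earliest entry in the mocked oplog (100, 1) precedes the resume point (100, 2), so no
    // required history can have fallen off the oplog; the stage should therefore pass through the
    // later event rather than throwing ChangeStreamHistoryLost.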
Timestamp docTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, + ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryEqualToToken) { + Timestamp oplogTimestamp(100, 1); + Timestamp resumeTimestamp(100, 1); + Timestamp docTimestamp(100, 3); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result = dsCheckResumability->getNext(); + ASSERT_TRUE(result.isAdvanced()); + auto& doc = result.getDocument(); + ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); +} + +TEST_F(CheckResumabilityTest, ShouldFailWithLaterDocumentsInPipelineAndEarliestOplogEntryAfterToken) { Timestamp resumeTimestamp(100, 1); Timestamp oplogTimestamp(100, 2); Timestamp docTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); ASSERT_THROWS_CODE( - shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); } -TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { +TEST_F(CheckResumabilityTest, + ShouldFailWithoutReadingLaterDocumentsInPipelineIfEarliestOplogEntryAfterToken) { + Timestamp resumeTimestamp(100, 1); + Timestamp oplogTimestamp(100, 2); + Timestamp docTimestamp(100, 3); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + // Confirm that there are two documents queued in the mock oplog. + ASSERT_EQ(_mock->size(), 2); + ASSERT_THROWS_CODE( + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamHistoryLost); + // Confirm that only the first document was read before the assertion was thrown. 
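    // (The stage throws as soon as it inspects the first oplog entry and finds its timestamp
    // later than the resume point, so the second queued document is never pulled from the mock.)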
+ ASSERT_EQ(_mock->size(), 1); +} + +TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); Timestamp docTimestamp(100, 4); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); - mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result2 = shardCheckResumability->getNext(); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); Timestamp docTimestamp(100, 4); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog( - {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); - auto result2 = shardCheckResumability->getNext(); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { +TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isEOF()); - mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result2 = shardCheckResumability->getNext(); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + auto 
result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(token); // Add 2 events at the same clusterTime as the resume token but whose docKey sort before it. - addDocument(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "2"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "2"); // Add the resume token, plus one further event whose docKey sorts after the token. - addDocument(resumeTimestamp, "3"); - addDocument(resumeTimestamp, "4"); + addOplogEntryOnTestNS(resumeTimestamp, "3"); + addOplogEntryOnTestNS(resumeTimestamp, "4"); // The first event we see should be the resume token... - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto doc = result.getDocument(); ASSERT_EQ(token, ResumeToken::parse(doc["_id"].getDocument()).getData()); // ... then the post-token event, and then finally EOF. - result = shardCheckResumability->getNext(); + result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); - Document postResumeTokenDoc{ - {"_id", - ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) - .toDocument()}}; - ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc); - ASSERT_TRUE(shardCheckResumability->getNext().isEOF()); + auto postResumeTokenDoc = + ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) + .toDocument(); + ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc); + ASSERT_TRUE(dsCheckResumability->getNext().isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(token); // Add 2 events at the same clusterTime as the resume token but whose docKey sort before it. - addDocument(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "2"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "2"); // Add one further event whose docKey sorts after the token. 
-    addDocument(resumeTimestamp, "4");
+    addOplogEntryOnTestNS(resumeTimestamp, "4");

    // The first event we see should be the post-token event, followed by EOF.
-    auto result = shardCheckResumability->getNext();
+    auto result = dsCheckResumability->getNext();
    ASSERT_TRUE(result.isAdvanced());
-    Document postResumeTokenDoc{
-        {"_id",
-         ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
-             .toDocument()}};
-    ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc);
-    ASSERT_TRUE(shardCheckResumability->getNext().isEOF());
+    auto postResumeTokenDoc =
+        ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})})
+            .toDocument();
+    ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc);
+    ASSERT_TRUE(dsCheckResumability->getNext().isEOF());
}

}  // namespace

diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index bdc3c5c7e2d..ae9e0ef4f63 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -55,6 +55,10 @@ const char* DocumentSourceMock::getSourceName() const {
    return "mock";
}

+const size_t DocumentSourceMock::size() const {
+    return _queue.size();
+}
+
boost::intrusive_ptr<DocumentSourceMock> DocumentSourceMock::createForTest(
    Document doc, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
    return new DocumentSourceMock({std::move(doc)}, expCtx);

diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index a538e3b9086..b845df5ac92 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -70,6 +70,8 @@ public:
    const char* getSourceName() const override;

+    const size_t size() const;
+
    void reattachToOperationContext(OperationContext* opCtx) {
        isDetachedFromOpCtx = false;
    }

diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index adec324007d..647380368db 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -655,7 +655,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
    if (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) {
        invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
-        plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+        plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
+                        QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
    }

    // If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The

diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp
index ad10da0c88c..6390a25dde6 100644
--- a/src/mongo/db/query/classic_stage_builder.cpp
+++ b/src/mongo/db/query/classic_stage_builder.cpp
@@ -76,6 +76,7 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r
    CollectionScanParams params;
    params.tailable = csn->tailable;
    params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp;
+    params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog;
    params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
                                             : CollectionScanParams::BACKWARD;
    params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;

diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index a0f98e7734a..db4779c2500 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -217,6 +217,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
    csn->tailable = tailable;
    csn->shouldTrackLatestOplogTimestamp =
        params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
+    csn->assertMinTsHasNotFallenOffOplog =
+        params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG;
    csn->shouldWaitForOplogVisibility =
        params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE;

diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index dc8bc35ef64..e5eebac0cd2 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -173,6 +173,9 @@ string optionString(size_t options) {
            case QueryPlannerParams::PRESERVE_RECORD_ID:
                ss << "PRESERVE_RECORD_ID ";
                break;
+            case QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG:
+                ss << "ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG ";
+                break;
            case QueryPlannerParams::DEFAULT:
                MONGO_UNREACHABLE;
                break;

diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index 4c6b0400f8e..3c2cc59c24e 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -97,6 +97,10 @@ struct QueryPlannerParams {
        // ids. In some cases, record ids can be discarded as an optimization when they will not be
        // consumed downstream.
        PRESERVE_RECORD_ID = 1 << 10,
+
+        // Set this on an oplog scan to uassert that the oplog has not already rolled over the
+        // minimum 'ts' timestamp specified in the query.
+        ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG = 1 << 11,
    };

    // See Options enum above.

diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp
index 0e62859febf..0e6bdda0b91 100644
--- a/src/mongo/db/query/query_solution.cpp
+++ b/src/mongo/db/query/query_solution.cpp
@@ -212,6 +212,7 @@ QuerySolutionNode* CollectionScanNode::clone() const {
    copy->tailable = this->tailable;
    copy->direction = this->direction;
    copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp;
+    copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog;
    copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility;

    return copy;

diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index cdb038cd61e..0020b6f8442 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -435,6 +435,9 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet {
    // across a sharded cluster.
    bool shouldTrackLatestOplogTimestamp = false;

+    // Should we assert that the specified minTS has not fallen off the oplog?
+    bool assertMinTsHasNotFallenOffOplog = false;
+
    int direction{1};

    // Whether or not to wait for oplog visibility on oplog collection scans.
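Taken together, the planner changes above thread a single flag from the change-stream pipeline down to the oplog collection scan: PipelineD requests ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG, QueryPlannerAccess copies it onto the CollectionScanNode, and the classic stage builder forwards it to CollectionScanParams. Conceptually, the guard it enables amounts to the following check (a condensed, hypothetical sketch; the real logic in the collection scan stage carries more context and handles additional edge cases):

// If the earliest entry still present in the oplog is already newer than the requested minimum
// 'ts', events the resuming query depends on may have rolled off, so fail loudly instead of
// silently skipping history. OplogQueryMinTsMissing is the error code introduced by this change.
void assertMinTsStillInOplogSketch(const Timestamp& earliestOplogEntryTs, const Timestamp& minTs) {
    uassert(ErrorCodes::OplogQueryMinTsMissing,
            "Specified minTs has already fallen off the oplog",
            earliestOplogEntryTs <= minTs);
}

The CheckResumabilityTest cases earlier in this diff exercise the same rule from the DocumentSource side: ChangeStreamHistoryLost when the earliest oplog entry post-dates the resume token, and success when it is earlier than or equal to it.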