commit f58bdb68407abc68fddc8a91c0d58785423db1b5
tree cdb930fe469052dd6e51fc19ffea9b4500e0daae
parent ae47641690c8ff75bd0b729d28d705eacdb84c9d
author Bernard Gorman <bernard.gorman@gmail.com> 2020-06-01 18:47:00 +0100
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-10-21 23:28:32 +0000
SERVER-48523 Unconditionally check the first entry in the oplog when attempting to resume a change stream
(cherry picked from commit 694ed4153b9d5424b5d169fea5c68f99d4dfb45a)
(cherry picked from commit 9e38cbba7d3efefa59e25cb0411558591036d30b)
22 files changed, 711 insertions(+), 397 deletions(-)
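Before the per-file hunks, a minimal sketch of the check this commit introduces may help: when resuming a change stream, the oplog collection scan now unconditionally examines the first entry it returns, and refuses to proceed if that entry is already newer than the requested minTs, unless it is the replica-set initialization no-op, in which case nothing can yet have fallen off the oplog. The types below are simplified stand-ins for mongo::Timestamp and repl::OplogEntry (an assumption for illustration); the real logic lives in CollectionScan::assertMinTsHasNotFallenOffOplog in the collection_scan.cpp hunk below.

```cpp
#include <cstdint>

// Simplified stand-in for mongo::Timestamp (illustrative only).
struct Timestamp {
    std::uint32_t secs = 0;
    std::uint32_t inc = 0;
};

inline bool operator<=(const Timestamp& a, const Timestamp& b) {
    return a.secs < b.secs || (a.secs == b.secs && a.inc <= b.inc);
}

// Simplified stand-in for a parsed repl::OplogEntry; the real code obtains
// this via repl::OplogEntry::parse(record.data.toBson()).
struct OplogEntry {
    Timestamp ts;
    bool isInitiatingSetNoop = false;  // op == "n" with o == {msg: "initiating set"}
};

// Returns true if it is safe to resume: either the first entry scanned is the
// replica-set initialization no-op (no events can have fallen off a brand-new
// oplog), or its timestamp is at or before the requested minTs. The real code
// throws ErrorCodes::OplogQueryMinTsMissing via uassert() when this is false.
bool minTsHasNotFallenOffOplog(const OplogEntry& firstEntry, const Timestamp& minTs) {
    return firstEntry.isInitiatingSetNoop || firstEntry.ts <= minTs;
}
```

As the document_source_check_resume_token.cpp hunk below shows, the change-stream layer catches the resulting OplogQueryMinTsMissing error and rethrows it as a more explanatory ChangeStreamFatalError.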
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 15f4907703b..19201195034 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -267,6 +267,9 @@ error_code("DataModifiedByRepair", 269); error_code("RepairedReplicaSetNode", 270); error_code("AlarmAlreadyFulfilled", 277) error_code("ChangeStreamFatalError", 280) + +error_code("OplogQueryMinTsMissing", 326) + # Error codes 4000-8999 are reserved. # Non-sequential error codes (for compatibility only) diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index 5cf52d3a842..41f801be109 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -38,8 +38,8 @@ namespace mongo { * This class comprises a mock Collection for use by UUIDCatalog unit tests. */ class CollectionMock : virtual public Collection::Impl, - virtual CappedCallback, - virtual UpdateNotifier { + virtual public CappedCallback, + virtual public UpdateNotifier { public: CollectionMock(const NamespaceString& ns) : _ns(ns) {} ~CollectionMock() = default; diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 4cfe764d8c8..f4e16cb3b84 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -40,6 +40,7 @@ #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" #include "mongo/db/storage/record_fetcher.h" #include "mongo/stdx/memory.h" @@ -72,6 +73,12 @@ CollectionScan::CollectionScan(OperationContext* opCtx, _specificStats.maxTs = params.maxTs; invariant(!_params.shouldTrackLatestOplogTimestamp || _params.collection->ns().isOplog()); + // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. + if (params.assertMinTsHasNotFallenOffOplog) { + invariant(params.shouldTrackLatestOplogTimestamp); + invariant(params.minTs); + } + if (params.maxTs) { _endConditionBSON = BSON("$gte" << *(params.maxTs)); _endCondition = stdx::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName, @@ -172,19 +179,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { } if (!record) { - // We just hit EOF. If we are tailable and have already returned data, leave us in a - // state to pick up where we left off on the next call to work(). Otherwise EOF is - // permanent. + // We hit EOF. If we are tailable and have already seen data, leave us in a state to pick up + // where we left off on the next call to work(). Otherwise, the EOF is permanent. if (_params.tailable && !_lastSeenId.isNull()) { _cursor.reset(); } else { _commonStats.isEOF = true; } - return PlanStage::IS_EOF; } _lastSeenId = record->id; + if (_params.assertMinTsHasNotFallenOffOplog) { + assertMinTsHasNotFallenOffOplog(*record); + } if (_params.shouldTrackLatestOplogTimestamp) { auto status = setLatestOplogEntryTimestamp(*record); if (!status.isOK()) { @@ -215,6 +223,25 @@ Status CollectionScan::setLatestOplogEntryTimestamp(const Record& record) { return Status::OK(); } +void CollectionScan::assertMinTsHasNotFallenOffOplog(const Record& record) { + // If the first entry we see in the oplog is the replset initialization, then it doesn't matter + // if its timestamp is later than the specified minTs; no events earlier than the minTs can have + // fallen off this oplog. 
Otherwise, verify that the timestamp of the first observed oplog entry + // is earlier than or equal to the minTs time. + auto swOplogEntry = repl::OplogEntry::parse(record.data.toBson()); + invariant(_specificStats.docsTested == 0); + invariant(swOplogEntry.isOK()); + auto oplogEntry = std::move(swOplogEntry.getValue()); + const bool isNewRS = + oplogEntry.getObject().binaryEqual(BSON("msg" << repl::kInitiatingSetMsg)) && + oplogEntry.getOpType() == repl::OpTypeEnum::kNoop; + uassert(ErrorCodes::OplogQueryMinTsMissing, + "Specified minTs has already fallen off the oplog", + isNewRS || oplogEntry.getTimestamp() <= *_params.minTs); + // We don't need to check this assertion again after we've confirmed the first oplog event. + _params.assertMinTsHasNotFallenOffOplog = false; +} + PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index ca2e6d4a535..d9723d3fae3 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -94,6 +94,11 @@ private: */ Status setLatestOplogEntryTimestamp(const Record& record); + /** + * Asserts that the 'minTs' specified in the query filter has not already fallen off the oplog. + */ + void assertMinTsHasNotFallenOffOplog(const Record& record); + // WorkingSet is not owned by us. WorkingSet* _workingSet; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index eff1e5e986d..0579eae5ac1 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -51,6 +51,10 @@ struct CollectionScanParams { // not being invalidated before the first call to work(...). RecordId start; + // If present, the collection scan will seek directly to the RecordId of an oplog entry as + // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans. + boost::optional<Timestamp> minTs; + // If present, the collection scan will stop and return EOF the first time it sees a document // that does not pass the filter and has 'ts' greater than 'maxTs'. boost::optional<Timestamp> maxTs; @@ -60,6 +64,9 @@ struct CollectionScanParams { // Do we want the scan to be 'tailable'? Only meaningful if the collection is capped. bool tailable = false; + // Should we assert that the specified minTS has not fallen off the oplog? + bool assertMinTsHasNotFallenOffOplog = false; + // Should we keep track of the timestamp of the latest oplog entry we've seen? This information // is needed to merge cursors from the oplog in order of operation time when reading the oplog // across a sharded cluster. 
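The hunk above adds two fields to CollectionScanParams. As a concrete illustration, here is a hedged, caller-side sketch of how a resumed change-stream oplog scan might populate them; the field names come from the diff, while the Timestamp stand-in and the makeResumedOplogScanParams helper are purely hypothetical.

```cpp
#include <optional>

// Illustrative stand-in for mongo::Timestamp.
struct Timestamp {
    unsigned secs = 0, inc = 0;
};

// Trimmed-down copy of the struct from the hunk above; only fields relevant
// to this patch are shown ('minTs' and 'assertMinTsHasNotFallenOffOplog' are
// the newly added ones).
struct CollectionScanParams {
    std::optional<Timestamp> minTs;
    std::optional<Timestamp> maxTs;
    bool tailable = false;
    bool shouldTrackLatestOplogTimestamp = false;
    bool assertMinTsHasNotFallenOffOplog = false;
};

// Hypothetical helper (not in the patch): a resumed change-stream oplog scan
// takes 'minTs' from the resume token's clusterTime, and the assertion flag
// makes the scan throw OplogQueryMinTsMissing if the first entry it
// encounters is newer than minTs.
CollectionScanParams makeResumedOplogScanParams(Timestamp resumeClusterTime) {
    CollectionScanParams params;
    params.tailable = true;                         // oplog scans are tailable
    params.shouldTrackLatestOplogTimestamp = true;  // required by the invariant
    params.minTs = resumeClusterTime;
    params.assertMinTsHasNotFallenOffOplog = true;
    return params;
}
```

This mirrors the invariants added in the collection_scan.cpp hunk: when assertMinTsHasNotFallenOffOplog is set, both shouldTrackLatestOplogTimestamp and minTs must also be set.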
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index bf753f4be96..0e5744cce40 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -225,9 +225,13 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/catalog/catalog_impl', + '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/repl/oplog_entry', '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/storage/devnull/storage_devnull_core', + '$BUILD_DIR/mongo/db/storage/kv/kv_engine_mock', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/sharding_router_test_fixture', '$BUILD_DIR/mongo/util/clock_source_mock', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 28acafc8726..1245d6a99d9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -385,7 +385,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline( // to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'. startFrom = tokenData.clusterTime; if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) { - resumeStage = DocumentSourceShardCheckResumability::create(expCtx, tokenData); + resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData); } else { resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData); } @@ -417,7 +417,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline( << " in a $changeStream stage.", !resumeAfterClusterTime); startFrom = *startAtOperationTime; - resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); + resumeStage = DocumentSourceCheckResumability::create(expCtx, *startFrom); } // There might not be a starting point if we're on mongos, otherwise we should either have a diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index ef067f773a0..2f1eda6f7a6 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -1,4 +1,3 @@ - /** * Copyright (C) 2018-present MongoDB, Inc. * @@ -35,9 +34,13 @@ using boost::intrusive_ptr; namespace mongo { + +constexpr StringData DocumentSourceCheckResumability::kStageName; +constexpr StringData DocumentSourceEnsureResumeTokenPresent::kStageName; + namespace { -using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus; +using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus; // Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies // the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token, @@ -79,15 +82,14 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte : ResumeStatus::kCheckNextDoc; } + // If the document's 'applyOpsIndex' sorts before that of the client token, we must keep + // looking. 
if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) { return ResumeStatus::kCheckNextDoc; } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) { // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being - // watched). If we are looking for the resume token on a shard then this simply means that - // the resume token may be on a different shard; otherwise, it indicates a corrupt token. - uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge); - // We are running on a merging shard. Signal that we have read beyond the resume token. + // watched). Signal that we have read beyond the resume token. return ResumeStatus::kSurpassedToken; } @@ -161,16 +163,9 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } } // namespace -const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { - return "$_ensureResumeTokenPresent"; -} - -Value DocumentSourceEnsureResumeTokenPresent::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - // This stage is created by the DocumentSourceChangeStream stage, so serializing it here - // would result in it being created twice. - return Value(); -} +DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSourceCheckResumability(expCtx, std::move(token)) {} intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx, @@ -178,39 +173,40 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token)); } -DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} +const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { + return kStageName.rawData(); +} DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() { pExpCtx->checkForInterrupt(); + // If we have already verified the resume token is present, return the next doc immediately. if (_resumeStatus == ResumeStatus::kFoundToken) { - // We've already verified the resume token is present. return pSource->getNext(); } - Document documentFromResumedStream; + auto nextInput = GetNextResult::makeEOF(); - // Keep iterating the stream until we see either the resume token we're looking for, - // or a change with a higher timestamp than our resume token. + // Keep iterating the stream until we see either the resume token we're looking for, or a change + // with a higher timestamp than our resume token. while (_resumeStatus == ResumeStatus::kCheckNextDoc) { - auto nextInput = pSource->getNext(); + // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This + // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token. + nextInput = DocumentSourceCheckResumability::getNext(); - if (!nextInput.isAdvanced()) + // If there are no more results, return EOF. We will continue checking for the resume token + // the next time the getNext method is called. 
If we hit EOF, then we cannot have surpassed + // the resume token on this iteration. + if (!nextInput.isAdvanced()) { + invariant(_resumeStatus != ResumeStatus::kSurpassedToken); return nextInput; - - // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range - // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided - // token would sort before this received document we cannot resume the change stream. - _resumeStatus = compareAgainstClientResumeToken( - pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient); + } } uassert(ErrorCodes::ChangeStreamFatalError, str::stream() << "resume of change stream was not possible, as the resume token was not found. " - << documentFromResumedStream["_id"].getDocument().toString(), + << nextInput.getDocument()["_id"].getDocument().toString(), _resumeStatus != ResumeStatus::kSurpassedToken); // If we reach this point, then we've seen the resume token. @@ -220,102 +216,73 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() return pSource->getNext(); } -const char* DocumentSourceShardCheckResumability::getSourceName() const { - return "$_checkShardResumability"; -} - -Value DocumentSourceShardCheckResumability::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - // This stage is created by the DocumentSourceChangeStream stage, so serializing it here - // would result in it being created twice. - return Value(); -} +DocumentSourceCheckResumability::DocumentSourceCheckResumability( + const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} -intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( +intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) { // We are resuming from a point in time, not an event. Seed the stage with a high water mark. return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts, expCtx->uuid).getData()); } -intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( +intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) { - return new DocumentSourceShardCheckResumability(expCtx, std::move(token)); + return new DocumentSourceCheckResumability(expCtx, std::move(token)); } -DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} +const char* DocumentSourceCheckResumability::getSourceName() const { + return kStageName.rawData(); +} -DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { +DocumentSource::GetNextResult DocumentSourceCheckResumability::getNext() { pExpCtx->checkForInterrupt(); - if (_surpassedResumeToken) + if (_resumeStatus == ResumeStatus::kSurpassedToken) { return pSource->getNext(); + } - while (!_surpassedResumeToken) { - auto nextInput = pSource->getNext(); - - // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us - // from continually returning EOF in cases where the resume point has fallen off the oplog. 
+ while (_resumeStatus != ResumeStatus::kSurpassedToken) { + // The underlying oplog scan will throw OplogQueryMinTsMissing if the minTs in the change + // stream filter has fallen off the oplog. Catch this and throw a more explanatory error. + auto nextInput = [this]() { + try { + return pSource->getNext(); + } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) { + uasserted(ErrorCodes::ChangeStreamFatalError, + "Resume of change stream was not possible, as the resume point may no " + "longer be in the oplog."); + } + }(); + + // If we hit EOF, return it immediately. if (!nextInput.isAdvanced()) { - _assertOplogHasEnoughHistory(nextInput); return nextInput; } + // Determine whether the current event sorts before, equal to or after the resume token. - auto resumeStatus = + _resumeStatus = compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient); - switch (resumeStatus) { + switch (_resumeStatus) { case ResumeStatus::kCheckNextDoc: // If the result was kCheckNextDoc, we are resumable but must swallow this event. - _verifiedOplogHasEnoughHistory = true; continue; case ResumeStatus::kSurpassedToken: - // In this case the resume token wasn't found; it must be on another shard. We must - // examine the oplog to ensure that its history reaches back to before the resume - // token, otherwise we may have missed events that fell off the oplog. If we can - // resume, fall through into the following case and set _surpassedResumeToken. - _assertOplogHasEnoughHistory(nextInput); + // In this case the resume token wasn't found; it may be on another shard. However, + // since the oplog scan did not throw, we know that we are resumable. Fall through + // into the following case and return the document. case ResumeStatus::kFoundToken: - // We found the actual token! Set _surpassedResumeToken and return the result. - _surpassedResumeToken = true; + // We found the actual token! Return the doc so DSEnsureResumeTokenPresent sees it. return nextInput; } } MONGO_UNREACHABLE; } -void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory( - const GetNextResult& nextInput) { - // If we have already verified that this stream is resumable, return immediately. - if (_verifiedOplogHasEnoughHistory) { - return; - } - // Look up the first document in the oplog and compare it with the resume token's clusterTime. - auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); - auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); - if (auto first = pipeline->getNext()) { - auto firstOplogEntry = Value(*first); - // If the first entry in the oplog is the replset initialization, then it doesn't matter - // if its timestamp is later than the resume token. No events earlier than the token can - // have fallen off this oplog, and it is therefore safe to resume. Otherwise, verify that - // the timestamp of the first oplog entry is earlier than that of the resume token. - const bool isNewRS = - Value::compare(firstOplogEntry["o"]["msg"], Value("initiating set"_sd), nullptr) == 0 && - Value::compare(firstOplogEntry["op"], Value("n"_sd), nullptr) == 0; - uassert(ErrorCodes::ChangeStreamFatalError, - "Resume of change stream was not possible, as the resume point may no longer be in " - "the oplog. ", - isNewRS || firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime); - } else { - // Very unusual case: the oplog is empty. We can always resume. 
However, it should never be - // possible to have obtained a document that matched the filter if the oplog is empty. - uassert(ErrorCodes::ChangeStreamFatalError, - "Oplog was empty but found an event in the change stream pipeline. It should not " - "be possible for this to happen", - nextInput.isEOF()); - } - _verifiedOplogHasEnoughHistory = true; +Value DocumentSourceCheckResumability::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + // This stage is created by the DocumentSourceChangeStream stage, so serializing it here + // would result in it being created twice. + return Value(); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 10c3ebd1a73..360e1d14b1b 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -39,30 +39,42 @@ namespace mongo { /** - * This checks for resumability on a single shard in the sharded case. The rules are + * This stage checks whether or not the oplog has enough history to resume the stream, and consumes + * all events up to the given resume point. It is deployed on all shards when resuming a stream on + * a sharded cluster, and is also used in the single-replicaset case when a stream is opened with + * startAtOperationTime or with a high-water-mark resume token. It defers to the COLLSCAN to check + * whether the first event (matching or non-matching) encountered in the oplog has a timestamp equal + * to or earlier than the minTs in the change stream filter. If not, the COLLSCAN will throw an + * assertion, which this stage catches and converts into a more comprehensible $changeStream + * specific exception. The rules are: * - * - If the first document in the pipeline for this shard has a matching timestamp, we can - * always resume. - * - If the oplog is empty, we can resume. An empty oplog is rare and can only occur - * on a secondary that has just started up from a primary that has not taken a write. - * In particular, an empty oplog cannot be the result of oplog truncation. - * - If neither of the above is true, the least-recent document in the oplog must precede the resume - * timestamp. If we do this check after seeing the first document in the pipeline in the shard, or - * after seeing that there are no documents in the pipeline after the resume token in the shard, - * we're guaranteed not to miss any documents. + * - If the first event seen in the oplog has the same timestamp as the requested resume token or + * startAtOperationTime, we can resume. + * - If the timestamp of the first event seen in the oplog is earlier than the requested resume + * token or startAtOperationTime, we can resume. + * - If the first entry in the oplog is a replica set initialization, then we can resume even if the + * token timestamp is earlier, since no events can have fallen off this oplog yet. This can happen + * in a sharded cluster when a new shard is added. * - * - Otherwise we cannot resume, as we do not know if this shard lost documents between the resume - * token and the first matching document in the pipeline. - * - * This source need only run on a sharded collection. For unsharded collections, - * DocumentSourceEnsureResumeTokenPresent is sufficient. + * - Otherwise we cannot resume, as we do not know if there were any events between the resume token + * and the first matching document in the oplog. 
*/ -class DocumentSourceShardCheckResumability final : public DocumentSource { +class DocumentSourceCheckResumability : public DocumentSource { public: - GetNextResult getNext() final; - const char* getSourceName() const final; + static constexpr StringData kStageName = "$_internalCheckResumability"_sd; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { + // Used to record the results of comparing the token data extracted from documents in the + // resumed stream against the client's resume token. + enum class ResumeStatus { + kFoundToken, // The stream produced a document satisfying the client resume token. + kSurpassedToken, // The stream's latest document is more recent than the resume token. + kCheckNextDoc // The next document produced by the stream may contain the resume token. + }; + + GetNextResult getNext() override; + const char* getSourceName() const override; + + StageConstraints constraints(Pipeline::SplitState pipeState) const override { return {StreamType::kStreaming, PositionRequirement::kNone, HostTypeRequirement::kAnyShard, @@ -74,42 +86,33 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts); - static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); -private: +protected: /** - * Use the create static method to create a DocumentSourceShardCheckResumability. + * Use the create static method to create a DocumentSourceCheckResumability. */ - DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, - ResumeTokenData token); + DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, + ResumeTokenData token); - void _assertOplogHasEnoughHistory(const GetNextResult& nextInput); - - ResumeTokenData _tokenFromClient; - bool _verifiedOplogHasEnoughHistory = false; - bool _surpassedResumeToken = false; + ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; + const ResumeTokenData _tokenFromClient; }; /** * This stage is used internally for change streams to ensure that the resume token is in the * stream. It is not intended to be created by the user. */ -class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource, +class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability, public NeedsMergerDocumentSource { public: - // Used to record the results of comparing the token data extracted from documents in the - // resumed stream against the client's resume token. - enum class ResumeStatus { - kFoundToken, // The stream produced a document satisfying the client resume token. - kSurpassedToken, // The stream's latest document is more recent than the resume token. - kCheckNextDoc // The next document produced by the stream may contain the resume token. 
- }; + static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd; - GetNextResult getNext() final; + GetNextResult getNext() override; const char* getSourceName() const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -127,11 +130,11 @@ public: /** * NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point - * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards + * could be at any shard. Also add a DocumentSourceCheckResumability stage on the shards * pipeline to ensure that each shard has enough oplog history to resume the change stream. */ boost::intrusive_ptr<DocumentSource> getShardSource() final { - return DocumentSourceShardCheckResumability::create(pExpCtx, _tokenFromClient); + return DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient); }; std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { @@ -149,8 +152,6 @@ public: return {sortMergingPresorted, this}; }; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); @@ -160,9 +161,6 @@ private: */ DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); - - ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; - ResumeTokenData _tokenFromClient; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index a6fd999314a..b47212a6544 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -36,6 +36,11 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/catalog/collection_mock.h" +#include "mongo/db/catalog/database_holder_mock.h" +#include "mongo/db/catalog/database_impl.h" +#include "mongo/db/exec/collection_scan.h" +#include "mongo/db/exec/plan_stats.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -45,86 +50,371 @@ #include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/devnull/devnull_kv_engine.h" +#include "mongo/db/storage/kv/kv_database_catalog_entry_mock.h" +#include "mongo/db/storage/kv/kv_storage_engine.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/uuid.h" using boost::intrusive_ptr; -using std::deque; namespace mongo { namespace { +static constexpr StringData kOtherNs = "test.other.ns"_sd; static constexpr StringData kTestNs = "test.ns"_sd; +class ChangeStreamOplogCursorMock : public SeekableRecordCursor { +public: + ChangeStreamOplogCursorMock(std::deque<Record>* records) : _records(records) {} + + virtual ~ChangeStreamOplogCursorMock() {} + + boost::optional<Record> next() override { + if (_records->empty()) { + return boost::none; + } + auto& next = _records->front(); + _records->pop_front(); + return next; + } + + boost::optional<Record> seekExact(const RecordId& id) 
override { + return Record{}; + } + void save() override {} + bool restore() override { + return true; + } + void detachFromOperationContext() override {} + void reattachToOperationContext(OperationContext* opCtx) override {} + +private: + std::deque<Record>* _records; +}; + +class ChangeStreamOplogCollectionMock : public CollectionMock { +public: + ChangeStreamOplogCollectionMock() : CollectionMock(NamespaceString::kRsOplogNamespace) { + _recordStore = + _devNullEngine.getRecordStore(nullptr, NamespaceString::kRsOplogNamespace.ns(), "", {}); + } + + void init(OperationContext* opCtx) override {} + + void push_back(Document doc) { + // Every entry we push into the oplog should have both 'ts' and 'ns' fields. + invariant(doc["ts"].getType() == BSONType::bsonTimestamp); + invariant(doc["ns"].getType() == BSONType::String); + // Events should always be added in ascending ts order. + auto lastTs = + _records.empty() ? Timestamp(0, 0) : _records.back().data.toBson()["ts"].timestamp(); + invariant(ValueComparator().compare(Value(lastTs), doc["ts"]) <= 0); + // Fill out remaining required fields in the oplog entry. + MutableDocument mutableDoc{doc}; + mutableDoc.setField("op", Value("n"_sd)); + mutableDoc.setField("o", Value(Document{})); + mutableDoc.setField("h", Value(1LL)); + mutableDoc.setField("wall", + Value(Date_t::fromMillisSinceEpoch(doc["ts"].getTimestamp().asLL()))); + // Must remove _id since the oplog expects either no _id or an OID. + mutableDoc.remove("_id"); + // Convert to owned BSON and create corresponding Records. + _data.push_back(mutableDoc.freeze().toBson()); + Record record; + record.data = {_data.back().objdata(), _data.back().objsize()}; + record.id = RecordId{static_cast<int64_t>(_data.size())}; + _records.push_back(std::move(record)); + } + + std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx, + bool forward) const override { + return std::make_unique<ChangeStreamOplogCursorMock>(&_records); + } + + const RecordStore* getRecordStore() const override { + return _recordStore.get(); + } + RecordStore* getRecordStore() override { + return _recordStore.get(); + } + +private: + // We retain the owned record queue here because cursors may be destroyed and recreated. + mutable std::deque<Record> _records; + std::deque<BSONObj> _data; + + // These are no-op structures which are required by the CollectionScan. + std::unique_ptr<RecordStore> _recordStore; + DevNullKVEngine _devNullEngine; +}; + +/** + * The RequiresCollectionStageBase class attempts to obtain the current epoch of the database + * containing the collection to be scanned (in this case, the oplog). Here we provide a dummy + * DatabaseHolder which will always return a valid pointer to the _database member variable. 
+ */ +class ChangeStreamDatabaseHolderMock : public DatabaseHolderMock { +public: + ChangeStreamDatabaseHolderMock(){}; + + Database* get(OperationContext* opCtx, StringData ns) const override { + return createAndGetDB(opCtx, ns); + } + + Database* openDb(OperationContext* opCtx, StringData ns, bool* justCreated = nullptr) override { + return createAndGetDB(opCtx, ns); + } + +private: + Database* createAndGetDB(OperationContext* opCtx, StringData ns) const { + if (!_database) { + _storageEngine = std::make_unique<KVStorageEngine>( + &_devNullEngine, KVStorageEngineOptions{}, kvDatabaseCatalogEntryMockFactory); + _dbEntry = kvDatabaseCatalogEntryMockFactory(NamespaceString::kRsOplogNamespace.db(), + _storageEngine.get()); + _database = std::make_unique<Database>( + opCtx, NamespaceString::kRsOplogNamespace.db(), _dbEntry.get()); + } + return _database.get(); + } + + mutable std::unique_ptr<mongo::KVDatabaseCatalogEntryMock> _dbEntry; + mutable std::unique_ptr<KVStorageEngine> _storageEngine; + mutable std::unique_ptr<Database> _database; + mutable DevNullKVEngine _devNullEngine; +}; + +/** + * Acts as an initial source for the change stream pipeline, taking the place of DSOplogMatch. This + * class maintains its own queue of documents added by each test, but also pushes each doc into an + * underlying ChangeStreamOplogCollectionMock. When getNext() is called, it retrieves the next + * document by pulling it from the mocked oplog collection via a CollectionScan, in order to test + * the latter's changestream-specific functionality. The reason this class keeps its own queue in + * addition to the ChangeStreamOplogCollectionMock is twofold: + * + * - The _id must be stripped from each document before it can be added to the mocked oplog, since + * the _id field of the test document is a resume token but oplog entries are only permitted to + * have OID _ids. We therefore have to restore the _id field of the document pulled from the + * CollectionScan before passing it into the pipeline. + * + * - The concept of GetNextResult::ReturnStatus::kPauseExecution does not exist in CollectionScan; + * NEED_TIME is somewhat analogous but cannot be artificially induced. For tests which exercise + * kPauseExecution, these events are stored only in the DocumentSourceChangeStreamMock queue + * with no corresponding entry in the ChangeStreamOplogCollectionMock queue. + */ +class DocumentSourceChangeStreamMock : public DocumentSourceMock { +public: + DocumentSourceChangeStreamMock(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMock({}, expCtx) { + // Create a ChangeStreamOplogCollectionMock and retain an unowned pointer to it. + auto csOplogColl = std::make_unique<ChangeStreamOplogCollectionMock>(); + _collection = csOplogColl.get(); + + // Use the ChangeStreamOplogCollectionMock to instantiate a Collection wrapper. + _collWrapper = std::make_unique<Collection>(std::move(csOplogColl)); + + // Set up the CollectionScanParams object and pass it a pointer to the Collection. 
+ _filterExpr = BSON("ns" << kTestNs); + _filter = _parseAndNormalize(_filterExpr); + _params.assertMinTsHasNotFallenOffOplog = true; + _params.shouldTrackLatestOplogTimestamp = true; + _params.collection = _collWrapper.get(); + _params.minTs = Timestamp(0, 0); + _params.tailable = true; + } + + void setResumeToken(ResumeTokenData resumeToken) { + invariant(!_collScan); + _filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime)); + _filter = _parseAndNormalize(_filterExpr); + _params.minTs = resumeToken.clusterTime; + } + + void push_back(GetNextResult&& result) { + // We should never push an explicit EOF onto the queue. + invariant(!result.isEOF()); + // If there is a document supplied, add it to the mock collection. + if (result.isAdvanced()) { + _collection->push_back(result.getDocument()); + } + // Both documents and pauses are stored in the DSMock queue. + queue.push_back(std::move(result)); + } + + void push_back(const GetNextResult& result) { + MONGO_UNREACHABLE; + } + + bool isPermanentlyEOF() const { + return _collScan->getCommonStats()->isEOF; + } + +protected: + GetNextResult getNext() override { + // If this is the first call to getNext, we must create the COLLSCAN. + if (!_collScan) { + _collScan = + std::make_unique<CollectionScan>(pExpCtx->opCtx, _params, &_ws, _filter.get()); + // The first call to doWork will create the cursor and return NEED_TIME. But it won't + // actually scan any of the documents that are present in the mock cursor queue. + ASSERT_EQ(_collScan->doWork(nullptr), PlanStage::NEED_TIME); + ASSERT_EQ(_getNumDocsTested(), 0UL); + } + while (true) { + // If the next result is a pause, return it and don't collscan. + auto nextResult = DocumentSourceMock::getNext(); + if (nextResult.isPaused()) { + return nextResult; + } + // Otherwise, retrieve the document via the CollectionScan stage. + auto id = WorkingSet::INVALID_ID; + switch (_collScan->doWork(&id)) { + case PlanStage::IS_EOF: + invariant(nextResult.isEOF()); + return nextResult; + case PlanStage::ADVANCED: { + // We need to restore the _id field which was removed when we added this + // entry into the oplog. This is like a stripped-down DSCSTransform stage. 
+ MutableDocument mutableDoc{Document{_ws.get(id)->obj.value()}}; + mutableDoc["_id"] = nextResult.getDocument()["_id"]; + return mutableDoc.freeze(); + } + case PlanStage::NEED_TIME: + continue; + case PlanStage::NEED_YIELD: + case PlanStage::FAILURE: + case PlanStage::DEAD: + MONGO_UNREACHABLE; + } + } + MONGO_UNREACHABLE; + } + +private: + std::unique_ptr<MatchExpression> _parseAndNormalize(BSONObj filterExpr) { + auto filter = uassertStatusOK(MatchExpressionParser::parse(filterExpr, pExpCtx)); + filter = MatchExpression::optimize(std::move(filter)); + CanonicalQuery::sortTree(filter.get()); + return filter; + } + + size_t _getNumDocsTested() { + return static_cast<const CollectionScanStats*>(_collScan->getSpecificStats())->docsTested; + } + + ChangeStreamOplogCollectionMock* _collection; + std::unique_ptr<Collection> _collWrapper; + + std::unique_ptr<CollectionScan> _collScan; + CollectionScanParams _params; + + std::unique_ptr<MatchExpression> _filter; + BSONObj _filterExpr; + + WorkingSet _ws; +}; + class CheckResumeTokenTest : public AggregationContextFixture { public: - CheckResumeTokenTest() : _mock(DocumentSourceMock::create()) {} + CheckResumeTokenTest() : _mock(new DocumentSourceChangeStreamMock(getExpCtx())) {} protected: /** + * Pushes a document with a resume token corresponding to the given ResumeTokenData into the + * mock queue. This document will have an ns field that matches the test namespace, and will + * appear in the change stream pipeline if its timestamp is at or after the resume timestamp. + */ + void addOplogEntryOnTestNS(ResumeTokenData tokenData) { + _mock->push_back(Document{{"ns", kTestNs}, + {"ts", tokenData.clusterTime}, + {"_id", + ResumeToken(std::move(tokenData)) + .toDocument(ResumeToken::SerializationFormat::kHexString)}}); + } + + /** * Pushes a document with a resume token corresponding to the given timestamp, version, * applyOpsIndex, docKey, and namespace into the mock queue. */ - void addDocument( - Timestamp ts, int version, std::size_t applyOpsIndex, Document docKey, UUID uuid) { - _mock->queue.push_back( - Document{{"_id", - ResumeToken(ResumeTokenData(ts, version, applyOpsIndex, uuid, Value(docKey))) - .toDocument(ResumeToken::SerializationFormat::kHexString)}}); + void addOplogEntryOnTestNS( + Timestamp ts, int version, std::size_t txnOpIndex, Document docKey, UUID uuid) { + return addOplogEntryOnTestNS({ts, version, txnOpIndex, uuid, Value(docKey)}); } /** * Pushes a document with a resume token corresponding to the given timestamp, version, * applyOpsIndex, docKey, and namespace into the mock queue. */ - void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) { - addDocument(ts, 0, 0, docKey, uuid); + void addOplogEntryOnTestNS(Timestamp ts, Document docKey, UUID uuid = testUuid()) { + addOplogEntryOnTestNS(ts, 0, 0, docKey, uuid); } /** * Pushes a document with a resume token corresponding to the given timestamp, _id string, and * namespace into the mock queue. */ - void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) { - addDocument(ts, 0, 0, Document{{"_id", id}}, uuid); + void addOplogEntryOnTestNS(Timestamp ts, std::string id, UUID uuid = testUuid()) { + addOplogEntryOnTestNS(ts, 0, 0, Document{{"_id", id}}, uuid); } + /** + * Pushes a document that does not match the test namespace into the mock oplog. This will be + * examined by the oplog CollectionScan but will not produce an event in the pipeline. 
+ */ + void addOplogEntryOnOtherNS(Timestamp ts) { + _mock->push_back(Document{{"ns", kOtherNs}, {"ts", ts}}); + } + + /** + * Pushes a pause in execution into the pipeline queue. + */ void addPause() { _mock->queue.push_back(DocumentSource::GetNextResult::makePauseExecution()); } /** + * Convenience method to create the class under test with a given ResumeTokenData. + */ + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( + ResumeTokenData tokenData) { + auto checkResumeToken = + DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData); + _mock->setResumeToken(std::move(tokenData)); + checkResumeToken->setSource(_mock.get()); + return checkResumeToken; + } + + /** * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, int version, std::size_t applyOpsIndex, boost::optional<Document> docKey, UUID uuid) { - auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create( - getExpCtx(), {ts, version, applyOpsIndex, uuid, docKey ? Value(*docKey) : Value()}); - checkResumeToken->setSource(_mock.get()); - return checkResumeToken; + return createDSEnsureResumeTokenPresent( + {ts, version, applyOpsIndex, uuid, docKey ? Value(*docKey) : Value()}); } /** * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) { - return createCheckResumeToken(ts, 0, 0, docKey, uuid); + return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid); } /** * Convenience method to create the class under test with a given timestamp, _id string, and * namespace. 
*/ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( Timestamp ts, StringData id, UUID uuid = testUuid()) { - return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid); + return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid); } /** @@ -136,29 +426,29 @@ protected: return *uuid_gen; } - intrusive_ptr<DocumentSourceMock> _mock; + intrusive_ptr<DocumentSourceChangeStreamMock> _mock; }; -class ShardCheckResumabilityTest : public CheckResumeTokenTest { +class CheckResumabilityTest : public CheckResumeTokenTest { protected: - intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability( + intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability( ResumeTokenData tokenData) { - auto shardCheckResumability = - DocumentSourceShardCheckResumability::create(getExpCtx(), tokenData); - shardCheckResumability->setSource(_mock.get()); - return shardCheckResumability; + auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData); + _mock->setResumeToken(std::move(tokenData)); + dsCheckResumability->setSource(_mock.get()); + return dsCheckResumability; } - intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(Timestamp ts) { - return createShardCheckResumability( - ResumeToken::makeHighWaterMarkToken(ts, testUuid()).getData()); + intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) { + return createDSCheckResumability( + ResumeToken::makeHighWaterMarkToken(ts, boost::none).getData()); } }; TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); // We should not see the resume token. ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -166,9 +456,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); addPause(); - addDocument(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); // We see the pause we inserted, but not the resume token. ASSERT_TRUE(checkResumeToken->getNext().isPaused()); @@ -179,10 +469,10 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { Timestamp resumeTimestamp(100, 1); Timestamp doc1Timestamp(100, 2); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); addPause(); - addDocument(doc1Timestamp, "2"); + addOplogEntryOnTestNS(doc1Timestamp, "2"); // Pause added explicitly. 
ASSERT_TRUE(checkResumeToken->getNext().isPaused()); @@ -197,13 +487,13 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); - addDocument(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0"); + addOplogEntryOnTestNS(resumeTimestamp, "0"); Timestamp doc1Timestamp(100, 2); Timestamp doc2Timestamp(101, 1); - addDocument(doc1Timestamp, "1"); - addDocument(doc2Timestamp, "2"); + addOplogEntryOnTestNS(doc1Timestamp, "1"); + addOplogEntryOnTestNS(doc2Timestamp, "2"); auto result1 = checkResumeToken->getNext(); ASSERT_TRUE(result1.isAdvanced()); @@ -217,23 +507,28 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) } TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) { - Timestamp resumeTimestamp(100, 1); + // The first timestamp in the oplog precedes the resume token, and the second matches it... + Timestamp doc1Timestamp(100, 1); + Timestamp resumeTimestamp(100, 2); + Timestamp doc2Timestamp = resumeTimestamp; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); - Timestamp doc1Timestamp(100, 2); - Timestamp doc2Timestamp(101, 1); - addDocument(doc1Timestamp, "1"); - addDocument(doc2Timestamp, "2"); + // ... but there's no entry in the oplog that matches the full token. + addOplogEntryOnTestNS(doc1Timestamp, "1"); + addOplogEntryOnTestNS(doc2Timestamp, "2"); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } -TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierTimestamp) { +TEST_F(CheckResumeTokenTest, ShouldIgnoreChangeWithEarlierResumeToken) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1"); + + // Add an entry into the oplog with the same timestamp but a lower documentKey. We swallow it + // but don't throw - we haven't surpassed the token yet and still may see it in the next doc. + addOplogEntryOnTestNS(resumeTimestamp, "0"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -241,9 +536,9 @@ TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) { Timestamp resumeTimestamp(100, 1); auto resumeTokenUUID = UUID::gen(); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", resumeTokenUUID); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "1", resumeTokenUUID); auto otherUUID = UUID::gen(); - addDocument(resumeTimestamp, "1", otherUUID); + addOplogEntryOnTestNS(resumeTimestamp, "1", otherUUID); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } @@ -254,9 +549,9 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "abc"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "abc"); // We must not see the following document. 
- addDocument(resumeTimestamp, "ABC"); + addOplogEntryOnTestNS(resumeTimestamp, "ABC"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -268,13 +563,14 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdM Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}); Timestamp doc1Timestamp(100, 1); - addDocument(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}}); Timestamp doc2Timestamp(100, 2); Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}}; - addDocument(doc2Timestamp, doc2DocKey); + addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey); // We should skip doc1 since it satisfies the resume token, and retrieve doc2. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -289,10 +585,11 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 0}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -307,10 +604,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen getExpCtx()->inMongos = true; auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}}); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"x"_sd, 0}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -323,10 +620,10 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject) Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, boost::none); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, boost::none); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"_id"_sd, 1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"_id"_sd, 2}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -339,11 +636,12 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) { Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, Document{{"x"_sd, 0}}); + auto checkResumeToken = + createDSEnsureResumeTokenPresent(resumeTimestamp, 
Document{{"x"_sd, 0}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}}); - addDocument(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}}); - addDocument(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}, {"_id", 1}}); + addOplogEntryOnTestNS(Timestamp(100, 1), {{"x"_sd, 0}, {"y"_sd, -1}}); + addOplogEntryOnTestNS(Timestamp(100, 2), {{"x"_sd, 0}, {"y"_sd, -1}}); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); @@ -368,20 +666,20 @@ TEST_F(CheckResumeTokenTest, // Create the resume token using the higher-sorting UUID. auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[1]); // Add two documents which have the same clusterTime but a lower UUID. One of the documents has // a lower docKey than the resume token, the other has a higher docKey; this demonstrates that // the UUID is the discriminating factor. - addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]); - addDocument(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 2}}, uuids[0]); // Add a third document that matches the resume token. - addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[1]); // Add a fourth document with the same timestamp and UUID whose docKey sorts after the token. auto expectedDocKey = Document{{"_id"_sd, 3}}; - addDocument(resumeTimestamp, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, expectedDocKey, uuids[1]); // We should skip the first two docs, swallow the resume token, and return the fourth doc. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -406,19 +704,19 @@ TEST_F(CheckResumeTokenTest, // Create the resume token using the lower-sorting UUID. auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]); + createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}}, uuids[0]); // Add a document which has the same clusterTime and a lower docKey but a higher UUID, followed // by a document which matches the resume token. This is not possible in practice, but it serves // to demonstrate that the resume attempt fails even when the resume token is present. - addDocument(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]); - addDocument(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 0}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, {{"_id"_sd, 1}}, uuids[0]); ASSERT_THROWS_CODE( checkResumeToken->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } -TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) { +TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { Timestamp resumeTimestamp(100, 1); // Create an ordered array of 3 UUIDs. 
@@ -427,21 +725,21 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) { std::sort(uuids.begin(), uuids.end()); auto checkResumeToken = - createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]); + createDSEnsureResumeTokenPresent(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]); // Add two documents which have the same clusterTime and version but a lower applyOps index. One // of the documents has a lower UUID than the resume token, the other has a higher UUID; this // demonstrates that the applyOps index is the discriminating factor. - addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]); - addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]); // Add a third document that matches the resume token. - addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]); // Add a fourth document with the same timestamp and version whose applyOps index sorts after the // resume token. auto expectedDocKey = Document{{"_id"_sd, 3}}; - addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]); + addOplogEntryOnTestNS(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]); // We should skip the first two docs, swallow the resume token, and return the fourth doc. const auto firstDocAfterResume = checkResumeToken->getNext(); @@ -458,278 +756,255 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) { TEST_F(CheckResumeTokenTest, ShouldSucceedWithNoDocuments) { Timestamp resumeTimestamp(100, 1); - auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + auto checkResumeToken = createDSEnsureResumeTokenPresent(resumeTimestamp, "0"); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } -/** - * A mock MongoProcessInterface which allows mocking a foreign pipeline. 
- */ -class MockMongoInterface final : public StubMongoProcessInterface { -public: - MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) - : _mockResults(std::move(mockResults)) {} - - bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { - return false; - } - - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } - - if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); - } - - if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); - } - - return pipeline; - } - - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { - pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); - } - -private: - deque<DocumentSource::GetNextResult> _mockResults; -}; - -TEST_F(ShardCheckResumabilityTest, - ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) { +TEST_F(CheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(resumeTimestamp, "ID"); // We should see the resume token. - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, - ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryAfterToken) { +TEST_F(CheckResumabilityTest, + ShouldSucceedIfResumeTokenIsPresentAndEarliestOplogEntryEqualToToken) { Timestamp resumeTimestamp(100, 1); - Timestamp oplogTimestamp(100, 2); + Timestamp oplogTimestamp(100, 1); - ResumeTokenData tokenData( - resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}})); - auto shardCheckResumability = createShardCheckResumability(tokenData); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(resumeTimestamp, "ID"); // We should see the resume token. 
- auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) { +TEST_F(CheckResumabilityTest, ShouldPermanentlyEofIfOplogIsEmpty) { Timestamp resumeTimestamp(100, 1); - ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "ID"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - addDocument(resumeTimestamp, "ID"); - // We should see the resume token. + // As with other tailable cursors, starting a change stream on an empty capped collection will + // cause the cursor to immediately and permanently EOF. This should never happen in practice, + // since a replset member can only accept requests while in PRIMARY, SECONDARY or RECOVERING + // states, and there must be at least one entry in the oplog in order to reach those states. + auto shardCheckResumability = createDSCheckResumability(resumeTimestamp); auto result = shardCheckResumability->getNext(); - ASSERT_TRUE(result.isAdvanced()); - auto& doc = result.getDocument(); - ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); + ASSERT_TRUE(result.isEOF()); + ASSERT_TRUE(_mock->isPermanentlyEOF()); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } -TEST_F(ShardCheckResumabilityTest, - ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) { +TEST_F(CheckResumabilityTest, + ShouldSucceedWithNoDocumentsInPipelineAndEarliestOplogEntryEqualToToken) { + Timestamp oplogTimestamp(100, 1); + Timestamp resumeTimestamp(100, 1); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result = dsCheckResumability->getNext(); + ASSERT_TRUE(result.isEOF()); +} + +TEST_F(CheckResumabilityTest, ShouldFailWithNoDocumentsInPipelineAndEarliestOplogEntryAfterToken) { Timestamp resumeTimestamp(100, 1); Timestamp oplogTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); ASSERT_THROWS_CODE( - shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } 
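// [Editorial aside, not part of the diff: the surrounding CheckResumabilityTest cases all pin
// down a single decision rule. The condensed sketch below is illustrative only -- the helper
// name is invented, and the real logic lives inside the DocumentSourceCheckResumability stage:
// resuming is legal only if the oldest entry still in the oplog is no newer than the resume
// token's clusterTime.]
void assertResumePointStillInOplog(Timestamp firstOplogTs, Timestamp tokenClusterTime) {
    uassert(ErrorCodes::ChangeStreamFatalError,
            "Resume of change stream was not possible, as the resume point may "
            "no longer be in the oplog.",
            firstOplogTs <= tokenClusterTime);
}
// [An empty oplog is the one case this rule cannot classify; as the
// ShouldPermanentlyEofIfOplogIsEmpty test above documents, that case instead surfaces as an
// immediate and permanent EOF.]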
-TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) { +TEST_F(CheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplogIsEmpty) { Timestamp resumeTimestamp(100, 2); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryBeforeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp docTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } -TEST_F(ShardCheckResumabilityTest, +TEST_F(CheckResumabilityTest, + ShouldSucceedWithLaterDocumentsInPipelineAndEarliestOplogEntryEqualToToken) { + Timestamp oplogTimestamp(100, 1); + Timestamp resumeTimestamp(100, 1); + Timestamp docTimestamp(100, 3); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result = dsCheckResumability->getNext(); + ASSERT_TRUE(result.isAdvanced()); + auto& doc = result.getDocument(); + ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); +} + +TEST_F(CheckResumabilityTest, ShouldFailWithLaterDocumentsInPipelineAndEarliestOplogEntryAfterToken) { Timestamp resumeTimestamp(100, 1); Timestamp oplogTimestamp(100, 2); Timestamp docTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); ASSERT_THROWS_CODE( - shardCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); } -TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { +TEST_F(CheckResumabilityTest, + ShouldFailWithoutReadingLaterDocumentsInPipelineIfEarliestOplogEntryAfterToken) { + Timestamp resumeTimestamp(100, 1); + Timestamp oplogTimestamp(100, 2); + Timestamp docTimestamp(100, 3); + + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + // Confirm 
that there are two documents queued in the mock oplog. + ASSERT_EQ(_mock->size(), 2u); + ASSERT_THROWS_CODE( + dsCheckResumability->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError); + // Confirm that only the first document was read before the assertion was thrown. + ASSERT_EQ(_mock->size(), 1u); +} + +TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); Timestamp docTimestamp(100, 4); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); - mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result2 = shardCheckResumability->getNext(); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAfterResumeToken) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); Timestamp docTimestamp(100, 4); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - addDocument(docTimestamp, "ID"); - deque<DocumentSource::GetNextResult> mockOplog( - {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + addOplogEntryOnTestNS(docTimestamp, "ID"); + + auto result1 = dsCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); - auto result2 = shardCheckResumability->getNext(); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { +TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { Timestamp oplogTimestamp(100, 1); Timestamp resumeTimestamp(100, 2); Timestamp oplogFutureTimestamp(100, 3); - auto shardCheckResumability = createShardCheckResumability(resumeTimestamp); - deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result1 = shardCheckResumability->getNext(); + auto dsCheckResumability = createDSCheckResumability(resumeTimestamp); + addOplogEntryOnOtherNS(oplogTimestamp); + auto result1 = 
dsCheckResumability->getNext(); ASSERT_TRUE(result1.isEOF()); - mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); - auto result2 = shardCheckResumability->getNext(); + addOplogEntryOnOtherNS(oplogFutureTimestamp); + auto result2 = dsCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(token); // Add 2 events at the same clusterTime as the resume token but whose docKey sort before it. - addDocument(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "2"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "2"); // Add the resume token, plus one further event whose docKey sorts after the token. - addDocument(resumeTimestamp, "3"); - addDocument(resumeTimestamp, "4"); + addOplogEntryOnTestNS(resumeTimestamp, "3"); + addOplogEntryOnTestNS(resumeTimestamp, "4"); // The first event we see should be the resume token... - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto doc = result.getDocument(); ASSERT_EQ(token, ResumeToken::parse(doc["_id"].getDocument()).getData()); // ... then the post-token event, and then finally EOF. - result = shardCheckResumability->getNext(); + result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); - Document postResumeTokenDoc{ - {"_id", - ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) - .toDocument(ResumeToken::SerializationFormat::kHexString)}}; - ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc); - ASSERT_TRUE(shardCheckResumability->getNext().isEOF()); + auto postResumeTokenDoc = + ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) + .toDocument(ResumeToken::SerializationFormat::kHexString); + ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc); + ASSERT_TRUE(dsCheckResumability->getNext().isEOF()); } -TEST_F(ShardCheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { +TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceShardCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. 
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); - auto shardCheckResumability = createShardCheckResumability(token); - deque<DocumentSource::GetNextResult> mockOplog; - getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockOplog); + auto dsCheckResumability = createDSCheckResumability(token); // Add 2 events at the same clusterTime as the resume token but whose docKey sort before it. - addDocument(resumeTimestamp, "1"); - addDocument(resumeTimestamp, "2"); + addOplogEntryOnTestNS(resumeTimestamp, "1"); + addOplogEntryOnTestNS(resumeTimestamp, "2"); // Add one further event whose docKey sorts after the token. - addDocument(resumeTimestamp, "4"); + addOplogEntryOnTestNS(resumeTimestamp, "4"); // The first event we see should be the post-token event, followed by EOF. - auto result = shardCheckResumability->getNext(); + auto result = dsCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); - Document postResumeTokenDoc{ - {"_id", - ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) - .toDocument(ResumeToken::SerializationFormat::kHexString)}}; - ASSERT_DOCUMENT_EQ(result.getDocument(), postResumeTokenDoc); - ASSERT_TRUE(shardCheckResumability->getNext().isEOF()); + auto postResumeTokenDoc = + ResumeToken({resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "4"_sd}})}) + .toDocument(ResumeToken::SerializationFormat::kHexString); + ASSERT_DOCUMENT_EQ(result.getDocument()["_id"].getDocument(), postResumeTokenDoc); + ASSERT_TRUE(dsCheckResumability->getNext().isEOF()); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp index 26623df34a6..19771bdbc2c 100644 --- a/src/mongo/db/pipeline/document_source_mock.cpp +++ b/src/mongo/db/pipeline/document_source_mock.cpp @@ -106,4 +106,8 @@ DocumentSource::GetNextResult DocumentSourceMock::getNext() { queue.pop_front(); return next; } + +size_t DocumentSourceMock::size() const { + return queue.size(); } +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index aeade157c7e..1b88503706d 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -78,6 +78,8 @@ public: static boost::intrusive_ptr<DocumentSourceMock> create( const std::initializer_list<const char*>& jsons); + size_t size() const; + void reattachToOperationContext(OperationContext* opCtx) { isDetachedFromOpCtx = false; } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 5291d665598..c4e19f548d5 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -401,7 +401,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep pipeline->getSources().empty() ? 
nullptr : pipeline->getSources().front().get(); if (firstSource && firstSource->constraints().isChangeStreamStage()) { invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData); - plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS | + QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); } if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 41b7d70f786..3e0d1885a83 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -650,11 +650,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( if (startLoc) { params.start = *startLoc; } + params.minTs = minTs; params.maxTs = maxTs; params.direction = CollectionScanParams::FORWARD; params.tailable = cq->getQueryRequest().isTailable(); params.shouldTrackLatestOplogTimestamp = plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + params.assertMinTsHasNotFallenOffOplog = + plannerOptions & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG; params.shouldWaitForOplogVisibility = shouldWaitForOplogVisibility(opCtx, collection, params.tailable); diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index 7f2fa557584..d220eec241c 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -154,6 +154,8 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( csn->maxScan = query.getQueryRequest().getMaxScan(); csn->shouldTrackLatestOplogTimestamp = params.options & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; + csn->assertMinTsHasNotFallenOffOplog = + params.options & QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG; csn->shouldWaitForOplogVisibility = params.options & QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE; diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp index 18944f8e17b..e4fc226157b 100644 --- a/src/mongo/db/query/query_planner.cpp +++ b/src/mongo/db/query/query_planner.cpp @@ -138,6 +138,9 @@ string optionString(size_t options) { case QueryPlannerParams::OPLOG_SCAN_WAIT_FOR_VISIBLE: ss << "OPLOG_SCAN_WAIT_FOR_VISIBLE "; break; + case QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG: + ss << "ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG "; + break; case QueryPlannerParams::ENUMERATE_OR_CHILDREN_LOCKSTEP: ss << "ENUMERATE_OR_CHILDREN_LOCKSTEP "; break; diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index 94cedcfbe0e..46ff6244726 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -103,6 +103,10 @@ struct QueryPlannerParams { // Set this so that collection scans on the oplog wait for visibility before reading. OPLOG_SCAN_WAIT_FOR_VISIBLE = 1 << 13, + // Set this on an oplog scan to uassert that the oplog has not already rolled over the + // minimum 'ts' timestamp specified in the query. + ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG = 1 << 14, + // Instruct the plan enumerator to enumerate contained $ors in a special order. $or // enumeration can generate an exponential number of plans, and is therefore limited at some // arbitrary cutoff controlled by a parameter. 
When this limit is hit, the order of @@ -118,7 +122,7 @@ struct QueryPlannerParams { // order, we would get assignments [a_b, a_b], [a_c, a_c], [a_c, a_b], then [a_b, a_c]. This // is thought to be helpful in general, but particularly in cases where all children of the // $or use the same fields and have the same indexes available, as in this example. - ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 14, + ENUMERATE_OR_CHILDREN_LOCKSTEP = 1 << 15, }; // See Options enum above. diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp index f7e85aaa9e4..39ab80a4a4b 100644 --- a/src/mongo/db/query/query_solution.cpp +++ b/src/mongo/db/query/query_solution.cpp @@ -250,6 +250,7 @@ QuerySolutionNode* CollectionScanNode::clone() const { copy->direction = this->direction; copy->maxScan = this->maxScan; copy->shouldTrackLatestOplogTimestamp = this->shouldTrackLatestOplogTimestamp; + copy->assertMinTsHasNotFallenOffOplog = this->assertMinTsHasNotFallenOffOplog; copy->shouldWaitForOplogVisibility = this->shouldWaitForOplogVisibility; return copy; diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index 2543b4e7c2b..1546ff1421d 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -319,6 +319,9 @@ struct CollectionScanNode : public QuerySolutionNode { // across a sharded cluster. bool shouldTrackLatestOplogTimestamp = false; + // Should we assert that the specified minTs has not fallen off the oplog? + bool assertMinTsHasNotFallenOffOplog = false; + int direction; // maxScan option to .find() limits how many docs we look at. diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 89a996be968..0f323f6aa23 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -81,6 +81,7 @@ PlanStage* buildStages(OperationContext* opCtx, params.collection = collection; params.tailable = csn->tailable; params.shouldTrackLatestOplogTimestamp = csn->shouldTrackLatestOplogTimestamp; + params.assertMinTsHasNotFallenOffOplog = csn->assertMinTsHasNotFallenOffOplog; params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD : CollectionScanParams::BACKWARD; params.maxScan = csn->maxScan; diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 815de6a4074..f89016e8bc3 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -40,6 +40,11 @@ namespace mongo { namespace repl { /** + * The first entry written to the oplog when a replica set is initiated is a no-op with this message in its "msg" field. + */ +constexpr auto kInitiatingSetMsg = "initiating set"_sd; + +/** * A parsed oplog entry that inherits from the OplogEntryBase parsed by the IDL. * This class is immutable. */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 8f3870337c9..092e6971ee8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -431,8 +431,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati WriteUnitOfWork wuow(opCtx); Helpers::putSingleton(opCtx, configCollectionName, config); - const auto msgObj = BSON("msg" - << "initiating set"); + const auto msgObj = BSON("msg" << kInitiatingSetMsg); _service->getOpObserver()->onOpMessage(opCtx, msgObj); wuow.commit(); // ReplSetTest assumes that immediately after the replSetInitiate
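Taken together, the new kInitiatingSetMsg constant, the minTs plumbing through the planner, and the OplogQueryMinTsMissing error code support a check along the following lines. This is a hedged sketch under invented names, not the commit's code; the real assertion is CollectionScan::assertMinTsHasNotFallenOffOplog.

    // Sketch: decide whether a requested minTs can still be served, given the
    // first entry of the oplog as raw BSON. Invented helper for illustration.
    bool minTsCoveredByOplog(const BSONObj& firstOplogEntry, Timestamp minTs) {
        // If the "initiating set" no-op is still the first entry, nothing has
        // ever fallen off this oplog, so any minTs is acceptable.
        if (firstOplogEntry["op"].str() == "n" &&
            firstOplogEntry["o"]["msg"].str() == kInitiatingSetMsg) {
            return true;
        }
        // Otherwise the oplog may have rolled over; the scan is safe only if
        // the oldest surviving entry is no newer than the requested minTs.
        return firstOplogEntry["ts"].timestamp() <= minTs;
    }

A scan that finds minTs uncovered would presumably fail with the new OplogQueryMinTsMissing code registered in error_codes.err, rather than silently resuming from a later point.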