diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2021-05-18 09:19:36 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-20 11:54:09 +0000 |
commit | ac173f107cf2642007bbae401eac5fa62106eda7 (patch) | |
tree | ed0794aaa46b4ecfffbcbdfd423e390996d5ff0c /src/mongo/db/pipeline | |
parent | 6997403b9fab1e397e1e13bbdf6e899aeee8b6fd (diff) | |
download | mongo-ac173f107cf2642007bbae401eac5fa62106eda7.tar.gz |
SERVER-55424: Rename change streams DocumentSources to include 'DocumentSourceChangeStream' prefix.
Diffstat (limited to 'src/mongo/db/pipeline')
25 files changed, 284 insertions, 247 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index af8c34fcbf3..9c167fb3b3b 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -341,17 +341,17 @@ env.Library( 'change_stream_document_diff_parser.cpp', 'change_stream_helpers_legacy.cpp', 'document_source_change_stream.cpp', + 'document_source_change_stream_check_invalidate.cpp', + 'document_source_change_stream_check_resumability.cpp', 'document_source_change_stream_close_cursor.cpp', 'document_source_change_stream_ensure_resume_token_present.cpp', + 'document_source_change_stream_lookup_post_image.cpp', + 'document_source_change_stream_lookup_pre_image.cpp', 'document_source_change_stream_oplog_match.cpp', 'document_source_change_stream_topology_change.cpp', 'document_source_change_stream_transform.cpp', 'document_source_change_stream_unwind_transactions.cpp', - 'document_source_check_invalidate.cpp', - 'document_source_check_resume_token.cpp', - 'document_source_lookup_change_post_image.cpp', - 'document_source_lookup_change_pre_image.cpp', - 'document_source_update_on_add_shard.cpp', + 'document_source_change_stream_update_on_add_shard.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/pipeline/pipeline', @@ -423,7 +423,7 @@ env.CppUnitTest( 'document_source_internal_shard_filter_test.cpp', 'document_source_internal_split_pipeline_test.cpp', 'document_source_limit_test.cpp', - 'document_source_lookup_change_post_image_test.cpp', + 'document_source_change_stream_lookup_post_image_test.cpp', 'document_source_lookup_test.cpp', 'document_source_match_test.cpp', 'document_source_merge_cursors_test.cpp', diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 5d83629a751..65baaeb3e6b 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -31,17 +31,17 @@ #include "mongo/db/pipeline/change_stream_helpers_legacy.h" +#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" +#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" #include "mongo/db/pipeline/document_source_change_stream_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h" -#include "mongo/db/pipeline/document_source_check_invalidate.h" -#include "mongo/db/pipeline/document_source_check_resume_token.h" -#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" -#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" -#include "mongo/db/pipeline/document_source_update_on_add_shard.h" +#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" #include "mongo/db/pipeline/expression.h" namespace mongo { @@ -70,7 +70,8 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( const ResumeTokenData tokenData = token.getData(); // If resuming from an "invalidate" using "startAfter", pass along the resume token data to - // DocumentSourceCheckInvalidate to signify that another invalidate should not be generated. + // DocumentSourceChangeStreamCheckInvalidate to signify that another invalidate should not + // be generated. if (startAfter && tokenData.fromInvalidate) { startAfterInvalidate = tokenData; } @@ -93,9 +94,10 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( // before any event that would sort after it. High water mark tokens, however, do not refer // to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'. if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) { - resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData); + resumeStage = DocumentSourceChangeStreamCheckResumability::create(expCtx, tokenData); } else { - resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData); + resumeStage = + DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, tokenData); } } @@ -104,7 +106,8 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( uassert(40674, "Only one type of resume option is allowed, but multiple were found.", !resumeStage); - resumeStage = DocumentSourceCheckResumability::create(expCtx, *startAtOperationTime); + resumeStage = + DocumentSourceChangeStreamCheckResumability::create(expCtx, *startAtOperationTime); } auto transformStage = DocumentSourceChangeStreamTransform::createFromBson(rawSpec, expCtx); @@ -115,7 +118,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies // upon the fact that it is always the first stage in the pipeline. - stages.push_back(DocumentSourceOplogMatch::create(expCtx, showMigrationEvents)); + stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, showMigrationEvents)); stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx)); stages.push_back(transformStage); @@ -131,24 +134,26 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( // The resume stage must come after the check invalidate stage so that the former can determine // whether the event that matches the resume token should be followed by an "invalidate" event. - stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, startAfterInvalidate)); + stages.push_back( + DocumentSourceChangeStreamCheckInvalidate::create(expCtx, startAfterInvalidate)); - // The resume stage 'DocumentSourceCheckResumability' should come before the split point stage - // 'DocumentSourceUpdateOnAddShard'. + // The resume stage 'DocumentSourceChangeStreamCheckResumability' should come before the split + // point stage 'DocumentSourceChangeStreamUpdateOnAddShard'. if (resumeStage && - resumeStage->getSourceName() == DocumentSourceCheckResumability::kStageName) { + resumeStage->getSourceName() == DocumentSourceChangeStreamCheckResumability::kStageName) { stages.push_back(resumeStage); resumeStage.reset(); } - // If the pipeline is build on MongoS, then the stage 'DocumentSourceUpdateOnAddShard' acts as - // the split point for the pipline. All stages before this stages will run on shards and all - // stages after and inclusive of this stage will run on the MongoS. + // If the pipeline is build on MongoS, then the stage + // 'DocumentSourceChangeStreamUpdateOnAddShard' acts as the split point for the pipline. All + // stages before this stages will run on shards and all stages after and inclusive of this stage + // will run on the MongoS. if (expCtx->inMongos) { - stages.push_back(DocumentSourceUpdateOnAddShard::create(expCtx)); + stages.push_back(DocumentSourceChangeStreamUpdateOnAddShard::create(expCtx)); } - // This resume stage should be 'DocumentSourceEnsureResumeTokenPresent'. + // This resume stage should be 'DocumentSourceChangeStreamEnsureResumeTokenPresent'. if (resumeStage) { stages.push_back(resumeStage); } @@ -158,7 +163,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( // There should only be one close cursor stage. If we're on the shards and producing // input to be merged, do not add a close cursor stage, since the mongos will already // have one. - stages.push_back(DocumentSourceCloseCursor::create(expCtx)); + stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx)); } // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here @@ -169,13 +174,13 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( // TODO SERVER-36941: figure out how to get this to work in a sharded cluster. if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) { invariant(!expCtx->inMongos); - stages.push_back(DocumentSourceLookupChangePreImage::create(expCtx, spec)); + stages.push_back(DocumentSourceChangeStreamLookupPreImage::create(expCtx, spec)); } // There should be only one post-image lookup stage. If we're on the shards and producing // input to be merged, the lookup is done on the mongos. if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) { - stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); + stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx)); } } @@ -183,7 +188,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( } } // namespace change_stream_legacy -Value DocumentSourceOplogMatch::serializeLegacy( +Value DocumentSourceChangeStreamOplogMatch::serializeLegacy( boost::optional<ExplainOptions::Verbosity> explain) const { return explain ? Value(Document{{"$_internalOplogMatch"_sd, Document{}}}) : Value(); } @@ -201,13 +206,13 @@ Value DocumentSourceChangeStreamTransform::serializeLegacy( return Value(Document{{getSourceName(), _changeStreamSpec.toBSON()}}); } -Value DocumentSourceCheckInvalidate::serializeLegacy( +Value DocumentSourceChangeStreamCheckInvalidate::serializeLegacy( boost::optional<ExplainOptions::Verbosity> explain) const { // We only serialize this stage in the context of explain. return explain ? Value(DOC(kStageName << Document())) : Value(); } -Value DocumentSourceCheckResumability::serializeLegacy( +Value DocumentSourceChangeStreamCheckResumability::serializeLegacy( boost::optional<ExplainOptions::Verbosity> explain) const { // We only serialize this stage in the context of explain. return explain ? Value(DOC(getSourceName() @@ -225,12 +230,12 @@ Value DocumentSourceChangeStreamUnwindTransaction::serializeLegacy( return Value(); } -Value DocumentSourceLookupChangePreImage::serializeLegacy( +Value DocumentSourceChangeStreamLookupPreImage::serializeLegacy( boost::optional<ExplainOptions::Verbosity> explain) const { return (explain ? Value{Document{{kStageName, Document()}}} : Value()); } -Value DocumentSourceLookupChangePostImage::serializeLegacy( +Value DocumentSourceChangeStreamLookupPostImage::serializeLegacy( boost::optional<ExplainOptions::Verbosity> explain) const { return (explain ? Value{Document{{kStageName, Document()}}} : Value()); } diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a1543b79dfd..94c6f58adf0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -38,16 +38,16 @@ #include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" +#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h" -#include "mongo/db/pipeline/document_source_check_invalidate.h" -#include "mongo/db/pipeline/document_source_check_resume_token.h" +#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" #include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" -#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" #include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/resume_token.h" diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp index fb8d12521ff..a5ee091e326 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -32,7 +32,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/document_source_check_invalidate.h" +#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/util/assert_util.h" @@ -43,7 +43,7 @@ using DSCS = DocumentSourceChangeStream; REGISTER_INTERNAL_DOCUMENT_SOURCE( _internalChangeStreamCheckInvalidate, LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceCheckInvalidate::createFromBson, + DocumentSourceChangeStreamCheckInvalidate::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); namespace { @@ -65,7 +65,8 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt } // namespace -boost::intrusive_ptr<DocumentSourceCheckInvalidate> DocumentSourceCheckInvalidate::createFromBson( +boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> +DocumentSourceChangeStreamCheckInvalidate::createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5467602, str::stream() << "the '" << kStageName << "' object spec must be an object", @@ -74,14 +75,14 @@ boost::intrusive_ptr<DocumentSourceCheckInvalidate> DocumentSourceCheckInvalidat auto parsed = DocumentSourceChangeStreamCheckInvalidateSpec::parse( IDLParserErrorContext("DocumentSourceChangeStreamCheckInvalidateSpec"), spec.embeddedObject()); - return new DocumentSourceCheckInvalidate( + return new DocumentSourceChangeStreamCheckInvalidate( expCtx, parsed.getStartAfterInvalidate() ? boost::optional<ResumeTokenData>(parsed.getStartAfterInvalidate()->getData()) : boost::none); } -DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNext() { // To declare a change stream as invalidated, this stage first emits an invalidate event and // then throws a 'ChangeStreamInvalidated' exception on the next call to this method. @@ -160,7 +161,7 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() { return nextInput; } -Value DocumentSourceCheckInvalidate::serializeLatest( +Value DocumentSourceChangeStreamCheckInvalidate::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value(Document{{DocumentSourceChangeStream::kStageName, @@ -171,7 +172,7 @@ Value DocumentSourceCheckInvalidate::serializeLatest( if (_startAfterInvalidate) { spec.setStartAfterInvalidate(ResumeToken(*_startAfterInvalidate)); } - return Value(Document{{DocumentSourceCheckInvalidate::kStageName, spec.toBSON()}}); + return Value(Document{{DocumentSourceChangeStreamCheckInvalidate::kStageName, spec.toBSON()}}); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h index 88c9d3372f1..92647a6b48d 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -39,14 +39,15 @@ namespace mongo { * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop * for a single-collection change stream). It is not intended to be created by the user. */ -class DocumentSourceCheckInvalidate final : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamCheckInvalidate final + : public DocumentSource, + public ChangeStreamStageSerializationInterface { public: static constexpr StringData kStageName = "$_internalChangeStreamCheckInvalidate"_sd; const char* getSourceName() const final { // This is used in error reporting. - return DocumentSourceCheckInvalidate::kStageName.rawData(); + return DocumentSourceChangeStreamCheckInvalidate::kStageName.rawData(); } StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -69,20 +70,21 @@ public: return ChangeStreamStageSerializationInterface::serializeToValue(explain); } - static boost::intrusive_ptr<DocumentSourceCheckInvalidate> createFromBson( + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx); - static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<ResumeTokenData> startAfterInvalidate) { - return new DocumentSourceCheckInvalidate(expCtx, std::move(startAfterInvalidate)); + return new DocumentSourceChangeStreamCheckInvalidate(expCtx, + std::move(startAfterInvalidate)); } private: /** - * Use the create static method to create a DocumentSourceCheckInvalidate. + * Use the create static method to create a DocumentSourceChangeStreamCheckInvalidate. */ - DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx, - boost::optional<ResumeTokenData> startAfterInvalidate) + DocumentSourceChangeStreamCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<ResumeTokenData> startAfterInvalidate) : DocumentSource(kStageName, expCtx), _startAfterInvalidate(std::move(startAfterInvalidate)) { invariant(!_startAfterInvalidate || diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp index d359e7f0bb3..ffb7ff6cd72 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/curop.h" -#include "mongo/db/pipeline/document_source_check_resume_token.h" +#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/repl/oplog_entry.h" @@ -41,10 +41,10 @@ namespace { REGISTER_INTERNAL_DOCUMENT_SOURCE( _internalChangeStreamCheckResumability, LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceCheckResumability::createFromBson, + DocumentSourceChangeStreamCheckResumability::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); -using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus; +using ResumeStatus = DocumentSourceChangeStreamCheckResumability::ResumeStatus; // Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies // the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token, @@ -173,26 +173,28 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } } // namespace -DocumentSourceCheckResumability::DocumentSourceCheckResumability( +DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResumability( const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) : DocumentSource(getSourceName(), expCtx), _tokenFromClient(std::move(token)) {} -intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( - const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) { +intrusive_ptr<DocumentSourceChangeStreamCheckResumability> +DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx, + Timestamp ts) { // We are resuming from a point in time, not an event. Seed the stage with a high water mark. return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData()); } -intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create( - const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) { - return new DocumentSourceCheckResumability(expCtx, std::move(token)); +intrusive_ptr<DocumentSourceChangeStreamCheckResumability> +DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx, + ResumeTokenData token) { + return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(token)); } -const char* DocumentSourceCheckResumability::getSourceName() const { +const char* DocumentSourceChangeStreamCheckResumability::getSourceName() const { return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceCheckResumability::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamCheckResumability::doGetNext() { if (_resumeStatus == ResumeStatus::kSurpassedToken) { return pSource->getNext(); } @@ -234,7 +236,7 @@ DocumentSource::GetNextResult DocumentSourceCheckResumability::doGetNext() { MONGO_UNREACHABLE; } -Value DocumentSourceCheckResumability::serializeLatest( +Value DocumentSourceChangeStreamCheckResumability::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { return explain ? Value(DOC(DocumentSourceChangeStream::kStageName @@ -242,7 +244,7 @@ Value DocumentSourceCheckResumability::serializeLatest( << "internalCheckResumability"_sd << "resumeToken" << ResumeToken(_tokenFromClient).toDocument()))) : Value(Document{ - {DocumentSourceCheckResumability::kStageName, + {DocumentSourceChangeStreamCheckResumability::kStageName, DocumentSourceChangeStreamCheckResumabilitySpec(ResumeToken(_tokenFromClient)) .toBSON()}}); } diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h index ad546f51b4b..9cbed08490a 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -59,8 +59,8 @@ namespace mongo { * - Otherwise we cannot resume, as we do not know if there were any events between the resume token * and the first matching document in the oplog. */ -class DocumentSourceCheckResumability : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamCheckResumability : public DocumentSource, + public ChangeStreamStageSerializationInterface { public: static constexpr StringData kStageName = "$_internalChangeStreamCheckResumability"_sd; @@ -94,7 +94,7 @@ public: return ChangeStreamStageSerializationInterface::serializeToValue(explain); } - static boost::intrusive_ptr<DocumentSourceCheckResumability> createFromBson( + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5467603, str::stream() << "the '" << kStageName << "' object spec must be an object", @@ -106,18 +106,18 @@ public: return create(expCtx, parsed.getResumeToken().getData()); } - static boost::intrusive_ptr<DocumentSourceCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts); - static boost::intrusive_ptr<DocumentSourceCheckResumability> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); protected: /** - * Use the create static method to create a DocumentSourceCheckResumability. + * Use the create static method to create a DocumentSourceChangeStreamCheckResumability. */ - DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, - ResumeTokenData token); + DocumentSourceChangeStreamCheckResumability( + const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); GetNextResult doGetNext() override; diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp index a4a43c654ee..a7db8e647a0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp @@ -52,7 +52,7 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt } // namespace -DocumentSource::GetNextResult DocumentSourceCloseCursor::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamCloseCursor::doGetNext() { // Close cursor if we have returned an invalidate entry. if (_shouldCloseCursor) { uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated"); diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index 59be3f550b3..16a1e13f2d4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -41,13 +41,13 @@ namespace mongo { * "invalidate" entries. * It is not intended to be created by the user. */ -class DocumentSourceCloseCursor final : public DocumentSource { +class DocumentSourceChangeStreamCloseCursor final : public DocumentSource { public: static constexpr StringData kStageName = "$changeStream"_sd; const char* getSourceName() const final { // This is used in error reporting. - return DocumentSourceCloseCursor::kStageName.rawData(); + return DocumentSourceChangeStreamCloseCursor::kStageName.rawData(); } StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -71,9 +71,9 @@ public: return explain ? Value(DOC(kStageName << Document())) : Value(); } - static boost::intrusive_ptr<DocumentSourceCloseCursor> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamCloseCursor> create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceCloseCursor(expCtx); + return new DocumentSourceChangeStreamCloseCursor(expCtx); } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { @@ -82,9 +82,9 @@ public: private: /** - * Use the create static method to create a DocumentSourceCloseCursor. + * Use the create static method to create a DocumentSourceChangeStreamCloseCursor. */ - DocumentSourceCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx) + DocumentSourceChangeStreamCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(kStageName, expCtx) {} GetNextResult doGetNext() final; diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp index 61f382e3e9d..cb6dfc0ede1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp @@ -33,21 +33,22 @@ namespace mongo { -DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( - const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSourceCheckResumability(expCtx, std::move(token)) {} +DocumentSourceChangeStreamEnsureResumeTokenPresent:: + DocumentSourceChangeStreamEnsureResumeTokenPresent( + const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) + : DocumentSourceChangeStreamCheckResumability(expCtx, std::move(token)) {} -boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> -DocumentSourceEnsureResumeTokenPresent::create( +boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> +DocumentSourceChangeStreamEnsureResumeTokenPresent::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) { - return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token)); + return new DocumentSourceChangeStreamEnsureResumeTokenPresent(expCtx, std::move(token)); } -const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { +const char* DocumentSourceChangeStreamEnsureResumeTokenPresent::getSourceName() const { return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamEnsureResumeTokenPresent::doGetNext() { // If we have already verified the resume token is present, return the next doc immediately. if (_resumeStatus == ResumeStatus::kSurpassedToken) { return pSource->getNext(); @@ -59,9 +60,10 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext( // occurred at the same clusterTime on more than one shard, then we may see multiple identical // resume tokens here. We swallow all of them until the resume status becomes kSurpassedToken. while (_resumeStatus != ResumeStatus::kSurpassedToken) { - // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This - // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token. - nextInput = DocumentSourceCheckResumability::doGetNext(); + // Delegate to DocumentSourceChangeStreamCheckResumability to consume all events up to the + // token. This will also set '_resumeStatus' to indicate whether we have seen or surpassed + // the token. + nextInput = DocumentSourceChangeStreamCheckResumability::doGetNext(); // If there are no more results, return EOF. We will continue checking for the resume token // the next time the getNext method is called. If we hit EOF, then we cannot have surpassed @@ -92,7 +94,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext( return nextInput; } -Value DocumentSourceEnsureResumeTokenPresent::serialize( +Value DocumentSourceChangeStreamEnsureResumeTokenPresent::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { // We only serialize this stage in the context of explain. if (explain) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h index c81f87b3617..b380f72d643 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h @@ -29,14 +29,15 @@ #pragma once -#include "mongo/db/pipeline/document_source_check_resume_token.h" +#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" namespace mongo { /** * This stage is used internally for change streams to ensure that the resume token is in the * stream. It is not intended to be created by the user. */ -class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability { +class DocumentSourceChangeStreamEnsureResumeTokenPresent final + : public DocumentSourceChangeStreamCheckResumability { public: static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd; @@ -56,17 +57,17 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; private: /** - * Use the create static method to create a DocumentSourceEnsureResumeTokenPresent. + * Use the create static method to create a DocumentSourceChangeStreamEnsureResumeTokenPresent. */ - DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx, - ResumeTokenData token); + DocumentSourceChangeStreamEnsureResumeTokenPresent( + const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); GetNextResult doGetNext() final; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp index caa1bc5c058..8659f71e325 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -29,20 +29,20 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/bson/simple_bsonelement_comparator.h" namespace mongo { -constexpr StringData DocumentSourceLookupChangePostImage::kStageName; -constexpr StringData DocumentSourceLookupChangePostImage::kFullDocumentFieldName; +constexpr StringData DocumentSourceChangeStreamLookupPostImage::kStageName; +constexpr StringData DocumentSourceChangeStreamLookupPostImage::kFullDocumentFieldName; namespace { REGISTER_INTERNAL_DOCUMENT_SOURCE( _internalChangeStreamLookupPostImage, LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceLookupChangePostImage::createFromBson, + DocumentSourceChangeStreamLookupPostImage::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); @@ -58,8 +58,8 @@ Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType } } // namespace -boost::intrusive_ptr<DocumentSourceLookupChangePostImage> -DocumentSourceLookupChangePostImage::createFromBson( +boost::intrusive_ptr<DocumentSourceChangeStreamLookupPostImage> +DocumentSourceChangeStreamLookupPostImage::createFromBson( const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5467608, str::stream() << "the '" << kStageName << "' stage spec must be an object", @@ -70,10 +70,10 @@ DocumentSourceLookupChangePostImage::createFromBson( str::stream() << "the 'fullDocument' field can only be 'updateLookup'", parsedSpec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup); - return new DocumentSourceLookupChangePostImage(expCtx); + return new DocumentSourceChangeStreamLookupPostImage(expCtx); } -DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamLookupPostImage::doGetNext() { auto input = pSource->getNext(); if (!input.isAdvanced()) { return input; @@ -89,7 +89,7 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::doGetNext() { return output.freeze(); } -NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace( +NamespaceString DocumentSourceChangeStreamLookupPostImage::assertValidNamespace( const Document& inputDoc) const { auto namespaceObject = assertFieldHasType(inputDoc, DocumentSourceChangeStream::kNamespaceField, BSONType::Object) @@ -110,7 +110,7 @@ NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace( return nss; } -Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updateOp) const { +Value DocumentSourceChangeStreamLookupPostImage::lookupPostImage(const Document& updateOp) const { // Make sure we have a well-formed input. auto nss = assertValidNamespace(updateOp); @@ -147,7 +147,7 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat return (lookedUpDoc ? Value(*lookedUpDoc) : Value(BSONNULL)); } -Value DocumentSourceLookupChangePostImage::serializeLatest( +Value DocumentSourceChangeStreamLookupPostImage::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { return explain ? Value(Document{{DocumentSourceChangeStream::kStageName, diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h index c674d2218e3..2a7c9666e68 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -38,22 +38,23 @@ namespace mongo { * Part of the change stream API machinery used to look up the post-image of a document. Uses the * "documentKey" field of the input to look up the new version of the document. */ -class DocumentSourceLookupChangePostImage final : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamLookupPostImage final + : public DocumentSource, + public ChangeStreamStageSerializationInterface { public: static constexpr StringData kStageName = "$_internalChangeStreamLookupPostImage"_sd; static constexpr StringData kFullDocumentFieldName = DocumentSourceChangeStream::kFullDocumentField; /** - * Creates a DocumentSourceLookupChangePostImage stage. + * Creates a DocumentSourceChangeStreamLookupPostImage stage. */ - static boost::intrusive_ptr<DocumentSourceLookupChangePostImage> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPostImage> create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceLookupChangePostImage(expCtx); + return new DocumentSourceChangeStreamLookupPostImage(expCtx); } - static boost::intrusive_ptr<DocumentSourceLookupChangePostImage> createFromBson( + static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPostImage> createFromBson( const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** @@ -107,7 +108,7 @@ public: } private: - DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx) + DocumentSourceChangeStreamLookupPostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(kStageName, expCtx) {} /** diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp index 8ed1907f31c..6672e54f1c6 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -41,7 +41,7 @@ #include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h" @@ -55,7 +55,7 @@ using std::vector; using MockMongoInterface = StubLookupSingleDocumentProcessInterface; // This provides access to getExpCtx(), but we'll use a different name for this test suite. -class DocumentSourceLookupChangePostImageTest : public AggregationContextFixture { +class DocumentSourceChangeStreamLookupPostImageTest : public AggregationContextFixture { public: /** * This method is required to avoid a static initialization fiasco resulting from calling @@ -78,11 +78,11 @@ public: } }; -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input with a document without a "documentKey" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -101,11 +101,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationType) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfMissingOperationType) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -124,11 +124,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfMissingNamespace) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -148,11 +148,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfNsFieldHasWrongType) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input with a document without a "ns" field. auto mockLocalSource = @@ -171,11 +171,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -194,13 +194,14 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDatabaseMismatchOnCollectionlessNss) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, + ShouldErrorIfDatabaseMismatchOnCollectionlessNss) { auto expCtx = getExpCtx(); expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test"); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -219,13 +220,14 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDatabaseMismatchOnC ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPassIfDatabaseMatchesOnCollectionlessNss) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, + ShouldPassIfDatabaseMatchesOnCollectionlessNss) { auto expCtx = getExpCtx(); expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test"); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}}; @@ -251,11 +253,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPassIfDatabaseMatchesOnCol {"fullDocument", Document{{"_id", 0}}}})); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input with an update document. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -277,11 +279,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments); } -TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { +TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldPropagatePauses) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx); // Mock its input, pausing every other result. auto mockLocalSource = DocumentSourceMock::createForTest( diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.cpp index f8510e741b5..b4a7c51a735 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -32,7 +32,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/bson/simple_bsonelement_comparator.h" #include "mongo/db/transaction_history_iterator.h" @@ -44,34 +44,35 @@ namespace { REGISTER_INTERNAL_DOCUMENT_SOURCE( _internalChangeStreamLookupPreImage, LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceLookupChangePreImage::createFromBson, + DocumentSourceChangeStreamLookupPreImage::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); } -constexpr StringData DocumentSourceLookupChangePreImage::kStageName; -constexpr StringData DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName; +constexpr StringData DocumentSourceChangeStreamLookupPreImage::kStageName; +constexpr StringData DocumentSourceChangeStreamLookupPreImage::kFullDocumentBeforeChangeFieldName; -boost::intrusive_ptr<DocumentSourceLookupChangePreImage> DocumentSourceLookupChangePreImage::create( +boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage> +DocumentSourceChangeStreamLookupPreImage::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { auto mode = spec.getFullDocumentBeforeChange(); - return make_intrusive<DocumentSourceLookupChangePreImage>(expCtx, mode); + return make_intrusive<DocumentSourceChangeStreamLookupPreImage>(expCtx, mode); } -boost::intrusive_ptr<DocumentSourceLookupChangePreImage> -DocumentSourceLookupChangePreImage::createFromBson( +boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage> +DocumentSourceChangeStreamLookupPreImage::createFromBson( const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5467610, str::stream() << "the '" << kStageName << "' stage spec must be an object", elem.type() == BSONType::Object); auto parsedSpec = DocumentSourceChangeStreamLookUpPreImageSpec::parse( IDLParserErrorContext("DocumentSourceChangeStreamLookUpPreImageSpec"), elem.Obj()); - return make_intrusive<DocumentSourceLookupChangePreImage>( + return make_intrusive<DocumentSourceChangeStreamLookupPreImage>( expCtx, parsedSpec.getFullDocumentBeforeChange()); } -DocumentSource::GetNextResult DocumentSourceLookupChangePreImage::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamLookupPreImage::doGetNext() { auto input = pSource->getNext(); if (!input.isAdvanced()) { return input; @@ -112,7 +113,7 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePreImage::doGetNext() { return outputDoc.freeze(); } -boost::optional<Document> DocumentSourceLookupChangePreImage::lookupPreImage( +boost::optional<Document> DocumentSourceChangeStreamLookupPreImage::lookupPreImage( const Document& inputDoc, const repl::OpTime& opTime) const { // We need the oplog's UUID for lookup, so obtain the collection info via MongoProcessInterface. auto localOplogInfo = pExpCtx->mongoProcessInterface->getCollectionOptions( @@ -153,7 +154,7 @@ boost::optional<Document> DocumentSourceLookupChangePreImage::lookupPreImage( return Document{opLogEntry.getObject().getOwned()}; } -Value DocumentSourceLookupChangePreImage::serializeLatest( +Value DocumentSourceChangeStreamLookupPreImage::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { return explain ? Value(Document{ diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h index c06b1b4a7e0..10e4028ee40 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -41,25 +41,26 @@ namespace mongo { * its "fullDocumentBeforeChange" field shall be the optime of the noop oplog entry containing the * pre-image. This stage replaces that field with the actual pre-image document. */ -class DocumentSourceLookupChangePreImage final : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamLookupPreImage final + : public DocumentSource, + public ChangeStreamStageSerializationInterface { public: static constexpr StringData kStageName = "$_internalChangeStreamLookupPreImage"_sd; static constexpr StringData kFullDocumentBeforeChangeFieldName = DocumentSourceChangeStream::kFullDocumentBeforeChangeField; /** - * Creates a DocumentSourceLookupChangePostImage stage. + * Creates a DocumentSourceChangeStreamLookupPostImage stage. */ - static boost::intrusive_ptr<DocumentSourceLookupChangePreImage> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec); - static boost::intrusive_ptr<DocumentSourceLookupChangePreImage> createFromBson( + static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage> createFromBson( const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - DocumentSourceLookupChangePreImage(const boost::intrusive_ptr<ExpressionContext>& expCtx, - FullDocumentBeforeChangeModeEnum mode) + DocumentSourceChangeStreamLookupPreImage(const boost::intrusive_ptr<ExpressionContext>& expCtx, + FullDocumentBeforeChangeModeEnum mode) : DocumentSource(kStageName, expCtx), _fullDocumentBeforeChangeMode(mode) { // This stage should never be created with FullDocumentBeforeChangeMode::kOff. invariant(_fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kOff); diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp index ba4655cc137..6f5e55cd47d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp @@ -37,7 +37,7 @@ namespace mongo { REGISTER_INTERNAL_DOCUMENT_SOURCE( _internalChangeStreamOplogMatch, LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceOplogMatch::createFromBson, + DocumentSourceChangeStreamOplogMatch::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); namespace { @@ -192,11 +192,12 @@ BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -boost::intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, bool showMigrationEvents) { +boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch> +DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr<ExpressionContext>& expCtx, + bool showMigrationEvents) { // TODO SERVER-56669: ensure that 'initialPostBatchResumeToken' is always populated at this // point. - return make_intrusive<DocumentSourceOplogMatch>( + return make_intrusive<DocumentSourceChangeStreamOplogMatch>( buildMatchFilter( expCtx, ResumeToken::parse(expCtx->initialPostBatchResumeToken).getData().clusterTime, @@ -204,23 +205,24 @@ boost::intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create( expCtx); } -boost::intrusive_ptr<DocumentSource> DocumentSourceOplogMatch::createFromBson( +boost::intrusive_ptr<DocumentSource> DocumentSourceChangeStreamOplogMatch::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(5467600, "the match filter must be an expression in an object", elem.type() == BSONType::Object); auto parsedSpec = DocumentSourceChangeStreamOplogMatchSpec::parse( IDLParserErrorContext("DocumentSourceChangeStreamOplogMatchSpec"), elem.Obj()); - return make_intrusive<DocumentSourceOplogMatch>(parsedSpec.getFilter(), pExpCtx); + return make_intrusive<DocumentSourceChangeStreamOplogMatch>(parsedSpec.getFilter(), pExpCtx); } -const char* DocumentSourceOplogMatch::getSourceName() const { +const char* DocumentSourceChangeStreamOplogMatch::getSourceName() const { // This is used in error reporting, particularly if we find this stage in a position other // than first, so report the name as $changeStream. return DocumentSourceChangeStream::kStageName.rawData(); } -StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipeState) const { +StageConstraints DocumentSourceChangeStreamOplogMatch::constraints( + Pipeline::SplitState pipeState) const { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, @@ -235,7 +237,7 @@ StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipe return constraints; } -Value DocumentSourceOplogMatch::serializeLatest( +Value DocumentSourceChangeStreamOplogMatch::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value( @@ -244,7 +246,7 @@ Value DocumentSourceOplogMatch::serializeLatest( } DocumentSourceChangeStreamOplogMatchSpec spec(_predicate); - return Value(Document{{DocumentSourceOplogMatch::kStageName, spec.toBSON()}}); + return Value(Document{{DocumentSourceChangeStreamOplogMatch::kStageName, spec.toBSON()}}); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h index 55641a8429b..06d6602ca49 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h @@ -36,18 +36,20 @@ namespace mongo { * A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied * on the oplog. The stage requires itself to be the first stage in the pipeline. */ -class DocumentSourceOplogMatch final : public DocumentSourceMatch, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceMatch, + public ChangeStreamStageSerializationInterface { public: static constexpr StringData kStageName = "$_internalChangeStreamOplogMatch"_sd; - DocumentSourceOplogMatch(BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx) + DocumentSourceChangeStreamOplogMatch(BSONObj filter, + const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSourceMatch(std::move(filter), expCtx) { // A change stream pipeline should always create a tailable + awaitData cursor. expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; } - DocumentSourceOplogMatch(const DocumentSourceOplogMatch& other) : DocumentSourceMatch(other) {} + DocumentSourceChangeStreamOplogMatch(const DocumentSourceChangeStreamOplogMatch& other) + : DocumentSourceMatch(other) {} virtual boost::intrusive_ptr<DocumentSourceMatch> clone() const { return make_intrusive<std::decay_t<decltype(*this)>>(*this); @@ -55,7 +57,7 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - static boost::intrusive_ptr<DocumentSourceOplogMatch> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, bool showMigrationEvents); const char* getSourceName() const final; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index aa91f342bb5..df6fbb1e5d6 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -44,15 +44,15 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" +#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h" -#include "mongo/db/pipeline/document_source_check_invalidate.h" -#include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" -#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_source_sort.h" @@ -252,12 +252,12 @@ public: getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(std::vector<FieldPath>{}); - // This match stage is a DocumentSourceOplogMatch, which we explicitly disallow from - // executing as a safety mechanism, since it needs to use the collection-default collation, - // even if the rest of the pipeline is using some other collation. To avoid ever executing - // that stage here, we'll up-convert it from the non-executable DocumentSourceOplogMatch to - // a fully-executable DocumentSourceMatch. This is safe because all of the unit tests will - // use the 'simple' collation. + // This match stage is a DocumentSourceChangeStreamOplogMatch, which we explicitly disallow + // from executing as a safety mechanism, since it needs to use the collection-default + // collation, even if the rest of the pipeline is using some other collation. To avoid ever + // executing that stage here, we'll up-convert it from the non-executable + // DocumentSourceChangeStreamOplogMatch to a fully-executable DocumentSourceMatch. This is + // safe because all of the unit tests will use the 'simple' collation. auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get()); ASSERT(match); auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx()); @@ -275,7 +275,7 @@ public: // Remove the DSEnsureResumeTokenPresent stage since it will swallow the result. auto newEnd = std::remove_if(stages.begin(), stages.end(), [](auto& stage) { - return dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage.get()); + return dynamic_cast<DocumentSourceChangeStreamEnsureResumeTokenPresent*>(stage.get()); }); stages.erase(newEnd, stages.end()); @@ -2176,7 +2176,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSOplogMatchStageSeriali spec.setFilter(dummyFilter); auto stageSpecAsBSON = BSON("" << spec.toBSON()); - validateDocumentSourceStageSerialization<DocumentSourceOplogMatch>( + validateDocumentSourceStageSerialization<DocumentSourceChangeStreamOplogMatch>( std::move(spec), stageSpecAsBSON, expCtx); } @@ -2199,7 +2199,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSCheckInvalidateStageSe kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate))); auto stageSpecAsBSON = BSON("" << spec.toBSON()); - validateDocumentSourceStageSerialization<DocumentSourceCheckInvalidate>( + validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckInvalidate>( std::move(spec), stageSpecAsBSON, expCtx); } @@ -2210,7 +2210,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSResumabilityStageSeria spec.setResumeToken(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid()))); auto stageSpecAsBSON = BSON("" << spec.toBSON()); - validateDocumentSourceStageSerialization<DocumentSourceCheckResumability>( + validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckResumability>( std::move(spec), stageSpecAsBSON, expCtx); } @@ -2220,7 +2220,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePreImageSt DocumentSourceChangeStreamLookUpPreImageSpec spec(FullDocumentBeforeChangeModeEnum::kRequired); auto stageSpecAsBSON = BSON("" << spec.toBSON()); - validateDocumentSourceStageSerialization<DocumentSourceLookupChangePreImage>( + validateDocumentSourceStageSerialization<DocumentSourceChangeStreamLookupPreImage>( std::move(spec), stageSpecAsBSON, expCtx); } @@ -2230,7 +2230,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePostImageS DocumentSourceChangeStreamLookUpPostImageSpec spec(FullDocumentModeEnum::kUpdateLookup); auto stageSpecAsBSON = BSON("" << spec.toBSON()); - validateDocumentSourceStageSerialization<DocumentSourceLookupChangePostImage>( + validateDocumentSourceStageSerialization<DocumentSourceChangeStreamLookupPostImage>( std::move(spec), stageSpecAsBSON, expCtx); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 4d5610878f0..0f62b87ec4f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -36,9 +36,9 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/pipeline/change_stream_document_diff_parser.h" #include "mongo/db/pipeline/document_path_support.h" -#include "mongo/db/pipeline/document_source_check_resume_token.h" +#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp index 35dff608271..48a3e7db187 100644 --- a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -27,7 +27,7 @@ * it in the license file. */ -#include "mongo/db/pipeline/document_source_update_on_add_shard.h" +#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" #include <algorithm> @@ -77,16 +77,17 @@ bool isShardConfigEvent(const Document& eventDoc) { } } // namespace -boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create( +boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard> +DocumentSourceChangeStreamUpdateOnAddShard::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceUpdateOnAddShard(expCtx); + return new DocumentSourceChangeStreamUpdateOnAddShard(expCtx); } -DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( +DocumentSourceChangeStreamUpdateOnAddShard::DocumentSourceChangeStreamUpdateOnAddShard( const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(kStageName, expCtx) {} -DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamUpdateOnAddShard::doGetNext() { // For the first call to the 'doGetNext', the '_mergeCursors' will be null and must be // populated. We also resolve the original aggregation command from the expression context. if (!_mergeCursors) { @@ -112,11 +113,13 @@ DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() { return childResult; } -void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShardDetectedObj) { +void DocumentSourceChangeStreamUpdateOnAddShard::addNewShardCursors( + const Document& newShardDetectedObj) { _mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); } -std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards( +std::vector<RemoteCursor> +DocumentSourceChangeStreamUpdateOnAddShard::establishShardCursorsOnNewShards( const Document& newShardDetectedObj) { // Reload the shard registry to see the new shard. auto* opCtx = pExpCtx->opCtx; @@ -143,7 +146,8 @@ std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsO allowPartialResults); } -BSONObj DocumentSourceUpdateOnAddShard::createUpdatedCommandForNewShard(Timestamp shardAddedTime) { +BSONObj DocumentSourceChangeStreamUpdateOnAddShard::createUpdatedCommandForNewShard( + Timestamp shardAddedTime) { // We must start the new cursor from the moment at which the shard became visible. const auto newShardAddedTime = LogicalTime{shardAddedTime}; auto resumeTokenForNewShard = @@ -174,7 +178,8 @@ BSONObj DocumentSourceUpdateOnAddShard::createUpdatedCommandForNewShard(Timestam true /* needsMerge */); } -BSONObj DocumentSourceUpdateOnAddShard::replaceResumeTokenInCommand(Document resumeToken) { +BSONObj DocumentSourceChangeStreamUpdateOnAddShard::replaceResumeTokenInCommand( + Document resumeToken) { Document originalCmd(_originalAggregateCommand); auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray(); diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h index 82e981572c1..4270ff8b1ec 100644 --- a/src/mongo/db/pipeline/document_source_update_on_add_shard.h +++ b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -45,7 +45,7 @@ namespace mongo { * the first time. When this event is detected, this stage will establish a new cursor on that * shard and add it to the cursors being merged. */ -class DocumentSourceUpdateOnAddShard final : public DocumentSource { +class DocumentSourceChangeStreamUpdateOnAddShard final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalUpdateOnAddShard"_sd; @@ -53,7 +53,7 @@ public: * Creates a new stage which will establish a new cursor and add it to the cursors being merged * by 'mergeCursorsStage' whenever a new shard is detected by a change stream. */ - static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard> create( const boost::intrusive_ptr<ExpressionContext>&); Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { @@ -77,7 +77,7 @@ public: } private: - DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&); + DocumentSourceChangeStreamUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&); GetNextResult doGetNext() final; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index e8fde0f3d7c..f81f1880616 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -40,8 +40,8 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/exec/plan_stats.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" -#include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" @@ -312,10 +312,10 @@ protected: /** * Convenience method to create the class under test with a given ResumeTokenData. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( - ResumeTokenData tokenData) { + intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> + createDSEnsureResumeTokenPresent(ResumeTokenData tokenData) { auto checkResumeToken = - DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData); + DocumentSourceChangeStreamEnsureResumeTokenPresent::create(getExpCtx(), tokenData); _mock->setResumeToken(std::move(tokenData)); checkResumeToken->setSource(_mock.get()); return checkResumeToken; @@ -325,12 +325,12 @@ protected: * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( - Timestamp ts, - int version, - std::size_t txnOpIndex, - boost::optional<Document> docKey, - UUID uuid) { + intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> + createDSEnsureResumeTokenPresent(Timestamp ts, + int version, + std::size_t txnOpIndex, + boost::optional<Document> docKey, + UUID uuid) { return createDSEnsureResumeTokenPresent( {ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()}); } @@ -339,8 +339,10 @@ protected: * Convenience method to create the class under test with a given timestamp, docKey, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( - Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) { + intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> + createDSEnsureResumeTokenPresent(Timestamp ts, + boost::optional<Document> docKey, + UUID uuid = testUuid()) { return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid); } @@ -348,8 +350,8 @@ protected: * Convenience method to create the class under test with a given timestamp, _id string, and * namespace. */ - intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent( - Timestamp ts, StringData id, UUID uuid = testUuid()) { + intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> + createDSEnsureResumeTokenPresent(Timestamp ts, StringData id, UUID uuid = testUuid()) { return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid); } @@ -367,14 +369,16 @@ protected: class CheckResumabilityTest : public CheckResumeTokenTest { protected: - intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability( + intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability( ResumeTokenData tokenData) { - auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData); + auto dsCheckResumability = + DocumentSourceChangeStreamCheckResumability::create(getExpCtx(), tokenData); _mock->setResumeToken(std::move(tokenData)); dsCheckResumability->setSource(_mock.get()); return dsCheckResumability; } - intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) { + intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability( + Timestamp ts) { return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); } }; @@ -493,8 +497,8 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) { TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdMatchesFirstDoc) { // Verify that a resume token whose documentKey only contains _id can be used to resume a stream // on a sharded collection as long as its _id matches the first document. We set 'inMongos' - // since this behaviour is only applicable when DocumentSourceEnsureResumeTokenPresent is - // running on mongoS. + // since this behaviour is only applicable when + // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS. Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; @@ -533,8 +537,8 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumentKeyFields) { // Verify that the relaxed _id check only applies if _id is the sole field present in the // client's resume token, even if all the fields that are present match the first doc. We set - // 'inMongos' since this is only applicable when DocumentSourceEnsureResumeTokenPresent is - // running on mongoS. + // 'inMongos' since this is only applicable when + // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS. Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; @@ -976,7 +980,8 @@ TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event + // ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); auto dsCheckResumability = createDSCheckResumability(token); @@ -1005,7 +1010,8 @@ TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeT TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken. + // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event + // ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); auto dsCheckResumability = createDSCheckResumability(token); diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 2f6e05a0fc8..d7aa35e1c66 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -40,12 +40,12 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_facet.h" #include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/db/pipeline/document_source_internal_split_pipeline.h" #include "mongo/db/pipeline/document_source_lookup.h" -#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" -#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_source_out.h" @@ -2061,7 +2061,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the change lookup is at the end. - ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); + ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPostImage*>(stages.back().get())); auto matchPredicate = BSON("extra" << "predicate"); @@ -2070,7 +2070,8 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { pipeline->optimizePipeline(); // Make sure the $match stage has swapped before the change look up. - ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(pipeline->getSources().back().get())); + ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPostImage*>( + pipeline->getSources().back().get())); } TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage) { @@ -2087,10 +2088,11 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the change lookup is at the end. - ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); + ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPostImage*>(stages.back().get())); stages.push_back(DocumentSourceMatch::create( - BSON(DocumentSourceLookupChangePostImage::kFullDocumentFieldName << BSONNULL), expCtx)); + BSON(DocumentSourceChangeStreamLookupPostImage::kFullDocumentFieldName << BSONNULL), + expCtx)); auto pipeline = Pipeline::create(stages, expCtx); pipeline->optimizePipeline(); @@ -2112,7 +2114,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the pre-image lookup is at the end. - ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get())); + ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPreImage*>(stages.back().get())); auto matchPredicate = BSON("extra" << "predicate"); @@ -2121,7 +2123,8 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen pipeline->optimizePipeline(); // Make sure the $match stage has swapped before the change look up. - ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(pipeline->getSources().back().get())); + ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPreImage*>( + pipeline->getSources().back().get())); } TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPreImage) { @@ -2138,10 +2141,11 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the pre-image lookup is at the end. - ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get())); + ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPreImage*>(stages.back().get())); stages.push_back(DocumentSourceMatch::create( - BSON(DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName << BSONNULL), + BSON(DocumentSourceChangeStreamLookupPreImage::kFullDocumentBeforeChangeFieldName + << BSONNULL), expCtx)); auto pipeline = Pipeline::create(stages, expCtx); pipeline->optimizePipeline(); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 7d405768fc2..8f495bcb756 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -37,6 +37,7 @@ #include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" @@ -47,7 +48,6 @@ #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" -#include "mongo/db/pipeline/document_source_update_on_add_shard.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/semantic_analysis.h" #include "mongo/db/vector_clock.h" |