diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2021-10-11 14:23:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-15 22:08:36 +0000 |
commit | 37104ac0e21fdffd29de16be74f4a35d4e1c865c (patch) | |
tree | 8fab578349c6581c4ad3ff6e6d8f1c82235dc96c /src/mongo/db/pipeline | |
parent | dad32a29132b9427a9d742f4f4d2ecf3bc3d830f (diff) | |
download | mongo-37104ac0e21fdffd29de16be74f4a35d4e1c865c.tar.gz |
SERVER-55659 Remove feature flag for Allow $changeStream to participate
in optimization.
Diffstat (limited to 'src/mongo/db/pipeline')
28 files changed, 147 insertions, 597 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index ad8d1b310dc..7bf050b06ab 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -382,7 +382,6 @@ env.Library( 'document_source_change_stream_check_invalidate.cpp', 'document_source_change_stream_check_resumability.cpp', 'document_source_change_stream_check_topology_change.cpp', - 'document_source_change_stream_close_cursor.cpp', 'document_source_change_stream_ensure_resume_token_present.cpp', 'document_source_change_stream_handle_topology_change.cpp', 'document_source_change_stream_add_pre_image.cpp', diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 75cd1ec29d8..861f86f0992 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -36,7 +36,6 @@ #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" -#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" #include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" @@ -49,6 +48,12 @@ namespace change_stream_legacy { std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) { + // The only case where we expect to build a legacy pipeline is if we are a shard which has + // received a $changeStream request from an older mongoS. + tassert(5565900, + "Unexpected {needsMerge:false} request for a legacy change stream pipeline", + expCtx->needsMerge); + std::list<boost::intrusive_ptr<DocumentSource>> stages; const auto userRequestedResumePoint = @@ -65,6 +70,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec)); stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx)); stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, spec)); + tassert(5467606, "'DocumentSourceChangeStreamTransform' stage should populate " "'initialPostBatchResumeToken' field", @@ -74,50 +80,8 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( // whether the event that matches the resume token should be followed by an "invalidate" event. stages.push_back(DocumentSourceChangeStreamCheckInvalidate::create(expCtx, spec)); - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); - - // If the user-requested resume point is a high water mark, or if we are running on the shards - // in a cluster, we must include a DSCSCheckResumability stage. - if (expCtx->needsMerge || - (userRequestedResumePoint && ResumeToken::isHighWaterMarkToken(resumeToken))) { - stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec)); - } - - // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as - // the split point for the pipline. All stages before this stages will run on shards and all - // stages after and inclusive of this stage will run on the MongoS. - if (expCtx->inMongos) { - stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx)); - } - - // If the resume token is from an event, we must include a DSCSEnsureResumeTokenPresent stage. - // In a cluster, this will be on mongoS and should not be generated on the shards. - if (!expCtx->needsMerge && !ResumeToken::isHighWaterMarkToken(resumeToken)) { - stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec)); - } - - if (!expCtx->needsMerge) { - // There should only be one close cursor stage. If we're on the shards and producing input - // to be merged, do not add a close cursor stage, since the mongos will already have one. - stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx)); - - // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here - // so that any $match stages which follow the $changeStream pipeline prefix may be able to - // skip ahead of the DSCSAddPreImage stage. This allows a whole-db or whole-cluster stream - // to run on an instance where only some collections have pre-images enabled, so long as the - // user filters for only those namespaces. - // TODO SERVER-36941: figure out how to get this to work in a sharded cluster. - if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) { - invariant(!expCtx->inMongos); - stages.push_back(DocumentSourceChangeStreamAddPreImage::create(expCtx, spec)); - } - - // There should be only one post-image lookup stage. If we're on the shards and producing - // input to be merged, the lookup is done on the mongos. - if (spec.getFullDocument() != FullDocumentModeEnum::kDefault) { - stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx, spec)); - } - } + // We must always check that the shard is capable of resuming from the specified point. + stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec)); return stages; } @@ -157,71 +121,6 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo return Document{opLogEntry.getObject().getOwned()}; } -} // namespace change_stream_legacy - -Value DocumentSourceChangeStreamOplogMatch::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - return explain ? Value(Document{{"$_internalOplogMatch"_sd, Document{}}}) : Value(); -} - -Value DocumentSourceChangeStreamTransform::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - tassert( - 5467607, - str::stream() << "At least one of 'resumeAfter', 'startAfter' or 'startAtOperationTime' " - "fields should be present to serialize " - << DocumentSourceChangeStreamTransform::kStageName << " stage", - _changeStreamSpec.getResumeAfter() || _changeStreamSpec.getStartAtOperationTime() || - _changeStreamSpec.getStartAfter()); - - return Value(Document{{DocumentSourceChangeStream::kStageName, _changeStreamSpec.toBSON()}}); -} - -Value DocumentSourceChangeStreamCheckInvalidate::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - // We only serialize this stage in the context of explain. - return explain ? Value(DOC(kStageName << Document())) : Value(); -} - -Value DocumentSourceChangeStreamCheckResumability::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - // We only serialize this stage in the context of explain. - return explain ? Value(DOC(getSourceName() - << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument()))) - : Value(); -} - -Value DocumentSourceChangeStreamUnwindTransaction::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - if (explain) { - return Value( - Document{{kStageName, - Value(Document{ - {"nsRegex", - DocumentSourceChangeStream::getNsRegexForChangeStream(pExpCtx->ns)}})}}); - } - - return Value(); -} - -Value DocumentSourceChangeStreamAddPreImage::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - return (explain ? Value{Document{{kStageName, Document()}}} : Value()); -} - -Value DocumentSourceChangeStreamAddPostImage::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - return (explain ? Value{Document{{kStageName, Document()}}} : Value()); -} - -Value DocumentSourceChangeStreamCheckTopologyChange::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - return (explain ? Value{Document{{kStageName, Document()}}} : Value()); -} - -Value DocumentSourceChangeStreamHandleTopologyChange::serializeLegacy( - boost::optional<ExplainOptions::Verbosity> explain) const { - return (explain ? Value(Document{{kStageName, Value()}}) : Value()); -} +} // namespace change_stream_legacy } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 2e6fd767e62..11b551ac755 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -43,7 +43,6 @@ #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" -#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" #include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" @@ -233,8 +232,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // If we see this stage on a shard, it means that the raw $changeStream stage was dispatched to // us from an old mongoS. Build a legacy shard pipeline. - if (expCtx->needsMerge || - !feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + if (expCtx->needsMerge) { return change_stream_legacy::buildPipeline(expCtx, spec); } return _buildPipeline(expCtx, spec); diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index b58c31108d6..4cc02667ab2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -258,24 +258,4 @@ public: } }; -/** - * Class interface to keep track of the change streams internal stage serialization formats across - * versions or features. - * - * TODO SERVER-55659: remove this serializer class and make each stage serialize only the "latest" - * format. - */ -class ChangeStreamStageSerializationInterface { -public: - Value serializeToValue(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const { - return feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() - ? serializeLatest(explain) - : serializeLegacy(explain); - } - -protected: - virtual Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const = 0; - virtual Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const = 0; -}; - } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp index fa0cf402095..051922556a5 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp @@ -43,11 +43,10 @@ constexpr StringData DocumentSourceChangeStreamAddPostImage::kStageName; constexpr StringData DocumentSourceChangeStreamAddPostImage::kFullDocumentFieldName; namespace { -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamAddPostImage, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamAddPostImage::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPostImage, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamAddPostImage::createFromBson, + true); Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType expectedType) { @@ -217,7 +216,7 @@ boost::optional<Document> DocumentSourceChangeStreamAddPostImage::lookupLatestPo pExpCtx, nss, *resumeTokenData.uuid, documentKey, std::move(readConcern)); } -Value DocumentSourceChangeStreamAddPostImage::serializeLatest( +Value DocumentSourceChangeStreamAddPostImage::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { return explain ? Value(Document{ diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h index 2732639eb9d..d92994064e2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h @@ -38,9 +38,7 @@ namespace mongo { * Part of the change stream API machinery used to look up the post-image of a document. Uses the * "documentKey" field of the input to look up the new version of the document. */ -class DocumentSourceChangeStreamAddPostImage final - : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamAddPostImage final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamAddPostImage"_sd; static constexpr StringData kFullDocumentFieldName = @@ -77,15 +75,9 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const final { invariant(pipeState != Pipeline::SplitState::kSplitForShards); - // TODO SERVER-55659: remove the feature flag. - HostTypeRequirement hostTypeRequirement = - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() - ? HostTypeRequirement::kAnyShard - : HostTypeRequirement::kLocalOnly; - StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kNone, - hostTypeRequirement, + HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, @@ -122,9 +114,7 @@ public: return DepsTracker::State::SEE_NEXT; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; const char* getSourceName() const final { return kStageName.rawData(); @@ -158,9 +148,6 @@ private: */ NamespaceString assertValidNamespace(const Document& inputDoc) const; - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; - // Determines whether post-images are strictly required or may be included only when available, // and whether to return a point-in-time post-image or the most current majority-committed // version of the updated document. diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp index 329c497a5dd..25ca17bbbfa 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp @@ -106,8 +106,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForE Document{{"stage"_sd, DocumentSourceChangeStreamAddPostImage::kStageName}, {"fullDocument"_sd, "updateLookup"_sd}}}}); - ASSERT_VALUE_EQ(stage->serializeToValue({ExplainOptions::Verbosity::kQueryPlanner}), - expectedOutput); + ASSERT_VALUE_EQ(stage->serialize({ExplainOptions::Verbosity::kQueryPlanner}), expectedOutput); } TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForDispatch) { @@ -117,7 +116,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForD const auto expectedOutput = Value(Document{{DocumentSourceChangeStreamAddPostImage::kStageName, Document{{"fullDocument"_sd, "updateLookup"_sd}}}}); - ASSERT_VALUE_EQ(stage->serializeToValue(), expectedOutput); + ASSERT_VALUE_EQ(stage->serialize(), expectedOutput); } TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp index 40ac5ef1dab..dad3a18b306 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp @@ -43,11 +43,10 @@ namespace mongo { namespace { -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamAddPreImage, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamAddPreImage::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPreImage, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamAddPreImage::createFromBson, + true); } // namespace constexpr StringData DocumentSourceChangeStreamAddPreImage::kStageName; @@ -165,7 +164,7 @@ boost::optional<Document> DocumentSourceChangeStreamAddPreImage::lookupPreImage( return lookedUpDoc->getField(ChangeStreamPreImage::kPreImageFieldName).getDocument().getOwned(); } -Value DocumentSourceChangeStreamAddPreImage::serializeLatest( +Value DocumentSourceChangeStreamAddPreImage::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { return explain ? Value(Document{ diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h index 7c0153d3519..8e9c764fe19 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h @@ -41,8 +41,7 @@ namespace mongo { * its "fullDocumentBeforeChange" field shall be the optime of the noop oplog entry containing the * pre-image. This stage replaces that field with the actual pre-image document. */ -class DocumentSourceChangeStreamAddPreImage final : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamAddPreImage final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamAddPreImage"_sd; static constexpr StringData kFullDocumentBeforeChangeFieldName = @@ -109,9 +108,7 @@ public: return DepsTracker::State::SEE_NEXT; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; const char* getSourceName() const final { return kStageName.rawData(); @@ -123,9 +120,6 @@ private: */ GetNextResult doGetNext() final; - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; - // Determines whether pre-images are strictly required or may be included only when available. FullDocumentBeforeChangeModeEnum _fullDocumentBeforeChangeMode = FullDocumentBeforeChangeModeEnum::kOff; diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp index e386cc73bb2..84b1f023d1e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp @@ -41,11 +41,10 @@ namespace mongo { using DSCS = DocumentSourceChangeStream; -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamCheckInvalidate, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamCheckInvalidate::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckInvalidate, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamCheckInvalidate::createFromBson, + true); namespace { @@ -103,8 +102,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNe return res; } - if (_queuedException && - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + if (_queuedException) { uasserted(static_cast<ChangeStreamInvalidationInfo>(*_queuedException), "Change stream invalidated"); } @@ -179,7 +177,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNe return nextInput; } -Value DocumentSourceChangeStreamCheckInvalidate::serializeLatest( +Value DocumentSourceChangeStreamCheckInvalidate::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value(Document{{DocumentSourceChangeStream::kStageName, diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h index 30dc1b6eeda..166604c1d10 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h @@ -39,9 +39,7 @@ namespace mongo { * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop * for a single-collection change stream). It is not intended to be created by the user. */ -class DocumentSourceChangeStreamCheckInvalidate final - : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamCheckInvalidate final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamCheckInvalidate"_sd; @@ -66,9 +64,7 @@ public: return boost::none; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -91,9 +87,6 @@ private: GetNextResult doGetNext() final; - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; - boost::optional<ResumeTokenData> _startAfterInvalidate; boost::optional<Document> _queuedInvalidate; boost::optional<ChangeStreamInvalidationInfo> _queuedException; diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp index 4c5ef221616..b68edc81899 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp @@ -38,11 +38,10 @@ using boost::intrusive_ptr; namespace mongo { namespace { -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamCheckResumability, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamCheckResumability::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckResumability, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamCheckResumability::createFromBson, + true); } // namespace @@ -246,7 +245,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckResumability::doGet MONGO_UNREACHABLE; } -Value DocumentSourceChangeStreamCheckResumability::serializeLatest( +Value DocumentSourceChangeStreamCheckResumability::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { return explain ? Value(DOC(DocumentSourceChangeStream::kStageName @@ -258,4 +257,5 @@ Value DocumentSourceChangeStreamCheckResumability::serializeLatest( DocumentSourceChangeStreamCheckResumabilitySpec(ResumeToken(_tokenFromClient)) .toBSON()}}); } + } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h index 5b2caea6b26..a290d59cacd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h @@ -59,8 +59,7 @@ namespace mongo { * - Otherwise we cannot resume, as we do not know if there were any events between the resume token * and the first matching document in the oplog. */ -class DocumentSourceChangeStreamCheckResumability : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamCheckResumability : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamCheckResumability"_sd; @@ -90,9 +89,7 @@ public: return boost::none; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const override { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const override; static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -117,9 +114,5 @@ protected: ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc; const ResumeTokenData _tokenFromClient; - -private: - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp index d3d62ac864a..26cdcb77181 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp @@ -35,11 +35,10 @@ namespace mongo { -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamCheckTopologyChange, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamCheckTopologyChange::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckTopologyChange, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamCheckTopologyChange::createFromBson, + true); StageConstraints DocumentSourceChangeStreamCheckTopologyChange::constraints( Pipeline::SplitState pipeState) const { @@ -87,7 +86,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckTopologyChange::doG return nextInput; } -Value DocumentSourceChangeStreamCheckTopologyChange::serializeLatest( +Value DocumentSourceChangeStreamCheckTopologyChange::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value(DOC(DocumentSourceChangeStream::kStageName diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h index 1808cbe7995..7629e5d8063 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h @@ -45,9 +45,7 @@ namespace mongo { * that previously may not have held any data for the collection being watched, and they contain the * information necessary for the mongoS to include the new shard in the merged change stream. */ -class DocumentSourceChangeStreamCheckTopologyChange final - : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamCheckTopologyChange final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamCheckTopologyChange"_sd; @@ -69,9 +67,7 @@ public: return boost::none; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; private: DocumentSourceChangeStreamCheckTopologyChange( @@ -79,10 +75,6 @@ private: : DocumentSource(kStageName, expCtx) {} GetNextResult doGetNext() final; - - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp deleted file mode 100644 index a7db8e647a0..00000000000 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" - -namespace mongo { - -namespace { - -// Returns true if the given 'operationType' should invalidate the change stream based on the -// namespace in 'pExpCtx'. -bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - StringData operationType) { - if (pExpCtx->isSingleNamespaceAggregation()) { - return operationType == DocumentSourceChangeStream::kDropCollectionOpType || - operationType == DocumentSourceChangeStream::kRenameCollectionOpType || - operationType == DocumentSourceChangeStream::kDropDatabaseOpType; - } else if (!pExpCtx->isClusterAggregation()) { - return operationType == DocumentSourceChangeStream::kDropDatabaseOpType; - } else { - return false; - } -}; - -} // namespace - -DocumentSource::GetNextResult DocumentSourceChangeStreamCloseCursor::doGetNext() { - // Close cursor if we have returned an invalidate entry. - if (_shouldCloseCursor) { - uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated"); - } - - auto nextInput = pSource->getNext(); - if (!nextInput.isAdvanced()) - return nextInput; - - auto doc = nextInput.getDocument(); - const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField; - DocumentSourceChangeStream::checkValueType( - doc[kOperationTypeField], kOperationTypeField, BSONType::String); - auto operationType = doc[kOperationTypeField].getString(); - if (operationType == DocumentSourceChangeStream::kInvalidateOpType) { - // Pass the invalidation forward, so that it can be included in the results, or - // filtered/transformed by further stages in the pipeline, then throw an exception - // to close the cursor on the next call to getNext(). - _shouldCloseCursor = true; - } - - return nextInput; -} - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h deleted file mode 100644 index 16a1e13f2d4..00000000000 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/pipeline/change_stream_constants.h" -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/document_source_sort.h" - -namespace mongo { - -/** - * This stage is used internally for change notifications to close cursor after returning - * "invalidate" entries. - * It is not intended to be created by the user. - */ -class DocumentSourceChangeStreamCloseCursor final : public DocumentSource { -public: - static constexpr StringData kStageName = "$changeStream"_sd; - - const char* getSourceName() const final { - // This is used in error reporting. - return DocumentSourceChangeStreamCloseCursor::kStageName.rawData(); - } - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - // This stage should never be in the shards part of a split pipeline. - invariant(pipeState != Pipeline::SplitState::kSplitForShards); - return {StreamType::kStreaming, - PositionRequirement::kNone, - // If this is parsed on mongos it should stay on mongos. If we're not in a sharded - // cluster then it's okay to run on mongod. - HostTypeRequirement::kLocalOnly, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed, - LookupRequirement::kNotAllowed, - UnionRequirement::kNotAllowed, - ChangeStreamRequirement::kChangeStreamStage}; - } - - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - // We only serialize this stage in the context of explain. - return explain ? Value(DOC(kStageName << Document())) : Value(); - } - - static boost::intrusive_ptr<DocumentSourceChangeStreamCloseCursor> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceChangeStreamCloseCursor(expCtx); - } - - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { - return boost::none; - } - -private: - /** - * Use the create static method to create a DocumentSourceChangeStreamCloseCursor. - */ - DocumentSourceChangeStreamCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(kStageName, expCtx) {} - - GetNextResult doGetNext() final; - - bool _shouldCloseCursor = false; -}; - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp index 4850f0e7290..d100b6ace48 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp @@ -76,10 +76,8 @@ StageConstraints DocumentSourceChangeStreamEnsureResumeTokenPresent::constraints // pipelines, swaps can allow $match and 'DocumentSourceSingleDocumentTransformation' stages to // execute on the shards, providing inter-node parallelism and potentially reducing the amount // of data sent form each shard to the mongoS. - if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { - constraints.canSwapWithMatch = true; - constraints.canSwapWithSingleDocTransform = true; - } + constraints.canSwapWithMatch = true; + constraints.canSwapWithSingleDocTransform = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp index 394d4bd0d13..8b3463f015b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp @@ -105,10 +105,8 @@ StageConstraints DocumentSourceChangeStreamHandleTopologyChange::constraints( // Can be swapped with the '$match' and 'DocumentSourceSingleDocumentTransformation' stages and // ensures that they get pushed down to the shards, as this stage bisects the change streams // pipeline. - if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { - constraints.canSwapWithMatch = true; - constraints.canSwapWithSingleDocTransform = true; - } + constraints.canSwapWithMatch = true; + constraints.canSwapWithSingleDocTransform = true; return constraints; } @@ -229,7 +227,7 @@ BSONObj DocumentSourceChangeStreamHandleTopologyChange::replaceResumeTokenInComm return newCmd.freeze().toBson(); } -Value DocumentSourceChangeStreamHandleTopologyChange::serializeLatest( +Value DocumentSourceChangeStreamHandleTopologyChange::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value(DOC(DocumentSourceChangeStream::kStageName diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h index b50ae1000b6..c5d5a16fd93 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h @@ -46,9 +46,7 @@ namespace mongo { * the first time. When this event is detected, this stage will establish a new cursor on that * shard and add it to the cursors being merged. */ -class DocumentSourceChangeStreamHandleTopologyChange final - : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamHandleTopologyChange final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamHandleTopologyChange"_sd; @@ -63,9 +61,7 @@ public: return kStageName.rawData(); } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; StageConstraints constraints(Pipeline::SplitState) const final; @@ -83,10 +79,6 @@ private: GetNextResult doGetNext() final; - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; - /** * Establish the new cursors and tell the RouterStageMerge about them. */ diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp index 8ac7ea35a09..6a634cc38a2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp @@ -36,11 +36,10 @@ namespace mongo { -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamOplogMatch, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamOplogMatch::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamOplogMatch, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamOplogMatch::createFromBson, + true); namespace change_stream_filter { /** @@ -191,7 +190,7 @@ Pipeline::SourceContainer::iterator DocumentSourceChangeStreamOplogMatch::doOpti return nextChangeStreamStageItr; } -Value DocumentSourceChangeStreamOplogMatch::serializeLatest( +Value DocumentSourceChangeStreamOplogMatch::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value( diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h index 6207d46e376..5622e483569 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h @@ -36,8 +36,7 @@ namespace mongo { * A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied * on the oplog. The stage requires itself to be the first stage in the pipeline. */ -class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceMatch, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceMatch { public: static constexpr StringData kStageName = "$_internalChangeStreamOplogMatch"_sd; @@ -81,9 +80,7 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; protected: Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, @@ -102,9 +99,6 @@ private: expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; } - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; - // Needed for re-creating the filter during optimization. Note that we do not serialize these // fields. The filter in a serialized DocumentSourceOplogMatch is considered final, so there is // no need to re-create it. diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index d1e76617542..f18b2622a5f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -236,12 +236,7 @@ public: ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedInvalidate); // Then throw an exception on the next call of getNext(). - if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { - ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); - } else { - ASSERT_THROWS(lastStage->getNext(), - ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); - } + ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); } } @@ -440,58 +435,10 @@ public: }; checkTransformation(deltaOplog, expectedUpdateField); } -}; - -bool getCSOptimizationFeatureFlagValue() { - return feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV(); -} - -bool getCSRewriteFeatureFlagValue() { - return feature_flags::gFeatureFlagChangeStreamsRewrite.isEnabledAndIgnoreFCV(); -} - -bool isChangeStreamPreAndPostImagesEnabled() { - return feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabledAndIgnoreFCV(); -} - -/** - * Runs the tests with feature flag 'featureFlagChangeStreamsOptimization' true and false. - */ -class ChangeStreamStageWithDualFeatureFlagValueTest : public ChangeStreamStageTest { -public: - ChangeStreamStageWithDualFeatureFlagValueTest() : ChangeStreamStageTest() {} - - - void run() { - { - RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsOptimization", - true); - ASSERT(getCSOptimizationFeatureFlagValue()); - ChangeStreamStageTest::run(); - } - { - RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsOptimization", - false); - ASSERT_FALSE(getCSOptimizationFeatureFlagValue()); - ChangeStreamStageTest::run(); - } - } -}; - -class ChangeStreamPipelineOptimizationTest : public ChangeStreamStageTest { -public: - explicit ChangeStreamPipelineOptimizationTest() : ChangeStreamStageTest() {} - - void run() { - RAIIServerParameterControllerForTest controllerOptimization( - "featureFlagChangeStreamsOptimization", true); - ASSERT(getCSOptimizationFeatureFlagValue()); - RAIIServerParameterControllerForTest controllerRewrite("featureFlagChangeStreamsRewrite", - true); - ASSERT(getCSOptimizationFeatureFlagValue()); - ChangeStreamStageTest::run(); - } + /** + * Helper to create change stream pipeline for testing. + */ std::unique_ptr<Pipeline, PipelineDeleter> buildTestPipeline( const std::vector<BSONObj>& rawPipeline) { auto expCtx = getExpCtx(); @@ -504,6 +451,9 @@ public: return pipeline; } + /** + * Helper to verify if the change stream pipeline contains expected stages. + */ void assertStagesNameOrder(std::unique_ptr<Pipeline, PipelineDeleter> pipeline, const std::vector<std::string> expectedStages) { ASSERT_EQ(pipeline->getSources().size(), expectedStages.size()); @@ -519,6 +469,14 @@ public: } }; +bool getCSRewriteFeatureFlagValue() { + return feature_flags::gFeatureFlagChangeStreamsRewrite.isEnabledAndIgnoreFCV(); +} + +bool isChangeStreamPreAndPostImagesEnabled() { + return feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabledAndIgnoreFCV(); +} + TEST_F(ChangeStreamStageTest, ShouldRejectNonObjectArg) { auto expCtx = getExpCtx(); @@ -2186,12 +2144,8 @@ TEST_F(ChangeStreamStageTest, MatchFiltersNoOp) { checkTransformation(noOp, boost::none); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, - TransformationShouldBeAbleToReParseSerializedStage) { +TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage) { auto expCtx = getExpCtx(); - const auto featureFlag = getCSOptimizationFeatureFlagValue(); - const auto serializedStageName = - featureFlag ? DocumentSourceChangeStreamTransform::kStageName : DSChangeStream::kStageName; DocumentSourceChangeStreamSpec spec; spec.setStartAtOperationTime(kDefaultTs); @@ -2201,8 +2155,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); - const size_t changeStreamStageSize = featureFlag ? 5 : 6; - ASSERT_EQ(allStages.size(), changeStreamStageSize); + ASSERT_EQ(allStages.size(), 5); auto stage = allStages[2]; ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get())); @@ -2215,8 +2168,9 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, ASSERT_EQ(serialization.size(), 1UL); ASSERT_EQ(serialization[0].getType(), BSONType::Object); auto serializedDoc = serialization[0].getDocument(); - ASSERT_BSONOBJ_EQ(serializedDoc[serializedStageName].getDocument().toBson(), - originalSpec[""].Obj()); + ASSERT_BSONOBJ_EQ( + serializedDoc[DocumentSourceChangeStreamTransform::kStageName].getDocument().toBson(), + originalSpec[""].Obj()); // // Create a new stage from the serialization. Serialize the new stage and confirm that it is @@ -2227,26 +2181,15 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx); auto newSerialization = roundTripped->serialize(); - // When optimiziation is enabled, we should serialize all the internal stages. - if (featureFlag) { - ASSERT_EQ(newSerialization.size(), 5UL); + ASSERT_EQ(newSerialization.size(), 5UL); - // DSCSTransform stage should be the third stage after DSCSOplogMatch and - // DSCSUnwindTransactions stages. - ASSERT_VALUE_EQ(newSerialization[2], serialization[0]); - } else { - ASSERT_EQ(newSerialization.size(), 1UL); - ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); - } + // DSCSTransform stage should be the third stage after DSCSOplogMatch and + // DSCSUnwindTransactions stages. + ASSERT_VALUE_EQ(newSerialization[2], serialization[0]); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, - DSCSTransformStageEmptySpecSerializeResumeAfter) { +TEST_F(ChangeStreamStageTest, DSCSTransformStageEmptySpecSerializeResumeAfter) { auto expCtx = getExpCtx(); - auto featureFlag = getCSOptimizationFeatureFlagValue(); - const auto serializedStageName = - featureFlag ? DocumentSourceChangeStreamTransform::kStageName : DSChangeStream::kStageName; - auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj()); // Verify that the 'initialPostBatchResumeToken' is populated while parsing. @@ -2271,18 +2214,13 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, ASSERT_EQ(serialization.size(), 1UL); ASSERT_EQ(serialization[0].getType(), BSONType::Object); ASSERT(!serialization[0] - .getDocument()[serializedStageName] - .getDocument()[featureFlag - ? DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName - : DocumentSourceChangeStreamSpec::kResumeAfterFieldName] + .getDocument()[DocumentSourceChangeStreamTransform::kStageName] + .getDocument()[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] .missing()); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSTransformStageWithResumeTokenSerialize) { +TEST_F(ChangeStreamStageTest, DSCSTransformStageWithResumeTokenSerialize) { auto expCtx = getExpCtx(); - const auto serializedStageName = getCSOptimizationFeatureFlagValue() - ? DocumentSourceChangeStreamTransform::kStageName - : DSChangeStream::kStageName; DocumentSourceChangeStreamSpec spec; spec.setResumeAfter(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid()))); @@ -2303,7 +2241,10 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSTransformStageWithResu stage->serializeToArray(serialization); ASSERT_EQ(serialization.size(), 1UL); ASSERT_EQ(serialization[0].getType(), BSONType::Object); - ASSERT_BSONOBJ_EQ(serialization[0].getDocument()[serializedStageName].getDocument().toBson(), + ASSERT_BSONOBJ_EQ(serialization[0] + .getDocument()[DocumentSourceChangeStreamTransform::kStageName] + .getDocument() + .toBson(), originalSpec[""].Obj()); } @@ -2313,17 +2254,14 @@ void validateDocumentSourceStageSerialization( auto stage = Stage::createFromBson(specAsBSON.firstElement(), expCtx); vector<Value> serialization; stage->serializeToArray(serialization); - if (getCSOptimizationFeatureFlagValue()) { - ASSERT_EQ(serialization.size(), 1UL); - ASSERT_EQ(serialization[0].getType(), BSONType::Object); - ASSERT_BSONOBJ_EQ(serialization[0].getDocument().toBson(), - BSON(Stage::kStageName << spec.toBSON())); - } else { - ASSERT(serialization.empty()); - } + + ASSERT_EQ(serialization.size(), 1UL); + ASSERT_EQ(serialization[0].getType(), BSONType::Object); + ASSERT_BSONOBJ_EQ(serialization[0].getDocument().toBson(), + BSON(Stage::kStageName << spec.toBSON())); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSOplogMatchStageSerialization) { +TEST_F(ChangeStreamStageTest, DSCSOplogMatchStageSerialization) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamOplogMatchSpec spec; @@ -2335,7 +2273,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSOplogMatchStageSeriali std::move(spec), stageSpecAsBSON, expCtx); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSUnwindTransactionStageSerialization) { +TEST_F(ChangeStreamStageTest, DSCSUnwindTransactionStageSerialization) { auto expCtx = getExpCtx(); auto filter = BSON("ns" << BSON("$regex" @@ -2347,7 +2285,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSUnwindTransactionStage std::move(spec), stageSpecAsBSON, expCtx); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSCheckInvalidateStageSerialization) { +TEST_F(ChangeStreamStageTest, DSCSCheckInvalidateStageSerialization) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamCheckInvalidateSpec spec; @@ -2359,7 +2297,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSCheckInvalidateStageSe std::move(spec), stageSpecAsBSON, expCtx); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSResumabilityStageSerialization) { +TEST_F(ChangeStreamStageTest, DSCSResumabilityStageSerialization) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamCheckResumabilitySpec spec; @@ -2370,7 +2308,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSResumabilityStageSeria std::move(spec), stageSpecAsBSON, expCtx); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePreImageStageSerialization) { +TEST_F(ChangeStreamStageTest, DSCSLookupChangePreImageStageSerialization) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamAddPreImageSpec spec(FullDocumentBeforeChangeModeEnum::kRequired); @@ -2380,7 +2318,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePreImageSt std::move(spec), stageSpecAsBSON, expCtx); } -TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePostImageStageSerialization) { +TEST_F(ChangeStreamStageTest, DSCSLookupChangePostImageStageSerialization) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamAddPostImageSpec spec(FullDocumentModeEnum::kUpdateLookup); @@ -2417,11 +2355,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate); // Then throw an exception on the next call of getNext(). - if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { - ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); - } else { - ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); - } + ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); } TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) { @@ -2433,11 +2367,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) match->setSource(lastStage.get()); // Throw an exception on the call of getNext(). - if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { - ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); - } else { - ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); - } + ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); } TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { @@ -3458,7 +3388,7 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai BSON("$changeStream" << BSON("startAfter" << resumeToken))); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleMatch) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) { // // Tests that the single '$match' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3480,7 +3410,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleMatch) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatch) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatch) { // // Tests that multiple '$match' gets merged and promoted before the // '$_internalUpdateOnAddShard'. @@ -3502,7 +3432,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatch) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatchAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) { // // Tests that multiple '$match' gets merged and promoted before the // '$_internalUpdateOnAddShard' if resume token if present. @@ -3530,7 +3460,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatchAndRes "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleProject) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleProject) { // // Tests that the single'$project' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3550,7 +3480,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleProject) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProject) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProject) { // // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3572,7 +3502,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProject) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProjectAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) { // // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard' if // resume token is present. @@ -3599,7 +3529,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProjectAndR "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithProjectMatchAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) { // // Tests that a '$project' followed by a '$match' gets optimized and they get promoted before // the '$_internalUpdateOnAddShard'. @@ -3627,7 +3557,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithProjectMatchAndResu "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleUnset) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleUnset) { // // Tests that the single'$unset' gets promoted before the '$_internalUpdateOnAddShard' as // '$project'. @@ -3648,7 +3578,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleUnset) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleUnset) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleUnset) { // // Tests that multiple '$unset' gets promoted before the '$_internalUpdateOnAddShard' as // '$project'. @@ -3671,7 +3601,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleUnset) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithUnsetAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) { // // Tests that the '$unset' gets promoted before the '$_internalUpdateOnAddShard' as '$project' // even if resume token is present. @@ -3697,7 +3627,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithUnsetAndResumeToken "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleAddFields) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleAddFields) { // // Tests that the single'$addFields' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3717,7 +3647,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleAddFields) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleAddFields) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleAddFields) { // // Tests that multiple '$addFields' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3739,7 +3669,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleAddFields) "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAddFieldsAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) { // // Tests that the '$addFields' gets promoted before the '$_internalUpdateOnAddShard' if // resume token is present. @@ -3764,7 +3694,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAddFieldsAndResumeT "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleSet) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleSet) { // // Tests that the single'$set' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3784,7 +3714,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleSet) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleSet) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleSet) { // // Tests that multiple '$set' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3806,7 +3736,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleSet) { "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSetAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) { // // Tests that the '$set' gets promoted before the '$_internalUpdateOnAddShard' if // resume token is present. @@ -3831,7 +3761,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSetAndResumeToken) "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceRoot) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceRoot) { // // Tests that the single'$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard'. // @@ -3851,7 +3781,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceRoot) "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceRootAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) { // // Tests that the '$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard' if // resume token is present. @@ -3877,7 +3807,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceRootAndResum "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceWith) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceWith) { // // Tests that the single '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' as // '$replaceRoot'. @@ -3898,7 +3828,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceWith) "$_internalChangeStreamHandleTopologyChange"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceWithAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) { // // Tests that the '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' if // resume token is present as '$replaceRoot'. @@ -3924,7 +3854,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceWithAndResum "$_internalChangeStreamEnsureResumeTokenPresent"}); } -TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAllStagesAndResumeToken) { +TEST_F(ChangeStreamStageTest, ChangeStreamWithAllStagesAndResumeToken) { // // Tests that when all allowed stages are included along with the resume token, the final // pipeline gets optimized. diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index e03c237c56e..0e43a47ef0d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -65,11 +65,10 @@ namespace { constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; } // namespace -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamTransform, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamTransform::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamTransform::createFromBson, + true); intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -444,7 +443,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document return doc.freeze(); } -Value DocumentSourceChangeStreamTransform::serializeLatest( +Value DocumentSourceChangeStreamTransform::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value(Document{{DocumentSourceChangeStream::kStageName, diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 4ee2de16d1c..51d3fdc793c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -33,8 +33,7 @@ namespace mongo { -class DocumentSourceChangeStreamTransform : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamTransform : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd; @@ -54,9 +53,7 @@ public: DocumentSource::GetModPathsReturn getModifiedPaths() const final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final; @@ -97,9 +94,6 @@ private: */ ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey, Value txnOpIndex); - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; - DocumentSourceChangeStreamSpec _changeStreamSpec; // Map of collection UUID to document key fields. diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp index 4ff928d5970..d7f646c8d10 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp @@ -39,11 +39,10 @@ namespace mongo { -REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamUnwindTransaction, - LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamUnwindTransaction::createFromBson, - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamUnwindTransaction, + LiteParsedDocumentSourceChangeStreamInternal::parse, + DocumentSourceChangeStreamUnwindTransaction::createFromBson, + true); boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction> DocumentSourceChangeStreamUnwindTransaction::create( @@ -88,7 +87,7 @@ StageConstraints DocumentSourceChangeStreamUnwindTransaction::constraints( ChangeStreamRequirement::kChangeStreamStage); } -Value DocumentSourceChangeStreamUnwindTransaction::serializeLatest( +Value DocumentSourceChangeStreamUnwindTransaction::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { tassert(5467604, "expression has not been initialized", _expression); diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h index ef48340d331..a2659178d6b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h @@ -41,8 +41,7 @@ namespace mongo { * output, but all other entries pass through unmodified. Note that the namespace filter applies * only to unwound transaction operations, not to any other entries. */ -class DocumentSourceChangeStreamUnwindTransaction : public DocumentSource, - public ChangeStreamStageSerializationInterface { +class DocumentSourceChangeStreamUnwindTransaction : public DocumentSource { public: static constexpr StringData kStageName = "$_internalChangeStreamUnwindTransaction"_sd; @@ -56,13 +55,7 @@ public: DocumentSource::GetModPathsReturn getModifiedPaths() const final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - return ChangeStreamStageSerializationInterface::serializeToValue(explain); - } - - Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const; - Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const; - + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 08dac82eaed..45f4386637d 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -75,8 +75,8 @@ using std::vector; const NamespaceString kTestNss = NamespaceString("a.collection"); -size_t getChangeStreamStageSize() { - return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 6 : 6); +constexpr size_t getChangeStreamStageSize() { + return 6; } void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) { |