diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2021-07-27 19:20:43 +0200 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-27 17:54:15 +0000 |
commit | 2d76701ca6266cf01bfc43e96c9a8ad912debb4f (patch) | |
tree | d28d3558b7b9dd57cc15a78cfc6c68262d4fa12b | |
parent | 0c1e060e7e87a32673e9a6b0311663aa4875544b (diff) | |
download | mongo-2d76701ca6266cf01bfc43e96c9a8ad912debb4f.tar.gz |
SERVER-58423 Parse and validate change stream open parameter "fullDocument"
12 files changed, 184 insertions, 43 deletions
diff --git a/jstests/change_streams/lookup_pit_post_image.js b/jstests/change_streams/lookup_pit_post_image.js new file mode 100644 index 00000000000..5bb2bc9ec14 --- /dev/null +++ b/jstests/change_streams/lookup_pit_post_image.js @@ -0,0 +1,26 @@ +// Tests the behaviour of $changeStream's 'fullDocument' option when retrieving point-in-time +// post-images. +(function() { +"use strict"; + +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled. + +const testDB = db.getSiblingDB(jsTestName()); +const coll = assertDropAndRecreateCollection(testDB, "test"); + +if (!isChangeStreamPreAndPostImagesEnabled(db)) { + // If feature flag is off, creating changeStream with new fullDocument arguments should throw. + assert.throwsWithCode(() => coll.watch([], {fullDocument: 'whenAvailable'}), + ErrorCodes.BadValue); + assert.throwsWithCode(() => coll.watch([], {fullDocument: 'required'}), ErrorCodes.BadValue); + + jsTestLog( + 'Skipping test because featureFlagChangeStreamsPreAndPostImages feature flag is not enabled'); + return; +} + +// Open the change streams with new fullDocument parameters. +assert.doesNotThrow(() => coll.watch([], {fullDocument: 'whenAvailable'})); +assert.doesNotThrow(() => coll.watch([], {fullDocument: 'required'})); +}()); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 8daa0e8be29..e4c220743e0 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -24,6 +24,17 @@ function isChangeStreamsOptimizationEnabled(db) { } /** + * Returns true if feature flag 'featureFlagChangeStreamsPreAndPostImages' is enabled, false + * otherwise. + */ +function isChangeStreamPreAndPostImagesEnabled(db) { + const getParam = + db.adminCommand({getParameter: 1, featureFlagChangeStreamsPreAndPostImages: 1}); + return getParam.hasOwnProperty("featureFlagChangeStreamsPreAndPostImages") && + getParam.featureFlagChangeStreamsPreAndPostImages.value; +} + +/** * Returns true if feature flag 'featureFlagChangeStreamsRewrite' is enabled, false otherwise. */ function isChangeStreamsRewriteEnabled(db) { diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 47439f941ec..ccdbf58aa3d 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -349,13 +349,13 @@ env.Library( 'change_stream_document_diff_parser.cpp', 'change_stream_helpers_legacy.cpp', 'document_source_change_stream.cpp', + 'document_source_change_stream_add_post_image.cpp', 'document_source_change_stream_check_invalidate.cpp', 'document_source_change_stream_check_resumability.cpp', 'document_source_change_stream_check_topology_change.cpp', 'document_source_change_stream_close_cursor.cpp', 'document_source_change_stream_ensure_resume_token_present.cpp', 'document_source_change_stream_handle_topology_change.cpp', - 'document_source_change_stream_lookup_post_image.cpp', 'document_source_change_stream_lookup_pre_image.cpp', 'document_source_change_stream_oplog_match.cpp', 'document_source_change_stream_transform.cpp', @@ -433,7 +433,7 @@ env.CppUnitTest( 'document_source_internal_shard_filter_test.cpp', 'document_source_internal_split_pipeline_test.cpp', 'document_source_limit_test.cpp', - 'document_source_change_stream_lookup_post_image_test.cpp', + 'document_source_change_stream_add_post_image_test.cpp', 'document_source_lookup_test.cpp', 'document_source_match_test.cpp', 'document_source_merge_cursors_test.cpp', diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 797254ad6e0..554559cfe23 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -31,13 +31,13 @@ #include "mongo/db/pipeline/change_stream_helpers_legacy.h" +#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" #include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" -#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" @@ -112,10 +112,10 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( stages.push_back(DocumentSourceChangeStreamAddPreImage::create(expCtx, spec)); } - // There should be only one post-image lookup stage. If we're on the shards and producing + // There should be only one post-image lookup stage. If we're on the shards and producing // input to be merged, the lookup is done on the mongos. - if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) { - stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx)); + if (spec.getFullDocument() != FullDocumentModeEnum::kDefault) { + stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx, spec)); } } diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 42ca7d5f39c..fc6986597b1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -38,13 +38,13 @@ #include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" #include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" -#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" @@ -261,9 +261,9 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui stages.push_back(DocumentSourceChangeStreamAddPreImage::create(expCtx, spec)); } - // If 'fullDocument' is set to "updateLookup", add the DSCSAddPostImage stage here. - if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) { - stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx)); + // If 'fullDocument' is not set to "default", add the DSCSAddPostImage stage here. + if (spec.getFullDocument() != FullDocumentModeEnum::kDefault) { + stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx, spec)); } // If the pipeline is built on MongoS, then the DSCSHandleTopologyChange stage acts as the @@ -328,6 +328,18 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( "the 'fullDocumentBeforeChange' option is not supported in a sharded cluster", !(shouldAddPreImage && (expCtx->inMongos || expCtx->needsMerge))); + // TODO SERVER-58584: remove the feature flag. + if (!feature_flags::gFeatureFlagChangeStreamsPreAndPostImages.isEnabled( + serverGlobalParams.featureCompatibility)) { + uassert(ErrorCodes::BadValue, + str::stream() << "Specified value '" + << FullDocumentMode_serializer(spec.getFullDocument()) + << "' is not a valid option for the 'fullDocument' parameter of the " + "$changeStream stage", + spec.getFullDocument() == FullDocumentModeEnum::kDefault || + spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup); + } + uassert(31123, "Change streams from mongos may not show migration events", !(expCtx->inMongos && spec.getShowMigrationEvents())); diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl index e8094f2915d..b59a555c4df 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.idl +++ b/src/mongo/db/pipeline/document_source_change_stream.idl @@ -55,7 +55,7 @@ enums: values: # Do not supply pre-images. kOff: "off" - # Supply a pre-image if available. Otherwise, omit the output field. + # Supply a pre-image if available. Otherwise, return null. kWhenAvailable: "whenAvailable" # Pre-images are required.Throw an exception if not available. kRequired: "required" @@ -64,8 +64,14 @@ enums: description: Possible modes for the 'fullDocument' parameter of the $changeStream stage. type: string values: - kUpdateLookup: "updateLookup" + # Post-images will only be populated for insert and replace events. kDefault: "default" + # The current version of the document will be retrieved for update events. + kUpdateLookup: "updateLookup" + # Supply a post-image if available. Otherwise, return null. + kWhenAvailable: "whenAvailable" + # Post-images are required. Throw an exception if not available. + kRequired: "required" structs: ResumeTokenClusterTime: diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp index 3851bca53f4..c20ba0d03f9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/bson/simple_bsonelement_comparator.h" @@ -66,11 +66,7 @@ DocumentSourceChangeStreamAddPostImage::createFromBson( elem.type() == BSONType::Object); auto parsedSpec = DocumentSourceChangeStreamAddPostImageSpec::parse( IDLParserErrorContext("DocumentSourceChangeStreamAddPostImageSpec"), elem.Obj()); - uassert(5467609, - str::stream() << "the 'fullDocument' field can only be 'updateLookup'", - parsedSpec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup); - - return new DocumentSourceChangeStreamAddPostImage(expCtx); + return new DocumentSourceChangeStreamAddPostImage(expCtx, parsedSpec.getFullDocument()); } DocumentSource::GetNextResult DocumentSourceChangeStreamAddPostImage::doGetNext() { @@ -141,13 +137,12 @@ Value DocumentSourceChangeStreamAddPostImage::lookupPostImage(const Document& up Value DocumentSourceChangeStreamAddPostImage::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { return explain - ? Value(Document{{DocumentSourceChangeStream::kStageName, - Document{{"stage"_sd, "internalAddPostImage"_sd}, - {"fullDocumentBeforeChange"_sd, "updateLookUp"_sd}}}}) - : Value(Document{ - {kStageName, - DocumentSourceChangeStreamAddPostImageSpec(FullDocumentModeEnum::kUpdateLookup) - .toBSON()}}); + ? Value(Document{ + {DocumentSourceChangeStream::kStageName, + Document{{"stage"_sd, kStageName}, + {kFullDocumentFieldName, FullDocumentMode_serializer(_fullDocumentMode)}}}}) + : Value(Document{{kStageName, + DocumentSourceChangeStreamAddPostImageSpec(_fullDocumentMode).toBSON()}}); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h index cb9833a35de..6f5addd5cc1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h @@ -50,8 +50,9 @@ public: * Creates a DocumentSourceChangeStreamAddPostImage stage. */ static boost::intrusive_ptr<DocumentSourceChangeStreamAddPostImage> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceChangeStreamAddPostImage(expCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec) { + return new DocumentSourceChangeStreamAddPostImage(expCtx, spec.getFullDocument()); } static boost::intrusive_ptr<DocumentSourceChangeStreamAddPostImage> createFromBson( @@ -112,8 +113,13 @@ public: } private: - DocumentSourceChangeStreamAddPostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(kStageName, expCtx) {} + DocumentSourceChangeStreamAddPostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const FullDocumentModeEnum fullDocumentMode) + : DocumentSource(kStageName, expCtx), _fullDocumentMode(fullDocumentMode) { + tassert(5842300, + "the 'fullDocument' field cannot be 'default'", + _fullDocumentMode != FullDocumentModeEnum::kDefault); + } /** * Performs the lookup to retrieve the full document. @@ -135,6 +141,11 @@ private: Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final; Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final; + + // Determines whether post-images are strictly required or may be included only when available, + // and whether to return a point-in-time post-image or the most current majority-committed + // version of the updated document. + FullDocumentModeEnum _fullDocumentMode = FullDocumentModeEnum::kDefault; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp index 7c226271066..329c497a5dd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp @@ -41,10 +41,11 @@ #include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h" +#include "mongo/idl/server_parameter_test_util.h" namespace mongo { namespace { @@ -76,13 +77,54 @@ public: return ResumeToken(ResumeTokenData(ts, 0, 0, testUuid(), Value(Document{{"_id", id}}))) .toDocument(); } + + DocumentSourceChangeStreamSpec getSpec( + FullDocumentModeEnum documentMode = FullDocumentModeEnum::kUpdateLookup) { + auto spec = DocumentSourceChangeStreamSpec(); + spec.setFullDocument(documentMode); + return spec; + } }; +TEST_F(DocumentSourceChangeStreamAddPostImageTest, + CannotCreateStageFromBsonWithUnrecognizedFullDocumentMode) { + auto expCtx = getExpCtx(); + auto spec = BSON("$changeStream: " << BSON("fullDocument" + << "banana")); + ASSERT_THROWS_CODE( + DocumentSourceChangeStreamAddPostImage::createFromBson(spec.firstElement(), expCtx), + AssertionException, + ErrorCodes::BadValue); +} + +TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForExplain) { + auto expCtx = getExpCtx(); + const auto stage = DocumentSourceChangeStreamAddPostImage::create( + expCtx, getSpec(FullDocumentModeEnum::kUpdateLookup)); + const auto expectedOutput = + Value(Document{{DocumentSourceChangeStream::kStageName, + Document{{"stage"_sd, DocumentSourceChangeStreamAddPostImage::kStageName}, + {"fullDocument"_sd, "updateLookup"_sd}}}}); + + ASSERT_VALUE_EQ(stage->serializeToValue({ExplainOptions::Verbosity::kQueryPlanner}), + expectedOutput); +} + +TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForDispatch) { + auto expCtx = getExpCtx(); + const auto stage = DocumentSourceChangeStreamAddPostImage::create( + expCtx, getSpec(FullDocumentModeEnum::kUpdateLookup)); + const auto expectedOutput = Value(Document{{DocumentSourceChangeStreamAddPostImage::kStageName, + Document{{"fullDocument"_sd, "updateLookup"_sd}}}}); + + ASSERT_VALUE_EQ(stage->serializeToValue(), expectedOutput); +} + TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input with a document without a "documentKey" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -105,7 +147,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingOperation auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -128,7 +170,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingNamespace auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -152,7 +194,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfNsFieldHasWrongT auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input with a document without a "ns" field. auto mockLocalSource = @@ -175,7 +217,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfNsFieldDoesNotMa auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -201,7 +243,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test"); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -226,7 +268,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPassIfDatabaseMatchesOn expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test"); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}}; @@ -256,7 +298,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfDocumentKeyIsNot auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input with an update document. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -282,7 +324,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPropagatePauses) { auto expCtx = getExpCtx(); // Set up the lookup change post image stage. - auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx); + auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec()); // Mock its input, pausing every other result. auto mockLocalSource = DocumentSourceMock::createForTest( @@ -334,6 +376,5 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPropagatePauses) { ASSERT_TRUE(lookupChangeStage->getNext().isEOF()); ASSERT_TRUE(lookupChangeStage->getNext().isEOF()); } - } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index accc45bb573..16b55988bfe 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -44,10 +44,10 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" -#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" @@ -443,6 +443,10 @@ bool getCSOptimizationFeatureFlagValue() { return feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV(); } +bool isChangeStreamsPreAndPostImagesEnabled() { + return feature_flags::gFeatureFlagChangeStreamsPreAndPostImages.isEnabledAndIgnoreFCV(); +} + /** * Runs the tests with feature flag 'featureFlagChangeStreamsOptimization' true and false. */ @@ -561,6 +565,41 @@ TEST_F(ChangeStreamStageTest, ShouldRejectUnrecognizedFullDocumentOption) { ErrorCodes::BadValue); } +TEST_F(ChangeStreamStageTest, ShouldRejectUnsupportedFullDocumentOption) { + auto expCtx = getExpCtx(); + + // New modes that are supposed to be working only when pre-/post-images feature flag is on. + FullDocumentModeEnum modes[] = {FullDocumentModeEnum::kWhenAvailable, + FullDocumentModeEnum::kRequired}; + + for (const auto& mode : modes) { + auto spec = + BSON("$changeStream: " << DocumentSourceChangeStreamAddPostImageSpec(mode).toBSON()); + + // TODO SERVER-58584: remove the feature flag. + { + RAIIServerParameterControllerForTest controller( + "featureFlagChangeStreamsPreAndPostImages", false); + ASSERT_FALSE(isChangeStreamsPreAndPostImagesEnabled()); + + // 'DSChangeStream' is not allowed to be instantiated with new document modes when + // pre-/post-images feature flag is disabled. + ASSERT_THROWS_CODE(DSChangeStream::createFromBson(spec.firstElement(), expCtx), + AssertionException, + ErrorCodes::BadValue); + } + { + RAIIServerParameterControllerForTest controller( + "featureFlagChangeStreamsPreAndPostImages", true); + ASSERT(isChangeStreamsPreAndPostImagesEnabled()); + + // 'DSChangeStream' is allowed to be instantiated with new document modes when + // pre-/post-images feature flag is enabled. + DSChangeStream::createFromBson(spec.firstElement(), expCtx); + } + } +} + TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfterOptions) { auto expCtx = getExpCtx(); diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 33c919bdaaa..6166c3702ea 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -36,8 +36,8 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/pipeline/change_stream_document_diff_parser.h" #include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" -#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index a411922552e..1bd3337a81f 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -40,7 +40,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" +#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_facet.h" #include "mongo/db/pipeline/document_source_graph_lookup.h" |