summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenis Grebennicov <denis.grebennicov@mongodb.com>2021-07-27 19:20:43 +0200
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-27 17:54:15 +0000
commit2d76701ca6266cf01bfc43e96c9a8ad912debb4f (patch)
treed28d3558b7b9dd57cc15a78cfc6c68262d4fa12b
parent0c1e060e7e87a32673e9a6b0311663aa4875544b (diff)
downloadmongo-2d76701ca6266cf01bfc43e96c9a8ad912debb4f.tar.gz
SERVER-58423 Parse and validate change stream open parameter "fullDocument"
-rw-r--r--jstests/change_streams/lookup_pit_post_image.js26
-rw-r--r--jstests/libs/change_stream_util.js11
-rw-r--r--src/mongo/db/pipeline/SConscript4
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.idl10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp (renamed from src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp)21
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image.h (renamed from src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h)19
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp (renamed from src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp)63
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp41
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp2
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp2
12 files changed, 184 insertions, 43 deletions
diff --git a/jstests/change_streams/lookup_pit_post_image.js b/jstests/change_streams/lookup_pit_post_image.js
new file mode 100644
index 00000000000..5bb2bc9ec14
--- /dev/null
+++ b/jstests/change_streams/lookup_pit_post_image.js
@@ -0,0 +1,26 @@
+// Tests the behaviour of $changeStream's 'fullDocument' option when retrieving point-in-time
+// post-images.
+(function() {
+"use strict";
+
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled.
+
+const testDB = db.getSiblingDB(jsTestName());
+const coll = assertDropAndRecreateCollection(testDB, "test");
+
+if (!isChangeStreamPreAndPostImagesEnabled(db)) {
+ // If feature flag is off, creating changeStream with new fullDocument arguments should throw.
+ assert.throwsWithCode(() => coll.watch([], {fullDocument: 'whenAvailable'}),
+ ErrorCodes.BadValue);
+ assert.throwsWithCode(() => coll.watch([], {fullDocument: 'required'}), ErrorCodes.BadValue);
+
+ jsTestLog(
+ 'Skipping test because featureFlagChangeStreamsPreAndPostImages feature flag is not enabled');
+ return;
+}
+
+// Open the change streams with new fullDocument parameters.
+assert.doesNotThrow(() => coll.watch([], {fullDocument: 'whenAvailable'}));
+assert.doesNotThrow(() => coll.watch([], {fullDocument: 'required'}));
+}());
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 8daa0e8be29..e4c220743e0 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -24,6 +24,17 @@ function isChangeStreamsOptimizationEnabled(db) {
}
/**
+ * Returns true if feature flag 'featureFlagChangeStreamsPreAndPostImages' is enabled, false
+ * otherwise.
+ */
+function isChangeStreamPreAndPostImagesEnabled(db) {
+ const getParam =
+ db.adminCommand({getParameter: 1, featureFlagChangeStreamsPreAndPostImages: 1});
+ return getParam.hasOwnProperty("featureFlagChangeStreamsPreAndPostImages") &&
+ getParam.featureFlagChangeStreamsPreAndPostImages.value;
+}
+
+/**
* Returns true if feature flag 'featureFlagChangeStreamsRewrite' is enabled, false otherwise.
*/
function isChangeStreamsRewriteEnabled(db) {
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 47439f941ec..ccdbf58aa3d 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -349,13 +349,13 @@ env.Library(
'change_stream_document_diff_parser.cpp',
'change_stream_helpers_legacy.cpp',
'document_source_change_stream.cpp',
+ 'document_source_change_stream_add_post_image.cpp',
'document_source_change_stream_check_invalidate.cpp',
'document_source_change_stream_check_resumability.cpp',
'document_source_change_stream_check_topology_change.cpp',
'document_source_change_stream_close_cursor.cpp',
'document_source_change_stream_ensure_resume_token_present.cpp',
'document_source_change_stream_handle_topology_change.cpp',
- 'document_source_change_stream_lookup_post_image.cpp',
'document_source_change_stream_lookup_pre_image.cpp',
'document_source_change_stream_oplog_match.cpp',
'document_source_change_stream_transform.cpp',
@@ -433,7 +433,7 @@ env.CppUnitTest(
'document_source_internal_shard_filter_test.cpp',
'document_source_internal_split_pipeline_test.cpp',
'document_source_limit_test.cpp',
- 'document_source_change_stream_lookup_post_image_test.cpp',
+ 'document_source_change_stream_add_post_image_test.cpp',
'document_source_lookup_test.cpp',
'document_source_match_test.cpp',
'document_source_merge_cursors_test.cpp',
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 797254ad6e0..554559cfe23 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -31,13 +31,13 @@
#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
-#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
@@ -112,10 +112,10 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
stages.push_back(DocumentSourceChangeStreamAddPreImage::create(expCtx, spec));
}
- // There should be only one post-image lookup stage. If we're on the shards and producing
+ // There should be only one post-image lookup stage. If we're on the shards and producing
// input to be merged, the lookup is done on the mongos.
- if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) {
- stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx));
+ if (spec.getFullDocument() != FullDocumentModeEnum::kDefault) {
+ stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx, spec));
}
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 42ca7d5f39c..fc6986597b1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -38,13 +38,13 @@
#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
-#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
@@ -261,9 +261,9 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
stages.push_back(DocumentSourceChangeStreamAddPreImage::create(expCtx, spec));
}
- // If 'fullDocument' is set to "updateLookup", add the DSCSAddPostImage stage here.
- if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) {
- stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx));
+ // If 'fullDocument' is not set to "default", add the DSCSAddPostImage stage here.
+ if (spec.getFullDocument() != FullDocumentModeEnum::kDefault) {
+ stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx, spec));
}
// If the pipeline is built on MongoS, then the DSCSHandleTopologyChange stage acts as the
@@ -328,6 +328,18 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
"the 'fullDocumentBeforeChange' option is not supported in a sharded cluster",
!(shouldAddPreImage && (expCtx->inMongos || expCtx->needsMerge)));
+ // TODO SERVER-58584: remove the feature flag.
+ if (!feature_flags::gFeatureFlagChangeStreamsPreAndPostImages.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "Specified value '"
+ << FullDocumentMode_serializer(spec.getFullDocument())
+ << "' is not a valid option for the 'fullDocument' parameter of the "
+ "$changeStream stage",
+ spec.getFullDocument() == FullDocumentModeEnum::kDefault ||
+ spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup);
+ }
+
uassert(31123,
"Change streams from mongos may not show migration events",
!(expCtx->inMongos && spec.getShowMigrationEvents()));
diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl
index e8094f2915d..b59a555c4df 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.idl
+++ b/src/mongo/db/pipeline/document_source_change_stream.idl
@@ -55,7 +55,7 @@ enums:
values:
# Do not supply pre-images.
kOff: "off"
- # Supply a pre-image if available. Otherwise, omit the output field.
+ # Supply a pre-image if available. Otherwise, return null.
kWhenAvailable: "whenAvailable"
# Pre-images are required.Throw an exception if not available.
kRequired: "required"
@@ -64,8 +64,14 @@ enums:
description: Possible modes for the 'fullDocument' parameter of the $changeStream stage.
type: string
values:
- kUpdateLookup: "updateLookup"
+ # Post-images will only be populated for insert and replace events.
kDefault: "default"
+ # The current version of the document will be retrieved for update events.
+ kUpdateLookup: "updateLookup"
+ # Supply a post-image if available. Otherwise, return null.
+ kWhenAvailable: "whenAvailable"
+ # Post-images are required. Throw an exception if not available.
+ kRequired: "required"
structs:
ResumeTokenClusterTime:
diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
index 3851bca53f4..c20ba0d03f9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
@@ -29,7 +29,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
@@ -66,11 +66,7 @@ DocumentSourceChangeStreamAddPostImage::createFromBson(
elem.type() == BSONType::Object);
auto parsedSpec = DocumentSourceChangeStreamAddPostImageSpec::parse(
IDLParserErrorContext("DocumentSourceChangeStreamAddPostImageSpec"), elem.Obj());
- uassert(5467609,
- str::stream() << "the 'fullDocument' field can only be 'updateLookup'",
- parsedSpec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup);
-
- return new DocumentSourceChangeStreamAddPostImage(expCtx);
+ return new DocumentSourceChangeStreamAddPostImage(expCtx, parsedSpec.getFullDocument());
}
DocumentSource::GetNextResult DocumentSourceChangeStreamAddPostImage::doGetNext() {
@@ -141,13 +137,12 @@ Value DocumentSourceChangeStreamAddPostImage::lookupPostImage(const Document& up
Value DocumentSourceChangeStreamAddPostImage::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain
- ? Value(Document{{DocumentSourceChangeStream::kStageName,
- Document{{"stage"_sd, "internalAddPostImage"_sd},
- {"fullDocumentBeforeChange"_sd, "updateLookUp"_sd}}}})
- : Value(Document{
- {kStageName,
- DocumentSourceChangeStreamAddPostImageSpec(FullDocumentModeEnum::kUpdateLookup)
- .toBSON()}});
+ ? Value(Document{
+ {DocumentSourceChangeStream::kStageName,
+ Document{{"stage"_sd, kStageName},
+ {kFullDocumentFieldName, FullDocumentMode_serializer(_fullDocumentMode)}}}})
+ : Value(Document{{kStageName,
+ DocumentSourceChangeStreamAddPostImageSpec(_fullDocumentMode).toBSON()}});
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
index cb9833a35de..6f5addd5cc1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
@@ -50,8 +50,9 @@ public:
* Creates a DocumentSourceChangeStreamAddPostImage stage.
*/
static boost::intrusive_ptr<DocumentSourceChangeStreamAddPostImage> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceChangeStreamAddPostImage(expCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec) {
+ return new DocumentSourceChangeStreamAddPostImage(expCtx, spec.getFullDocument());
}
static boost::intrusive_ptr<DocumentSourceChangeStreamAddPostImage> createFromBson(
@@ -112,8 +113,13 @@ public:
}
private:
- DocumentSourceChangeStreamAddPostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(kStageName, expCtx) {}
+ DocumentSourceChangeStreamAddPostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const FullDocumentModeEnum fullDocumentMode)
+ : DocumentSource(kStageName, expCtx), _fullDocumentMode(fullDocumentMode) {
+ tassert(5842300,
+ "the 'fullDocument' field cannot be 'default'",
+ _fullDocumentMode != FullDocumentModeEnum::kDefault);
+ }
/**
* Performs the lookup to retrieve the full document.
@@ -135,6 +141,11 @@ private:
Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
+
+ // Determines whether post-images are strictly required or may be included only when available,
+ // and whether to return a point-in-time post-image or the most current majority-committed
+ // version of the updated document.
+ FullDocumentModeEnum _fullDocumentMode = FullDocumentModeEnum::kDefault;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
index 7c226271066..329c497a5dd 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
@@ -41,10 +41,11 @@
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h"
+#include "mongo/idl/server_parameter_test_util.h"
namespace mongo {
namespace {
@@ -76,13 +77,54 @@ public:
return ResumeToken(ResumeTokenData(ts, 0, 0, testUuid(), Value(Document{{"_id", id}})))
.toDocument();
}
+
+ DocumentSourceChangeStreamSpec getSpec(
+ FullDocumentModeEnum documentMode = FullDocumentModeEnum::kUpdateLookup) {
+ auto spec = DocumentSourceChangeStreamSpec();
+ spec.setFullDocument(documentMode);
+ return spec;
+ }
};
+TEST_F(DocumentSourceChangeStreamAddPostImageTest,
+ CannotCreateStageFromBsonWithUnrecognizedFullDocumentMode) {
+ auto expCtx = getExpCtx();
+ auto spec = BSON("$changeStream: " << BSON("fullDocument"
+ << "banana"));
+ ASSERT_THROWS_CODE(
+ DocumentSourceChangeStreamAddPostImage::createFromBson(spec.firstElement(), expCtx),
+ AssertionException,
+ ErrorCodes::BadValue);
+}
+
+TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForExplain) {
+ auto expCtx = getExpCtx();
+ const auto stage = DocumentSourceChangeStreamAddPostImage::create(
+ expCtx, getSpec(FullDocumentModeEnum::kUpdateLookup));
+ const auto expectedOutput =
+ Value(Document{{DocumentSourceChangeStream::kStageName,
+ Document{{"stage"_sd, DocumentSourceChangeStreamAddPostImage::kStageName},
+ {"fullDocument"_sd, "updateLookup"_sd}}}});
+
+ ASSERT_VALUE_EQ(stage->serializeToValue({ExplainOptions::Verbosity::kQueryPlanner}),
+ expectedOutput);
+}
+
+TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForDispatch) {
+ auto expCtx = getExpCtx();
+ const auto stage = DocumentSourceChangeStreamAddPostImage::create(
+ expCtx, getSpec(FullDocumentModeEnum::kUpdateLookup));
+ const auto expectedOutput = Value(Document{{DocumentSourceChangeStreamAddPostImage::kStageName,
+ Document{{"fullDocument"_sd, "updateLookup"_sd}}}});
+
+ ASSERT_VALUE_EQ(stage->serializeToValue(), expectedOutput);
+}
+
TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input with a document without a "documentKey" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -105,7 +147,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingOperation
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -128,7 +170,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingNamespace
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -152,7 +194,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfNsFieldHasWrongT
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input with a document without a "ns" field.
auto mockLocalSource =
@@ -175,7 +217,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfNsFieldDoesNotMa
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -201,7 +243,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest,
expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test");
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -226,7 +268,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPassIfDatabaseMatchesOn
expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test");
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock out the foreign collection.
deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}};
@@ -256,7 +298,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfDocumentKeyIsNot
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input with an update document.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -282,7 +324,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPropagatePauses) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamAddPostImage::create(expCtx, getSpec());
// Mock its input, pausing every other result.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -334,6 +376,5 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPropagatePauses) {
ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index accc45bb573..16b55988bfe 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -44,10 +44,10 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
-#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
@@ -443,6 +443,10 @@ bool getCSOptimizationFeatureFlagValue() {
return feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV();
}
+bool isChangeStreamsPreAndPostImagesEnabled() {
+ return feature_flags::gFeatureFlagChangeStreamsPreAndPostImages.isEnabledAndIgnoreFCV();
+}
+
/**
* Runs the tests with feature flag 'featureFlagChangeStreamsOptimization' true and false.
*/
@@ -561,6 +565,41 @@ TEST_F(ChangeStreamStageTest, ShouldRejectUnrecognizedFullDocumentOption) {
ErrorCodes::BadValue);
}
+TEST_F(ChangeStreamStageTest, ShouldRejectUnsupportedFullDocumentOption) {
+ auto expCtx = getExpCtx();
+
+ // New modes that are supposed to be working only when pre-/post-images feature flag is on.
+ FullDocumentModeEnum modes[] = {FullDocumentModeEnum::kWhenAvailable,
+ FullDocumentModeEnum::kRequired};
+
+ for (const auto& mode : modes) {
+ auto spec =
+ BSON("$changeStream: " << DocumentSourceChangeStreamAddPostImageSpec(mode).toBSON());
+
+ // TODO SERVER-58584: remove the feature flag.
+ {
+ RAIIServerParameterControllerForTest controller(
+ "featureFlagChangeStreamsPreAndPostImages", false);
+ ASSERT_FALSE(isChangeStreamsPreAndPostImagesEnabled());
+
+ // 'DSChangeStream' is not allowed to be instantiated with new document modes when
+ // pre-/post-images feature flag is disabled.
+ ASSERT_THROWS_CODE(DSChangeStream::createFromBson(spec.firstElement(), expCtx),
+ AssertionException,
+ ErrorCodes::BadValue);
+ }
+ {
+ RAIIServerParameterControllerForTest controller(
+ "featureFlagChangeStreamsPreAndPostImages", true);
+ ASSERT(isChangeStreamsPreAndPostImagesEnabled());
+
+ // 'DSChangeStream' is allowed to be instantiated with new document modes when
+ // pre-/post-images feature flag is enabled.
+ DSChangeStream::createFromBson(spec.firstElement(), expCtx);
+ }
+ }
+}
+
TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfterOptions) {
auto expCtx = getExpCtx();
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 33c919bdaaa..6166c3702ea 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -36,8 +36,8 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/pipeline/change_stream_document_diff_parser.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
-#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index a411922552e..1bd3337a81f 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -40,7 +40,7 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_facet.h"
#include "mongo/db/pipeline/document_source_graph_lookup.h"