summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Banala <arun.banala@mongodb.com>2022-02-07 17:40:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-09 12:02:17 +0000
commit1cace179dad67a095a957720c3868d317755adb6 (patch)
tree17c9f56da5cfd7cdc5c96d89f25c79d873707b15
parent0991ecd9669e16b16ecadcc918ebf65993615cba (diff)
downloadmongo-1cace179dad67a095a957720c3868d317755adb6.tar.gz
SERVER-63391 Add a $match to filter out newly added events when 'showExpandedEvents' is false
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js25
-rw-r--r--src/mongo/db/pipeline/change_stream_filter_helpers.cpp15
-rw-r--r--src/mongo/db/pipeline/change_stream_filter_helpers.h7
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp7
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp455
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp37
8 files changed, 394 insertions, 170 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
index 8f7262553de..677f7fc03f3 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
@@ -113,19 +113,17 @@ verifyNonInvalidatingOps(resumeAfterToken,
[] /* expectedOps */,
0 /* expectedOplogRetDocsForEachShard */);
-// Ensure that the '$match' on an unknown operation type cannot be rewritten to the oplog format.
-// The oplog cursor should return '4' documents.
+// Ensure that the '$match' on the operation type unknown is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: "unknown"}},
[] /* expectedOps */,
- 4 /* expectedOplogRetDocsForEachShard */);
+ 0 /* expectedOplogRetDocsForEachShard */);
-// Ensure that the '$match' on an empty string operation type cannot be rewritten to the oplog
-// format. The oplog cursor should return '4' documents for each shard.
+// Ensure that the '$match' on an empty string operation type is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: ""}},
[] /* expectedOps */,
- 4 /* expectedOplogRetDocsForEachShard */);
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on operation type with inequality operator cannot be rewritten to the
// oplog format. The oplog cursor should return '4' documents for each shard.
@@ -154,19 +152,17 @@ verifyNonInvalidatingOps(resumeAfterToken,
["insert", "update"],
2 /* expectedOplogRetDocsForEachShard */);
-// Ensure that for the '$in' with one valid and one invalid operation type, rewrite to the oplog
-// format will be abandoned. The oplog cursor should return '4' documents for each shard.
+// Ensure that for the '$in' with one valid and one invalid operation type is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$in: ["insert", "unknown"]}}},
["insert"],
- 4 /* expectedOplogRetDocsForEachShard */);
+ 1 /* expectedOplogRetDocsForEachShard */);
-// Ensure that the '$match' with '$in' on an unknown operation type cannot be rewritten. The oplog
-// cursor should return '4' documents for each shard.
+// Ensure that the '$match' with '$in' on an unknown operation type is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$in: ["unknown"]}}},
[] /* expectedOps */,
- 4 /* expectedOplogRetDocsForEachShard */);
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' with '$in' with operation type as number is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
@@ -187,12 +183,11 @@ verifyNonInvalidatingOps(resumeAfterToken,
["update", "replace", "delete"],
3 /* expectedOplogRetDocsForEachShard */);
-// Ensure that for the '$nin' with one valid and one invalid operation type, rewrite to the oplog
-// format will be abandoned. The oplog cursor should return '4' documents for each shard.
+// Ensure that for the '$nin' with one valid and one invalid operation type is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$nin: ["insert", "unknown"]}}},
["update", "replace", "delete"],
- 4 /* expectedOplogRetDocsForEachShard */);
+ 3 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' with '$nin' with operation type as number is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index 125d43c9dab..8ab33cce834 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -241,5 +241,20 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
<< "$or" << internalOpTypeOrBuilder.arr()),
expCtx);
}
+
+BSONObj getMatchFilterForClassicOperationTypes() {
+ return BSON(DocumentSourceChangeStream::kOperationTypeField << BSON(
+ "$in" << BSON_ARRAY(DocumentSourceChangeStream::kUpdateOpType
+ << DocumentSourceChangeStream::kDeleteOpType
+ << DocumentSourceChangeStream::kReplaceOpType
+ << DocumentSourceChangeStream::kInsertOpType
+ << DocumentSourceChangeStream::kDropCollectionOpType
+ << DocumentSourceChangeStream::kRenameCollectionOpType
+ << DocumentSourceChangeStream::kDropDatabaseOpType
+ << DocumentSourceChangeStream::kInvalidateOpType
+ << DocumentSourceChangeStream::kReshardBeginOpType
+ << DocumentSourceChangeStream::kReshardDoneCatchUpOpType
+ << DocumentSourceChangeStream::kNewShardDetectedOpType)));
+}
} // namespace change_stream_filter
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.h b/src/mongo/db/pipeline/change_stream_filter_helpers.h
index 96b1def978c..3ef643a9543 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.h
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.h
@@ -92,5 +92,12 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
*/
std::unique_ptr<MatchExpression> buildInternalOpFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch);
+
+/**
+ * Returns the match filter for the classic changestream operationTypes i.e. the operations that
+ * are NOT guarded behind the 'showExpandedEvents' flag.
+ */
+BSONObj getMatchFilterForClassicOperationTypes();
+
} // namespace change_stream_filter
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 24f06821a9b..ea1df1ae1a1 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
+#include "mongo/db/pipeline/change_stream_filter_helpers.h"
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_add_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
@@ -83,6 +84,12 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
// We must always check that the shard is capable of resuming from the specified point.
stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec));
+ // If 'showExpandedEvents' is NOT set, add a filter that returns only classic change events.
+ if (!spec.getShowExpandedEvents()) {
+ stages.push_back(DocumentSourceMatch::create(
+ change_stream_filter::getMatchFilterForClassicOperationTypes(), expCtx));
+ }
+
return stages;
}
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index 9e1cd0d308e..d080be71902 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -113,13 +113,15 @@ std::unique_ptr<MatchExpression> matchRewriteOperationType(
// Helper to convert a BSONElement opType into a rewritten MatchExpression.
auto getRewrittenOpType = [&](auto& opType) -> std::unique_ptr<MatchExpression> {
- if (BSONType::String != opType.type()) {
+ // If the operand is not a string, then this predicate will never match. If a rewrite rule
+ // does not exist for the specified operation type, then it is either handled elsewhere or
+ // it's an invalid type. In either case, return $alwaysFalse so that this predicate is
+ // ignored.
+ if (BSONType::String != opType.type() || !kOpTypeRewriteMap.count(opType.str())) {
return std::make_unique<AlwaysFalseMatchExpression>();
- } else if (kOpTypeRewriteMap.count(opType.str())) {
- return MatchExpressionParser::parseAndNormalize(
- kOpTypeRewriteMap.at(opType.str()).toBson(), expCtx);
}
- return nullptr;
+ return MatchExpressionParser::parseAndNormalize(kOpTypeRewriteMap.at(opType.str()).toBson(),
+ expCtx);
};
switch (predicate->matchType()) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 2f715342e47..a5f8f7eb3a0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/commands/feature_compatibility_version_documentation.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/change_stream_constants.h"
+#include "mongo/db/pipeline/change_stream_filter_helpers.h"
#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
@@ -307,6 +308,11 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec));
}
+ // If 'showExpandedEvents' is NOT set, add a filter that returns only classic change events.
+ if (!spec.getShowExpandedEvents()) {
+ stages.push_back(DocumentSourceMatch::create(
+ change_stream_filter::getMatchFilterForClassicOperationTypes(), expCtx));
+ }
return stages;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 7b9788f2842..1960d70b968 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -90,6 +90,8 @@ static const Timestamp kDefaultTs(100, 1);
static const repl::OpTime kDefaultOpTime(kDefaultTs, 1);
static const NamespaceString nss("unittests.change_stream");
static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}");
+static const BSONObj kShowExpandedEventsSpec =
+ fromjson("{$changeStream: {showExpandedEvents: true}}");
class ChangeStreamStageTestNoSetup : public AggregationContextFixture {
public:
@@ -434,6 +436,9 @@ public:
expCtx->ns = NamespaceString("a.collection");
expCtx->inMongos = true;
+ // Always enable 'featureFlagChangeStreamsVisibility' since some tests rely on
+ // 'showExpandedEvents'.
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
auto pipeline = Pipeline::parse(rawPipeline, expCtx);
pipeline->optimizePipeline();
@@ -2178,7 +2183,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
- ASSERT_EQ(allStages.size(), 5);
+ ASSERT_EQ(allStages.size(), 6);
auto stage = allStages[2];
ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get()));
@@ -2204,7 +2209,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx);
auto newSerialization = roundTripped->serialize();
- ASSERT_EQ(newSerialization.size(), 5UL);
+ ASSERT_EQ(newSerialization.size(), 6UL);
// DSCSTransform stage should be the third stage after DSCSOplogMatch and
// DSCSUnwindTransactions stages.
@@ -2226,7 +2231,7 @@ TEST_F(ChangeStreamStageTest, DSCSTransformStageEmptySpecSerializeResumeAfter) {
ASSERT(!expCtx->initialPostBatchResumeToken.isEmpty());
vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
- ASSERT_EQ(allStages.size(), 5);
+ ASSERT_EQ(allStages.size(), 6);
auto transformStage = allStages[2];
ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(transformStage.get()));
@@ -3491,12 +3496,15 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
+//
+// Tests that the single '$match' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) {
- //
- // Tests that the single '$match' gets promoted before the '$_internalUpdateOnAddShard'.
- //
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- fromjson("{$changeStream: {}}"),
+ kShowExpandedEventsSpec,
fromjson("{$match: {operationType: 'insert'}}"),
};
@@ -3513,12 +3521,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that multiple '$match' gets merged and promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatch) {
- //
- // Tests that multiple '$match' gets merged and promoted before the
- // '$_internalUpdateOnAddShard'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$match: {operationType: 'insert'}}"),
fromjson("{$match: {operationType: 'delete'}}")};
@@ -3535,15 +3545,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatch) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that multiple '$match' gets merged and promoted before the
+// '$_internalChangeStreamCheckTopologyChange' when resume token is present.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) {
- //
- // Tests that multiple '$match' gets merged and promoted before the
- // '$_internalUpdateOnAddShard' if resume token if present.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
BSON("$match" << BSON("operationType"
<< "insert")),
BSON("$match" << BSON("operationType"
@@ -3563,11 +3576,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that the single '$project' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleProject) {
- //
- // Tests that the single'$project' gets promoted before the '$_internalUpdateOnAddShard'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$project: {operationType: 1}}")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3583,11 +3599,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleProject) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that multiple '$project' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProject) {
- //
- // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$project: {operationType: 1}}"),
fromjson("{$project: {fullDocument: 1}}")};
@@ -3605,15 +3624,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProject) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that multiple '$project' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange' if resume token is present.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) {
- //
- // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard' if
- // resume token is present.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
BSON("$project" << BSON("operationType" << 1)),
BSON("$project" << BSON("fullDocument" << 1))};
@@ -3632,15 +3654,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that a '$project' followed by a '$match' gets optimized and they get promoted before
+// the '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) {
- //
- // Tests that a '$project' followed by a '$match' gets optimized and they get promoted before
- // the '$_internalUpdateOnAddShard'.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
BSON("$project" << BSON("operationType" << 1)),
BSON("$match" << BSON("operationType"
<< "insert"))};
@@ -3660,12 +3685,15 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that the single '$unset' gets promoted before the
+// '$_internalChangeStreamCheckTopologyChange' as
+// '$project'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleUnset) {
- //
- // Tests that the single'$unset' gets promoted before the '$_internalUpdateOnAddShard' as
- // '$project'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$unset: 'operationType'}")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3681,12 +3709,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleUnset) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that multiple '$unset' gets promoted before the '$_internalChangeStreamCheckTopologyChange'
+// as '$project'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleUnset) {
- //
- // Tests that multiple '$unset' gets promoted before the '$_internalUpdateOnAddShard' as
- // '$project'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$unset: 'operationType'}"),
fromjson("{$unset: 'fullDocument'}")};
@@ -3704,17 +3734,20 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleUnset) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that the '$unset' gets promoted before the '$_internalChangeStreamCheckTopologyChange' as
+// '$project' even if resume token is present.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) {
- //
- // Tests that the '$unset' gets promoted before the '$_internalUpdateOnAddShard' as '$project'
- // even if resume token is present.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
- const std::vector<BSONObj> rawPipeline = {BSON("$changeStream"
- << BSON("resumeAfter" << resumeToken)),
- BSON("$unset"
- << "operationType")};
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
+ BSON("$unset"
+ << "operationType")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3730,11 +3763,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that the single'$addFields' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleAddFields) {
- //
- // Tests that the single'$addFields' gets promoted before the '$_internalUpdateOnAddShard'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$addFields: {stockPrice: 100}}")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3750,11 +3786,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleAddFields) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that multiple '$addFields' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleAddFields) {
- //
- // Tests that multiple '$addFields' gets promoted before the '$_internalUpdateOnAddShard'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$addFields: {stockPrice: 100}}"),
fromjson("{$addFields: {quarter: 'Q1'}}")};
@@ -3772,15 +3811,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleAddFields) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that the '$addFields' gets promoted before the '$_internalChangeStreamCheckTopologyChange'
+// if resume token is present.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) {
- //
- // Tests that the '$addFields' gets promoted before the '$_internalUpdateOnAddShard' if
- // resume token is present.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
BSON("$addFields" << BSON("stockPrice" << 100))};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3797,11 +3839,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that the single '$set' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleSet) {
- //
- // Tests that the single'$set' gets promoted before the '$_internalUpdateOnAddShard'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$set: {stockPrice: 100}}")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3817,11 +3862,13 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleSet) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that multiple '$set' gets promoted before the '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleSet) {
- //
- // Tests that multiple '$set' gets promoted before the '$_internalUpdateOnAddShard'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$set: {stockPrice: 100}}"),
fromjson("{$set: {quarter: 'Q1'}}")};
@@ -3839,15 +3886,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleSet) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that the '$set' gets promoted before the '$_internalChangeStreamCheckTopologyChange' if
+// resume token is present.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) {
- //
- // Tests that the '$set' gets promoted before the '$_internalUpdateOnAddShard' if
- // resume token is present.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
BSON("$set" << BSON("stockPrice" << 100))};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3864,12 +3914,15 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that the single '$replaceRoot' gets promoted before the
+// '$_internalChangeStreamHandleTopologyChange'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceRoot) {
- //
- // Tests that the single'$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard'.
- //
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- fromjson("{$changeStream: {}}"), fromjson("{$replaceRoot: {newRoot: '$fullDocument'}}")};
+ kShowExpandedEventsSpec, fromjson("{$replaceRoot: {newRoot: '$fullDocument'}}")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3884,15 +3937,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceRoot) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that the '$replaceRoot' gets promoted before the
+// '$_internalChangeStreamCheckTopologyChange' if resume token is present.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) {
- //
- // Tests that the '$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard' if
- // resume token is present.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
BSON("$replaceRoot" << BSON("newRoot"
<< "$fullDocument"))};
@@ -3910,12 +3966,15 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that the single '$replaceWith' gets promoted before the
+// '$_internalChangeStreamCheckTopologyChange' as
+// '$replaceRoot'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceWith) {
- //
- // Tests that the single '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' as
- // '$replaceRoot'.
- //
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec,
fromjson("{$replaceWith: '$fullDocument'}")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3931,17 +3990,20 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceWith) {
"$_internalChangeStreamHandleTopologyChange"});
}
+//
+// Tests that the '$replaceWith' gets promoted before the
+// '$_internalChangeStreamCheckTopologyChange' if resume token is present as '$replaceRoot'.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) {
- //
- // Tests that the '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' if
- // resume token is present as '$replaceRoot'.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
- const std::vector<BSONObj> rawPipeline = {BSON("$changeStream"
- << BSON("resumeAfter" << resumeToken)),
- BSON("$replaceWith"
- << "$fullDocument")};
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
+ BSON("$replaceWith"
+ << "$fullDocument")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -3957,27 +4019,118 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) {
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
+//
+// Tests that when 'showExpandedEvents' is true, we do not inject any additional stages.
+//
+TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsTrueDoesNotInjectMatchStage) {
+ const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+//
+// Tests that when 'showExpandedEvents' is unset, we inject an additional $match stage and promote
+// it before the '$_internalChangeStreamHandleTopologyChange'.
+//
+TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseInjectsMatchStage) {
+ const std::vector<BSONObj> rawPipeline = {kDefaultSpec};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+//
+// Tests that when 'showExpandedEvents' is false, the injected match stage gets merged with the user
+// match stage and gets promoted before the '$_internalChangeStreamHandleTopologyChange'.
+//
+TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseAndUserMatch) {
+ const std::vector<BSONObj> rawPipeline = {
+ fromjson("{$changeStream: {showExpandedEvents: false}}"),
+ BSON("$match" << BSON("operationType"
+ << "insert"))};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+//
+// Tests that when 'showExpandedEvents' is false, the injected match stage can be merged with the
+// user match stage and can be promoted before the user '$project' and
+// '$_internalChangeStreamHandleTopologyChange'.
+//
+TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseAndUserProjectMatch) {
+ const std::vector<BSONObj> rawPipeline = {
+ fromjson("{$changeStream: {showExpandedEvents: false}}"),
+ BSON("$project" << BSON("operationType" << 1)),
+ BSON("$match" << BSON("operationType"
+ << "insert")),
+ };
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+//
+// Tests that when all allowed stages are included along with the resume token, the final
+// pipeline gets optimized.
+//
TEST_F(ChangeStreamStageTest, ChangeStreamWithAllStagesAndResumeToken) {
- //
- // Tests that when all allowed stages are included along with the resume token, the final
- // pipeline gets optimized.
- //
- auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
-
- const std::vector<BSONObj> rawPipeline = {BSON("$changeStream"
- << BSON("resumeAfter" << resumeToken)),
- BSON("$project" << BSON("operationType" << 1)),
- BSON("$unset"
- << "_id"),
- BSON("$addFields" << BSON("stockPrice" << 100)),
- BSON("$set"
- << BSON("fullDocument.stockPrice" << 100)),
- BSON("$match" << BSON("operationType"
- << "insert")),
- BSON("$replaceRoot" << BSON("newRoot"
- << "$fullDocument")),
- BSON("$replaceWith"
- << "fullDocument.stockPrice")};
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ const std::vector<BSONObj>
+ rawPipeline = {BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid())
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
+ << true)),
+ BSON("$project" << BSON("operationType" << 1)),
+ BSON("$unset"
+ << "_id"),
+ BSON("$addFields" << BSON("stockPrice" << 100)),
+ BSON("$set" << BSON("fullDocument.stockPrice" << 100)),
+ BSON("$match" << BSON("operationType"
+ << "insert")),
+ BSON("$replaceRoot" << BSON("newRoot"
+ << "$fullDocument")),
+ BSON("$replaceWith"
+ << "fullDocument.stockPrice")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -4322,7 +4475,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeWithInvalidO
ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}"));
}
-TEST_F(ChangeStreamRewriteTest, CannotRewriteEqPredicateOnOperationTypeWithUnknownOpType) {
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeWithUnknownOpType) {
auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("operationType"
<< "nonExisting"),
getExpCtx());
@@ -4330,7 +4483,10 @@ TEST_F(ChangeStreamRewriteTest, CannotRewriteEqPredicateOnOperationTypeWithUnkno
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
- ASSERT_FALSE(rewrittenMatchExpression);
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}"));
}
TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationType) {
@@ -4469,7 +4625,18 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) {
}
TEST_F(ChangeStreamRewriteTest, CannotRewriteInPredicateWithRegexOnOperationType) {
- auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("/^in*sert")));
+ auto expr =
+ BSON("operationType" << BSON("$in" << BSON_ARRAY(BSONRegEx("^in*sert") << "update")));
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
+ ASSERT_FALSE(rewrittenMatchExpression);
+}
+
+TEST_F(ChangeStreamRewriteTest, CannotRewriteRegexPredicateOnOperationType) {
+ auto expr = BSON("operationType" << BSONRegEx("^in*sert"));
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx());
ASSERT_OK(statusWithMatchExpression.getStatus());
@@ -4506,7 +4673,7 @@ TEST_F(ChangeStreamRewriteTest,
BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{op: {$eq: 'i'}}"))));
}
-TEST_F(ChangeStreamRewriteTest, CannotRewriteInPredicateOnOperationTypeWithUnknownOpType) {
+TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationTypeWithUnknownOpType) {
auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("unknown"
<< "insert")));
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx());
@@ -4514,7 +4681,11 @@ TEST_F(ChangeStreamRewriteTest, CannotRewriteInPredicateOnOperationTypeWithUnkno
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
- ASSERT_FALSE(rewrittenMatchExpression);
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ BSON(OR(fromjson("{op: {$eq: 'i'}}"), fromjson("{$alwaysFalse: 1}"))));
}
TEST_F(ChangeStreamRewriteTest, CanRewriteEmptyInPredicateOnOperationType) {
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index c8325745bff..0af4b541df7 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -63,6 +63,7 @@
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/dbtests/dbtests.h"
+#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/temp_dir.h"
@@ -2647,8 +2648,13 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
expCtx->uuid = UUID::gen();
setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
- auto spec = BSON("$changeStream" << BSON("fullDocument"
- << "updateLookup"));
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+ auto spec = BSON("$changeStream" << BSON(
+ "fullDocument"
+ << "updateLookup"
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the change lookup is at the end.
@@ -2674,8 +2680,13 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
expCtx->uuid = UUID::gen();
setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
- auto spec = BSON("$changeStream" << BSON("fullDocument"
- << "updateLookup"));
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+ auto spec = BSON("$changeStream" << BSON(
+ "fullDocument"
+ << "updateLookup"
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the change lookup is at the end.
@@ -2699,8 +2710,13 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen
expCtx->uuid = UUID::gen();
setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
- auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
- << "required"));
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+ auto spec = BSON("$changeStream" << BSON(
+ "fullDocumentBeforeChange"
+ << "required"
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the pre-image lookup is at the end.
@@ -2726,8 +2742,13 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre
expCtx->uuid = UUID::gen();
setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
- auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
- << "required"));
+ // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
+ // filters out newly added events.
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+ auto spec = BSON("$changeStream" << BSON(
+ "fullDocumentBeforeChange"
+ << "required"
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the pre-image lookup is at the end.