diff options
author | Arun Banala <arun.banala@mongodb.com> | 2022-02-07 17:40:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-09 12:02:17 +0000 |
commit | 1cace179dad67a095a957720c3868d317755adb6 (patch) | |
tree | 17c9f56da5cfd7cdc5c96d89f25c79d873707b15 | |
parent | 0991ecd9669e16b16ecadcc918ebf65993615cba (diff) | |
download | mongo-1cace179dad67a095a957720c3868d317755adb6.tar.gz |
SERVER-63391 Add a $match to filter out newly added events when 'showExpandedEvents' is false
8 files changed, 394 insertions, 170 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js index 8f7262553de..677f7fc03f3 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js @@ -113,19 +113,17 @@ verifyNonInvalidatingOps(resumeAfterToken, [] /* expectedOps */, 0 /* expectedOplogRetDocsForEachShard */); -// Ensure that the '$match' on an unknown operation type cannot be rewritten to the oplog format. -// The oplog cursor should return '4' documents. +// Ensure that the '$match' on the operation type unknown is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: "unknown"}}, [] /* expectedOps */, - 4 /* expectedOplogRetDocsForEachShard */); + 0 /* expectedOplogRetDocsForEachShard */); -// Ensure that the '$match' on an empty string operation type cannot be rewritten to the oplog -// format. The oplog cursor should return '4' documents for each shard. +// Ensure that the '$match' on an empty string operation type is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: ""}}, [] /* expectedOps */, - 4 /* expectedOplogRetDocsForEachShard */); + 0 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on operation type with inequality operator cannot be rewritten to the // oplog format. The oplog cursor should return '4' documents for each shard. @@ -154,19 +152,17 @@ verifyNonInvalidatingOps(resumeAfterToken, ["insert", "update"], 2 /* expectedOplogRetDocsForEachShard */); -// Ensure that for the '$in' with one valid and one invalid operation type, rewrite to the oplog -// format will be abandoned. The oplog cursor should return '4' documents for each shard. +// Ensure that for the '$in' with one valid and one invalid operation type is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$in: ["insert", "unknown"]}}}, ["insert"], - 4 /* expectedOplogRetDocsForEachShard */); + 1 /* expectedOplogRetDocsForEachShard */); -// Ensure that the '$match' with '$in' on an unknown operation type cannot be rewritten. The oplog -// cursor should return '4' documents for each shard. +// Ensure that the '$match' with '$in' on an unknown operation type is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$in: ["unknown"]}}}, [] /* expectedOps */, - 4 /* expectedOplogRetDocsForEachShard */); + 0 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' with '$in' with operation type as number is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, @@ -187,12 +183,11 @@ verifyNonInvalidatingOps(resumeAfterToken, ["update", "replace", "delete"], 3 /* expectedOplogRetDocsForEachShard */); -// Ensure that for the '$nin' with one valid and one invalid operation type, rewrite to the oplog -// format will be abandoned. The oplog cursor should return '4' documents for each shard. +// Ensure that for the '$nin' with one valid and one invalid operation type is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$nin: ["insert", "unknown"]}}}, ["update", "replace", "delete"], - 4 /* expectedOplogRetDocsForEachShard */); + 3 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' with '$nin' with operation type as number is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp index 125d43c9dab..8ab33cce834 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp @@ -241,5 +241,20 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter( << "$or" << internalOpTypeOrBuilder.arr()), expCtx); } + +BSONObj getMatchFilterForClassicOperationTypes() { + return BSON(DocumentSourceChangeStream::kOperationTypeField << BSON( + "$in" << BSON_ARRAY(DocumentSourceChangeStream::kUpdateOpType + << DocumentSourceChangeStream::kDeleteOpType + << DocumentSourceChangeStream::kReplaceOpType + << DocumentSourceChangeStream::kInsertOpType + << DocumentSourceChangeStream::kDropCollectionOpType + << DocumentSourceChangeStream::kRenameCollectionOpType + << DocumentSourceChangeStream::kDropDatabaseOpType + << DocumentSourceChangeStream::kInvalidateOpType + << DocumentSourceChangeStream::kReshardBeginOpType + << DocumentSourceChangeStream::kReshardDoneCatchUpOpType + << DocumentSourceChangeStream::kNewShardDetectedOpType))); +} } // namespace change_stream_filter } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.h b/src/mongo/db/pipeline/change_stream_filter_helpers.h index 96b1def978c..3ef643a9543 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.h +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.h @@ -92,5 +92,12 @@ std::unique_ptr<MatchExpression> buildTransactionFilter( */ std::unique_ptr<MatchExpression> buildInternalOpFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch); + +/** + * Returns the match filter for the classic changestream operationTypes i.e. the operations that + * are NOT guarded behind the 'showExpandedEvents' flag. + */ +BSONObj getMatchFilterForClassicOperationTypes(); + } // namespace change_stream_filter } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 24f06821a9b..ea1df1ae1a1 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/change_stream_helpers_legacy.h" +#include "mongo/db/pipeline/change_stream_filter_helpers.h" #include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_add_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" @@ -83,6 +84,12 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( // We must always check that the shard is capable of resuming from the specified point. stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec)); + // If 'showExpandedEvents' is NOT set, add a filter that returns only classic change events. + if (!spec.getShowExpandedEvents()) { + stages.push_back(DocumentSourceMatch::create( + change_stream_filter::getMatchFilterForClassicOperationTypes(), expCtx)); + } + return stages; } diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp index 9e1cd0d308e..d080be71902 100644 --- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp @@ -113,13 +113,15 @@ std::unique_ptr<MatchExpression> matchRewriteOperationType( // Helper to convert a BSONElement opType into a rewritten MatchExpression. auto getRewrittenOpType = [&](auto& opType) -> std::unique_ptr<MatchExpression> { - if (BSONType::String != opType.type()) { + // If the operand is not a string, then this predicate will never match. If a rewrite rule + // does not exist for the specified operation type, then it is either handled elsewhere or + // it's an invalid type. In either case, return $alwaysFalse so that this predicate is + // ignored. + if (BSONType::String != opType.type() || !kOpTypeRewriteMap.count(opType.str())) { return std::make_unique<AlwaysFalseMatchExpression>(); - } else if (kOpTypeRewriteMap.count(opType.str())) { - return MatchExpressionParser::parseAndNormalize( - kOpTypeRewriteMap.at(opType.str()).toBson(), expCtx); } - return nullptr; + return MatchExpressionParser::parseAndNormalize(kOpTypeRewriteMap.at(opType.str()).toBson(), + expCtx); }; switch (predicate->matchType()) { diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 2f715342e47..a5f8f7eb3a0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -36,6 +36,7 @@ #include "mongo/db/commands/feature_compatibility_version_documentation.h" #include "mongo/db/pipeline/aggregate_command_gen.h" #include "mongo/db/pipeline/change_stream_constants.h" +#include "mongo/db/pipeline/change_stream_filter_helpers.h" #include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" @@ -307,6 +308,11 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec)); } + // If 'showExpandedEvents' is NOT set, add a filter that returns only classic change events. + if (!spec.getShowExpandedEvents()) { + stages.push_back(DocumentSourceMatch::create( + change_stream_filter::getMatchFilterForClassicOperationTypes(), expCtx)); + } return stages; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 7b9788f2842..1960d70b968 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -90,6 +90,8 @@ static const Timestamp kDefaultTs(100, 1); static const repl::OpTime kDefaultOpTime(kDefaultTs, 1); static const NamespaceString nss("unittests.change_stream"); static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}"); +static const BSONObj kShowExpandedEventsSpec = + fromjson("{$changeStream: {showExpandedEvents: true}}"); class ChangeStreamStageTestNoSetup : public AggregationContextFixture { public: @@ -434,6 +436,9 @@ public: expCtx->ns = NamespaceString("a.collection"); expCtx->inMongos = true; + // Always enable 'featureFlagChangeStreamsVisibility' since some tests rely on + // 'showExpandedEvents'. + RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true); auto pipeline = Pipeline::parse(rawPipeline, expCtx); pipeline->optimizePipeline(); @@ -2178,7 +2183,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); - ASSERT_EQ(allStages.size(), 5); + ASSERT_EQ(allStages.size(), 6); auto stage = allStages[2]; ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get())); @@ -2204,7 +2209,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx); auto newSerialization = roundTripped->serialize(); - ASSERT_EQ(newSerialization.size(), 5UL); + ASSERT_EQ(newSerialization.size(), 6UL); // DSCSTransform stage should be the third stage after DSCSOplogMatch and // DSCSUnwindTransactions stages. @@ -2226,7 +2231,7 @@ TEST_F(ChangeStreamStageTest, DSCSTransformStageEmptySpecSerializeResumeAfter) { ASSERT(!expCtx->initialPostBatchResumeToken.isEmpty()); vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); - ASSERT_EQ(allStages.size(), 5); + ASSERT_EQ(allStages.size(), 6); auto transformStage = allStages[2]; ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(transformStage.get())); @@ -3491,12 +3496,15 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken))); } +// +// Tests that the single '$match' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) { - // - // Tests that the single '$match' gets promoted before the '$_internalUpdateOnAddShard'. - // + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - fromjson("{$changeStream: {}}"), + kShowExpandedEventsSpec, fromjson("{$match: {operationType: 'insert'}}"), }; @@ -3513,12 +3521,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that multiple '$match' gets merged and promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatch) { - // - // Tests that multiple '$match' gets merged and promoted before the - // '$_internalUpdateOnAddShard'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$match: {operationType: 'insert'}}"), fromjson("{$match: {operationType: 'delete'}}")}; @@ -3535,15 +3545,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatch) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that multiple '$match' gets merged and promoted before the +// '$_internalChangeStreamCheckTopologyChange' when resume token is present. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) { - // - // Tests that multiple '$match' gets merged and promoted before the - // '$_internalUpdateOnAddShard' if resume token if present. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), BSON("$match" << BSON("operationType" << "insert")), BSON("$match" << BSON("operationType" @@ -3563,11 +3576,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that the single '$project' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleProject) { - // - // Tests that the single'$project' gets promoted before the '$_internalUpdateOnAddShard'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$project: {operationType: 1}}")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3583,11 +3599,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleProject) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that multiple '$project' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProject) { - // - // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$project: {operationType: 1}}"), fromjson("{$project: {fullDocument: 1}}")}; @@ -3605,15 +3624,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProject) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that multiple '$project' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange' if resume token is present. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) { - // - // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard' if - // resume token is present. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), BSON("$project" << BSON("operationType" << 1)), BSON("$project" << BSON("fullDocument" << 1))}; @@ -3632,15 +3654,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that a '$project' followed by a '$match' gets optimized and they get promoted before +// the '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) { - // - // Tests that a '$project' followed by a '$match' gets optimized and they get promoted before - // the '$_internalUpdateOnAddShard'. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), BSON("$project" << BSON("operationType" << 1)), BSON("$match" << BSON("operationType" << "insert"))}; @@ -3660,12 +3685,15 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that the single '$unset' gets promoted before the +// '$_internalChangeStreamCheckTopologyChange' as +// '$project'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleUnset) { - // - // Tests that the single'$unset' gets promoted before the '$_internalUpdateOnAddShard' as - // '$project'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$unset: 'operationType'}")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3681,12 +3709,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleUnset) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that multiple '$unset' gets promoted before the '$_internalChangeStreamCheckTopologyChange' +// as '$project'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleUnset) { - // - // Tests that multiple '$unset' gets promoted before the '$_internalUpdateOnAddShard' as - // '$project'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$unset: 'operationType'}"), fromjson("{$unset: 'fullDocument'}")}; @@ -3704,17 +3734,20 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleUnset) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that the '$unset' gets promoted before the '$_internalChangeStreamCheckTopologyChange' as +// '$project' even if resume token is present. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) { - // - // Tests that the '$unset' gets promoted before the '$_internalUpdateOnAddShard' as '$project' - // even if resume token is present. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - - const std::vector<BSONObj> rawPipeline = {BSON("$changeStream" - << BSON("resumeAfter" << resumeToken)), - BSON("$unset" - << "operationType")}; + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), + BSON("$unset" + << "operationType")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3730,11 +3763,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that the single'$addFields' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleAddFields) { - // - // Tests that the single'$addFields' gets promoted before the '$_internalUpdateOnAddShard'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$addFields: {stockPrice: 100}}")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3750,11 +3786,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleAddFields) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that multiple '$addFields' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleAddFields) { - // - // Tests that multiple '$addFields' gets promoted before the '$_internalUpdateOnAddShard'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$addFields: {stockPrice: 100}}"), fromjson("{$addFields: {quarter: 'Q1'}}")}; @@ -3772,15 +3811,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleAddFields) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that the '$addFields' gets promoted before the '$_internalChangeStreamCheckTopologyChange' +// if resume token is present. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) { - // - // Tests that the '$addFields' gets promoted before the '$_internalUpdateOnAddShard' if - // resume token is present. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), BSON("$addFields" << BSON("stockPrice" << 100))}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3797,11 +3839,14 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that the single '$set' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleSet) { - // - // Tests that the single'$set' gets promoted before the '$_internalUpdateOnAddShard'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$set: {stockPrice: 100}}")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3817,11 +3862,13 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleSet) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that multiple '$set' gets promoted before the '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleSet) { - // - // Tests that multiple '$set' gets promoted before the '$_internalUpdateOnAddShard'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$set: {stockPrice: 100}}"), fromjson("{$set: {quarter: 'Q1'}}")}; @@ -3839,15 +3886,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleSet) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that the '$set' gets promoted before the '$_internalChangeStreamCheckTopologyChange' if +// resume token is present. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) { - // - // Tests that the '$set' gets promoted before the '$_internalUpdateOnAddShard' if - // resume token is present. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), BSON("$set" << BSON("stockPrice" << 100))}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3864,12 +3914,15 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that the single '$replaceRoot' gets promoted before the +// '$_internalChangeStreamHandleTopologyChange'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceRoot) { - // - // Tests that the single'$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard'. - // + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - fromjson("{$changeStream: {}}"), fromjson("{$replaceRoot: {newRoot: '$fullDocument'}}")}; + kShowExpandedEventsSpec, fromjson("{$replaceRoot: {newRoot: '$fullDocument'}}")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3884,15 +3937,18 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceRoot) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that the '$replaceRoot' gets promoted before the +// '$_internalChangeStreamCheckTopologyChange' if resume token is present. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) { - // - // Tests that the '$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard' if - // resume token is present. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), BSON("$replaceRoot" << BSON("newRoot" << "$fullDocument"))}; @@ -3910,12 +3966,15 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that the single '$replaceWith' gets promoted before the +// '$_internalChangeStreamCheckTopologyChange' as +// '$replaceRoot'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceWith) { - // - // Tests that the single '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' as - // '$replaceRoot'. - // - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec, fromjson("{$replaceWith: '$fullDocument'}")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3931,17 +3990,20 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceWith) { "$_internalChangeStreamHandleTopologyChange"}); } +// +// Tests that the '$replaceWith' gets promoted before the +// '$_internalChangeStreamCheckTopologyChange' if resume token is present as '$replaceRoot'. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) { - // - // Tests that the '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' if - // resume token is present as '$replaceRoot'. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - - const std::vector<BSONObj> rawPipeline = {BSON("$changeStream" - << BSON("resumeAfter" << resumeToken)), - BSON("$replaceWith" - << "$fullDocument")}; + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), + BSON("$replaceWith" + << "$fullDocument")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -3957,27 +4019,118 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) { "$_internalChangeStreamEnsureResumeTokenPresent"}); } +// +// Tests that when 'showExpandedEvents' is true, we do not inject any additional stages. +// +TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsTrueDoesNotInjectMatchStage) { + const std::vector<BSONObj> rawPipeline = {kShowExpandedEventsSpec}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$_internalChangeStreamHandleTopologyChange"}); +} + +// +// Tests that when 'showExpandedEvents' is unset, we inject an additional $match stage and promote +// it before the '$_internalChangeStreamHandleTopologyChange'. +// +TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseInjectsMatchStage) { + const std::vector<BSONObj> rawPipeline = {kDefaultSpec}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$_internalChangeStreamHandleTopologyChange"}); +} + +// +// Tests that when 'showExpandedEvents' is false, the injected match stage gets merged with the user +// match stage and gets promoted before the '$_internalChangeStreamHandleTopologyChange'. +// +TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseAndUserMatch) { + const std::vector<BSONObj> rawPipeline = { + fromjson("{$changeStream: {showExpandedEvents: false}}"), + BSON("$match" << BSON("operationType" + << "insert"))}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$_internalChangeStreamHandleTopologyChange"}); +} + +// +// Tests that when 'showExpandedEvents' is false, the injected match stage can be merged with the +// user match stage and can be promoted before the user '$project' and +// '$_internalChangeStreamHandleTopologyChange'. +// +TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseAndUserProjectMatch) { + const std::vector<BSONObj> rawPipeline = { + fromjson("{$changeStream: {showExpandedEvents: false}}"), + BSON("$project" << BSON("operationType" << 1)), + BSON("$match" << BSON("operationType" + << "insert")), + }; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$project", + "$_internalChangeStreamHandleTopologyChange"}); +} + +// +// Tests that when all allowed stages are included along with the resume token, the final +// pipeline gets optimized. +// TEST_F(ChangeStreamStageTest, ChangeStreamWithAllStagesAndResumeToken) { - // - // Tests that when all allowed stages are included along with the resume token, the final - // pipeline gets optimized. - // - auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); - - const std::vector<BSONObj> rawPipeline = {BSON("$changeStream" - << BSON("resumeAfter" << resumeToken)), - BSON("$project" << BSON("operationType" << 1)), - BSON("$unset" - << "_id"), - BSON("$addFields" << BSON("stockPrice" << 100)), - BSON("$set" - << BSON("fullDocument.stockPrice" << 100)), - BSON("$match" << BSON("operationType" - << "insert")), - BSON("$replaceRoot" << BSON("newRoot" - << "$fullDocument")), - BSON("$replaceWith" - << "fullDocument.stockPrice")}; + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + const std::vector<BSONObj> + rawPipeline = {BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid()) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName + << true)), + BSON("$project" << BSON("operationType" << 1)), + BSON("$unset" + << "_id"), + BSON("$addFields" << BSON("stockPrice" << 100)), + BSON("$set" << BSON("fullDocument.stockPrice" << 100)), + BSON("$match" << BSON("operationType" + << "insert")), + BSON("$replaceRoot" << BSON("newRoot" + << "$fullDocument")), + BSON("$replaceWith" + << "fullDocument.stockPrice")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -4322,7 +4475,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeWithInvalidO ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}")); } -TEST_F(ChangeStreamRewriteTest, CannotRewriteEqPredicateOnOperationTypeWithUnknownOpType) { +TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeWithUnknownOpType) { auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("operationType" << "nonExisting"), getExpCtx()); @@ -4330,7 +4483,10 @@ TEST_F(ChangeStreamRewriteTest, CannotRewriteEqPredicateOnOperationTypeWithUnkno auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"}); - ASSERT_FALSE(rewrittenMatchExpression); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}")); } TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationType) { @@ -4469,7 +4625,18 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) { } TEST_F(ChangeStreamRewriteTest, CannotRewriteInPredicateWithRegexOnOperationType) { - auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("/^in*sert"))); + auto expr = + BSON("operationType" << BSON("$in" << BSON_ARRAY(BSONRegEx("^in*sert") << "update"))); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx()); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"}); + ASSERT_FALSE(rewrittenMatchExpression); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewriteRegexPredicateOnOperationType) { + auto expr = BSON("operationType" << BSONRegEx("^in*sert")); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx()); ASSERT_OK(statusWithMatchExpression.getStatus()); @@ -4506,7 +4673,7 @@ TEST_F(ChangeStreamRewriteTest, BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{op: {$eq: 'i'}}")))); } -TEST_F(ChangeStreamRewriteTest, CannotRewriteInPredicateOnOperationTypeWithUnknownOpType) { +TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationTypeWithUnknownOpType) { auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("unknown" << "insert"))); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx()); @@ -4514,7 +4681,11 @@ TEST_F(ChangeStreamRewriteTest, CannotRewriteInPredicateOnOperationTypeWithUnkno auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"}); - ASSERT_FALSE(rewrittenMatchExpression); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + BSON(OR(fromjson("{op: {$eq: 'i'}}"), fromjson("{$alwaysFalse: 1}")))); } TEST_F(ChangeStreamRewriteTest, CanRewriteEmptyInPredicateOnOperationType) { diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index c8325745bff..0af4b541df7 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -63,6 +63,7 @@ #include "mongo/db/query/query_test_service_context.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/dbtests/dbtests.h" +#include "mongo/idl/server_parameter_test_util.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/temp_dir.h" @@ -2647,8 +2648,13 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); - auto spec = BSON("$changeStream" << BSON("fullDocument" - << "updateLookup")); + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true); + auto spec = BSON("$changeStream" << BSON( + "fullDocument" + << "updateLookup" + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the change lookup is at the end. @@ -2674,8 +2680,13 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); - auto spec = BSON("$changeStream" << BSON("fullDocument" - << "updateLookup")); + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true); + auto spec = BSON("$changeStream" << BSON( + "fullDocument" + << "updateLookup" + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the change lookup is at the end. @@ -2699,8 +2710,13 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); - auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" - << "required")); + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true); + auto spec = BSON("$changeStream" << BSON( + "fullDocumentBeforeChange" + << "required" + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the pre-image lookup is at the end. @@ -2726,8 +2742,13 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre expCtx->uuid = UUID::gen(); setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); - auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" - << "required")); + // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which + // filters out newly added events. + RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true); + auto spec = BSON("$changeStream" << BSON( + "fullDocumentBeforeChange" + << "required" + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)); auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); ASSERT_EQ(stages.size(), getChangeStreamStageSize()); // Make sure the pre-image lookup is at the end. |