diff options
author | Drew Paroski <drew.paroski@mongodb.com> | 2021-09-17 20:39:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-02 18:27:23 +0000 |
commit | 9dd93503190332d7518634bb49bde9a8edb93244 (patch) | |
tree | 06d12d88e18da673ca85ea5e65532698e7385a9c | |
parent | ba5cf359d87e3fb3b32d24056e0da1b8452ac304 (diff) | |
download | mongo-9dd93503190332d7518634bb49bde9a8edb93244.tar.gz |
SERVER-55545 Add support for change stream $match pushdown for the "updateDescription" field
3 files changed, 584 insertions, 0 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js new file mode 100644 index 00000000000..13fbbad16b5 --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js @@ -0,0 +1,234 @@ +// Test that a pipeline of the form [{$changeStream: {}}, {$match: <predicate>}] with a predicate +// involving the 'updateDescription' field can push down the $match and rewrite the $match and make +// it part of the oplog cursor's filter in order to filter out results as early as possible. +// @tags: [ +// featureFlagChangeStreamsRewrite, +// requires_fcv_51, +// requires_pipeline_optimization, +// requires_sharding, +// uses_change_streams, +// change_stream_does_not_expect_txns, +// assumes_unsharded_collection, +// assumes_read_preference_unchanged +// ] +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = "change_stream_match_pushdown_updateDescription_rewrite"; +const collNameBase = "change_stream_match_pushdown_updateDescription_rewrite"; + +// Start a new 2-shard cluster. One shard will always write v1 update oplog entries, the other v2. +const st = new ShardingTest({ + shards: [ + {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}, + { + nodes: 1, + setParameter: { + writePeriodicNoops: true, + periodicNoopIntervalSecs: 1, + internalQueryEnableLoggingV2OplogEntries: false + } + } + ] +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); + +// To maximize coverage, we perform these tests twice and reverse the order of shards on which each +// update is performed for each run. This ensures that all updates are tested for both v1 and v2 +// oplog formats. +for (const reverseShards of [false, true]) { + const s0 = reverseShards ? 
1 : 0; + const s1 = reverseShards ? 0 : 1; + const collName = collNameBase + s0 + s1; + + // Returns a newly created sharded collection, where shard key is 'shard'. + const coll = + createShardedCollection(st, "shard" /* shardKey */, dbName, collName, 1 /* splitAt */); + + // Open a change stream and store the resume token. This resume token will be used to replay the + // stream after this point. + const resumeAfterToken = coll.watch([]).getResumeToken(); + + // A helper that opens a change stream with the user supplied match expression 'userMatchExpr' + // and validates that: (1) for each shard, the events are seen in the order specified in + // 'expectedOps'; and (2) each shard returns the expected number of events; and (3) the number + // of docs returned by the oplog cursor on each shard matches what we expect + // as specified in 'expectedOplogCursorReturnedDocs'. + const verifyOps = function(userMatchExpr, expectedOps, expectedOplogCursorReturnedDocs) { + const cursor = + coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]); + + let expectedChangeStreamDocsReturned = [0, 0]; + for (const [op, id, s] of expectedOps) { + const shardId = s == 0 ? s0 : s == 1 ? s1 : s; + + assert.soon(() => cursor.hasNext()); + const event = cursor.next(); + + assert.eq(event.operationType, op, event); + assert.eq(event.documentKey._id, id, event); + assert.eq(event.documentKey.shard, shardId, event); + if (shardId == 0 || shardId == 1) { + ++expectedChangeStreamDocsReturned[shardId]; + } + } + + assert(!cursor.hasNext()); + + // An 'executionStats' explain can only be captured for a non-invalidating stream. 
+ const stats = coll.explain("executionStats").aggregate([ + {$changeStream: {resumeAfter: resumeAfterToken}}, + userMatchExpr + ]); + + assertNumChangeStreamDocsReturnedFromShard( + stats, st.rs0.name, expectedChangeStreamDocsReturned[0]); + assertNumChangeStreamDocsReturnedFromShard( + stats, st.rs1.name, expectedChangeStreamDocsReturned[1]); + + assertNumMatchingOplogEventsForShard( + stats, st.rs0.name, expectedOplogCursorReturnedDocs[s0]); + assertNumMatchingOplogEventsForShard( + stats, st.rs1.name, expectedOplogCursorReturnedDocs[s1]); + }; + + // These operations will create oplog events. The change stream will apply several filters + // to this series of events and ensure that the '$match' expressions are rewritten + // correctly. + assert.commandWorked(coll.insert({_id: 2, shard: s0})); + assert.commandWorked(coll.insert({_id: 2, shard: s1})); + assert.commandWorked(coll.insert({_id: 3, shard: s0})); + assert.commandWorked(coll.insert({_id: 3, shard: s1})); + + assert.commandWorked(coll.replaceOne({_id: 2, shard: s0}, + {_id: 2, shard: s0, z: 4, f: "a", w: {h: 5, k: 5, l: 5}})); + assert.commandWorked(coll.replaceOne({_id: 2, shard: s1}, + {_id: 2, shard: s1, z: 4, f: "a", w: {h: 5, k: 5, l: 5}})); + assert.commandWorked(coll.replaceOne({_id: 3, shard: s0}, + {_id: 3, shard: s0, 1: 4, f: "a", w: {h: 5, k: 5, l: 5}})); + assert.commandWorked(coll.replaceOne({_id: 3, shard: s1}, + {_id: 3, shard: s1, y: 4, f: "a", w: {h: 5, k: 5, l: 5}})); + + assert.commandWorked(reverseShards ? coll.update({_id: 2, shard: s0}, {$unset: {z: 0}}) + : coll.update({_id: 2, shard: s0}, [{$unset: ["z"]}])); + assert.commandWorked(reverseShards + ? 
coll.update({_id: 2, shard: s1}, [{$set: {g: "c"}}, {$unset: "z"}]) + : coll.update({_id: 2, shard: s1}, {$set: {g: "c"}, $unset: {z: 0}})); + assert.commandWorked( + coll.update({_id: 3, shard: s0}, {$set: {f: "b", x: {j: 7}}, $unset: {"w.h": 0, 1: 0}})); + assert.commandWorked( + coll.update({_id: 3, shard: s1}, {$set: {"0": "d", x: {j: 7}}, $unset: {y: 0, "w.h": 0}})); + + assert.commandWorked(coll.deleteOne({_id: 2, shard: s0})); + assert.commandWorked(coll.deleteOne({_id: 2, shard: s1})); + assert.commandWorked(coll.deleteOne({_id: 3, shard: s0})); + assert.commandWorked(coll.deleteOne({_id: 3, shard: s1})); + + // Ensure that the '$match' on the 'update' operation type with various predicates are rewritten + // correctly. + const op = "update"; + + const v1UpdateDesc = {updatedFields: {}, removedFields: ["z"]}; + const v2UpdateDesc = {updatedFields: {}, removedFields: ["z"], truncatedArrays: []}; + const updateDesc = reverseShards ? v1UpdateDesc : v2UpdateDesc; + + // Test out a predicate on the full 'updateDescription' field. + verifyOps({$match: {operationType: op, updateDescription: updateDesc}}, + [[op, 2, 0]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out an $eq predicate on 'updateDescription.updatedFields.f'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": "b"}}, + [[op, 3, 0]], + [1, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out an $lte predicate on 'updateDescription.updatedFields.f'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": {$lte: "b"}}}, + [[op, 3, 0]], + [1, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out an $eq predicate on 'updateDescription.updatedFields.g'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": "c"}}, + [[op, 2, 1]], + [0, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out an $exists predicate on 'updateDescription.updatedFields.g'. 
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": {$exists: true}}}, + [[op, 2, 1]], + [0, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out an $eq predicate on 'updateDescription.updatedFields.x.j'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields.x.j": 7}}, + [[op, 3, 0], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out an $eq predicate on 'updateDescription.updatedFields.0'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields.0": "d"}}, + [[op, 3, 1]], + [0, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out a non-dotted string $eq predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": "z"}}, + [[op, 2, 0], [op, 2, 1]], + [1, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out an array $eq predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": ["z"]}}, + [[op, 2, 0], [op, 2, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a dotted string $eq predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": "w.h"}}, + [[op, 3, 0], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a number-like string $eq predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": "1"}}, + [[op, 3, 0]], + [1, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a non-dotted string $eq predicate on 'updateDescription.removedFields.0'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields.0": "z"}}, + [[op, 2, 0], [op, 2, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out an $in predicate on 'updateDescription.removedFields'. 
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$in: ["y", "z"]}}}, + [[op, 2, 0], [op, 2, 1], [op, 3, 1]], + [1, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on the full 'updateDescription' field. + verifyOps({$match: {operationType: op, updateDescription: {$not: {$eq: updateDesc}}}}, + [[op, 2, 1], [op, 3, 0], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $eq predicate on 'updateDescription.updatedFields.f'. + verifyOps( + {$match: {operationType: op, "updateDescription.updatedFields.f": {$not: {$eq: "b"}}}}, + [[op, 2, 0], [op, 2, 1], [op, 3, 1]], + [1, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $exists predicate on 'updateDescription.updatedFields.g'. + verifyOps( + {$match: {operationType: op, "updateDescription.updatedFields.g": {$not: {$exists: true}}}}, + [[op, 2, 0], [op, 3, 0], [op, 3, 1]], + [2, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $eq predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$not: {$eq: "z"}}}}, + [[op, 3, 0], [op, 3, 1]], + [1, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $in predicate on 'updateDescription.removedFields'. 
+ verifyOps( + {$match: {operationType: op, "updateDescription.removedFields": {$not: {$in: ["y", "z"]}}}}, + [[op, 3, 0]], + [1, 0] /* expectedOplogCursorReturnedDocs */); +} + +st.stop(); +})(); diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp index 3c1df34ba33..c7c0231e252 100644 --- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp @@ -425,6 +425,152 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument( return rewrittenPredicate; } +/** + * Rewrites filters on 'updateDescription' in a format that can be applied directly to the oplog. + * Returns nullptr if the predicate cannot be rewritten. + */ +std::unique_ptr<MatchExpression> matchRewriteUpdateDescription( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const PathMatchExpression* predicate, + bool allowInexact) { + tassert(5554500, "Unexpected empty predicate path", predicate->fieldRef()->numParts() > 0); + tassert(5554501, + str::stream() << "Unexpected predicate path: " << predicate->path(), + predicate->fieldRef()->getPart(0) == + DocumentSourceChangeStream::kUpdateDescriptionField); + + // Check that this is a non-replacement update, i.e. {op: "u", "o._id": {$exists: false}}. 
+ auto rewrittenPredicate = std::make_unique<AndMatchExpression>(); + rewrittenPredicate->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd))); + rewrittenPredicate->add( + std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd))); + + // For predicates on a non-dotted subfield of 'updateDescription.updatedFields' we can generate + // a rewritten predicate that matches exactly like so: + // + // {updateDescription.updatedFields.<fieldName>: <pred>} + // => + // {$and: [ + // {op: "u"}, + // {"o._id": {$exists: false}}, + // {$or: [ + // {o.diff.i.<fieldName>: <pred>}, + // {o.diff.u.<fieldName>: <pred>}, + // {o.$set.<fieldName>: <pred>} + // ]} + // ]} + if (predicate->fieldRef()->numParts() == 3 && + predicate->fieldRef()->getPart(1) == "updatedFields"_sd) { + // The oplog field corresponding to "updateDescription.updatedFields" can be in any one of + // three locations. Construct an $or filter to match against them all. + static const std::vector<std::string> oplogFields = {"o.diff.i", "o.diff.u", "o.$set"}; + auto updatedFieldsOr = std::make_unique<OrMatchExpression>(); + for (auto&& oplogField : oplogFields) { + auto updateRewrite = predicate->shallowClone(); + static_cast<PathMatchExpression*>(updateRewrite.get()) + ->applyRename({{"updateDescription.updatedFields", oplogField}}); + updatedFieldsOr->add(std::move(updateRewrite)); + } + // Add the $or into the final rewritten predicate and return. 
+ rewrittenPredicate->add(std::move(updatedFieldsOr)); + return rewrittenPredicate; + } + + // For $eq predicates and $in predicates on 'updateDescription.removedFields' we can generate + // a rewritten predicate that matches exactly like so: + // + // {updateDescription.removedFields: {$eq: <fieldName>}} + // => + // {$and: [ + // {op: "u"}, + // {"o._id": {$exists: false}}, + // {$or: [ + // {o.diff.d.<fieldName>: {$exists: true}}, + // {o.$unset.<fieldName>: {$exists: true}} + // ]} + // ]} + // + // {updateDescription.removedFields: {$in: [<fieldName1>, <fieldName2>, ..]}} + // => + // {$and: [ + // {op: "u"}, + // {"o._id": {$exists: false}}, + // {$or: [ + // {o.diff.d.<fieldName1>: {$exists: true}}, + // {o.$unset.<fieldName1>: {$exists: true}}, + // {o.diff.d.<fieldName2>: {$exists: true}}, + // {o.$unset.<fieldName2>: {$exists: true}}, + // .. + // ]} + // ]} + if (predicate->fieldRef()->numParts() == 2 && + predicate->fieldRef()->getPart(1) == "removedFields"_sd) { + // Helper to rewrite an equality on "updateDescription.removedFields" into the oplog. + auto rewriteEqualityForOplog = [](auto& rhsElem) -> std::unique_ptr<MatchExpression> { + // We can only rewrite equality matches on strings. + if (rhsElem.type() != BSONType::String) { + return nullptr; + } + // We can only rewrite top-level fields, i.e. no dotted subpaths. + auto fieldName = rhsElem.str(); + if (FieldRef(fieldName).numParts() > 1) { + return nullptr; + } + // The oplog field corresponding to "updateDescription.removedFields" can be in either + // of two locations. Construct an $or filter to match against them both. + static const std::vector<std::string> oplogFields = {"o.diff.d", "o.$unset"}; + auto removedFieldsOr = std::make_unique<OrMatchExpression>(); + for (auto&& oplogField : oplogFields) { + removedFieldsOr->add( + std::make_unique<ExistsMatchExpression>(oplogField + "." 
+ fieldName)); + } + return removedFieldsOr; + }; + + // We can only match against a limited number of predicates here, $eq and $in. + switch (predicate->matchType()) { + case MatchExpression::EQ: { + // Try to rewrite the predicate on "updateDescription.removedFields". + auto eqME = static_cast<const EqualityMatchExpression*>(predicate); + if (auto removedRewrite = rewriteEqualityForOplog(eqME->getData())) { + rewrittenPredicate->add(std::move(removedRewrite)); + return rewrittenPredicate; + } + break; + } + case MatchExpression::MATCH_IN: { + // If this $in includes any regexes, we can't proceed with the rewrite. + auto inME = static_cast<const InMatchExpression*>(predicate); + if (!inME->getRegexes().empty()) { + return nullptr; + } + // An empty '$in' should never match anything. + if (inME->getEqualities().empty()) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + // Try to rewrite the $in as an $or of equalities on the oplog. If any individual + // rewrite fails, we must abandon the entire rewrite. + auto inRemovedOr = std::make_unique<OrMatchExpression>(); + for (const auto& rhsElem : inME->getEqualities()) { + if (auto removedRewrite = rewriteEqualityForOplog(rhsElem)) { + inRemovedOr->add(std::move(removedRewrite)); + } else { + return nullptr; + } + } + // Add the rewritten $in to the final rewritten predicate and return. + rewrittenPredicate->add(std::move(inRemovedOr)); + return rewrittenPredicate; + } + default: + break; + } + } + + // If we reach here, we cannot perform a rewrite. + return nullptr; +} + // Helper to rewrite predicates on any change stream namespace field of the form {db: "dbName", // coll: "collName"} into the oplog. 
@@ -938,6 +1084,7 @@ StringMap<MatchExpressionRewrite> matchRewriteRegistry = { {"operationType", matchRewriteOperationType}, {"documentKey", matchRewriteDocumentKey}, {"fullDocument", matchRewriteFullDocument}, + {"updateDescription", matchRewriteUpdateDescription}, {"ns", matchRewriteNs}, {"to", matchRewriteTo}}; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 4b7a5c2de87..2251e15e28e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -6534,5 +6534,208 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteToWithExprOnInvalidCollSubFieldPath) { fromjson("{$expr: {$eq: ['$$REMOVE', {$const: 'pipeline_test'}]}}")); } +TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescription) { + auto expCtx = getExpCtx(); + auto expr = fromjson( + "{updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: []}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionUpdatedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields': {}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, + 
CanRewriteArbitraryPredicateOnFieldUpdateDescriptionUpdatedFieldsFoo) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields.foo': {$lt: 'b'}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$or: [" + " {'o.diff.i.foo': {$lt: 'b'}}," + " {'o.diff.u.foo': {$lt: 'b'}}," + " {'o.$set.foo': {$lt: 'b'}}" + " ]}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': 'b'}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': 'z'}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + 
ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$or: [" + " {'o.diff.d.z': {$exists: true}}," + " {'o.$unset.z': {$exists: true}}" + " ]}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': 'u.v'}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, + CannotRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': ['z']}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteStringInPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$in: ['w', 'y']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + 
fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$or: [" + " {$or: [" + " {'o.diff.d.w': {$exists: true}}," + " {'o.$unset.w': {$exists: true}}" + " ]}," + " {$or: [" + " {'o.diff.d.y': {$exists: true}}," + " {'o.$unset.y': {$exists: true}}" + " ]}" + " ]}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteEmptyInPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$in: []}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$in: ['w', ['y']]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, + CannotRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$in: [/ab*c/, /de*f/]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + 
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, + CannotRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$lt: 'z'}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.truncatedArrays': []}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + } // namespace } // namespace mongo |