summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDrew Paroski <drew.paroski@mongodb.com>2021-09-17 20:39:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-02 18:27:23 +0000
commit9dd93503190332d7518634bb49bde9a8edb93244 (patch)
tree06d12d88e18da673ca85ea5e65532698e7385a9c
parentba5cf359d87e3fb3b32d24056e0da1b8452ac304 (diff)
downloadmongo-9dd93503190332d7518634bb49bde9a8edb93244.tar.gz
SERVER-55545 Add support for change stream $match pushdown for the "updateDescription" field
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js234
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp147
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp203
3 files changed, 584 insertions, 0 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
new file mode 100644
index 00000000000..13fbbad16b5
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
@@ -0,0 +1,234 @@
+// Test that a pipeline of the form [{$changeStream: {}}, {$match: <predicate>}] with a predicate
+// involving the 'updateDescription' field can push down the $match and rewrite the $match and make
+// it part of the oplog cursor's filter in order to filter out results as early as possible.
+// @tags: [
+// featureFlagChangeStreamsRewrite,
+// requires_fcv_51,
+// requires_pipeline_optimization,
+// requires_sharding,
+// uses_change_streams,
+// change_stream_does_not_expect_txns,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = "change_stream_match_pushdown_updateDescription_rewrite";
+const collNameBase = "change_stream_match_pushdown_updateDescription_rewrite";
+
+// Start a new 2-shard cluster. One shard will always write v1 update oplog entries, the other v2.
+const st = new ShardingTest({
+ shards: [
+ {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
+ {
+ nodes: 1,
+ setParameter: {
+ writePeriodicNoops: true,
+ periodicNoopIntervalSecs: 1,
+ internalQueryEnableLoggingV2OplogEntries: false
+ }
+ }
+ ]
+});
+
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+
+// To maximize coverage, we perform these tests twice and reverse the order of shards on which each
+// update is performed for each run. This ensures that all updates are tested for both v1 and v2
+// oplog formats.
+for (const reverseShards of [false, true]) {
+ const s0 = reverseShards ? 1 : 0;
+ const s1 = reverseShards ? 0 : 1;
+ const collName = collNameBase + s0 + s1;
+
+ // Returns a newly created sharded collection, where shard key is 'shard'.
+ const coll =
+ createShardedCollection(st, "shard" /* shardKey */, dbName, collName, 1 /* splitAt */);
+
+ // Open a change stream and store the resume token. This resume token will be used to replay the
+ // stream after this point.
+ const resumeAfterToken = coll.watch([]).getResumeToken();
+
+ // A helper that opens a change stream with the user supplied match expression 'userMatchExpr'
+ // and validates that: (1) for each shard, the events are seen in that order as specified in
+ // 'expectedOps'; and (2) each shard returns the expected number of events; and (3) the number
+ // of docs returned by the oplog cursor on each shard matches what we expect
+ // as specified in 'expectedOplogCursorReturnedDocs'.
+ const verifyOps = function(userMatchExpr, expectedOps, expectedOplogCursorReturnedDocs) {
+ const cursor =
+ coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]);
+
+ let expectedChangeStreamDocsReturned = [0, 0];
+ for (const [op, id, s] of expectedOps) {
+ const shardId = s == 0 ? s0 : s == 1 ? s1 : s;
+
+ assert.soon(() => cursor.hasNext());
+ const event = cursor.next();
+
+ assert.eq(event.operationType, op, event);
+ assert.eq(event.documentKey._id, id, event);
+ assert.eq(event.documentKey.shard, shardId, event);
+ if (shardId == 0 || shardId == 1) {
+ ++expectedChangeStreamDocsReturned[shardId];
+ }
+ }
+
+ assert(!cursor.hasNext());
+
+ // An 'executionStats' could only be captured for a non-invalidating stream.
+ const stats = coll.explain("executionStats").aggregate([
+ {$changeStream: {resumeAfter: resumeAfterToken}},
+ userMatchExpr
+ ]);
+
+ assertNumChangeStreamDocsReturnedFromShard(
+ stats, st.rs0.name, expectedChangeStreamDocsReturned[0]);
+ assertNumChangeStreamDocsReturnedFromShard(
+ stats, st.rs1.name, expectedChangeStreamDocsReturned[1]);
+
+ assertNumMatchingOplogEventsForShard(
+ stats, st.rs0.name, expectedOplogCursorReturnedDocs[s0]);
+ assertNumMatchingOplogEventsForShard(
+ stats, st.rs1.name, expectedOplogCursorReturnedDocs[s1]);
+ };
+
+ // These operations will create oplog events. The change stream will apply several filters
+ // on these series of events and ensure that the '$match' expressions are rewritten
+ // correctly.
+ assert.commandWorked(coll.insert({_id: 2, shard: s0}));
+ assert.commandWorked(coll.insert({_id: 2, shard: s1}));
+ assert.commandWorked(coll.insert({_id: 3, shard: s0}));
+ assert.commandWorked(coll.insert({_id: 3, shard: s1}));
+
+ assert.commandWorked(coll.replaceOne({_id: 2, shard: s0},
+ {_id: 2, shard: s0, z: 4, f: "a", w: {h: 5, k: 5, l: 5}}));
+ assert.commandWorked(coll.replaceOne({_id: 2, shard: s1},
+ {_id: 2, shard: s1, z: 4, f: "a", w: {h: 5, k: 5, l: 5}}));
+ assert.commandWorked(coll.replaceOne({_id: 3, shard: s0},
+ {_id: 3, shard: s0, 1: 4, f: "a", w: {h: 5, k: 5, l: 5}}));
+ assert.commandWorked(coll.replaceOne({_id: 3, shard: s1},
+ {_id: 3, shard: s1, y: 4, f: "a", w: {h: 5, k: 5, l: 5}}));
+
+ assert.commandWorked(reverseShards ? coll.update({_id: 2, shard: s0}, {$unset: {z: 0}})
+ : coll.update({_id: 2, shard: s0}, [{$unset: ["z"]}]));
+ assert.commandWorked(reverseShards
+ ? coll.update({_id: 2, shard: s1}, [{$set: {g: "c"}}, {$unset: "z"}])
+ : coll.update({_id: 2, shard: s1}, {$set: {g: "c"}, $unset: {z: 0}}));
+ assert.commandWorked(
+ coll.update({_id: 3, shard: s0}, {$set: {f: "b", x: {j: 7}}, $unset: {"w.h": 0, 1: 0}}));
+ assert.commandWorked(
+ coll.update({_id: 3, shard: s1}, {$set: {"0": "d", x: {j: 7}}, $unset: {y: 0, "w.h": 0}}));
+
+ assert.commandWorked(coll.deleteOne({_id: 2, shard: s0}));
+ assert.commandWorked(coll.deleteOne({_id: 2, shard: s1}));
+ assert.commandWorked(coll.deleteOne({_id: 3, shard: s0}));
+ assert.commandWorked(coll.deleteOne({_id: 3, shard: s1}));
+
+ // Ensure that the '$match' on the 'update' operation type with various predicates are rewritten
+ // correctly.
+ const op = "update";
+
+ const v1UpdateDesc = {updatedFields: {}, removedFields: ["z"]};
+ const v2UpdateDesc = {updatedFields: {}, removedFields: ["z"], truncatedArrays: []};
+ const updateDesc = reverseShards ? v1UpdateDesc : v2UpdateDesc;
+
+ // Test out a predicate on the full 'updateDescription' field.
+ verifyOps({$match: {operationType: op, updateDescription: updateDesc}},
+ [[op, 2, 0]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $eq predicate on 'updateDescription.updatedFields.f'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": "b"}},
+ [[op, 3, 0]],
+ [1, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $lte predicate on 'updateDescription.updatedFields.f'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": {$lte: "b"}}},
+ [[op, 3, 0]],
+ [1, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $eq predicate on 'updateDescription.updatedFields.g'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": "c"}},
+ [[op, 2, 1]],
+ [0, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $exists predicate on 'updateDescription.updatedFields.g'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": {$exists: true}}},
+ [[op, 2, 1]],
+ [0, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $eq predicate on 'updateDescription.updatedFields.x.j'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.x.j": 7}},
+ [[op, 3, 0], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $eq predicate on 'updateDescription.updatedFields.0'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.0": "d"}},
+ [[op, 3, 1]],
+ [0, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a non-dotted string $eq predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": "z"}},
+ [[op, 2, 0], [op, 2, 1]],
+ [1, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an array $eq predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": ["z"]}},
+ [[op, 2, 0], [op, 2, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a dotted string $eq predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": "w.h"}},
+ [[op, 3, 0], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a number-like string $eq predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": "1"}},
+ [[op, 3, 0]],
+ [1, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a non-dotted string $eq predicate on 'updateDescription.removedFields.0'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields.0": "z"}},
+ [[op, 2, 0], [op, 2, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $in predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$in: ["y", "z"]}}},
+ [[op, 2, 0], [op, 2, 1], [op, 3, 1]],
+ [1, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on the full 'updateDescription' field.
+ verifyOps({$match: {operationType: op, updateDescription: {$not: {$eq: updateDesc}}}},
+ [[op, 2, 1], [op, 3, 0], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $eq predicate on 'updateDescription.updatedFields.f'.
+ verifyOps(
+ {$match: {operationType: op, "updateDescription.updatedFields.f": {$not: {$eq: "b"}}}},
+ [[op, 2, 0], [op, 2, 1], [op, 3, 1]],
+ [1, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $exists predicate on 'updateDescription.updatedFields.g'.
+ verifyOps(
+ {$match: {operationType: op, "updateDescription.updatedFields.g": {$not: {$exists: true}}}},
+ [[op, 2, 0], [op, 3, 0], [op, 3, 1]],
+ [2, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $eq predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$not: {$eq: "z"}}}},
+ [[op, 3, 0], [op, 3, 1]],
+ [1, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $in predicate on 'updateDescription.removedFields'.
+ verifyOps(
+ {$match: {operationType: op, "updateDescription.removedFields": {$not: {$in: ["y", "z"]}}}},
+ [[op, 3, 0]],
+ [1, 0] /* expectedOplogCursorReturnedDocs */);
+}
+
+st.stop();
+})();
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index 3c1df34ba33..c7c0231e252 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -425,6 +425,152 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument(
return rewrittenPredicate;
}
+/**
+ * Rewrites filters on 'updateDescription' in a format that can be applied directly to the oplog.
+ * Returns nullptr if the predicate cannot be rewritten.
+ */
+std::unique_ptr<MatchExpression> matchRewriteUpdateDescription(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const PathMatchExpression* predicate,
+ bool allowInexact) {
+ tassert(5554500, "Unexpected empty predicate path", predicate->fieldRef()->numParts() > 0);
+ tassert(5554501,
+ str::stream() << "Unexpected predicate path: " << predicate->path(),
+ predicate->fieldRef()->getPart(0) ==
+ DocumentSourceChangeStream::kUpdateDescriptionField);
+
+ // Check that this is a non-replacement update, i.e. {op: "u", "o._id": {$exists: false}}.
+ auto rewrittenPredicate = std::make_unique<AndMatchExpression>();
+ rewrittenPredicate->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
+ rewrittenPredicate->add(
+ std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd)));
+
+ // For predicates on a non-dotted subfield of 'updateDescription.updatedFields' we can generate
+ // a rewritten predicate that matches exactly like so:
+ //
+ // {updateDescription.updatedFields.<fieldName>: <pred>}
+ // =>
+ // {$and: [
+ // {op: "u"},
+ // {"o._id": {$exists: false}},
+ // {$or: [
+ // {o.diff.i.<fieldName>: <pred>},
+ // {o.diff.u.<fieldName>: <pred>},
+ // {o.$set.<fieldName>: <pred>}
+ // ]}
+ // ]}
+ if (predicate->fieldRef()->numParts() == 3 &&
+ predicate->fieldRef()->getPart(1) == "updatedFields"_sd) {
+ // The oplog field corresponding to "updateDescription.updatedFields" can be in any one of
+ // three locations. Construct an $or filter to match against them all.
+ static const std::vector<std::string> oplogFields = {"o.diff.i", "o.diff.u", "o.$set"};
+ auto updatedFieldsOr = std::make_unique<OrMatchExpression>();
+ for (auto&& oplogField : oplogFields) {
+ auto updateRewrite = predicate->shallowClone();
+ static_cast<PathMatchExpression*>(updateRewrite.get())
+ ->applyRename({{"updateDescription.updatedFields", oplogField}});
+ updatedFieldsOr->add(std::move(updateRewrite));
+ }
+ // Add the $or into the final rewritten predicate and return.
+ rewrittenPredicate->add(std::move(updatedFieldsOr));
+ return rewrittenPredicate;
+ }
+
+ // For $eq predicates and $in predicates on 'updateDescription.removedFields' we can generate
+ // a rewritten predicate that matches exactly like so:
+ //
+ // {updateDescription.removedFields: {$eq: <fieldName>}}
+ // =>
+ // {$and: [
+ // {op: "u"},
+ // {"o._id": {$exists: false}},
+ // {$or: [
+ // {o.diff.d.<fieldName>: {$exists: true}},
+ // {o.$unset.<fieldName>: {$exists: true}}
+ // ]}
+ // ]}
+ //
+ // {updateDescription.removedFields: {$in: [<fieldName1>, <fieldName2>, ..]}}
+ // =>
+ // {$and: [
+ // {op: "u"},
+ // {"o._id": {$exists: false}},
+ // {$or: [
+ // {o.diff.d.<fieldName1>: {$exists: true}},
+ // {o.$unset.<fieldName1>: {$exists: true}},
+ // {o.diff.d.<fieldName2>: {$exists: true}},
+ // {o.$unset.<fieldName2>: {$exists: true}},
+ // ..
+ // ]}
+ // ]}
+ if (predicate->fieldRef()->numParts() == 2 &&
+ predicate->fieldRef()->getPart(1) == "removedFields"_sd) {
+ // Helper to rewrite an equality on "updateDescription.removedFields" into the oplog.
+ auto rewriteEqualityForOplog = [](auto& rhsElem) -> std::unique_ptr<MatchExpression> {
+ // We can only rewrite equality matches on strings.
+ if (rhsElem.type() != BSONType::String) {
+ return nullptr;
+ }
+ // We can only rewrite top-level fields, i.e. no dotted subpaths.
+ auto fieldName = rhsElem.str();
+ if (FieldRef(fieldName).numParts() > 1) {
+ return nullptr;
+ }
+ // The oplog field corresponding to "updateDescription.removedFields" can be in either
+ // of two locations. Construct an $or filter to match against them both.
+ static const std::vector<std::string> oplogFields = {"o.diff.d", "o.$unset"};
+ auto removedFieldsOr = std::make_unique<OrMatchExpression>();
+ for (auto&& oplogField : oplogFields) {
+ removedFieldsOr->add(
+ std::make_unique<ExistsMatchExpression>(oplogField + "." + fieldName));
+ }
+ return removedFieldsOr;
+ };
+
+ // We can only match against a limited number of predicates here, $eq and $in.
+ switch (predicate->matchType()) {
+ case MatchExpression::EQ: {
+ // Try to rewrite the predicate on "updateDescription.removedFields".
+ auto eqME = static_cast<const EqualityMatchExpression*>(predicate);
+ if (auto removedRewrite = rewriteEqualityForOplog(eqME->getData())) {
+ rewrittenPredicate->add(std::move(removedRewrite));
+ return rewrittenPredicate;
+ }
+ break;
+ }
+ case MatchExpression::MATCH_IN: {
+ // If this $in includes any regexes, we can't proceed with the rewrite.
+ auto inME = static_cast<const InMatchExpression*>(predicate);
+ if (!inME->getRegexes().empty()) {
+ return nullptr;
+ }
+ // An empty '$in' should never match anything.
+ if (inME->getEqualities().empty()) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+ // Try to rewrite the $in as an $or of equalities on the oplog. If any individual
+ // rewrite fails, we must abandon the entire rewrite.
+ auto inRemovedOr = std::make_unique<OrMatchExpression>();
+ for (const auto& rhsElem : inME->getEqualities()) {
+ if (auto removedRewrite = rewriteEqualityForOplog(rhsElem)) {
+ inRemovedOr->add(std::move(removedRewrite));
+ } else {
+ return nullptr;
+ }
+ }
+ // Add the rewritten $in to the final rewritten predicate and return.
+ rewrittenPredicate->add(std::move(inRemovedOr));
+ return rewrittenPredicate;
+ }
+ default:
+ break;
+ }
+ }
+
+ // If we reach here, we cannot perform a rewrite.
+ return nullptr;
+}
+
// Helper to rewrite predicates on any change stream namespace field of the form {db: "dbName",
// coll: "collName"} into the oplog.
@@ -938,6 +1084,7 @@ StringMap<MatchExpressionRewrite> matchRewriteRegistry = {
{"operationType", matchRewriteOperationType},
{"documentKey", matchRewriteDocumentKey},
{"fullDocument", matchRewriteFullDocument},
+ {"updateDescription", matchRewriteUpdateDescription},
{"ns", matchRewriteNs},
{"to", matchRewriteTo}};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 4b7a5c2de87..2251e15e28e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -6534,5 +6534,208 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteToWithExprOnInvalidCollSubFieldPath) {
fromjson("{$expr: {$eq: ['$$REMOVE', {$const: 'pipeline_test'}]}}"));
}
+TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescription) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson(
+ "{updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: []}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionUpdatedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields': {}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CanRewriteArbitraryPredicateOnFieldUpdateDescriptionUpdatedFieldsFoo) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields.foo': {$lt: 'b'}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$or: ["
+ " {'o.diff.i.foo': {$lt: 'b'}},"
+ " {'o.diff.u.foo': {$lt: 'b'}},"
+ " {'o.$set.foo': {$lt: 'b'}}"
+ " ]}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': 'b'}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': 'z'}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$or: ["
+ " {'o.diff.d.z': {$exists: true}},"
+ " {'o.$unset.z': {$exists: true}}"
+ " ]}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': 'u.v'}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': ['z']}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteStringInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$in: ['w', 'y']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$or: ["
+ " {$or: ["
+ " {'o.diff.d.w': {$exists: true}},"
+ " {'o.$unset.w': {$exists: true}}"
+ " ]},"
+ " {$or: ["
+ " {'o.diff.d.y': {$exists: true}},"
+ " {'o.$unset.y': {$exists: true}}"
+ " ]}"
+ " ]}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteEmptyInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$in: []}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$in: ['w', ['y']]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$in: [/ab*c/, /de*f/]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$lt: 'z'}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.truncatedArrays': []}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
} // namespace
} // namespace mongo