summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2021-12-15 18:39:05 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-15 19:20:47 +0000
commit577a111d0f31418b60b0b3ee999ffb7657943ea5 (patch)
treeb946c7346f83ddfa5ee4c32d30589b316d73254c /src/mongo/db/pipeline
parentbdc7d583377ce7fe57d002e55b18b5ef338bae1d (diff)
downloadmongo-577a111d0f31418b60b0b3ee999ffb7657943ea5.tar.gz
SERVER-62003 Fix change stream rewrite for 'fullDocument' null-equality on 'delete' and non-CRUD oplog entries
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp32
2 files changed, 58 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index f48ba981d8c..3dee9c53520 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -400,28 +400,47 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument(
// {$or: [{op: 'i'}, {op: 'u'}]},
// {o: <predicate>}
// ]},
+ // // The following predicates are only present if the predicate matches a missing field
+ // {op: "d"},
+ // {$nor: [{op: 'i'}, {op: 'u'}, {op: 'd'}]}
// ]}
auto rewrittenPredicate = std::make_unique<OrMatchExpression>();
+ // Handle the case of non-replacement update entries. For the general case, we cannot apply the
+ // predicate and must return all such events.
auto updateCase = std::make_unique<AndMatchExpression>();
updateCase->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
updateCase->add(
std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd)));
rewrittenPredicate->add(std::move(updateCase));
+ // Handle the case of insert and replacement entries. We can always apply the predicate in these
+ // cases, because the full document is present in the oplog.
auto insertOrReplaceCase = std::make_unique<AndMatchExpression>();
- auto orExpr = std::make_unique<OrMatchExpression>();
- orExpr->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd)));
- orExpr->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
- insertOrReplaceCase->add(std::move(std::move(orExpr)));
+ auto insertOrReplaceOpFilter = std::make_unique<OrMatchExpression>();
+ insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd)));
+ insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
+ insertOrReplaceCase->add(std::move(insertOrReplaceOpFilter));
- auto renamedExpr = predicate->shallowClone();
- static_cast<PathMatchExpression*>(renamedExpr.get())->applyRename({{"fullDocument", "o"}});
- insertOrReplaceCase->add(std::move(std::move(renamedExpr)));
+ auto predForInsertOrReplace = predicate->shallowClone();
+ static_cast<PathMatchExpression*>(predForInsertOrReplace.get())
+ ->applyRename({{"fullDocument", "o"}});
+ insertOrReplaceCase->add(std::move(predForInsertOrReplace));
rewrittenPredicate->add(std::move(insertOrReplaceCase));
+ // Handle the case of delete and non-CRUD events. The 'fullDocument' field never exists for such
+ // events, so we evaluate the predicate against a non-existent field to see whether it matches.
+ if (predicate->matchesSingleElement({})) {
+ auto deleteCase = std::make_unique<EqualityMatchExpression>("op"_sd, Value("d"_sd));
+ rewrittenPredicate->add(std::move(deleteCase));
+
+ auto nonCRUDCase = MatchExpressionParser::parseAndNormalize(
+ fromjson("{$nor: [{op: 'i'}, {op: 'u'}, {op: 'd'}]}"), expCtx);
+ rewrittenPredicate->add(std::move(nonCRUDCase));
+ }
+
return rewrittenPredicate;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index f18b2622a5f..5dcb4550ba3 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -4747,6 +4747,38 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldFullDocumentF
"]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteNullComparisonPredicateOnFieldFullDocumentFoo) {
+ auto spec = fromjson("{'fullDocument.foo': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"fullDocument"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that the filter below includes a predicate on delete and non-CRUD events. These are only
+ // present when the user's predicate matches a non-existent field. This is because these change
+ // events never have a 'fullDocument' field, and so we either match all such events in the oplog
+ // or none of them, depending on how the predicate evaluates against a missing field.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ " ]},"
+ " {$and: ["
+ " {$or: ["
+ " {op: {$eq: 'i'}},"
+ " {op: {$eq: 'u'}}"
+ " ]},"
+ " {'o.foo': {$eq: null}}"
+ " ]},"
+ " {op: {$eq: 'd'}},"
+ " {$nor: [{op: {$eq: 'i'}}, {op: {$eq: 'u'}}, {op: {$eq: 'd'}}]}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldFullDocumentFoo) {
auto spec = fromjson("{'fullDocument.foo': {$not: {$eq: 'bar'}}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());