summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2021-12-15 18:39:05 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-15 19:20:47 +0000
commit577a111d0f31418b60b0b3ee999ffb7657943ea5 (patch)
treeb946c7346f83ddfa5ee4c32d30589b316d73254c
parentbdc7d583377ce7fe57d002e55b18b5ef338bae1d (diff)
downloadmongo-577a111d0f31418b60b0b3ee999ffb7657943ea5.tar.gz
SERVER-62003 Fix change stream rewrite for 'fullDocument' null-equality on 'delete' and non-CRUD oplog entries
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js158
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp32
3 files changed, 201 insertions, 22 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js
index 35986a01d95..371ac35b4e4 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js
@@ -26,6 +26,7 @@ const st = new ShardingTest({
// Create a sharded collection where shard key is 'shard'.
const coll = createShardedCollection(st, "shard" /* shardKey */, dbName, collName, 1 /* splitAt */);
+const testDB = st.s.getDB(dbName);
// A helper that opens a change stream with the user supplied match expression 'userMatchExpr' and
// validates that:
@@ -38,8 +39,9 @@ function verifyOps(resumeAfterToken,
userMatchExpr,
expectedOps,
expectedChangeStreamDocsReturned,
- expectedOplogCursorReturnedDocs) {
- const cursor = coll.aggregate([
+ expectedOplogCursorReturnedDocs,
+ runOnWholeDB) {
+ const cursor = (runOnWholeDB ? testDB : coll).aggregate([
{$changeStream: {resumeAfter: resumeAfterToken, fullDocument: "updateLookup"}},
userMatchExpr
]);
@@ -59,10 +61,17 @@ function verifyOps(resumeAfterToken,
assert(!cursor.hasNext());
// An 'executionStats' could only be captured for a non-invalidating stream.
- const stats = coll.explain("executionStats").aggregate([
- {$changeStream: {resumeAfter: resumeAfterToken, fullDocument: "updateLookup"}},
- userMatchExpr
- ]);
+ const stats = assert.commandWorked(testDB.runCommand({
+ explain: {
+ aggregate: (runOnWholeDB ? 1 : coll.getName()),
+ pipeline: [
+ {$changeStream: {resumeAfter: resumeAfterToken, fullDocument: "updateLookup"}},
+ userMatchExpr
+ ],
+ cursor: {}
+ },
+ verbosity: "executionStats"
+ }));
assertNumChangeStreamDocsReturnedFromShard(
stats, st.rs0.name, expectedChangeStreamDocsReturned[0]);
@@ -106,20 +115,121 @@ const runDeleteOps = () => {
const runVerifyOpsTestcases = (op) => {
// 'delete' operations don't have a 'fullDocument' field, so we handle them as a special case.
if (op == "delete") {
- // Test out the '{$exists: true}' predicate on the full 'fullDocument' field.
+ // The 'delete' event never has a 'fullDocument' field, so we expect the same results
+ // whether we are filtering on the field itself or a subfield.
+ for (let fullDocumentPath of ["fullDocument", "fullDocument._id", "fullDocument.shard"]) {
+ jsTestLog("Testing path '" + fullDocumentPath + "' for 'delete' events");
+
+ // Test out the '{$exists: true}' predicate on the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$exists: true}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$exists: false}' predicate on the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$exists: false}}},
+ [[op], [op], [op], [op]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$eq: null}' predicate on the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$eq: null}}},
+ [[op], [op], [op], [op]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$ne: null}' predicate on the 'fullDocument' field. We cannot perform
+ // an exact rewrite of this negated predicate on 'fullDocument', so the oplog scan
+ // returns all 'delete' events and we subsequently filter them out in the pipeline.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$ne: null}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an inequality on null for the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$gt: null}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated inequality on null for the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$not: {$gt: null}}}},
+ [[op], [op], [op], [op]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // We expect the same results for $lte as we got for {$not: {$gt}}, although we can
+ // rewrite this predicate into the oplog.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$lte: null}}},
+ [[op], [op], [op], [op]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test that {$type: 'null'} on the 'fullDocument' field does not match.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$type: "null"}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test that negated {$type: 'null'} on the 'fullDocument' field matches.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$not: {$type: "null"}}}},
+ [[op], [op], [op], [op]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a non-null non-$exists predicate on the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$eq: 5}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated non-null non-$exists predicate on the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, [fullDocumentPath]: {$ne: 5}}},
+ [[op], [op], [op], [op]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+ }
+ return;
+ }
+
+ // Non-CRUD events don't have a 'fullDocument' field, so test 'drop' separately. We run these
+ // tests on the whole DB because otherwise the stream will be invalidated, and it's impossible
+ // to tell whether we will see one drop or two, or from which shard.
+ if (op == "drop") {
+ // Test that {$eq: null} on the 'fullDocument' field matches the 'drop' event.
verifyOps(resumeAfterToken,
- {$match: {operationType: op, fullDocument: {$exists: true}}},
- [],
- [0, 0] /* expectedChangeStreamDocsReturned */,
- [0, 0] /* expectedOplogCursorReturnedDocs */);
+ {$match: {operationType: op, fullDocument: {$eq: null}}},
+ [[op], [op]],
+ [1, 1] /* expectedChangeStreamDocsReturned */,
+ [1, 1] /* expectedOplogCursorReturnedDocs */,
+ true /* runOnWholeDB */);
- // Test out the '{$exists: false}' predicate on the full 'fullDocument' field.
+ // Test that {$exists: false} on the 'fullDocument' field matches the 'drop' event.
verifyOps(resumeAfterToken,
{$match: {operationType: op, fullDocument: {$exists: false}}},
- [[op], [op], [op], [op]],
- [2, 2] /* expectedChangeStreamDocsReturned */,
- [2, 2] /* expectedOplogCursorReturnedDocs */);
+ [[op], [op]],
+ [1, 1] /* expectedChangeStreamDocsReturned */,
+ [1, 1] /* expectedOplogCursorReturnedDocs */,
+ true /* runOnWholeDB */);
+ // Test that {$exists: true} on the 'fullDocument' field does not match the 'drop' event.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$exists: true}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [0, 0] /* expectedOplogCursorReturnedDocs */,
+ true /* runOnWholeDB */);
return;
}
@@ -191,6 +301,20 @@ const runVerifyOpsTestcases = (op) => {
[],
[0, 0] /* expectedChangeStreamDocsReturned */,
[2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$eq: null}' predicate on the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$eq: null}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ op != "update" ? [0, 0] : [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$ne: null}' predicate on the 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$ne: null}}},
+ [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
};
// Verify '$match's on the 'update' operation type with various predicates get rewritten correctly.
@@ -207,5 +331,9 @@ runVerifyOpsTestcases("insert");
runVerifyOpsTestcases("replace");
runVerifyOpsTestcases("delete");
+// Now drop the collection and verify that we see the 'drop' event with no 'fullDocument'.
+assert(coll.drop());
+runVerifyOpsTestcases("drop");
+
st.stop();
})();
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index f48ba981d8c..3dee9c53520 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -400,28 +400,47 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument(
// {$or: [{op: 'i'}, {op: 'u'}]},
// {o: <predicate>}
// ]},
+ // // The following predicates are only present if the predicate matches a missing field
+ // {op: "d"},
+ // {$nor: [{op: 'i'}, {op: 'u'}, {op: 'd'}]}
// ]}
auto rewrittenPredicate = std::make_unique<OrMatchExpression>();
+ // Handle the case of non-replacement update entries. For the general case, we cannot apply the
+ // predicate and must return all such events.
auto updateCase = std::make_unique<AndMatchExpression>();
updateCase->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
updateCase->add(
std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd)));
rewrittenPredicate->add(std::move(updateCase));
+ // Handle the case of insert and replacement entries. We can always apply the predicate in these
+ // cases, because the full document is present in the oplog.
auto insertOrReplaceCase = std::make_unique<AndMatchExpression>();
- auto orExpr = std::make_unique<OrMatchExpression>();
- orExpr->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd)));
- orExpr->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
- insertOrReplaceCase->add(std::move(std::move(orExpr)));
+ auto insertOrReplaceOpFilter = std::make_unique<OrMatchExpression>();
+ insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd)));
+ insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
+ insertOrReplaceCase->add(std::move(insertOrReplaceOpFilter));
- auto renamedExpr = predicate->shallowClone();
- static_cast<PathMatchExpression*>(renamedExpr.get())->applyRename({{"fullDocument", "o"}});
- insertOrReplaceCase->add(std::move(std::move(renamedExpr)));
+ auto predForInsertOrReplace = predicate->shallowClone();
+ static_cast<PathMatchExpression*>(predForInsertOrReplace.get())
+ ->applyRename({{"fullDocument", "o"}});
+ insertOrReplaceCase->add(std::move(predForInsertOrReplace));
rewrittenPredicate->add(std::move(insertOrReplaceCase));
+ // Handle the case of delete and non-CRUD events. The 'fullDocument' field never exists for such
+ // events, so we evaluate the predicate against a non-existent field to see whether it matches.
+ if (predicate->matchesSingleElement({})) {
+ auto deleteCase = std::make_unique<EqualityMatchExpression>("op"_sd, Value("d"_sd));
+ rewrittenPredicate->add(std::move(deleteCase));
+
+ auto nonCRUDCase = MatchExpressionParser::parseAndNormalize(
+ fromjson("{$nor: [{op: 'i'}, {op: 'u'}, {op: 'd'}]}"), expCtx);
+ rewrittenPredicate->add(std::move(nonCRUDCase));
+ }
+
return rewrittenPredicate;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index f18b2622a5f..5dcb4550ba3 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -4747,6 +4747,38 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldFullDocumentF
"]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteNullComparisonPredicateOnFieldFullDocumentFoo) {
+ auto spec = fromjson("{'fullDocument.foo': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"fullDocument"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that the filter below includes a predicate on delete and non-CRUD events. These are only
+ // present when the user's predicate matches a non-existent field. This is because these change
+ // events never have a 'fullDocument' field, and so we either match all such events in the oplog
+ // or none of them, depending on how the predicate evaluates against a missing field.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ " ]},"
+ " {$and: ["
+ " {$or: ["
+ " {op: {$eq: 'i'}},"
+ " {op: {$eq: 'u'}}"
+ " ]},"
+ " {'o.foo': {$eq: null}}"
+ " ]},"
+ " {op: {$eq: 'd'}},"
+ " {$nor: [{op: {$eq: 'i'}}, {op: {$eq: 'u'}}, {op: {$eq: 'd'}}]}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldFullDocumentFoo) {
auto spec = fromjson("{'fullDocument.foo': {$not: {$eq: 'bar'}}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());