summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2018-12-13 20:54:06 +0000
committerBernard Gorman <bernard.gorman@gmail.com>2019-02-03 15:06:46 +0000
commit065f3ef77de57609d92fce482c1e4e36b15cf29c (patch)
tree52a3735f31b7b5632e7e04d4ed8657f75e12c1f9 /jstests/change_streams
parentfbb9daeb791d16c49b861d82c097cf9bd9daf07e (diff)
downloadmongo-065f3ef77de57609d92fce482c1e4e36b15cf29c.tar.gz
SERVER-37786 Reject change stream pipelines which modify or project-out the resume token
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/collation.js9
-rw-r--r--jstests/change_streams/only_wake_getmore_for_relevant_changes.js2
-rw-r--r--jstests/change_streams/pipeline_cannot_modify_id_field.js143
-rw-r--r--jstests/change_streams/shell_helper.js28
4 files changed, 158 insertions, 24 deletions
diff --git a/jstests/change_streams/collation.js b/jstests/change_streams/collation.js
index 86c7563c339..77d345dc0b1 100644
--- a/jstests/change_streams/collation.js
+++ b/jstests/change_streams/collation.js
@@ -217,18 +217,15 @@
assertDropAndRecreateCollection(db, "change_stream_no_collation");
const cursor = noCollationCollection.watch(
- [
- {$match: {"fullDocument.text": "abc"}},
- {$project: {docId: "$documentKey._id", _id: 0}}
- ],
+ [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}],
{collation: caseInsensitive});
assert(!cursor.hasNext());
assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"}));
assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"}));
assert.soon(() => cursor.hasNext());
- assert.docEq(cursor.next(), {docId: 0});
+ assertChangeStreamEventEq(cursor.next(), {docId: 0});
assert.soon(() => cursor.hasNext());
- assert.docEq(cursor.next(), {docId: 1});
+ assertChangeStreamEventEq(cursor.next(), {docId: 1});
assert(!cursor.hasNext());
}());
diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
index bdd45a4d1f4..63be5783fa5 100644
--- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
@@ -116,7 +116,7 @@ eventFn();`,
let res = assert.commandWorked(db.runCommand({
aggregate: changesCollection.getName(),
// Project out the resume token, since that's subject to change unpredictably.
- pipeline: [{$changeStream: {}}, {$project: {"_id": 0}}],
+ pipeline: [{$changeStream: {}}],
cursor: {},
comment: wholeCollectionStreamComment
}));
diff --git a/jstests/change_streams/pipeline_cannot_modify_id_field.js b/jstests/change_streams/pipeline_cannot_modify_id_field.js
new file mode 100644
index 00000000000..49a0ce42d33
--- /dev/null
+++ b/jstests/change_streams/pipeline_cannot_modify_id_field.js
@@ -0,0 +1,143 @@
+/**
+ * Tests that stages which modify or remove the _id field are not permitted to run in a
+ * $changeStream pipeline.
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ const coll = assertDropAndRecreateCollection(db, jsTestName());
+
+ // Bare-bones $changeStream pipeline which will be augmented during tests.
+ const changeStream = [{$changeStream: {}}];
+
+ // Test-cases of transformations that modify or remove _id, and are thus disallowed.
+ const idModifyingTransformations = [
+ {$project: {_id: 0}},
+ {$project: {_id: "newValue"}},
+ {$project: {_id: "$otherField"}},
+ {$project: {_id: 0, otherField: 0}},
+ {$project: {_id: 0, otherField: 1}},
+ {$project: {"_id._data": 0}},
+ {$project: {"_id._data": 1}},
+ {$project: {"_id._data": "newValue"}},
+ {$project: {"_id._data": "$_id._data"}}, // Disallowed because it discards _typeBits.
+ {$project: {"_id._data": "$otherField"}},
+ {$project: {"_id.otherField": 1}},
+ {$project: {"_id._typeBits": 0}},
+ [
+ {$project: {otherField: "$_id"}},
+ {$project: {otherField: 0}},
+ {$project: {_id: "$otherField"}}
+ ],
+ {$project: {_id: {data: "$_id._data", typeBits: "$_id._typeBits"}}}, // Fields renamed.
+ {$project: {_id: {_typeBits: "$_id._typeBits", _data: "$_id._data"}}}, // Fields reordered.
+ {$project: {_id: {_data: "$_id._typeBits", _typeBits: "$_id._data"}}}, // Fields swapped.
+ {$addFields: {_id: "newValue"}},
+ {$addFields: {_id: "$otherField"}},
+ {$addFields: {"_id._data": "newValue"}},
+ {$addFields: {"_id._data": "$otherField"}},
+ {$addFields: {"_id.otherField": "newValue"}}, // New subfield added to _id.
+ [
+ {$addFields: {otherField: "$_id"}},
+ {$addFields: {otherField: "newValue"}},
+ {$addFields: {_id: "$otherField"}}
+ ],
+ [
+ // Fields renamed.
+ {$addFields: {newId: {data: "$_id._data", typeBits: "$_id._typeBits"}}},
+ {$addFields: {_id: "$newId"}}
+ ],
+ [
+ // Fields reordered.
+ {$addFields: {newId: {_typeBits: "$_id._typeBits", _data: "$_id._data"}}},
+ {$addFields: {_id: "$newId"}}
+ ],
+ [
+ // Fields swapped.
+ {$addFields: {newId: {_data: "$_id._typeBits", _typeBits: "$_id._data"}}},
+ {$addFields: {_id: "$newId"}}
+ ],
+ {$replaceRoot: {newRoot: {otherField: "$_id"}}},
+ {$redact: {$cond: {if: {$gt: ["$_id", {}]}, then: "$$DESCEND", else: "$$PRUNE"}}} // _id:0
+ ];
+
+ // Test-cases of projections which are allowed: explicit inclusion of _id, implicit inclusion of
+ // _id, renames which retain the full _id field, exclusion of unrelated fields, addition of and
+ // modifications to unrelated fields, sequential renames which ultimately preserve _id, etc.
+ const idPreservingTransformations = [
+ {$project: {_id: 1}},
+ {$project: {_id: 1, otherField: 0}},
+ {$project: {_id: 1, otherField: 1}},
+ {$project: {_id: "$_id", otherField: 1}},
+ {$project: {"_id.otherField": 0}},
+ {$project: {otherField: 1}},
+ {$project: {otherField: 0}},
+ {$project: {otherField: "$_id"}},
+ [
+ {$project: {otherField: "$_id"}},
+ {$project: {otherField: 1}},
+ {$project: {_id: "$otherField"}}
+ ],
+ {$project: {"_id._data": 1, "_id._typeBits": 1}},
+ {$project: {_id: {_data: "$_id._data", _typeBits: "$_id._typeBits"}}},
+ {$addFields: {_id: "$_id"}},
+ {$addFields: {otherField: "newValue"}},
+ {$addFields: {_id: {_data: "$_id._data", _typeBits: "$_id._typeBits"}}},
+ [{$addFields: {otherField: "$_id"}}, {$addFields: {_id: "$otherField"}}],
+ [
+ {$addFields: {newId: {_data: "$_id._data", _typeBits: "$_id._typeBits"}}},
+ {$addFields: {_id: "$newId"}}
+ ],
+ {$replaceRoot: {newRoot: {_id: "$_id"}}},
+ {
+ $redact: {
+ $cond: {
+ if: {
+ $or: [
+ // Keeps _id, descends into fullDocument.
+ {$not: {$isArray: "$tags"}},
+ {$gt: [{$size: {$setIntersection: ["$tags", ["USA"]]}}, 0]}
+ ]
+ },
+ then: "$$DESCEND",
+ else: "$$PRUNE"
+ }
+ }
+ },
+ {$redact: "$$DESCEND"}, // Descends through entire document, retaining all of it.
+ {$redact: "$$KEEP"} // Keeps entire document.
+ ];
+
+ let docId = 0;
+
+ // Verify that each of the whitelisted transformations above succeeds.
+ for (let transform of idPreservingTransformations) {
+ const cmdRes = assert.commandWorked(
+ db.runCommand(
+ {aggregate: coll.getName(), pipeline: changeStream.concat(transform), cursor: {}}),
+ transform);
+ assert.commandWorked(coll.insert({_id: docId++}));
+ assert.soon(() => {
+ const getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cmdRes.cursor.id, collection: coll.getName()}), transform);
+ return getMoreRes.cursor.nextBatch.length > 0;
+ }, transform);
+ }
+
+ // Verify that each of the blacklisted transformations above are rejected.
+ for (let transform of idModifyingTransformations) {
+ const cmdRes = assert.commandWorked(
+ db.runCommand(
+ {aggregate: coll.getName(), pipeline: changeStream.concat(transform), cursor: {}}),
+ transform);
+ assert.commandWorked(coll.insert({_id: docId++}));
+ assert.soon(() => {
+ const getMoreRes =
+ db.runCommand({getMore: cmdRes.cursor.id, collection: coll.getName()});
+ return !getMoreRes.ok &&
+ assert.commandFailedWithCode(getMoreRes, [51059, 51060], transform);
+ }, transform);
+ }
+}()); \ No newline at end of file
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index 2b7c485b1c7..f63eba06df7 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -13,13 +13,8 @@
function checkNextChange(cursor, expected) {
assert.soon(() => cursor.hasNext());
const nextObj = cursor.next();
- const originalObj = Object.assign({}, nextObj);
-
- delete nextObj._id;
- delete nextObj.clusterTime;
- assert.docEq(nextObj, expected);
-
- return originalObj;
+ assertChangeStreamEventEq(nextObj, expected);
+ return nextObj;
}
function testCommandIsCalled(testFunc, checkFunc) {
@@ -75,8 +70,7 @@
assert.docEq(change, expected);
jsTestLog("Testing watch() with pipeline");
- changeStreamCursor =
- coll.watch([{$project: {_id: 0, clusterTime: 1, docId: "$documentKey._id"}}]);
+ changeStreamCursor = coll.watch([{$project: {clusterTime: 1, docId: "$documentKey._id"}}]);
// Store the cluster time of the insert as the timestamp to start from.
const resumeTime =
@@ -94,21 +88,21 @@
jsTestLog("Testing watch() with pipeline and resumeAfter");
changeStreamCursor =
- coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
+ coll.watch([{$project: {docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
checkNextChange(changeStreamCursor, {docId: 1});
jsTestLog("Testing watch() with pipeline and startAfter");
changeStreamCursor =
- coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAfter: resumeToken});
+ coll.watch([{$project: {docId: "$documentKey._id"}}], {startAfter: resumeToken});
checkNextChange(changeStreamCursor, {docId: 1});
jsTestLog("Testing watch() with pipeline and startAtOperationTime");
- changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
- {startAtOperationTime: resumeTime});
+ changeStreamCursor =
+ coll.watch([{$project: {docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime});
checkNextChange(changeStreamCursor, {docId: 1});
jsTestLog("Testing watch() with updateLookup");
- changeStreamCursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"});
+ changeStreamCursor = coll.watch([], {fullDocument: "updateLookup"});
assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}}));
expected = {
@@ -131,9 +125,9 @@
}
// Only watch the "update" changes of the specific doc since the beginning.
- changeStreamCursor = coll.watch(
- [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}],
- {resumeAfter: resumeToken, batchSize: 2});
+ changeStreamCursor =
+ coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}],
+ {resumeAfter: resumeToken, batchSize: 2});
// Check the first batch.
assert.eq(changeStreamCursor.objsLeftInBatch(), 2);