diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2018-12-13 20:54:06 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2019-02-03 15:06:46 +0000 |
commit | 065f3ef77de57609d92fce482c1e4e36b15cf29c (patch) | |
tree | 52a3735f31b7b5632e7e04d4ed8657f75e12c1f9 /jstests/change_streams | |
parent | fbb9daeb791d16c49b861d82c097cf9bd9daf07e (diff) | |
download | mongo-065f3ef77de57609d92fce482c1e4e36b15cf29c.tar.gz |
SERVER-37786 Reject change stream pipelines which modify or project-out the resume token
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/collation.js | 9 | ||||
-rw-r--r-- | jstests/change_streams/only_wake_getmore_for_relevant_changes.js | 2 | ||||
-rw-r--r-- | jstests/change_streams/pipeline_cannot_modify_id_field.js | 143 | ||||
-rw-r--r-- | jstests/change_streams/shell_helper.js | 28 |
4 files changed, 158 insertions, 24 deletions
diff --git a/jstests/change_streams/collation.js b/jstests/change_streams/collation.js index 86c7563c339..77d345dc0b1 100644 --- a/jstests/change_streams/collation.js +++ b/jstests/change_streams/collation.js @@ -217,18 +217,15 @@ assertDropAndRecreateCollection(db, "change_stream_no_collation"); const cursor = noCollationCollection.watch( - [ - {$match: {"fullDocument.text": "abc"}}, - {$project: {docId: "$documentKey._id", _id: 0}} - ], + [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}], {collation: caseInsensitive}); assert(!cursor.hasNext()); assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"})); assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"})); assert.soon(() => cursor.hasNext()); - assert.docEq(cursor.next(), {docId: 0}); + assertChangeStreamEventEq(cursor.next(), {docId: 0}); assert.soon(() => cursor.hasNext()); - assert.docEq(cursor.next(), {docId: 1}); + assertChangeStreamEventEq(cursor.next(), {docId: 1}); assert(!cursor.hasNext()); }()); diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js index bdd45a4d1f4..63be5783fa5 100644 --- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js +++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js @@ -116,7 +116,7 @@ eventFn();`, let res = assert.commandWorked(db.runCommand({ aggregate: changesCollection.getName(), // Project out the resume token, since that's subject to change unpredictably. - pipeline: [{$changeStream: {}}, {$project: {"_id": 0}}], + pipeline: [{$changeStream: {}}], cursor: {}, comment: wholeCollectionStreamComment })); diff --git a/jstests/change_streams/pipeline_cannot_modify_id_field.js b/jstests/change_streams/pipeline_cannot_modify_id_field.js new file mode 100644 index 00000000000..49a0ce42d33 --- /dev/null +++ b/jstests/change_streams/pipeline_cannot_modify_id_field.js @@ -0,0 +1,143 @@ +/** + * Tests that stages which modify or remove the _id field are not permitted to run in a + * $changeStream pipeline. + */ +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + const coll = assertDropAndRecreateCollection(db, jsTestName()); + + // Bare-bones $changeStream pipeline which will be augmented during tests. + const changeStream = [{$changeStream: {}}]; + + // Test-cases of transformations that modify or remove _id, and are thus disallowed. + const idModifyingTransformations = [ + {$project: {_id: 0}}, + {$project: {_id: "newValue"}}, + {$project: {_id: "$otherField"}}, + {$project: {_id: 0, otherField: 0}}, + {$project: {_id: 0, otherField: 1}}, + {$project: {"_id._data": 0}}, + {$project: {"_id._data": 1}}, + {$project: {"_id._data": "newValue"}}, + {$project: {"_id._data": "$_id._data"}}, // Disallowed because it discards _typeBits. + {$project: {"_id._data": "$otherField"}}, + {$project: {"_id.otherField": 1}}, + {$project: {"_id._typeBits": 0}}, + [ + {$project: {otherField: "$_id"}}, + {$project: {otherField: 0}}, + {$project: {_id: "$otherField"}} + ], + {$project: {_id: {data: "$_id._data", typeBits: "$_id._typeBits"}}}, // Fields renamed. + {$project: {_id: {_typeBits: "$_id._typeBits", _data: "$_id._data"}}}, // Fields reordered. + {$project: {_id: {_data: "$_id._typeBits", _typeBits: "$_id._data"}}}, // Fields swapped. + {$addFields: {_id: "newValue"}}, + {$addFields: {_id: "$otherField"}}, + {$addFields: {"_id._data": "newValue"}}, + {$addFields: {"_id._data": "$otherField"}}, + {$addFields: {"_id.otherField": "newValue"}}, // New subfield added to _id. + [ + {$addFields: {otherField: "$_id"}}, + {$addFields: {otherField: "newValue"}}, + {$addFields: {_id: "$otherField"}} + ], + [ + // Fields renamed. + {$addFields: {newId: {data: "$_id._data", typeBits: "$_id._typeBits"}}}, + {$addFields: {_id: "$newId"}} + ], + [ + // Fields reordered. + {$addFields: {newId: {_typeBits: "$_id._typeBits", _data: "$_id._data"}}}, + {$addFields: {_id: "$newId"}} + ], + [ + // Fields swapped. + {$addFields: {newId: {_data: "$_id._typeBits", _typeBits: "$_id._data"}}}, + {$addFields: {_id: "$newId"}} + ], + {$replaceRoot: {newRoot: {otherField: "$_id"}}}, + {$redact: {$cond: {if: {$gt: ["$_id", {}]}, then: "$$DESCEND", else: "$$PRUNE"}}} // _id:0 + ]; + + // Test-cases of projections which are allowed: explicit inclusion of _id, implicit inclusion of + // _id, renames which retain the full _id field, exclusion of unrelated fields, addition of and + // modifications to unrelated fields, sequential renames which ultimately preserve _id, etc. + const idPreservingTransformations = [ + {$project: {_id: 1}}, + {$project: {_id: 1, otherField: 0}}, + {$project: {_id: 1, otherField: 1}}, + {$project: {_id: "$_id", otherField: 1}}, + {$project: {"_id.otherField": 0}}, + {$project: {otherField: 1}}, + {$project: {otherField: 0}}, + {$project: {otherField: "$_id"}}, + [ + {$project: {otherField: "$_id"}}, + {$project: {otherField: 1}}, + {$project: {_id: "$otherField"}} + ], + {$project: {"_id._data": 1, "_id._typeBits": 1}}, + {$project: {_id: {_data: "$_id._data", _typeBits: "$_id._typeBits"}}}, + {$addFields: {_id: "$_id"}}, + {$addFields: {otherField: "newValue"}}, + {$addFields: {_id: {_data: "$_id._data", _typeBits: "$_id._typeBits"}}}, + [{$addFields: {otherField: "$_id"}}, {$addFields: {_id: "$otherField"}}], + [ + {$addFields: {newId: {_data: "$_id._data", _typeBits: "$_id._typeBits"}}}, + {$addFields: {_id: "$newId"}} + ], + {$replaceRoot: {newRoot: {_id: "$_id"}}}, + { + $redact: { + $cond: { + if: { + $or: [ + // Keeps _id, descends into fullDocument. + {$not: {$isArray: "$tags"}}, + {$gt: [{$size: {$setIntersection: ["$tags", ["USA"]]}}, 0]} + ] + }, + then: "$$DESCEND", + else: "$$PRUNE" + } + } + }, + {$redact: "$$DESCEND"}, // Descends through entire document, retaining all of it. + {$redact: "$$KEEP"} // Keeps entire document. + ]; + + let docId = 0; + + // Verify that each of the whitelisted transformations above succeeds. + for (let transform of idPreservingTransformations) { + const cmdRes = assert.commandWorked( + db.runCommand( + {aggregate: coll.getName(), pipeline: changeStream.concat(transform), cursor: {}}), + transform); + assert.commandWorked(coll.insert({_id: docId++})); + assert.soon(() => { + const getMoreRes = assert.commandWorked( + db.runCommand({getMore: cmdRes.cursor.id, collection: coll.getName()}), transform); + return getMoreRes.cursor.nextBatch.length > 0; + }, transform); + } + + // Verify that each of the blacklisted transformations above are rejected. + for (let transform of idModifyingTransformations) { + const cmdRes = assert.commandWorked( + db.runCommand( + {aggregate: coll.getName(), pipeline: changeStream.concat(transform), cursor: {}}), + transform); + assert.commandWorked(coll.insert({_id: docId++})); + assert.soon(() => { + const getMoreRes = + db.runCommand({getMore: cmdRes.cursor.id, collection: coll.getName()}); + return !getMoreRes.ok && + assert.commandFailedWithCode(getMoreRes, [51059, 51060], transform); + }, transform); + } +}());
\ No newline at end of file diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index 2b7c485b1c7..f63eba06df7 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -13,13 +13,8 @@ function checkNextChange(cursor, expected) { assert.soon(() => cursor.hasNext()); const nextObj = cursor.next(); - const originalObj = Object.assign({}, nextObj); - - delete nextObj._id; - delete nextObj.clusterTime; - assert.docEq(nextObj, expected); - - return originalObj; + assertChangeStreamEventEq(nextObj, expected); + return nextObj; } function testCommandIsCalled(testFunc, checkFunc) { @@ -75,8 +70,7 @@ assert.docEq(change, expected); jsTestLog("Testing watch() with pipeline"); - changeStreamCursor = - coll.watch([{$project: {_id: 0, clusterTime: 1, docId: "$documentKey._id"}}]); + changeStreamCursor = coll.watch([{$project: {clusterTime: 1, docId: "$documentKey._id"}}]); // Store the cluster time of the insert as the timestamp to start from. const resumeTime = @@ -94,21 +88,21 @@ jsTestLog("Testing watch() with pipeline and resumeAfter"); changeStreamCursor = - coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); + coll.watch([{$project: {docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); checkNextChange(changeStreamCursor, {docId: 1}); jsTestLog("Testing watch() with pipeline and startAfter"); changeStreamCursor = - coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAfter: resumeToken}); + coll.watch([{$project: {docId: "$documentKey._id"}}], {startAfter: resumeToken}); checkNextChange(changeStreamCursor, {docId: 1}); jsTestLog("Testing watch() with pipeline and startAtOperationTime"); - changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], - {startAtOperationTime: resumeTime}); + changeStreamCursor = + coll.watch([{$project: {docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime}); checkNextChange(changeStreamCursor, {docId: 1}); jsTestLog("Testing watch() with updateLookup"); - changeStreamCursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"}); + changeStreamCursor = coll.watch([], {fullDocument: "updateLookup"}); assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}})); expected = { @@ -131,9 +125,9 @@ } // Only watch the "update" changes of the specific doc since the beginning. - changeStreamCursor = coll.watch( - [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}], - {resumeAfter: resumeToken, batchSize: 2}); + changeStreamCursor = + coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}], + {resumeAfter: resumeToken, batchSize: 2}); // Check the first batch. assert.eq(changeStreamCursor.objsLeftInBatch(), 2); |