summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-06-18 17:27:43 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-06-27 09:38:59 -0400
commitbbe67b5bdd596f4720b19f9f4c8c38cfadb9c1dd (patch)
tree594dba7d8898c73ab488137e20112cc99a2d5dda /jstests/change_streams
parent752069dbc79f22b6ae4691073d455d76c9bbf18d (diff)
downloadmongo-bbe67b5bdd596f4720b19f9f4c8c38cfadb9c1dd.tar.gz
SERVER-34789: Using resume token from 'invalidate' notification for 'resumeAfter' should error
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/metadata_notifications.js78
-rw-r--r--jstests/change_streams/whole_db_resumability.js43
2 files changed, 62 insertions, 59 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index f1d4373606a..be892c114da 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -70,7 +70,7 @@
{operationType: "drop"},
{operationType: "invalidate"},
];
- const changes = cst.assertNextChangesEqual(
+ let changes = cst.assertNextChangesEqual(
{cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
const resumeToken = changes[0]._id;
const resumeTokenDrop = changes[3]._id;
@@ -89,33 +89,36 @@
coll = assertCreateCollection(db, collName);
assert.writeOK(coll.insert({_id: "after recreate"}));
- // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to
- // be sure that it doesn't crash the server, but the ability to resume a change stream using
- // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature.
-
// Test resuming the change stream from the collection drop using 'resumeAfter'.
- expectedChanges = [{
- operationType: "insert",
- ns: {db: db.getName(), coll: coll.getName()},
- fullDocument: {_id: "after recreate"},
- documentKey: {_id: "after recreate"}
- }];
- assertResumeExpected(
- {coll: coll.getName(), spec: {resumeAfter: resumeTokenDrop}, expected: expectedChanges});
-
- // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'.
assertResumeExpected({
coll: coll.getName(),
- spec: {resumeAfter: resumeTokenInvalidate},
- expected: expectedChanges
+ spec: {resumeAfter: resumeTokenDrop},
+ expected: [{operationType: "invalidate"}]
});
+ // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'.
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
+ cursor: {},
+ collation: {locale: "simple"},
+ }),
+ ErrorCodes.InvalidResumeToken);
+
// Test resuming the change stream from the collection drop using 'startAfter'.
- assertResumeExpected(
- {coll: coll.getName(), spec: {startAfter: resumeTokenDrop}, expected: expectedChanges});
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {startAfter: resumeTokenDrop},
+ expected: [{operationType: "invalidate"}]
+ });
- // Test resuming the change stream from the 'invalidate' notification using 'startAfter'. This
- // is expected to behave identical to resuming from the drop.
+ // Test resuming the change stream from the 'invalidate' notification using 'startAfter'.
+ expectedChanges = [{
+ operationType: "insert",
+ ns: {db: db.getName(), coll: coll.getName()},
+ fullDocument: {_id: "after recreate"},
+ documentKey: {_id: "after recreate"}
+ }];
assertResumeExpected({
coll: coll.getName(),
spec: {startAfter: resumeTokenInvalidate},
@@ -161,36 +164,35 @@
coll = db[collName];
assert.writeOK(coll.insert({_id: "after rename"}));
- // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here
- // to be sure that it doesn't crash the server, but the ability to resume a change stream
- // after an invalidate using 'resumeAfter' is a bug, not a feature.
-
// Test resuming the change stream from the collection rename using 'resumeAfter'.
- expectedChanges = [{
- operationType: "insert",
- ns: {db: db.getName(), coll: coll.getName()},
- fullDocument: {_id: "after rename"},
- documentKey: {_id: "after rename"}
- }];
assertResumeExpected({
coll: coll.getName(),
spec: {resumeAfter: resumeTokenRename},
- expected: expectedChanges
+ expected: [{operationType: "invalidate"}]
});
// Test resuming the change stream from the invalidate after the rename using 'resumeAfter'.
- assertResumeExpected({
- coll: coll.getName(),
- spec: {resumeAfter: resumeTokenInvalidate},
- expected: expectedChanges
- });
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
+ cursor: {},
+ collation: {locale: "simple"},
+ }),
+ ErrorCodes.InvalidResumeToken);
// Test resuming the change stream from the rename using 'startAfter'.
assertResumeExpected({
coll: coll.getName(),
spec: {startAfter: resumeTokenRename},
- expected: expectedChanges
+ expected: [{operationType: "invalidate"}]
});
+
// Test resuming the change stream from the invalidate after the rename using 'startAfter'.
+ expectedChanges = [{
+ operationType: "insert",
+ ns: {db: db.getName(), coll: coll.getName()},
+ fullDocument: {_id: "after rename"},
+ documentKey: {_id: "after rename"}
+ }];
assertResumeExpected({
coll: coll.getName(),
spec: {startAfter: resumeTokenInvalidate},
diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js
index ba311400c87..c34d4067af2 100644
--- a/jstests/change_streams/whole_db_resumability.js
+++ b/jstests/change_streams/whole_db_resumability.js
@@ -131,6 +131,9 @@
];
const dropDbChanges = cst.assertNextChangesEqual(
{cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop});
+ const resumeTokenDrop = dropDbChanges[0]._id;
+ const resumeTokenDropDb = dropDbChanges[1]._id;
+ const resumeTokenInvalidate = dropDbChanges[2]._id;
// Resume from the first collection drop.
resumeCursor = cst.startWatchingChanges({
@@ -142,7 +145,7 @@
// Resume from the second collection drop using 'resumeAfter'.
resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: dropDbChanges[0]._id}}],
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}],
collection: 1,
});
cst.assertNextChangesEqual(
@@ -150,55 +153,53 @@
// Resume from the second collection drop using 'startAfter'.
resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {startAfter: dropDbChanges[0]._id}}],
+ pipeline: [{$changeStream: {startAfter: resumeTokenDrop}}],
collection: 1,
});
cst.assertNextChangesEqual(
{cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop.slice(1)});
// Recreate the test collection.
- coll = assertCreateCollection(testDB, coll.getName());
assert.writeOK(coll.insert({_id: "after recreate"}));
- let expectedInsert = {
+ let expectedInsert = [{
operationType: "insert",
- ns: {db: db.getName(), coll: coll.getName()},
+ ns: {db: testDB.getName(), coll: coll.getName()},
fullDocument: {_id: "after recreate"},
documentKey: {_id: "after recreate"}
- };
-
- // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to
- // be sure that it doesn't crash the server, but the ability to resume a change stream using
- // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature.
+ }];
// Test resuming from the 'dropDatabase' entry using 'resumeAfter'.
resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: dropDbChanges[1]._id}}],
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenDropDb}}],
collection: 1,
aggregateOptions: {cursor: {batchSize: 0}},
});
- cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
+ cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
// Test resuming from the 'invalidate' entry using 'resumeAfter'.
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
+ cursor: {},
+ collation: {locale: "simple"},
+ }),
+ ErrorCodes.InvalidResumeToken);
// Test resuming from the 'dropDatabase' entry using 'startAfter'.
resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {startAfter: dropDbChanges[1]._id}}],
+ pipeline: [{$changeStream: {startAfter: resumeTokenDropDb}}],
collection: 1,
aggregateOptions: {cursor: {batchSize: 0}},
});
- cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
+ cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
// Test resuming from the 'invalidate' entry using 'startAfter' and verifies it picks up the
// insert after recreating the db/collection.
resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}],
+ pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}],
collection: 1,
aggregateOptions: {cursor: {batchSize: 0}},
});