summaryrefslogtreecommitdiff
path: root/jstests/change_streams/metadata_notifications.js
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-05-30 13:28:25 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-06-20 11:34:41 -0400
commit43f8fc8c45b854c192f39368f843352c479008f5 (patch)
treec7010e3111ddb8fcfc290d26d767fca7d7b95ec7 /jstests/change_streams/metadata_notifications.js
parentb4056d66c4a563ce0d3afda7720dde3c3cd01e05 (diff)
downloadmongo-43f8fc8c45b854c192f39368f843352c479008f5.tar.gz
SERVER-35030: Add 'startAfter' option to the $changeStream stage
Diffstat (limited to 'jstests/change_streams/metadata_notifications.js')
-rw-r--r--jstests/change_streams/metadata_notifications.js133
1 files changed, 87 insertions, 46 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index ef17b2b4249..f1d4373606a 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -19,6 +19,17 @@
const collName = "test";
assertDropCollection(db, collName);
+ // Asserts that resuming a change stream with 'spec' and an explicit simple collation returns
+ // the results specified by 'expected'.
+ function assertResumeExpected({coll, spec, expected}) {
+ const cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: spec}],
+ aggregateOptions: {collation: {locale: "simple"}}
+ });
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ }
+
// Cursor creation succeeds, but there are no results. We do not expect to see a notification
// for collection creation.
let cursor = cst.startWatchingChanges(
@@ -50,14 +61,10 @@
assert.writeOK(coll.remove({_id: 1}));
assertDropCollection(db, coll.getName());
- // Get a valid resume token that the next aggregate command can use.
- change = cst.getOneChange(cursor);
- assert.eq(change.operationType, "insert");
- const resumeToken = change._id;
-
- // We should get 4 oplog entries of type update, delete, drop, and invalidate. The cursor should
- // be closed.
+ // We should get oplog entries of type insert, update, delete, drop, and invalidate. The cursor
+ // should be closed.
let expectedChanges = [
+ {operationType: "insert"},
{operationType: "update"},
{operationType: "delete"},
{operationType: "drop"},
@@ -65,6 +72,9 @@
];
const changes = cst.assertNextChangesEqual(
{cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
+ const resumeToken = changes[0]._id;
+ const resumeTokenDrop = changes[3]._id;
+ const resumeTokenInvalidate = changes[4]._id;
// It should not be possible to resume a change stream after a collection drop without an
// explicit collation, even if the invalidate has not been received.
@@ -77,31 +87,39 @@
// Recreate the collection.
coll = assertCreateCollection(db, collName);
- assert.writeOK(coll.insert({_id: 0}));
+ assert.writeOK(coll.insert({_id: "after recreate"}));
// TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to
- // be sure that it doesn't crash the server, but the ability to resume a change stream after an
- // invalidate is a bug, not a feature.
-
- // Test resuming the change stream from the collection drop.
- assert.doesNotThrow(function() {
- const resumeTokenDrop = changes[2]._id;
- const resumeCursor =
- coll.watch([], {resumeAfter: resumeTokenDrop, collation: {locale: "simple"}});
- assert.soon(() => resumeCursor.hasNext());
- // Not checking the contents of the document returned, because we do not technically
- // support this behavior.
- resumeCursor.next();
+ // be sure that it doesn't crash the server, but the ability to resume a change stream using
+ // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature.
+
+ // Test resuming the change stream from the collection drop using 'resumeAfter'.
+ expectedChanges = [{
+ operationType: "insert",
+ ns: {db: db.getName(), coll: coll.getName()},
+ fullDocument: {_id: "after recreate"},
+ documentKey: {_id: "after recreate"}
+ }];
+ assertResumeExpected(
+ {coll: coll.getName(), spec: {resumeAfter: resumeTokenDrop}, expected: expectedChanges});
+
+ // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'.
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {resumeAfter: resumeTokenInvalidate},
+ expected: expectedChanges
});
- // Test resuming the change stream from the invalidate after the drop.
- assert.doesNotThrow(function() {
- const resumeTokenInvalidate = changes[3]._id;
- const resumeCursor =
- coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}});
- assert.soon(() => resumeCursor.hasNext());
- // Not checking the contents of the document returned, because we do not technically
- // support this behavior.
- resumeCursor.next();
+
+ // Test resuming the change stream from the collection drop using 'startAfter'.
+ assertResumeExpected(
+ {coll: coll.getName(), spec: {startAfter: resumeTokenDrop}, expected: expectedChanges});
+
+ // Test resuming the change stream from the 'invalidate' notification using 'startAfter'. This
+ // is expected to behave identical to resuming from the drop.
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {startAfter: resumeTokenInvalidate},
+ expected: expectedChanges
});
// Test that renaming a collection being watched generates a "rename" entry followed by an
@@ -111,7 +129,7 @@
cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
assertDropCollection(db, "renamed_coll");
assert.writeOK(coll.renameCollection("renamed_coll"));
- let expected = [
+ expectedChanges = [
{
operationType: "rename",
ns: {db: db.getName(), coll: collName},
@@ -120,14 +138,14 @@
{operationType: "invalidate"}
];
cst.assertNextChangesEqual(
- {cursor: cursor, expectedChanges: expected, expectInvalidate: true});
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
coll = db["renamed_coll"];
// Repeat the test, this time with a change stream open on the target.
cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
assert.writeOK(coll.renameCollection(collName));
- expected = [
+ expectedChanges = [
{
operationType: "rename",
ns: {db: db.getName(), coll: "renamed_coll"},
@@ -135,25 +153,48 @@
},
{operationType: "invalidate"}
];
- const changes = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ const changes =
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
+ const resumeTokenRename = changes[0]._id;
+ const resumeTokenInvalidate = changes[1]._id;
coll = db[collName];
+ assert.writeOK(coll.insert({_id: "after rename"}));
// TODO SERVER-34789: The code below should throw an error. We exercise this behavior here
// to be sure that it doesn't crash the server, but the ability to resume a change stream
- // after an invalidate is a bug, not a feature.
+ // after an invalidate using 'resumeAfter' is a bug, not a feature.
+
+ // Test resuming the change stream from the collection rename using 'resumeAfter'.
+ expectedChanges = [{
+ operationType: "insert",
+ ns: {db: db.getName(), coll: coll.getName()},
+ fullDocument: {_id: "after rename"},
+ documentKey: {_id: "after rename"}
+ }];
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {resumeAfter: resumeTokenRename},
+ expected: expectedChanges
+ });
+ // Test resuming the change stream from the invalidate after the rename using 'resumeAfter'.
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {resumeAfter: resumeTokenInvalidate},
+ expected: expectedChanges
+ });
- // Test resuming the change stream from the collection rename.
- assert.doesNotThrow(function() {
- const resumeTokenRename = changes[0]._id;
- const resumeCursor =
- coll.watch([], {resumeAfter: resumeTokenRename, collation: {locale: "simple"}});
+ // Test resuming the change stream from the rename using 'startAfter'.
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {startAfter: resumeTokenRename},
+ expected: expectedChanges
});
- // Test resuming the change stream from the invalidate after the drop.
- assert.doesNotThrow(function() {
- const resumeTokenInvalidate = changes[1]._id;
- const resumeCursor =
- coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}});
+ // Test resuming the change stream from the invalidate after the rename using 'startAfter'.
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {startAfter: resumeTokenInvalidate},
+ expected: expectedChanges
});
assertDropAndRecreateCollection(db, "renamed_coll");
@@ -164,7 +205,7 @@
cursor =
cst.startWatchingChanges({collection: "renamed_coll", pipeline: [{$changeStream: {}}]});
assert.writeOK(coll.renameCollection("renamed_coll", true /* dropTarget */));
- expected = [
+ expectedChanges = [
{
operationType: "rename",
ns: {db: db.getName(), coll: collName},
@@ -173,7 +214,7 @@
{operationType: "invalidate"}
];
cst.assertNextChangesEqual(
- {cursor: cursor, expectedChanges: expected, expectInvalidate: true});
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
coll = db["renamed_coll"];