summaryrefslogtreecommitdiff
path: root/jstests/change_streams/metadata_notifications.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/metadata_notifications.js')
-rw-r--r--jstests/change_streams/metadata_notifications.js491
1 files changed, 245 insertions, 246 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index 4d1f29abf2a..8b3aae094fe 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -3,121 +3,198 @@
// invalidated by a database drop.
// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
- "use strict";
-
- load("jstests/libs/change_stream_util.js");
- load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/fixture_helpers.js"); // For isSharded.
-
- db = db.getSiblingDB(jsTestName());
- let cst = new ChangeStreamTest(db);
-
- db.getMongo().forceReadMode('commands');
-
- // Test that it is possible to open a new change stream cursor on a collection that does not
- // exist.
- const collName = "test";
- assertDropCollection(db, collName);
-
- // Asserts that resuming a change stream with 'spec' and an explicit simple collation returns
- // the results specified by 'expected'.
- function assertResumeExpected({coll, spec, expected}) {
- const cursor = cst.startWatchingChanges({
- collection: coll,
- pipeline: [{$changeStream: spec}],
- aggregateOptions: {collation: {locale: "simple"}}
- });
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
- }
-
- // Cursor creation succeeds, but there are no results. We do not expect to see a notification
- // for collection creation.
- let cursor = cst.startWatchingChanges(
- {collection: collName, pipeline: [{$changeStream: {}}, {$project: {operationType: 1}}]});
-
- // We explicitly test getMore, to ensure that the getMore command for a non-existent collection
- // does not return an error.
- let change = cst.getNextBatch(cursor);
- assert.neq(change.id, 0);
- assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));
-
- // Dropping the empty database should not generate any notification for the change stream, since
- // the collection does not exist yet.
- assert.commandWorked(db.dropDatabase());
- change = cst.getNextBatch(cursor);
- assert.neq(change.id, 0);
- assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));
-
- // After collection creation, we expect to see oplog entries for each subsequent operation.
- let coll = assertCreateCollection(db, collName);
- assert.writeOK(coll.insert({_id: 0}));
-
- // Determine the number of shards that the collection is distributed across.
- const numShards = FixtureHelpers.numberOfShardsForCollection(coll);
-
- change = cst.getOneChange(cursor);
- assert.eq(change.operationType, "insert", tojson(change));
-
- // Create oplog entries of type insert, update, delete, and drop.
- assert.writeOK(coll.insert({_id: 1}));
- assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
- assert.writeOK(coll.remove({_id: 1}));
- assertDropCollection(db, coll.getName());
-
- // We should get oplog entries of type insert, update, delete, drop, and invalidate. The cursor
- // should be closed.
- let expectedChanges = [
- {operationType: "insert"},
- {operationType: "update"},
- {operationType: "delete"},
- {operationType: "drop"},
- {operationType: "invalidate"},
- ];
- let changes = cst.assertNextChangesEqual(
- {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
- const resumeToken = changes[0]._id;
- const resumeTokenDrop = changes[3]._id;
- const resumeTokenInvalidate = changes[4]._id;
-
- // Verify we can startAfter the invalidate. We should see one drop event for every other shard
- // that the collection was present on, or nothing if the collection was not sharded. This test
- // exercises the bug described in SERVER-41196.
- const restartedStream = coll.watch([], {startAfter: resumeTokenInvalidate});
- for (let i = 0; i < numShards - 1; ++i) {
- assert.soon(() => restartedStream.hasNext());
- const nextEvent = restartedStream.next();
- assert.eq(nextEvent.operationType, "drop", () => tojson(nextEvent));
- }
- assert(!restartedStream.hasNext(), () => tojson(restartedStream.next()));
-
- // Verify that we can resume a stream after a collection drop without an explicit collation.
- assert.commandWorked(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {}
- }));
-
- // Recreate the collection.
- coll = assertCreateCollection(db, collName);
- assert.writeOK(coll.insert({_id: "after recreate"}));
-
- // Test resuming the change stream from the collection drop using 'resumeAfter'. If running in a
- // sharded passthrough suite, resuming from the drop will first return the drop from the other
- // shard before returning an invalidate.
- cursor = cst.startWatchingChanges({
+"use strict";
+
+load("jstests/libs/change_stream_util.js");
+load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For isSharded.
+
+db = db.getSiblingDB(jsTestName());
+let cst = new ChangeStreamTest(db);
+
+db.getMongo().forceReadMode('commands');
+
+// Test that it is possible to open a new change stream cursor on a collection that does not
+// exist.
+const collName = "test";
+assertDropCollection(db, collName);
+
+// Asserts that resuming a change stream with 'spec' and an explicit simple collation returns
+// the results specified by 'expected'.
+function assertResumeExpected({coll, spec, expected}) {
+ const cursor = cst.startWatchingChanges({
collection: coll,
- pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}],
- aggregateOptions: {collation: {locale: "simple"}, cursor: {batchSize: 0}}
- });
- cst.consumeDropUpTo({
- cursor: cursor,
- dropType: "drop",
- expectedNext: {operationType: "invalidate"},
- expectInvalidate: true
+ pipeline: [{$changeStream: spec}],
+ aggregateOptions: {collation: {locale: "simple"}}
});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+}
+
+// Cursor creation succeeds, but there are no results. We do not expect to see a notification
+// for collection creation.
+let cursor = cst.startWatchingChanges(
+ {collection: collName, pipeline: [{$changeStream: {}}, {$project: {operationType: 1}}]});
+
+// We explicitly test getMore, to ensure that the getMore command for a non-existent collection
+// does not return an error.
+let change = cst.getNextBatch(cursor);
+assert.neq(change.id, 0);
+assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));
+
+// Dropping the empty database should not generate any notification for the change stream, since
+// the collection does not exist yet.
+assert.commandWorked(db.dropDatabase());
+change = cst.getNextBatch(cursor);
+assert.neq(change.id, 0);
+assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));
+
+// After collection creation, we expect to see oplog entries for each subsequent operation.
+let coll = assertCreateCollection(db, collName);
+assert.writeOK(coll.insert({_id: 0}));
+
+// Determine the number of shards that the collection is distributed across.
+const numShards = FixtureHelpers.numberOfShardsForCollection(coll);
+
+change = cst.getOneChange(cursor);
+assert.eq(change.operationType, "insert", tojson(change));
+
+// Create oplog entries of type insert, update, delete, and drop.
+assert.writeOK(coll.insert({_id: 1}));
+assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+assert.writeOK(coll.remove({_id: 1}));
+assertDropCollection(db, coll.getName());
+
+// We should get oplog entries of type insert, update, delete, drop, and invalidate. The cursor
+// should be closed.
+let expectedChanges = [
+ {operationType: "insert"},
+ {operationType: "update"},
+ {operationType: "delete"},
+ {operationType: "drop"},
+ {operationType: "invalidate"},
+];
+let changes = cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
+const resumeToken = changes[0]._id;
+const resumeTokenDrop = changes[3]._id;
+const resumeTokenInvalidate = changes[4]._id;
+
+// Verify we can startAfter the invalidate. We should see one drop event for every other shard
+// that the collection was present on, or nothing if the collection was not sharded. This test
+// exercises the bug described in SERVER-41196.
+const restartedStream = coll.watch([], {startAfter: resumeTokenInvalidate});
+for (let i = 0; i < numShards - 1; ++i) {
+ assert.soon(() => restartedStream.hasNext());
+ const nextEvent = restartedStream.next();
+ assert.eq(nextEvent.operationType, "drop", () => tojson(nextEvent));
+}
+assert(!restartedStream.hasNext(), () => tojson(restartedStream.next()));
+
+// Verify that we can resume a stream after a collection drop without an explicit collation.
+assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+}));
+
+// Recreate the collection.
+coll = assertCreateCollection(db, collName);
+assert.writeOK(coll.insert({_id: "after recreate"}));
+
+// Test resuming the change stream from the collection drop using 'resumeAfter'. If running in a
+// sharded passthrough suite, resuming from the drop will first return the drop from the other
+// shard before returning an invalidate.
+cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}],
+ aggregateOptions: {collation: {locale: "simple"}, cursor: {batchSize: 0}}
+});
+cst.consumeDropUpTo({
+ cursor: cursor,
+ dropType: "drop",
+ expectedNext: {operationType: "invalidate"},
+ expectInvalidate: true
+});
+
+// Test resuming the change stream from the invalidate after the drop using 'resumeAfter'.
+assert.commandFailedWithCode(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
+ cursor: {},
+ collation: {locale: "simple"},
+}),
+ ErrorCodes.InvalidResumeToken);
+
+// Test resuming the change stream from the collection drop using 'startAfter'.
+assertResumeExpected({
+ coll: coll.getName(),
+ spec: {startAfter: resumeTokenDrop},
+ expected: [{operationType: "invalidate"}]
+});
+
+// Test resuming the change stream from the 'invalidate' notification using 'startAfter'.
+cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}],
+ aggregateOptions: {collation: {locale: "simple"}, cursor: {batchSize: 0}}
+});
+cst.consumeDropUpTo({
+ cursor: cursor,
+ dropType: "drop",
+ expectedNext: {
+ operationType: "insert",
+ ns: {db: db.getName(), coll: coll.getName()},
+ fullDocument: {_id: "after recreate"},
+ documentKey: {_id: "after recreate"}
+ },
+});
+
+// Test that renaming a collection being watched generates a "rename" entry followed by an
+// "invalidate". This is true if the change stream is on the source or target collection of the
+// rename. Sharded collections cannot be renamed.
+if (!FixtureHelpers.isSharded(coll)) {
+ cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
+ assertDropCollection(db, "renamed_coll");
+ assert.writeOK(coll.renameCollection("renamed_coll"));
+ expectedChanges = [
+ {
+ operationType: "rename",
+ ns: {db: db.getName(), coll: collName},
+ to: {db: db.getName(), coll: "renamed_coll"},
+ },
+ {operationType: "invalidate"}
+ ];
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
+
+ coll = db["renamed_coll"];
- // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'.
+ // Repeat the test, this time with a change stream open on the target.
+ cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
+ assert.writeOK(coll.renameCollection(collName));
+ expectedChanges = [
+ {
+ operationType: "rename",
+ ns: {db: db.getName(), coll: "renamed_coll"},
+ to: {db: db.getName(), coll: collName},
+ },
+ {operationType: "invalidate"}
+ ];
+ const changes = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
+ const resumeTokenRename = changes[0]._id;
+ const resumeTokenInvalidate = changes[1]._id;
+
+ coll = db[collName];
+ assert.writeOK(coll.insert({_id: "after rename"}));
+
+ // Test resuming the change stream from the collection rename using 'resumeAfter'.
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {resumeAfter: resumeTokenRename},
+ expected: [{operationType: "invalidate"}]
+ });
+ // Test resuming the change stream from the invalidate after the rename using 'resumeAfter'.
assert.commandFailedWithCode(db.runCommand({
aggregate: coll.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
@@ -126,154 +203,76 @@
}),
ErrorCodes.InvalidResumeToken);
- // Test resuming the change stream from the collection drop using 'startAfter'.
+ // Test resuming the change stream from the rename using 'startAfter'.
assertResumeExpected({
coll: coll.getName(),
- spec: {startAfter: resumeTokenDrop},
+ spec: {startAfter: resumeTokenRename},
expected: [{operationType: "invalidate"}]
});
- // Test resuming the change stream from the 'invalidate' notification using 'startAfter'.
- cursor = cst.startWatchingChanges({
- collection: coll,
- pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}],
- aggregateOptions: {collation: {locale: "simple"}, cursor: {batchSize: 0}}
- });
- cst.consumeDropUpTo({
- cursor: cursor,
- dropType: "drop",
- expectedNext: {
- operationType: "insert",
- ns: {db: db.getName(), coll: coll.getName()},
- fullDocument: {_id: "after recreate"},
- documentKey: {_id: "after recreate"}
- },
+ // Test resuming the change stream from the invalidate after the rename using 'startAfter'.
+ expectedChanges = [{
+ operationType: "insert",
+ ns: {db: db.getName(), coll: coll.getName()},
+ fullDocument: {_id: "after rename"},
+ documentKey: {_id: "after rename"}
+ }];
+ assertResumeExpected({
+ coll: coll.getName(),
+ spec: {startAfter: resumeTokenInvalidate},
+ expected: expectedChanges
});
- // Test that renaming a collection being watched generates a "rename" entry followed by an
- // "invalidate". This is true if the change stream is on the source or target collection of the
- // rename. Sharded collections cannot be renamed.
- if (!FixtureHelpers.isSharded(coll)) {
- cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
- assertDropCollection(db, "renamed_coll");
- assert.writeOK(coll.renameCollection("renamed_coll"));
- expectedChanges = [
- {
- operationType: "rename",
- ns: {db: db.getName(), coll: collName},
- to: {db: db.getName(), coll: "renamed_coll"},
- },
- {operationType: "invalidate"}
- ];
- cst.assertNextChangesEqual(
- {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
-
- coll = db["renamed_coll"];
-
- // Repeat the test, this time with a change stream open on the target.
- cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
- assert.writeOK(coll.renameCollection(collName));
- expectedChanges = [
- {
- operationType: "rename",
- ns: {db: db.getName(), coll: "renamed_coll"},
- to: {db: db.getName(), coll: collName},
- },
- {operationType: "invalidate"}
- ];
- const changes =
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
- const resumeTokenRename = changes[0]._id;
- const resumeTokenInvalidate = changes[1]._id;
-
- coll = db[collName];
- assert.writeOK(coll.insert({_id: "after rename"}));
-
- // Test resuming the change stream from the collection rename using 'resumeAfter'.
- assertResumeExpected({
- coll: coll.getName(),
- spec: {resumeAfter: resumeTokenRename},
- expected: [{operationType: "invalidate"}]
- });
- // Test resuming the change stream from the invalidate after the rename using 'resumeAfter'.
- assert.commandFailedWithCode(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
- cursor: {},
- collation: {locale: "simple"},
- }),
- ErrorCodes.InvalidResumeToken);
-
- // Test resuming the change stream from the rename using 'startAfter'.
- assertResumeExpected({
- coll: coll.getName(),
- spec: {startAfter: resumeTokenRename},
- expected: [{operationType: "invalidate"}]
- });
-
- // Test resuming the change stream from the invalidate after the rename using 'startAfter'.
- expectedChanges = [{
- operationType: "insert",
- ns: {db: db.getName(), coll: coll.getName()},
- fullDocument: {_id: "after rename"},
- documentKey: {_id: "after rename"}
- }];
- assertResumeExpected({
- coll: coll.getName(),
- spec: {startAfter: resumeTokenInvalidate},
- expected: expectedChanges
- });
-
- assertDropAndRecreateCollection(db, "renamed_coll");
- assert.writeOK(db.renamed_coll.insert({_id: 0}));
-
- // Repeat the test again, this time using the 'dropTarget' option with an existing target
- // collection.
- cursor =
- cst.startWatchingChanges({collection: "renamed_coll", pipeline: [{$changeStream: {}}]});
- assert.writeOK(coll.renameCollection("renamed_coll", true /* dropTarget */));
- expectedChanges = [
- {
- operationType: "rename",
- ns: {db: db.getName(), coll: collName},
- to: {db: db.getName(), coll: "renamed_coll"},
- },
- {operationType: "invalidate"}
- ];
- cst.assertNextChangesEqual(
- {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
-
- coll = db["renamed_coll"];
-
- // Test the behavior of a change stream watching the target collection of a $out aggregation
- // stage.
- cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
- coll.aggregate([{$out: collName}]);
- // Note that $out will first create a temp collection, and then rename the temp collection
- // to the target. Do not explicitly check the 'ns' field.
- const rename = cst.getOneChange(cursor);
- assert.eq(rename.operationType, "rename", tojson(rename));
- assert.eq(rename.to, {db: db.getName(), coll: collName}, tojson(rename));
- assert.eq(cst.getOneChange(cursor, true).operationType, "invalidate");
- }
-
- // Test that dropping a database will first drop all of it's collections, invalidating any
- // change streams on those collections.
- cursor = cst.startWatchingChanges({
- collection: coll.getName(),
- pipeline: [{$changeStream: {}}],
- });
- assert.commandWorked(db.dropDatabase());
+ assertDropAndRecreateCollection(db, "renamed_coll");
+ assert.writeOK(db.renamed_coll.insert({_id: 0}));
+ // Repeat the test again, this time using the 'dropTarget' option with an existing target
+ // collection.
+ cursor =
+ cst.startWatchingChanges({collection: "renamed_coll", pipeline: [{$changeStream: {}}]});
+ assert.writeOK(coll.renameCollection("renamed_coll", true /* dropTarget */));
expectedChanges = [
{
- operationType: "drop",
- ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "rename",
+ ns: {db: db.getName(), coll: collName},
+ to: {db: db.getName(), coll: "renamed_coll"},
},
{operationType: "invalidate"}
];
cst.assertNextChangesEqual(
{cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
- cst.cleanUp();
+ coll = db["renamed_coll"];
+
+ // Test the behavior of a change stream watching the target collection of a $out aggregation
+ // stage.
+ cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
+ coll.aggregate([{$out: collName}]);
+ // Note that $out will first create a temp collection, and then rename the temp collection
+ // to the target. Do not explicitly check the 'ns' field.
+ const rename = cst.getOneChange(cursor);
+ assert.eq(rename.operationType, "rename", tojson(rename));
+ assert.eq(rename.to, {db: db.getName(), coll: collName}, tojson(rename));
+ assert.eq(cst.getOneChange(cursor, true).operationType, "invalidate");
+}
+
+// Test that dropping a database will first drop all of it's collections, invalidating any
+// change streams on those collections.
+cursor = cst.startWatchingChanges({
+ collection: coll.getName(),
+ pipeline: [{$changeStream: {}}],
+});
+assert.commandWorked(db.dropDatabase());
+
+expectedChanges = [
+ {
+ operationType: "drop",
+ ns: {db: db.getName(), coll: coll.getName()},
+ },
+ {operationType: "invalidate"}
+];
+cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
+
+cst.cleanUp();
}());