summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_stream_metadata_notifications.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/sharding/change_stream_metadata_notifications.js')
-rw-r--r--jstests/sharding/change_stream_metadata_notifications.js292
1 files changed, 146 insertions, 146 deletions
diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js
index f535012a7b2..48138d089ec 100644
--- a/jstests/sharding/change_stream_metadata_notifications.js
+++ b/jstests/sharding/change_stream_metadata_notifications.js
@@ -2,154 +2,154 @@
// Legacy getMore fails after dropping the database that the original cursor is on.
// @tags: [requires_find_command]
(function() {
- "use strict";
+"use strict";
- load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
- load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
- // For supportsMajorityReadConcern.
- load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+// For supportsMajorityReadConcern.
+load('jstests/multiVersion/libs/causal_consistency_helpers.js');
- if (!supportsMajorityReadConcern()) {
- jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
- return;
- }
+if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+}
- const st = new ShardingTest({
- shards: 2,
- rs: {
- nodes: 1,
- enableMajorityReadConcern: '',
- }
- });
-
- const mongosDB = st.s0.getDB(jsTestName());
- const mongosColl = mongosDB[jsTestName()];
-
- assert.commandWorked(mongosDB.dropDatabase());
-
- // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
- assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
- st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
-
- // Shard the test collection on a field called 'shardKey'.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
-
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
- assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}}));
-
- // Move the [0, MaxKey] chunk to st.shard1.shardName.
- assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()}));
-
- // Write a document to each chunk.
- assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}}));
-
- let changeStream = mongosColl.watch();
-
- // We awaited the replication of the first writes, so the change stream shouldn't return them.
- assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}}));
- assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}}));
- assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2}));
-
- // Drop the collection and test that we return a "drop" entry, followed by an "invalidate"
- // entry.
- mongosColl.drop();
-
- // Test that we see the two writes that happened before the collection drop.
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.eq(next.documentKey.shardKey, -1);
- const resumeTokenFromFirstUpdate = next._id;
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.eq(next.documentKey.shardKey, 1);
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey, {_id: 2});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "drop");
- assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- // With an explicit collation, test that we can resume from before the collection drop.
- changeStream = mongosColl.watch(
- [], {resumeAfter: resumeTokenFromFirstUpdate, collation: {locale: "simple"}});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.eq(next.documentKey, {shardKey: 1, _id: 1});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey, {shardKey: 2, _id: 2});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "drop");
- assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- // Test that we can resume the change stream without specifying an explicit collation.
- assert.commandWorked(mongosDB.runCommand({
- aggregate: mongosColl.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
- cursor: {}
- }));
-
- // Recreate and shard the collection.
- assert.commandWorked(mongosDB.createCollection(mongosColl.getName()));
-
- // Shard the test collection on shardKey.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
-
- // Test that resuming the change stream on the recreated collection succeeds, since we will not
- // attempt to inherit the collection's default collation and can therefore ignore the new UUID.
- assert.commandWorked(mongosDB.runCommand({
- aggregate: mongosColl.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
- cursor: {}
- }));
-
- // Recreate the collection as unsharded and open a change stream on it.
- assertDropAndRecreateCollection(mongosDB, mongosColl.getName());
-
- changeStream = mongosColl.watch();
-
- // Drop the database and verify that the stream returns a collection drop followed by an
- // invalidate.
- assert.commandWorked(mongosDB.dropDatabase());
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "drop");
- assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- st.stop();
+const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ }
+});
+
+const mongosDB = st.s0.getDB(jsTestName());
+const mongosColl = mongosDB[jsTestName()];
+
+assert.commandWorked(mongosDB.dropDatabase());
+
+// Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
+assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+// Shard the test collection on a field called 'shardKey'.
+assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
+
+// Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}}));
+
+// Move the [0, MaxKey] chunk to st.shard1.shardName.
+assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()}));
+
+// Write a document to each chunk.
+assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}}));
+assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}}));
+
+let changeStream = mongosColl.watch();
+
+// We awaited the replication of the first writes, so the change stream shouldn't return them.
+assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}}));
+assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}}));
+assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2}));
+
+// Drop the collection and test that we return a "drop" entry, followed by an "invalidate"
+// entry.
+mongosColl.drop();
+
+// Test that we see the two writes that happened before the collection drop.
+assert.soon(() => changeStream.hasNext());
+let next = changeStream.next();
+assert.eq(next.operationType, "update");
+assert.eq(next.documentKey.shardKey, -1);
+const resumeTokenFromFirstUpdate = next._id;
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "update");
+assert.eq(next.documentKey.shardKey, 1);
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "insert");
+assert.eq(next.documentKey, {_id: 2});
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "drop");
+assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "invalidate");
+assert(changeStream.isExhausted());
+
+// With an explicit collation, test that we can resume from before the collection drop.
+changeStream =
+ mongosColl.watch([], {resumeAfter: resumeTokenFromFirstUpdate, collation: {locale: "simple"}});
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "update");
+assert.eq(next.documentKey, {shardKey: 1, _id: 1});
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "insert");
+assert.eq(next.documentKey, {shardKey: 2, _id: 2});
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "drop");
+assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "invalidate");
+assert(changeStream.isExhausted());
+
+// Test that we can resume the change stream without specifying an explicit collation.
+assert.commandWorked(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+ cursor: {}
+}));
+
+// Recreate and shard the collection.
+assert.commandWorked(mongosDB.createCollection(mongosColl.getName()));
+
+// Shard the test collection on shardKey.
+assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
+
+// Test that resuming the change stream on the recreated collection succeeds, since we will not
+// attempt to inherit the collection's default collation and can therefore ignore the new UUID.
+assert.commandWorked(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+ cursor: {}
+}));
+
+// Recreate the collection as unsharded and open a change stream on it.
+assertDropAndRecreateCollection(mongosDB, mongosColl.getName());
+
+changeStream = mongosColl.watch();
+
+// Drop the database and verify that the stream returns a collection drop followed by an
+// invalidate.
+assert.commandWorked(mongosDB.dropDatabase());
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "drop");
+assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+assert.soon(() => changeStream.hasNext());
+next = changeStream.next();
+assert.eq(next.operationType, "invalidate");
+assert(changeStream.isExhausted());
+
+st.stop();
})();