summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_stream_metadata_notifications.js
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-13 17:48:19 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-06-07 16:34:49 -0400
commit620d5006e06453c9b7c54bdf6b2f4c191553bbe1 (patch)
tree1d2ee6b9d4fe0d06903c470d1693d46e3757beae /jstests/sharding/change_stream_metadata_notifications.js
parent424dc423aff51129ef64ff109c7288330570fa28 (diff)
downloadmongo-620d5006e06453c9b7c54bdf6b2f4c191553bbe1.tar.gz
SERVER-35028: Add change stream notifications for collection drop and rename
Diffstat (limited to 'jstests/sharding/change_stream_metadata_notifications.js')
-rw-r--r--jstests/sharding/change_stream_metadata_notifications.js158
1 files changed, 158 insertions, 0 deletions
diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js
new file mode 100644
index 00000000000..ec8865698d5
--- /dev/null
+++ b/jstests/sharding/change_stream_metadata_notifications.js
@@ -0,0 +1,158 @@
+// Tests metadata notifications of change streams on sharded collections.
+// Legacy getMore fails after dropping the database that the original cursor is on.
+// @tags: [requires_find_command]
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+ load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
+
+ // For supportsMajorityReadConcern.
+ load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on a field called 'shardKey'.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}}));
+
+ // Move the [0, MaxKey] chunk to st.shard1.shardName.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}}));
+
+ let changeStream = mongosColl.watch();
+
+ // We awaited the replication of the first writes, so the change stream shouldn't return them.
+ assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2}));
+
+ // Drop the collection and test that we return a "drop" entry, followed by an "invalidate"
+ // entry.
+ mongosColl.drop();
+
+ // Test that we see the two writes that happened before the collection drop.
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey.shardKey, -1);
+ const resumeTokenFromFirstUpdate = next._id;
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey.shardKey, 1);
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {_id: 2});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "drop");
+ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+ assert(changeStream.isExhausted());
+
+ // With an explicit collation, test that we can resume from before the collection drop.
+ changeStream =
+ mongosColl.watch([{$project: {_id: 0}}],
+ {resumeAfter: resumeTokenFromFirstUpdate, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey, {shardKey: 1, _id: 1});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {shardKey: 2, _id: 2});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "drop");
+ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+ assert(changeStream.isExhausted());
+
+ // Test that we cannot resume the change stream without specifying an explicit collation.
+ assert.commandFailedWithCode(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+ cursor: {}
+ }),
+ ErrorCodes.InvalidResumeToken);
+
+ // Recreate and shard the collection.
+ assert.commandWorked(mongosDB.createCollection(mongosColl.getName()));
+
+ // Shard the test collection on shardKey.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
+
+ // Test that resuming the change stream on the recreated collection fails since the UUID has
+ // changed.
+ assert.commandFailedWithCode(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+ cursor: {}
+ }),
+ ErrorCodes.InvalidResumeToken);
+
+ // Recreate the collection as unsharded and open a change stream on it.
+ assertDropAndRecreateCollection(mongosDB, mongosColl.getName());
+
+ changeStream = mongosColl.watch();
+
+ // Drop the database and verify that the stream returns a collection drop followed by an
+ // invalidate.
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "drop");
+ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+ assert(changeStream.isExhausted());
+
+ st.stop();
+})();