diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-13 17:48:19 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-07 16:34:49 -0400 |
commit | 620d5006e06453c9b7c54bdf6b2f4c191553bbe1 (patch) | |
tree | 1d2ee6b9d4fe0d06903c470d1693d46e3757beae /jstests/sharding/change_stream_metadata_notifications.js | |
parent | 424dc423aff51129ef64ff109c7288330570fa28 (diff) | |
download | mongo-620d5006e06453c9b7c54bdf6b2f4c191553bbe1.tar.gz |
SERVER-35028: Add change stream notifications for collection drop and rename
Diffstat (limited to 'jstests/sharding/change_stream_metadata_notifications.js')
-rw-r--r-- | jstests/sharding/change_stream_metadata_notifications.js | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js new file mode 100644 index 00000000000..ec8865698d5 --- /dev/null +++ b/jstests/sharding/change_stream_metadata_notifications.js @@ -0,0 +1,158 @@ +// Tests metadata notifications of change streams on sharded collections. +// Legacy getMore fails after dropping the database that the original cursor is on. +// @tags: [requires_find_command] +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. + load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. + + // For supportsMajorityReadConcern. + load('jstests/multiVersion/libs/causal_consistency_helpers.js'); + + if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + + const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + enableMajorityReadConcern: '', + } + }); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + + assert.commandWorked(mongosDB.dropDatabase()); + + // Enable sharding on the test DB and ensure its primary is st.shard0.shardName. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Shard the test collection on a field called 'shardKey'. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}})); + + // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey]. + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}})); + + // Move the [0, MaxKey] chunk to st.shard1.shardName. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()})); + + // Write a document to each chunk. + assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}})); + + let changeStream = mongosColl.watch(); + + // We awaited the replication of the first writes, so the change stream shouldn't return them. + assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2})); + + // Drop the collection and test that we return a "drop" entry, followed by an "invalidate" + // entry. + mongosColl.drop(); + + // Test that we see the two writes that happened before the collection drop. + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey.shardKey, -1); + const resumeTokenFromFirstUpdate = next._id; + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey.shardKey, 1); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey, {_id: 2}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "drop"); + assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "invalidate"); + assert(changeStream.isExhausted()); + + // With an explicit collation, test that we can resume from before the collection drop. + changeStream = + mongosColl.watch([{$project: {_id: 0}}], + {resumeAfter: resumeTokenFromFirstUpdate, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey, {shardKey: 1, _id: 1}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey, {shardKey: 2, _id: 2}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "drop"); + assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "invalidate"); + assert(changeStream.isExhausted()); + + // Test that we cannot resume the change stream without specifying an explicit collation. + assert.commandFailedWithCode(mongosDB.runCommand({ + aggregate: mongosColl.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}], + cursor: {} + }), + ErrorCodes.InvalidResumeToken); + + // Recreate and shard the collection. + assert.commandWorked(mongosDB.createCollection(mongosColl.getName())); + + // Shard the test collection on shardKey. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}})); + + // Test that resuming the change stream on the recreated collection fails since the UUID has + // changed. + assert.commandFailedWithCode(mongosDB.runCommand({ + aggregate: mongosColl.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}], + cursor: {} + }), + ErrorCodes.InvalidResumeToken); + + // Recreate the collection as unsharded and open a change stream on it. + assertDropAndRecreateCollection(mongosDB, mongosColl.getName()); + + changeStream = mongosColl.watch(); + + // Drop the database and verify that the stream returns a collection drop followed by an + // invalidate. + assert.commandWorked(mongosDB.dropDatabase()); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "drop"); + assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "invalidate"); + assert(changeStream.isExhausted()); + + st.stop(); +})(); |