diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-03 17:00:41 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-11 15:53:03 -0400 |
commit | fff261ac550155065fce4b7b1529061f18980599 (patch) | |
tree | 09ce022d7b8319f1af3c2db2354427ecfe1aa389 /jstests/sharding/change_streams_unsharded_becomes_sharded.js | |
parent | 0fa7bcb8bea5d4585fdbc1003b5116cd7bf28540 (diff) | |
download | mongo-fff261ac550155065fce4b7b1529061f18980599.tar.gz |
SERVER-29134: Support change streams on an entire database in a sharded cluster
Diffstat (limited to 'jstests/sharding/change_streams_unsharded_becomes_sharded.js')
-rw-r--r-- | jstests/sharding/change_streams_unsharded_becomes_sharded.js | 243 |
1 files changed, 156 insertions, 87 deletions
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js index bb6019a4651..ea5178601b1 100644 --- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js +++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js @@ -28,97 +28,166 @@ } }); - const mongosDB = st.s0.getDB(testName); + const mongosDB = st.s0.getDB("test"); const mongosColl = mongosDB[testName]; - mongosDB.createCollection(testName); - mongosColl.createIndex({x: 1}); - - st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); - - // Establish a change stream cursor on the unsharded collection. - let cst = new ChangeStreamTest(mongosDB); - let cursor = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: mongosColl}); - assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); - - // Verify that the cursor picks up documents inserted while the collection is unsharded. The - // 'documentKey' at this point is simply the _id field. - assert.writeOK(mongosColl.insert({_id: 0, x: 0})); - const[preShardCollectionChange] = cst.assertNextChangesEqual({ - cursor: cursor, - expectedChanges: [{ - documentKey: {_id: 0}, - fullDocument: {_id: 0, x: 0}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }] - }); + function testUnshardedBecomesSharded(collToWatch) { + mongosColl.drop(); + mongosDB.createCollection(testName); + mongosColl.createIndex({x: 1}); + + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Establish a change stream cursor on the unsharded collection. + const cst = new ChangeStreamTest(mongosDB); + + // Create a different collection in the same database, and verify that it doesn't affect the + // results of the change stream. + const mongosCollOther = mongosDB[testName + "other"]; + mongosCollOther.drop(); + mongosDB.createCollection(testName + "other"); + mongosCollOther.createIndex({y: 1}); + + let cursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}], + collection: collToWatch + }); + assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); + + // Verify that the cursor picks up documents inserted while the collection is unsharded. The + // 'documentKey' at this point is simply the _id field. + assert.writeOK(mongosColl.insert({_id: 0, x: 0})); + assert.writeOK(mongosCollOther.insert({_id: 0, y: 0})); + const[preShardCollectionChange] = cst.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: [{ + documentKey: {_id: 0}, + fullDocument: {_id: 0, x: 0}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }] + }); + + // Record the resume token for this change, before the collection is sharded. + const preShardCollectionResumeToken = preShardCollectionChange._id; + + // Shard the test collection with shard key {x: 1} and split into 2 chunks. + st.shardColl(mongosColl.getName(), {x: 1}, {x: 0}, false, mongosDB.getName()); + + // Shard the other collection with shard key {y: 1} and split into 2 chunks. + st.shardColl(mongosCollOther.getName(), {y: 1}, {y: 0}, false, mongosDB.getName()); + + // List the changes we expect to see for the next two operations on the sharded collection. + // Later, we will resume the stream using the token generated before the collection was + // sharded, and will need to confirm that we can still see these two changes. + const postShardCollectionChanges = [ + { + documentKey: {x: 1, _id: 1}, + fullDocument: {_id: 1, x: 1}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }, + { + documentKey: {x: -1, _id: -1}, + fullDocument: {_id: -1, x: -1}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + } + ]; + + // Verify that the cursor on the original shard is still valid and sees new inserted + // documents. The 'documentKey' field should now include the shard key, even before a + // 'kNewShardDetected' operation has been generated by the migration of a chunk to a new + // shard. + assert.writeOK(mongosColl.insert({_id: 1, x: 1})); + assert.writeOK(mongosCollOther.insert({_id: 1, y: 1})); + cst.assertNextChangesEqual( + {cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]}); + + // Move the [minKey, 0) chunk to shard1. + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: mongosColl.getFullName(), + find: {x: -1}, + to: st.rs1.getURL(), + _waitForDelete: true + })); + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: mongosCollOther.getFullName(), + find: {y: -1}, + to: st.rs1.getURL(), + _waitForDelete: true + })); + + // Make sure the change stream cursor sees a document inserted on the recipient shard. + assert.writeOK(mongosColl.insert({_id: -1, x: -1})); + assert.writeOK(mongosCollOther.insert({_id: -1, y: -1})); + cst.assertNextChangesEqual( + {cursor: cursor, expectedChanges: [postShardCollectionChanges[1]]}); + + // Confirm that we can resume the stream on the sharded collection using the token generated + // while the collection was unsharded, whose documentKey contains the _id field but not the + // shard key. + let resumedCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}], + collection: mongosColl + }); + + // Verify that we see both of the insertions which occurred after the collection was + // sharded. + cst.assertNextChangesEqual( + {cursor: resumedCursor, expectedChanges: postShardCollectionChanges}); + + // Test the behavior of a change stream when a sharded collection is dropped and recreated. + cursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}], + collection: collToWatch + }); + assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); + + // Insert a couple documents to shard1, creating a scenario where the getMore to shard0 will + // indicate that the change stream is invalidated yet shard1 will still have data to return. + assert.writeOK(mongosColl.insert({_id: -2, x: -2})); + assert.writeOK(mongosColl.insert({_id: -3, x: -3})); + + // Drop and recreate the collection. + mongosColl.drop(); + mongosDB.createCollection(mongosColl.getName()); + mongosColl.createIndex({z: 1}); + + // Shard the collection on a different shard key and ensure that each shard has a chunk. + st.shardColl(mongosColl.getName(), {z: 1}, {z: 0}, {z: -1}, mongosDB.getName()); + + assert.writeOK(mongosColl.insert({_id: -1, z: -1})); + assert.writeOK(mongosColl.insert({_id: 1, z: 1})); + + // Verify that the change stream picks up the inserts, however the shard key is missing + // since the collection has since been dropped and recreated. + cst.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: [ + { + documentKey: {_id: -2}, + fullDocument: {_id: -2, x: -2}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: -3}, + fullDocument: {_id: -3, x: -3}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + } + ] + }); + + cst.cleanUp(); + } - // Record the resume token for this change, before the collection is sharded. - const preShardCollectionResumeToken = preShardCollectionChange._id; - - // Enable sharding on the previously unsharded collection. - assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); - - // Shard the collection on x. - assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {x: 1}})); - - // Ensure that the primary shard has an up-to-date routing table. - assert.commandWorked(st.rs0.getPrimary().getDB("admin").runCommand( - {_flushRoutingTableCacheUpdates: mongosColl.getFullName()})); - - // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). - assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {x: 0}})); - - // List the changes we expect to see for the next two operations on the sharded collection. - // Later, we will resume the stream using the token generated before the collection was sharded, - // and will need to confirm that we can still see these two changes. - const postShardCollectionChanges = [ - { - documentKey: {x: 1, _id: 1}, - fullDocument: {_id: 1, x: 1}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }, - { - documentKey: {x: -1, _id: -1}, - fullDocument: {_id: -1, x: -1}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - } - ]; - - // Verify that the cursor on the original shard is still valid and sees new inserted documents. - // The 'documentKey' field should now include the shard key, even before a 'kNewShardDetected' - // operation has been generated by the migration of a chunk to a new shard. - assert.writeOK(mongosColl.insert({_id: 1, x: 1})); - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]}); - - // Move the [minKey, 0) chunk to shard1. - assert.commandWorked(mongosDB.adminCommand({ - moveChunk: mongosColl.getFullName(), - find: {x: -1}, - to: st.rs1.getURL(), - _waitForDelete: true - })); - - // Make sure the change stream cursor sees a document inserted on the recipient shard. - assert.writeOK(mongosColl.insert({_id: -1, x: -1})); - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[1]]}); - - // Confirm that we can resume the stream on the sharded collection using the token generated - // while the collection was unsharded, whose documentKey contains the _id field but not the - // shard key. - let resumedCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}], - collection: mongosColl - }); + // First test against a change stream on a single collection. + testUnshardedBecomesSharded(mongosColl.getName()); - // Verify that we see both of the insertions which occurred after the collection was sharded. - cst.assertNextChangesEqual( - {cursor: resumedCursor, expectedChanges: postShardCollectionChanges}); + // Test against a change stream on the entire database. + testUnshardedBecomesSharded(1); st.stop(); })(); |