summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_streams_unsharded_becomes_sharded.js
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-03 17:00:41 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-11 15:53:03 -0400
commitfff261ac550155065fce4b7b1529061f18980599 (patch)
tree09ce022d7b8319f1af3c2db2354427ecfe1aa389 /jstests/sharding/change_streams_unsharded_becomes_sharded.js
parent0fa7bcb8bea5d4585fdbc1003b5116cd7bf28540 (diff)
downloadmongo-fff261ac550155065fce4b7b1529061f18980599.tar.gz
SERVER-29134: Support change streams on an entire database in a sharded cluster
Diffstat (limited to 'jstests/sharding/change_streams_unsharded_becomes_sharded.js')
-rw-r--r--jstests/sharding/change_streams_unsharded_becomes_sharded.js243
1 files changed, 156 insertions, 87 deletions
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index bb6019a4651..ea5178601b1 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -28,97 +28,166 @@
}
});
- const mongosDB = st.s0.getDB(testName);
+ const mongosDB = st.s0.getDB("test");
const mongosColl = mongosDB[testName];
- mongosDB.createCollection(testName);
- mongosColl.createIndex({x: 1});
-
- st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
-
- // Establish a change stream cursor on the unsharded collection.
- let cst = new ChangeStreamTest(mongosDB);
- let cursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: mongosColl});
- assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
-
- // Verify that the cursor picks up documents inserted while the collection is unsharded. The
- // 'documentKey' at this point is simply the _id field.
- assert.writeOK(mongosColl.insert({_id: 0, x: 0}));
- const[preShardCollectionChange] = cst.assertNextChangesEqual({
- cursor: cursor,
- expectedChanges: [{
- documentKey: {_id: 0},
- fullDocument: {_id: 0, x: 0},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- }]
- });
+ function testUnshardedBecomesSharded(collToWatch) {
+ mongosColl.drop();
+ mongosDB.createCollection(testName);
+ mongosColl.createIndex({x: 1});
+
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Establish a change stream cursor on the unsharded collection.
+ const cst = new ChangeStreamTest(mongosDB);
+
+ // Create a different collection in the same database, and verify that it doesn't affect the
+ // results of the change stream.
+ const mongosCollOther = mongosDB[testName + "other"];
+ mongosCollOther.drop();
+ mongosDB.createCollection(testName + "other");
+ mongosCollOther.createIndex({y: 1});
+
+ let cursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}],
+ collection: collToWatch
+ });
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
+ // Verify that the cursor picks up documents inserted while the collection is unsharded. The
+ // 'documentKey' at this point is simply the _id field.
+ assert.writeOK(mongosColl.insert({_id: 0, x: 0}));
+ assert.writeOK(mongosCollOther.insert({_id: 0, y: 0}));
+ const[preShardCollectionChange] = cst.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: [{
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, x: 0},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ }]
+ });
+
+ // Record the resume token for this change, before the collection is sharded.
+ const preShardCollectionResumeToken = preShardCollectionChange._id;
+
+ // Shard the test collection with shard key {x: 1} and split into 2 chunks.
+ st.shardColl(mongosColl.getName(), {x: 1}, {x: 0}, false, mongosDB.getName());
+
+ // Shard the other collection with shard key {y: 1} and split into 2 chunks.
+ st.shardColl(mongosCollOther.getName(), {y: 1}, {y: 0}, false, mongosDB.getName());
+
+ // List the changes we expect to see for the next two operations on the sharded collection.
+ // Later, we will resume the stream using the token generated before the collection was
+ // sharded, and will need to confirm that we can still see these two changes.
+ const postShardCollectionChanges = [
+ {
+ documentKey: {x: 1, _id: 1},
+ fullDocument: {_id: 1, x: 1},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {x: -1, _id: -1},
+ fullDocument: {_id: -1, x: -1},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ }
+ ];
+
+ // Verify that the cursor on the original shard is still valid and sees new inserted
+ // documents. The 'documentKey' field should now include the shard key, even before a
+ // 'kNewShardDetected' operation has been generated by the migration of a chunk to a new
+ // shard.
+ assert.writeOK(mongosColl.insert({_id: 1, x: 1}));
+ assert.writeOK(mongosCollOther.insert({_id: 1, y: 1}));
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]});
+
+ // Move the [minKey, 0) chunk to shard1.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {x: -1},
+ to: st.rs1.getURL(),
+ _waitForDelete: true
+ }));
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosCollOther.getFullName(),
+ find: {y: -1},
+ to: st.rs1.getURL(),
+ _waitForDelete: true
+ }));
+
+ // Make sure the change stream cursor sees a document inserted on the recipient shard.
+ assert.writeOK(mongosColl.insert({_id: -1, x: -1}));
+ assert.writeOK(mongosCollOther.insert({_id: -1, y: -1}));
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: [postShardCollectionChanges[1]]});
+
+ // Confirm that we can resume the stream on the sharded collection using the token generated
+ // while the collection was unsharded, whose documentKey contains the _id field but not the
+ // shard key.
+ let resumedCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}],
+ collection: mongosColl
+ });
+
+ // Verify that we see both of the insertions which occurred after the collection was
+ // sharded.
+ cst.assertNextChangesEqual(
+ {cursor: resumedCursor, expectedChanges: postShardCollectionChanges});
+
+ // Test the behavior of a change stream when a sharded collection is dropped and recreated.
+ cursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}],
+ collection: collToWatch
+ });
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
+ // Insert a couple documents to shard1, creating a scenario where the getMore to shard0 will
+ // indicate that the change stream is invalidated yet shard1 will still have data to return.
+ assert.writeOK(mongosColl.insert({_id: -2, x: -2}));
+ assert.writeOK(mongosColl.insert({_id: -3, x: -3}));
+
+ // Drop and recreate the collection.
+ mongosColl.drop();
+ mongosDB.createCollection(mongosColl.getName());
+ mongosColl.createIndex({z: 1});
+
+ // Shard the collection on a different shard key and ensure that each shard has a chunk.
+ st.shardColl(mongosColl.getName(), {z: 1}, {z: 0}, {z: -1}, mongosDB.getName());
+
+ assert.writeOK(mongosColl.insert({_id: -1, z: -1}));
+ assert.writeOK(mongosColl.insert({_id: 1, z: 1}));
+
+ // Verify that the change stream picks up the inserts, however the shard key is missing
+ // since the collection has since been dropped and recreated.
+ cst.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: [
+ {
+ documentKey: {_id: -2},
+ fullDocument: {_id: -2, x: -2},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: -3},
+ fullDocument: {_id: -3, x: -3},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ }
+ ]
+ });
+
+ cst.cleanUp();
+ }
- // Record the resume token for this change, before the collection is sharded.
- const preShardCollectionResumeToken = preShardCollectionChange._id;
-
- // Enable sharding on the previously unsharded collection.
- assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
-
- // Shard the collection on x.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {x: 1}}));
-
- // Ensure that the primary shard has an up-to-date routing table.
- assert.commandWorked(st.rs0.getPrimary().getDB("admin").runCommand(
- {_flushRoutingTableCacheUpdates: mongosColl.getFullName()}));
-
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
- assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {x: 0}}));
-
- // List the changes we expect to see for the next two operations on the sharded collection.
- // Later, we will resume the stream using the token generated before the collection was sharded,
- // and will need to confirm that we can still see these two changes.
- const postShardCollectionChanges = [
- {
- documentKey: {x: 1, _id: 1},
- fullDocument: {_id: 1, x: 1},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- },
- {
- documentKey: {x: -1, _id: -1},
- fullDocument: {_id: -1, x: -1},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- }
- ];
-
- // Verify that the cursor on the original shard is still valid and sees new inserted documents.
- // The 'documentKey' field should now include the shard key, even before a 'kNewShardDetected'
- // operation has been generated by the migration of a chunk to a new shard.
- assert.writeOK(mongosColl.insert({_id: 1, x: 1}));
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]});
-
- // Move the [minKey, 0) chunk to shard1.
- assert.commandWorked(mongosDB.adminCommand({
- moveChunk: mongosColl.getFullName(),
- find: {x: -1},
- to: st.rs1.getURL(),
- _waitForDelete: true
- }));
-
- // Make sure the change stream cursor sees a document inserted on the recipient shard.
- assert.writeOK(mongosColl.insert({_id: -1, x: -1}));
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[1]]});
-
- // Confirm that we can resume the stream on the sharded collection using the token generated
- // while the collection was unsharded, whose documentKey contains the _id field but not the
- // shard key.
- let resumedCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}],
- collection: mongosColl
- });
+ // First test against a change stream on a single collection.
+ testUnshardedBecomesSharded(mongosColl.getName());
- // Verify that we see both of the insertions which occurred after the collection was sharded.
- cst.assertNextChangesEqual(
- {cursor: resumedCursor, expectedChanges: postShardCollectionChanges});
+ // Test against a change stream on the entire database.
+ testUnshardedBecomesSharded(1);
st.stop();
})();