author     Charlie Swanson <charlie.swanson@mongodb.com>  2017-10-04 17:13:24 -0400
committer  Charlie Swanson <charlie.swanson@mongodb.com>  2017-10-09 17:46:10 -0400
commit     4f17acbd9ca2ba9b91a4c72813fcb413146cfdcf (patch)
tree       a3429a032a49624b8a6b0be854455de45d9e19f4 /jstests/sharding
parent     893d4efbdfc7d536d7b6c44a9cb31dcdb7f8fd20 (diff)
download   mongo-4f17acbd9ca2ba9b91a4c72813fcb413146cfdcf.tar.gz
SERVER-29141 Enable change streams on sharded collections
Diffstat (limited to 'jstests/sharding')
-rw-r--r--  jstests/sharding/change_stream_invalidation.js                       91
-rw-r--r--  jstests/sharding/change_stream_remove_shard.js                      176
-rw-r--r--  jstests/sharding/change_streams.js                                  169
-rw-r--r--  jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js   180
-rw-r--r--  jstests/sharding/resume_change_stream.js                            140
5 files changed, 707 insertions(+), 49 deletions(-)
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js
new file mode 100644
index 00000000000..f4904d5182e
--- /dev/null
+++ b/jstests/sharding/change_stream_invalidation.js
@@ -0,0 +1,91 @@
+// Tests invalidation of change streams on sharded collections.
+(function() {
+ "use strict";
+
+ load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
+ load('jstests/libs/write_concern_util.js'); // For stopReplicationOnSecondaries.
+
+ // For supportsMajorityReadConcern.
+ load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey] chunk to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ // We awaited the replication of the first writes, so the change stream shouldn't return them.
+ assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+
+ // Drop the collection and test that the stream returns an "invalidate" entry and closes the cursor.
+ mongosColl.drop();
+ st.rs0.awaitReplication();
+ st.rs1.awaitReplication();
+
+ // Test that we see the two writes that happened before the invalidation.
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, -1);
+ const resumeTokenFromFirstUpdate = next._id;
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, 1);
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+
+ assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed");
+
+ // Test that it is not possible to resume a change stream after a collection has been dropped.
+ // Once it's been dropped, we won't be able to figure out the shard key.
+ assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+ mongosColl.getDB(), mongosColl.getName()));
+ assert.commandFailedWithCode(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+ readConcern: {level: "majority"},
+ cursor: {}
+ }),
+ 40615);
+
+ st.stop();
+})();
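
For readers skimming the patch, the consumer-side pattern this test exercises can be summarized in a small mongo shell sketch (not part of the commit; the helper name drainUntilInvalidate and the collection handle are illustrative): open a $changeStream on the sharded collection through mongos, read entries until the collection drop surfaces as an "invalidate", and expect the cursor to be closed afterwards.

// Hypothetical helper, assuming `coll` is a collection handle obtained through mongos.
function drainUntilInvalidate(coll) {
    const stream = coll.aggregate([{$changeStream: {}}]);
    const changes = [];
    assert.soon(() => stream.hasNext(), "timed out waiting for a change");
    let entry = stream.next();
    while (entry.operationType !== "invalidate") {
        changes.push(entry);
        assert.soon(() => stream.hasNext(), "timed out waiting for the invalidate");
        entry = stream.next();
    }
    // The invalidate closes the cursor, so no further results should be returned.
    assert(!stream.hasNext(), "expected the cursor to be closed after the invalidate");
    return changes;
}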
diff --git a/jstests/sharding/change_stream_remove_shard.js b/jstests/sharding/change_stream_remove_shard.js
new file mode 100644
index 00000000000..1cb8678e9f1
--- /dev/null
+++ b/jstests/sharding/change_stream_remove_shard.js
@@ -0,0 +1,176 @@
+// Tests the behavior of removing a shard while a change stream is open.
+(function() {
+ "use strict";
+
+ load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
+ load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+
+ // For supportsMajorityReadConcern.
+ load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+ // This test only works on storage engines that support committed reads, skip it if the
+ // configured engine doesn't support it.
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ // Use a ShardingTest with 3 shards to ensure there can still be at least 2 left after removing
+ // one. This will ensure the change stream is still merging on mongos after the removal, and
+ // cannot forward the entire pipeline to the shards.
+ const st = new ShardingTest({
+ shards: 3,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Use the noop writer with a higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 3 chunks: [MinKey, 0), [0, 1000), and [1000, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 1000}}));
+
+ // Move the [0, 1000) chunk to shard 1.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 0}, to: st.rs1.getURL()}));
+
+ // Move the [1000, MaxKey] chunk to shard 2.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1000}, to: st.rs2.getURL()}));
+
+ // Use a small batch size to enable us to spread the iteration of the shard's cursor over
+ // multiple getMores.
+ const batchSize = 2;
+ let changeStream = mongosColl.aggregate([{$changeStream: {}}], {batchSize: batchSize});
+
+ // Write some documents for the change stream to consume. Be sure to write enough to each shard
+ // that we can't consume them all in one batch.
+ for (let i = 0; i < 2 * batchSize; ++i) {
+ assert.writeOK(mongosColl.insert({_id: -1 - i}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: i}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1000 + i}, {writeConcern: {w: "majority"}}));
+ }
+
+ // Drain shard 2 by moving its only chunk to shard 0, then remove it from the cluster.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ to: st.rs0.getURL(),
+ find: {_id: 1000},
+ _waitForDelete: true
+ }));
+ let removeStatus;
+ assert.soon(function() {
+ removeStatus = assert.commandWorked(mongosDB.adminCommand({removeShard: st.rs2.getURL()}));
+ return removeStatus.state === "completed";
+ }, () => `Shard removal timed out, most recent removeShard response: ${tojson(removeStatus)}`);
+
+ // The shard removal will not invalidate any cursors, so we still expect to be able to see
+ // changes as long as the shard is still running.
+ let resumeTokenFromShard2;
+ for (let nextChangeId of[-1, 0, 1000]) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey._id, nextChangeId);
+ if (nextChangeId === 1000) {
+ resumeTokenFromShard2 = next._id;
+ }
+ }
+
+ // Now actually stop the removed shard; eventually the change stream should get an error.
+ st.rs2.stopSet();
+ assert.soon(function() {
+ try {
+ // We should encounter an error before running out of changes.
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "insert");
+ } catch (error) {
+ return true;
+ }
+ return false;
+ }, "Expected change stream to error due to missing host");
+
+ // Test that it is not possible to resume a change stream with a resume token from the removed
+ // shard, which will not exist anymore.
+
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ collection: mongosColl,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromShard2}}],
+ expectedCode: 40585
+ });
+
+ // Now do the same test, only this time there will be only one shard remaining after removing
+ // shard 1.
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}], {batchSize: batchSize});
+
+ // Insert more than one batch of changes on each (remaining) shard.
+ const nDocsInEachChunkAfterFirstRemoval = 2 * batchSize;
+ for (let i = 0; i < 2 * batchSize; ++i) {
+ assert.writeOK(mongosColl.insert({_id: -1 - nDocsInEachChunkAfterFirstRemoval - i}));
+ assert.writeOK(mongosColl.insert({_id: nDocsInEachChunkAfterFirstRemoval + i}));
+ }
+
+ // Drain shard 1 by moving its remaining chunk to shard 0, then remove it from the cluster.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ to: st.rs0.getURL(),
+ find: {_id: 0},
+ _waitForDelete: true
+ }));
+ assert.soon(function() {
+ removeStatus = assert.commandWorked(mongosDB.adminCommand({removeShard: st.rs1.getURL()}));
+ return removeStatus.state === "completed";
+ }, () => `Shard removal timed out, most recent removeShard response: ${tojson(removeStatus)}`);
+
+ // The shard removal will not invalidate any cursors, so we still expect to be able to see
+ // changes as long as the shard is still running.
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey._id, -1 - nDocsInEachChunkAfterFirstRemoval);
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey._id, nDocsInEachChunkAfterFirstRemoval);
+ const resumeTokenFromShard1 = next._id;
+
+ // Stop the removed shard; eventually the change stream should get an error.
+ st.rs1.stopSet();
+ assert.soon(function() {
+ try {
+ // We should encounter an error before running out of changes.
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "insert");
+ } catch (error) {
+ return true;
+ }
+ return false;
+ }, "Expected change stream to error due to missing host");
+
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ collection: mongosColl,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromShard1}}],
+ expectedCode: 40585
+ });
+
+ st.stop();
+})();
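
The drain-then-remove sequence appears twice above and could be factored into a helper; a minimal sketch under the same shell conventions (the helper name removeShardAndWait is illustrative, not part of this commit):

// Hypothetical helper: once the shard's chunks have been moved away, poll removeShard
// until the config servers report the removal as completed.
function removeShardAndWait(db, shardIdentifier) {
    let removeStatus;
    assert.soon(function() {
        removeStatus = assert.commandWorked(db.adminCommand({removeShard: shardIdentifier}));
        return removeStatus.state === "completed";
    }, () => `Shard removal timed out, most recent response: ${tojson(removeStatus)}`);
    return removeStatus;
}

The tests above pass st.rs1.getURL() / st.rs2.getURL() as the shard identifier.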
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
new file mode 100644
index 00000000000..200190000da
--- /dev/null
+++ b/jstests/sharding/change_streams.js
@@ -0,0 +1,169 @@
+// Tests the behavior of change streams on sharded collections.
+(function() {
+ "use strict";
+
+ load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
+ load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
+ load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+
+ // For supportsMajorityReadConcern().
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey) chunk to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}));
+ assert.writeOK(mongosColl.insert({_id: 1}));
+
+ let changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {_id: 0}}]);
+
+ // Test that a change stream can see inserts on shard 0.
+ assert.writeOK(mongosColl.insert({_id: 1000}));
+ assert.writeOK(mongosColl.insert({_id: -1000}));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
+ assert.docEq(changeStream.next(), {
+ documentKey: {_id: 1000},
+ fullDocument: {_id: 1000},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ // Now do another write to shard 0, advancing that shard's clock and enabling the stream to
+ // return the earlier write to shard 1.
+ assert.writeOK(mongosColl.insert({_id: 1001}));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
+ assert.docEq(changeStream.next(), {
+ documentKey: {_id: -1000},
+ fullDocument: {_id: -1000},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ // Test that all changes are eventually visible due to the periodic noop writer.
+ assert.commandWorked(
+ st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+ assert.commandWorked(
+ st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+ assert.soon(() => changeStream.hasNext());
+
+ assert.docEq(changeStream.next(), {
+ documentKey: {_id: 1001},
+ fullDocument: {_id: 1001},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+ changeStream.close();
+
+ // Test that using change streams with any stages not allowed to run on mongos results in an
+ // error.
+ assertErrorCode(mongosColl, [{$changeStream: {fullDocument: "updateLookup"}}], 40470);
+ assertErrorCode(
+ mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
+
+ // Test that it is legal to open a change stream, even if the
+ // 'internalQueryProhibitMergingOnMongoS' parameter is set.
+ assert.commandWorked(
+ mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
+ let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
+ tempCursor.close();
+ // TODO SERVER-29137: $sort and $group should be banned.
+ tempCursor = assert.doesNotThrow(
+ () => mongosColl.aggregate(
+ [{$changeStream: {}}, {$sort: {operationType: 1}}, {$group: {_id: "$documentKey"}}]));
+ tempCursor.close();
+ assert.commandWorked(
+ mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
+
+ assert.writeOK(mongosColl.remove({}));
+ // We awaited the replication of the first write, so the change stream shouldn't return it.
+ // Use { w: "majority" } to deal with journaling correctly, even though we only have one node.
+ assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}}));
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]);
+ assert(!changeStream.hasNext());
+
+ // Drop the collection and test that the stream returns an "invalidate" entry and closes the cursor.
+ jsTestLog("Testing getMore command closes cursor for invalidate entries");
+ mongosColl.drop();
+ // Wait for the drop to actually happen.
+ assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+ mongosColl.getDB(), mongosColl.getName()));
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+ assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed");
+
+ jsTestLog("Testing aggregate command closes cursor for invalidate entries");
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+ // Move the [0, MaxKey) chunk to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write one document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ // Get a valid resume token that the next aggregate command can use.
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+
+ assert.soon(() => changeStream.hasNext());
+ const resumeToken = changeStream.next()._id;
+
+ // It should not be possible to resume a change stream after a collection drop, even if the
+ // invalidate has not been received.
+ assert(mongosColl.drop());
+ // Wait for the drop to actually happen.
+ assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+ mongosColl.getDB(), mongosColl.getName()));
+
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ collection: mongosColl,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ expectedCode: 40615
+ });
+
+})();
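
The wait-then-check steps repeated throughout this file follow one pattern; a minimal sketch of it as a reusable assertion (the helper name assertNextChange is illustrative, not part of this commit):

// Hypothetical helper: wait for the next change on `stream` (the periodic noop writer keeps
// idle shards advancing so the mongos merger can return it), then compare the fields these
// tests care about.
function assertNextChange(stream, expected) {
    assert.soon(() => stream.hasNext(), "timed out waiting for the next change");
    const next = stream.next();
    assert.eq(next.operationType, expected.operationType, tojson(next));
    assert.eq(next.documentKey, expected.documentKey, tojson(next));
    return next;
}

// Example usage, mirroring the first insert checked above:
// assertNextChange(changeStream, {operationType: "insert", documentKey: {_id: 1000}});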
diff --git a/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js b/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
index 6292903250c..523eacf47d9 100644
--- a/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
+++ b/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
@@ -1,5 +1,7 @@
// Tests that a change stream returns a special entry and closes the cursor when a chunk is
// migrated to a new shard.
+// TODO: SERVER-30834 the mongos should internally swallow and automatically retry the 'retryNeeded'
+// entries, so the client shouldn't see any invalidations.
(function() {
'use strict';
@@ -11,74 +13,154 @@
return;
}
- const st = new ShardingTest({
- shards: 2,
- mongos: 1,
- rs: {nodes: 1},
- other: {rsOptions: {enableMajorityReadConcern: ""}}
- });
+ const rsNodeOptions = {
+ enableMajorityReadConcern: '',
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+ };
+ const st =
+ new ShardingTest({shards: 2, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});
const mongos = st.s;
- const admin = mongos.getDB('admin');
- const coll = mongos.getCollection('foo.bar');
- const dbOnShard = st.rs0.getPrimary().getDB('foo');
+ const mongosColl = mongos.getCollection('test.foo');
+ const mongosDB = mongos.getDB("test");
- // Shard collection.
- assert.commandWorked(mongos.adminCommand({enableSharding: coll.getDB().getName()}));
+ // Enable sharding to inform mongos of the database, allowing us to open a cursor.
+ assert.commandWorked(mongos.adminCommand({enableSharding: mongosDB.getName()}));
- // Just to be sure what primary we start from.
- st.ensurePrimaryShard(coll.getDB().getName(), st.shard0.shardName);
- assert.commandWorked(mongos.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
+ // Make sure all chunks start on shard 0.
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
- st.rs0.awaitReplication();
- let res = assert.commandWorked(dbOnShard.runCommand(
- {aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}}));
- assert.neq(res.cursor.id, 0);
- assert.eq(res.cursor.firstBatch.length, 0);
+ // Open a change stream cursor before the collection is sharded.
+ const changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ assert(!changeStream.hasNext(), "Do not expect any results yet");
+
+ // Once we have a cursor, actually shard the collection.
+ assert.commandWorked(
+ mongos.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
// Insert two documents.
- assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
- assert.writeOK(coll.insert({_id: 20}, {writeConcern: {w: "majority"}}));
- mongos.adminCommand({split: coll.getFullName(), middle: {_id: 10}});
+ assert.writeOK(mongosColl.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 20}, {writeConcern: {w: "majority"}}));
+
+ // Split the collection into two chunks: [MinKey, 10) and [10, MaxKey].
+ assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middle: {_id: 10}}));
- // Migrate the first chunk to shard1.
+ // Migrate the [10, MaxKey] chunk to shard1.
assert.commandWorked(mongos.adminCommand({
- moveChunk: coll.getFullName(),
- find: {_id: 0},
+ moveChunk: mongosColl.getFullName(),
+ find: {_id: 20},
to: st.shard1.shardName,
_waitForDelete: true
}));
- res = assert.commandWorked(
- dbOnShard.runCommand({getMore: res.cursor.id, collection: coll.getName()}));
- assert.eq(res.cursor.nextBatch.length, 3);
- assert.eq(res.cursor.nextBatch[0].operationType, "insert");
- assert.eq(res.cursor.nextBatch[1].operationType, "insert");
- assert.eq(res.cursor.nextBatch[2].operationType, "retryNeeded");
- const resumeToken = res.cursor.nextBatch[2]._id;
- // Verify the cursor has been closed since the chunk migrated is the first chunk on shard1.
- assert.eq(res.cursor.id, 0);
-
- // Change stream only gets closed on the first chunk migration to a new shard.
- // Verify the second chunk migration doesn't close cursors.
- assert.writeOK(coll.insert({_id: 30}, {writeConcern: {w: "majority"}}));
- // Migrate the second chunk to shard1.
+ for (let id of[0, 20]) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {_id: id});
+ }
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "retryNeeded");
+ const retryResumeToken = next._id;
+
+ // A change stream only gets closed on the first chunk migration to a new shard. Test that
+ // another chunk split and migration does not invalidate the cursor.
+ const resumedCursor = mongosColl.aggregate([{$changeStream: {resumeAfter: retryResumeToken}}]);
+
+ // Insert into both the chunks.
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 21}, {writeConcern: {w: "majority"}}));
+
+ // Split again, and move a second chunk to shard 1. The new chunks are:
+ // [MinKey, 0), [0, 10), and [10, MaxKey].
+ assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
assert.commandWorked(mongos.adminCommand({
- moveChunk: coll.getFullName(),
- find: {_id: 20},
+ moveChunk: mongosColl.getFullName(),
+ find: {_id: 5},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Insert again, into all three chunks.
+ assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 22}, {writeConcern: {w: "majority"}}));
+
+ // Make sure we can see all the inserts, without any 'retryNeeded' entries.
+ for (let nextExpectedId of[1, 21, -2, 2, 22]) {
+ assert.soon(() => resumedCursor.hasNext());
+ assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+ }
+
+ // Verify the original cursor has been closed since the first migration, and that it can't see
+ // any new inserts.
+ assert(!changeStream.hasNext());
+
+ // Test that migrating the last chunk to shard 1 (meaning all chunks are now on the same shard)
+ // will not invalidate the change stream.
+
+ // Insert into all three chunks.
+ assert.writeOK(mongosColl.insert({_id: -3}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 3}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 23}, {writeConcern: {w: "majority"}}));
+
+ // Move the last chunk, [MinKey, 0), to shard 1.
+ assert.commandWorked(mongos.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {_id: -5},
to: st.shard1.shardName,
_waitForDelete: true
}));
- res = assert.commandWorked(dbOnShard.runCommand({
- aggregate: coll.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {}
+ // Insert again, into all three chunks.
+ assert.writeOK(mongosColl.insert({_id: -4}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 4}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 24}, {writeConcern: {w: "majority"}}));
+
+ // Make sure we can see all the inserts, without any 'retryNeeded' entries.
+ assert.soon(() => resumedCursor.hasNext());
+ for (let nextExpectedId of[-3, 3, 23, -4, 4, 24]) {
+ assert.soon(() => resumedCursor.hasNext());
+ assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+ }
+
+ // Now test that adding a new shard and migrating a chunk to it will again invalidate the
+ // cursor.
+ const newShard = new ReplSetTest({name: "newShard", nodes: 1, nodeOptions: rsNodeOptions});
+ newShard.startSet({shardsvr: ''});
+ newShard.initiate();
+ assert.commandWorked(mongos.adminCommand({addShard: newShard.getURL(), name: "newShard"}));
+
+ // At this point, there haven't been any migrations to that shard, so we should still be able to
+ // use the change stream.
+ assert.writeOK(mongosColl.insert({_id: -5}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 5}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 25}, {writeConcern: {w: "majority"}}));
+
+ for (let nextExpectedId of[-5, 5, 25]) {
+ assert.soon(() => resumedCursor.hasNext());
+ assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+ }
+
+ // Now migrate a chunk to the new shard and verify the stream is closed.
+ assert.commandWorked(mongos.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {_id: 20},
+ to: "newShard",
+ _waitForDelete: true
}));
- assert.eq(res.cursor.firstBatch.length, 1);
- assert.eq(res.cursor.firstBatch[0].operationType, "insert");
- // Verify the cursor is not closed.
- assert.neq(res.cursor.id, 0);
+ assert.writeOK(mongosColl.insert({_id: -6}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 6}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 26}, {writeConcern: {w: "majority"}}));
+
+ // We again need to wait for the noop writer on shard 0 to ensure we can return the new results
+ // (in this case the 'retryNeeded' entry) from shard 1.
+ assert.soon(() => resumedCursor.hasNext());
+ assert.eq(resumedCursor.next().operationType, "retryNeeded");
+ assert(!resumedCursor.hasNext());
st.stop();
+ newShard.stopSet();
})();
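
Until SERVER-30834 teaches mongos to swallow 'retryNeeded' entries and retry internally, a client has to resume by hand, exactly as this test does with retryResumeToken. A minimal sketch of that pattern (the helper name nextChangeHandlingRetry is illustrative, not part of this commit):

// Hypothetical sketch: if the stream reports 'retryNeeded' (a chunk migrated to a shard that
// previously held none of the collection's chunks), reopen the stream from that entry's
// resume token and continue reading.
function nextChangeHandlingRetry(coll, stream) {
    assert.soon(() => stream.hasNext(), "timed out waiting for a change");
    let next = stream.next();
    if (next.operationType === "retryNeeded") {
        stream = coll.aggregate([{$changeStream: {resumeAfter: next._id}}]);
        assert.soon(() => stream.hasNext(), "timed out waiting after resuming");
        next = stream.next();
    }
    return {stream: stream, change: next};
}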
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
new file mode 100644
index 00000000000..0e1b751a339
--- /dev/null
+++ b/jstests/sharding/resume_change_stream.js
@@ -0,0 +1,140 @@
+// Tests resuming change streams on sharded collections.
+// We need to use a readConcern in this test, which requires read commands.
+// @tags: [requires_find_command]
+(function() {
+ "use strict";
+
+ jsTestLog("Skipping test until SERVER-31475 is resolved");
+ return;
+
+ load('jstests/replsets/rslib.js'); // For getLatestOp.
+ load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+
+ // For supportsMajorityReadConcern.
+ load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+ // This test only works on storage engines that support committed reads, skip it if the
+ // configured engine doesn't support it.
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const oplogSize = 1; // size in MB
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ oplogSize: oplogSize,
+ enableMajorityReadConcern: '',
+ // Use the noop writer with a higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey] chunk to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ // We awaited the replication of the first writes, so the change stream shouldn't return them.
+ assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+ st.rs0.awaitReplication();
+ st.rs1.awaitReplication();
+
+ // Test that we see the two writes, and remember their resume tokens.
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, -1);
+ const resumeTokenFromFirstUpdateOnShard0 = next._id;
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, 1);
+ const resumeTokenFromFirstUpdateOnShard1 = next._id;
+
+ changeStream.close();
+
+ // Write some additional documents, then test that it's possible to resume after the first
+ // update.
+ assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+ changeStream =
+ mongosColl.aggregate([{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}]);
+
+ for (let nextExpectedId of[1, -2, 2]) {
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().documentKey._id, nextExpectedId);
+ }
+
+ changeStream.close();
+
+ // Test that the stream can't resume if the resume token is no longer present in the oplog.
+
+ // Roll over the entire oplog on the shard that generated the resume token we will use below
+ // ('resumeTokenFromFirstUpdateOnShard1', from shard 1).
+ const shardWithResumeToken = st.rs1.getPrimary();
+ const mostRecentOplogEntry = getLatestOp(shardWithResumeToken);
+ assert.neq(mostRecentOplogEntry, null);
+ const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+ let i = 0;
+
+ function oplogIsRolledOver() {
+ // The oplog is rolled over if what used to be the most recent thing is now older than the
+ // oldest thing in the oplog. Said another way, the oplog is rolled over when everything in
+ // the oplog is newer than what used to be the newest entry.
+ return bsonWoCompare(
+ mostRecentOplogEntry.ts,
+ getLeastRecentOp({server: shardWithResumeToken, readConcern: "majority"}).ts) <
+ 0;
+ }
+
+ while (!oplogIsRolledOver()) {
+ assert.writeOK(mongosColl.insert({_id: 100 + i++, long_str: largeStr},
+ {writeConcern: {w: "majority"}}));
+ sleep(100);
+ }
+
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ collection: mongosColl,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}],
+ expectedCode: 40576
+ });
+
+ // Test that the change stream can't resume if the resume token *is* present in the oplog, but
+ // one of the shards has rolled over its oplog enough that it doesn't have a long enough history
+ // to resume. Since we just rolled over the oplog on shard 1, we know that
+ // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 doesn't have
+ // any changes earlier than that, so won't be able to resume.
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ collection: mongosColl,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
+ expectedCode: 40576
+ });
+
+ st.stop();
+})();
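
The oplog-rollover loop at the end of this test also reads as a standalone recipe; a sketch that reuses the shell helpers the test itself calls (getLeastRecentOp, bsonWoCompare); the helper name rollOverShardOplog and its parameters are illustrative, not part of this commit:

// Hypothetical helper: keep inserting large documents (they must route to chunks owned by the
// target shard) until every entry at or before `lastEntryToEvict` has aged out of that
// shard's oplog.
function rollOverShardOplog(coll, shardPrimary, lastEntryToEvict, startId) {
    const largeStr = new Array(4 * 1024).join('abcdefghi');
    let i = 0;
    while (bsonWoCompare(
               lastEntryToEvict.ts,
               getLeastRecentOp({server: shardPrimary, readConcern: "majority"}).ts) >= 0) {
        assert.writeOK(coll.insert({_id: startId + i++, long_str: largeStr},
                                   {writeConcern: {w: "majority"}}));
        sleep(100);
    }
}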