author     Charlie Swanson <charlie.swanson@mongodb.com>   2017-10-04 17:13:24 -0400
committer  Charlie Swanson <charlie.swanson@mongodb.com>   2017-10-09 17:46:10 -0400
commit     4f17acbd9ca2ba9b91a4c72813fcb413146cfdcf (patch)
tree       a3429a032a49624b8a6b0be854455de45d9e19f4 /jstests/sharding
parent     893d4efbdfc7d536d7b6c44a9cb31dcdb7f8fd20 (diff)
download   mongo-4f17acbd9ca2ba9b91a4c72813fcb413146cfdcf.tar.gz
SERVER-29141 Enable change streams on sharded collections
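For context on what this commit enables: a change stream can now be opened against a sharded collection by connecting to a mongos, which merges the per-shard streams into a single cluster-time-ordered stream. A minimal sketch in the mongo shell (the host and collection names are illustrative, not from this commit):

// Open a change stream on a sharded collection through mongos.
const conn = new Mongo("localhost:27017");  // assumed mongos address
const coll = conn.getDB("test").foo;        // assumed sharded collection
const stream = coll.aggregate([{$changeStream: {}}]);

assert.writeOK(coll.insert({_id: 1}));
assert.soon(() => stream.hasNext());
printjson(stream.next());  // {operationType: "insert", documentKey: {_id: 1}, ...}
stream.close();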
Diffstat (limited to 'jstests/sharding')
-rw-r--r--  jstests/sharding/change_stream_invalidation.js                      91
-rw-r--r--  jstests/sharding/change_stream_remove_shard.js                     176
-rw-r--r--  jstests/sharding/change_streams.js                                 169
-rw-r--r--  jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js  180
-rw-r--r--  jstests/sharding/resume_change_stream.js                           140
5 files changed, 707 insertions, 49 deletions
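All five tests below lean on resume tokens: every change document carries an opaque token in its _id field, and passing that token as 'resumeAfter' re-establishes the stream just after that event. A sketch of the basic pattern (collection name illustrative):

const coll = db.foo;  // any watched collection; name is illustrative
let stream = coll.aggregate([{$changeStream: {}}]);
assert.writeOK(coll.insert({_id: "a"}));
assert.soon(() => stream.hasNext());
const resumeToken = stream.next()._id;  // opaque resume token
stream.close();

// A new stream opened with 'resumeAfter' picks up strictly after that event.
stream = coll.aggregate([{$changeStream: {resumeAfter: resumeToken}}]);
assert.writeOK(coll.insert({_id: "b"}));
assert.soon(() => stream.hasNext());
assert.eq(stream.next().documentKey._id, "b");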
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js
new file mode 100644
index 00000000000..f4904d5182e
--- /dev/null
+++ b/jstests/sharding/change_stream_invalidation.js
@@ -0,0 +1,91 @@
+// Tests invalidation of change streams on sharded collections.
+(function() {
+    "use strict";
+
+    load('jstests/replsets/libs/two_phase_drops.js');  // For TwoPhaseDropCollectionTest.
+    load('jstests/libs/write_concern_util.js');        // For stopReplicationOnSecondaries.
+
+    // For supportsMajorityReadConcern.
+    load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {
+            nodes: 1,
+            enableMajorityReadConcern: '',
+        }
+    });
+
+    const mongosDB = st.s0.getDB(jsTestName());
+    const mongosColl = mongosDB[jsTestName()];
+
+    assert.commandWorked(mongosDB.dropDatabase());
+
+    // Enable sharding on the test DB and ensure its primary is shard0000.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 2 chunks: [MinKey, 0) and [0, MaxKey].
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+    // Move the [0, MaxKey] chunk to shard0001.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+    // Write a document to each chunk.
+    assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+    let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+    // We awaited the replication of the first writes, so the change stream shouldn't return them.
+    assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+    assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+
+    // Drop the collection and test that we return an "invalidate" entry and close the cursor.
+    mongosColl.drop();
+    st.rs0.awaitReplication();
+    st.rs1.awaitReplication();
+
+    // Test that we see the two writes that happened before the invalidation.
+    assert.soon(() => changeStream.hasNext());
+    let next = changeStream.next();
+    assert.eq(next.operationType, "update");
+    assert.eq(next.documentKey._id, -1);
+    const resumeTokenFromFirstUpdate = next._id;
+
+    assert.soon(() => changeStream.hasNext());
+    next = changeStream.next();
+    assert.eq(next.operationType, "update");
+    assert.eq(next.documentKey._id, 1);
+
+    assert.soon(() => changeStream.hasNext());
+    next = changeStream.next();
+    assert.eq(next.operationType, "invalidate");
+
+    assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed");
+
+    // Test that it is not possible to resume a change stream after the collection has been
+    // dropped; once it's gone, we won't be able to figure out the shard key.
+    assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+                    mongosColl.getDB(), mongosColl.getName()));
+    assert.commandFailedWithCode(mongosDB.runCommand({
+        aggregate: mongosColl.getName(),
+        pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+        readConcern: {level: "majority"},
+        cursor: {}
+    }),
+                                 40615);
+
+    st.stop();
+})();
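The contract exercised above, from a client's point of view: after a collection drop, the stream drains its remaining events, emits a terminal "invalidate" entry, and the cursor closes; resuming past the drop fails with code 40615. A hedged sketch of handling that sequence (not the test's own code; collection name illustrative):

// Drain a change stream until the collection drop invalidates it.
const coll = db.foo;  // illustrative
const stream = coll.aggregate([{$changeStream: {}}]);
coll.drop();
let invalidated = false;
assert.soon(() => stream.hasNext());
while (stream.hasNext()) {
    if (stream.next().operationType === "invalidate") {
        invalidated = true;  // the server closes the cursor after this entry
    }
}
assert(invalidated);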
diff --git a/jstests/sharding/change_stream_remove_shard.js b/jstests/sharding/change_stream_remove_shard.js
new file mode 100644
index 00000000000..1cb8678e9f1
--- /dev/null
+++ b/jstests/sharding/change_stream_remove_shard.js
@@ -0,0 +1,176 @@
+// Tests the behavior of removing a shard while a change stream is open.
+(function() {
+    "use strict";
+
+    load('jstests/aggregation/extras/utils.js');  // For assertErrorCode().
+    load('jstests/libs/change_stream_util.js');   // For ChangeStreamTest.
+
+    // For supportsMajorityReadConcern.
+    load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+    // This test only works on storage engines that support committed reads; skip it if the
+    // configured engine doesn't support them.
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    // Use a ShardingTest with 3 shards so that at least 2 remain after removing one. This
+    // ensures the change stream is still merging on mongos after the removal, and cannot
+    // forward the entire pipeline to the shards.
+    const st = new ShardingTest({
+        shards: 3,
+        rs: {
+            nodes: 1,
+            enableMajorityReadConcern: '',
+            // Use the noop writer with a higher frequency for periodic noops to speed up the test.
+            setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+        }
+    });
+
+    const mongosDB = st.s0.getDB(jsTestName());
+    const mongosColl = mongosDB[jsTestName()];
+
+    assert.commandWorked(mongosDB.dropDatabase());
+
+    // Enable sharding on the test DB and ensure its primary is shard0000.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 3 chunks: [MinKey, 0), [0, 1000), and [1000, MaxKey].
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 1000}}));
+
+    // Move the [0, 1000) chunk to shard 1.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 0}, to: st.rs1.getURL()}));
+
+    // Move the [1000, MaxKey] chunk to shard 2.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1000}, to: st.rs2.getURL()}));
+
+    // Use a small batch size so that iterating each shard's cursor is spread over multiple
+    // getMores.
+    const batchSize = 2;
+    let changeStream = mongosColl.aggregate([{$changeStream: {}}], {batchSize: batchSize});
+
+    // Write some documents for the change stream to consume. Be sure to write enough to each
+    // shard that we can't consume them all in one batch.
+    for (let i = 0; i < 2 * batchSize; ++i) {
+        assert.writeOK(mongosColl.insert({_id: -1 - i}, {writeConcern: {w: "majority"}}));
+        assert.writeOK(mongosColl.insert({_id: i}, {writeConcern: {w: "majority"}}));
+        assert.writeOK(mongosColl.insert({_id: 1000 + i}, {writeConcern: {w: "majority"}}));
+    }
+
+    // Remove shard 2: first move its chunk off, then run removeShard until it completes.
+    assert.commandWorked(mongosDB.adminCommand({
+        moveChunk: mongosColl.getFullName(),
+        to: st.rs0.getURL(),
+        find: {_id: 1000},
+        _waitForDelete: true
+    }));
+    let removeStatus;
+    assert.soon(function() {
+        removeStatus = assert.commandWorked(mongosDB.adminCommand({removeShard: st.rs2.getURL()}));
+        return removeStatus.state === "completed";
+    }, () => `Shard removal timed out, most recent removeShard response: ${tojson(removeStatus)}`);
+
+    // The shard removal will not invalidate any cursors, so we still expect to be able to see
+    // changes as long as the shard is still running.
+    let resumeTokenFromShard2;
+    for (let nextChangeId of [-1, 0, 1000]) {
+        assert.soon(() => changeStream.hasNext());
+        let next = changeStream.next();
+        assert.eq(next.operationType, "insert");
+        assert.eq(next.documentKey._id, nextChangeId);
+        if (nextChangeId === 1000) {
+            resumeTokenFromShard2 = next._id;
+        }
+    }
+
+    // Now actually stop the removed shard; eventually the change stream should get an error.
+    st.rs2.stopSet();
+    assert.soon(function() {
+        try {
+            // We should encounter an error before running out of changes.
+            assert.soon(() => changeStream.hasNext());
+            assert.eq(changeStream.next().operationType, "insert");
+        } catch (error) {
+            return true;
+        }
+        return false;
+    }, "Expected change stream to error due to missing host");
+
+    // Test that it is not possible to resume a change stream with a resume token from the
+    // removed shard, which no longer exists.
+    ChangeStreamTest.assertChangeStreamThrowsCode({
+        collection: mongosColl,
+        pipeline: [{$changeStream: {resumeAfter: resumeTokenFromShard2}}],
+        expectedCode: 40585
+    });
+
+    // Now do the same test, only this time there will be only one shard remaining after removing
+    // shard 1.
+    changeStream = mongosColl.aggregate([{$changeStream: {}}], {batchSize: batchSize});
+
+    // Insert more than one batch of changes on each (remaining) shard.
+    const nDocsInEachChunkAfterFirstRemoval = 2 * batchSize;
+    for (let i = 0; i < 2 * batchSize; ++i) {
+        assert.writeOK(mongosColl.insert({_id: -1 - nDocsInEachChunkAfterFirstRemoval - i}));
+        assert.writeOK(mongosColl.insert({_id: nDocsInEachChunkAfterFirstRemoval + i}));
+    }
+
+    // Remove shard 1: move its chunk off, then run removeShard until it completes.
+    assert.commandWorked(mongosDB.adminCommand({
+        moveChunk: mongosColl.getFullName(),
+        to: st.rs0.getURL(),
+        find: {_id: 0},
+        _waitForDelete: true
+    }));
+    assert.soon(function() {
+        removeStatus = assert.commandWorked(mongosDB.adminCommand({removeShard: st.rs1.getURL()}));
+        return removeStatus.state === "completed";
+    }, () => `Shard removal timed out, most recent removeShard response: ${tojson(removeStatus)}`);
+
+    // The shard removal will not invalidate any cursors, so we still expect to be able to see
+    // changes as long as the shard is still running.
+    assert.soon(() => changeStream.hasNext());
+    let next = changeStream.next();
+    assert.eq(next.operationType, "insert");
+    assert.eq(next.documentKey._id, -1 - nDocsInEachChunkAfterFirstRemoval);
+
+    assert.soon(() => changeStream.hasNext());
+    next = changeStream.next();
+    assert.eq(next.operationType, "insert");
+    assert.eq(next.documentKey._id, nDocsInEachChunkAfterFirstRemoval);
+    const resumeTokenFromShard1 = next._id;
+
+    // Stop the removed shard; eventually the change stream should get an error.
+    st.rs1.stopSet();
+    assert.soon(function() {
+        try {
+            // We should encounter an error before running out of changes.
+            assert.soon(() => changeStream.hasNext());
+            assert.eq(changeStream.next().operationType, "insert");
+        } catch (error) {
+            return true;
+        }
+        return false;
+    }, "Expected change stream to error due to missing host");
+
+    ChangeStreamTest.assertChangeStreamThrowsCode({
+        collection: mongosColl,
+        pipeline: [{$changeStream: {resumeAfter: resumeTokenFromShard1}}],
+        expectedCode: 40585
+    });
+
+    st.stop();
+})();
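A client watching a sharded collection has to tolerate the failure mode this test forces: once a contributing shard is removed and shut down, iteration errors out, and a resume token minted by that shard is rejected with code 40585. One plausible handling pattern, sketched (the helper name is mine, not from the test):

// Iterate until the stream errors, e.g. because a contributing shard went away.
function drainUntilError(stream) {
    try {
        for (;;) {
            assert.soon(() => stream.hasNext());
            printjson(stream.next());
        }
    } catch (e) {
        // Expected once the removed shard is shut down. A token from that shard
        // can no longer be used to resume (error code 40585); the caller must
        // restart from a token minted by a surviving shard, or open a new stream.
        return e;
    }
}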
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
new file mode 100644
index 00000000000..200190000da
--- /dev/null
+++ b/jstests/sharding/change_streams.js
@@ -0,0 +1,169 @@
+// Tests the behavior of change streams on sharded collections.
+(function() {
+    "use strict";
+
+    load('jstests/replsets/libs/two_phase_drops.js');  // For TwoPhaseDropCollectionTest.
+    load('jstests/aggregation/extras/utils.js');       // For assertErrorCode().
+    load('jstests/libs/change_stream_util.js');        // For ChangeStreamTest.
+
+    // For supportsMajorityReadConcern().
+    load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {
+            nodes: 1,
+            enableMajorityReadConcern: '',
+            // Use a higher frequency for periodic noops to speed up the test.
+            setParameter: {periodicNoopIntervalSecs: 1}
+        }
+    });
+
+    const mongosDB = st.s0.getDB(jsTestName());
+    const mongosColl = mongosDB[jsTestName()];
+
+    assert.commandWorked(mongosDB.dropDatabase());
+
+    // Enable sharding on the test DB and ensure its primary is shard0000.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 2 chunks: [MinKey, 0) and [0, MaxKey).
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+    // Move the [0, MaxKey) chunk to shard0001.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+    // Write a document to each chunk.
+    assert.writeOK(mongosColl.insert({_id: -1}));
+    assert.writeOK(mongosColl.insert({_id: 1}));
+
+    let changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {_id: 0}}]);
+
+    // Test that the change stream can see inserts on each shard.
+    assert.writeOK(mongosColl.insert({_id: 1000}));
+    assert.writeOK(mongosColl.insert({_id: -1000}));
+
+    assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
+    assert.docEq(changeStream.next(), {
+        documentKey: {_id: 1000},
+        fullDocument: {_id: 1000},
+        ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+        operationType: "insert",
+    });
+
+    // Now do another write to shard 1, advancing that shard's clock and enabling the stream to
+    // return the earlier write to shard 0.
+    assert.writeOK(mongosColl.insert({_id: 1001}));
+
+    assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
+    assert.docEq(changeStream.next(), {
+        documentKey: {_id: -1000},
+        fullDocument: {_id: -1000},
+        ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+        operationType: "insert",
+    });
+
+    // Test that all changes are eventually visible due to the periodic noop writer.
+    assert.commandWorked(
+        st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+    assert.commandWorked(
+        st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+    assert.soon(() => changeStream.hasNext());
+
+    assert.docEq(changeStream.next(), {
+        documentKey: {_id: 1001},
+        fullDocument: {_id: 1001},
+        ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+        operationType: "insert",
+    });
+    changeStream.close();
+
+    // Test that using change streams with any stages not allowed to run on mongos results in an
+    // error.
+    assertErrorCode(mongosColl, [{$changeStream: {fullDocument: "updateLookup"}}], 40470);
+    assertErrorCode(
+        mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
+
+    // Test that it is legal to open a change stream, even if the
+    // 'internalQueryProhibitMergingOnMongoS' parameter is set.
+    assert.commandWorked(
+        mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
+    let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
+    tempCursor.close();
+    // TODO SERVER-29137: $sort and $group should be banned.
+    tempCursor = assert.doesNotThrow(
+        () => mongosColl.aggregate(
+            [{$changeStream: {}}, {$sort: {operationType: 1}}, {$group: {_id: "$documentKey"}}]));
+    tempCursor.close();
+    assert.commandWorked(
+        mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
+
+    assert.writeOK(mongosColl.remove({}));
+    // We awaited the replication of the first write, so the change stream shouldn't return it.
+    // Use {w: "majority"} to deal with journaling correctly, even though we only have one node.
+    assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}}));
+
+    changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]);
+    assert(!changeStream.hasNext());
+
+    // Drop the collection and test that we return an "invalidate" entry and close the cursor.
+    jsTestLog("Testing getMore command closes cursor for invalidate entries");
+    mongosColl.drop();
+    // Wait for the drop to actually happen.
+    assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+                    mongosColl.getDB(), mongosColl.getName()));
+    assert.soon(() => changeStream.hasNext());
+    assert.eq(changeStream.next().operationType, "invalidate");
+    assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed");
+
+    jsTestLog("Testing aggregate command closes cursor for invalidate entries");
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 2 chunks: [MinKey, 0) and [0, MaxKey).
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+    // Move the [0, MaxKey) chunk to shard0001.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+    // Write one document to each chunk.
+    assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+    // Get a valid resume token that the next aggregate command can use.
+    changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+    assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+
+    assert.soon(() => changeStream.hasNext());
+    const resumeToken = changeStream.next()._id;
+
+    // It should not be possible to resume a change stream after a collection drop, even if the
+    // invalidate has not been received.
+    assert(mongosColl.drop());
+    // Wait for the drop to actually happen.
+    assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+                    mongosColl.getDB(), mongosColl.getName()));
+
+    ChangeStreamTest.assertChangeStreamThrowsCode({
+        collection: mongosColl,
+        pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+        expectedCode: 40615
+    });
+
+    st.stop();
+})();
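The ordering guarantee checked by change_streams.js — mongos holds back an event from one shard until every other shard's cluster time has provably advanced past it — is why these tests enable the periodic noop writer. The runtime toggle the test itself uses, sketched against an assumed shard address (periodicNoopIntervalSecs is supplied at startup in these tests):

// Enable periodic noops on a shard primary so an idle shard keeps advancing
// its cluster time, allowing mongos to release events buffered from other shards.
const shardPrimary = new Mongo("localhost:27018");  // assumed shard primary address
assert.commandWorked(
    shardPrimary.adminCommand({setParameter: 1, writePeriodicNoops: true}));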
diff --git a/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js b/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
index 6292903250c..523eacf47d9 100644
--- a/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
+++ b/jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
@@ -1,5 +1,7 @@
 // Tests that the change stream returns a special entry and closes the cursor when a chunk is
 // migrated to a new shard.
+// TODO: SERVER-30834 the mongos should internally swallow and automatically retry the
+// 'retryNeeded' entries, so the client shouldn't see any invalidations.
 (function() {
     'use strict';
 
@@ -11,74 +13,154 @@
         return;
     }
 
-    const st = new ShardingTest({
-        shards: 2,
-        mongos: 1,
-        rs: {nodes: 1},
-        other: {rsOptions: {enableMajorityReadConcern: ""}}
-    });
+    const rsNodeOptions = {
+        enableMajorityReadConcern: '',
+        // Use a higher frequency for periodic noops to speed up the test.
+        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+    };
+    const st =
+        new ShardingTest({shards: 2, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});
 
     const mongos = st.s;
-    const admin = mongos.getDB('admin');
-    const coll = mongos.getCollection('foo.bar');
-    const dbOnShard = st.rs0.getPrimary().getDB('foo');
+    const mongosColl = mongos.getCollection('test.foo');
+    const mongosDB = mongos.getDB("test");
 
-    // Shard collection.
-    assert.commandWorked(mongos.adminCommand({enableSharding: coll.getDB().getName()}));
+    // Enable sharding to inform mongos of the database, allowing us to open a cursor.
+    assert.commandWorked(mongos.adminCommand({enableSharding: mongosDB.getName()}));
 
-    // Just to be sure what primary we start from.
-    st.ensurePrimaryShard(coll.getDB().getName(), st.shard0.shardName);
-    assert.commandWorked(mongos.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
+    // Make sure all chunks start on shard 0.
+    st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
 
-    st.rs0.awaitReplication();
-    let res = assert.commandWorked(dbOnShard.runCommand(
-        {aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}}));
-    assert.neq(res.cursor.id, 0);
-    assert.eq(res.cursor.firstBatch.length, 0);
+    // Open a change stream cursor before the collection is sharded.
+    const changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+    assert(!changeStream.hasNext(), "Do not expect any results yet");
+
+    // Once we have a cursor, actually shard the collection.
+    assert.commandWorked(
+        mongos.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
 
     // Insert two documents.
-    assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
-    assert.writeOK(coll.insert({_id: 20}, {writeConcern: {w: "majority"}}));
-    mongos.adminCommand({split: coll.getFullName(), middle: {_id: 10}});
+    assert.writeOK(mongosColl.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 20}, {writeConcern: {w: "majority"}}));
+
+    // Split the collection into two chunks: [MinKey, 10) and [10, MaxKey].
+    assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middle: {_id: 10}}));
 
-    // Migrate the first chunk to shard1.
+    // Migrate the [10, MaxKey] chunk to shard1.
     assert.commandWorked(mongos.adminCommand({
-        moveChunk: coll.getFullName(),
-        find: {_id: 0},
+        moveChunk: mongosColl.getFullName(),
+        find: {_id: 20},
         to: st.shard1.shardName,
         _waitForDelete: true
     }));
 
-    res = assert.commandWorked(
-        dbOnShard.runCommand({getMore: res.cursor.id, collection: coll.getName()}));
-    assert.eq(res.cursor.nextBatch.length, 3);
-    assert.eq(res.cursor.nextBatch[0].operationType, "insert");
-    assert.eq(res.cursor.nextBatch[1].operationType, "insert");
-    assert.eq(res.cursor.nextBatch[2].operationType, "retryNeeded");
-    const resumeToken = res.cursor.nextBatch[2]._id;
-    // Verify the cursor has been closed since the chunk migrated is the first chunk on shard1.
-    assert.eq(res.cursor.id, 0);
-
-    // Change stream only gets closed on the first chunk migration to a new shard.
-    // Verify the second chunk migration doesn't close cursors.
-    assert.writeOK(coll.insert({_id: 30}, {writeConcern: {w: "majority"}}));
-    // Migrate the second chunk to shard1.
+    for (let id of [0, 20]) {
+        assert.soon(() => changeStream.hasNext());
+        let next = changeStream.next();
+        assert.eq(next.operationType, "insert");
+        assert.eq(next.documentKey, {_id: id});
+    }
+    assert.soon(() => changeStream.hasNext());
+    let next = changeStream.next();
+    assert.eq(next.operationType, "retryNeeded");
+    const retryResumeToken = next._id;
+
+    // A change stream only gets closed on the first chunk migration to a new shard. Test that
+    // another chunk split and migration does not invalidate the cursor.
+    const resumedCursor = mongosColl.aggregate([{$changeStream: {resumeAfter: retryResumeToken}}]);
+
+    // Insert into both chunks.
+    assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 21}, {writeConcern: {w: "majority"}}));
+
+    // Split again, and move a second chunk to the first shard. The new chunks are:
+    // [MinKey, 0), [0, 10), and [10, MaxKey].
+    assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
     assert.commandWorked(mongos.adminCommand({
-        moveChunk: coll.getFullName(),
-        find: {_id: 20},
+        moveChunk: mongosColl.getFullName(),
+        find: {_id: 5},
+        to: st.shard1.shardName,
+        _waitForDelete: true
+    }));
+
+    // Insert again, into all three chunks.
+    assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 22}, {writeConcern: {w: "majority"}}));
+
+    // Make sure we can see all of the inserts, without any 'retryNeeded' entries.
+    for (let nextExpectedId of [1, 21, -2, 2, 22]) {
+        assert.soon(() => resumedCursor.hasNext());
+        assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+    }
+
+    // Verify that the original cursor has been closed since the first migration, and that it
+    // can't see any new inserts.
+    assert(!changeStream.hasNext());
+
+    // Test that migrating the last chunk to shard 1 (meaning all chunks are now on the same shard)
+    // will not invalidate the change stream.
+
+    // Insert into all three chunks.
+    assert.writeOK(mongosColl.insert({_id: -3}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 3}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 23}, {writeConcern: {w: "majority"}}));
+
+    // Move the last chunk, [MinKey, 0), to shard 1.
+    assert.commandWorked(mongos.adminCommand({
+        moveChunk: mongosColl.getFullName(),
+        find: {_id: -5},
         to: st.shard1.shardName,
         _waitForDelete: true
     }));
 
-    res = assert.commandWorked(dbOnShard.runCommand({
-        aggregate: coll.getName(),
-        pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
-        cursor: {}
+    // Insert again, into all three chunks.
+    assert.writeOK(mongosColl.insert({_id: -4}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 4}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 24}, {writeConcern: {w: "majority"}}));
+
+    // Make sure we can see all of the inserts, without any 'retryNeeded' entries.
+    assert.soon(() => resumedCursor.hasNext());
+    for (let nextExpectedId of [-3, 3, 23, -4, 4, 24]) {
+        assert.soon(() => resumedCursor.hasNext());
+        assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+    }
+
+    // Now test that adding a new shard and migrating a chunk to it will again invalidate the
+    // cursor.
+    const newShard = new ReplSetTest({name: "newShard", nodes: 1, nodeOptions: rsNodeOptions});
+    newShard.startSet({shardsvr: ''});
+    newShard.initiate();
+    assert.commandWorked(mongos.adminCommand({addShard: newShard.getURL(), name: "newShard"}));
+
+    // At this point there haven't been any migrations to the new shard, so we should still be
+    // able to use the change stream.
+    assert.writeOK(mongosColl.insert({_id: -5}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 5}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 25}, {writeConcern: {w: "majority"}}));
+
+    for (let nextExpectedId of [-5, 5, 25]) {
+        assert.soon(() => resumedCursor.hasNext());
+        assert.eq(resumedCursor.next().documentKey, {_id: nextExpectedId});
+    }
+
+    // Now migrate a chunk to the new shard and verify that the stream is closed.
+    assert.commandWorked(mongos.adminCommand({
+        moveChunk: mongosColl.getFullName(),
+        find: {_id: 20},
+        to: "newShard",
+        _waitForDelete: true
     }));
-    assert.eq(res.cursor.firstBatch.length, 1);
-    assert.eq(res.cursor.firstBatch[0].operationType, "insert");
-    // Verify the cursor is not closed.
-    assert.neq(res.cursor.id, 0);
+    assert.writeOK(mongosColl.insert({_id: -6}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 6}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 26}, {writeConcern: {w: "majority"}}));
+
+    // We again need to wait for the noop writer on shard 0 to ensure we can return the new
+    // results (in this case the 'retryNeeded' entry) from shard 1.
+    assert.soon(() => resumedCursor.hasNext());
+    assert.eq(resumedCursor.next().operationType, "retryNeeded");
+    assert(!resumedCursor.hasNext());
 
     st.stop();
+    newShard.stopSet();
 })();
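Until SERVER-30834 teaches mongos to swallow these entries, a client is expected to handle 'retryNeeded' itself by resuming with the token carried on the entry, exactly as the test above does. A sketch of that loop (the helper name is mine, not from the test):

// Transparently re-open a change stream whenever 'retryNeeded' is returned.
function nextEventWithRetry(coll, stream) {
    assert.soon(() => stream.hasNext());
    let event = stream.next();
    while (event.operationType === "retryNeeded") {
        // The old cursor is closed after this entry; resume from its token.
        stream = coll.aggregate([{$changeStream: {resumeAfter: event._id}}]);
        assert.soon(() => stream.hasNext());
        event = stream.next();
    }
    return {event: event, stream: stream};
}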
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
new file mode 100644
index 00000000000..0e1b751a339
--- /dev/null
+++ b/jstests/sharding/resume_change_stream.js
@@ -0,0 +1,140 @@
+// Tests resuming change streams on sharded collections.
+// We need to use a readConcern in this test, which requires read commands.
+// @tags: [requires_find_command]
+(function() {
+    "use strict";
+
+    jsTestLog("Skipping test until SERVER-31475 is resolved");
+    return;
+
+    load('jstests/replsets/rslib.js');           // For getLatestOp.
+    load('jstests/libs/change_stream_util.js');  // For ChangeStreamTest.
+
+    // For supportsMajorityReadConcern.
+    load('jstests/multiVersion/libs/causal_consistency_helpers.js');
+
+    // This test only works on storage engines that support committed reads; skip it if the
+    // configured engine doesn't support them.
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    const oplogSize = 1;  // size in MB
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {
+            nodes: 1,
+            oplogSize: oplogSize,
+            enableMajorityReadConcern: '',
+            // Use the noop writer with a higher frequency for periodic noops to speed up the test.
+            setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+        }
+    });
+
+    const mongosDB = st.s0.getDB(jsTestName());
+    const mongosColl = mongosDB[jsTestName()];
+
+    assert.commandWorked(mongosDB.dropDatabase());
+
+    // Enable sharding on the test DB and ensure its primary is shard0000.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+    // Shard the test collection on _id.
+    assert.commandWorked(
+        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+    // Split the collection into 2 chunks: [MinKey, 0) and [0, MaxKey].
+    assert.commandWorked(
+        mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+    // Move the [0, MaxKey] chunk to shard0001.
+    assert.commandWorked(mongosDB.adminCommand(
+        {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+    // Write a document to each chunk.
+    assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+    let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+    // We awaited the replication of the first writes, so the change stream shouldn't return them.
+    assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+    assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+    st.rs0.awaitReplication();
+    st.rs1.awaitReplication();
+
+    // Test that we see the two writes, and remember their resume tokens.
+    assert.soon(() => changeStream.hasNext());
+    let next = changeStream.next();
+    assert.eq(next.operationType, "update");
+    assert.eq(next.documentKey._id, -1);
+    const resumeTokenFromFirstUpdateOnShard0 = next._id;
+
+    assert.soon(() => changeStream.hasNext());
+    next = changeStream.next();
+    assert.eq(next.operationType, "update");
+    assert.eq(next.documentKey._id, 1);
+    const resumeTokenFromFirstUpdateOnShard1 = next._id;
+
+    changeStream.close();
+
+    // Write some additional documents, then test that it's possible to resume after the first
+    // update.
+    assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+    assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+    changeStream =
+        mongosColl.aggregate([{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}]);
+
+    for (let nextExpectedId of [1, -2, 2]) {
+        assert.soon(() => changeStream.hasNext());
+        assert.eq(changeStream.next().documentKey._id, nextExpectedId);
+    }
+
+    changeStream.close();
+
+    // Test that the stream can't resume if the resume token is no longer present in the oplog.
+
+    // Roll over the entire oplog on the shard with the resume token for the first update.
+    const shardWithResumeToken = st.rs1.getPrimary();  // Resume from shard 1.
+    const mostRecentOplogEntry = getLatestOp(shardWithResumeToken);
+    assert.neq(mostRecentOplogEntry, null);
+    const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+    let i = 0;
+
+    function oplogIsRolledOver() {
+        // The oplog has rolled over if what used to be the most recent entry is now older than
+        // the oldest entry in the oplog. Said another way, the oplog has rolled over when
+        // everything in the oplog is newer than what used to be the newest entry.
+        return bsonWoCompare(
+                   mostRecentOplogEntry.ts,
+                   getLeastRecentOp({server: shardWithResumeToken, readConcern: "majority"}).ts) <
+            0;
+    }
+
+    while (!oplogIsRolledOver()) {
+        assert.writeOK(mongosColl.insert({_id: 100 + i++, long_str: largeStr},
+                                         {writeConcern: {w: "majority"}}));
+        sleep(100);
+    }
+
+    ChangeStreamTest.assertChangeStreamThrowsCode({
+        collection: mongosColl,
+        pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
+        expectedCode: 40576
+    });
+
+    // Test that the change stream can't resume if the resume token *is* present in the oplog, but
+    // one of the shards has rolled over its oplog enough that it doesn't have a long enough
+    // history to resume. Since we just rolled over the oplog on shard 1, we know that
+    // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 no longer has
+    // any changes earlier than that, so it won't be able to resume.
+    ChangeStreamTest.assertChangeStreamThrowsCode({
+        collection: mongosColl,
+        pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
+        expectedCode: 40576
+    });
+
+    st.stop();
+})();
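The rollover scenario above boils down to a window check: a stream can only resume while every shard still has oplog history back to the resume point; otherwise the resume fails with code 40576. A hedged sketch of inspecting a shard's oplog window directly (shard address illustrative):

// Estimate the window within which a change stream can still resume on one shard.
const shard = new Mongo("localhost:27018");  // assumed shard primary address
const oplog = shard.getDB("local").oplog.rs;
const oldest = oplog.find().sort({$natural: 1}).limit(1).next();
const newest = oplog.find().sort({$natural: -1}).limit(1).next();
print("oplog window: " + tojson(oldest.ts) + " .. " + tojson(newest.ts));
// A resume token pointing before 'oldest.ts' cannot be used (error code 40576).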