diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-09 12:25:35 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-04 12:09:50 -0400 |
commit | 753cedd024b9f4fe1a83632db792f29d3a7e4454 (patch) | |
tree | 0cd094ebb24806358e02d111cc47dadf113484ff /jstests | |
parent | 851c59e7bc5b54c0cf5feb683398a0eb6dffc20f (diff) | |
download | mongo-753cedd024b9f4fe1a83632db792f29d3a7e4454.tar.gz |
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
(cherry picked from commit 55f4dbf94a1cce9d8642af9bba9ac4cc77627293)
Diffstat (limited to 'jstests')
-rw-r--r-- | jstests/sharding/change_stream_invalidation.js | 34 | ||||
-rw-r--r-- | jstests/sharding/change_streams_primary_shard_unaware.js | 2 | ||||
-rw-r--r-- | jstests/sharding/change_streams_whole_db.js | 34 |
3 files changed, 52 insertions, 18 deletions
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js index 396db586c4f..eb51724f8ea 100644 --- a/jstests/sharding/change_stream_invalidation.js +++ b/jstests/sharding/change_stream_invalidation.js @@ -30,50 +30,53 @@ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); - // Shard the test collection on _id. + // Shard the test collection on a field called 'shardKey'. assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}})); // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey]. assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}})); // Move the [0, MaxKey] chunk to st.shard1.shardName. assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); + {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()})); // Write a document to each chunk. - assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}})); - let changeStream = mongosColl.aggregate([{$changeStream: {}}]); + let changeStream = mongosColl.watch(); // We awaited the replication of the first writes, so the change stream shouldn't return them. - assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}})); - assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2})); // Drop the collection and test that we return "invalidate" entry and close the cursor. mongosColl.drop(); - st.rs0.awaitReplication(); - st.rs1.awaitReplication(); // Test that we see the two writes that happened before the invalidation. assert.soon(() => changeStream.hasNext()); let next = changeStream.next(); assert.eq(next.operationType, "update"); - assert.eq(next.documentKey._id, -1); + assert.eq(next.documentKey.shardKey, -1); const resumeTokenFromFirstUpdate = next._id; assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "update"); - assert.eq(next.documentKey._id, 1); + assert.eq(next.documentKey.shardKey, 1); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); - assert.eq(next.operationType, "invalidate"); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey, {_id: 2}); - assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed"); + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "invalidate"); + assert(changeStream.isExhausted()); // Test that it is not possible to resume a change stream after a collection has been dropped. // Once it's been dropped, we won't be able to figure out the shard key. @@ -82,7 +85,6 @@ assert.commandFailedWithCode(mongosDB.runCommand({ aggregate: mongosColl.getName(), pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}], - readConcern: {level: "majority"}, cursor: {} }), 40615); diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js index ec4027bb800..89f8d0ebf6d 100644 --- a/jstests/sharding/change_streams_primary_shard_unaware.js +++ b/jstests/sharding/change_streams_primary_shard_unaware.js @@ -168,7 +168,7 @@ cstMongos2.assertNextChangesEqual({ cursor: cursorMongos2, expectedChanges: [{ - documentKey: {_id: 2}, + documentKey: {_id: 2, a: 2}, fullDocument: {_id: 2, a: 2}, ns: {db: mongos2DB.getName(), coll: mongos2Coll.getName()}, operationType: "insert", diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js index d43294e7772..bc7d559610a 100644 --- a/jstests/sharding/change_streams_whole_db.js +++ b/jstests/sharding/change_streams_whole_db.js @@ -5,6 +5,7 @@ load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. + load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection. // For supportsMajorityReadConcern(). load("jstests/multiVersion/libs/causal_consistency_helpers.js"); @@ -151,7 +152,38 @@ operationType: "insert", }, ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + + const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + // Store the resume token of the first insert to use after dropping the collection. + const resumeTokenBeforeDrop = results[0]._id; + + // Write one more document to the collection that will be dropped, to be returned after + // resuming. + assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4})); + + // Drop the collection, invalidating the open change stream. + assertDropCollection(mongosDB, mongosCollShardedOnX.getName()); + + // Resume the change stream from before the collection drop, and verify that the documentKey + // field contains the extracted shard key from the resume token. + cursor = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {resumeAfter: resumeTokenBeforeDrop}}, + {$match: {"ns.coll": mongosCollShardedOnX.getName()}} + ], + collection: 1 + }); + cst.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: [ + { + documentKey: {_id: 4, x: 4}, + fullDocument: {_id: 4, x: 4}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", + }, + ] + }); cst.cleanUp(); |