diff options
Diffstat (limited to 'jstests/sharding/change_streams_whole_db.js')
-rw-r--r-- | jstests/sharding/change_streams_whole_db.js | 366 |
1 files changed, 183 insertions, 183 deletions
diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js index 4051493c04f..322be4a19b4 100644 --- a/jstests/sharding/change_streams_whole_db.js +++ b/jstests/sharding/change_streams_whole_db.js @@ -1,192 +1,192 @@ // Tests the behavior of a change stream on a whole database in a sharded cluster. // @tags: [uses_change_streams] (function() { - "use strict"; - - load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. - load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). - load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. - load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection. - - // For supportsMajorityReadConcern(). - load("jstests/multiVersion/libs/causal_consistency_helpers.js"); - - if (!supportsMajorityReadConcern()) { - jsTestLog("Skipping test since storage engine doesn't support majority read concern."); - return; +"use strict"; + +load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. +load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). +load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection. + +// For supportsMajorityReadConcern(). +load("jstests/multiVersion/libs/causal_consistency_helpers.js"); + +if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; +} + +const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + // Use a higher frequency for periodic noops to speed up the test. + setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true} } - - const st = new ShardingTest({ - shards: 2, - rs: { - nodes: 1, - // Use a higher frequency for periodic noops to speed up the test. - setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true} - } - }); - - const mongosDB = st.s0.getDB("test"); - const mongosColl = mongosDB[jsTestName()]; - - let cst = new ChangeStreamTest(mongosDB); - let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - - // Test that if there are no changes, we return an empty batch. - assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); - - // Test that the change stream returns operations on the unsharded test collection. - assert.writeOK(mongosColl.insert({_id: 0})); - let expected = { - documentKey: {_id: 0}, - fullDocument: {_id: 0}, +}); + +const mongosDB = st.s0.getDB("test"); +const mongosColl = mongosDB[jsTestName()]; + +let cst = new ChangeStreamTest(mongosDB); +let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + +// Test that if there are no changes, we return an empty batch. +assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); + +// Test that the change stream returns operations on the unsharded test collection. +assert.writeOK(mongosColl.insert({_id: 0})); +let expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +// Create a new sharded collection. +mongosDB.createCollection(jsTestName() + "_sharded_on_x"); +const mongosCollShardedOnX = mongosDB[jsTestName() + "_sharded_on_x"]; + +// Shard, split, and move one chunk to shard1. +st.shardColl(mongosCollShardedOnX.getName(), {x: 1}, {x: 0}, {x: 1}, mongosDB.getName()); + +// Write a document to each chunk. +assert.writeOK(mongosCollShardedOnX.insert({_id: 0, x: -1})); +assert.writeOK(mongosCollShardedOnX.insert({_id: 1, x: 1})); + +// Verify that the change stream returns both inserts. +expected = [ + { + documentKey: {_id: 0, x: -1}, + fullDocument: {_id: 0, x: -1}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 1, x: 1}, + fullDocument: {_id: 1, x: 1}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", + } +]; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + +// Now send inserts to both the sharded and unsharded collections, and verify that the change +// streams returns them in order. +assert.writeOK(mongosCollShardedOnX.insert({_id: 2, x: 2})); +assert.writeOK(mongosColl.insert({_id: 1})); + +// Verify that the change stream returns both inserts. +expected = [ + { + documentKey: {_id: 2, x: 2}, + fullDocument: {_id: 2, x: 2}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 1}, + fullDocument: {_id: 1}, ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, operationType: "insert", - }; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); - - // Create a new sharded collection. - mongosDB.createCollection(jsTestName() + "_sharded_on_x"); - const mongosCollShardedOnX = mongosDB[jsTestName() + "_sharded_on_x"]; - - // Shard, split, and move one chunk to shard1. - st.shardColl(mongosCollShardedOnX.getName(), {x: 1}, {x: 0}, {x: 1}, mongosDB.getName()); - - // Write a document to each chunk. - assert.writeOK(mongosCollShardedOnX.insert({_id: 0, x: -1})); - assert.writeOK(mongosCollShardedOnX.insert({_id: 1, x: 1})); - - // Verify that the change stream returns both inserts. - expected = [ - { - documentKey: {_id: 0, x: -1}, - fullDocument: {_id: 0, x: -1}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, - operationType: "insert", - }, - { - documentKey: {_id: 1, x: 1}, - fullDocument: {_id: 1, x: 1}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, - operationType: "insert", - } - ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); - - // Now send inserts to both the sharded and unsharded collections, and verify that the change - // streams returns them in order. - assert.writeOK(mongosCollShardedOnX.insert({_id: 2, x: 2})); - assert.writeOK(mongosColl.insert({_id: 1})); - - // Verify that the change stream returns both inserts. - expected = [ - { - documentKey: {_id: 2, x: 2}, - fullDocument: {_id: 2, x: 2}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, - operationType: "insert", - }, - { - documentKey: {_id: 1}, - fullDocument: {_id: 1}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - } - ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); - - // Create a third sharded collection with a compound shard key. - mongosDB.createCollection(jsTestName() + "_sharded_compound"); - const mongosCollShardedCompound = mongosDB[jsTestName() + "_sharded_compound"]; - - // Shard, split, and move one chunk to shard1. - st.shardColl(mongosCollShardedCompound.getName(), - {y: 1, x: 1}, - {y: 1, x: MinKey}, - {y: 1, x: MinKey}, - mongosDB.getName()); - - // Write a document to each chunk. - assert.writeOK(mongosCollShardedCompound.insert({_id: 0, y: -1, x: 0})); - assert.writeOK(mongosCollShardedCompound.insert({_id: 1, y: 1, x: 0})); - - // Verify that the change stream returns both inserts. - expected = [ - { - documentKey: {_id: 0, y: -1, x: 0}, - fullDocument: {_id: 0, y: -1, x: 0}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()}, - operationType: "insert", - }, - { - documentKey: {_id: 1, y: 1, x: 0}, - fullDocument: {_id: 1, y: 1, x: 0}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()}, - operationType: "insert", - } - ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); - - // Send inserts to all 3 collections and verify that the results contain the correct - // documentKeys and are in the correct order. - assert.writeOK(mongosCollShardedOnX.insert({_id: 3, x: 3})); - assert.writeOK(mongosColl.insert({_id: 3})); - assert.writeOK(mongosCollShardedCompound.insert({_id: 2, x: 0, y: -2})); - - // Verify that the change stream returns both inserts. - expected = [ - { - documentKey: {_id: 3, x: 3}, - fullDocument: {_id: 3, x: 3}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, - operationType: "insert", - }, - { - documentKey: {_id: 3}, - fullDocument: {_id: 3}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }, + } +]; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + +// Create a third sharded collection with a compound shard key. +mongosDB.createCollection(jsTestName() + "_sharded_compound"); +const mongosCollShardedCompound = mongosDB[jsTestName() + "_sharded_compound"]; + +// Shard, split, and move one chunk to shard1. +st.shardColl(mongosCollShardedCompound.getName(), + {y: 1, x: 1}, + {y: 1, x: MinKey}, + {y: 1, x: MinKey}, + mongosDB.getName()); + +// Write a document to each chunk. +assert.writeOK(mongosCollShardedCompound.insert({_id: 0, y: -1, x: 0})); +assert.writeOK(mongosCollShardedCompound.insert({_id: 1, y: 1, x: 0})); + +// Verify that the change stream returns both inserts. +expected = [ + { + documentKey: {_id: 0, y: -1, x: 0}, + fullDocument: {_id: 0, y: -1, x: 0}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 1, y: 1, x: 0}, + fullDocument: {_id: 1, y: 1, x: 0}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()}, + operationType: "insert", + } +]; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + +// Send inserts to all 3 collections and verify that the results contain the correct +// documentKeys and are in the correct order. +assert.writeOK(mongosCollShardedOnX.insert({_id: 3, x: 3})); +assert.writeOK(mongosColl.insert({_id: 3})); +assert.writeOK(mongosCollShardedCompound.insert({_id: 2, x: 0, y: -2})); + +// Verify that the change stream returns both inserts. +expected = [ + { + documentKey: {_id: 3, x: 3}, + fullDocument: {_id: 3, x: 3}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 3}, + fullDocument: {_id: 3}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 2, x: 0, y: -2}, + fullDocument: {_id: 2, x: 0, y: -2}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()}, + operationType: "insert", + }, +]; + +const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); +// Store the resume token of the first insert to use after dropping the collection. +const resumeTokenBeforeDrop = results[0]._id; + +// Write one more document to the collection that will be dropped, to be returned after +// resuming. +assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4})); + +// Drop the collection, invalidating the open change stream. +assertDropCollection(mongosDB, mongosCollShardedOnX.getName()); + +// Resume the change stream from before the collection drop, and verify that the documentKey +// field contains the extracted shard key from the resume token. +cursor = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {resumeAfter: resumeTokenBeforeDrop}}, + {$match: {"ns.coll": mongosCollShardedOnX.getName()}} + ], + collection: 1 +}); +cst.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: [ { - documentKey: {_id: 2, x: 0, y: -2}, - fullDocument: {_id: 2, x: 0, y: -2}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()}, - operationType: "insert", + documentKey: {_id: 4, x: 4}, + fullDocument: {_id: 4, x: 4}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", }, - ]; - - const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); - // Store the resume token of the first insert to use after dropping the collection. - const resumeTokenBeforeDrop = results[0]._id; - - // Write one more document to the collection that will be dropped, to be returned after - // resuming. - assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4})); - - // Drop the collection, invalidating the open change stream. - assertDropCollection(mongosDB, mongosCollShardedOnX.getName()); - - // Resume the change stream from before the collection drop, and verify that the documentKey - // field contains the extracted shard key from the resume token. - cursor = cst.startWatchingChanges({ - pipeline: [ - {$changeStream: {resumeAfter: resumeTokenBeforeDrop}}, - {$match: {"ns.coll": mongosCollShardedOnX.getName()}} - ], - collection: 1 - }); - cst.assertNextChangesEqual({ - cursor: cursor, - expectedChanges: [ - { - documentKey: {_id: 4, x: 4}, - fullDocument: {_id: 4, x: 4}, - ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, - operationType: "insert", - }, - ] - }); - - cst.cleanUp(); - - st.stop(); + ] +}); + +cst.cleanUp(); + +st.stop(); })(); |