diff options
-rw-r--r-- | jstests/change_streams/change_stream.js | 40 | ||||
-rw-r--r-- | jstests/sharding/aggregation_internal_parameters.js | 10 | ||||
-rw-r--r-- | jstests/sharding/change_streams.js | 416 |
3 files changed, 277 insertions, 189 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 7c5688b0704..9f41255c599 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -132,6 +132,27 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + jsTestLog("Testing multi:true update"); + assert.writeOK(db.t1.insert({_id: 4, a: 0, b: 1})); + assert.writeOK(db.t1.insert({_id: 5, a: 0, b: 1})); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + assert.writeOK(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true})); + expected = [ + { + documentKey: {_id: 4}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {b: 2}} + }, + { + documentKey: {_id: 5}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {b: 2}} + } + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + jsTestLog("Testing delete"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.remove({_id: 1})); @@ -142,6 +163,25 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + jsTestLog("Testing justOne:false delete"); + assert.writeOK(db.t1.insert({_id: 6, a: 1, b: 1})); + assert.writeOK(db.t1.insert({_id: 7, a: 1, b: 1})); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + assert.writeOK(db.t1.remove({a: 1}, {justOne: false})); + expected = [ + { + documentKey: {_id: 6}, + ns: {db: "test", coll: "t1"}, + operationType: "delete", + }, + { + documentKey: {_id: 7}, + ns: {db: "test", coll: "t1"}, + operationType: "delete", + } + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + jsTestLog("Testing intervening write on another collection"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); diff --git a/jstests/sharding/aggregation_internal_parameters.js b/jstests/sharding/aggregation_internal_parameters.js index 98549493373..2076aa465d8 100644 --- a/jstests/sharding/aggregation_internal_parameters.js +++ b/jstests/sharding/aggregation_internal_parameters.js @@ -5,15 +5,7 @@ (function() { "use strict"; - const st = new ShardingTest({ - shards: 2, - rs: { - nodes: 1, - enableMajorityReadConcern: '', - // Use a higher frequency for periodic noops to speed up the test. - setParameter: {periodicNoopIntervalSecs: 1} - } - }); + const st = new ShardingTest({shards: 2, rs: {nodes: 1, enableMajorityReadConcern: ''}}); const mongosDB = st.s0.getDB(jsTestName()); const mongosColl = mongosDB[jsTestName()]; diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js index 43a48598f8d..e2ef69362c9 100644 --- a/jstests/sharding/change_streams.js +++ b/jstests/sharding/change_streams.js @@ -15,185 +15,241 @@ return; } - const st = new ShardingTest({ - shards: 2, - rs: { - nodes: 1, - enableMajorityReadConcern: '', - // Use a higher frequency for periodic noops to speed up the test. - setParameter: {periodicNoopIntervalSecs: 1} + function runTest(collName, shardKey) { + const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + enableMajorityReadConcern: '', + // Intentionally disable the periodic no-op writer in order to allow the test have + // control of advancing the cluster time. For when it is enabled later in the test, + // use a higher frequency for periodic noops to speed up the test. + setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: false} + } + }); + + const mongosDB = st.s0.getDB(jsTestName()); + assert.commandWorked(st.s0.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); + + const mongosColl = mongosDB[collName]; + + // + // Sanity tests + // + + // Test that $sort and $group are banned from running in a $changeStream pipeline. + assertErrorCode(mongosDB.NegativeTest, + [{$changeStream: {}}, {$sort: {operationType: 1}}], + ErrorCodes.IllegalOperation); + assertErrorCode(mongosDB.NegativeTest, + [{$changeStream: {}}, {$group: {_id: '$documentKey'}}], + ErrorCodes.IllegalOperation); + + // Test that using change streams with any stages not allowed to run on mongos results in an + // error. + assertErrorCode( + mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation); + + // + // Main tests + // + + function makeShardKey(value) { + var obj = {}; + obj[shardKey] = value; + return obj; } - }); - - const mongosDB = st.s0.getDB(jsTestName()); - const mongosColl = mongosDB[jsTestName()]; - - assert.commandWorked(mongosDB.dropDatabase()); - - // Enable sharding on the test DB and ensure its primary is st.shard0.shardName. - assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); - st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); - - // Shard the test collection on _id. - assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); - - // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). - assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); - - // Move the [0, MaxKey) chunk to st.shard1.shardName. - assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); - - // Write a document to each chunk. - assert.writeOK(mongosColl.insert({_id: -1})); - assert.writeOK(mongosColl.insert({_id: 1})); - - let changeStream = mongosColl.aggregate([{$changeStream: {}}]); - - // Test that a change stream can see inserts on shard 0. - assert.writeOK(mongosColl.insert({_id: 1000})); - assert.writeOK(mongosColl.insert({_id: -1000})); - - assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert"); - assertChangeStreamEventEq(changeStream.next(), { - documentKey: {_id: 1000}, - fullDocument: {_id: 1000}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - // Now do another write to shard 0, advancing that shard's clock and enabling the stream to - // return the earlier write to shard 1. - assert.writeOK(mongosColl.insert({_id: 1001})); - - assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert"); - assertChangeStreamEventEq(changeStream.next(), { - documentKey: {_id: -1000}, - fullDocument: {_id: -1000}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - // Test that all changes are eventually visible due to the periodic noop writer. - assert.commandWorked( - st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); - assert.commandWorked( - st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); - assert.soon(() => changeStream.hasNext()); - - assertChangeStreamEventEq(changeStream.next(), { - documentKey: {_id: 1001}, - fullDocument: {_id: 1001}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - changeStream.close(); - - // Test that using change streams with any stages not allowed to run on mongos results in an - // error. - assertErrorCode( - mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation); - - // Test that it is legal to open a change stream, even if the - // 'internalQueryProhibitMergingOnMongos' parameter is set. - assert.commandWorked( - mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true})); - let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}])); - tempCursor.close(); - assert.commandWorked( - mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false})); - - // Test that $sort and $group are banned from running in a $changeStream pipeline. - assertErrorCode(mongosColl, - [{$changeStream: {}}, {$sort: {operationType: 1}}], - ErrorCodes.IllegalOperation); - assertErrorCode(mongosColl, - [{$changeStream: {}}, {$group: {_id: "$documentKey"}}], - ErrorCodes.IllegalOperation); - - assert.writeOK(mongosColl.remove({})); - // We awaited the replication of the first write, so the change stream shouldn't return it. - // Use { w: "majority" } to deal with journaling correctly, even though we only have one node. - assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}})); - - changeStream = mongosColl.aggregate([{$changeStream: {}}]); - assert(!changeStream.hasNext()); - - // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and - // close the cursor. - jsTestLog("Testing getMore command closes cursor for invalidate entries"); - mongosColl.drop(); - // Wait for the drop to actually happen. - assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( - mongosColl.getDB(), mongosColl.getName())); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - assert(changeStream.isExhausted()); - - jsTestLog("Testing aggregate command closes cursor for invalidate entries"); - // Shard the test collection on _id. - assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); - - // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). - assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); - // Move the [0, MaxKey) chunk to st.shard1.shardName. - assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); - - // Write one document to each chunk. - assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); - - changeStream = mongosColl.aggregate([{$changeStream: {}}]); - assert(!changeStream.hasNext()); - - // Store a valid resume token before dropping the collection, to be used later in the test. - assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}})); - - assert.soon(() => changeStream.hasNext()); - const resumeToken = changeStream.next()._id; - - mongosColl.drop(); - - assert.soon(() => changeStream.hasNext()); - let next = changeStream.next(); - assert.eq(next.operationType, "insert"); - assert.eq(next.documentKey._id, 2); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - - // With an explicit collation, test that we can resume from before the collection drop. - changeStream = mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}}); - - assert.soon(() => changeStream.hasNext()); - next = changeStream.next(); - assert.eq(next.operationType, "insert"); - assert.eq(next.documentKey, {_id: 2}); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - - // Without an explicit collation, test that we *cannot* resume from before the collection drop. - assert.commandFailedWithCode(mongosDB.runCommand({ - aggregate: mongosColl.getName(), - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - cursor: {} - }), - ErrorCodes.InvalidResumeToken); - - st.stop(); + + function makeShardKeyDocument(value, optExtraFields) { + var obj = {}; + if (shardKey !== '_id') + obj['_id'] = value; + obj[shardKey] = value; + return Object.assign(obj, optExtraFields); + } + + jsTestLog('Testing change streams with shard key ' + shardKey); + // Shard the test collection and split it into 2 chunks: + // [MinKey, 0) - shard0, [0, MaxKey) - shard1 + st.shardColl(mongosColl, + makeShardKey(1) /* shard key */, + makeShardKey(0) /* split at */, + makeShardKey(1) /* move to shard 1 */); + + // Write a document to each chunk. + assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1))); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1))); + + let changeStream = mongosColl.aggregate([{$changeStream: {}}]); + + // Test that a change stream can see inserts on shard 0. + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1000))); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1000))); + + assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert"); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(1000), + fullDocument: makeShardKeyDocument(1000), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + // Because the periodic noop writer is disabled, do another write to shard 0 in order to + // advance that shard's clock and enabling the stream to return the earlier write to shard 1 + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1001))); + + assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert"); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(-1000), + fullDocument: makeShardKeyDocument(-1000), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + // Test that all changes are eventually visible due to the periodic noop writer. + assert.commandWorked( + st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); + assert.commandWorked( + st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(1001), + fullDocument: makeShardKeyDocument(1001), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + changeStream.close(); + + jsTestLog('Testing multi-update change streams with shard key ' + shardKey); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(10, {a: 0, b: 0}))); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(-10, {a: 0, b: 0}))); + changeStream = mongosColl.aggregate([{$changeStream: {}}]); + + assert.writeOK(mongosColl.update({a: 0}, {$set: {b: 2}}, {multi: true})); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + operationType: "update", + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + documentKey: makeShardKeyDocument(-10), + updateDescription: {updatedFields: {b: 2}, removedFields: []}, + }); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + operationType: "update", + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + documentKey: makeShardKeyDocument(10), + updateDescription: {updatedFields: {b: 2}, removedFields: []}, + }); + changeStream.close(); + + // Test that it is legal to open a change stream, even if the + // 'internalQueryProhibitMergingOnMongos' parameter is set. + assert.commandWorked( + st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true})); + let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}])); + tempCursor.close(); + assert.commandWorked( + st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false})); + + assert.writeOK(mongosColl.remove({})); + // We awaited the replication of the first write, so the change stream shouldn't return it. + // Use { w: "majority" } to deal with journaling correctly, even though we only have one + // node. + assert.writeOK( + mongosColl.insert(makeShardKeyDocument(0, {a: 1}), {writeConcern: {w: "majority"}})); + + changeStream = mongosColl.aggregate([{$changeStream: {}}]); + assert(!changeStream.hasNext()); + + // Drop the collection and test that we return a "drop" followed by an "invalidate" entry + // and close the cursor. + jsTestLog('Testing getMore command closes cursor for invalidate entries with shard key' + + shardKey); + mongosColl.drop(); + // Wait for the drop to actually happen. + assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( + mongosColl.getDB(), mongosColl.getName())); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + assert(changeStream.isExhausted()); + + jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key' + + shardKey); + // Shard the test collection and split it into 2 chunks: + // [MinKey, 0) - shard0, [0, MaxKey) - shard1 + st.shardColl(mongosColl, + makeShardKey(1) /* shard key */, + makeShardKey(0) /* split at */, + makeShardKey(1) /* move to shard 1 */); + + // Write one document to each chunk. + assert.writeOK( + mongosColl.insert(makeShardKeyDocument(-1), {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1), {writeConcern: {w: "majority"}})); + + changeStream = mongosColl.aggregate([{$changeStream: {}}]); + assert(!changeStream.hasNext()); + + // Store a valid resume token before dropping the collection, to be used later in the test + assert.writeOK( + mongosColl.insert(makeShardKeyDocument(-2), {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(2), {writeConcern: {w: "majority"}})); + + assert.soon(() => changeStream.hasNext()); + const resumeToken = changeStream.next()._id; + + mongosColl.drop(); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(2), + fullDocument: makeShardKeyDocument(2), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + + // With an explicit collation, test that we can resume from before the collection drop + changeStream = + mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(2), + fullDocument: makeShardKeyDocument(2), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + + // Without an explicit collation, test that we *cannot* resume from before the collection + // drop + assert.commandFailedWithCode(mongosDB.runCommand({ + aggregate: mongosColl.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {} + }), + ErrorCodes.InvalidResumeToken); + + st.stop(); + } + + runTest('with_id_shard_key', '_id'); + runTest('with_non_id_shard_key', 'non_id'); })(); |