-rw-r--r--   jstests/change_streams/change_stream.js                40
-rw-r--r--   jstests/sharding/aggregation_internal_parameters.js    10
-rw-r--r--   jstests/sharding/change_streams.js                     416
3 files changed, 277 insertions(+), 189 deletions(-)
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 7c5688b0704..9f41255c599 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -132,6 +132,27 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ jsTestLog("Testing multi:true update");
+ assert.writeOK(db.t1.insert({_id: 4, a: 0, b: 1}));
+ assert.writeOK(db.t1.insert({_id: 5, a: 0, b: 1}));
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ assert.writeOK(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true}));
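+    // A multi:true update that matches two documents should produce two separate 'update' events,
+    // one per modified document.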
+ expected = [
+ {
+ documentKey: {_id: 4},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {b: 2}}
+ },
+ {
+ documentKey: {_id: 5},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {b: 2}}
+ }
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
jsTestLog("Testing delete");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.remove({_id: 1}));
@@ -142,6 +163,25 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ jsTestLog("Testing justOne:false delete");
+ assert.writeOK(db.t1.insert({_id: 6, a: 1, b: 1}));
+ assert.writeOK(db.t1.insert({_id: 7, a: 1, b: 1}));
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ assert.writeOK(db.t1.remove({a: 1}, {justOne: false}));
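+    // Likewise, a multi-document delete ({justOne: false}) should produce one 'delete' event per
+    // removed document.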
+ expected = [
+ {
+ documentKey: {_id: 6},
+ ns: {db: "test", coll: "t1"},
+ operationType: "delete",
+ },
+ {
+ documentKey: {_id: 7},
+ ns: {db: "test", coll: "t1"},
+ operationType: "delete",
+ }
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
jsTestLog("Testing intervening write on another collection");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
diff --git a/jstests/sharding/aggregation_internal_parameters.js b/jstests/sharding/aggregation_internal_parameters.js
index 98549493373..2076aa465d8 100644
--- a/jstests/sharding/aggregation_internal_parameters.js
+++ b/jstests/sharding/aggregation_internal_parameters.js
@@ -5,15 +5,7 @@
(function() {
"use strict";
- const st = new ShardingTest({
- shards: 2,
- rs: {
- nodes: 1,
- enableMajorityReadConcern: '',
- // Use a higher frequency for periodic noops to speed up the test.
- setParameter: {periodicNoopIntervalSecs: 1}
- }
- });
+ const st = new ShardingTest({shards: 2, rs: {nodes: 1, enableMajorityReadConcern: ''}});
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index 43a48598f8d..e2ef69362c9 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -15,185 +15,241 @@
return;
}
- const st = new ShardingTest({
- shards: 2,
- rs: {
- nodes: 1,
- enableMajorityReadConcern: '',
- // Use a higher frequency for periodic noops to speed up the test.
- setParameter: {periodicNoopIntervalSecs: 1}
+ function runTest(collName, shardKey) {
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+                // Intentionally disable the periodic no-op writer so that the test controls when
+                // the cluster time advances. The one-second noop interval takes effect later in
+                // the test, once the no-op writer is re-enabled, and speeds the test up.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: false}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ assert.commandWorked(st.s0.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ const mongosColl = mongosDB[collName];
+
+ //
+ // Sanity tests
+ //
+
+ // Test that $sort and $group are banned from running in a $changeStream pipeline.
+ assertErrorCode(mongosDB.NegativeTest,
+ [{$changeStream: {}}, {$sort: {operationType: 1}}],
+ ErrorCodes.IllegalOperation);
+ assertErrorCode(mongosDB.NegativeTest,
+ [{$changeStream: {}}, {$group: {_id: '$documentKey'}}],
+ ErrorCodes.IllegalOperation);
+
+ // Test that using change streams with any stages not allowed to run on mongos results in an
+ // error.
+ assertErrorCode(
+ mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
+
+ //
+ // Main tests
+ //
+
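+        // Builds {<shardKey>: value}; used below as the shard key pattern, the split point, and
+        // the chunk to move to shard 1 in st.shardColl().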
+ function makeShardKey(value) {
+ var obj = {};
+ obj[shardKey] = value;
+ return obj;
}
- });
-
- const mongosDB = st.s0.getDB(jsTestName());
- const mongosColl = mongosDB[jsTestName()];
-
- assert.commandWorked(mongosDB.dropDatabase());
-
- // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
- assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
- st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
-
- // Shard the test collection on _id.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
-
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
- assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
-
- // Move the [0, MaxKey) chunk to st.shard1.shardName.
- assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
-
- // Write a document to each chunk.
- assert.writeOK(mongosColl.insert({_id: -1}));
- assert.writeOK(mongosColl.insert({_id: 1}));
-
- let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
-
- // Test that a change stream can see inserts on shard 0.
- assert.writeOK(mongosColl.insert({_id: 1000}));
- assert.writeOK(mongosColl.insert({_id: -1000}));
-
- assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: {_id: 1000},
- fullDocument: {_id: 1000},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- // Now do another write to shard 0, advancing that shard's clock and enabling the stream to
- // return the earlier write to shard 1.
- assert.writeOK(mongosColl.insert({_id: 1001}));
-
- assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: {_id: -1000},
- fullDocument: {_id: -1000},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- // Test that all changes are eventually visible due to the periodic noop writer.
- assert.commandWorked(
- st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
- assert.commandWorked(
- st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
- assert.soon(() => changeStream.hasNext());
-
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: {_id: 1001},
- fullDocument: {_id: 1001},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
- changeStream.close();
-
- // Test that using change streams with any stages not allowed to run on mongos results in an
- // error.
- assertErrorCode(
- mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
-
- // Test that it is legal to open a change stream, even if the
- // 'internalQueryProhibitMergingOnMongos' parameter is set.
- assert.commandWorked(
- mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
- let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
- tempCursor.close();
- assert.commandWorked(
- mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
-
- // Test that $sort and $group are banned from running in a $changeStream pipeline.
- assertErrorCode(mongosColl,
- [{$changeStream: {}}, {$sort: {operationType: 1}}],
- ErrorCodes.IllegalOperation);
- assertErrorCode(mongosColl,
- [{$changeStream: {}}, {$group: {_id: "$documentKey"}}],
- ErrorCodes.IllegalOperation);
-
- assert.writeOK(mongosColl.remove({}));
- // We awaited the replication of the first write, so the change stream shouldn't return it.
- // Use { w: "majority" } to deal with journaling correctly, even though we only have one node.
- assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}}));
-
- changeStream = mongosColl.aggregate([{$changeStream: {}}]);
- assert(!changeStream.hasNext());
-
- // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and
- // close the cursor.
- jsTestLog("Testing getMore command closes cursor for invalidate entries");
- mongosColl.drop();
- // Wait for the drop to actually happen.
- assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
- mongosColl.getDB(), mongosColl.getName()));
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- jsTestLog("Testing aggregate command closes cursor for invalidate entries");
- // Shard the test collection on _id.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
-
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
- assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
- // Move the [0, MaxKey) chunk to st.shard1.shardName.
- assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
-
- // Write one document to each chunk.
- assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
-
- changeStream = mongosColl.aggregate([{$changeStream: {}}]);
- assert(!changeStream.hasNext());
-
- // Store a valid resume token before dropping the collection, to be used later in the test.
- assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
-
- assert.soon(() => changeStream.hasNext());
- const resumeToken = changeStream.next()._id;
-
- mongosColl.drop();
-
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey._id, 2);
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
-
- // With an explicit collation, test that we can resume from before the collection drop.
- changeStream = mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey, {_id: 2});
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
-
- // Without an explicit collation, test that we *cannot* resume from before the collection drop.
- assert.commandFailedWithCode(mongosDB.runCommand({
- aggregate: mongosColl.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
-
- st.stop();
+
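+        // Builds a document whose shard key field (and, for a non-_id shard key, whose _id) is
+        // set to 'value', so the same helper can express the expected documentKey for either
+        // choice of shard key.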
+ function makeShardKeyDocument(value, optExtraFields) {
+ var obj = {};
+ if (shardKey !== '_id')
+ obj['_id'] = value;
+ obj[shardKey] = value;
+ return Object.assign(obj, optExtraFields);
+ }
+
+ jsTestLog('Testing change streams with shard key ' + shardKey);
+ // Shard the test collection and split it into 2 chunks:
+ // [MinKey, 0) - shard0, [0, MaxKey) - shard1
+ st.shardColl(mongosColl,
+ makeShardKey(1) /* shard key */,
+ makeShardKey(0) /* split at */,
+ makeShardKey(1) /* move to shard 1 */);
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1)));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1)));
+
+ let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ // Test that a change stream can see inserts on shard 0.
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1000)));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1000)));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(1000),
+ fullDocument: makeShardKeyDocument(1000),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+        // Because the periodic noop writer is disabled, do another write to shard 0 in order to
+        // advance that shard's clock, enabling the stream to return the earlier write to shard 1.
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1001)));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(-1000),
+ fullDocument: makeShardKeyDocument(-1000),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ // Test that all changes are eventually visible due to the periodic noop writer.
+ assert.commandWorked(
+ st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+ assert.commandWorked(
+ st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(1001),
+ fullDocument: makeShardKeyDocument(1001),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+ changeStream.close();
+
+ jsTestLog('Testing multi-update change streams with shard key ' + shardKey);
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(10, {a: 0, b: 0})));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(-10, {a: 0, b: 0})));
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ assert.writeOK(mongosColl.update({a: 0}, {$set: {b: 2}}, {multi: true}));
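+        // The multi-update matches one document on each shard, so expect one 'update' event per
+        // document.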
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ operationType: "update",
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ documentKey: makeShardKeyDocument(-10),
+ updateDescription: {updatedFields: {b: 2}, removedFields: []},
+ });
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ operationType: "update",
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ documentKey: makeShardKeyDocument(10),
+ updateDescription: {updatedFields: {b: 2}, removedFields: []},
+ });
+ changeStream.close();
+
+ // Test that it is legal to open a change stream, even if the
+ // 'internalQueryProhibitMergingOnMongos' parameter is set.
+ assert.commandWorked(
+ st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
+ let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
+ tempCursor.close();
+ assert.commandWorked(
+ st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
+
+ assert.writeOK(mongosColl.remove({}));
+ // We awaited the replication of the first write, so the change stream shouldn't return it.
+ // Use { w: "majority" } to deal with journaling correctly, even though we only have one
+ // node.
+ assert.writeOK(
+ mongosColl.insert(makeShardKeyDocument(0, {a: 1}), {writeConcern: {w: "majority"}}));
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ assert(!changeStream.hasNext());
+
+ // Drop the collection and test that we return a "drop" followed by an "invalidate" entry
+ // and close the cursor.
+        jsTestLog('Testing getMore command closes cursor for invalidate entries with shard key ' +
+                  shardKey);
+ mongosColl.drop();
+ // Wait for the drop to actually happen.
+ assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+ mongosColl.getDB(), mongosColl.getName()));
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+ assert(changeStream.isExhausted());
+
+        jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key ' +
+                  shardKey);
+ // Shard the test collection and split it into 2 chunks:
+ // [MinKey, 0) - shard0, [0, MaxKey) - shard1
+ st.shardColl(mongosColl,
+ makeShardKey(1) /* shard key */,
+ makeShardKey(0) /* split at */,
+ makeShardKey(1) /* move to shard 1 */);
+
+ // Write one document to each chunk.
+ assert.writeOK(
+ mongosColl.insert(makeShardKeyDocument(-1), {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1), {writeConcern: {w: "majority"}}));
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ assert(!changeStream.hasNext());
+
+        // Store a valid resume token before dropping the collection, to be used later in the test.
+ assert.writeOK(
+ mongosColl.insert(makeShardKeyDocument(-2), {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(2), {writeConcern: {w: "majority"}}));
+
+ assert.soon(() => changeStream.hasNext());
+ const resumeToken = changeStream.next()._id;
+
+ mongosColl.drop();
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(2),
+ fullDocument: makeShardKeyDocument(2),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+
+        // With an explicit collation, test that we can resume from before the collection drop.
+ changeStream =
+ mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(2),
+ fullDocument: makeShardKeyDocument(2),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+
+        // Without an explicit collation, test that we *cannot* resume from before the collection
+        // drop.
+ assert.commandFailedWithCode(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+ }),
+ ErrorCodes.InvalidResumeToken);
+
+ st.stop();
+ }
+
+ runTest('with_id_shard_key', '_id');
+ runTest('with_non_id_shard_key', 'non_id');
})();