author    Nick Zolnierz <nicholas.zolnierz@mongodb.com>    2018-04-03 17:00:41 -0400
committer Nick Zolnierz <nicholas.zolnierz@mongodb.com>    2018-04-11 15:53:03 -0400
commit    fff261ac550155065fce4b7b1529061f18980599 (patch)
tree      09ce022d7b8319f1af3c2db2354427ecfe1aa389 /jstests/sharding
parent    0fa7bcb8bea5d4585fdbc1003b5116cd7bf28540 (diff)
SERVER-29134: Support change streams on an entire database in a sharded cluster
Diffstat (limited to 'jstests/sharding')
-rw-r--r--  jstests/sharding/aggregation_currentop.js                              |   5
-rw-r--r--  jstests/sharding/change_stream_lookup_single_shard_cluster.js          |  13
-rw-r--r--  jstests/sharding/change_streams.js                                     |   3
-rw-r--r--  jstests/sharding/change_streams_unsharded_becomes_sharded.js           | 243
-rw-r--r--  jstests/sharding/change_streams_whole_db.js                            | 167
-rw-r--r--  jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js |  51
-rw-r--r--  jstests/sharding/resume_change_stream.js                               | 291
-rw-r--r--  jstests/sharding/resume_change_stream_on_subset_of_shards.js           |   3
8 files changed, 519 insertions, 257 deletions
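
This commit extends change streams from single collections to entire databases in sharded clusters. As orientation for the tests below, here is a minimal sketch of the two entry points they exercise; the connection string and collection name are placeholders, not part of the commit:

// Orientation sketch only; "localhost:27017" and "myColl" are placeholders.
const testDB = connect("localhost:27017/test");  // assumes a mongos is listening here

// Whole-database change stream via the shell helper:
const dbStream = testDB.watch([], {fullDocument: "updateLookup"});

// The same stream via the raw command; 'aggregate: 1' makes the aggregation
// collectionless, i.e. scoped to the whole database:
testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}});

// Single-collection stream, for comparison:
const collStream = testDB.myColl.watch();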
diff --git a/jstests/sharding/aggregation_currentop.js b/jstests/sharding/aggregation_currentop.js
index 1a0d4b72281..dce4cf5482f 100644
--- a/jstests/sharding/aggregation_currentop.js
+++ b/jstests/sharding/aggregation_currentop.js
@@ -19,6 +19,7 @@
"use strict";
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+ load("jstests/libs/namespace_utils.js"); // For getCollectionNameFromFullNamespace.
// Replica set nodes started with --shardsvr do not enable key generation until they are added
// to a sharded cluster and reject commands with gossiped clusterTime from users without the
@@ -196,10 +197,6 @@
awaitShell();
}
- function getCollectionNameFromFullNamespace(ns) {
- return ns.split(/\.(.+)/)[1];
- }
-
// Generic function for running getMore on a $currentOp aggregation cursor and returning the
// command response.
function getMoreTest({conn, curOpSpec, getMoreBatchSize}) {
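
The helper removed above now lives in jstests/libs/namespace_utils.js. Its one-liner rewards a gloss: String.split with a capturing group keeps the capture in the result array, so only the first '.' acts as the database/collection separator. The assertions here are illustrative, not from the commit:

// Body exactly as shown in the removed lines, relocated to namespace_utils.js:
function getCollectionNameFromFullNamespace(ns) {
    return ns.split(/\.(.+)/)[1];
}
// The capturing group swallows everything after the first '.', so collection
// names that themselves contain dots survive intact:
assert.eq(getCollectionNameFromFullNamespace("test.system.js"), "system.js");
assert.eq(getCollectionNameFromFullNamespace("mydb.coll"), "coll");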
diff --git a/jstests/sharding/change_stream_lookup_single_shard_cluster.js b/jstests/sharding/change_stream_lookup_single_shard_cluster.js
index 02766b7b592..f642064cd57 100644
--- a/jstests/sharding/change_stream_lookup_single_shard_cluster.js
+++ b/jstests/sharding/change_stream_lookup_single_shard_cluster.js
@@ -36,14 +36,21 @@
assert.eq(explainPlan.mergeType, "mongos");
// Open a $changeStream on the collection with 'updateLookup' and update the test doc.
- const stream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
+ const stream = mongosColl.watch([], {fullDocument: "updateLookup"});
+ const wholeDbStream = mongosDB.watch([], {fullDocument: "updateLookup"});
+
mongosColl.update({_id: 1}, {$set: {updated: true}});
- // Verify that the document is successfully retrieved from the sharded collection.
+ // Verify that the document is successfully retrieved from the single-collection and whole-db
+ // change streams.
assert.soon(() => stream.hasNext());
assert.docEq(stream.next().fullDocument, {_id: 1, updated: true});
+ assert.soon(() => wholeDbStream.hasNext());
+ assert.docEq(wholeDbStream.next().fullDocument, {_id: 1, updated: true});
+
stream.close();
+ wholeDbStream.close();
st.stop();
-})();
\ No newline at end of file
+})();
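
The hunk above also shows the shorthand this commit adopts throughout: watch() wraps an aggregate whose pipeline begins with $changeStream. A sketch with a placeholder namespace:

// Equivalent spellings of the same single-collection stream:
const coll = db.getSiblingDB("test").myColl;  // placeholder namespace
const viaAggregate = coll.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
const viaWatch = coll.watch([], {fullDocument: "updateLookup"});
// db.watch() is the database-level analogue used for 'wholeDbStream' above.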
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index 92547fabe62..04f29289338 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -163,7 +163,8 @@
mongosColl.getDB(), mongosColl.getName()));
ChangeStreamTest.assertChangeStreamThrowsCode({
- collection: mongosColl,
+ db: mongosDB,
+ collName: mongosColl.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
expectedCode: 40615
});
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index bb6019a4651..ea5178601b1 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -28,97 +28,166 @@
}
});
- const mongosDB = st.s0.getDB(testName);
+ const mongosDB = st.s0.getDB("test");
const mongosColl = mongosDB[testName];
- mongosDB.createCollection(testName);
- mongosColl.createIndex({x: 1});
-
- st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
-
- // Establish a change stream cursor on the unsharded collection.
- let cst = new ChangeStreamTest(mongosDB);
- let cursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: mongosColl});
- assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
-
- // Verify that the cursor picks up documents inserted while the collection is unsharded. The
- // 'documentKey' at this point is simply the _id field.
- assert.writeOK(mongosColl.insert({_id: 0, x: 0}));
- const[preShardCollectionChange] = cst.assertNextChangesEqual({
- cursor: cursor,
- expectedChanges: [{
- documentKey: {_id: 0},
- fullDocument: {_id: 0, x: 0},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- }]
- });
+ function testUnshardedBecomesSharded(collToWatch) {
+ mongosColl.drop();
+ mongosDB.createCollection(testName);
+ mongosColl.createIndex({x: 1});
+
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Establish a change stream cursor on the unsharded collection.
+ const cst = new ChangeStreamTest(mongosDB);
+
+ // Create a different collection in the same database, and verify that it doesn't affect the
+ // results of the change stream.
+ const mongosCollOther = mongosDB[testName + "other"];
+ mongosCollOther.drop();
+ mongosDB.createCollection(testName + "other");
+ mongosCollOther.createIndex({y: 1});
+
+ let cursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}],
+ collection: collToWatch
+ });
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
+ // Verify that the cursor picks up documents inserted while the collection is unsharded. The
+ // 'documentKey' at this point is simply the _id field.
+ assert.writeOK(mongosColl.insert({_id: 0, x: 0}));
+ assert.writeOK(mongosCollOther.insert({_id: 0, y: 0}));
+ const [preShardCollectionChange] = cst.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: [{
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, x: 0},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ }]
+ });
+
+ // Record the resume token for this change, before the collection is sharded.
+ const preShardCollectionResumeToken = preShardCollectionChange._id;
+
+ // Shard the test collection with shard key {x: 1} and split into 2 chunks.
+ st.shardColl(mongosColl.getName(), {x: 1}, {x: 0}, false, mongosDB.getName());
+
+ // Shard the other collection with shard key {y: 1} and split into 2 chunks.
+ st.shardColl(mongosCollOther.getName(), {y: 1}, {y: 0}, false, mongosDB.getName());
+
+ // List the changes we expect to see for the next two operations on the sharded collection.
+ // Later, we will resume the stream using the token generated before the collection was
+ // sharded, and will need to confirm that we can still see these two changes.
+ const postShardCollectionChanges = [
+ {
+ documentKey: {x: 1, _id: 1},
+ fullDocument: {_id: 1, x: 1},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {x: -1, _id: -1},
+ fullDocument: {_id: -1, x: -1},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ }
+ ];
+
+ // Verify that the cursor on the original shard is still valid and sees new inserted
+ // documents. The 'documentKey' field should now include the shard key, even before a
+ // 'kNewShardDetected' operation has been generated by the migration of a chunk to a new
+ // shard.
+ assert.writeOK(mongosColl.insert({_id: 1, x: 1}));
+ assert.writeOK(mongosCollOther.insert({_id: 1, y: 1}));
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]});
+
+ // Move the [minKey, 0) chunk to shard1.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosColl.getFullName(),
+ find: {x: -1},
+ to: st.rs1.getURL(),
+ _waitForDelete: true
+ }));
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: mongosCollOther.getFullName(),
+ find: {y: -1},
+ to: st.rs1.getURL(),
+ _waitForDelete: true
+ }));
+
+ // Make sure the change stream cursor sees a document inserted on the recipient shard.
+ assert.writeOK(mongosColl.insert({_id: -1, x: -1}));
+ assert.writeOK(mongosCollOther.insert({_id: -1, y: -1}));
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: [postShardCollectionChanges[1]]});
+
+ // Confirm that we can resume the stream on the sharded collection using the token generated
+ // while the collection was unsharded, whose documentKey contains the _id field but not the
+ // shard key.
+ let resumedCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}],
+ collection: mongosColl
+ });
+
+ // Verify that we see both of the insertions which occurred after the collection was
+ // sharded.
+ cst.assertNextChangesEqual(
+ {cursor: resumedCursor, expectedChanges: postShardCollectionChanges});
+
+ // Test the behavior of a change stream when a sharded collection is dropped and recreated.
+ cursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}],
+ collection: collToWatch
+ });
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
+ // Insert a couple of documents on shard1, creating a scenario where the getMore to
+ // shard0 will indicate that the change stream is invalidated while shard1 still has
+ // data to return.
+ assert.writeOK(mongosColl.insert({_id: -2, x: -2}));
+ assert.writeOK(mongosColl.insert({_id: -3, x: -3}));
+
+ // Drop and recreate the collection.
+ mongosColl.drop();
+ mongosDB.createCollection(mongosColl.getName());
+ mongosColl.createIndex({z: 1});
+
+ // Shard the collection on a different shard key and ensure that each shard has a chunk.
+ st.shardColl(mongosColl.getName(), {z: 1}, {z: 0}, {z: -1}, mongosDB.getName());
+
+ assert.writeOK(mongosColl.insert({_id: -1, z: -1}));
+ assert.writeOK(mongosColl.insert({_id: 1, z: 1}));
+
+ // Verify that the change stream picks up the inserts, but that the shard key is
+ // missing from the documentKey, since the collection has been dropped and recreated.
+ cst.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: [
+ {
+ documentKey: {_id: -2},
+ fullDocument: {_id: -2, x: -2},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: -3},
+ fullDocument: {_id: -3, x: -3},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ }
+ ]
+ });
+
+ cst.cleanUp();
+ }
- // Record the resume token for this change, before the collection is sharded.
- const preShardCollectionResumeToken = preShardCollectionChange._id;
-
- // Enable sharding on the previously unsharded collection.
- assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
-
- // Shard the collection on x.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {x: 1}}));
-
- // Ensure that the primary shard has an up-to-date routing table.
- assert.commandWorked(st.rs0.getPrimary().getDB("admin").runCommand(
- {_flushRoutingTableCacheUpdates: mongosColl.getFullName()}));
-
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
- assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {x: 0}}));
-
- // List the changes we expect to see for the next two operations on the sharded collection.
- // Later, we will resume the stream using the token generated before the collection was sharded,
- // and will need to confirm that we can still see these two changes.
- const postShardCollectionChanges = [
- {
- documentKey: {x: 1, _id: 1},
- fullDocument: {_id: 1, x: 1},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- },
- {
- documentKey: {x: -1, _id: -1},
- fullDocument: {_id: -1, x: -1},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- }
- ];
-
- // Verify that the cursor on the original shard is still valid and sees new inserted documents.
- // The 'documentKey' field should now include the shard key, even before a 'kNewShardDetected'
- // operation has been generated by the migration of a chunk to a new shard.
- assert.writeOK(mongosColl.insert({_id: 1, x: 1}));
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]});
-
- // Move the [minKey, 0) chunk to shard1.
- assert.commandWorked(mongosDB.adminCommand({
- moveChunk: mongosColl.getFullName(),
- find: {x: -1},
- to: st.rs1.getURL(),
- _waitForDelete: true
- }));
-
- // Make sure the change stream cursor sees a document inserted on the recipient shard.
- assert.writeOK(mongosColl.insert({_id: -1, x: -1}));
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[1]]});
-
- // Confirm that we can resume the stream on the sharded collection using the token generated
- // while the collection was unsharded, whose documentKey contains the _id field but not the
- // shard key.
- let resumedCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}],
- collection: mongosColl
- });
+ // First test against a change stream on a single collection.
+ testUnshardedBecomesSharded(mongosColl.getName());
- // Verify that we see both of the insertions which occurred after the collection was sharded.
- cst.assertNextChangesEqual(
- {cursor: resumedCursor, expectedChanges: postShardCollectionChanges});
+ // Test against a change stream on the entire database.
+ testUnshardedBecomesSharded(1);
st.stop();
})();
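
The refactor runs the same scenario twice by varying 'collToWatch': a collection name targets that one collection, while the literal 1 requests a database-level stream. Assuming ChangeStreamTest forwards this value as the command's 'aggregate' field, the two invocations reduce to:

// Sketch under the assumption above; 'testName' is the test's collection name.
const singleCollCmd = {aggregate: testName, pipeline: [{$changeStream: {}}], cursor: {}};
const wholeDbCmd = {aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}};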
diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js
new file mode 100644
index 00000000000..fc5443315a6
--- /dev/null
+++ b/jstests/sharding/change_streams_whole_db.js
@@ -0,0 +1,167 @@
+// Tests the behavior of a change stream on a whole database in a sharded cluster.
+(function() {
+ "use strict";
+
+ load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
+ load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
+ load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+
+ // For supportsMajorityReadConcern().
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+ }
+ });
+
+ const mongosDB = st.s0.getDB("test");
+
+ // TODO SERVER-34138 will add support for opening a change stream before a database exists.
+ assert.commandFailedWithCode(
+ mongosDB.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {batchSize: 1}}),
+ ErrorCodes.NamespaceNotFound);
+
+ const mongosColl = mongosDB[jsTestName()];
+ mongosDB.createCollection(jsTestName());
+
+ let cst = new ChangeStreamTest(mongosDB);
+ let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+
+ // Test that if there are no changes, we return an empty batch.
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
+ // Test that the change stream returns operations on the unsharded test collection.
+ assert.writeOK(mongosColl.insert({_id: 0}));
+ let expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ };
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Create a new sharded collection.
+ mongosDB.createCollection(jsTestName() + "_sharded_on_x");
+ const mongosCollShardedOnX = mongosDB[jsTestName() + "_sharded_on_x"];
+
+ // Shard, split, and move one chunk to shard1.
+ st.shardColl(mongosCollShardedOnX.getName(), {x: 1}, {x: 0}, {x: 1}, mongosDB.getName());
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 0, x: -1}));
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 1, x: 1}));
+
+ // Verify that the change stream returns both inserts.
+ expected = [
+ {
+ documentKey: {_id: 0, x: -1},
+ fullDocument: {_id: 0, x: -1},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 1, x: 1},
+ fullDocument: {_id: 1, x: 1},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()},
+ operationType: "insert",
+ }
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ // Now send inserts to both the sharded and unsharded collections, and verify that the
+ // change stream returns them in order.
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 2, x: 2}));
+ assert.writeOK(mongosColl.insert({_id: 1}));
+
+ // Verify that the change stream returns both inserts.
+ expected = [
+ {
+ documentKey: {_id: 2, x: 2},
+ fullDocument: {_id: 2, x: 2},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ }
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ // Create a third sharded collection with a compound shard key.
+ mongosDB.createCollection(jsTestName() + "_sharded_compound");
+ const mongosCollShardedCompound = mongosDB[jsTestName() + "_sharded_compound"];
+
+ // Shard, split, and move one chunk to shard1.
+ st.shardColl(mongosCollShardedCompound.getName(),
+ {y: 1, x: 1},
+ {y: 1, x: MinKey},
+ {y: 1, x: MinKey},
+ mongosDB.getName());
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosCollShardedCompound.insert({_id: 0, y: -1, x: 0}));
+ assert.writeOK(mongosCollShardedCompound.insert({_id: 1, y: 1, x: 0}));
+
+ // Verify that the change stream returns both inserts.
+ expected = [
+ {
+ documentKey: {_id: 0, y: -1, x: 0},
+ fullDocument: {_id: 0, y: -1, x: 0},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 1, y: 1, x: 0},
+ fullDocument: {_id: 1, y: 1, x: 0},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()},
+ operationType: "insert",
+ }
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ // Send inserts to all 3 collections and verify that the results contain the correct
+ // documentKeys and are in the correct order.
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 3, x: 3}));
+ assert.writeOK(mongosColl.insert({_id: 3}));
+ assert.writeOK(mongosCollShardedCompound.insert({_id: 2, x: 0, y: -2}));
+
+ // Verify that the change stream returns all three inserts in order.
+ expected = [
+ {
+ documentKey: {_id: 3, x: 3},
+ fullDocument: {_id: 3, x: 3},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 3},
+ fullDocument: {_id: 3},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 2, x: 0, y: -2},
+ fullDocument: {_id: 2, x: 0, y: -2},
+ ns: {db: mongosDB.getName(), coll: mongosCollShardedCompound.getName()},
+ operationType: "insert",
+ },
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ cst.cleanUp();
+
+ st.stop();
+})();
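
A whole-db stream interleaves events from every collection in the database, across all shards, in cluster-time order, which is what the ordering assertions above pin down. When only some collections are of interest, the 'ns' field supports filtering, as the unsharded-becomes-sharded test also does; a sketch with placeholder collection names:

// Narrow a whole-database stream to selected collections via the 'ns' field:
const filtered = mongosDB.watch(
    [{$match: {"ns.coll": {$in: ["orders", "payments"]}}}],  // placeholder names
    {fullDocument: "updateLookup"});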
diff --git a/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js b/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
index bc69d734a78..29689156c87 100644
--- a/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
+++ b/jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
@@ -49,7 +49,8 @@
to: st.rs1.getURL()
}));
- const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
+ const changeStreamSingleColl = mongosColl.watch([], {fullDocument: "updateLookup"});
+ const changeStreamWholeDb = mongosDB.watch([], {fullDocument: "updateLookup"});
const nDocs = 6;
const bValues = ["one", "two", "three", "four", "five", "six"];
@@ -66,19 +67,22 @@
assert.writeOK(mongosColl.update(documentKey, {$set: {updatedCount: 1}}));
}
- for (let id = 0; id < nDocs; ++id) {
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
- assert.docEq(next.fullDocument,
- Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 1}));
- }
+ [changeStreamSingleColl, changeStreamWholeDb].forEach(function(changeStream) {
+ jsTestLog(`Testing updateLookup on namespace ${changeStream._ns}`);
+ for (let id = 0; id < nDocs; ++id) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
+ assert.docEq(next.fullDocument,
+ Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 1}));
+ }
+ });
// Test that the change stream can still see the updated post image, even if a chunk is
// migrated.
@@ -94,14 +98,17 @@
to: st.rs0.getURL()
}));
- for (let id = 0; id < nDocs; ++id) {
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
- assert.docEq(next.fullDocument,
- Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 2}));
- }
+ [changeStreamSingleColl, changeStreamWholeDb].forEach(function(changeStream) {
+ jsTestLog(`Testing updateLookup after moveChunk on namespace ${changeStream._ns}`);
+ for (let id = 0; id < nDocs; ++id) {
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
+ assert.docEq(next.fullDocument,
+ Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 2}));
+ }
+ });
st.stop();
})();
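
This test matters for whole-db streams because 'updateLookup' requires mongos to route each post-image fetch, and for a sharded collection the documentKey carries the shard-key fields that make that routing possible. Illustrative only, reusing the test's shardKeyFromId() helper:

// The shape each update event takes in the loops above:
const expectedUpdateEvent = (id) => ({
    operationType: "update",
    documentKey: Object.merge(shardKeyFromId(id), {_id: id}),  // shard key + _id
    fullDocument: Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 1})
});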
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js
index b7a43d5eadb..4f224007389 100644
--- a/jstests/sharding/resume_change_stream.js
+++ b/jstests/sharding/resume_change_stream.js
@@ -32,168 +32,181 @@
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];
- assert.commandWorked(mongosDB.dropDatabase());
+ let cst = new ChangeStreamTest(mongosDB);
- // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
- assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
- st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+ function testResume(mongosColl, collToWatch) {
+ mongosColl.drop();
- // Shard the test collection on _id.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+ // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
- assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
- // Move the [0, MaxKey] chunk to st.shard1.shardName.
- assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
- // Write a document to each chunk.
- assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ // Move the [0, MaxKey] chunk to st.shard1.shardName.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
- let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
- // We awaited the replication of the first writes, so the change stream shouldn't return them.
- assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
+ let changeStream = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: collToWatch, includeToken: true});
- // Record current time to resume a change stream later in the test.
- const resumeTimeFirstUpdate = mongosDB.runCommand({isMaster: 1}).$clusterTime.clusterTime;
+ // We awaited the replication of the first writes, so the change stream shouldn't return
+ // them.
+ assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}}));
- assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+ // Record current time to resume a change stream later in the test.
+ const resumeTimeFirstUpdate = mongosDB.runCommand({isMaster: 1}).$clusterTime.clusterTime;
- // Test that we see the two writes, and remember their resume tokens.
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.eq(next.documentKey._id, -1);
- const resumeTokenFromFirstUpdateOnShard0 = next._id;
+ assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.eq(next.documentKey._id, 1);
- const resumeTokenFromFirstUpdateOnShard1 = next._id;
+ // Test that we see the two writes, and remember their resume tokens.
+ let next = cst.getOneChange(changeStream);
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, -1);
+ const resumeTokenFromFirstUpdateOnShard0 = next._id;
- changeStream.close();
+ next = cst.getOneChange(changeStream);
+ assert.eq(next.operationType, "update");
+ assert.eq(next.documentKey._id, 1);
+ const resumeTokenFromFirstUpdateOnShard1 = next._id;
- // Write some additional documents, then test that it's possible to resume after the first
- // update.
- assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
- changeStream =
- mongosColl.aggregate([{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}]);
+ // Write some additional documents, then test that it's possible to resume after the first
+ // update.
+ assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
- for (let nextExpectedId of[1, -2, 2]) {
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().documentKey._id, nextExpectedId);
- }
-
- changeStream.close();
-
- // Test that the stream can't resume if the resume token is no longer present in the oplog.
-
- // Roll over the entire oplog on the shard with the resume token for the first update.
- const shardWithResumeToken = st.rs1.getPrimary(); // Resume from shard 1.
- const mostRecentOplogEntry = getLatestOp(shardWithResumeToken);
- assert.neq(mostRecentOplogEntry, null);
- const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
- let i = 0;
-
- function oplogIsRolledOver() {
- // The oplog has rolled over if the op that used to be newest is now older than the oplog's
- // current oldest entry. Said another way, the oplog is rolled over when everything in the
- // oplog is newer than what used to be the newest entry.
- return bsonWoCompare(
- mostRecentOplogEntry.ts,
- getLeastRecentOp({server: shardWithResumeToken, readConcern: "majority"}).ts) <
- 0;
- }
+ changeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
+ collection: collToWatch
+ });
- while (!oplogIsRolledOver()) {
- let idVal = 100 + (i++);
- assert.writeOK(
- mongosColl.insert({_id: idVal, long_str: largeStr}, {writeConcern: {w: "majority"}}));
- sleep(100);
- }
+ for (let nextExpectedId of [1, -2, 2]) {
+ assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId);
+ }
- ChangeStreamTest.assertChangeStreamThrowsCode({
- collection: mongosColl,
- pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}],
- expectedCode: 40576
- });
+ // Test that the stream can't resume if the resume token is no longer present in the oplog.
+
+ // Roll over the entire oplog on the shard with the resume token for the first update.
+ const shardWithResumeToken = st.rs1.getPrimary(); // Resume from shard 1.
+ const mostRecentOplogEntry = getLatestOp(shardWithResumeToken);
+ assert.neq(mostRecentOplogEntry, null);
+ const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+ let i = 0;
+
+ function oplogIsRolledOver() {
+ // The oplog has rolled over if the op that used to be newest is now older than the
+ // oplog's current oldest entry. Said another way, the oplog is rolled over when
+ // everything in the oplog is newer than what used to be the newest entry.
+ return bsonWoCompare(mostRecentOplogEntry.ts, getLeastRecentOp({
+ server: shardWithResumeToken,
+ readConcern: "majority"
+ }).ts) < 0;
+ }
- ChangeStreamTest.assertChangeStreamThrowsCode({
- collection: mongosColl,
- pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTimeFirstUpdate}}}],
- expectedCode: 40576
- });
+ while (!oplogIsRolledOver()) {
+ let idVal = 100 + (i++);
+ assert.writeOK(mongosColl.insert({_id: idVal, long_str: largeStr},
+ {writeConcern: {w: "majority"}}));
+ sleep(100);
+ }
- // Test that the change stream can't resume if the resume token *is* present in the oplog, but
- // one of the shards has rolled over its oplog enough that it doesn't have a long enough history
- // to resume. Since we just rolled over the oplog on shard 1, we know that
- // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 doesn't have
- // any changes earlier than that, so won't be able to resume.
- ChangeStreamTest.assertChangeStreamThrowsCode({
- collection: mongosColl,
- pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
- expectedCode: 40576
- });
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ db: mongosDB,
+ collName: collToWatch,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}],
+ expectedCode: 40576
+ });
+
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ db: mongosDB,
+ collName: collToWatch,
+ pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTimeFirstUpdate}}}],
+ expectedCode: 40576
+ });
+
+ // Test that the change stream can't resume if the resume token *is* present in the oplog,
+ // but one of the shards has rolled over its oplog enough that it doesn't have a long enough
+ // history to resume. Since we just rolled over the oplog on shard 1, we know that
+ // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 doesn't
+ // have any changes earlier than that, so won't be able to resume.
+ ChangeStreamTest.assertChangeStreamThrowsCode({
+ db: mongosDB,
+ collName: collToWatch,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}],
+ expectedCode: 40576
+ });
+
+ // Drop the collection.
+ assert(mongosColl.drop());
+
+ // Shard the test collection on shardKey.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 50), [50, MaxKey].
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 50}}));
+
+ // Move the [50, MaxKey] chunk to st.shard1.shardName.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {shardKey: 51}, to: st.rs1.getURL()}));
+
+ const numberOfDocs = 100;
+
+ // Insert test documents.
+ for (let counter = 0; counter < numberOfDocs / 5; ++counter) {
+ assert.writeOK(mongosColl.insert({_id: "abcd" + counter, shardKey: counter * 5 + 0},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "Abcd" + counter, shardKey: counter * 5 + 1},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "aBcd" + counter, shardKey: counter * 5 + 2},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "abCd" + counter, shardKey: counter * 5 + 3},
+ {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: "abcD" + counter, shardKey: counter * 5 + 4},
+ {writeConcern: {w: "majority"}}));
+ }
- // Drop the collection.
- assert(mongosColl.drop());
-
- // Shard the test collection on shardKey.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
-
- // Split the collection into 2 chunks: [MinKey, 50), [50, MaxKey].
- assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 50}}));
-
- // Move the [50, MaxKey] chunk to st.shard1.shardName.
- assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {shardKey: 51}, to: st.rs1.getURL()}));
-
- const numberOfDocs = 100;
-
- // Insert test documents.
- for (let counter = 0; counter < numberOfDocs / 5; ++counter) {
- assert.writeOK(mongosColl.insert({_id: "abcd" + counter, shardKey: counter * 5 + 0},
- {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: "Abcd" + counter, shardKey: counter * 5 + 1},
- {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: "aBcd" + counter, shardKey: counter * 5 + 2},
- {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: "abCd" + counter, shardKey: counter * 5 + 3},
- {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: "abcD" + counter, shardKey: counter * 5 + 4},
- {writeConcern: {w: "majority"}}));
+ let allChangesCursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: collToWatch, includeToken: true});
+
+ // Perform the multi-update that will induce timestamp collisions.
+ assert.writeOK(mongosColl.update({}, {$set: {updated: true}}, {multi: true}));
+
+ // Loop over the documents and open an inner change stream resuming from each change
+ // in turn. Note that we skip the last document: no further change follows it, so
+ // waiting for one after resuming would hang indefinitely.
+ for (let counter = 0; counter < numberOfDocs - 1; ++counter) {
+ let next = cst.getOneChange(allChangesCursor);
+
+ const resumeToken = next._id;
+ const caseInsensitive = {locale: "en_US", strength: 2};
+ let resumedCaseInsensitiveCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ collection: collToWatch,
+ aggregateOptions: {collation: caseInsensitive}
+ });
+ cst.getOneChange(resumedCaseInsensitiveCursor);
+ }
}
- let allChangesCursor = mongosColl.aggregate([{$changeStream: {}}]);
+ // Test change stream on a single collection.
+ testResume(mongosColl, mongosColl.getName());
- // Perform the multi-update that will induce timestamp collisions
- assert.writeOK(mongosColl.update({}, {$set: {updated: true}}, {multi: true}));
-
- // Loop over documents and open inner change streams resuming from a specified position.
- // Note we skip the last document as it does not have the next document so we would
- // hang indefinitely.
- for (let counter = 0; counter < numberOfDocs - 1; ++counter) {
- assert.soon(() => allChangesCursor.hasNext());
- let next = allChangesCursor.next();
-
- const resumeToken = next._id;
- const caseInsensitive = {locale: "en_US", strength: 2};
- let resumedCaseInsensitiveCursor = mongosColl.aggregate(
- [{$changeStream: {resumeAfter: resumeToken}}], {collation: caseInsensitive});
- assert.soon(() => resumedCaseInsensitiveCursor.hasNext());
- resumedCaseInsensitiveCursor.close();
- }
+ // Test change stream on the entire database.
+ testResume(mongosColl, 1);
- allChangesCursor.close();
+ cst.cleanUp();
st.stop();
})();
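
The resumability rules pinned down here apply to both stream types: a token stays usable only while every shard retains oplog history back to the token's timestamp, hence the error-code-40576 assertions once shard 1's oplog rolls over. The basic resume round-trip, sketched with the shell helper (option name as used elsewhere in these tests):

// Save a token, close the stream, then resume past the observed event:
const stream = mongosColl.watch();
// ... after some write has been observed:
const token = stream.next()._id;  // an event's _id is its resume token
stream.close();
const resumed = mongosColl.watch([], {resumeAfter: token});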
diff --git a/jstests/sharding/resume_change_stream_on_subset_of_shards.js b/jstests/sharding/resume_change_stream_on_subset_of_shards.js
index b65690412e0..f3e405bc676 100644
--- a/jstests/sharding/resume_change_stream_on_subset_of_shards.js
+++ b/jstests/sharding/resume_change_stream_on_subset_of_shards.js
@@ -59,7 +59,8 @@
// it has been dropped.
changeStream.close();
ChangeStreamTest.assertChangeStreamThrowsCode({
- collection: mongosColl,
+ db: mongosDB,
+ collName: mongosColl.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
expectedCode: 40615
});
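
Both call sites in this file reflect the helper's new signature: since a whole-db stream is identified by the literal 1 rather than by a collection object, 'db' and 'collName' now travel separately. A sketch of the assumed helper shape, not the real implementation:

// Hypothetical reconstruction for illustration only:
function assertChangeStreamThrowsCode({db, collName, pipeline, expectedCode}) {
    // 'collName' may be a string (one collection) or the literal 1 (whole db).
    const res = db.runCommand({aggregate: collName, pipeline: pipeline, cursor: {}});
    assert.commandFailedWithCode(res, expectedCode);
}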