summary refs log tree commit diff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2022-04-22 03:41:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-23 03:06:21 +0000
commit78c459a1f73eda8a89b00472182767dd0eae980f (patch)
tree8433025da51dcc191e08b177130eefcdefb87be8
parentd1d2bb4551bb991a2b888cb86249f7702d1ded6d (diff)
downloadmongo-78c459a1f73eda8a89b00472182767dd0eae980f.tar.gz
SERVER-65497 Do not assume input document is unmodified in DSChangeStreamHandleTopologyChange
(cherry picked from commit c7b46ffe630f5e323d834d6278330336bbbde32c)
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml3
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js90
-rw-r--r--jstests/change_streams/oplog_rewrite/projection_changes_type.js87
-rw-r--r--jstests/change_streams/projection_fakes_internal_event.js211
-rw-r--r--jstests/libs/change_stream_rewrite_util.js100
-rw-r--r--jstests/libs/override_methods/implicitly_shard_accessed_collections.js5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp35
7 files changed, 445 insertions, 86 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
index db39ec22c09..09d056f1e54 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml
@@ -3,6 +3,9 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
+ exclude_files:
+ # Expects a change stream cursor to be open on each of 2 shards.
+ - jstests/change_streams/projection_fakes_internal_event.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
index d268938640b..323b94f5956 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
@@ -12,70 +12,18 @@
"use strict";
load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
-load("jstests/libs/fixture_helpers.js"); // For isMongos.
const dbName = "change_stream_rewrite_null_existence_test";
-
const collName = "coll1";
-const collNameAfterRename = "coll_renamed";
// Establish a resume token at a point before anything actually happens in the test.
const startPoint = db.getMongo().watch().getResumeToken();
-// If this is a sharded passthrough, make sure we shard on something other than _id.
-if (FixtureHelpers.isMongos(db)) {
- assertDropCollection(db, collName);
- assert.commandWorked(db.adminCommand({enableSharding: dbName}));
- assert.commandWorked(
- db.adminCommand({shardCollection: `${dbName}.${collName}`, key: {shardKey: "hashed"}}));
-}
-
const testDB = db.getSiblingDB(dbName);
-let testColl = testDB[collName];
-
const numDocs = 8;
-// Generate the write workload.
-(function performWriteWorkload() {
- // Insert some documents.
- for (let i = 0; i < numDocs; ++i) {
- assert.commandWorked(testColl.insert(
- {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false}));
- }
-
- // Update half of them. We generate these updates individually so that they generate different
- // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields.
- const updateSpecs = [
- [{$set: {f2: true}}], // only populates 'updatedFields'
- [{$unset: ["f1"]}], // only populates 'removedFields'
- [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays'
- [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields
- ];
- for (let i = 0; i < numDocs / 2; ++i) {
- assert.commandWorked(
- testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)]));
- }
-
- // Replace the other half.
- for (let i = numDocs / 2; i < numDocs; ++i) {
- assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i}));
- }
-
- // Delete half of the updated documents.
- for (let i = 0; i < numDocs / 4; ++i) {
- assert.commandWorked(testColl.remove({_id: i, shardKey: i}));
- }
-})();
-
-// Rename the collection.
-assert.commandWorked(testColl.renameCollection(collNameAfterRename));
-testColl = testDB[collNameAfterRename];
-
-// Drop the collection.
-assert(testColl.drop());
-
-// Drop the database.
-assert.commandWorked(testDB.dropDatabase());
+// Generate a write workload for the change stream to consume.
+generateChangeStreamWriteWorkload(testDB, collName, numDocs);
// Function to generate a list of all paths to be tested from those observed in the event stream.
function traverseEvent(event, outputMap, prefixPath = "") {
@@ -110,11 +58,6 @@ function traverseEvent(event, outputMap, prefixPath = "") {
}
}
- // Helper function to check whether this value is a plain old javascript object.
- function isPlainObject(value) {
- return (value && typeof (value) == "object" && value.constructor === Object);
- }
-
// Add a predicate on the full field, whether scalar, object, or array.
addToPredicatesList(fieldPath, fieldVal);
@@ -139,29 +82,8 @@ function traverseEvent(event, outputMap, prefixPath = "") {
}
}
-// Helper function to fully exhaust a change stream from the start point and return all events.
-function getAllChangeStreamEvents(extraStages, csOptions) {
- // Open a whole-cluster stream based on the supplied arguments.
- const csCursor = testDB.getMongo().watch(
- extraStages, Object.assign({resumeAfter: startPoint, maxAwaitTimeMS: 1}, csOptions));
-
- // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will
- // guarantee that all shards have returned results, and we expect all results to fit into a
- // single batch, so we know we have exhausted the stream.
- while (bsonWoCompare(csCursor._postBatchResumeToken, startPoint) == 0) {
- csCursor.hasNext(); // runs a getMore
- }
-
- // Close the cursor since we have already retrieved all results.
- csCursor.close();
-
- // Extract all events from the streams. Since the cursor is closed, it will not attempt to
- // retrieve any more batches from the server.
- return csCursor.toArray();
-}
-
// Obtain a list of all events that occurred during the write workload.
-const allEvents = getAllChangeStreamEvents([], {fullDocument: "updateLookup"});
+const allEvents = getAllChangeStreamEvents(testDB, [], {fullDocument: "updateLookup"}, startPoint);
jsTestLog(`All events: ${tojson(allEvents)}`);
@@ -253,8 +175,10 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) {
const optimizedPipeline = [matchExpr];
// Extract all results from each of the pipelines.
- const nonOptimizedOutput = getAllChangeStreamEvents(nonOptimizedPipeline, csConfig);
- const optimizedOutput = getAllChangeStreamEvents(optimizedPipeline, csConfig);
+ const nonOptimizedOutput =
+ getAllChangeStreamEvents(testDB, nonOptimizedPipeline, csConfig, startPoint);
+ const optimizedOutput =
+ getAllChangeStreamEvents(testDB, optimizedPipeline, csConfig, startPoint);
// We never expect to see more optimized results than unoptimized.
assert(optimizedOutput.length <= nonOptimizedOutput.length,
diff --git a/jstests/change_streams/oplog_rewrite/projection_changes_type.js b/jstests/change_streams/oplog_rewrite/projection_changes_type.js
new file mode 100644
index 00000000000..b463444cea9
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/projection_changes_type.js
@@ -0,0 +1,87 @@
+/**
+ * Tests that a projection which retains expected fields but changes their types does not cause the
+ * change stream framework to throw exceptions. Exercises the fix for SERVER-65497.
+ * @tags: [ requires_fcv_60 ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = jsTestName();
+const collName = "coll1";
+
+// Establish a resume token at a point before anything actually happens in the test.
+const startPoint = db.getMongo().watch().getResumeToken();
+
+const testDB = db.getSiblingDB(dbName);
+const numDocs = 8;
+
+// Generate a write workload for the change stream to consume.
+generateChangeStreamWriteWorkload(testDB, collName, numDocs);
+
+// Obtain a list of all events that occurred during the write workload.
+const fullEvents = getAllChangeStreamEvents(testDB, [], {showExpandedEvents: true}, startPoint);
+
+// Traverse each of the events and build up a projection which empties objects and arrays, and
+// changes the type of all scalar fields. Be sure to retain the _id field unmodified.
+const computedProjection = {};
+for (let event of fullEvents) {
+ for (let fieldName in event) {
+ if (fieldName === "_id" || computedProjection.hasOwnProperty(fieldName)) {
+ continue;
+ }
+ const fieldVal = event[fieldName];
+ if (Array.isArray(fieldVal)) {
+ computedProjection[fieldName] = {$literal: []};
+ } else if (isPlainObject(fieldVal)) {
+ computedProjection[fieldName] = {$literal: {}};
+ } else if (isNumber(fieldVal)) {
+ computedProjection[fieldName] = {$literal: "dummy_value"};
+ } else {
+ computedProjection[fieldName] = {$literal: NumberInt(1)};
+ }
+ }
+}
+
+// Helper function which reads all change stream events, performs the specified projection on each,
+// and confirms both that it succeeds and that all events in the original stream were observed.
+function assertProjection(testProjection) {
+ // Test both $project, which will exclude all but the specified computed fields, and $addFields,
+ // which will retain all fields and overwrite the specified fields with the computed values.
+ for (let projType of ["$project", "$addFields"]) {
+ // Log the projection that we are about to test.
+ jsTestLog(`Testing projection: ${tojsononeline({[projType]: testProjection})}`);
+
+ // Read all events from the stream and apply the projection to each of them.
+ const projectedEvents = getAllChangeStreamEvents(
+ testDB, [{[projType]: testProjection}], {showExpandedEvents: true}, startPoint);
+
+ // Assert that we see the same events in the projected stream as in the original.
+ assert.eq(projectedEvents.map(elem => ({_id: elem._id})),
+ fullEvents.map(elem => ({_id: elem._id})));
+ }
+}
+
+// Extract the list of fields that we observed in the stream. We randomize the order of the array so
+// that, over time, every combination of fields will be tested without having to generate a power
+// set each time this test runs.
+const gitHash = assert.commandWorked(testDB.runCommand("buildInfo")).gitVersion;
+Random.setRandomSeed(Number("0x" + gitHash.substring(0, 13))); // max 2^52-1
+const fieldsToInclude = Object.keys(computedProjection)
+ .map(value => ({value, sort: Random.rand()}))
+ .sort((a, b) => a.sort - b.sort)
+ .map(({value}) => value);
+
+// Now iterate through each field and test both projection of that single field, and projection of
+// all accumulated fields encountered so far.
+const accumulatedProjection = {};
+for (let fieldName of fieldsToInclude) {
+ // Test projection of this single field.
+ const currentFieldProjection = {[fieldName]: computedProjection[fieldName]};
+ assertProjection(currentFieldProjection);
+
+ // Test projection of all accumulated fields.
+ assertProjection(Object.assign(accumulatedProjection, currentFieldProjection));
+}
+})(); \ No newline at end of file
diff --git a/jstests/change_streams/projection_fakes_internal_event.js b/jstests/change_streams/projection_fakes_internal_event.js
new file mode 100644
index 00000000000..77bbbedf358
--- /dev/null
+++ b/jstests/change_streams/projection_fakes_internal_event.js
@@ -0,0 +1,211 @@
+/**
+ * Tests that a user projection which fakes an internal topology-change event is handled gracefully
+ * in a sharded cluster.
+ * TODO SERVER-65778: rework this test when we can handle faked internal events more robustly.
+ * @tags: [assumes_read_preference_unchanged]
+ */
+(function() {
+"use strict";
+
+const numShards = 2;
+
+const st = new ShardingTest({
+ shards: numShards,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosConn = st.s;
+
+const testDB = mongosConn.getDB(jsTestName());
+const adminDB = mongosConn.getDB("admin");
+const testColl = testDB.test;
+
+// Insert one test document that points to a valid shard, and one that points to an invalid shard.
+// These will generate change events that look identical to a config.shards entry, except for 'ns'.
+// It also means that the documentKey field in the resume token will look like a potentially valid
+// new-shard document.
+const existingShardDoc = testDB.getSiblingDB("config").shards.find({_id: st.rs0.name}).next();
+const existingShardWrongNameDoc = {
+ _id: "nonExistentName",
+ host: existingShardDoc.host
+};
+const existingShardWrongHostDoc = {
+ _id: st.rs1.name,
+ host: `${st.rs1.name}/${st.rs1.host}-wrong:${st.rs1.ports[0]}`
+};
+const fakeShardDoc = {
+ _id: "shardX",
+ host: "shardX/nonExistentHost:27017"
+};
+const invalidShardDoc = {
+ _id: "shardY",
+ host: null
+};
+const configDotShardsNs = {
+ db: "config",
+ coll: "shards"
+};
+assert.commandWorked(testColl.insert(existingShardWrongNameDoc));
+assert.commandWorked(testColl.insert(existingShardWrongHostDoc));
+assert.commandWorked(testColl.insert(existingShardDoc));
+assert.commandWorked(testColl.insert(invalidShardDoc));
+assert.commandWorked(testColl.insert(fakeShardDoc));
+
+// Log the shard description documents that we just inserted into the collection.
+jsTestLog("Shard docs: " + tojson(testColl.find().toArray()));
+
+// Helper function which opens a stream with the given projection and asserts that its behaviour
+// conforms to the specified arguments; it will either throw the given error code, or return the
+// expected events. Passing an empty array will confirm that we see no events in the stream. We
+// further confirm that the faked events do not cause additional cursors to be opened.
+function assertChangeStreamBehaviour(projection, expectedEvents, expectedErrorCode = null) {
+ // Can't expect both to see events and to throw an exception.
+ assert(!(expectedEvents && expectedErrorCode));
+
+ // Generate a random ID for this stream.
+ const commentID = `${Math.random()}`;
+
+ // Create a change stream cursor with the specified projection.
+ var csCursor = testColl.watch([{$addFields: projection}],
+ {startAtOperationTime: Timestamp(1, 1), comment: commentID});
+
+ // Confirm that the observed events match the expected events, if specified.
+ if (expectedEvents && expectedEvents.length > 0) {
+ for (let expectedEvent of expectedEvents) {
+ assert.soon(() => csCursor.hasNext());
+ const nextEvent = csCursor.next();
+ for (let fieldName in expectedEvent) {
+ assert.eq(
+ expectedEvent[fieldName], nextEvent[fieldName], {expectedEvent, nextEvent});
+ }
+ }
+ }
+ // If there are no expected events, confirm that the token advances without seeing anything.
+ if (expectedEvents && expectedEvents.length == 0) {
+ const startPoint = csCursor.getResumeToken();
+ assert.soon(() => {
+ assert(!csCursor.hasNext(), () => tojson(csCursor.next()));
+ return bsonWoCompare(csCursor.getResumeToken(), startPoint) > 0;
+ });
+ }
+
+ // If we expect an error code, assert that we throw it soon.
+ if (expectedErrorCode) {
+ assert.soon(() => {
+ try {
+ assert.throwsWithCode(() => csCursor.hasNext(), expectedErrorCode);
+ } catch (err) {
+ return false;
+ }
+ return true;
+ });
+ } else {
+ // Otherwise, confirm that we still only have a single cursor on each shard. It's possible
+ // that the same cursor will be listed as both active and inactive, so group by cursorId.
+ const openCursors = adminDB
+ .aggregate([
+ {$currentOp: {idleCursors: true}},
+ {$match: {"cursor.originatingCommand.comment": commentID}},
+ {
+ $group: {
+ _id: {shard: "$shard", cursorId: "$cursor.cursorId"},
+ currentOps: {$push: "$$ROOT"}
+ }
+ }
+ ])
+ .toArray();
+ assert.eq(openCursors.length, numShards, openCursors);
+ }
+
+ // Close the change stream when we are done.
+ csCursor.close();
+}
+
+// Test that a projection which fakes a 'kNewShardDetected' event is swallowed but has no effect.
+let testProjection = {operationType: "kNewShardDetected"};
+assertChangeStreamBehaviour(testProjection, []);
+
+// Test that a projection which fakes an event on config.shards with a non-string operationType is
+// allowed to pass through.
+testProjection = {
+ ns: configDotShardsNs,
+ operationType: null
+};
+assertChangeStreamBehaviour(testProjection, [
+ {operationType: null, fullDocument: existingShardWrongNameDoc},
+ {operationType: null, fullDocument: existingShardWrongHostDoc},
+ {operationType: null, fullDocument: existingShardDoc},
+ {operationType: null, fullDocument: invalidShardDoc},
+ {operationType: null, fullDocument: fakeShardDoc}
+]);
+
+// Test that a projection which fakes an event on config.shards with a non-timestamp clusterTime
+// is allowed to pass through.
+testProjection = {
+ ns: configDotShardsNs,
+ clusterTime: null
+};
+assertChangeStreamBehaviour(testProjection, [
+ {clusterTime: null, fullDocument: existingShardWrongNameDoc},
+ {clusterTime: null, fullDocument: existingShardWrongHostDoc},
+ {clusterTime: null, fullDocument: existingShardDoc},
+ {clusterTime: null, fullDocument: invalidShardDoc},
+ {clusterTime: null, fullDocument: fakeShardDoc}
+]);
+
+// Test that a projection which fakes an event on config.shards with a non-object fullDocument
+// is allowed to pass through.
+testProjection = {
+ ns: configDotShardsNs,
+ fullDocument: null
+};
+assertChangeStreamBehaviour(testProjection, [
+ {fullDocument: null},
+ {fullDocument: null},
+ {fullDocument: null},
+ {fullDocument: null},
+ {fullDocument: null}
+]);
+
+// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
+// pointing to an existing shard is swallowed but has no effect.
+testProjection = {
+ ns: configDotShardsNs,
+ fullDocument: existingShardDoc
+};
+assertChangeStreamBehaviour(testProjection, []);
+
+// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
+// pointing to an existing shard's host, but the wrong shard name, throws as it attempts to connect.
+testProjection = {
+ ns: configDotShardsNs,
+ fullDocument: existingShardWrongNameDoc
+};
+assertChangeStreamBehaviour(testProjection, null, ErrorCodes.ShardNotFound);
+
+// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
+// pointing to an existing shard's name, but the wrong host, is swallowed and has no effect.
+testProjection = {
+ ns: configDotShardsNs,
+ fullDocument: existingShardWrongHostDoc
+};
+assertChangeStreamBehaviour(testProjection, []);
+
+// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
+// pointing to a non-existent shard throws as it attempts to connect.
+testProjection = {
+ ns: configDotShardsNs,
+ fullDocument: fakeShardDoc
+};
+assertChangeStreamBehaviour(testProjection, null, ErrorCodes.ShardNotFound);
+
+// Test that a projection which fakes a new-shard event on config.shards with an invalid
+// fullDocument throws a validation exception.
+testProjection = {
+ ns: configDotShardsNs,
+ fullDocument: invalidShardDoc
+};
+assertChangeStreamBehaviour(testProjection, null, ErrorCodes.TypeMismatch);
+
+st.stop();
+})(); \ No newline at end of file
diff --git a/jstests/libs/change_stream_rewrite_util.js b/jstests/libs/change_stream_rewrite_util.js
index b692a64cf4e..593126f5b33 100644
--- a/jstests/libs/change_stream_rewrite_util.js
+++ b/jstests/libs/change_stream_rewrite_util.js
@@ -3,6 +3,106 @@
*/
load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+load("jstests/libs/fixture_helpers.js"); // For isMongos.
+
+// Function which generates a write workload on the specified collection, including all events that
+// a change stream may consume. Assumes that the specified collection does not already exist.
+function generateChangeStreamWriteWorkload(db, collName, numDocs, includInvalidatingEvents = true) {
+ // If this is a sharded passthrough, make sure we shard on something other than _id so that a
+ // non-id field appears in the documentKey. This will generate 'create' and 'shardCollection'.
+ if (FixtureHelpers.isMongos(db)) {
+ assert.commandWorked(db.adminCommand({enableSharding: db.getName()}));
+ assert.commandWorked(db.adminCommand(
+ {shardCollection: `${db.getName()}.${collName}`, key: {shardKey: "hashed"}}));
+ }
+
+ // If the collection hasn't already been created, do so here.
+ let testColl = assertCreateCollection(db, collName);
+
+ // Build an index, collMod it, then drop it.
+ assert.commandWorked(testColl.createIndex({a: 1}));
+ assert.commandWorked(db.runCommand({
+ collMod: testColl.getName(),
+ index: {keyPattern: {a: 1}, hidden: true, expireAfterSeconds: 500}
+ }));
+ assert.commandWorked(testColl.dropIndex({a: 1}));
+
+ // Insert some documents.
+ for (let i = 0; i < numDocs; ++i) {
+ assert.commandWorked(testColl.insert(
+ {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false}));
+ }
+
+ // Update half of them. We generate these updates individually so that they generate different
+ // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields.
+ const updateSpecs = [
+ [{$set: {f2: true}}], // only populates 'updatedFields'
+ [{$unset: ["f1"]}], // only populates 'removedFields'
+ [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays'
+ [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields
+ ];
+ for (let i = 0; i < numDocs / 2; ++i) {
+ assert.commandWorked(
+ testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)]));
+ }
+
+ // Replace the other half.
+ for (let i = numDocs / 2; i < numDocs; ++i) {
+ assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i}));
+ }
+
+ // Delete half of the updated documents.
+ for (let i = 0; i < numDocs / 4; ++i) {
+ assert.commandWorked(testColl.remove({_id: i, shardKey: i}));
+ }
+
+ // If the caller is prepared to handle potential invalidations, include the following events.
+ if (includInvalidatingEvents) {
+ // Rename the collection.
+ const collNameAfterRename = `${testColl.getName()}_renamed`;
+ assert.commandWorked(testColl.renameCollection(collNameAfterRename));
+ testColl = db[collNameAfterRename];
+
+ // Rename it back.
+ assert.commandWorked(testColl.renameCollection(collName));
+ testColl = db[collName];
+
+ // Drop the collection.
+ assert(testColl.drop());
+
+ // Drop the database.
+ assert.commandWorked(db.dropDatabase());
+ }
+ return testColl;
+}
+
+// Helper function to fully exhaust a change stream from the specified point and return all events.
+// Assumes that all relevant events can fit into a single 16MB batch.
+function getAllChangeStreamEvents(db, extraPipelineStages = [], csOptions = {}, resumeToken) {
+ // Open a whole-cluster stream based on the supplied arguments.
+ const csCursor = db.getMongo().watch(
+ extraPipelineStages,
+ Object.assign({resumeAfter: resumeToken, maxAwaitTimeMS: 1}, csOptions));
+
+ // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will
+ // guarantee that all shards have returned results, and we expect all results to fit into a
+ // single batch, so we know we have exhausted the stream.
+ while (bsonWoCompare(csCursor._postBatchResumeToken, resumeToken) == 0) {
+ csCursor.hasNext(); // runs a getMore
+ }
+
+ // Close the cursor since we have already retrieved all results.
+ csCursor.close();
+
+    // Extract all events from the stream. Since the cursor is closed, it will not attempt to
+ // retrieve any more batches from the server.
+ return csCursor.toArray();
+}
+
+// Helper function to check whether this value is a plain old javascript object.
+function isPlainObject(value) {
+ return (value && typeof (value) == "object" && value.constructor === Object);
+}
// Verifies the number of change streams events returned from a particular shard.
function assertNumChangeStreamDocsReturnedFromShard(stats, shardName, expectedTotalReturned) {
diff --git a/jstests/libs/override_methods/implicitly_shard_accessed_collections.js b/jstests/libs/override_methods/implicitly_shard_accessed_collections.js
index 98a65324eda..338b3d0f02f 100644
--- a/jstests/libs/override_methods/implicitly_shard_accessed_collections.js
+++ b/jstests/libs/override_methods/implicitly_shard_accessed_collections.js
@@ -65,6 +65,11 @@ function shardCollection(collection) {
}
function shardCollectionWithSpec({db, collName, shardKey, timeseriesSpec}) {
+ // Don't attempt to shard if this operation is running on mongoD.
+ if (!FixtureHelpers.isMongos(db)) {
+ return;
+ }
+
var dbName = db.getName();
var fullName = dbName + "." + collName;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
index 2f0772a29bc..0f32b20aa47 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
@@ -58,8 +58,14 @@ bool isShardConfigEvent(const Document& eventDoc) {
auto opType = eventDoc[DocumentSourceChangeStream::kOperationTypeField];
- if (!opType.missing() &&
- opType.getStringData() == DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ // If opType isn't a string, then this document has been manipulated. This means it cannot have
+ // been produced by the internal shard-monitoring cursor that we opened on the config servers,
+ // or by the kNewShardDetectedOpType mechanism, which bypasses filtering and projection stages.
+ if (opType.getType() != BSONType::String) {
+ return false;
+ }
+
+ if (opType.getStringData() == DocumentSourceChangeStream::kNewShardDetectedOpType) {
// If the failpoint is enabled, throw the 'ChangeStreamTopologyChange' exception to the
// client. This is used in testing to confirm that the swallowed 'kNewShardDetected' event
// has reached the mongoS.
@@ -73,10 +79,33 @@ bool isShardConfigEvent(const Document& eventDoc) {
return true;
}
+ // Check whether this event occurred on the config.shards collection.
auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField];
- return nsObj.getType() == BSONType::Object &&
+ const bool isConfigDotShardsEvent = nsObj["db"_sd].getType() == BSONType::String &&
nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() &&
+ nsObj["coll"_sd].getType() == BSONType::String &&
nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll();
+
+ // If it isn't from config.shards, treat it as a normal user event.
+ if (!isConfigDotShardsEvent) {
+ return false;
+ }
+
+ // We need to validate that this event hasn't been faked by a user projection in a way that
+ // would cause us to tassert. Check the clusterTime field, which is needed to determine the
+ // point from which the new shard should start reporting change events.
+ if (eventDoc["clusterTime"].getType() != BSONType::bsonTimestamp) {
+ return false;
+ }
+ // Check the fullDocument field, which should contain details of the new shard's name and hosts.
+ auto fullDocument = eventDoc[DocumentSourceChangeStream::kFullDocumentField];
+ if (opType.getStringData() == "insert"_sd && fullDocument.getType() != BSONType::Object) {
+ return false;
+ }
+
+ // The event is on config.shards and is well-formed. It is still possible that it is a forgery,
+ // but all the user can do is cause their own stream to uassert.
+ return true;
}
} // namespace