diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2022-04-22 03:41:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-23 03:06:21 +0000 |
commit | 78c459a1f73eda8a89b00472182767dd0eae980f (patch) | |
tree | 8433025da51dcc191e08b177130eefcdefb87be8 | |
parent | d1d2bb4551bb991a2b888cb86249f7702d1ded6d (diff) | |
download | mongo-78c459a1f73eda8a89b00472182767dd0eae980f.tar.gz |
SERVER-65497 Do not assume input document is unmodified in DSChangeStreamHandleTopologyChange
(cherry picked from commit c7b46ffe630f5e323d834d6278330336bbbde32c)
7 files changed, 445 insertions, 86 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml index db39ec22c09..09d056f1e54 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_per_shard_cursor_passthrough.yml @@ -3,6 +3,9 @@ test_kind: js_test selector: roots: - jstests/change_streams/**/*.js + exclude_files: + # Expects a change stream cursor to be open on each of 2 shards. + - jstests/change_streams/projection_fakes_internal_event.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js index d268938640b..323b94f5956 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js @@ -12,70 +12,18 @@ "use strict"; load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. -load("jstests/libs/fixture_helpers.js"); // For isMongos. const dbName = "change_stream_rewrite_null_existence_test"; - const collName = "coll1"; -const collNameAfterRename = "coll_renamed"; // Establish a resume token at a point before anything actually happens in the test. const startPoint = db.getMongo().watch().getResumeToken(); -// If this is a sharded passthrough, make sure we shard on something other than _id. -if (FixtureHelpers.isMongos(db)) { - assertDropCollection(db, collName); - assert.commandWorked(db.adminCommand({enableSharding: dbName})); - assert.commandWorked( - db.adminCommand({shardCollection: `${dbName}.${collName}`, key: {shardKey: "hashed"}})); -} - const testDB = db.getSiblingDB(dbName); -let testColl = testDB[collName]; - const numDocs = 8; -// Generate the write workload. -(function performWriteWorkload() { - // Insert some documents. - for (let i = 0; i < numDocs; ++i) { - assert.commandWorked(testColl.insert( - {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false})); - } - - // Update half of them. We generate these updates individually so that they generate different - // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields. - const updateSpecs = [ - [{$set: {f2: true}}], // only populates 'updatedFields' - [{$unset: ["f1"]}], // only populates 'removedFields' - [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays' - [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields - ]; - for (let i = 0; i < numDocs / 2; ++i) { - assert.commandWorked( - testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)])); - } - - // Replace the other half. - for (let i = numDocs / 2; i < numDocs; ++i) { - assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i})); - } - - // Delete half of the updated documents. - for (let i = 0; i < numDocs / 4; ++i) { - assert.commandWorked(testColl.remove({_id: i, shardKey: i})); - } -})(); - -// Rename the collection. -assert.commandWorked(testColl.renameCollection(collNameAfterRename)); -testColl = testDB[collNameAfterRename]; - -// Drop the collection. -assert(testColl.drop()); - -// Drop the database. -assert.commandWorked(testDB.dropDatabase()); +// Generate a write workload for the change stream to consume. +generateChangeStreamWriteWorkload(testDB, collName, numDocs); // Function to generate a list of all paths to be tested from those observed in the event stream. function traverseEvent(event, outputMap, prefixPath = "") { @@ -110,11 +58,6 @@ function traverseEvent(event, outputMap, prefixPath = "") { } } - // Helper function to check whether this value is a plain old javascript object. - function isPlainObject(value) { - return (value && typeof (value) == "object" && value.constructor === Object); - } - // Add a predicate on the full field, whether scalar, object, or array. addToPredicatesList(fieldPath, fieldVal); @@ -139,29 +82,8 @@ function traverseEvent(event, outputMap, prefixPath = "") { } } -// Helper function to fully exhaust a change stream from the start point and return all events. -function getAllChangeStreamEvents(extraStages, csOptions) { - // Open a whole-cluster stream based on the supplied arguments. - const csCursor = testDB.getMongo().watch( - extraStages, Object.assign({resumeAfter: startPoint, maxAwaitTimeMS: 1}, csOptions)); - - // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will - // guarantee that all shards have returned results, and we expect all results to fit into a - // single batch, so we know we have exhausted the stream. - while (bsonWoCompare(csCursor._postBatchResumeToken, startPoint) == 0) { - csCursor.hasNext(); // runs a getMore - } - - // Close the cursor since we have already retrieved all results. - csCursor.close(); - - // Extract all events from the streams. Since the cursor is closed, it will not attempt to - // retrieve any more batches from the server. - return csCursor.toArray(); -} - // Obtain a list of all events that occurred during the write workload. -const allEvents = getAllChangeStreamEvents([], {fullDocument: "updateLookup"}); +const allEvents = getAllChangeStreamEvents(testDB, [], {fullDocument: "updateLookup"}, startPoint); jsTestLog(`All events: ${tojson(allEvents)}`); @@ -253,8 +175,10 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) { const optimizedPipeline = [matchExpr]; // Extract all results from each of the pipelines. - const nonOptimizedOutput = getAllChangeStreamEvents(nonOptimizedPipeline, csConfig); - const optimizedOutput = getAllChangeStreamEvents(optimizedPipeline, csConfig); + const nonOptimizedOutput = + getAllChangeStreamEvents(testDB, nonOptimizedPipeline, csConfig, startPoint); + const optimizedOutput = + getAllChangeStreamEvents(testDB, optimizedPipeline, csConfig, startPoint); // We never expect to see more optimized results than unoptimized. assert(optimizedOutput.length <= nonOptimizedOutput.length, diff --git a/jstests/change_streams/oplog_rewrite/projection_changes_type.js b/jstests/change_streams/oplog_rewrite/projection_changes_type.js new file mode 100644 index 00000000000..b463444cea9 --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/projection_changes_type.js @@ -0,0 +1,87 @@ +/** + * Tests that a projection which retains expected fields but changes their types does not cause the + * change stream framework to throw exceptions. Exercises the fix for SERVER-65497. + * @tags: [ requires_fcv_60 ] + */ +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = jsTestName(); +const collName = "coll1"; + +// Establish a resume token at a point before anything actually happens in the test. +const startPoint = db.getMongo().watch().getResumeToken(); + +const testDB = db.getSiblingDB(dbName); +const numDocs = 8; + +// Generate a write workload for the change stream to consume. +generateChangeStreamWriteWorkload(testDB, collName, numDocs); + +// Obtain a list of all events that occurred during the write workload. +const fullEvents = getAllChangeStreamEvents(testDB, [], {showExpandedEvents: true}, startPoint); + +// Traverse each of the events and build up a projection which empties objects and arrays, and +// changes the type of all scalar fields. Be sure to retain the _id field unmodified. +const computedProjection = {}; +for (let event of fullEvents) { + for (let fieldName in event) { + if (fieldName === "_id" || computedProjection.hasOwnProperty(fieldName)) { + continue; + } + const fieldVal = event[fieldName]; + if (Array.isArray(fieldVal)) { + computedProjection[fieldName] = {$literal: []}; + } else if (isPlainObject(fieldVal)) { + computedProjection[fieldName] = {$literal: {}}; + } else if (isNumber(fieldVal)) { + computedProjection[fieldName] = {$literal: "dummy_value"}; + } else { + computedProjection[fieldName] = {$literal: NumberInt(1)}; + } + } +} + +// Helper function which reads all change stream events, performs the specified projection on each, +// and confirms both that it succeeds and that all events in the original stream were observed. +function assertProjection(testProjection) { + // Test both $project, which will exclude all but the specified computed fields, and $addFields, + // which will retain all fields and overwrite the specified fields with the computed values. + for (let projType of ["$project", "$addFields"]) { + // Log the projection that we are about to test. + jsTestLog(`Testing projection: ${tojsononeline({[projType]: testProjection})}`); + + // Read all events from the stream and apply the projection to each of them. + const projectedEvents = getAllChangeStreamEvents( + testDB, [{[projType]: testProjection}], {showExpandedEvents: true}, startPoint); + + // Assert that we see the same events in the projected stream as in the original. + assert.eq(projectedEvents.map(elem => ({_id: elem._id})), + fullEvents.map(elem => ({_id: elem._id}))); + } +} + +// Extract the list of fields that we observed in the stream. We randomize the order of the array so +// that, over time, every combination of fields will be tested without having to generate a power +// set each time this test runs. +const gitHash = assert.commandWorked(testDB.runCommand("buildInfo")).gitVersion; +Random.setRandomSeed(Number("0x" + gitHash.substring(0, 13))); // max 2^52-1 +const fieldsToInclude = Object.keys(computedProjection) + .map(value => ({value, sort: Random.rand()})) + .sort((a, b) => a.sort - b.sort) + .map(({value}) => value); + +// Now iterate through each field and test both projection of that single field, and projection of +// all accumulated fields encountered so far. +const accumulatedProjection = {}; +for (let fieldName of fieldsToInclude) { + // Test projection of this single field. + const currentFieldProjection = {[fieldName]: computedProjection[fieldName]}; + assertProjection(currentFieldProjection); + + // Test projection of all accumulated fields. + assertProjection(Object.assign(accumulatedProjection, currentFieldProjection)); +} +})();
\ No newline at end of file diff --git a/jstests/change_streams/projection_fakes_internal_event.js b/jstests/change_streams/projection_fakes_internal_event.js new file mode 100644 index 00000000000..77bbbedf358 --- /dev/null +++ b/jstests/change_streams/projection_fakes_internal_event.js @@ -0,0 +1,211 @@ +/** + * Tests that a user projection which fakes an internal topology-change event is handled gracefully + * in a sharded cluster. + * TODO SERVER-65778: rework this test when we can handle faked internal events more robustly. + * @tags: [assumes_read_preference_unchanged] + */ +(function() { +"use strict"; + +const numShards = 2; + +const st = new ShardingTest({ + shards: numShards, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; + +const testDB = mongosConn.getDB(jsTestName()); +const adminDB = mongosConn.getDB("admin"); +const testColl = testDB.test; + +// Insert one test document that points to a valid shard, and one that points to an invalid shard. +// These will generate change events that look identical to a config.shards entry, except for 'ns'. +// It also means that the documentKey field in the resume token will look like a potentially valid +// new-shard document. +const existingShardDoc = testDB.getSiblingDB("config").shards.find({_id: st.rs0.name}).next(); +const existingShardWrongNameDoc = { + _id: "nonExistentName", + host: existingShardDoc.host +}; +const existingShardWrongHostDoc = { + _id: st.rs1.name, + host: `${st.rs1.name}/${st.rs1.host}-wrong:${st.rs1.ports[0]}` +}; +const fakeShardDoc = { + _id: "shardX", + host: "shardX/nonExistentHost:27017" +}; +const invalidShardDoc = { + _id: "shardY", + host: null +}; +const configDotShardsNs = { + db: "config", + coll: "shards" +}; +assert.commandWorked(testColl.insert(existingShardWrongNameDoc)); +assert.commandWorked(testColl.insert(existingShardWrongHostDoc)); +assert.commandWorked(testColl.insert(existingShardDoc)); +assert.commandWorked(testColl.insert(invalidShardDoc)); +assert.commandWorked(testColl.insert(fakeShardDoc)); + +// Log the shard description documents that we just inserted into the collection. +jsTestLog("Shard docs: " + tojson(testColl.find().toArray())); + +// Helper function which opens a stream with the given projection and asserts that its behaviour +// conforms to the specified arguments; it will either throw the given error code, or return the +// expected events. Passing an empty array will confirm that we see no events in the stream. We +// further confirm that the faked events do not cause additional cursors to be opened. +function assertChangeStreamBehaviour(projection, expectedEvents, expectedErrorCode = null) { + // Can't expect both to see events and to throw an exception. + assert(!(expectedEvents && expectedErrorCode)); + + // Generate a random ID for this stream. + const commentID = `${Math.random()}`; + + // Create a change stream cursor with the specified projection. + var csCursor = testColl.watch([{$addFields: projection}], + {startAtOperationTime: Timestamp(1, 1), comment: commentID}); + + // Confirm that the observed events match the expected events, if specified. + if (expectedEvents && expectedEvents.length > 0) { + for (let expectedEvent of expectedEvents) { + assert.soon(() => csCursor.hasNext()); + const nextEvent = csCursor.next(); + for (let fieldName in expectedEvent) { + assert.eq( + expectedEvent[fieldName], nextEvent[fieldName], {expectedEvent, nextEvent}); + } + } + } + // If there are no expected events, confirm that the token advances without seeing anything. + if (expectedEvents && expectedEvents.length == 0) { + const startPoint = csCursor.getResumeToken(); + assert.soon(() => { + assert(!csCursor.hasNext(), () => tojson(csCursor.next())); + return bsonWoCompare(csCursor.getResumeToken(), startPoint) > 0; + }); + } + + // If we expect an error code, assert that we throw it soon. + if (expectedErrorCode) { + assert.soon(() => { + try { + assert.throwsWithCode(() => csCursor.hasNext(), expectedErrorCode); + } catch (err) { + return false; + } + return true; + }); + } else { + // Otherwise, confirm that we still only have a single cursor on each shard. It's possible + // that the same cursor will be listed as both active and inactive, so group by cursorId. + const openCursors = adminDB + .aggregate([ + {$currentOp: {idleCursors: true}}, + {$match: {"cursor.originatingCommand.comment": commentID}}, + { + $group: { + _id: {shard: "$shard", cursorId: "$cursor.cursorId"}, + currentOps: {$push: "$$ROOT"} + } + } + ]) + .toArray(); + assert.eq(openCursors.length, numShards, openCursors); + } + + // Close the change stream when we are done. + csCursor.close(); +} + +// Test that a projection which fakes a 'kNewShardDetected' event is swallowed but has no effect. +let testProjection = {operationType: "kNewShardDetected"}; +assertChangeStreamBehaviour(testProjection, []); + +// Test that a projection which fakes an event on config.shards with a non-string operationType is +// allowed to pass through. +testProjection = { + ns: configDotShardsNs, + operationType: null +}; +assertChangeStreamBehaviour(testProjection, [ + {operationType: null, fullDocument: existingShardWrongNameDoc}, + {operationType: null, fullDocument: existingShardWrongHostDoc}, + {operationType: null, fullDocument: existingShardDoc}, + {operationType: null, fullDocument: invalidShardDoc}, + {operationType: null, fullDocument: fakeShardDoc} +]); + +// Test that a projection which fakes an event on config.shards with a non-timestamp clusterTime +// is allowed to pass through. +testProjection = { + ns: configDotShardsNs, + clusterTime: null +}; +assertChangeStreamBehaviour(testProjection, [ + {clusterTime: null, fullDocument: existingShardWrongNameDoc}, + {clusterTime: null, fullDocument: existingShardWrongHostDoc}, + {clusterTime: null, fullDocument: existingShardDoc}, + {clusterTime: null, fullDocument: invalidShardDoc}, + {clusterTime: null, fullDocument: fakeShardDoc} +]); + +// Test that a projection which fakes an event on config.shards with a non-object fullDocument +// is allowed to pass through. +testProjection = { + ns: configDotShardsNs, + fullDocument: null +}; +assertChangeStreamBehaviour(testProjection, [ + {fullDocument: null}, + {fullDocument: null}, + {fullDocument: null}, + {fullDocument: null}, + {fullDocument: null} +]); + +// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument +// pointing to an existing shard is swallowed but has no effect. +testProjection = { + ns: configDotShardsNs, + fullDocument: existingShardDoc +}; +assertChangeStreamBehaviour(testProjection, []); + +// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument +// pointing to an existing shard's host, but the wrong shard name, throws as it attempts to connect. +testProjection = { + ns: configDotShardsNs, + fullDocument: existingShardWrongNameDoc +}; +assertChangeStreamBehaviour(testProjection, null, ErrorCodes.ShardNotFound); + +// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument +// pointing to an existing shard's name, but the wrong host, is swallowed and has no effect. +testProjection = { + ns: configDotShardsNs, + fullDocument: existingShardWrongHostDoc +}; +assertChangeStreamBehaviour(testProjection, []); + +// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument +// pointing to a non-existent shard throws as it attempts to connect. +testProjection = { + ns: configDotShardsNs, + fullDocument: fakeShardDoc +}; +assertChangeStreamBehaviour(testProjection, null, ErrorCodes.ShardNotFound); + +// Test that a projection which fakes a new-shard event on config.shards with an invalid +// fullDocument throws a validation exception. +testProjection = { + ns: configDotShardsNs, + fullDocument: invalidShardDoc +}; +assertChangeStreamBehaviour(testProjection, null, ErrorCodes.TypeMismatch); + +st.stop(); +})();
\ No newline at end of file diff --git a/jstests/libs/change_stream_rewrite_util.js b/jstests/libs/change_stream_rewrite_util.js index b692a64cf4e..593126f5b33 100644 --- a/jstests/libs/change_stream_rewrite_util.js +++ b/jstests/libs/change_stream_rewrite_util.js @@ -3,6 +3,106 @@ */ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +load("jstests/libs/fixture_helpers.js"); // For isMongos. + +// Function which generates a write workload on the specified collection, including all events that +// a change stream may consume. Assumes that the specified collection does not already exist. +function generateChangeStreamWriteWorkload(db, collName, numDocs, includInvalidatingEvents = true) { + // If this is a sharded passthrough, make sure we shard on something other than _id so that a + // non-id field appears in the documentKey. This will generate 'create' and 'shardCollection'. + if (FixtureHelpers.isMongos(db)) { + assert.commandWorked(db.adminCommand({enableSharding: db.getName()})); + assert.commandWorked(db.adminCommand( + {shardCollection: `${db.getName()}.${collName}`, key: {shardKey: "hashed"}})); + } + + // If the collection hasn't already been created, do so here. + let testColl = assertCreateCollection(db, collName); + + // Build an index, collMod it, then drop it. + assert.commandWorked(testColl.createIndex({a: 1})); + assert.commandWorked(db.runCommand({ + collMod: testColl.getName(), + index: {keyPattern: {a: 1}, hidden: true, expireAfterSeconds: 500} + })); + assert.commandWorked(testColl.dropIndex({a: 1})); + + // Insert some documents. + for (let i = 0; i < numDocs; ++i) { + assert.commandWorked(testColl.insert( + {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false})); + } + + // Update half of them. We generate these updates individually so that they generate different + // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields. + const updateSpecs = [ + [{$set: {f2: true}}], // only populates 'updatedFields' + [{$unset: ["f1"]}], // only populates 'removedFields' + [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays' + [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields + ]; + for (let i = 0; i < numDocs / 2; ++i) { + assert.commandWorked( + testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)])); + } + + // Replace the other half. + for (let i = numDocs / 2; i < numDocs; ++i) { + assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i})); + } + + // Delete half of the updated documents. + for (let i = 0; i < numDocs / 4; ++i) { + assert.commandWorked(testColl.remove({_id: i, shardKey: i})); + } + + // If the caller is prepared to handle potential invalidations, include the following events. + if (includInvalidatingEvents) { + // Rename the collection. + const collNameAfterRename = `${testColl.getName()}_renamed`; + assert.commandWorked(testColl.renameCollection(collNameAfterRename)); + testColl = db[collNameAfterRename]; + + // Rename it back. + assert.commandWorked(testColl.renameCollection(collName)); + testColl = db[collName]; + + // Drop the collection. + assert(testColl.drop()); + + // Drop the database. + assert.commandWorked(db.dropDatabase()); + } + return testColl; +} + +// Helper function to fully exhaust a change stream from the specified point and return all events. +// Assumes that all relevant events can fit into a single 16MB batch. +function getAllChangeStreamEvents(db, extraPipelineStages = [], csOptions = {}, resumeToken) { + // Open a whole-cluster stream based on the supplied arguments. + const csCursor = db.getMongo().watch( + extraPipelineStages, + Object.assign({resumeAfter: resumeToken, maxAwaitTimeMS: 1}, csOptions)); + + // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will + // guarantee that all shards have returned results, and we expect all results to fit into a + // single batch, so we know we have exhausted the stream. + while (bsonWoCompare(csCursor._postBatchResumeToken, resumeToken) == 0) { + csCursor.hasNext(); // runs a getMore + } + + // Close the cursor since we have already retrieved all results. + csCursor.close(); + + // Extract all events from the streams. Since the cursor is closed, it will not attempt to + // retrieve any more batches from the server. + return csCursor.toArray(); +} + +// Helper function to check whether this value is a plain old javascript object. +function isPlainObject(value) { + return (value && typeof (value) == "object" && value.constructor === Object); +} // Verifies the number of change streams events returned from a particular shard. function assertNumChangeStreamDocsReturnedFromShard(stats, shardName, expectedTotalReturned) { diff --git a/jstests/libs/override_methods/implicitly_shard_accessed_collections.js b/jstests/libs/override_methods/implicitly_shard_accessed_collections.js index 98a65324eda..338b3d0f02f 100644 --- a/jstests/libs/override_methods/implicitly_shard_accessed_collections.js +++ b/jstests/libs/override_methods/implicitly_shard_accessed_collections.js @@ -65,6 +65,11 @@ function shardCollection(collection) { } function shardCollectionWithSpec({db, collName, shardKey, timeseriesSpec}) { + // Don't attempt to shard if this operation is running on mongoD. + if (!FixtureHelpers.isMongos(db)) { + return; + } + var dbName = db.getName(); var fullName = dbName + "." + collName; diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp index 2f0772a29bc..0f32b20aa47 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp @@ -58,8 +58,14 @@ bool isShardConfigEvent(const Document& eventDoc) { auto opType = eventDoc[DocumentSourceChangeStream::kOperationTypeField]; - if (!opType.missing() && - opType.getStringData() == DocumentSourceChangeStream::kNewShardDetectedOpType) { + // If opType isn't a string, then this document has been manipulated. This means it cannot have + // been produced by the internal shard-monitoring cursor that we opened on the config servers, + // or by the kNewShardDetectedOpType mechanism, which bypasses filtering and projection stages. + if (opType.getType() != BSONType::String) { + return false; + } + + if (opType.getStringData() == DocumentSourceChangeStream::kNewShardDetectedOpType) { // If the failpoint is enabled, throw the 'ChangeStreamToplogyChange' exception to the // client. This is used in testing to confirm that the swallowed 'kNewShardDetected' event // has reached the mongoS. @@ -73,10 +79,33 @@ bool isShardConfigEvent(const Document& eventDoc) { return true; } + // Check whether this event occurred on the config.shards collection. auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField]; - return nsObj.getType() == BSONType::Object && + const bool isConfigDotShardsEvent = nsObj["db"_sd].getType() == BSONType::String && nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() && + nsObj["coll"_sd].getType() == BSONType::String && nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll(); + + // If it isn't from config.shards, treat it as a normal user event. + if (!isConfigDotShardsEvent) { + return false; + } + + // We need to validate that this event hasn't been faked by a user projection in a way that + // would cause us to tassert. Check the clusterTime field, which is needed to determine the + // point from which the new shard should start reporting change events. + if (eventDoc["clusterTime"].getType() != BSONType::bsonTimestamp) { + return false; + } + // Check the fullDocument field, which should contain details of the new shard's name and hosts. + auto fullDocument = eventDoc[DocumentSourceChangeStream::kFullDocumentField]; + if (opType.getStringData() == "insert"_sd && fullDocument.getType() != BSONType::Object) { + return false; + } + + // The event is on config.shards and is well-formed. It is still possible that it is a forgery, + // but all the user can do is cause their own stream to uassert. + return true; } } // namespace |