diff options
author | Bernard Gorman <bernard.gorman@mongodb.com> | 2019-10-17 20:26:03 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-17 20:26:03 +0000 |
commit | 97cc7b5838db4ef13ede3149c44bceca8f5c2977 (patch) | |
tree | 38d9b78346777033545f99048f947559a67a9ab1 | |
parent | 1daf063435aa3f748840c476f8fa3bd13d7d2a68 (diff) | |
download | mongo-97cc7b5838db4ef13ede3149c44bceca8f5c2977.tar.gz |
SERVER-42723 New shard with new database can be ignored by change streams
14 files changed, 259 insertions, 91 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index f3317e6811b..11f855555f9 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -24,6 +24,7 @@ selector: - jstests/sharding/prepared_txn_metadata_refresh.js # Enable when 4.4 becomes last stable - jstests/sharding/bulk_insert.js + - jstests/sharding/change_streams_new_shard_new_database.js - jstests/sharding/clear_jumbo.js - jstests/sharding/comment_field.js - jstests/sharding/covered_shard_key_indexes.js diff --git a/jstests/change_streams/required_as_first_stage.js b/jstests/change_streams/required_as_first_stage.js index 2c5128f4865..3920c9324c7 100644 --- a/jstests/change_streams/required_as_first_stage.js +++ b/jstests/change_streams/required_as_first_stage.js @@ -20,14 +20,14 @@ assertErrorCode( 40602); let error = assert.throws(() => coll.aggregate([{$sort: {x: 1}}, {$changeStream: {}}])); -assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error)); +assert.contains(error.code, [40602], "Unexpected error: " + tojson(error)); error = assert.throws( () => coll.aggregate([{$sort: {x: 1}}, {$changeStream: {}}], {allowDiskUse: true})); -assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error)); +assert.contains(error.code, [40602], "Unexpected error: " + tojson(error)); error = assert.throws(() => coll.aggregate([{$group: {_id: "$x"}}, {$changeStream: {}}])); -assert.contains(error.code, [40602, 50988], "Unexpected error: " + tojson(error)); +assert.contains(error.code, [40602], "Unexpected error: " + tojson(error)); // This one has a different error code because of conflicting host type requirements: the $group // needs to merge on a shard, but the $changeStream needs to merge on mongos. 
This doesn't diff --git a/jstests/libs/override_methods/continuous_stepdown.js b/jstests/libs/override_methods/continuous_stepdown.js index 3bd0ba101e2..48e4a79f197 100644 --- a/jstests/libs/override_methods/continuous_stepdown.js +++ b/jstests/libs/override_methods/continuous_stepdown.js @@ -176,6 +176,9 @@ ContinuousStepdown.configure = function(stepdownOptions, * Overrides the ReplSetTest constructor to start the continuous primary stepdown thread. */ ReplSetTest = function ReplSetTestWithContinuousPrimaryStepdown() { + // Preserve the original set of nodeOptions passed to the constructor. + const origNodeOpts = Object.assign({}, (arguments[0] && arguments[0].nodeOptions) || {}); + // Construct the original object originalReplSetTest.apply(this, arguments); @@ -185,24 +188,31 @@ ContinuousStepdown.configure = function(stepdownOptions, const _originalAwaitLastOpCommitted = this.awaitLastOpCommitted; /** - * Overrides startSet call to increase logging verbosity. + * Overrides startSet call to increase logging verbosity. Ensure that we only override the + * 'logComponentVerbosity' server parameter, but retain any other parameters that were + * supplied during ReplSetTest construction. */ this.startSet = function() { - let options = arguments[0] || {}; - - if (typeof (options.setParameter) === "string") { - var eqIdx = options.setParameter.indexOf("="); - if (eqIdx != -1) { - var param = options.setParameter.substring(0, eqIdx); - var value = options.setParameter.substring(eqIdx + 1); - options.setParameter = {}; - options.setParameter[param] = value; + // Helper function to convert a string representation of setParameter to object form. 
+ function setParamToObj(setParam) { + if (typeof (setParam) === "string") { + var eqIdx = setParam.indexOf("="); + if (eqIdx != -1) { + var param = setParam.substring(0, eqIdx); + var value = setParam.substring(eqIdx + 1); + return {[param]: value}; + } } + return Object.assign({}, setParam || {}); } + + const options = arguments[0] || {}; + + options.setParameter = Object.assign(setParamToObj(origNodeOpts.setParameter), + setParamToObj(options.setParameter), + {logComponentVerbosity: verbositySetting}); arguments[0] = options; - options.setParameter = options.setParameter || {}; - options.setParameter.logComponentVerbosity = verbositySetting; return _originalStartSetFn.apply(this, arguments); }; diff --git a/jstests/sharding/change_stream_show_migration_events.js b/jstests/sharding/change_stream_show_migration_events.js index 570a8039a8c..ba73cdd6ab3 100644 --- a/jstests/sharding/change_stream_show_migration_events.js +++ b/jstests/sharding/change_stream_show_migration_events.js @@ -1,4 +1,4 @@ -// Tests the undocumented 'showChunkMigrations' option for change streams. +// Tests the undocumented 'showMigrationEvents' option for change streams. // // This test is connecting directly to a shard, and change streams require the getMore command. // @tags: [requires_find_command, uses_change_streams] diff --git a/jstests/sharding/change_streams_new_shard_new_database.js b/jstests/sharding/change_streams_new_shard_new_database.js new file mode 100644 index 00000000000..a188d2e68ee --- /dev/null +++ b/jstests/sharding/change_streams_new_shard_new_database.js @@ -0,0 +1,88 @@ +/** + * Tests that existing whole-cluster, whole-db and single-collection $changeStreams correctly pick + * up events on a newly-added shard when a new unsharded collection is created on it. Exercises the + * fix for SERVER-42723. + * Tagging as 'requires_find_command' to ensure that this test is not run in the legacy protocol + * passthroughs. 
Legacy getMore fails in cases where it is run on a database or collection which + * does not yet exist. + * @tags: [uses_change_streams, requires_sharding, requires_find_command] + */ +(function() { + +"use strict"; + +const rsNodeOptions = { + setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1} +}; +const st = + new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}}); + +// We require one 'test' database and a second 'other' database. +const oldShardDB = st.s.getDB(jsTestName() + "_other"); +const newShardDB = st.s.getDB(jsTestName()); + +const configDB = st.s.getDB("config"); +const adminDB = st.s.getDB("admin"); + +const oldShardColl = oldShardDB.coll; +const newShardColl = newShardDB.test; + +// Helper function to add a new ReplSetTest shard into the cluster. +function addShardToCluster(shardName) { + const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions}); + replTest.startSet({shardsvr: ""}); + replTest.initiate(); + assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName})); + return replTest; +} + +// Helper function to confirm that a stream sees an expected sequence of documents. +function assertAllEventsObserved(changeStream, expectedDocs) { + for (let expectedDoc of expectedDocs) { + assert.soon(() => changeStream.hasNext()); + const nextEvent = changeStream.next(); + assert.docEq(nextEvent.fullDocument, expectedDoc); + } +} + +// Open a whole-db change stream on the as yet non-existent database. +const wholeDBCS = newShardDB.watch(); + +// Open a single-collection change stream on a namespace within the non-existent database. +const singleCollCS = newShardColl.watch(); + +// Open a whole-cluster stream on the deployment. +const wholeClusterCS = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]); + +// Insert some data into the 'other' database on the only existing shard. 
This should ensure that +// the primary shard of the test database will be created on the second shard, after it is added. +const insertedDocs = Array.from({length: 20}, (_, i) => ({_id: i})); +assert.commandWorked(oldShardColl.insert(insertedDocs)); + +// Verify that the whole-cluster stream sees all these events. +assertAllEventsObserved(wholeClusterCS, insertedDocs); + +// Verify that the other two streams did not see any of the insertions on the 'other' collection. +for (let csCursor of [wholeDBCS, singleCollCS]) { + assert(!csCursor.hasNext()); +} + +// Now add a new shard into the cluster... +const newShard1 = addShardToCluster("newShard1"); + +// ... create a new database and collection, and verify that they were placed on the new shard.... +assert.commandWorked(newShardDB.runCommand({create: newShardColl.getName()})); +assert(configDB.databases.findOne({_id: newShardDB.getName(), primary: "newShard1"})); + +// ... insert some documents into the new, unsharded collection on the new shard... +assert.commandWorked(newShardColl.insert(insertedDocs)); + +// ... and confirm that all the pre-existing streams see all of these events. +for (let csCursor of [singleCollCS, wholeDBCS, wholeClusterCS]) { + assertAllEventsObserved(csCursor, insertedDocs); +} + +// Stop the new shard manually since the ShardingTest doesn't know anything about it. +st.stop(); +newShard1.stopSet(); +})(); diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js index 9ab4b1901fa..b5d869af9df 100644 --- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js +++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js @@ -97,9 +97,7 @@ function testUnshardedBecomesSharded(collToWatch) { ]; // Verify that the cursor on the original shard is still valid and sees new inserted - // documents. 
The 'documentKey' field should now include the shard key, even before a - // 'kNewShardDetected' operation has been generated by the migration of a chunk to a new - // shard. + // documents. The 'documentKey' field should now include the shard key. assert.commandWorked(mongosColl.insert({_id: 1, x: 1})); assert.commandWorked(mongosCollOther.insert({_id: 1, y: 1})); cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]}); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a9d80a0e2a0..1049a9b04aa 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -244,8 +244,7 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac BSONObj DocumentSourceChangeStream::buildMatchFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, - Timestamp startFrom, - bool startFromInclusive, + Timestamp startFromInclusive, bool showMigrationEvents) { auto nss = expCtx->ns; @@ -297,6 +296,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // 2.1) Normal CRUD ops. auto normalOpTypeMatch = BSON("op" << NE << "n"); + // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility + // with 4.2, even though we no longer rely on them to detect new shards. We may wish to remove + // this mechanism in 4.6, or retain it for future cases where a change stream is targeted to a + // subset of shards. See SERVER-44039 for details. + // 2.2) A chunk gets migrated to a new shard that doesn't have any chunks. auto chunkMigratedNewShardMatch = BSON("op" << "n" @@ -326,7 +330,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // Only include CRUD operations tagged "fromMigrate" when the "showMigrationEvents" option is // set - exempt all other operations and commands with that tag. 
Include the resume token, if // resuming, so we can verify it was still present in the oplog. - return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom) + return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive) << BSON(OR(opMatch, commandAndApplyOpsMatch)))); } @@ -388,6 +392,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression } } + // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'. if (auto startAtOperationTime = spec.getStartAtOperationTime()) { uassert(40674, "Only one type of resume option is allowed, but multiple were found.", @@ -396,8 +401,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); } - // There might not be a starting point if we're on mongos, otherwise we should either have a - // 'resumeAfter' starting point, or should start from the latest majority committed operation. + // We can only run on a replica set, or through mongoS. Confirm that this is the case. auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); uassert( 40573, @@ -405,24 +409,29 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression expCtx->inMongos || (replCoord && replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet)); - if (!startFrom && !expCtx->inMongos) { - startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp(); + + // If we do not have an explicit starting point, we should start from the latest majority + // committed operation. If we are on mongoS and do not have a starting point, set it to the + // current clusterTime so that all shards start in sync. We always start one tick beyond the + // most recent operation, to ensure that the stream does not return it. + if (!startFrom) { + const auto currentTime = !expCtx->inMongos + ? 
LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()} + : LogicalClock::get(expCtx->opCtx)->getClusterTime(); + startFrom = currentTime.addTicks(1).asTimestamp(); } - if (startFrom) { - const bool startFromInclusive = (resumeStage != nullptr); - stages.push_back(DocumentSourceOplogMatch::create( - DocumentSourceChangeStream::buildMatchFilter( - expCtx, *startFrom, startFromInclusive, showMigrationEvents), - expCtx)); - - // If we haven't already populated the initial PBRT, then we are starting from a specific - // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. - if (expCtx->initialPostBatchResumeToken.isEmpty()) { - Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)}; - expCtx->initialPostBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson(); - } + // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies + // upon the fact that it is always the first stage in the pipeline. + stages.push_back(DocumentSourceOplogMatch::create( + DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, showMigrationEvents), + expCtx)); + + // If we haven't already populated the initial PBRT, then we are starting from a specific + // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. + if (expCtx->initialPostBatchResumeToken.isEmpty()) { + expCtx->initialPostBatchResumeToken = + ResumeToken::makeHighWaterMarkToken(*startFrom).toDocument().toBson(); } // Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage. @@ -516,12 +525,14 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( (expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS())); // Prevent $changeStream from running on internal databases. A stream may run against the - // 'admin' database iff 'allChangesForCluster' is true. 
+ // 'admin' database iff 'allChangesForCluster' is true. A stream may run against the 'config' + // database iff 'allowToRunOnConfigDB' is true. + const bool isNotBannedInternalDB = + !expCtx->ns.isLocal() && (!expCtx->ns.isConfigDB() || spec.getAllowToRunOnConfigDB()); uassert(ErrorCodes::InvalidNamespace, str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db() << " database", - expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() - : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB())); + expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() : isNotBannedInternalDB); // Prevent $changeStream from running on internal collections in any database. uassert(ErrorCodes::InvalidNamespace, diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 182a9373e12..d3492b0c319 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -166,7 +166,6 @@ public: */ static BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp startFrom, - bool startFromInclusive, bool showMigrationEvents); /** diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl index 93a92c25173..410e5ab9f15 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.idl +++ b/src/mongo/db/pipeline/document_source_change_stream.idl @@ -109,3 +109,11 @@ structs: deletes may appear that do not reflect actual deletions or insertions of data. Instead they reflect this data moving from one shard to another. + allowToRunOnConfigDB: + cpp_name: allowToRunOnConfigDB + type: bool + default: false + description: A flag indicating whether the change stream may be opened on the + 'config' database, which is usually banned. 
This flag is used + internally to allow mongoS to open a stream on 'config.shards', in + order to monitor for the addition of new shards to the cluster.
\ No newline at end of file diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index a147e5271c3..9d86e364189 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -362,14 +362,11 @@ Value DocumentSourceChangeStreamTransform::serialize( changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAfterFieldName].missing()) { MutableDocument newChangeStreamOptions(changeStreamOptions); - // Use the current cluster time plus 1 tick since the oplog query will include all - // operations/commands equal to or greater than the 'startAtOperationTime' timestamp. In - // particular, avoid including the last operation that went through mongos in an attempt to - // match the behavior of a replica set more closely. - auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime(); - clusterTime.addTicks(1); - newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = - Value(clusterTime.asTimestamp()); + // Configure the serialized $changeStream to start from the initial high-watermark + // postBatchResumeToken which we generated while parsing the $changeStream pipeline. 
+ invariant(!pExpCtx->initialPostBatchResumeToken.isEmpty()); + newChangeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = + Value(pExpCtx->initialPostBatchResumeToken); changeStreamOptions = newChangeStreamOptions.freeze(); } return Value(Document{{getSourceName(), changeStreamOptions}}); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index a469b6d9807..f451d139d21 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -36,13 +36,17 @@ #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/curop.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/query/find_common.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/op_msg_rpc_impls.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/s/query/cluster_aggregation_planner.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -109,6 +113,33 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v return explainCommandBuilder.freeze(); } +/** + * Open a $changeStream cursor on the 'config.shards' collection to watch for new shards. 
+ */ +RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Timestamp startMonitoringAtTime) { + const auto& configShard = Grid::get(expCtx->opCtx)->shardRegistry()->getConfigShard(); + // Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}} + AggregationRequest aggReq( + ShardType::ConfigNS, + {BSON(DocumentSourceChangeStream::kStageName + << BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName + << startMonitoringAtTime + << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))}); + aggReq.setFromMongos(true); + aggReq.setNeedsMerge(true); + aggReq.setBatchSize(0); + auto configCursor = + establishCursors(expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + aggReq.getNamespaceString(), + ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, + {{configShard->getId(), aggReq.serializeToCommandObj().toBson()}}, + false); + invariant(configCursor.size() == 1); + return std::move(*configCursor.begin()); +} + Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) { // The idempotent retry policy will retry even for writeConcern failures, so only set it if the // pipeline does not support writeConcern. @@ -575,13 +606,17 @@ DispatchShardPipelineResults dispatchShardPipeline( pipeline.get(), expCtx->collation); - // In order for a $changeStream to work reliably, we need the shard registry to be at least as - // current as the logical time at which the pipeline was serialized to 'targetedCommand' above. - // We therefore hard-reload and retarget the shards here. We don't refresh for other pipelines - // that must run on all shards (e.g. $currentOp) because, unlike $changeStream, those pipelines - // may not have been forced to split if there was only one shard in the cluster when the command - // began execution. 
If a shard was added since the earlier targeting logic ran, then refreshing - // here may cause us to illegally target an unsplit pipeline to more than one shard. + // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the + // config server in order to monitor for new shards. To guarantee that we do not miss any + // shards, we must ensure that the list of shards to which we initially dispatch the pipeline is + // at least as current as the logical time at which the stream begins scanning for new shards. + // We therefore set 'shardRegistryReloadTime' to the current clusterTime and then hard-reload + // the shard registry. We don't refresh for other pipelines that must run on all shards (e.g. + // $currentOp) because, unlike $changeStream, those pipelines may not have been forced to split + // if there was only one shard in the cluster when the command began execution. If a shard was + // added since the earlier targeting logic ran, then refreshing here may cause us to illegally + // target an unsplit pipeline to more than one shard. + auto shardRegistryReloadTime = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); if (hasChangeStream) { auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); if (!shardRegistry->reload(opCtx)) { @@ -636,12 +671,19 @@ DispatchShardPipelineResults dispatchShardPipeline( invariant(cursors.size() % shardIds.size() == 0, str::stream() << "Number of cursors (" << cursors.size() << ") is not a multiple of producers (" << shardIds.size() << ")"); + + // For $changeStream, we must open an extra cursor on the 'config.shards' collection, so + // that we can monitor for the addition of new shards inline with real events. + if (hasChangeStream && expCtx->ns.db() != ShardType::ConfigNS.db()) { + cursors.emplace_back(openChangeStreamNewShardMonitor(expCtx, shardRegistryReloadTime)); + } } // Convert remote cursors into a vector of "owned" cursors. 
std::vector<OwnedRemoteCursor> ownedCursors; for (auto&& cursor : cursors) { - ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), expCtx->ns)); + auto cursorNss = cursor.getCursorResponse().getNSS(); + ownedCursors.emplace_back(opCtx, std::move(cursor), std::move(cursorNss)); } // Record the number of shards involved in the aggregation. If we are required to merge on diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index bc89f6aa19a..ae6aaf092b4 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -396,7 +396,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { } executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx); + remote.getTargetHost(), remote.cursorNss.db().toString(), cmdObj, _opCtx); auto callbackStatus = _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) { diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp index d94a99518b7..4ae4318c997 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.cpp +++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp @@ -32,6 +32,7 @@ #include <algorithm> #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/query/async_results_merger_params_gen.h" @@ -40,10 +41,20 @@ namespace mongo { namespace { -// Returns true if the change stream document has an 'operationType' of 'newShardDetected'. 
-bool needsUpdate(const Document& childResult) { - return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() == - DocumentSourceChangeStream::kNewShardDetectedOpType; +// Returns true if the change stream document is an event in 'config.shards'. +bool isShardConfigEvent(const Document& eventDoc) { + // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility + // with 4.2, even though we no longer rely on them to detect new shards. We swallow the event + // here. We may wish to remove this mechanism entirely in 4.6, or retain it for future cases where + // a change stream is targeted to a subset of shards. See SERVER-44039 for details. + if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() == + DocumentSourceChangeStream::kNewShardDetectedOpType) { + return true; + } + auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField]; + return nsObj.getType() == BSONType::Object && + nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() && + nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll(); } } // namespace @@ -69,14 +80,19 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( : DocumentSource(kStageName, expCtx), _executor(std::move(executor)), _mergeCursors(mergeCursors), - _shardsWithCursors(std::move(shardsWithCursors)), + _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()), _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() { auto childResult = pSource->getNext(); - while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) { - addNewShardCursors(childResult.getDocument()); + // If this is an insertion into the 'config.shards' collection, open a cursor on the new shard.
+ while (childResult.isAdvanced() && isShardConfigEvent(childResult.getDocument())) { + auto opType = childResult.getDocument()[DocumentSourceChangeStream::kOperationTypeField]; + if (opType.getStringData() == DocumentSourceChangeStream::kInsertOpType) { + addNewShardCursors(childResult.getDocument()); + } + // For shard removal or update, we do nothing. We also swallow kNewShardDetectedOpType. childResult = pSource->getNext(); } return childResult; @@ -88,41 +104,39 @@ void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShard std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards( const Document& newShardDetectedObj) { - auto* opCtx = pExpCtx->opCtx; // Reload the shard registry. We need to ensure a reload initiated after calling this method - // caused the reload, otherwise we aren't guaranteed to get all the new shards. - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { + // caused the reload, otherwise we may not see the new shard, so we perform a "hard" reload. + auto* opCtx = pExpCtx->opCtx; + if (!Grid::get(opCtx)->shardRegistry()->reload(opCtx)) { // A 'false' return from shardRegistry.reload() means a reload was already in progress and // it completed before reload() returned. So another reload(), regardless of return value, // will ensure a reload started after the first call to reload(). - shardRegistry->reload(opCtx); + Grid::get(opCtx)->shardRegistry()->reload(opCtx); } - std::vector<ShardId> shardIds, newShardIds; - shardRegistry->getAllShardIdsNoReload(&shardIds); - std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end()); - std::sort(shardIds.begin(), shardIds.end()); - std::set_difference(shardIds.begin(), - shardIds.end(), - _shardsWithCursors.begin(), - _shardsWithCursors.end(), - std::back_inserter(newShardIds)); + // Parse the new shard's information from the document inserted into 'config.shards'. 
+ auto newShardSpec = newShardDetectedObj[DocumentSourceChangeStream::kFullDocumentField]; + auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson())); - auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( - _cmdToRunOnNewShards, - newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument()); - std::vector<std::pair<ShardId, BSONObj>> requests; - for (const auto& shardId : newShardIds) { - requests.emplace_back(shardId, cmdObj); - _shardsWithCursors.push_back(shardId); + // Make sure we are not attempting to open a cursor on a shard that already has one. + if (!_shardsWithCursors.insert(newShard.getName()).second) { + return {}; } + + // We must start the new cursor from the moment at which the shard became visible. + const auto newShardAddedTime = LogicalTime{ + newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()}; + auto resumeTokenForNewShard = + ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp()); + auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( + _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument()); + const bool allowPartialResults = false; // partial results are not allowed return establishCursors(opCtx, _executor, pExpCtx->ns, ReadPreferenceSetting::get(opCtx), - requests, + {{newShard.getName(), cmdObj}}, allowPartialResults); } diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index 0b41fde92d1..ff76d2ce90e 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -102,7 +102,7 @@ private: std::shared_ptr<executor::TaskExecutor> _executor; boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors; - std::vector<ShardId> _shardsWithCursors; + std::set<ShardId> _shardsWithCursors; BSONObj _cmdToRunOnNewShards; }; } // namespace mongo |