diff options
author | Bernard Gorman <bernard.gorman@mongodb.com> | 2019-10-17 20:26:03 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-17 20:26:03 +0000 |
commit | 97cc7b5838db4ef13ede3149c44bceca8f5c2977 (patch) | |
tree | 38d9b78346777033545f99048f947559a67a9ab1 /src/mongo | |
parent | 1daf063435aa3f748840c476f8fa3bd13d7d2a68 (diff) | |
download | mongo-97cc7b5838db4ef13ede3149c44bceca8f5c2977.tar.gz |
SERVER-42723 New shard with new database can be ignored by change streams
Diffstat (limited to 'src/mongo')
8 files changed, 143 insertions, 72 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a9d80a0e2a0..1049a9b04aa 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -244,8 +244,7 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac BSONObj DocumentSourceChangeStream::buildMatchFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, - Timestamp startFrom, - bool startFromInclusive, + Timestamp startFromInclusive, bool showMigrationEvents) { auto nss = expCtx->ns; @@ -297,6 +296,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // 2.1) Normal CRUD ops. auto normalOpTypeMatch = BSON("op" << NE << "n"); + // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility + // with 4.2, even though we no longer rely on them to detect new shards. We may wish to remove + // this mechanism in 4.6, or retain it for future cases where a change stream is targeted to a + // subset of shards. See SERVER-44039 for details. + // 2.2) A chunk gets migrated to a new shard that doesn't have any chunks. auto chunkMigratedNewShardMatch = BSON("op" << "n" @@ -326,7 +330,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // Only include CRUD operations tagged "fromMigrate" when the "showMigrationEvents" option is // set - exempt all other operations and commands with that tag. Include the resume token, if // resuming, so we can verify it was still present in the oplog. - return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom) + return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive) << BSON(OR(opMatch, commandAndApplyOpsMatch)))); } @@ -388,6 +392,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression } } + // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'. 
if (auto startAtOperationTime = spec.getStartAtOperationTime()) { uassert(40674, "Only one type of resume option is allowed, but multiple were found.", @@ -396,8 +401,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom); } - // There might not be a starting point if we're on mongos, otherwise we should either have a - // 'resumeAfter' starting point, or should start from the latest majority committed operation. + // We can only run on a replica set, or through mongoS. Confirm that this is the case. auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); uassert( 40573, @@ -405,24 +409,29 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression expCtx->inMongos || (replCoord && replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet)); - if (!startFrom && !expCtx->inMongos) { - startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp(); + + // If we do not have an explicit starting point, we should start from the latest majority + // committed operation. If we are on mongoS and do not have a starting point, set it to the + // current clusterTime so that all shards start in sync. We always start one tick beyond the + // most recent operation, to ensure that the stream does not return it. + if (!startFrom) { + const auto currentTime = !expCtx->inMongos + ? 
LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()} + : LogicalClock::get(expCtx->opCtx)->getClusterTime(); + startFrom = currentTime.addTicks(1).asTimestamp(); } - if (startFrom) { - const bool startFromInclusive = (resumeStage != nullptr); - stages.push_back(DocumentSourceOplogMatch::create( - DocumentSourceChangeStream::buildMatchFilter( - expCtx, *startFrom, startFromInclusive, showMigrationEvents), - expCtx)); - - // If we haven't already populated the initial PBRT, then we are starting from a specific - // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. - if (expCtx->initialPostBatchResumeToken.isEmpty()) { - Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)}; - expCtx->initialPostBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson(); - } + // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies + // upon the fact that it is always the first stage in the pipeline. + stages.push_back(DocumentSourceOplogMatch::create( + DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, showMigrationEvents), + expCtx)); + + // If we haven't already populated the initial PBRT, then we are starting from a specific + // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. + if (expCtx->initialPostBatchResumeToken.isEmpty()) { + expCtx->initialPostBatchResumeToken = + ResumeToken::makeHighWaterMarkToken(*startFrom).toDocument().toBson(); } // Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage. @@ -516,12 +525,14 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( (expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS())); // Prevent $changeStream from running on internal databases. A stream may run against the - // 'admin' database iff 'allChangesForCluster' is true. 
+ // 'admin' database iff 'allChangesForCluster' is true. A stream may run against the 'config' + // database iff 'allowToRunOnConfigDB' is true. + const bool isNotBannedInternalDB = + !expCtx->ns.isLocal() && (!expCtx->ns.isConfigDB() || spec.getAllowToRunOnConfigDB()); uassert(ErrorCodes::InvalidNamespace, str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db() << " database", - expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() - : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB())); + expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() : isNotBannedInternalDB); // Prevent $changeStream from running on internal collections in any database. uassert(ErrorCodes::InvalidNamespace, diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 182a9373e12..d3492b0c319 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -166,7 +166,6 @@ public: */ static BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp startFrom, - bool startFromInclusive, bool showMigrationEvents); /** diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl index 93a92c25173..410e5ab9f15 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.idl +++ b/src/mongo/db/pipeline/document_source_change_stream.idl @@ -109,3 +109,11 @@ structs: deletes may appear that do not reflect actual deletions or insertions of data. Instead they reflect this data moving from one shard to another. + allowToRunOnConfigDB: + cpp_name: allowToRunOnConfigDB + type: bool + default: false + description: A flag indicating whether the change stream may be opened on the + 'config' database, which is usually banned. 
This flag is used + internally to allow mongoS to open a stream on 'config.shards', in + order to monitor for the addition of new shards to the cluster.
\ No newline at end of file diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index a147e5271c3..9d86e364189 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -362,14 +362,11 @@ Value DocumentSourceChangeStreamTransform::serialize( changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAfterFieldName].missing()) { MutableDocument newChangeStreamOptions(changeStreamOptions); - // Use the current cluster time plus 1 tick since the oplog query will include all - // operations/commands equal to or greater than the 'startAtOperationTime' timestamp. In - // particular, avoid including the last operation that went through mongos in an attempt to - // match the behavior of a replica set more closely. - auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime(); - clusterTime.addTicks(1); - newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = - Value(clusterTime.asTimestamp()); + // Configure the serialized $changeStream to start from the initial high-watermark + // postBatchResumeToken which we generated while parsing the $changeStream pipeline. 
+ invariant(!pExpCtx->initialPostBatchResumeToken.isEmpty()); + newChangeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = + Value(pExpCtx->initialPostBatchResumeToken); changeStreamOptions = newChangeStreamOptions.freeze(); } return Value(Document{{getSourceName(), changeStreamOptions}}); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index a469b6d9807..f451d139d21 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -36,13 +36,17 @@ #include "mongo/client/connpool.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/curop.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/query/find_common.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/op_msg_rpc_impls.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/s/query/cluster_aggregation_planner.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -109,6 +113,33 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v return explainCommandBuilder.freeze(); } +/** + * Open a $changeStream cursor on the 'config.shards' collection to watch for new shards. 
+ */ +RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Timestamp startMonitoringAtTime) { + const auto& configShard = Grid::get(expCtx->opCtx)->shardRegistry()->getConfigShard(); + // Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}} + AggregationRequest aggReq( + ShardType::ConfigNS, + {BSON(DocumentSourceChangeStream::kStageName + << BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName + << startMonitoringAtTime + << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))}); + aggReq.setFromMongos(true); + aggReq.setNeedsMerge(true); + aggReq.setBatchSize(0); + auto configCursor = + establishCursors(expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + aggReq.getNamespaceString(), + ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, + {{configShard->getId(), aggReq.serializeToCommandObj().toBson()}}, + false); + invariant(configCursor.size() == 1); + return std::move(*configCursor.begin()); +} + Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) { // The idempotent retry policy will retry even for writeConcern failures, so only set it if the // pipeline does not support writeConcern. @@ -575,13 +606,17 @@ DispatchShardPipelineResults dispatchShardPipeline( pipeline.get(), expCtx->collation); - // In order for a $changeStream to work reliably, we need the shard registry to be at least as - // current as the logical time at which the pipeline was serialized to 'targetedCommand' above. - // We therefore hard-reload and retarget the shards here. We don't refresh for other pipelines - // that must run on all shards (e.g. $currentOp) because, unlike $changeStream, those pipelines - // may not have been forced to split if there was only one shard in the cluster when the command - // began execution. 
If a shard was added since the earlier targeting logic ran, then refreshing - // here may cause us to illegally target an unsplit pipeline to more than one shard. + // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the + // config server in order to monitor for new shards. To guarantee that we do not miss any + // shards, we must ensure that the list of shards to which we initially dispatch the pipeline is + // at least as current as the logical time at which the stream begins scanning for new shards. + // We therefore set 'shardRegistryReloadTime' to the current clusterTime and then hard-reload + // the shard registry. We don't refresh for other pipelines that must run on all shards (e.g. + // $currentOp) because, unlike $changeStream, those pipelines may not have been forced to split + // if there was only one shard in the cluster when the command began execution. If a shard was + // added since the earlier targeting logic ran, then refreshing here may cause us to illegally + // target an unsplit pipeline to more than one shard. + auto shardRegistryReloadTime = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); if (hasChangeStream) { auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); if (!shardRegistry->reload(opCtx)) { @@ -636,12 +671,19 @@ DispatchShardPipelineResults dispatchShardPipeline( invariant(cursors.size() % shardIds.size() == 0, str::stream() << "Number of cursors (" << cursors.size() << ") is not a multiple of producers (" << shardIds.size() << ")"); + + // For $changeStream, we must open an extra cursor on the 'config.shards' collection, so + // that we can monitor for the addition of new shards inline with real events. + if (hasChangeStream && expCtx->ns.db() != ShardType::ConfigNS.db()) { + cursors.emplace_back(openChangeStreamNewShardMonitor(expCtx, shardRegistryReloadTime)); + } } // Convert remote cursors into a vector of "owned" cursors. 
std::vector<OwnedRemoteCursor> ownedCursors; for (auto&& cursor : cursors) { - ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), expCtx->ns)); + auto cursorNss = cursor.getCursorResponse().getNSS(); + ownedCursors.emplace_back(opCtx, std::move(cursor), std::move(cursorNss)); } // Record the number of shards involved in the aggregation. If we are required to merge on diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index bc89f6aa19a..ae6aaf092b4 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -396,7 +396,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { } executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx); + remote.getTargetHost(), remote.cursorNss.db().toString(), cmdObj, _opCtx); auto callbackStatus = _executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) { diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp index d94a99518b7..4ae4318c997 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.cpp +++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp @@ -32,6 +32,7 @@ #include <algorithm> #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/query/async_results_merger_params_gen.h" @@ -40,10 +41,20 @@ namespace mongo { namespace { -// Returns true if the change stream document has an 'operationType' of 'newShardDetected'. 
-bool needsUpdate(const Document& childResult) { - return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() == - DocumentSourceChangeStream::kNewShardDetectedOpType; +// Returns true if the change stream document is an event in 'config.shards'. +bool isShardConfigEvent(const Document& eventDoc) { + // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility + // with 4.2, even though we no longer rely on them to detect new shards. We swallow the event + // here. We may wish to remove this mechanism entirely in 4.6, or retain it for future cases where + // a change stream is targeted to a subset of shards. See SERVER-44039 for details. + if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() == + DocumentSourceChangeStream::kNewShardDetectedOpType) { + return true; + } + auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField]; + return nsObj.getType() == BSONType::Object && + nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() && + nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll(); } } // namespace @@ -69,14 +80,19 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( : DocumentSource(kStageName, expCtx), _executor(std::move(executor)), _mergeCursors(mergeCursors), - _shardsWithCursors(std::move(shardsWithCursors)), + _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()), _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() { auto childResult = pSource->getNext(); - while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) { - addNewShardCursors(childResult.getDocument()); + // If this is an insertion into the 'config.shards' collection, open a cursor on the new shard. 
+ while (childResult.isAdvanced() && isShardConfigEvent(childResult.getDocument())) { + auto opType = childResult.getDocument()[DocumentSourceChangeStream::kOperationTypeField]; + if (opType.getStringData() == DocumentSourceChangeStream::kInsertOpType) { + addNewShardCursors(childResult.getDocument()); + } + // For shard removal or update, we do nothing. We also swallow kNewShardDetectedOpType. childResult = pSource->getNext(); } return childResult; @@ -88,41 +104,39 @@ void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShard std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards( const Document& newShardDetectedObj) { - auto* opCtx = pExpCtx->opCtx; // Reload the shard registry. We need to ensure a reload initiated after calling this method - // caused the reload, otherwise we aren't guaranteed to get all the new shards. - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { + // caused the reload, otherwise we may not see the new shard, so we perform a "hard" reload. + auto* opCtx = pExpCtx->opCtx; + if (!Grid::get(opCtx)->shardRegistry()->reload(opCtx)) { // A 'false' return from shardRegistry.reload() means a reload was already in progress and // it completed before reload() returned. So another reload(), regardless of return value, // will ensure a reload started after the first call to reload(). - shardRegistry->reload(opCtx); + Grid::get(opCtx)->shardRegistry()->reload(opCtx); } - std::vector<ShardId> shardIds, newShardIds; - shardRegistry->getAllShardIdsNoReload(&shardIds); - std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end()); - std::sort(shardIds.begin(), shardIds.end()); - std::set_difference(shardIds.begin(), - shardIds.end(), - _shardsWithCursors.begin(), - _shardsWithCursors.end(), - std::back_inserter(newShardIds)); + // Parse the new shard's information from the document inserted into 'config.shards'. 
+ auto newShardSpec = newShardDetectedObj[DocumentSourceChangeStream::kFullDocumentField]; + auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson())); - auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( - _cmdToRunOnNewShards, - newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument()); - std::vector<std::pair<ShardId, BSONObj>> requests; - for (const auto& shardId : newShardIds) { - requests.emplace_back(shardId, cmdObj); - _shardsWithCursors.push_back(shardId); + // Make sure we are not attempting to open a cursor on a shard that already has one. + if (!_shardsWithCursors.insert(newShard.getName()).second) { + return {}; } + + // We must start the new cursor from the moment at which the shard became visible. + const auto newShardAddedTime = LogicalTime{ + newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()}; + auto resumeTokenForNewShard = + ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp()); + auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( + _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument()); + const bool allowPartialResults = false; // partial results are not allowed return establishCursors(opCtx, _executor, pExpCtx->ns, ReadPreferenceSetting::get(opCtx), - requests, + {{newShard.getName(), cmdObj}}, allowPartialResults); } diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index 0b41fde92d1..ff76d2ce90e 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -102,7 +102,7 @@ private: std::shared_ptr<executor::TaskExecutor> _executor; boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors; - std::vector<ShardId> _shardsWithCursors; + std::set<ShardId> _shardsWithCursors; BSONObj _cmdToRunOnNewShards; }; } // namespace mongo |