diff options
author | Bernard Gorman <bernard.gorman@mongodb.com> | 2019-10-17 20:26:03 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-17 20:26:03 +0000 |
commit | 97cc7b5838db4ef13ede3149c44bceca8f5c2977 (patch) | |
tree | 38d9b78346777033545f99048f947559a67a9ab1 /src/mongo/s/query/document_source_update_on_add_shard.cpp | |
parent | 1daf063435aa3f748840c476f8fa3bd13d7d2a68 (diff) | |
download | mongo-97cc7b5838db4ef13ede3149c44bceca8f5c2977.tar.gz |
SERVER-42723 New shard with new database can be ignored by change streams
Diffstat (limited to 'src/mongo/s/query/document_source_update_on_add_shard.cpp')
-rw-r--r-- | src/mongo/s/query/document_source_update_on_add_shard.cpp | 72 |
1 files changed, 43 insertions, 29 deletions
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp index d94a99518b7..4ae4318c997 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.cpp +++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp @@ -32,6 +32,7 @@ #include <algorithm> #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/query/async_results_merger_params_gen.h" @@ -40,10 +41,20 @@ namespace mongo { namespace { -// Returns true if the change stream document has an 'operationType' of 'newShardDetected'. -bool needsUpdate(const Document& childResult) { - return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() == - DocumentSourceChangeStream::kNewShardDetectedOpType; +// Returns true if the change stream document is an event in 'config.shards'. +bool isShardConfigEvent(const Document& eventDoc) { + // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility + // with 4.2, even though we no longer rely on them to detect new shards. We swallow the event + // here. We may wish to remove this mechanism entirely 4.6, or retain it for future cases where + // a change stream is targeted to a subset of shards. See SERVER-44039 for details. + if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() == + DocumentSourceChangeStream::kNewShardDetectedOpType) { + return true; + } + auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField]; + return nsObj.getType() == BSONType::Object && + nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() && + nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll(); } } // namespace @@ -69,14 +80,19 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( : DocumentSource(kStageName, expCtx), _executor(std::move(executor)), _mergeCursors(mergeCursors), - _shardsWithCursors(std::move(shardsWithCursors)), + _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()), _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() { auto childResult = pSource->getNext(); - while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) { - addNewShardCursors(childResult.getDocument()); + // If this is an insertion into the 'config.shards' collection, open a cursor on the new shard. + while (childResult.isAdvanced() && isShardConfigEvent(childResult.getDocument())) { + auto opType = childResult.getDocument()[DocumentSourceChangeStream::kOperationTypeField]; + if (opType.getStringData() == DocumentSourceChangeStream::kInsertOpType) { + addNewShardCursors(childResult.getDocument()); + } + // For shard removal or update, we do nothing. We also swallow kNewShardDetectedOpType. childResult = pSource->getNext(); } return childResult; @@ -88,41 +104,39 @@ void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShard std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards( const Document& newShardDetectedObj) { - auto* opCtx = pExpCtx->opCtx; // Reload the shard registry. We need to ensure a reload initiated after calling this method - // caused the reload, otherwise we aren't guaranteed to get all the new shards. - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { + // caused the reload, otherwise we may not see the new shard, so we perform a "hard" reload. + auto* opCtx = pExpCtx->opCtx; + if (!Grid::get(opCtx)->shardRegistry()->reload(opCtx)) { // A 'false' return from shardRegistry.reload() means a reload was already in progress and // it completed before reload() returned. So another reload(), regardless of return value, // will ensure a reload started after the first call to reload(). - shardRegistry->reload(opCtx); + Grid::get(opCtx)->shardRegistry()->reload(opCtx); } - std::vector<ShardId> shardIds, newShardIds; - shardRegistry->getAllShardIdsNoReload(&shardIds); - std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end()); - std::sort(shardIds.begin(), shardIds.end()); - std::set_difference(shardIds.begin(), - shardIds.end(), - _shardsWithCursors.begin(), - _shardsWithCursors.end(), - std::back_inserter(newShardIds)); + // Parse the new shard's information from the document inserted into 'config.shards'. + auto newShardSpec = newShardDetectedObj[DocumentSourceChangeStream::kFullDocumentField]; + auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson())); - auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( - _cmdToRunOnNewShards, - newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument()); - std::vector<std::pair<ShardId, BSONObj>> requests; - for (const auto& shardId : newShardIds) { - requests.emplace_back(shardId, cmdObj); - _shardsWithCursors.push_back(shardId); + // Make sure we are not attempting to open a cursor on a shard that already has one. + if (!_shardsWithCursors.insert(newShard.getName()).second) { + return {}; } + + // We must start the new cursor from the moment at which the shard became visible. + const auto newShardAddedTime = LogicalTime{ + newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()}; + auto resumeTokenForNewShard = + ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp()); + auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand( + _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument()); + const bool allowPartialResults = false; // partial results are not allowed return establishCursors(opCtx, _executor, pExpCtx->ns, ReadPreferenceSetting::get(opCtx), - requests, + {{newShard.getName(), cmdObj}}, allowPartialResults); } |