Diffstat (limited to 'src/mongo/db')
5 files changed, 231 insertions, 195 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index eafed18ef19..30f4319bb4a 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -60,6 +60,7 @@ env.Library(
         'range_deletion_util.cpp',
         'read_only_catalog_cache_loader.cpp',
         'recoverable_critical_section_service.cpp',
+        'resharding/document_source_resharding_ownership_match.cpp',
         'resharding/resharding_change_event_o2_field.idl',
         'resharding/resharding_collection_cloner.cpp',
         'resharding/resharding_coordinator_commit_monitor.cpp',
diff --git a/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp
new file mode 100644
index 00000000000..76ad368f844
--- /dev/null
+++ b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp
@@ -0,0 +1,142 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/resharding/document_source_resharding_ownership_match.h"
+
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/resharding/common_types_gen.h"
+
+namespace mongo {
+
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalReshardingOwnershipMatch,
+                                  LiteParsedDocumentSourceDefault::parse,
+                                  DocumentSourceReshardingOwnershipMatch::createFromBson,
+                                  true);
+
+boost::intrusive_ptr<DocumentSourceReshardingOwnershipMatch>
+DocumentSourceReshardingOwnershipMatch::create(
+    ShardId recipientShardId,
+    ShardKeyPattern reshardingKey,
+    const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+    return new DocumentSourceReshardingOwnershipMatch(
+        std::move(recipientShardId), std::move(reshardingKey), expCtx);
+}
+
+boost::intrusive_ptr<DocumentSourceReshardingOwnershipMatch>
+DocumentSourceReshardingOwnershipMatch::createFromBson(
+    BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+    uassert(8423307,
+            str::stream() << "Argument to " << kStageName << " must be an object",
+            elem.type() == Object);
+
+    auto parsed = DocumentSourceReshardingOwnershipMatchSpec::parse(
+        {"DocumentSourceReshardingOwnershipMatchSpec"}, elem.embeddedObject());
+
+    return new DocumentSourceReshardingOwnershipMatch(
+        parsed.getRecipientShardId(), ShardKeyPattern(parsed.getReshardingKey()), expCtx);
+}
+
+DocumentSourceReshardingOwnershipMatch::DocumentSourceReshardingOwnershipMatch(
+    ShardId recipientShardId,
+    ShardKeyPattern reshardingKey,
+    const boost::intrusive_ptr<ExpressionContext>& expCtx)
+    : DocumentSource(kStageName, expCtx),
+      _recipientShardId{std::move(recipientShardId)},
+      _reshardingKey{std::move(reshardingKey)} {}
+
+StageConstraints DocumentSourceReshardingOwnershipMatch::constraints(
+    Pipeline::SplitState pipeState) const {
+    return StageConstraints(StreamType::kStreaming,
+                            PositionRequirement::kNone,
+                            HostTypeRequirement::kAnyShard,
+                            DiskUseRequirement::kNoDiskUse,
+                            FacetRequirement::kNotAllowed,
+                            TransactionRequirement::kNotAllowed,
+                            LookupRequirement::kNotAllowed,
+                            UnionRequirement::kNotAllowed,
+                            ChangeStreamRequirement::kDenylist);
+}
+
+Value DocumentSourceReshardingOwnershipMatch::serialize(
+    boost::optional<ExplainOptions::Verbosity> explain) const {
+    return Value{Document{{kStageName,
+                           DocumentSourceReshardingOwnershipMatchSpec(
+                               _recipientShardId, _reshardingKey.getKeyPattern())
+                               .toBSON()}}};
+}
+
+DepsTracker::State DocumentSourceReshardingOwnershipMatch::getDependencies(
+    DepsTracker* deps) const {
+    for (const auto& skElem : _reshardingKey.toBSON()) {
+        deps->fields.insert(skElem.fieldNameStringData().toString());
+    }
+
+    return DepsTracker::State::SEE_NEXT;
+}
+
+DocumentSource::GetModPathsReturn DocumentSourceReshardingOwnershipMatch::getModifiedPaths() const {
+    // This stage does not modify or rename any paths.
+    return {DocumentSource::GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
+}
+
+DocumentSource::GetNextResult DocumentSourceReshardingOwnershipMatch::doGetNext() {
+    // TODO: Actually propagate the temporary resharding namespace from the recipient.
+    auto tempReshardingNss = constructTemporaryReshardingNss(pExpCtx->ns.db(), *pExpCtx->uuid);
+
+    auto* catalogCache = Grid::get(pExpCtx->opCtx)->catalogCache();
+    auto cm = catalogCache->getShardedCollectionRoutingInfo(pExpCtx->opCtx, tempReshardingNss);
+
+    auto nextInput = pSource->getNext();
+    for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
+        auto shardKey =
+            _reshardingKey.extractShardKeyFromDocThrows(nextInput.getDocument().toBson());
+
+        if (cm.keyBelongsToShard(shardKey, _recipientShardId)) {
+            return nextInput;
+        }
+
+        // For performance reasons, a streaming stage must not keep references to documents across
+        // calls to getNext(). Such stages must retrieve a result from their child and then release
+        // it (or return it) before asking for another result. Failing to do so can result in extra
+        // work, since the Document/Value library must copy data on write when that data has a
+        // refcount above one.
+        nextInput.releaseDocument();
+    }
+
+    return nextInput;
+}
+
+}  // namespace mongo
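The create() factory above is the entry point the collection cloner uses further down in this patch. A minimal call-site sketch, assuming an already-initialized ExpressionContext named expCtx (the shard id and key pattern literals are illustrative only, not taken from this commit):

    // Hedged usage sketch; mirrors the create() overload defined above.
    auto stage = DocumentSourceReshardingOwnershipMatch::create(
        ShardId("shard1"),                                 // recipient whose chunks we keep
        ShardKeyPattern(fromjson("{x: 'hashed', y: 1}")),  // new (resharding) key pattern
        expCtx);                                           // assumed to exist in the caller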
diff --git a/src/mongo/db/s/resharding/document_source_resharding_ownership_match.h b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.h
new file mode 100644
index 00000000000..9b978e5c931
--- /dev/null
+++ b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.h
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/s/shard_key_pattern.h"
+
+namespace mongo {
+
+/**
+ * This is a purpose-built stage to filter out documents which are 'owned' by this shard according
+ * to a given shardId and shard key. This stage was created to optimize performance of internal
+ * resharding pipelines which need to be able to answer this question very quickly. To do so, it
+ * re-uses pieces of sharding infrastructure rather than applying a MatchExpression.
+ */
+class DocumentSourceReshardingOwnershipMatch final : public DocumentSource {
+public:
+    static constexpr StringData kStageName = "$_internalReshardingOwnershipMatch"_sd;
+
+    static boost::intrusive_ptr<DocumentSourceReshardingOwnershipMatch> create(
+        ShardId recipientShardId,
+        ShardKeyPattern reshardingKey,
+        const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+    static boost::intrusive_ptr<DocumentSourceReshardingOwnershipMatch> createFromBson(
+        BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+    DepsTracker::State getDependencies(DepsTracker* deps) const final;
+
+    DocumentSource::GetModPathsReturn getModifiedPaths() const final;
+
+    Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
+
+    StageConstraints constraints(Pipeline::SplitState pipeState) const final;
+
+    boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+        return boost::none;
+    }
+
+    const char* getSourceName() const final {
+        return DocumentSourceReshardingOwnershipMatch::kStageName.rawData();
+    }
+
+private:
+    DocumentSourceReshardingOwnershipMatch(ShardId recipientShardId,
+                                           ShardKeyPattern reshardingKey,
+                                           const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+    DocumentSource::GetNextResult doGetNext() final;
+
+    const ShardId _recipientShardId;
+    const ShardKeyPattern _reshardingKey;
+};
+
+}  // namespace mongo
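To see what serialize() above puts on the wire, here is a hedged round-trip sketch built only from calls that appear in this patch: the spec's two-argument constructor and toBSON() come from serialize(), createFromBson() is declared above, expCtx is assumed to exist, and the exact field layout inside the spec is defined by the IDL file, which is not part of this diff:

    // Build the stage's BSON spec the same way serialize() does, then re-parse it.
    auto specDoc = Document{{DocumentSourceReshardingOwnershipMatch::kStageName,
                             DocumentSourceReshardingOwnershipMatchSpec(
                                 ShardId("shard2"), KeyPattern(fromjson("{x: 'hashed', y: 1}")))
                                 .toBSON()}};
    auto roundTripped = DocumentSourceReshardingOwnershipMatch::createFromBson(
        specDoc.toBson().firstElement(), expCtx);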
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 3b77a47c524..2af6caace49 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -44,11 +44,11 @@
 #include "mongo/db/exec/document_value/document.h"
 #include "mongo/db/logical_session_id_helpers.h"
 #include "mongo/db/pipeline/aggregation_request_helper.h"
-#include "mongo/db/pipeline/document_source_lookup.h"
 #include "mongo/db/pipeline/document_source_match.h"
 #include "mongo/db/pipeline/document_source_replace_root.h"
 #include "mongo/db/pipeline/sharded_agg_helpers.h"
 #include "mongo/db/query/query_request_helper.h"
+#include "mongo/db/s/resharding/document_source_resharding_ownership_match.h"
 #include "mongo/db/s/resharding/resharding_data_copy_util.h"
 #include "mongo/db/s/resharding/resharding_future_util.h"
 #include "mongo/db/s/resharding/resharding_metrics.h"
@@ -149,63 +149,18 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
             expCtx));
     }
 
-    stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
-        fromjson("{$replaceWith: {original: '$$ROOT'}}").firstElement(), expCtx));
-
-    Arr extractShardKeyExpr;
-    for (auto&& field : _newShardKeyPattern.toBSON()) {
-        if (ShardKeyPattern::isHashedPatternEl(field)) {
-            extractShardKeyExpr.emplace_back(
-                Doc{{"$toHashedIndexKey", "$original." + field.fieldNameStringData()}});
-        } else {
-            extractShardKeyExpr.emplace_back("$original." + field.fieldNameStringData());
-        }
-    }
-
-    stages.emplace_back(DocumentSourceLookUp::createFromBson(
-        Doc{{"$lookup",
-             Doc{{"from",
-                  Doc{{"db", tempCacheChunksNss.db()}, {"coll", tempCacheChunksNss.coll()}}},
-                 {"let", Doc{{"sk", extractShardKeyExpr}}},
-                 {"pipeline",
-                  Arr{V{Doc{{"$match",
-                             Doc{{"$expr",
-                                  Doc{{"$eq",
-                                       Arr{V{"$shard"_sd}, V{_recipientShard.toString()}}}}}}}}},
-                      V{Doc{fromjson("{$replaceWith: {\
-                          min: {$map: {input: {$objectToArray: '$_id'}, in: '$$this.v'}},\
-                          max: {$map: {input: {$objectToArray: '$max'}, in: '$$this.v'}}\
-                      }}")}},
-                      V{Doc{fromjson("{$addFields: {\
-                          isGlobalMax: {$allElementsTrue: [{$map: {\
-                              input: '$max',\
-                              in: {$eq: [{$type: '$$this'}, 'maxKey']}\
-                          }}]}\
-                      }}")}},
-                      V{Doc{fromjson("{$match: {$expr: {$and: [\
-                          {$gte: ['$$sk', '$min']},\
-                          {$or: [\
-                              {$lt: ['$$sk', '$max']},\
-                              '$isGlobalMax'\
-                          ]}\
-                      ]}}}")}}}},
-                 {"as", "intersectingChunk"_sd}}}}
-            .toBson()
-            .firstElement(),
-        expCtx));
-
-    stages.emplace_back(
-        DocumentSourceMatch::create(fromjson("{intersectingChunk: {$ne: []}}"), expCtx));
+    stages.emplace_back(DocumentSourceReshardingOwnershipMatch::create(
+        _recipientShard, ShardKeyPattern{_newShardKeyPattern.getKeyPattern()}, expCtx));
 
     // We use $arrayToObject to synthesize the $sortKeys needed by the AsyncResultsMerger to merge
     // the results from all of the donor shards by {_id: 1}. This expression wouldn't be correct if
     // the aggregation pipeline was using a non-"simple" collation.
     stages.emplace_back(
         DocumentSourceReplaceRoot::createFromBson(fromjson("{$replaceWith: {$mergeObjects: [\
-                                                      '$original',\
+                                                      '$$ROOT',\
                                                       {$arrayToObject: {$concatArrays: [[{\
                                                           k: {$literal: '$sortKey'},\
-                                                          v: ['$original._id']\
+                                                          v: ['$$ROOT._id']\
                                                       }]]}}\
                                                   ]}}")
                                                       .firstElement(),
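The $arrayToObject comment above describes the shape the cloner hands to the AsyncResultsMerger, and the tests deleted below asserted exactly that shape: an input document such as {_id: 2, x: -0.001} leaves the pipeline with a synthesized sort key appended. The expectation, copied from the removed MinKeyChunk test:

    // Output shape after the trailing $replaceWith stage: the original document
    // plus a $sortKey array derived from _id.
    ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001 << "$sortKey" << BSON_ARRAY(2)),
                             next->toBson());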
'shard1'}")), - Doc(fromjson("{_id: {x: 0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }"))}); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL << "$sortKey" << BSON_ARRAY(3)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001 << "$sortKey" << BSON_ARRAY(5)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY << "$sortKey" << BSON_ARRAY(6)), - next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); -} - -TEST_F(ReshardingCollectionClonerTest, HashedShardKey) { - auto pipeline = makePipeline( - ShardKeyPattern(fromjson("{x: 'hashed'}")), - ShardId("shard2"), - {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), - Doc(fromjson("{_id: 2, x: -1}")), - Doc(fromjson("{_id: 3, x: -0.123}")), - Doc(fromjson("{_id: 4, x: 0}")), - Doc(fromjson("{_id: 5, x: NumberLong(0)}")), - Doc(fromjson("{_id: 6, x: 0.123}")), - Doc(fromjson("{_id: 7, x: 1}")), - Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}, - // Documents in a mock config.cache.chunks collection. Mocked collection boundaries: - // - [MinKey, hash(0)) : shard1 - // - [hash(0), hash(0) + 1) : shard2 - // - [hash(0) + 1, MaxKey] : shard3 - {Doc{{"_id", Doc{{"x", V(MINKEY)}}}, - {"max", Doc{{"x", getHashedElementValue(0)}}}, - {"shard", "shard1"_sd}}, - Doc{{"_id", Doc{{"x", getHashedElementValue(0)}}}, - {"max", Doc{{"x", getHashedElementValue(0) + 1}}}, - {"shard", "shard2"_sd}}, - Doc{{"_id", Doc{{"x", getHashedElementValue(0) + 1}}}, - {"max", Doc{{"x", V(MAXKEY)}}}, - {"shard", "shard3"_sd}}}); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123 << "$sortKey" << BSON_ARRAY(3)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "$sortKey" << BSON_ARRAY(4)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL << "$sortKey" << BSON_ARRAY(5)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123 << "$sortKey" << BSON_ARRAY(6)), - next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); -} - -TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) { - auto pipeline = makePipeline( - ShardKeyPattern(fromjson("{x: 'hashed', y: 1}")), - ShardId("shard2"), - {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), - Doc(fromjson("{_id: 2, x: -1}")), - Doc(fromjson("{_id: 3, x: -0.123, y: -1}")), - Doc(fromjson("{_id: 4, x: 0, y: 0}")), - Doc(fromjson("{_id: 5, x: NumberLong(0), y: 1}")), - Doc(fromjson("{_id: 6, x: 0.123}")), - Doc(fromjson("{_id: 7, x: 1}")), - Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}, - // Documents in a mock config.cache.chunks collection. 
Mocked collection boundaries: - // - [{x: MinKey, y: MinKey}, {x: hash(0), y: 0}) : shard1 - // - [{x: hash(0), y: 0}, {x: hash(0), y: 1}) : shard2 - // - [{x: hash(0), y: 1}, {x: MaxKey, y: MaxKey}] : shard3 - {Doc{{"_id", Doc{{"x", V(MINKEY)}, {"y", V(MINKEY)}}}, - {"max", Doc{{"x", getHashedElementValue(0)}, {"y", 0}}}, - {"shard", "shard1"_sd}}, - Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 0}}}, - {"max", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}}, - {"shard", "shard2"_sd}}, - Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}}, - {"max", Doc{{"x", V(MAXKEY)}, {"y", V(MAXKEY)}}}, - {"shard", "shard3"_sd}}}); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ( - BSON("_id" << 4 << "x" << 0 << "y" << 0 << "$sortKey" << BSON_ARRAY(4)), next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); -} - } // namespace } // namespace mongo |
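The deleted tests drove chunk ownership through mocked config.cache.chunks documents; after this patch the same decision is made against the routing table inside doGetNext() above. A condensed sketch of that check, with reshardingKey, doc, cm, and recipientShardId standing in for the member variables and locals used there:

    // Keep a document only when its shard key under the new key pattern lies in
    // a chunk owned by the recipient shard.
    auto shardKey = reshardingKey.extractShardKeyFromDocThrows(doc.toBson());
    if (cm.keyBelongsToShard(shardKey, recipientShardId)) {
        // forward the document to the next stage
    }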