diff options
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_collection_cloner.cpp | 136 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_collection_cloner.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp | 247 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.cpp | 86 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util_test.cpp | 236 |
7 files changed, 350 insertions, 375 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index d259e971d6f..54f07474872 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -476,6 +476,7 @@ env.CppUnitTest( 'resharding_collection_test.cpp', 'resharding_destined_recipient_test.cpp', 'resharding_txn_cloner_test.cpp', + 'resharding/resharding_collection_cloner_test.cpp', 'resharding/resharding_donor_oplog_iterator_test.cpp', 'resharding/resharding_donor_recipient_common_test.cpp', 'resharding/resharding_oplog_applier_test.cpp', @@ -500,6 +501,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/keys_collection_client_direct', '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/pipeline/document_source_mock', '$BUILD_DIR/mongo/db/query/query_request', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', '$BUILD_DIR/mongo/db/repl/oplog_interface_local', diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 699df230b40..015f422f8bf 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -35,12 +35,18 @@ #include <utility> +#include "mongo/bson/json.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/db/pipeline/document_source_lookup.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" @@ -66,27 +72,104 @@ ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardK _atClusterTime(atClusterTime), _outputNss(std::move(outputNss)) {} -std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_makePipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const ShardId& recipientShard, - Timestamp atClusterTime, - const NamespaceString& outputNss) { +std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipeline( + OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface) { + using Doc = Document; + using Arr = std::vector<Value>; + using V = Value; - std::vector<BSONObj> serializedPipeline = - createAggForCollectionCloning(expCtx, _newShardKeyPattern, outputNss, recipientShard) - ->serializeToBson(); + // Assume that the input collection isn't a view. The collectionUUID parameter to + // the aggregate would enforce this anyway. + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}}; - AggregationRequest request(_sourceNss, std::move(serializedPipeline)); + // Assume that the config.cache.chunks collection isn't a view either. + auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); + auto tempCacheChunksNss = + NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns()); + resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}}; + + auto expCtx = make_intrusive<ExpressionContext>(opCtx, + boost::none, /* explain */ + false, /* fromMongos */ + false, /* needsMerge */ + false, /* allowDiskUse */ + false, /* bypassDocumentValidation */ + false, /* isMapReduceCommand */ + _sourceNss, + boost::none, /* runtimeConstants */ + nullptr, /* collator */ + std::move(mongoProcessInterface), + std::move(resolvedNamespaces), + _sourceUUID); + + Pipeline::SourceContainer stages; + + stages.emplace_back(DocumentSourceReplaceRoot::createFromBson( + fromjson("{$replaceWith: {original: '$$ROOT'}}").firstElement(), expCtx)); + + Arr extractShardKeyExpr; + for (auto&& field : _newShardKeyPattern.toBSON()) { + if (ShardKeyPattern::isHashedPatternEl(field)) { + extractShardKeyExpr.emplace_back( + Doc{{"$toHashedIndexKey", "$original." + field.fieldNameStringData()}}); + } else { + extractShardKeyExpr.emplace_back("$original." + field.fieldNameStringData()); + } + } + + stages.emplace_back(DocumentSourceLookUp::createFromBson( + Doc{{"$lookup", + Doc{{"from", + Doc{{"db", tempCacheChunksNss.db()}, {"coll", tempCacheChunksNss.coll()}}}, + {"let", Doc{{"sk", extractShardKeyExpr}}}, + {"pipeline", + Arr{V{Doc{{"$match", + Doc{{"$expr", + Doc{{"$eq", + Arr{V{"$shard"_sd}, V{_recipientShard.toString()}}}}}}}}}, + V{Doc(fromjson("{$match: {$expr: {$let: {\ + vars: {\ + min: {$map: {input: {$objectToArray: '$_id'}, in: '$$this.v'}},\ + max: {$map: {input: {$objectToArray: '$max'}, in: '$$this.v'}}\ + },\ + in: {$and: [\ + {$gte: ['$$sk', '$$min']},\ + {$cond: {\ + if: {$allElementsTrue: [{$map: {\ + input: '$$max',\ + in: {$eq: [{$type: '$$this'}, 'maxKey']}\ + }}]},\ + then: {$lte: ['$$sk', '$$max']},\ + else: {$lt: ['$$sk', '$$max']}\ + }}\ + ]}\ + }}}}"))}}}, + {"as", "intersectingChunk"_sd}}}} + .toBson() + .firstElement(), + expCtx)); + + stages.emplace_back( + DocumentSourceMatch::create(fromjson("{intersectingChunk: {$ne: []}}"), expCtx)); + stages.emplace_back(DocumentSourceReplaceRoot::createFromBson( + fromjson("{$replaceWith: '$original'}").firstElement(), expCtx)); + return Pipeline::create(std::move(stages), expCtx); +} + +std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest( + const Pipeline& pipeline) { + AggregationRequest request(_sourceNss, pipeline.serializeToBson()); request.setCollectionUUID(_sourceUUID); request.setHint(BSON("_id" << 1)); request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName << repl::readConcernLevels::kSnapshotName << repl::ReadConcernArgs::kAtClusterTimeFieldName - << atClusterTime)); + << _atClusterTime)); // TODO SERVER-52692: Set read preference to nearest. // request.setUnwrappedReadPref(); - return sharded_agg_helpers::targetShardsAndAddMergeCursors(std::move(expCtx), + return sharded_agg_helpers::targetShardsAndAddMergeCursors(pipeline.getContext(), std::move(request)); } @@ -208,34 +291,9 @@ ExecutorFuture<void> ReshardingCollectionCloner::run( return ExecutorFuture(executor) .then([this, serviceContext] { return _withTemporaryOperationContext(serviceContext, [&](auto* opCtx) { - // Assume that the input collection isn't a view. The collectionUUID parameter to - // the aggregate would enforce this anyway. - StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; - resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}}; - - // Assume that the config.cache.chunks collection isn't a view either. - auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); - auto tempCacheChunksNss = - NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns()); - resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, - std::vector<BSONObj>{}}; - - auto expCtx = - make_intrusive<ExpressionContext>(opCtx, - boost::none, /* explain */ - false, /* fromMongos */ - false, /* needsMerge */ - false, /* allowDiskUse */ - false, /* bypassDocumentValidation */ - false, /* isMapReduceCommand */ - _sourceNss, - boost::none, /* runtimeConstants */ - nullptr, /* collator */ - MongoProcessInterface::create(opCtx), - std::move(resolvedNamespaces), - _sourceUUID); - - auto pipeline = _makePipeline(expCtx, _recipientShard, _atClusterTime, _outputNss); + auto pipeline = _targetAggregationRequest( + *makePipeline(opCtx, MongoProcessInterface::create(opCtx))); + pipeline->detachFromOperationContext(); return pipeline; }); diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h index 8b9b467a718..56a70c6d3db 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.h +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h @@ -60,17 +60,16 @@ public: Timestamp atClusterTime, NamespaceString outputNss); + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( + OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface); + ExecutorFuture<void> run(ServiceContext* serviceContext, std::shared_ptr<executor::TaskExecutor>); private: static constexpr StringData kClientName = "ReshardingCollectionCloner"_sd; - std::unique_ptr<Pipeline, PipelineDeleter> _makePipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const ShardId& recipientShard, - Timestamp atClusterTime, - const NamespaceString& outputNss); + std::unique_ptr<Pipeline, PipelineDeleter> _targetAggregationRequest(const Pipeline& pipeline); std::vector<InsertStatement> _fillBatch(Pipeline& pipeline); void _insertBatch(OperationContext* opCtx, std::vector<InsertStatement>& batch); diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp new file mode 100644 index 00000000000..eefefd0fa11 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp @@ -0,0 +1,247 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include <vector> + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/json.h" +#include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/hasher.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/s/resharding/resharding_collection_cloner.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/service_context_test_fixture.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using Doc = Document; +using Arr = std::vector<Value>; +using V = Value; + +/** + * Mock interface to allow specifying mock results for the 'from' collection of the $lookup stage. + */ +class MockMongoInterface final : public StubMongoProcessInterface { +public: + MockMongoInterface(std::deque<DocumentSource::GetNextResult> mockResults) + : _mockResults(std::move(mockResults)) {} + + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + Pipeline* ownedPipeline, bool allowTargetingShards = true) final { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline( + ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx)); + + pipeline->addInitialSource( + DocumentSourceMock::createForTest(_mockResults, pipeline->getContext())); + return pipeline; + } + +private: + std::deque<DocumentSource::GetNextResult> _mockResults; +}; + +class ReshardingCollectionClonerTest : public ServiceContextTest { +protected: + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( + ShardKeyPattern newShardKeyPattern, + ShardId recipientShard, + std::deque<DocumentSource::GetNextResult> sourceCollectionData, + std::deque<DocumentSource::GetNextResult> configCacheChunksData) { + auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); + ReshardingCollectionCloner cloner(std::move(newShardKeyPattern), + _sourceNss, + _sourceUUID, + std::move(recipientShard), + Timestamp(1, 0), /* dummy value */ + std::move(tempNss)); + + auto pipeline = cloner.makePipeline( + _opCtx.get(), std::make_shared<MockMongoInterface>(std::move(configCacheChunksData))); + + pipeline->addInitialSource(DocumentSourceMock::createForTest( + std::move(sourceCollectionData), pipeline->getContext())); + + return pipeline; + } + + template <class T> + auto getHashedElementValue(T value) { + return BSONElementHasher::hash64(BSON("" << value).firstElement(), + BSONElementHasher::DEFAULT_HASH_SEED); + } + +private: + const NamespaceString _sourceNss = NamespaceString("test"_sd, "collection_being_resharded"_sd); + const CollectionUUID _sourceUUID = UUID::gen(); + + ServiceContext::UniqueOperationContext _opCtx = makeOperationContext(); +}; + +TEST_F(ReshardingCollectionClonerTest, MinKeyChunk) { + auto pipeline = + makePipeline(ShardKeyPattern(fromjson("{x: 1}")), + ShardId("shard1"), + {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -0.001}")), + Doc(fromjson("{_id: 3, x: NumberLong(0)}")), + Doc(fromjson("{_id: 4, x: 0.0}")), + Doc(fromjson("{_id: 5, x: 0.001}")), + Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))}, + {Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0.0}, shard: 'shard1'}")), + Doc(fromjson("{_id: {x: 0.0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }"))}); + + auto next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson()); + + ASSERT_FALSE(pipeline->getNext()); +} + +TEST_F(ReshardingCollectionClonerTest, MaxKeyChunk) { + auto pipeline = + makePipeline(ShardKeyPattern(fromjson("{x: 1}")), + ShardId("shard2"), + {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -0.001}")), + Doc(fromjson("{_id: 3, x: NumberLong(0)}")), + Doc(fromjson("{_id: 4, x: 0.0}")), + Doc(fromjson("{_id: 5, x: 0.001}")), + Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))}, + {Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0}, shard: 'shard1'}")), + Doc(fromjson("{_id: {x: 0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }"))}); + + auto next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY), next->toBson()); + + ASSERT_FALSE(pipeline->getNext()); +} + +TEST_F(ReshardingCollectionClonerTest, HashedShardKey) { + auto pipeline = makePipeline( + ShardKeyPattern(fromjson("{x: 'hashed'}")), + ShardId("shard2"), + {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -1}")), + Doc(fromjson("{_id: 3, x: -0.123}")), + Doc(fromjson("{_id: 4, x: 0}")), + Doc(fromjson("{_id: 5, x: NumberLong(0)}")), + Doc(fromjson("{_id: 6, x: 0.123}")), + Doc(fromjson("{_id: 7, x: 1}")), + Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}, + // Documents in a mock config.cache.chunks collection. Mocked collection boundaries: + // - [MinKey, hash(0)) : shard1 + // - [hash(0), hash(0) + 1) : shard2 + // - [hash(0) + 1, MaxKey] : shard3 + {Doc{{"_id", Doc{{"x", V(MINKEY)}}}, + {"max", Doc{{"x", getHashedElementValue(0)}}}, + {"shard", "shard1"_sd}}, + Doc{{"_id", Doc{{"x", getHashedElementValue(0)}}}, + {"max", Doc{{"x", getHashedElementValue(0) + 1}}}, + {"shard", "shard2"_sd}}, + Doc{{"_id", Doc{{"x", getHashedElementValue(0) + 1}}}, + {"max", Doc{{"x", V(MAXKEY)}}}, + {"shard", "shard3"_sd}}}); + + auto next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123), next->toBson()); + + ASSERT_FALSE(pipeline->getNext()); +} + +TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) { + auto pipeline = makePipeline( + ShardKeyPattern(fromjson("{x: 'hashed', y: 1}")), + ShardId("shard2"), + {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -1}")), + Doc(fromjson("{_id: 3, x: -0.123, y: -1}")), + Doc(fromjson("{_id: 4, x: 0, y: 0}")), + Doc(fromjson("{_id: 5, x: NumberLong(0), y: 1}")), + Doc(fromjson("{_id: 6, x: 0.123}")), + Doc(fromjson("{_id: 7, x: 1}")), + Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}, + // Documents in a mock config.cache.chunks collection. Mocked collection boundaries: + // - [{x: MinKey, y: MinKey}, {x: hash(0), y: 0}) : shard1 + // - [{x: hash(0), y: 0}, {x: hash(0), y: 1}) : shard2 + // - [{x: hash(0), y: 1}, {x: MaxKey, y: MaxKey}] : shard3 + {Doc{{"_id", Doc{{"x", V(MINKEY)}, {"y", V(MINKEY)}}}, + {"max", Doc{{"x", getHashedElementValue(0)}, {"y", 0}}}, + {"shard", "shard1"_sd}}, + Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 0}}}, + {"max", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}}, + {"shard", "shard2"_sd}}, + Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}}, + {"max", Doc{{"x", V(MAXKEY)}, {"y", V(MAXKEY)}}}, + {"shard", "shard3"_sd}}}); + + auto next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "y" << 0), next->toBson()); + + ASSERT_FALSE(pipeline->getNext()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 3954023bd15..971b8726ba5 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -539,92 +539,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard return Pipeline::create(std::move(stages), expCtx); } -std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const ShardKeyPattern& newShardKeyPattern, - const NamespaceString& tempNss, - const ShardId& recipientShard) { - std::list<boost::intrusive_ptr<DocumentSource>> stages; - - BSONObj replaceWithBSON = BSON("$replaceWith" << BSON("original" - << "$$ROOT")); - stages.emplace_back( - DocumentSourceReplaceRoot::createFromBson(replaceWithBSON.firstElement(), expCtx)); - - invariant(tempNss.isTemporaryReshardingCollection(), tempNss.ns()); - std::string cacheChunksColl = "cache.chunks." + tempNss.toString(); - BSONObjBuilder lookupBuilder; - lookupBuilder.append("from", - BSON("db" - << "config" - << "coll" << cacheChunksColl)); - { - BSONObjBuilder letBuilder(lookupBuilder.subobjStart("let")); - { - BSONArrayBuilder skVarBuilder(letBuilder.subarrayStart("sk")); - for (auto&& field : newShardKeyPattern.toBSON()) { - if (ShardKeyPattern::isHashedPatternEl(field)) { - skVarBuilder.append(BSON("$toHashedIndexKey" - << "$original." + field.fieldNameStringData())); - } else { - skVarBuilder.append("$original." + field.fieldNameStringData()); - } - } - } - } - BSONArrayBuilder lookupPipelineBuilder(lookupBuilder.subarrayStart("pipeline")); - lookupPipelineBuilder.append( - BSON("$match" << BSON( - "$expr" << BSON("$eq" << BSON_ARRAY(recipientShard.toString() << "$shard"))))); - lookupPipelineBuilder.append(BSON( - "$match" << BSON( - "$expr" << BSON( - "$let" << BSON( - "vars" << BSON("min" << BSON("$map" << BSON("input" << BSON("$objectToArray" - << "$_id") - << "in" - << "$$this.v")) - << "max" - << BSON("$map" << BSON("input" << BSON("$objectToArray" - << "$max") - << "in" - << "$$this.v"))) - << "in" - << BSON( - "$and" << BSON_ARRAY( - BSON("$gte" << BSON_ARRAY("$$sk" - << "$$min")) - << BSON("$cond" << BSON( - "if" - << BSON("$allElementsTrue" << BSON_ARRAY(BSON( - "$map" - << BSON("input" - << "$$max" - << "in" - << BSON("$eq" << BSON_ARRAY( - BSON("$type" - << "$$this") - << "maxKey")))))) - << "then" - << BSON("$lte" << BSON_ARRAY("$$sk" - << "$$max")) - << "else" - << BSON("$lt" << BSON_ARRAY("$$sk" - << "$$max"))))))))))); - - lookupPipelineBuilder.done(); - lookupBuilder.append("as", "intersectingChunk"); - BSONObj lookupBSON(BSON("" << lookupBuilder.obj())); - stages.emplace_back(DocumentSourceLookUp::createFromBson(lookupBSON.firstElement(), expCtx)); - stages.emplace_back(DocumentSourceMatch::create( - BSON("intersectingChunk" << BSON("$ne" << BSONArray())), expCtx)); - stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(BSON("$replaceWith" - << "$original") - .firstElement(), - expCtx)); - return Pipeline::create(std::move(stages), expCtx); -} - namespace resharding { boost::optional<TypeCollectionDonorFields> getDonorFields(OperationContext* opCtx, diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 31ed606f1a8..46c33dd5c5f 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -193,15 +193,6 @@ boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, const BSONObj& fullDocument); /** - * Creates pipeline for filtering collection data matching the recipient shard. - */ -std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const ShardKeyPattern& newShardKeyPattern, - const NamespaceString& tempNss, - const ShardId& recipientShard); - -/** * Sentinel oplog format: * { * op: "n", diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp index 93fefd8cb99..38b4dee5121 100644 --- a/src/mongo/db/s/resharding_util_test.cpp +++ b/src/mongo/db/s/resharding_util_test.cpp @@ -1203,241 +1203,5 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) { ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions)); } -class ReshardingCollectionCloneTest : public AggregationContextFixture { -protected: - const NamespaceString& sourceNss() { - return _sourceNss; - } - - boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext( - NamespaceString sourceNss) { - _sourceNss = sourceNss; - NamespaceString foreignNss("config.cache.chunks." + sourceNss.toString()); - boost::intrusive_ptr<ExpressionContextForTest> expCtx( - new ExpressionContextForTest(getOpCtx(), sourceNss)); - expCtx->setResolvedNamespace(sourceNss, {sourceNss, {}}); - expCtx->setResolvedNamespace(foreignNss, {foreignNss, {}}); - return expCtx; - } - - - std::deque<DocumentSource::GetNextResult> makeForeignData(const ShardKeyPattern& pattern) { - const std::initializer_list<const char*> data{ - "{_id: { x : { $minKey : 1 } }, max: { x : 0.0 }, shard: 'shard1' }", - "{_id: { x : 0.0 }, max: { x : { $maxKey : 1 } }, shard: 'shard2' }"}; - std::deque<DocumentSource::GetNextResult> results; - for (auto&& json : data) { - results.emplace_back(Document(fromjson(json))); - } - return results; - } - - std::deque<DocumentSource::GetNextResult> makeForeignData(std::vector<BSONObj> data) { - std::deque<DocumentSource::GetNextResult> results; - for (auto&& obj : data) { - results.emplace_back(Document(obj)); - } - return results; - } - - std::deque<DocumentSource::GetNextResult> makeSourceData(const ShardKeyPattern& pattern) { - const std::initializer_list<const char*> data{ - "{_id: 1, x: { $minKey: 1} }", - "{_id: 2, x: -0.001}", - "{_id: 3, x: NumberLong(0)}", - "{_id: 4, x: 0.0}", - "{_id: 5, x: 0.001}", - "{_id: 6, x: { $maxKey: 1} }", - }; - std::deque<DocumentSource::GetNextResult> results; - for (auto&& json : data) { - results.emplace_back(Document(fromjson(json))); - } - return results; - } - - std::deque<DocumentSource::GetNextResult> makeSourceData(std::vector<BSONObj> data) { - std::deque<DocumentSource::GetNextResult> results; - for (auto&& obj : data) { - results.emplace_back(Document(obj)); - } - return results; - } - -private: - NamespaceString _sourceNss; -}; - -TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMinKey) { - NamespaceString fromNs("test", "system.resharding.coll"); - ShardKeyPattern pattern(BSON("x" << 1)); - - auto expCtx = createExpressionContext(fromNs); - - auto foreignData = makeForeignData(pattern); - expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData)); - - auto sourceData = makeSourceData(pattern); - auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx); - - auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard1")); - pipeline->addInitialSource(mockSource); - - auto next = pipeline->getNext(); - ASSERT(next); - BSONObj val = fromjson("{_id: 1, x: {$minKey : 1}}"); - ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson()); - - ASSERT(!pipeline->getNext()); -} - -TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMaxKey) { - NamespaceString fromNs("test", "system.resharding.coll"); - ShardKeyPattern pattern(BSON("x" << 1)); - - auto expCtx = createExpressionContext(fromNs); - - auto foreignData = makeForeignData(pattern); - expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData)); - - auto sourceData = makeSourceData(pattern); - auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx); - - auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2")); - pipeline->addInitialSource(mockSource); - - auto next = pipeline->getNext(); - ASSERT(next); - BSONObj val = fromjson("{_id: 3, x: NumberLong(0)}"); - ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - val = fromjson("{_id: 6, x: {$maxKey: 1}}"); - ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson()); - - - ASSERT(!pipeline->getNext()); -} - -template <class T> -auto getHashedElementValue(T value) { - return BSONElementHasher::hash64(BSON("" << value).firstElement(), - BSONElementHasher::DEFAULT_HASH_SEED); -} - -TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicHashedExactMatch) { - NamespaceString fromNs("test", "system.resharding.coll"); - ShardKeyPattern pattern(BSON("x" - << "hashed")); - - auto expCtx = createExpressionContext(fromNs); - - // Documents in a mock config.cache.chunks collection. Mocked collection boundaries: - // - [MinKey, hash(0)) : shard1 - // - [hash(0), hash(0) + 1) : shard2 - // - [hash(0) + 1, MaxKey] : shard3 - auto foreignData = - makeForeignData({BSON("_id" << BSON("x" << MINKEY) << "max" - << BSON("x" << getHashedElementValue(0)) << "shard" - << "shard1"), - BSON("_id" << BSON("x" << getHashedElementValue(0)) << "max" - << BSON("x" << getHashedElementValue(0) + 1) << "shard" - << "shard2"), - BSON("_id" << BSON("x" << getHashedElementValue(0) + 1) << "max" - << BSON("x" << MAXKEY) << "shard" - << "shard3")}); - expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData)); - - // Documents in a mocked sharded collection. - auto sourceData = makeSourceData({fromjson("{_id: 1, x: {$minKey: 1}}"), - fromjson("{_id: 2, x: -1}"), - fromjson("{_id: 3, x: -0.123}"), - fromjson("{_id: 4, x: 0}"), - fromjson("{_id: 5, x: NumberLong(0)}"), - fromjson("{_id: 6, x: 0.123}"), - fromjson("{_id: 7, x: 1}"), - fromjson("{_id: 8, x: {$maxKey: 1}}")}); - auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx); - - auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2")); - pipeline->addInitialSource(mockSource); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 3, x: -0.123}"), next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 4, x: 0}"), next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 5, x: NumberLong(0)}"), next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 6, x: 0.123}"), next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); -} - -TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicHashedExactMatchCompoundKey) { - NamespaceString fromNs("test", "system.resharding.coll"); - ShardKeyPattern pattern(BSON("x" - << "hashed" - << "y" << 1)); - - auto expCtx = createExpressionContext(fromNs); - - // Documents in a mock config.cache.chunks collection. Mocked collection boundaries: - // - [{x: MinKey, y: MinKey}, {x: hash(0), y: 0}) : shard1 - // - [{x: hash(0), y: 0}, {x: hash(0), y: 1}) : shard2 - // - [{x: hash(0), y: 1}, {x: MaxKey, y: MaxKey}] : shard3 - auto foreignData = makeForeignData( - {BSON("_id" << BSON("x" << MINKEY << "y" << MINKEY) << "max" - << BSON("x" << getHashedElementValue(0) << "y" << 0) << "shard" - << "shard1"), - BSON("_id" << BSON("x" << getHashedElementValue(0) << "y" << 0) << "max" - << BSON("x" << (getHashedElementValue(0) + 0) << "y" << 1) << "shard" - << "shard2"), - BSON("_id" << BSON("x" << (getHashedElementValue(0) + 0) << "y" << 1) << "max" - << BSON("x" << MAXKEY << "y" << MAXKEY) << "shard" - << "shard3")}); - expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData)); - - // Documents in a mocked sharded collection. - auto sourceData = makeSourceData({fromjson("{_id: 1, x: {$minKey: 1}}"), - fromjson("{_id: 2, x: -1}"), - fromjson("{_id: 3, x: -0.123, y: -1}"), - fromjson("{_id: 4, x: 0, y: 0}"), - fromjson("{_id: 5, x: NumberLong(0), y: 1}"), - fromjson("{_id: 6, x: 0.123}"), - fromjson("{_id: 7, x: 1}"), - fromjson("{_id: 8, x: {$maxKey: 1}}")}); - auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx); - - auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2")); - pipeline->addInitialSource(mockSource); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 4, x: 0, y: 0}"), next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); -} - } // namespace } // namespace mongo |