diff options
author | Misha Tyulenev <misha.tyulenev@mongodb.com> | 2020-09-17 00:07:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-17 01:25:59 +0000 |
commit | 7c83537305f9d0c626fefa82c6acbd9e5fd95222 (patch) | |
tree | b46aac80d9053fc780fb20a1a6df92ec56f9f1a8 /src/mongo/db | |
parent | 96760ea1c4e33077ad456bf42d90c48cbaa84d6a (diff) | |
download | mongo-7c83537305f9d0c626fefa82c6acbd9e5fd95222.tar.gz |
SERVER-49785 aggragation pipeline for collection bulk loader for resharding
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/resharding_util.cpp | 81 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util_test.cpp | 115 |
3 files changed, 204 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 837bfc5c898..742b7c93236 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -531,6 +531,87 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard return Pipeline::create(std::move(stages), expCtx); } +std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ShardKeyPattern& newShardKeyPattern, + const NamespaceString& tempNss, + const ShardId& recipientShard) { + std::list<boost::intrusive_ptr<DocumentSource>> stages; + + BSONObj replaceWithBSON = BSON("$replaceWith" << BSON("original" + << "$$ROOT")); + stages.emplace_back( + DocumentSourceReplaceRoot::createFromBson(replaceWithBSON.firstElement(), expCtx)); + + invariant(tempNss.isTemporaryReshardingCollection()); + std::string cacheChunksColl = "cache.chunks." + tempNss.toString(); + BSONObjBuilder lookupBuilder; + lookupBuilder.append("from", + BSON("db" + << "config" + << "coll" << cacheChunksColl)); + { + BSONObjBuilder letBuilder(lookupBuilder.subobjStart("let")); + { + BSONArrayBuilder skVarBuilder(letBuilder.subarrayStart("sk")); + for (auto&& field : newShardKeyPattern.toBSON()) { + skVarBuilder.append("$original." + field.fieldNameStringData()); + } + } + } + BSONArrayBuilder lookupPipelineBuilder(lookupBuilder.subarrayStart("pipeline")); + lookupPipelineBuilder.append( + BSON("$match" << BSON( + "$expr" << BSON("$eq" << BSON_ARRAY(recipientShard.toString() << "$shard"))))); + lookupPipelineBuilder.append(BSON( + "$match" << BSON( + "$expr" << BSON( + "$let" << BSON( + "vars" << BSON("min" << BSON("$map" << BSON("input" << BSON("$objectToArray" + << "$_id") + << "in" + << "$$this.v")) + << "max" + << BSON("$map" << BSON("input" << BSON("$objectToArray" + << "$max") + << "in" + << "$$this.v"))) + << "in" + << BSON( + "$and" << BSON_ARRAY( + BSON("$gte" << BSON_ARRAY("$$sk" + << "$$min")) + << BSON("$cond" << BSON( + "if" + << BSON("$allElementsTrue" << BSON_ARRAY(BSON( + "$map" + << BSON("input" + << "$$max" + << "in" + << BSON("$eq" << BSON_ARRAY( + BSON("$type" + << "$$this") + << "maxKey")))))) + << "then" + << BSON("$lte" << BSON_ARRAY("$$sk" + << "$$max")) + << "else" + << BSON("$lt" << BSON_ARRAY("$$sk" + << "$$max"))))))))))); + + lookupPipelineBuilder.done(); + lookupBuilder.append("as", "intersectingChunk"); + BSONObj lookupBSON(BSON("" << lookupBuilder.obj())); + stages.emplace_back(DocumentSourceLookUp::createFromBson(lookupBSON.firstElement(), expCtx)); + stages.emplace_back(DocumentSourceMatch::create( + BSON("intersectingChunk" << BSON("$ne" << BSONArray())), expCtx)); + stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(BSON("$replaceWith" + << "$original") + .firstElement(), + expCtx)); + return Pipeline::create(std::move(stages), expCtx); +} + boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, const NamespaceString& sourceNss, BSONObj fullDocument) { diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 235d6474ff6..0837037ef10 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -147,5 +147,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, const NamespaceString& sourceNss, BSONObj fullDocument); +/** + * Creates pipeline for filtering collection data matching the recipient shard. + */ +std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ShardKeyPattern& newShardKeyPattern, + const NamespaceString& sourceNss, + const ShardId& recipientShard); } // namespace mongo diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp index 18b2e37a984..1fd73c11a4f 100644 --- a/src/mongo/db/s/resharding_util_test.cpp +++ b/src/mongo/db/s/resharding_util_test.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/json.h" +#include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/expression_context_for_test.h" @@ -56,6 +57,7 @@ class MockMongoInterface final : public StubMongoProcessInterface { public: MockMongoInterface(std::deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( Pipeline* ownedPipeline, bool allowTargetingShards = true) final { std::unique_ptr<Pipeline, PipelineDeleter> pipeline( @@ -1161,5 +1163,118 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) { ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions)); } +class ReshardingCollectionCloneTest : public AggregationContextFixture { +protected: + const NamespaceString& sourceNss() { + return _sourceNss; + } + + boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext( + NamespaceString sourceNss) { + _sourceNss = sourceNss; + NamespaceString foreignNss("config.cache.chunks." + sourceNss.toString()); + boost::intrusive_ptr<ExpressionContextForTest> expCtx( + new ExpressionContextForTest(getOpCtx(), sourceNss)); + expCtx->setResolvedNamespace(sourceNss, {sourceNss, {}}); + expCtx->setResolvedNamespace(foreignNss, {foreignNss, {}}); + return expCtx; + } + + + std::deque<DocumentSource::GetNextResult> makeForeignData(const ShardKeyPattern& pattern) { + const std::initializer_list<const char*> data{ + "{_id: { x : { $minKey : 1 } }, max: { x : 0.0 }, shard: 'shard1' }", + "{_id: { x : 0.0 }, max: { x : { $maxKey : 1 } }, shard: 'shard2' }"}; + std::deque<DocumentSource::GetNextResult> results; + for (auto&& json : data) { + results.emplace_back(Document(fromjson(json))); + } + return results; + } + + std::deque<DocumentSource::GetNextResult> makeSourceData(const ShardKeyPattern& pattern) { + const std::initializer_list<const char*> data{ + "{_id: 1, x: { $minKey: 1} }", + "{_id: 2, x: -0.001}", + "{_id: 3, x: NumberLong(0)}", + "{_id: 4, x: 0.0}", + "{_id: 5, x: 0.001}", + "{_id: 6, x: { $maxKey: 1} }", + }; + std::deque<DocumentSource::GetNextResult> results; + for (auto&& json : data) { + results.emplace_back(Document(fromjson(json))); + } + return results; + } + +private: + NamespaceString _sourceNss; +}; + +TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMinKey) { + NamespaceString fromNs("test", "system.resharding.coll"); + ShardKeyPattern pattern(BSON("x" << 1)); + + auto expCtx = createExpressionContext(fromNs); + + auto foreignData = makeForeignData(pattern); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData)); + + auto sourceData = makeSourceData(pattern); + auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx); + + auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard1")); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT(next); + BSONObj val = fromjson("{_id: 1, x: {$minKey : 1}}"); + ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson()); + + ASSERT(!pipeline->getNext()); +} + +TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMaxKey) { + NamespaceString fromNs("test", "system.resharding.coll"); + ShardKeyPattern pattern(BSON("x" << 1)); + + auto expCtx = createExpressionContext(fromNs); + + auto foreignData = makeForeignData(pattern); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData)); + + auto sourceData = makeSourceData(pattern); + auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx); + + auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2")); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT(next); + BSONObj val = fromjson("{_id: 3, x: NumberLong(0)}"); + ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson()); + + next = pipeline->getNext(); + ASSERT(next); + val = fromjson("{_id: 6, x: {$maxKey: 1}}"); + ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson()); + + + ASSERT(!pipeline->getNext()); +} + } // namespace } // namespace mongo |