summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorMisha Tyulenev <misha.tyulenev@mongodb.com>2020-09-17 00:07:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-17 01:25:59 +0000
commit7c83537305f9d0c626fefa82c6acbd9e5fd95222 (patch)
treeb46aac80d9053fc780fb20a1a6df92ec56f9f1a8 /src/mongo/db
parent96760ea1c4e33077ad456bf42d90c48cbaa84d6a (diff)
downloadmongo-7c83537305f9d0c626fefa82c6acbd9e5fd95222.tar.gz
SERVER-49785 aggregation pipeline for collection bulk loader for resharding
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/resharding_util.cpp81
-rw-r--r--src/mongo/db/s/resharding_util.h8
-rw-r--r--src/mongo/db/s/resharding_util_test.cpp115
3 files changed, 204 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 837bfc5c898..742b7c93236 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -531,6 +531,87 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
return Pipeline::create(std::move(stages), expCtx);
}
+// Builds the aggregation pipeline used by the resharding collection bulk
+// loader: it passes through only those source documents whose new shard key
+// lies inside a chunk owned by 'recipientShard', as recorded in that shard's
+// cached chunks collection (config.cache.chunks.<tempNss>).
+std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ShardKeyPattern& newShardKeyPattern,
+ const NamespaceString& tempNss,
+ const ShardId& recipientShard) {
+ std::list<boost::intrusive_ptr<DocumentSource>> stages;
+
+ // Stage 1: wrap each input document as {original: <doc>} so its shard-key
+ // values can be referenced as "$original.<field>" in the $lookup's "let"
+ // bindings below, and recovered unchanged at the end of the pipeline.
+ BSONObj replaceWithBSON = BSON("$replaceWith" << BSON("original"
+ << "$$ROOT"));
+ stages.emplace_back(
+ DocumentSourceReplaceRoot::createFromBson(replaceWithBSON.firstElement(), expCtx));
+
+ // The chunk cache for the temporary resharding collection lives in
+ // config.cache.chunks.<tempNss>; this helper is only meaningful for such
+ // temporary namespaces.
+ invariant(tempNss.isTemporaryReshardingCollection());
+ std::string cacheChunksColl = "cache.chunks." + tempNss.toString();
+ BSONObjBuilder lookupBuilder;
+ lookupBuilder.append("from",
+ BSON("db"
+ << "config"
+ << "coll" << cacheChunksColl));
+ {
+ // let: bind $$sk to the array of this document's shard-key values, in
+ // shard-key-pattern field order.
+ BSONObjBuilder letBuilder(lookupBuilder.subobjStart("let"));
+ {
+ BSONArrayBuilder skVarBuilder(letBuilder.subarrayStart("sk"));
+ for (auto&& field : newShardKeyPattern.toBSON()) {
+ skVarBuilder.append("$original." + field.fieldNameStringData());
+ }
+ }
+ }
+ // $lookup sub-pipeline over the chunks cache. First, keep only the chunks
+ // owned by the recipient shard.
+ BSONArrayBuilder lookupPipelineBuilder(lookupBuilder.subarrayStart("pipeline"));
+ lookupPipelineBuilder.append(
+ BSON("$match" << BSON(
+ "$expr" << BSON("$eq" << BSON_ARRAY(recipientShard.toString() << "$shard")))));
+ // Second, keep only the chunks whose range contains $$sk. In the cached
+ // chunk documents the _id holds the chunk's min bound and "max" its max
+ // bound (see the accompanying test fixtures); $objectToArray + $map
+ // unwrap each bound into an array of values for element-wise comparison.
+ // The lower bound is inclusive ($gte). The upper bound is inclusive
+ // ($lte) only when every component of the chunk's max is MaxKey (i.e. the
+ // collection's last chunk), otherwise exclusive ($lt).
+ lookupPipelineBuilder.append(BSON(
+ "$match" << BSON(
+ "$expr" << BSON(
+ "$let" << BSON(
+ "vars" << BSON("min" << BSON("$map" << BSON("input" << BSON("$objectToArray"
+ << "$_id")
+ << "in"
+ << "$$this.v"))
+ << "max"
+ << BSON("$map" << BSON("input" << BSON("$objectToArray"
+ << "$max")
+ << "in"
+ << "$$this.v")))
+ << "in"
+ << BSON(
+ "$and" << BSON_ARRAY(
+ BSON("$gte" << BSON_ARRAY("$$sk"
+ << "$$min"))
+ << BSON("$cond" << BSON(
+ "if"
+ << BSON("$allElementsTrue" << BSON_ARRAY(BSON(
+ "$map"
+ << BSON("input"
+ << "$$max"
+ << "in"
+ << BSON("$eq" << BSON_ARRAY(
+ BSON("$type"
+ << "$$this")
+ << "maxKey"))))))
+ << "then"
+ << BSON("$lte" << BSON_ARRAY("$$sk"
+ << "$$max"))
+ << "else"
+ << BSON("$lt" << BSON_ARRAY("$$sk"
+ << "$$max")))))))))));
+
+ lookupPipelineBuilder.done();
+ // Matched chunks land in the "intersectingChunk" array; a document is kept
+ // iff at least one owned chunk contains its shard key.
+ lookupBuilder.append("as", "intersectingChunk");
+ BSONObj lookupBSON(BSON("" << lookupBuilder.obj()));
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(lookupBSON.firstElement(), expCtx));
+ stages.emplace_back(DocumentSourceMatch::create(
+ BSON("intersectingChunk" << BSON("$ne" << BSONArray())), expCtx));
+ // Finally, unwrap back to the original document shape.
+ stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(BSON("$replaceWith"
+ << "$original")
+ .firstElement(),
+ expCtx));
+ return Pipeline::create(std::move(stages), expCtx);
+}
+
boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
const NamespaceString& sourceNss,
BSONObj fullDocument) {
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 235d6474ff6..0837037ef10 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -147,5 +147,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
const NamespaceString& sourceNss,
BSONObj fullDocument);
+/**
+ * Creates the aggregation pipeline used by the collection bulk loader during
+ * resharding: it filters the source collection down to the documents whose new
+ * shard key (per 'newShardKeyPattern') falls in a chunk owned by
+ * 'recipientShard', as recorded in config.cache.chunks.<tempNss>.
+ *
+ * 'tempNss' must be a temporary resharding collection namespace (the
+ * definition invariants this); named to match the definition rather than the
+ * misleading 'sourceNss'.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ShardKeyPattern& newShardKeyPattern,
+ const NamespaceString& tempNss,
+ const ShardId& recipientShard);
} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp
index 18b2e37a984..1fd73c11a4f 100644
--- a/src/mongo/db/s/resharding_util_test.cpp
+++ b/src/mongo/db/s/resharding_util_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/json.h"
+#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
@@ -56,6 +57,7 @@ class MockMongoInterface final : public StubMongoProcessInterface {
public:
MockMongoInterface(std::deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
+
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
Pipeline* ownedPipeline, bool allowTargetingShards = true) final {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
@@ -1161,5 +1163,118 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) {
ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions));
}
+// Fixture for exercising createAggForCollectionCloning() with an in-memory
+// expression context: resolves both the source namespace and its
+// config.cache.chunks.* cache so the pipeline's $lookup can be parsed, and
+// supplies canned chunk and source-collection documents.
+class ReshardingCollectionCloneTest : public AggregationContextFixture {
+protected:
+ const NamespaceString& sourceNss() {
+ return _sourceNss;
+ }
+
+ // Builds an ExpressionContextForTest whose resolved namespaces cover the
+ // source collection and its cached-chunks collection. Also records the
+ // source namespace for later retrieval via sourceNss().
+ boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext(
+ NamespaceString sourceNss) {
+ _sourceNss = sourceNss;
+ NamespaceString foreignNss("config.cache.chunks." + sourceNss.toString());
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(
+ new ExpressionContextForTest(getOpCtx(), sourceNss));
+ expCtx->setResolvedNamespace(sourceNss, {sourceNss, {}});
+ expCtx->setResolvedNamespace(foreignNss, {foreignNss, {}});
+ return expCtx;
+ }
+
+
+ // Two cached chunk documents splitting the x-axis at 0: shard1 owns
+ // [MinKey, 0.0), shard2 owns [0.0, MaxKey]. Note the chunk min bound is
+ // stored in _id. NOTE(review): 'pattern' is currently unused here.
+ std::deque<DocumentSource::GetNextResult> makeForeignData(const ShardKeyPattern& pattern) {
+ const std::initializer_list<const char*> data{
+ "{_id: { x : { $minKey : 1 } }, max: { x : 0.0 }, shard: 'shard1' }",
+ "{_id: { x : 0.0 }, max: { x : { $maxKey : 1 } }, shard: 'shard2' }"};
+ std::deque<DocumentSource::GetNextResult> results;
+ for (auto&& json : data) {
+ results.emplace_back(Document(fromjson(json)));
+ }
+ return results;
+ }
+
+ // Six source documents straddling the split point at x = 0, including
+ // MinKey/MaxKey extremes and int-vs-double zero values.
+ // NOTE(review): 'pattern' is currently unused here.
+ std::deque<DocumentSource::GetNextResult> makeSourceData(const ShardKeyPattern& pattern) {
+ const std::initializer_list<const char*> data{
+ "{_id: 1, x: { $minKey: 1} }",
+ "{_id: 2, x: -0.001}",
+ "{_id: 3, x: NumberLong(0)}",
+ "{_id: 4, x: 0.0}",
+ "{_id: 5, x: 0.001}",
+ "{_id: 6, x: { $maxKey: 1} }",
+ };
+ std::deque<DocumentSource::GetNextResult> results;
+ for (auto&& json : data) {
+ results.emplace_back(Document(fromjson(json)));
+ }
+ return results;
+ }
+
+private:
+ NamespaceString _sourceNss;
+};
+
+// shard1 owns the single chunk [{x: MinKey}, {x: 0.0}) (see makeForeignData),
+// so only documents with x < 0 — _id 1 (MinKey) and _id 2 — are cloned. The
+// upper bound is exclusive because the chunk's max is not MaxKey.
+TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMinKey) {
+ NamespaceString fromNs("test", "system.resharding.coll");
+ ShardKeyPattern pattern(BSON("x" << 1));
+
+ auto expCtx = createExpressionContext(fromNs);
+
+ // The mocked process interface serves the cached chunk documents to the
+ // pipeline's $lookup.
+ auto foreignData = makeForeignData(pattern);
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData));
+
+ auto sourceData = makeSourceData(pattern);
+ auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx);
+
+ auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard1"));
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT(next);
+ BSONObj val = fromjson("{_id: 1, x: {$minKey : 1}}");
+ ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson());
+
+ // Documents with x >= 0 belong to shard2 and must be filtered out.
+ ASSERT(!pipeline->getNext());
+}
+
+// shard2 owns the chunk [{x: 0.0}, {x: MaxKey}] (see makeForeignData); the
+// upper bound is inclusive because the chunk's max is MaxKey, so documents
+// with x >= 0 — _id 3 through 6, including the MaxKey document — are cloned.
+// Also checks that NumberLong(0) and 0.0 both compare >= the 0.0 chunk bound.
+TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMaxKey) {
+ NamespaceString fromNs("test", "system.resharding.coll");
+ ShardKeyPattern pattern(BSON("x" << 1));
+
+ auto expCtx = createExpressionContext(fromNs);
+
+ // The mocked process interface serves the cached chunk documents to the
+ // pipeline's $lookup.
+ auto foreignData = makeForeignData(pattern);
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData));
+
+ auto sourceData = makeSourceData(pattern);
+ auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx);
+
+ auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2"));
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT(next);
+ BSONObj val = fromjson("{_id: 3, x: NumberLong(0)}");
+ ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ val = fromjson("{_id: 6, x: {$maxKey: 1}}");
+ ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson());
+
+
+ // Documents with x < 0 belong to shard1 and must be filtered out.
+ ASSERT(!pipeline->getNext());
+}
+
} // namespace
} // namespace mongo