author    Max Hirschhorn <max.hirschhorn@mongodb.com>  2020-11-11 13:59:10 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-11-11 15:12:30 +0000
commit    c71607de7b74e72b188abc2c11fca31ba009dbf4 (patch)
tree      90f9ca489750b12e92008199e23a2cd421350eab
parent    7950f071b21957be1ab8e5ac2db8b650695a2bd0 (diff)
download  mongo-c71607de7b74e72b188abc2c11fca31ba009dbf4.tar.gz
SERVER-52691 Unit test agg pipeline through ReshardingCollectionCloner.
-rw-r--r--  src/mongo/db/s/SConscript                                       |   2
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp      | 136
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.h        |   9
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp | 247
-rw-r--r--  src/mongo/db/s/resharding_util.cpp                              |  86
-rw-r--r--  src/mongo/db/s/resharding_util.h                                |   9
-rw-r--r--  src/mongo/db/s/resharding_util_test.cpp                         | 236
7 files changed, 350 insertions(+), 375 deletions(-)
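
Note on the shape of the change: pipeline construction moves out of the private
_makePipeline() helper (and out of createAggForCollectionCloning() in
resharding_util.cpp) into a public ReshardingCollectionCloner::makePipeline(),
which accepts a MongoProcessInterface so unit tests can substitute mocked
$lookup results. Serialized, the pipeline it assembles looks roughly like the
sketch below ("<temp_ns>" and "<recipientShard>" are placeholders; the exact
BSON comes from the Doc{} builders in the diff):

[
    {$replaceWith: {original: "$$ROOT"}},
    {$lookup: {
        from: {db: "config", coll: "cache.chunks.<temp_ns>"},
        // one entry per shard key field; hashed fields are wrapped
        // in {$toHashedIndexKey: ...}
        let: {sk: ["$original.<field>", ...]},
        pipeline: [
            {$match: {$expr: {$eq: ["$shard", "<recipientShard>"]}}},
            // keep chunks whose [min, max) bounds contain $$sk; the upper
            // bound is inclusive only when every max element is MaxKey
            {$match: {$expr: {/* $let over $$min/$$max, see diff below */}}}
        ],
        as: "intersectingChunk"
    }},
    {$match: {intersectingChunk: {$ne: []}}},
    {$replaceWith: "$original"}
]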
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d259e971d6f..54f07474872 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -476,6 +476,7 @@ env.CppUnitTest(
'resharding_collection_test.cpp',
'resharding_destined_recipient_test.cpp',
'resharding_txn_cloner_test.cpp',
+ 'resharding/resharding_collection_cloner_test.cpp',
'resharding/resharding_donor_oplog_iterator_test.cpp',
'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_oplog_applier_test.cpp',
@@ -500,6 +501,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/keys_collection_client_direct',
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
+ '$BUILD_DIR/mongo/db/pipeline/document_source_mock',
'$BUILD_DIR/mongo/db/query/query_request',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/db/repl/oplog_interface_local',
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 699df230b40..015f422f8bf 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -35,12 +35,18 @@
#include <utility>
+#include "mongo/bson/json.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
@@ -66,27 +72,104 @@ ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardK
_atClusterTime(atClusterTime),
_outputNss(std::move(outputNss)) {}
-std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_makePipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ShardId& recipientShard,
- Timestamp atClusterTime,
- const NamespaceString& outputNss) {
+std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipeline(
+ OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {
+ using Doc = Document;
+ using Arr = std::vector<Value>;
+ using V = Value;
- std::vector<BSONObj> serializedPipeline =
- createAggForCollectionCloning(expCtx, _newShardKeyPattern, outputNss, recipientShard)
- ->serializeToBson();
+ // Assume that the input collection isn't a view. The collectionUUID parameter to
+ // the aggregate would enforce this anyway.
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}};
- AggregationRequest request(_sourceNss, std::move(serializedPipeline));
+ // Assume that the config.cache.chunks collection isn't a view either.
+ auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
+ auto tempCacheChunksNss =
+ NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns());
+ resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}};
+
+ auto expCtx = make_intrusive<ExpressionContext>(opCtx,
+ boost::none, /* explain */
+ false, /* fromMongos */
+ false, /* needsMerge */
+ false, /* allowDiskUse */
+ false, /* bypassDocumentValidation */
+ false, /* isMapReduceCommand */
+ _sourceNss,
+ boost::none, /* runtimeConstants */
+ nullptr, /* collator */
+ std::move(mongoProcessInterface),
+ std::move(resolvedNamespaces),
+ _sourceUUID);
+
+ Pipeline::SourceContainer stages;
+
+ stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
+ fromjson("{$replaceWith: {original: '$$ROOT'}}").firstElement(), expCtx));
+
+ Arr extractShardKeyExpr;
+ for (auto&& field : _newShardKeyPattern.toBSON()) {
+ if (ShardKeyPattern::isHashedPatternEl(field)) {
+ extractShardKeyExpr.emplace_back(
+ Doc{{"$toHashedIndexKey", "$original." + field.fieldNameStringData()}});
+ } else {
+ extractShardKeyExpr.emplace_back("$original." + field.fieldNameStringData());
+ }
+ }
+
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(
+ Doc{{"$lookup",
+ Doc{{"from",
+ Doc{{"db", tempCacheChunksNss.db()}, {"coll", tempCacheChunksNss.coll()}}},
+ {"let", Doc{{"sk", extractShardKeyExpr}}},
+ {"pipeline",
+ Arr{V{Doc{{"$match",
+ Doc{{"$expr",
+ Doc{{"$eq",
+ Arr{V{"$shard"_sd}, V{_recipientShard.toString()}}}}}}}}},
+ V{Doc(fromjson("{$match: {$expr: {$let: {\
+ vars: {\
+ min: {$map: {input: {$objectToArray: '$_id'}, in: '$$this.v'}},\
+ max: {$map: {input: {$objectToArray: '$max'}, in: '$$this.v'}}\
+ },\
+ in: {$and: [\
+ {$gte: ['$$sk', '$$min']},\
+ {$cond: {\
+ if: {$allElementsTrue: [{$map: {\
+ input: '$$max',\
+ in: {$eq: [{$type: '$$this'}, 'maxKey']}\
+ }}]},\
+ then: {$lte: ['$$sk', '$$max']},\
+ else: {$lt: ['$$sk', '$$max']}\
+ }}\
+ ]}\
+ }}}}"))}}},
+ {"as", "intersectingChunk"_sd}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ stages.emplace_back(
+ DocumentSourceMatch::create(fromjson("{intersectingChunk: {$ne: []}}"), expCtx));
+ stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
+ fromjson("{$replaceWith: '$original'}").firstElement(), expCtx));
+ return Pipeline::create(std::move(stages), expCtx);
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest(
+ const Pipeline& pipeline) {
+ AggregationRequest request(_sourceNss, pipeline.serializeToBson());
request.setCollectionUUID(_sourceUUID);
request.setHint(BSON("_id" << 1));
request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName
<< repl::readConcernLevels::kSnapshotName
<< repl::ReadConcernArgs::kAtClusterTimeFieldName
- << atClusterTime));
+ << _atClusterTime));
// TODO SERVER-52692: Set read preference to nearest.
// request.setUnwrappedReadPref();
- return sharded_agg_helpers::targetShardsAndAddMergeCursors(std::move(expCtx),
+ return sharded_agg_helpers::targetShardsAndAddMergeCursors(pipeline.getContext(),
std::move(request));
}
@@ -208,34 +291,9 @@ ExecutorFuture<void> ReshardingCollectionCloner::run(
return ExecutorFuture(executor)
.then([this, serviceContext] {
return _withTemporaryOperationContext(serviceContext, [&](auto* opCtx) {
- // Assume that the input collection isn't a view. The collectionUUID parameter to
- // the aggregate would enforce this anyway.
- StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}};
-
- // Assume that the config.cache.chunks collection isn't a view either.
- auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
- auto tempCacheChunksNss =
- NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns());
- resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss,
- std::vector<BSONObj>{}};
-
- auto expCtx =
- make_intrusive<ExpressionContext>(opCtx,
- boost::none, /* explain */
- false, /* fromMongos */
- false, /* needsMerge */
- false, /* allowDiskUse */
- false, /* bypassDocumentValidation */
- false, /* isMapReduceCommand */
- _sourceNss,
- boost::none, /* runtimeConstants */
- nullptr, /* collator */
- MongoProcessInterface::create(opCtx),
- std::move(resolvedNamespaces),
- _sourceUUID);
-
- auto pipeline = _makePipeline(expCtx, _recipientShard, _atClusterTime, _outputNss);
+ auto pipeline = _targetAggregationRequest(
+ *makePipeline(opCtx, MongoProcessInterface::create(opCtx)));
+
pipeline->detachFromOperationContext();
return pipeline;
});
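
For reference, the aggregate command that _targetAggregationRequest() builds
from that pipeline would look roughly like the following (placeholders in
angle brackets; cursor and other defaulted fields omitted):

{
    aggregate: "<sourceNss collection>",
    pipeline: [/* stages from makePipeline() above */],
    collectionUUID: <sourceUUID>,
    hint: {_id: 1},
    readConcern: {level: "snapshot", atClusterTime: <atClusterTime>}
    // TODO SERVER-52692 will additionally set read preference to nearest.
}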
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h
index 8b9b467a718..56a70c6d3db 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.h
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h
@@ -60,17 +60,16 @@ public:
Timestamp atClusterTime,
NamespaceString outputNss);
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
+ OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface);
+
ExecutorFuture<void> run(ServiceContext* serviceContext,
std::shared_ptr<executor::TaskExecutor>);
private:
static constexpr StringData kClientName = "ReshardingCollectionCloner"_sd;
- std::unique_ptr<Pipeline, PipelineDeleter> _makePipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ShardId& recipientShard,
- Timestamp atClusterTime,
- const NamespaceString& outputNss);
+ std::unique_ptr<Pipeline, PipelineDeleter> _targetAggregationRequest(const Pipeline& pipeline);
std::vector<InsertStatement> _fillBatch(Pipeline& pipeline);
void _insertBatch(OperationContext* opCtx, std::vector<InsertStatement>& batch);
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
new file mode 100644
index 00000000000..eefefd0fa11
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
@@ -0,0 +1,247 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/json.h"
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/hasher.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/s/resharding/resharding_collection_cloner.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using Doc = Document;
+using Arr = std::vector<Value>;
+using V = Value;
+
+/**
+ * Mock interface to allow specifying mock results for the 'from' collection of the $lookup stage.
+ */
+class MockMongoInterface final : public StubMongoProcessInterface {
+public:
+ MockMongoInterface(std::deque<DocumentSource::GetNextResult> mockResults)
+ : _mockResults(std::move(mockResults)) {}
+
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ Pipeline* ownedPipeline, bool allowTargetingShards = true) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
+ ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx));
+
+ pipeline->addInitialSource(
+ DocumentSourceMock::createForTest(_mockResults, pipeline->getContext()));
+ return pipeline;
+ }
+
+private:
+ std::deque<DocumentSource::GetNextResult> _mockResults;
+};
+
+class ReshardingCollectionClonerTest : public ServiceContextTest {
+protected:
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
+ ShardKeyPattern newShardKeyPattern,
+ ShardId recipientShard,
+ std::deque<DocumentSource::GetNextResult> sourceCollectionData,
+ std::deque<DocumentSource::GetNextResult> configCacheChunksData) {
+ auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
+ ReshardingCollectionCloner cloner(std::move(newShardKeyPattern),
+ _sourceNss,
+ _sourceUUID,
+ std::move(recipientShard),
+ Timestamp(1, 0), /* dummy value */
+ std::move(tempNss));
+
+ auto pipeline = cloner.makePipeline(
+ _opCtx.get(), std::make_shared<MockMongoInterface>(std::move(configCacheChunksData)));
+
+ pipeline->addInitialSource(DocumentSourceMock::createForTest(
+ std::move(sourceCollectionData), pipeline->getContext()));
+
+ return pipeline;
+ }
+
+ template <class T>
+ auto getHashedElementValue(T value) {
+ return BSONElementHasher::hash64(BSON("" << value).firstElement(),
+ BSONElementHasher::DEFAULT_HASH_SEED);
+ }
+
+private:
+ const NamespaceString _sourceNss = NamespaceString("test"_sd, "collection_being_resharded"_sd);
+ const CollectionUUID _sourceUUID = UUID::gen();
+
+ ServiceContext::UniqueOperationContext _opCtx = makeOperationContext();
+};
+
+TEST_F(ReshardingCollectionClonerTest, MinKeyChunk) {
+ auto pipeline =
+ makePipeline(ShardKeyPattern(fromjson("{x: 1}")),
+ ShardId("shard1"),
+ {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -0.001}")),
+ Doc(fromjson("{_id: 3, x: NumberLong(0)}")),
+ Doc(fromjson("{_id: 4, x: 0.0}")),
+ Doc(fromjson("{_id: 5, x: 0.001}")),
+ Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))},
+ {Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0.0}, shard: 'shard1'}")),
+ Doc(fromjson("{_id: {x: 0.0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }"))});
+
+ auto next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson());
+
+ ASSERT_FALSE(pipeline->getNext());
+}
+
+TEST_F(ReshardingCollectionClonerTest, MaxKeyChunk) {
+ auto pipeline =
+ makePipeline(ShardKeyPattern(fromjson("{x: 1}")),
+ ShardId("shard2"),
+ {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -0.001}")),
+ Doc(fromjson("{_id: 3, x: NumberLong(0)}")),
+ Doc(fromjson("{_id: 4, x: 0.0}")),
+ Doc(fromjson("{_id: 5, x: 0.001}")),
+ Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))},
+ {Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0}, shard: 'shard1'}")),
+ Doc(fromjson("{_id: {x: 0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }"))});
+
+ auto next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY), next->toBson());
+
+ ASSERT_FALSE(pipeline->getNext());
+}
+
+TEST_F(ReshardingCollectionClonerTest, HashedShardKey) {
+ auto pipeline = makePipeline(
+ ShardKeyPattern(fromjson("{x: 'hashed'}")),
+ ShardId("shard2"),
+ {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -1}")),
+ Doc(fromjson("{_id: 3, x: -0.123}")),
+ Doc(fromjson("{_id: 4, x: 0}")),
+ Doc(fromjson("{_id: 5, x: NumberLong(0)}")),
+ Doc(fromjson("{_id: 6, x: 0.123}")),
+ Doc(fromjson("{_id: 7, x: 1}")),
+ Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))},
+ // Documents in a mock config.cache.chunks collection. Mocked collection boundaries:
+ // - [MinKey, hash(0)) : shard1
+ // - [hash(0), hash(0) + 1) : shard2
+ // - [hash(0) + 1, MaxKey] : shard3
+ {Doc{{"_id", Doc{{"x", V(MINKEY)}}},
+ {"max", Doc{{"x", getHashedElementValue(0)}}},
+ {"shard", "shard1"_sd}},
+ Doc{{"_id", Doc{{"x", getHashedElementValue(0)}}},
+ {"max", Doc{{"x", getHashedElementValue(0) + 1}}},
+ {"shard", "shard2"_sd}},
+ Doc{{"_id", Doc{{"x", getHashedElementValue(0) + 1}}},
+ {"max", Doc{{"x", V(MAXKEY)}}},
+ {"shard", "shard3"_sd}}});
+
+ auto next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123), next->toBson());
+
+ ASSERT_FALSE(pipeline->getNext());
+}
+
+TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) {
+ auto pipeline = makePipeline(
+ ShardKeyPattern(fromjson("{x: 'hashed', y: 1}")),
+ ShardId("shard2"),
+ {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -1}")),
+ Doc(fromjson("{_id: 3, x: -0.123, y: -1}")),
+ Doc(fromjson("{_id: 4, x: 0, y: 0}")),
+ Doc(fromjson("{_id: 5, x: NumberLong(0), y: 1}")),
+ Doc(fromjson("{_id: 6, x: 0.123}")),
+ Doc(fromjson("{_id: 7, x: 1}")),
+ Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))},
+ // Documents in a mock config.cache.chunks collection. Mocked collection boundaries:
+ // - [{x: MinKey, y: MinKey}, {x: hash(0), y: 0}) : shard1
+ // - [{x: hash(0), y: 0}, {x: hash(0), y: 1}) : shard2
+ // - [{x: hash(0), y: 1}, {x: MaxKey, y: MaxKey}] : shard3
+ {Doc{{"_id", Doc{{"x", V(MINKEY)}, {"y", V(MINKEY)}}},
+ {"max", Doc{{"x", getHashedElementValue(0)}, {"y", 0}}},
+ {"shard", "shard1"_sd}},
+ Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 0}}},
+ {"max", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}},
+ {"shard", "shard2"_sd}},
+ Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}},
+ {"max", Doc{{"x", V(MAXKEY)}, {"y", V(MAXKEY)}}},
+ {"shard", "shard3"_sd}}});
+
+ auto next = pipeline->getNext();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "y" << 0), next->toBson());
+
+ ASSERT_FALSE(pipeline->getNext());
+}
+
+} // namespace
+} // namespace mongo
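
A worked example of the chunk-matching predicate these tests exercise, using
values from the MinKeyChunk fixture above:

// chunk owned by shard1 in the mocked config.cache.chunks collection:
{_id: {x: {$minKey: 1}}, max: {x: 0.0}, shard: "shard1"}
// the $lookup sub-pipeline binds, via $objectToArray/$map:
//   $$min = [MinKey],  $$max = [0.0]
// source doc {_id: 2, x: -0.001} extracts $$sk = [-0.001]:
//   [-0.001] >= [MinKey]  and  [-0.001] < [0.0]   -> kept
// the upper bound is $lte instead of $lt only when every element of $$max
// is MaxKey, which is why MaxKeyChunk expects {_id: 6, x: MaxKey} to land
// on shard2.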
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 3954023bd15..971b8726ba5 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -539,92 +539,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
return Pipeline::create(std::move(stages), expCtx);
}
-std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ShardKeyPattern& newShardKeyPattern,
- const NamespaceString& tempNss,
- const ShardId& recipientShard) {
- std::list<boost::intrusive_ptr<DocumentSource>> stages;
-
- BSONObj replaceWithBSON = BSON("$replaceWith" << BSON("original"
- << "$$ROOT"));
- stages.emplace_back(
- DocumentSourceReplaceRoot::createFromBson(replaceWithBSON.firstElement(), expCtx));
-
- invariant(tempNss.isTemporaryReshardingCollection(), tempNss.ns());
- std::string cacheChunksColl = "cache.chunks." + tempNss.toString();
- BSONObjBuilder lookupBuilder;
- lookupBuilder.append("from",
- BSON("db"
- << "config"
- << "coll" << cacheChunksColl));
- {
- BSONObjBuilder letBuilder(lookupBuilder.subobjStart("let"));
- {
- BSONArrayBuilder skVarBuilder(letBuilder.subarrayStart("sk"));
- for (auto&& field : newShardKeyPattern.toBSON()) {
- if (ShardKeyPattern::isHashedPatternEl(field)) {
- skVarBuilder.append(BSON("$toHashedIndexKey"
- << "$original." + field.fieldNameStringData()));
- } else {
- skVarBuilder.append("$original." + field.fieldNameStringData());
- }
- }
- }
- }
- BSONArrayBuilder lookupPipelineBuilder(lookupBuilder.subarrayStart("pipeline"));
- lookupPipelineBuilder.append(
- BSON("$match" << BSON(
- "$expr" << BSON("$eq" << BSON_ARRAY(recipientShard.toString() << "$shard")))));
- lookupPipelineBuilder.append(BSON(
- "$match" << BSON(
- "$expr" << BSON(
- "$let" << BSON(
- "vars" << BSON("min" << BSON("$map" << BSON("input" << BSON("$objectToArray"
- << "$_id")
- << "in"
- << "$$this.v"))
- << "max"
- << BSON("$map" << BSON("input" << BSON("$objectToArray"
- << "$max")
- << "in"
- << "$$this.v")))
- << "in"
- << BSON(
- "$and" << BSON_ARRAY(
- BSON("$gte" << BSON_ARRAY("$$sk"
- << "$$min"))
- << BSON("$cond" << BSON(
- "if"
- << BSON("$allElementsTrue" << BSON_ARRAY(BSON(
- "$map"
- << BSON("input"
- << "$$max"
- << "in"
- << BSON("$eq" << BSON_ARRAY(
- BSON("$type"
- << "$$this")
- << "maxKey"))))))
- << "then"
- << BSON("$lte" << BSON_ARRAY("$$sk"
- << "$$max"))
- << "else"
- << BSON("$lt" << BSON_ARRAY("$$sk"
- << "$$max")))))))))));
-
- lookupPipelineBuilder.done();
- lookupBuilder.append("as", "intersectingChunk");
- BSONObj lookupBSON(BSON("" << lookupBuilder.obj()));
- stages.emplace_back(DocumentSourceLookUp::createFromBson(lookupBSON.firstElement(), expCtx));
- stages.emplace_back(DocumentSourceMatch::create(
- BSON("intersectingChunk" << BSON("$ne" << BSONArray())), expCtx));
- stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(BSON("$replaceWith"
- << "$original")
- .firstElement(),
- expCtx));
- return Pipeline::create(std::move(stages), expCtx);
-}
-
namespace resharding {
boost::optional<TypeCollectionDonorFields> getDonorFields(OperationContext* opCtx,
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 31ed606f1a8..46c33dd5c5f 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -193,15 +193,6 @@ boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
const BSONObj& fullDocument);
/**
- * Creates pipeline for filtering collection data matching the recipient shard.
- */
-std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ShardKeyPattern& newShardKeyPattern,
- const NamespaceString& tempNss,
- const ShardId& recipientShard);
-
-/**
* Sentinel oplog format:
* {
* op: "n",
diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp
index 93fefd8cb99..38b4dee5121 100644
--- a/src/mongo/db/s/resharding_util_test.cpp
+++ b/src/mongo/db/s/resharding_util_test.cpp
@@ -1203,241 +1203,5 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) {
ASSERT(pipelineMatchesDeque(pipeline, expectedTransactions));
}
-class ReshardingCollectionCloneTest : public AggregationContextFixture {
-protected:
- const NamespaceString& sourceNss() {
- return _sourceNss;
- }
-
- boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext(
- NamespaceString sourceNss) {
- _sourceNss = sourceNss;
- NamespaceString foreignNss("config.cache.chunks." + sourceNss.toString());
- boost::intrusive_ptr<ExpressionContextForTest> expCtx(
- new ExpressionContextForTest(getOpCtx(), sourceNss));
- expCtx->setResolvedNamespace(sourceNss, {sourceNss, {}});
- expCtx->setResolvedNamespace(foreignNss, {foreignNss, {}});
- return expCtx;
- }
-
-
- std::deque<DocumentSource::GetNextResult> makeForeignData(const ShardKeyPattern& pattern) {
- const std::initializer_list<const char*> data{
- "{_id: { x : { $minKey : 1 } }, max: { x : 0.0 }, shard: 'shard1' }",
- "{_id: { x : 0.0 }, max: { x : { $maxKey : 1 } }, shard: 'shard2' }"};
- std::deque<DocumentSource::GetNextResult> results;
- for (auto&& json : data) {
- results.emplace_back(Document(fromjson(json)));
- }
- return results;
- }
-
- std::deque<DocumentSource::GetNextResult> makeForeignData(std::vector<BSONObj> data) {
- std::deque<DocumentSource::GetNextResult> results;
- for (auto&& obj : data) {
- results.emplace_back(Document(obj));
- }
- return results;
- }
-
- std::deque<DocumentSource::GetNextResult> makeSourceData(const ShardKeyPattern& pattern) {
- const std::initializer_list<const char*> data{
- "{_id: 1, x: { $minKey: 1} }",
- "{_id: 2, x: -0.001}",
- "{_id: 3, x: NumberLong(0)}",
- "{_id: 4, x: 0.0}",
- "{_id: 5, x: 0.001}",
- "{_id: 6, x: { $maxKey: 1} }",
- };
- std::deque<DocumentSource::GetNextResult> results;
- for (auto&& json : data) {
- results.emplace_back(Document(fromjson(json)));
- }
- return results;
- }
-
- std::deque<DocumentSource::GetNextResult> makeSourceData(std::vector<BSONObj> data) {
- std::deque<DocumentSource::GetNextResult> results;
- for (auto&& obj : data) {
- results.emplace_back(Document(obj));
- }
- return results;
- }
-
-private:
- NamespaceString _sourceNss;
-};
-
-TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMinKey) {
- NamespaceString fromNs("test", "system.resharding.coll");
- ShardKeyPattern pattern(BSON("x" << 1));
-
- auto expCtx = createExpressionContext(fromNs);
-
- auto foreignData = makeForeignData(pattern);
- expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData));
-
- auto sourceData = makeSourceData(pattern);
- auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx);
-
- auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard1"));
- pipeline->addInitialSource(mockSource);
-
- auto next = pipeline->getNext();
- ASSERT(next);
- BSONObj val = fromjson("{_id: 1, x: {$minKey : 1}}");
- ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson());
-
- ASSERT(!pipeline->getNext());
-}
-
-TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicMaxKey) {
- NamespaceString fromNs("test", "system.resharding.coll");
- ShardKeyPattern pattern(BSON("x" << 1));
-
- auto expCtx = createExpressionContext(fromNs);
-
- auto foreignData = makeForeignData(pattern);
- expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData));
-
- auto sourceData = makeSourceData(pattern);
- auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx);
-
- auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2"));
- pipeline->addInitialSource(mockSource);
-
- auto next = pipeline->getNext();
- ASSERT(next);
- BSONObj val = fromjson("{_id: 3, x: NumberLong(0)}");
- ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- val = fromjson("{_id: 6, x: {$maxKey: 1}}");
- ASSERT_BSONOBJ_BINARY_EQ(val, next->toBson());
-
-
- ASSERT(!pipeline->getNext());
-}
-
-template <class T>
-auto getHashedElementValue(T value) {
- return BSONElementHasher::hash64(BSON("" << value).firstElement(),
- BSONElementHasher::DEFAULT_HASH_SEED);
-}
-
-TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicHashedExactMatch) {
- NamespaceString fromNs("test", "system.resharding.coll");
- ShardKeyPattern pattern(BSON("x"
- << "hashed"));
-
- auto expCtx = createExpressionContext(fromNs);
-
- // Documents in a mock config.cache.chunks collection. Mocked collection boundaries:
- // - [MinKey, hash(0)) : shard1
- // - [hash(0), hash(0) + 1) : shard2
- // - [hash(0) + 1, MaxKey] : shard3
- auto foreignData =
- makeForeignData({BSON("_id" << BSON("x" << MINKEY) << "max"
- << BSON("x" << getHashedElementValue(0)) << "shard"
- << "shard1"),
- BSON("_id" << BSON("x" << getHashedElementValue(0)) << "max"
- << BSON("x" << getHashedElementValue(0) + 1) << "shard"
- << "shard2"),
- BSON("_id" << BSON("x" << getHashedElementValue(0) + 1) << "max"
- << BSON("x" << MAXKEY) << "shard"
- << "shard3")});
- expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData));
-
- // Documents in a mocked sharded collection.
- auto sourceData = makeSourceData({fromjson("{_id: 1, x: {$minKey: 1}}"),
- fromjson("{_id: 2, x: -1}"),
- fromjson("{_id: 3, x: -0.123}"),
- fromjson("{_id: 4, x: 0}"),
- fromjson("{_id: 5, x: NumberLong(0)}"),
- fromjson("{_id: 6, x: 0.123}"),
- fromjson("{_id: 7, x: 1}"),
- fromjson("{_id: 8, x: {$maxKey: 1}}")});
- auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx);
-
- auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2"));
- pipeline->addInitialSource(mockSource);
-
- auto next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 3, x: -0.123}"), next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 4, x: 0}"), next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 5, x: NumberLong(0)}"), next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 6, x: 0.123}"), next->toBson());
-
- ASSERT_FALSE(pipeline->getNext());
-}
-
-TEST_F(ReshardingCollectionCloneTest, CollectionClonePipelineBasicHashedExactMatchCompoundKey) {
- NamespaceString fromNs("test", "system.resharding.coll");
- ShardKeyPattern pattern(BSON("x"
- << "hashed"
- << "y" << 1));
-
- auto expCtx = createExpressionContext(fromNs);
-
- // Documents in a mock config.cache.chunks collection. Mocked collection boundaries:
- // - [{x: MinKey, y: MinKey}, {x: hash(0), y: 0}) : shard1
- // - [{x: hash(0), y: 0}, {x: hash(0), y: 1}) : shard2
- // - [{x: hash(0), y: 1}, {x: MaxKey, y: MaxKey}] : shard3
- auto foreignData = makeForeignData(
- {BSON("_id" << BSON("x" << MINKEY << "y" << MINKEY) << "max"
- << BSON("x" << getHashedElementValue(0) << "y" << 0) << "shard"
- << "shard1"),
- BSON("_id" << BSON("x" << getHashedElementValue(0) << "y" << 0) << "max"
- << BSON("x" << (getHashedElementValue(0) + 0) << "y" << 1) << "shard"
- << "shard2"),
- BSON("_id" << BSON("x" << (getHashedElementValue(0) + 0) << "y" << 1) << "max"
- << BSON("x" << MAXKEY << "y" << MAXKEY) << "shard"
- << "shard3")});
- expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(foreignData));
-
- // Documents in a mocked sharded collection.
- auto sourceData = makeSourceData({fromjson("{_id: 1, x: {$minKey: 1}}"),
- fromjson("{_id: 2, x: -1}"),
- fromjson("{_id: 3, x: -0.123, y: -1}"),
- fromjson("{_id: 4, x: 0, y: 0}"),
- fromjson("{_id: 5, x: NumberLong(0), y: 1}"),
- fromjson("{_id: 6, x: 0.123}"),
- fromjson("{_id: 7, x: 1}"),
- fromjson("{_id: 8, x: {$maxKey: 1}}")});
- auto mockSource = DocumentSourceMock::createForTest(std::move(sourceData), expCtx);
-
- auto pipeline = createAggForCollectionCloning(expCtx, pattern, fromNs, ShardId("shard2"));
- pipeline->addInitialSource(mockSource);
-
- auto next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 4, x: 0, y: 0}"), next->toBson());
-
- ASSERT_FALSE(pipeline->getNext());
-}
-
} // namespace
} // namespace mongo