diff options
4 files changed, 61 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp index ccaecddf502..cc1f696fcaa 100644 --- a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp +++ b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" #include "mongo/s/query/sharded_agg_test_fixture.h" #include "mongo/unittest/unittest.h" @@ -232,7 +233,6 @@ TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedPathAndIdIsProjected TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedPathSucceeds) { auto shardKeyPattern = ShardKeyPattern(BSON("_id.a" << 1 << "b" << 1 << "_id.b" << "hashed")); - auto pipeline = Pipeline::parse(ReshardingSplitPolicy::createRawPipeline( shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */), @@ -249,5 +249,27 @@ TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedPathSucceeds) BSON("_id.a" << 20 << "b" << 1 << "_id.b" << 2598032665634823220LL)); ASSERT(!pipeline->getNext()); } + +TEST_F(ReshardingSplitPolicyTest, ReshardingSucceedsWithLimitedMemoryForSortOperation) { + RAIIServerParameterControllerForTest sortMaxMemory{ + "internalQueryMaxBlockingSortMemoryUsageBytes", 100}; + auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1)); + const NamespaceString ns("reshard", "foo"); + auto pipelineDocSource = + ReshardingSplitPolicy::makePipelineDocumentSource_forTest(operationContext(), + kTestAggregateNss, + shardKeyPattern, + 3 /*numInitialChunks*/, + 2 /*samplesPerChunk*/); + auto mockSource = DocumentSourceMock::createForTest( + {"{_id: 20, a: 4}", "{_id: 30, a: 3}", "{_id: 40, a: 2}", "{_id: 50, a: 1}"}, expCtx()); + pipelineDocSource->getPipeline_forTest()->addInitialSource(mockSource.get()); + auto next = pipelineDocSource->getNext(); + ASSERT_BSONOBJ_EQ(BSON("a" << 2), next.value()); + next = pipelineDocSource->getNext(); + ASSERT_BSONOBJ_EQ(BSON("a" << 4), next.value()); + ASSERT(!pipelineDocSource->getNext()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index a55f54b25b4..f01d1cadd63 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -37,6 +37,7 @@ #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/curop.h" +#include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" @@ -55,6 +56,7 @@ namespace { using ChunkDistributionMap = stdx::unordered_map<ShardId, size_t>; using ZoneShardMap = StringMap<std::vector<ShardId>>; +using boost::intrusive_ptr; std::vector<ShardId> getAllShardIdsSorted(OperationContext* opCtx) { // Many tests assume that chunks will be placed on shards @@ -840,11 +842,24 @@ void ReshardingSplitPolicy::_appendSplitPointsFromSample(BSONObjSet* splitPoints } std::unique_ptr<ReshardingSplitPolicy::SampleDocumentSource> +ReshardingSplitPolicy::makePipelineDocumentSource_forTest(OperationContext* opCtx, + const NamespaceString& ns, + const ShardKeyPattern& shardKey, + int numInitialChunks, + int samplesPerChunk) { + MakePipelineOptions opts; + opts.attachCursorSource = false; + return _makePipelineDocumentSource( + opCtx, ns, shardKey, numInitialChunks, samplesPerChunk, std::move(opts)); +} + +std::unique_ptr<ReshardingSplitPolicy::SampleDocumentSource> ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, const NamespaceString& ns, const ShardKeyPattern& shardKey, int numInitialChunks, - int samplesPerChunk) { + int samplesPerChunk, + MakePipelineOptions opts) { auto rawPipeline = createRawPipeline(shardKey, numInitialChunks - 1, samplesPerChunk); StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; resolvedNamespaces[ns.coll()] = {ns, std::vector<BSONObj>{}}; @@ -859,7 +874,7 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, boost::none, /* explain */ false, /* fromMongos */ false, /* needsMerge */ - false, /* allowDiskUse */ + true, /* allowDiskUse */ true, /* bypassDocumentValidation */ false, /* isMapReduceCommand */ ns, @@ -869,8 +884,10 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, std::move(resolvedNamespaces), boost::none); /* collUUID */ - return std::make_unique<PipelineDocumentSource>(Pipeline::makePipeline(rawPipeline, expCtx, {}), - samplesPerChunk - 1); + expCtx->tempDir = storageGlobalParams.dbpath + "/tmp"; + + return std::make_unique<PipelineDocumentSource>( + Pipeline::makePipeline(rawPipeline, expCtx, opts), samplesPerChunk - 1); } ReshardingSplitPolicy::PipelineDocumentSource::PipelineDocumentSource( diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h index a5831b8747a..587e06652b5 100644 --- a/src/mongo/db/s/config/initial_split_policy.h +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -41,7 +41,6 @@ #include "mongo/s/shard_id.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/string_map.h" - namespace mongo { struct SplitPolicyParams { @@ -290,6 +289,7 @@ public: public: virtual ~SampleDocumentSource(){}; virtual boost::optional<BSONObj> getNext() = 0; + virtual Pipeline* getPipeline_forTest() = 0; }; // Provides documents from a real Pipeline @@ -298,6 +298,9 @@ public: PipelineDocumentSource() = delete; PipelineDocumentSource(SampleDocumentPipeline pipeline, int skip); boost::optional<BSONObj> getNext() override; + Pipeline* getPipeline_forTest() override { + return _pipeline.get(); + } private: SampleDocumentPipeline _pipeline; @@ -333,13 +336,21 @@ public: static constexpr int kDefaultSamplesPerChunk = 10; + static std::unique_ptr<SampleDocumentSource> makePipelineDocumentSource_forTest( + OperationContext* opCtx, + const NamespaceString& ns, + const ShardKeyPattern& shardKey, + int numInitialChunks, + int samplesPerChunk); + private: static std::unique_ptr<SampleDocumentSource> _makePipelineDocumentSource( OperationContext* opCtx, const NamespaceString& ns, const ShardKeyPattern& shardKey, int numInitialChunks, - int samplesPerChunk); + int samplesPerChunk, + MakePipelineOptions opts = {}); /** * Returns a set of split points to ensure that chunk boundaries will align with the zone diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp index b3cde68454a..f7e2ce171a9 100644 --- a/src/mongo/db/s/config/initial_split_policy_test.cpp +++ b/src/mongo/db/s/config/initial_split_policy_test.cpp @@ -1785,6 +1785,10 @@ public: return next; } + Pipeline* getPipeline_forTest() override { + return nullptr; + } + private: std::list<BSONObj> _toReturn; }; |