diff options
4 files changed, 61 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp index 74864fe8856..4e49d30e355 100644 --- a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp +++ b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" #include "mongo/s/query/sharded_agg_test_fixture.h" #include "mongo/unittest/unittest.h" @@ -230,7 +231,6 @@ TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedPathAndIdIsProjected TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedPathSucceeds) { auto shardKeyPattern = ShardKeyPattern(BSON("_id.a" << 1 << "b" << 1 << "_id.b" << "hashed")); - auto pipeline = Pipeline::parse(ReshardingSplitPolicy::createRawPipeline( shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */), @@ -247,5 +247,27 @@ TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedPathSucceeds) BSON("_id.a" << 20 << "b" << 1 << "_id.b" << 2598032665634823220LL)); ASSERT(!pipeline->getNext()); } + +TEST_F(ReshardingSplitPolicyTest, ReshardingSucceedsWithLimitedMemoryForSortOperation) { + RAIIServerParameterControllerForTest sortMaxMemory{ + "internalQueryMaxBlockingSortMemoryUsageBytes", 100}; + auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1)); + const NamespaceString ns("reshard", "foo"); + auto pipelineDocSource = + ReshardingSplitPolicy::makePipelineDocumentSource_forTest(operationContext(), + kTestAggregateNss, + shardKeyPattern, + 3 /*numInitialChunks*/, + 2 /*samplesPerChunk*/); + auto mockSource = DocumentSourceMock::createForTest( + {"{_id: 20, a: 4}", "{_id: 30, a: 3}", "{_id: 40, a: 2}", "{_id: 50, a: 1}"}, expCtx()); + pipelineDocSource->getPipeline_forTest()->addInitialSource(mockSource.get()); + auto next = pipelineDocSource->getNext(); + ASSERT_BSONOBJ_EQ(BSON("a" << 2), next.value()); + next = pipelineDocSource->getNext(); + ASSERT_BSONOBJ_EQ(BSON("a" << 4), next.value()); + ASSERT(!pipelineDocSource->getNext()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 724882a29fc..ec76b8b13a9 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -33,6 +33,7 @@ #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/curop.h" +#include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" @@ -53,6 +54,7 @@ namespace { using ChunkDistributionMap = stdx::unordered_map<ShardId, size_t>; using ZoneShardMap = StringMap<std::vector<ShardId>>; +using boost::intrusive_ptr; std::vector<ShardId> getAllShardIdsSorted(OperationContext* opCtx) { // Many tests assume that chunks will be placed on shards @@ -791,11 +793,24 @@ void ReshardingSplitPolicy::_appendSplitPointsFromSample(BSONObjSet* splitPoints } std::unique_ptr<ReshardingSplitPolicy::SampleDocumentSource> +ReshardingSplitPolicy::makePipelineDocumentSource_forTest(OperationContext* opCtx, + const NamespaceString& ns, + const ShardKeyPattern& shardKey, + int numInitialChunks, + int samplesPerChunk) { + MakePipelineOptions opts; + opts.attachCursorSource = false; + return _makePipelineDocumentSource( + opCtx, ns, shardKey, numInitialChunks, samplesPerChunk, std::move(opts)); +} + +std::unique_ptr<ReshardingSplitPolicy::SampleDocumentSource> ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, const NamespaceString& ns, const ShardKeyPattern& shardKey, int numInitialChunks, - int samplesPerChunk) { + int samplesPerChunk, + MakePipelineOptions opts) { auto rawPipeline = createRawPipeline(shardKey, numInitialChunks - 1, samplesPerChunk); StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; resolvedNamespaces[ns.coll()] = {ns, std::vector<BSONObj>{}}; @@ -810,7 +825,7 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, boost::none, /* explain */ false, /* fromMongos */ false, /* needsMerge */ - false, /* allowDiskUse */ + true, /* allowDiskUse */ true, /* bypassDocumentValidation */ false, /* isMapReduceCommand */ ns, @@ -820,8 +835,10 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, std::move(resolvedNamespaces), boost::none); /* collUUID */ - return std::make_unique<PipelineDocumentSource>(Pipeline::makePipeline(rawPipeline, expCtx, {}), - samplesPerChunk - 1); + expCtx->tempDir = storageGlobalParams.dbpath + "/tmp"; + + return std::make_unique<PipelineDocumentSource>( + Pipeline::makePipeline(rawPipeline, expCtx, opts), samplesPerChunk - 1); } ReshardingSplitPolicy::PipelineDocumentSource::PipelineDocumentSource( diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h index 563980f10d0..408ffb1ac9d 100644 --- a/src/mongo/db/s/config/initial_split_policy.h +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -40,7 +40,6 @@ #include "mongo/s/catalog/type_tags.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/string_map.h" - namespace mongo { struct SplitPolicyParams { @@ -291,6 +290,7 @@ public: public: virtual ~SampleDocumentSource(){}; virtual boost::optional<BSONObj> getNext() = 0; + virtual Pipeline* getPipeline_forTest() = 0; }; // Provides documents from a real Pipeline @@ -299,6 +299,9 @@ public: PipelineDocumentSource() = delete; PipelineDocumentSource(SampleDocumentPipeline pipeline, int skip); boost::optional<BSONObj> getNext() override; + Pipeline* getPipeline_forTest() override { + return _pipeline.get(); + } private: SampleDocumentPipeline _pipeline; @@ -334,13 +337,21 @@ public: static constexpr int kDefaultSamplesPerChunk = 10; + static std::unique_ptr<SampleDocumentSource> makePipelineDocumentSource_forTest( + OperationContext* opCtx, + const NamespaceString& ns, + const ShardKeyPattern& shardKey, + int numInitialChunks, + int samplesPerChunk); + private: static std::unique_ptr<SampleDocumentSource> _makePipelineDocumentSource( OperationContext* opCtx, const NamespaceString& ns, const ShardKeyPattern& shardKey, int numInitialChunks, - int samplesPerChunk); + int samplesPerChunk, + MakePipelineOptions opts = {}); /** * Returns a set of split points to ensure that chunk boundaries will align with the zone diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp index 119e4ff98f7..cae1f09d826 100644 --- a/src/mongo/db/s/config/initial_split_policy_test.cpp +++ b/src/mongo/db/s/config/initial_split_policy_test.cpp @@ -1726,6 +1726,10 @@ public: return next; } + Pipeline* getPipeline_forTest() override { + return nullptr; + } + private: std::list<BSONObj> _toReturn; }; |