summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp24
-rw-r--r--src/mongo/db/s/config/initial_split_policy.cpp25
-rw-r--r--src/mongo/db/s/config/initial_split_policy.h15
-rw-r--r--src/mongo/db/s/config/initial_split_policy_test.cpp4
4 files changed, 61 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp
index 74864fe8856..4e49d30e355 100644
--- a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp
+++ b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/s/config/initial_split_policy.h"
+#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/query/sharded_agg_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -230,7 +231,6 @@ TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedPathAndIdIsProjected
TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedPathSucceeds) {
auto shardKeyPattern = ShardKeyPattern(BSON("_id.a" << 1 << "b" << 1 << "_id.b"
<< "hashed"));
-
auto pipeline =
Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
@@ -247,5 +247,27 @@ TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedPathSucceeds)
BSON("_id.a" << 20 << "b" << 1 << "_id.b" << 2598032665634823220LL));
ASSERT(!pipeline->getNext());
}
+
+TEST_F(ReshardingSplitPolicyTest, ReshardingSucceedsWithLimitedMemoryForSortOperation) {
+ RAIIServerParameterControllerForTest sortMaxMemory{
+ "internalQueryMaxBlockingSortMemoryUsageBytes", 100};
+ auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1));
+ const NamespaceString ns("reshard", "foo");
+ auto pipelineDocSource =
+ ReshardingSplitPolicy::makePipelineDocumentSource_forTest(operationContext(),
+ kTestAggregateNss,
+ shardKeyPattern,
+ 3 /*numInitialChunks*/,
+ 2 /*samplesPerChunk*/);
+ auto mockSource = DocumentSourceMock::createForTest(
+ {"{_id: 20, a: 4}", "{_id: 30, a: 3}", "{_id: 40, a: 2}", "{_id: 50, a: 1}"}, expCtx());
+ pipelineDocSource->getPipeline_forTest()->addInitialSource(mockSource.get());
+ auto next = pipelineDocSource->getNext();
+ ASSERT_BSONOBJ_EQ(BSON("a" << 2), next.value());
+ next = pipelineDocSource->getNext();
+ ASSERT_BSONOBJ_EQ(BSON("a" << 4), next.value());
+ ASSERT(!pipelineDocSource->getNext());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 724882a29fc..ec76b8b13a9 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/curop.h"
+#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
@@ -53,6 +54,7 @@ namespace {
using ChunkDistributionMap = stdx::unordered_map<ShardId, size_t>;
using ZoneShardMap = StringMap<std::vector<ShardId>>;
+using boost::intrusive_ptr;
std::vector<ShardId> getAllShardIdsSorted(OperationContext* opCtx) {
// Many tests assume that chunks will be placed on shards
@@ -791,11 +793,24 @@ void ReshardingSplitPolicy::_appendSplitPointsFromSample(BSONObjSet* splitPoints
}
std::unique_ptr<ReshardingSplitPolicy::SampleDocumentSource>
+ReshardingSplitPolicy::makePipelineDocumentSource_forTest(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const ShardKeyPattern& shardKey,
+ int numInitialChunks,
+ int samplesPerChunk) {
+ MakePipelineOptions opts;
+ opts.attachCursorSource = false;
+ return _makePipelineDocumentSource(
+ opCtx, ns, shardKey, numInitialChunks, samplesPerChunk, std::move(opts));
+}
+
+std::unique_ptr<ReshardingSplitPolicy::SampleDocumentSource>
ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx,
const NamespaceString& ns,
const ShardKeyPattern& shardKey,
int numInitialChunks,
- int samplesPerChunk) {
+ int samplesPerChunk,
+ MakePipelineOptions opts) {
auto rawPipeline = createRawPipeline(shardKey, numInitialChunks - 1, samplesPerChunk);
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
resolvedNamespaces[ns.coll()] = {ns, std::vector<BSONObj>{}};
@@ -810,7 +825,7 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx,
boost::none, /* explain */
false, /* fromMongos */
false, /* needsMerge */
- false, /* allowDiskUse */
+ true, /* allowDiskUse */
true, /* bypassDocumentValidation */
false, /* isMapReduceCommand */
ns,
@@ -820,8 +835,10 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx,
std::move(resolvedNamespaces),
boost::none); /* collUUID */
- return std::make_unique<PipelineDocumentSource>(Pipeline::makePipeline(rawPipeline, expCtx, {}),
- samplesPerChunk - 1);
+ expCtx->tempDir = storageGlobalParams.dbpath + "/tmp";
+
+ return std::make_unique<PipelineDocumentSource>(
+ Pipeline::makePipeline(rawPipeline, expCtx, opts), samplesPerChunk - 1);
}
ReshardingSplitPolicy::PipelineDocumentSource::PipelineDocumentSource(
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 563980f10d0..408ffb1ac9d 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -40,7 +40,6 @@
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/string_map.h"
-
namespace mongo {
struct SplitPolicyParams {
@@ -291,6 +290,7 @@ public:
public:
virtual ~SampleDocumentSource(){};
virtual boost::optional<BSONObj> getNext() = 0;
+ virtual Pipeline* getPipeline_forTest() = 0;
};
// Provides documents from a real Pipeline
@@ -299,6 +299,9 @@ public:
PipelineDocumentSource() = delete;
PipelineDocumentSource(SampleDocumentPipeline pipeline, int skip);
boost::optional<BSONObj> getNext() override;
+ Pipeline* getPipeline_forTest() override {
+ return _pipeline.get();
+ }
private:
SampleDocumentPipeline _pipeline;
@@ -334,13 +337,21 @@ public:
static constexpr int kDefaultSamplesPerChunk = 10;
+ static std::unique_ptr<SampleDocumentSource> makePipelineDocumentSource_forTest(
+ OperationContext* opCtx,
+ const NamespaceString& ns,
+ const ShardKeyPattern& shardKey,
+ int numInitialChunks,
+ int samplesPerChunk);
+
private:
static std::unique_ptr<SampleDocumentSource> _makePipelineDocumentSource(
OperationContext* opCtx,
const NamespaceString& ns,
const ShardKeyPattern& shardKey,
int numInitialChunks,
- int samplesPerChunk);
+ int samplesPerChunk,
+ MakePipelineOptions opts = {});
/**
* Returns a set of split points to ensure that chunk boundaries will align with the zone
diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp
index 119e4ff98f7..cae1f09d826 100644
--- a/src/mongo/db/s/config/initial_split_policy_test.cpp
+++ b/src/mongo/db/s/config/initial_split_policy_test.cpp
@@ -1726,6 +1726,10 @@ public:
return next;
}
+ Pipeline* getPipeline_forTest() override {
+ return nullptr;
+ }
+
private:
std::list<BSONObj> _toReturn;
};