diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-25 00:57:16 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-12-14 11:01:55 +0000 |
commit | 9611f1368349db2fcbe7a7090d7bb7afe0f789af (patch) | |
tree | 22c8410fc9e3e83966103fea5ffc8349a7b3ce41 | |
parent | 8991cb9b4f54d943fb503c817ffb6aa59873099c (diff) | |
download | mongo-9611f1368349db2fcbe7a7090d7bb7afe0f789af.tar.gz |
SERVER-31689 Allow $facet to merge on mongoS if all its pipelines are eligible
-rw-r--r-- | jstests/aggregation/mongos_merge.js | 65 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet_test.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 5 |
5 files changed, 146 insertions, 11 deletions
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js index b8763a45545..a5d1eb05162 100644 --- a/jstests/aggregation/mongos_merge.js +++ b/jstests/aggregation/mongos_merge.js @@ -204,6 +204,51 @@ expectedCount: 300 }); + // Test that $facet is merged on mongoS if all pipelines are mongoS-mergeable regardless of + // 'allowDiskUse'. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_facet_all_pipes_eligible_for_mongos", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $facet: { + pipe1: [{$match: {_id: {$gt: 0}}}, {$skip: 10}, {$limit: 150}], + pipe2: [{$match: {_id: {$lt: 0}}}, {$project: {_id: 0, a: 1}}] + } + } + ], + allowDiskUse: allowDiskUse, + expectedCount: 1 + }); + + // Test that $facet is merged on mongoD if any pipeline requires a primary shard merge, + // regardless of 'allowDiskUse'. + assertMergeOnMongoD({ + testName: "agg_mongos_merge_facet_pipe_needs_primary_shard_disk_use_" + allowDiskUse, + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $facet: { + pipe1: [{$match: {_id: {$gt: 0}}}, {$skip: 10}, {$limit: 150}], + pipe2: [ + {$match: {_id: {$lt: 0}}}, + { + $lookup: { + from: unshardedColl.getName(), + localField: "_id", + foreignField: "_id", + as: "lookupField" + } + } + ] + } + } + ], + mergeType: "primaryShard", + allowDiskUse: allowDiskUse, + expectedCount: 1 + }); + // Test that a pipeline whose merging half can be run on mongos using only the mongos // execution machinery returns the correct results. // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos. @@ -273,6 +318,23 @@ expectedCount: 200 }); + // Test that $facet is only merged on mongoS if all pipelines are mongoS-mergeable when + // 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_facet_allow_disk_use", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $facet: { + pipe1: [{$match: {_id: {$gt: 0}}}, {$skip: 10}, {$limit: 150}], + pipe2: [{$match: {_id: {$lt: 0}}}, {$sort: {a: -1}}] + } + } + ], + allowDiskUse: allowDiskUse, + expectedCount: 1 + }); + // Test that $bucketAuto is only merged on mongoS if 'allowDiskUse' is not set. assertMergeOnMongoX({ testName: "agg_mongos_merge_bucket_auto_allow_disk_use", @@ -344,6 +406,9 @@ {$group: {_id: "$_id", doc: {$push: "$$CURRENT"}}}, {$unwind: "$doc"}, {$replaceRoot: {newRoot: "$doc"}}, + {$facet: {facetPipe: [{$match: {_id: {$gte: -200, $lte: 200}}}]}}, + {$unwind: "$facetPipe"}, + {$replaceRoot: {newRoot: "$facetPipe"}}, { $redact: { $cond: diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 22cf25af629..1fa0ae1fc2a 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -249,19 +249,31 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints( }); // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). - // This means that if any stage in any of the $facet pipelines requires the primary shard, then - // the entire $facet must happen on the merger, and the merger must be the primary shard. - const bool needsPrimaryShard = - std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) { - const auto sources = facet.pipeline->getSources(); - return std::any_of(sources.begin(), sources.end(), [&](const auto source) { - return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard; - }); - }); + // This means that if any stage in any of the $facet pipelines needs to run on the primary shard + // or on mongoS, then the entire $facet stage must run there. + static const std::set<HostTypeRequirement> definitiveHosts = { + HostTypeRequirement::kMongoS, HostTypeRequirement::kPrimaryShard}; + + HostTypeRequirement host = HostTypeRequirement::kNone; + + // Iterate through each pipeline to determine the HostTypeRequirement for the $facet stage. + // Because we have already validated that there are no conflicting HostTypeRequirements during + // parsing, if we observe any of the 'definitiveHosts' types in any of the pipelines then the + // entire $facet stage must run on that host and iteration can stop. At the end of this process, + // 'host' will be the $facet's final HostTypeRequirement. + for (auto fi = _facets.begin(); fi != _facets.end() && !definitiveHosts.count(host); fi++) { + const auto& sources = fi->pipeline->getSources(); + for (auto si = sources.begin(); si != sources.end() && !definitiveHosts.count(host); si++) { + const auto hostReq = (*si)->constraints().resolvedHostTypeRequirement(pExpCtx); + if (hostReq != HostTypeRequirement::kNone) { + host = hostReq; + } + } + } return {StreamType::kBlocking, PositionRequirement::kNone, - needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard, + host, mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed}; } @@ -292,12 +304,31 @@ DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { + boost::optional<std::string> needsMongoS; + boost::optional<std::string> needsShard; + std::vector<FacetPipeline> facetPipelines; for (auto&& rawFacet : extractRawPipelines(elem)) { const auto facetName = rawFacet.first; auto pipeline = uassertStatusOK(Pipeline::parseFacetPipeline(rawFacet.second, expCtx)); + // Validate that none of the facet pipelines have any conflicting HostTypeRequirements. This + // verifies both that all stages within each pipeline are consistent, and that the pipelines + // are consistent with one another. + if (!needsShard && pipeline->needsShard()) { + needsShard.emplace(facetName); + } + if (!needsMongoS && pipeline->needsMongosMerger()) { + needsMongoS.emplace(facetName); + } + uassert(ErrorCodes::IllegalOperation, + str::stream() << "$facet pipeline '" << *needsMongoS + << "' must run on mongoS, but '" + << *needsShard + << "' requires a shard", + !(needsShard && needsMongoS)); + facetPipelines.emplace_back(facetName, std::move(pipeline)); } diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index a2de0f3c768..1ed1f4a779c 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -174,6 +174,32 @@ TEST_F(DocumentSourceFacetTest, ShouldAcceptLegalSpecification) { ASSERT_TRUE(facetStage.get()); } +TEST_F(DocumentSourceFacetTest, ShouldRejectConflictingHostTypeRequirementsWithinSinglePipeline) { + auto ctx = getExpCtx(); + ctx->inMongos = true; + + auto spec = fromjson( + "{$facet: {badPipe: [{$_internalSplitPipeline: {mergeType: 'anyShard'}}, " + "{$_internalSplitPipeline: {mergeType: 'mongos'}}]}}"); + + ASSERT_THROWS_CODE(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), + AssertionException, + ErrorCodes::IllegalOperation); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectConflictingHostTypeRequirementsAcrossPipelines) { + auto ctx = getExpCtx(); + ctx->inMongos = true; + + auto spec = fromjson( + "{$facet: {shardPipe: [{$_internalSplitPipeline: {mergeType: 'anyShard'}}], mongosPipe: " + "[{$_internalSplitPipeline: {mergeType: 'mongos'}}]}}"); + + ASSERT_THROWS_CODE(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), + AssertionException, + ErrorCodes::IllegalOperation); +} + // // Evaluation. // @@ -675,7 +701,7 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement == - DocumentSource::StageConstraints::HostTypeRequirement::kAnyShard); + DocumentSource::StageConstraints::HostTypeRequirement::kNone); } } // namespace diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 338a2d73617..6f71c129707 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -471,6 +471,14 @@ bool Pipeline::needsMongosMerger() const { }); } +bool Pipeline::needsShard() const { + return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) { + auto hostType = stage->constraints().resolvedHostTypeRequirement(pCtx); + return (hostType == HostTypeRequirement::kAnyShard || + hostType == HostTypeRequirement::kPrimaryShard); + }); +} + bool Pipeline::canRunOnMongos() const { return _pipelineCanRunOnMongoS().isOK(); } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 6697848fde3..51cf4c34da0 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -248,6 +248,11 @@ public: bool needsMongosMerger() const; /** + * Returns 'true' if any stage in the pipeline must run on a shard. + */ + bool needsShard() const; + + /** * Returns true if the pipeline can run on mongoS, but is not obliged to; that is, it can run * either on mongoS or on a shard. */ |