author     Bernard Gorman <bernard.gorman@gmail.com>    2017-10-25 00:57:16 +0100
committer  Bernard Gorman <bernard.gorman@gmail.com>    2017-12-14 11:01:55 +0000
commit     9611f1368349db2fcbe7a7090d7bb7afe0f789af
tree       22c8410fc9e3e83966103fea5ffc8349a7b3ce41
parent     8991cb9b4f54d943fb503c817ffb6aa59873099c
SERVER-31689 Allow $facet to merge on mongoS if all its pipelines are eligible
-rw-r--r--  jstests/aggregation/mongos_merge.js                   | 65
-rw-r--r--  src/mongo/db/pipeline/document_source_facet.cpp       | 51
-rw-r--r--  src/mongo/db/pipeline/document_source_facet_test.cpp  | 28
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp                    |  8
-rw-r--r--  src/mongo/db/pipeline/pipeline.h                      |  5

5 files changed, 146 insertions(+), 11 deletions(-)
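
In practice, the patch lets a $facet whose sub-pipelines are all mongoS-mergeable merge on mongoS, while a sub-pipeline that needs the primary shard (for example, a $lookup into an unsharded collection) still pins the whole $facet there. A minimal mongo-shell sketch of both cases follows; the collection names and the use of explain().mergeType to read the merge location are illustrative assumptions, not part of this patch.

// Sketch only: "shardedColl" / "unshardedColl" are hypothetical names.
// Case 1: every sub-pipeline is mongoS-mergeable, so the $facet can now merge on mongoS.
let explainRes = db.shardedColl.explain().aggregate([
    {$match: {_id: {$gte: -200, $lte: 200}}},
    {
      $facet: {
          pipe1: [{$match: {_id: {$gt: 0}}}, {$skip: 10}, {$limit: 150}],
          pipe2: [{$match: {_id: {$lt: 0}}}, {$project: {_id: 0, a: 1}}]
      }
    }
]);
// Expected after this patch: explainRes.mergeType === "mongos".

// Case 2: a $lookup against an unsharded collection forces a primary-shard merge.
explainRes = db.shardedColl.explain().aggregate([
    {
      $facet: {
          pipe1: [{$skip: 10}, {$limit: 150}],
          pipe2: [{$lookup: {from: "unshardedColl", localField: "_id",
                             foreignField: "_id", as: "lookupField"}}]
      }
    }
]);
// Expected: explainRes.mergeType === "primaryShard".
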
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js
index b8763a45545..a5d1eb05162 100644
--- a/jstests/aggregation/mongos_merge.js
+++ b/jstests/aggregation/mongos_merge.js
@@ -204,6 +204,51 @@
expectedCount: 300
});
+ // Test that $facet is merged on mongoS if all pipelines are mongoS-mergeable regardless of
+ // 'allowDiskUse'.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_facet_all_pipes_eligible_for_mongos",
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $facet: {
+ pipe1: [{$match: {_id: {$gt: 0}}}, {$skip: 10}, {$limit: 150}],
+ pipe2: [{$match: {_id: {$lt: 0}}}, {$project: {_id: 0, a: 1}}]
+ }
+ }
+ ],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 1
+ });
+
+ // Test that $facet is merged on mongoD if any pipeline requires a primary shard merge,
+ // regardless of 'allowDiskUse'.
+ assertMergeOnMongoD({
+ testName: "agg_mongos_merge_facet_pipe_needs_primary_shard_disk_use_" + allowDiskUse,
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $facet: {
+ pipe1: [{$match: {_id: {$gt: 0}}}, {$skip: 10}, {$limit: 150}],
+ pipe2: [
+ {$match: {_id: {$lt: 0}}},
+ {
+ $lookup: {
+ from: unshardedColl.getName(),
+ localField: "_id",
+ foreignField: "_id",
+ as: "lookupField"
+ }
+ }
+ ]
+ }
+ }
+ ],
+ mergeType: "primaryShard",
+ allowDiskUse: allowDiskUse,
+ expectedCount: 1
+ });
+
// Test that a pipeline whose merging half can be run on mongos using only the mongos
// execution machinery returns the correct results.
// TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos.
@@ -273,6 +318,23 @@
expectedCount: 200
});
+ // Test that $facet is only merged on mongoS if all pipelines are mongoS-mergeable when
+ // 'allowDiskUse' is not set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_facet_allow_disk_use",
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $facet: {
+ pipe1: [{$match: {_id: {$gt: 0}}}, {$skip: 10}, {$limit: 150}],
+ pipe2: [{$match: {_id: {$lt: 0}}}, {$sort: {a: -1}}]
+ }
+ }
+ ],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 1
+ });
+
// Test that $bucketAuto is only merged on mongoS if 'allowDiskUse' is not set.
assertMergeOnMongoX({
testName: "agg_mongos_merge_bucket_auto_allow_disk_use",
@@ -344,6 +406,9 @@
{$group: {_id: "$_id", doc: {$push: "$$CURRENT"}}},
{$unwind: "$doc"},
{$replaceRoot: {newRoot: "$doc"}},
+ {$facet: {facetPipe: [{$match: {_id: {$gte: -200, $lte: 200}}}]}},
+ {$unwind: "$facetPipe"},
+ {$replaceRoot: {newRoot: "$facetPipe"}},
{
$redact: {
$cond:
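
The assertMergeOnMongoS, assertMergeOnMongoD and assertMergeOnMongoX helpers used above are defined earlier in mongos_merge.js and are not shown in this hunk. As a rough sketch only (assuming the sharded explain output exposes a top-level mergeType field, using a hypothetical mongosColl handle, and ignoring the result-set checks the real helpers also perform), the core assertion they make looks like:

// Hypothetical distillation of the helpers; not the actual implementation.
function assertMergeType({pipeline, allowDiskUse, expectedMergeType}) {
    const explainRes =
        mongosColl.explain().aggregate(pipeline, {allowDiskUse: allowDiskUse});
    assert.eq(explainRes.mergeType, expectedMergeType, tojson(explainRes));
}
// assertMergeOnMongoS  => expectedMergeType: "mongos"
// assertMergeOnMongoD  => the merge runs on a shard ("anyShard" or "primaryShard")
// assertMergeOnMongoX  => "mongos" when allowDiskUse is false, a shard otherwise
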
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 22cf25af629..1fa0ae1fc2a 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -249,19 +249,31 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints(
});
// Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
- // This means that if any stage in any of the $facet pipelines requires the primary shard, then
- // the entire $facet must happen on the merger, and the merger must be the primary shard.
- const bool needsPrimaryShard =
- std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
- const auto sources = facet.pipeline->getSources();
- return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
- return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
- });
- });
+ // This means that if any stage in any of the $facet pipelines needs to run on the primary shard
+ // or on mongoS, then the entire $facet stage must run there.
+ static const std::set<HostTypeRequirement> definitiveHosts = {
+ HostTypeRequirement::kMongoS, HostTypeRequirement::kPrimaryShard};
+
+ HostTypeRequirement host = HostTypeRequirement::kNone;
+
+ // Iterate through each pipeline to determine the HostTypeRequirement for the $facet stage.
+ // Because we have already validated that there are no conflicting HostTypeRequirements during
+ // parsing, if we observe any of the 'definitiveHosts' types in any of the pipelines then the
+ // entire $facet stage must run on that host and iteration can stop. At the end of this process,
+ // 'host' will be the $facet's final HostTypeRequirement.
+ for (auto fi = _facets.begin(); fi != _facets.end() && !definitiveHosts.count(host); fi++) {
+ const auto& sources = fi->pipeline->getSources();
+ for (auto si = sources.begin(); si != sources.end() && !definitiveHosts.count(host); si++) {
+ const auto hostReq = (*si)->constraints().resolvedHostTypeRequirement(pExpCtx);
+ if (hostReq != HostTypeRequirement::kNone) {
+ host = hostReq;
+ }
+ }
+ }
return {StreamType::kBlocking,
PositionRequirement::kNone,
- needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard,
+ host,
mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed};
}
@@ -292,12 +304,31 @@ DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker*
intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+ boost::optional<std::string> needsMongoS;
+ boost::optional<std::string> needsShard;
+
std::vector<FacetPipeline> facetPipelines;
for (auto&& rawFacet : extractRawPipelines(elem)) {
const auto facetName = rawFacet.first;
auto pipeline = uassertStatusOK(Pipeline::parseFacetPipeline(rawFacet.second, expCtx));
+ // Validate that none of the facet pipelines have any conflicting HostTypeRequirements. This
+ // verifies both that all stages within each pipeline are consistent, and that the pipelines
+ // are consistent with one another.
+ if (!needsShard && pipeline->needsShard()) {
+ needsShard.emplace(facetName);
+ }
+ if (!needsMongoS && pipeline->needsMongosMerger()) {
+ needsMongoS.emplace(facetName);
+ }
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "$facet pipeline '" << *needsMongoS
+ << "' must run on mongoS, but '"
+ << *needsShard
+ << "' requires a shard",
+ !(needsShard && needsMongoS));
+
facetPipelines.emplace_back(facetName, std::move(pipeline));
}
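
The uassert added above rejects, at parse time, any $facet whose sub-pipelines impose conflicting host requirements. From the shell, a conflicting spec of the kind the unit tests below construct (using the internal, test-only $_internalSplitPipeline stage) would be expected to fail as sketched here; the collection name and command wrapper are illustrative only.

// Illustrative: one sub-pipeline must merge on a shard, the other on mongoS.
const conflictingPipeline = [{
    $facet: {
        shardPipe: [{$_internalSplitPipeline: {mergeType: "anyShard"}}],
        mongosPipe: [{$_internalSplitPipeline: {mergeType: "mongos"}}]
    }
}];
assert.commandFailedWithCode(
    db.runCommand({aggregate: "coll", pipeline: conflictingPipeline, cursor: {}}),
    ErrorCodes.IllegalOperation);
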
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index a2de0f3c768..1ed1f4a779c 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -174,6 +174,32 @@ TEST_F(DocumentSourceFacetTest, ShouldAcceptLegalSpecification) {
ASSERT_TRUE(facetStage.get());
}
+TEST_F(DocumentSourceFacetTest, ShouldRejectConflictingHostTypeRequirementsWithinSinglePipeline) {
+ auto ctx = getExpCtx();
+ ctx->inMongos = true;
+
+ auto spec = fromjson(
+ "{$facet: {badPipe: [{$_internalSplitPipeline: {mergeType: 'anyShard'}}, "
+ "{$_internalSplitPipeline: {mergeType: 'mongos'}}]}}");
+
+ ASSERT_THROWS_CODE(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectConflictingHostTypeRequirementsAcrossPipelines) {
+ auto ctx = getExpCtx();
+ ctx->inMongos = true;
+
+ auto spec = fromjson(
+ "{$facet: {shardPipe: [{$_internalSplitPipeline: {mergeType: 'anyShard'}}], mongosPipe: "
+ "[{$_internalSplitPipeline: {mergeType: 'mongos'}}]}}");
+
+ ASSERT_THROWS_CODE(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+}
+
//
// Evaluation.
//
@@ -675,7 +701,7 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement ==
- DocumentSource::StageConstraints::HostTypeRequirement::kAnyShard);
+ DocumentSource::StageConstraints::HostTypeRequirement::kNone);
}
} // namespace
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 338a2d73617..6f71c129707 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -471,6 +471,14 @@ bool Pipeline::needsMongosMerger() const {
});
}
+bool Pipeline::needsShard() const {
+ return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
+ auto hostType = stage->constraints().resolvedHostTypeRequirement(pCtx);
+ return (hostType == HostTypeRequirement::kAnyShard ||
+ hostType == HostTypeRequirement::kPrimaryShard);
+ });
+}
+
bool Pipeline::canRunOnMongos() const {
return _pipelineCanRunOnMongoS().isOK();
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 6697848fde3..51cf4c34da0 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -248,6 +248,11 @@ public:
bool needsMongosMerger() const;
/**
+ * Returns 'true' if any stage in the pipeline must run on a shard.
+ */
+ bool needsShard() const;
+
+ /**
* Returns true if the pipeline can run on mongoS, but is not obliged to; that is, it can run
* either on mongoS or on a shard.
*/