author     Nick Zolnierz <nicholas.zolnierz@mongodb.com>  2020-08-25 12:47:54 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-09-08 17:33:57 +0000
commit     4edfad49d10c3f75cf1fbc2f909e424d79f4a7e1 (patch)
tree       dcb658fc56c58e486c7865c8351712ff9e66fde4
parent     04b12743cbdcfea11b339e6ad21fc24dec8f6539 (diff)
download   mongo-4edfad49d10c3f75cf1fbc2f909e424d79f4a7e1.tar.gz
SERVER-50246 Fix $unionWith explain with mode 'executionStats' to account for pushed down stages
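
With execution-stats verbosity, $unionWith now serializes the cached copy of its
sub-pipeline, so stages that were pushed down into the sub-pipeline's initial
$cursor stage appear in the explain output together with the execution
statistics they accumulated. A minimal shell sketch of the affected scenario
(collection names are illustrative):

    // executionStats explain of a union: the sub-pipeline's $cursor stage
    // should now carry real stats (e.g. nReturned) rather than the plan of a
    // freshly attached, never-executed cursor.
    const explain = db.A.explain("executionStats").aggregate([{$unionWith: "B"}]);
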
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml  3
-rw-r--r--  jstests/aggregation/sources/unionWith/unionWith_explain.js  89
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with.cpp  38
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with.h  10
-rw-r--r--  src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp  5
-rw-r--r--  src/mongo/s/commands/cluster_map_reduce_agg.cpp  1
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp  1
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp  1
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.h  1
10 files changed, 107 insertions, 44 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml
index 41902b5f51a..9597b8980d1 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml
@@ -11,8 +11,6 @@ selector:
# TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections.
- jstests/aggregation/sources/lookup/lookup_subpipeline.js
- jstests/aggregation/sources/graphLookup/variables.js
- # TODO SERVER-50246 Investigate and remove.
- - jstests/aggregation/sources/unionWith/unionWith_explain.js
exclude_with_any_tags:
# Tests tagged with the following will fail because they assume collections are not sharded.
- assumes_against_mongod_not_mongos
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml
index 7dae8488b00..4f685288fb3 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml
@@ -24,9 +24,6 @@ selector:
# TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections.
- jstests/aggregation/sources/lookup/lookup_subpipeline.js
- jstests/aggregation/sources/graphLookup/variables.js
- # TODO SERVER-50246 Investigate and remove.
- - jstests/aggregation/sources/unionWith/unionWith_explain.js
-
exclude_with_any_tags:
# The next tag corresponds to the special error thrown by the set_read_preference_secondary.js
# override when it refuses to replace the readPreference of a particular command. Above each tag
diff --git a/jstests/aggregation/sources/unionWith/unionWith_explain.js b/jstests/aggregation/sources/unionWith/unionWith_explain.js
index 4bc762c7c21..841ba8a31bb 100644
--- a/jstests/aggregation/sources/unionWith/unionWith_explain.js
+++ b/jstests/aggregation/sources/unionWith/unionWith_explain.js
@@ -11,6 +11,7 @@
"use strict";
load("jstests/aggregation/extras/utils.js"); // arrayEq, documentEq
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+load("jstests/libs/analyze_plan.js"); // For getAggPlanStage.
const testDB = db.getSiblingDB(jsTestName());
const collA = testDB.A;
@@ -19,17 +20,25 @@ const collB = testDB.B;
collB.drop();
const collC = testDB.C;
collC.drop();
-for (let i = 0; i < 5; i++) {
+const docsPerColl = 5;
+for (let i = 0; i < docsPerColl; i++) {
assert.commandWorked(collA.insert({a: i, val: i, groupKey: i}));
assert.commandWorked(collB.insert({b: i, val: i * 2, groupKey: i}));
assert.commandWorked(collC.insert({c: i, val: 10 - i, groupKey: i}));
}
-function getUnionWithStage(pipeline) {
- for (let i = 0; i < pipeline.length; i++) {
- const stage = pipeline[i];
- if (stage.hasOwnProperty("$unionWith")) {
- return stage;
+function getUnionWithStage(explain) {
+    if (explain.hasOwnProperty("splitPipeline")) {
+        // If there is only one shard, the whole pipeline will run on that shard.
+        const subAggPipe = explain.splitPipeline === null ? explain.shards["shard-rs0"].stages
+                                                          : explain.splitPipeline.mergerPart;
+ for (let i = 0; i < subAggPipe.length; i++) {
+ const stage = subAggPipe[i];
+ if (stage.hasOwnProperty("$unionWith")) {
+ return stage;
+ }
}
+ } else {
+ return getAggPlanStage(explain, "$unionWith");
}
}
@@ -50,13 +59,11 @@ function docEqWithIgnoredFields(union, regular) {
}
function assertExplainEq(unionExplain, regularExplain) {
+ const unionStage = getUnionWithStage(unionExplain);
+ assert(unionStage);
+ const unionSubExplain = unionStage.$unionWith.pipeline;
if (FixtureHelpers.isMongos(testDB)) {
const splitPipe = unionExplain.splitPipeline;
- // If there is only one shard, the whole pipeline will run on that shard.
- const subAggPipe =
- splitPipe === null ? unionExplain.shards["shard-rs0"].stages : splitPipe.mergerPart;
- const unionStage = getUnionWithStage(subAggPipe);
- const unionSubExplain = unionStage.$unionWith.pipeline;
if (splitPipe === null) {
assert.eq(unionSubExplain.splitPipeline,
regularExplain.splitPipeline,
@@ -72,8 +79,6 @@ function assertExplainEq(unionExplain, regularExplain) {
assert(docEqWithIgnoredFields(unionSubExplain.shards, regularExplain.shards),
buildErrorString(unionSubExplain, regularExplain, "shards"));
} else {
- const unionStage = getUnionWithStage(unionExplain.stages);
- const unionSubExplain = unionStage.$unionWith.pipeline;
if ("executionStats" in unionSubExplain[0].$cursor) {
const unionSubStats =
unionStage.$unionWith.pipeline[0].$cursor.executionStats.executionStages;
@@ -143,7 +148,7 @@ assert.commandWorked(testDB.runCommand({
}));
// Ensure that $unionWith can still execute explain if followed by a stage that calls dispose().
-var result = assert.commandWorked(testDB.runCommand({
+let result = assert.commandWorked(testDB.runCommand({
explain: {
aggregate: collA.getName(),
pipeline: [{$unionWith: collB.getName()}, {$limit: 1}],
@@ -153,29 +158,61 @@ var result = assert.commandWorked(testDB.runCommand({
// Test that execution stats inner cursor is populated.
result = collA.explain("executionStats").aggregate([{"$unionWith": collB.getName()}]);
-var expectedResult = collB.explain("executionStats").aggregate([]);
-assert(result.ok, result);
-assert(expectedResult.ok, result);
-// If we attached a fresh cursor stage, the number returned would still be zero.
+assert.commandWorked(result);
+let expectedResult = collB.explain("executionStats").aggregate([]);
+assert.commandWorked(expectedResult);
+let unionStage = getUnionWithStage(result);
+assert(unionStage, result);
if (FixtureHelpers.isMongos(testDB)) {
- if (result.splitPipeline != null) {
- const pipeline = result.splitPipeline.mergerPart;
- const unionStage = getUnionWithStage(pipeline);
- assert(docEqWithIgnoredFields(expectedResult.shards, unionStage.$unionWith.pipeline.shards),
- buildErrorString(unionStage, expectedResult));
- }
+ assert(docEqWithIgnoredFields(expectedResult.shards, unionStage.$unionWith.pipeline.shards),
+ buildErrorString(unionStage, expectedResult));
+ // TODO SERVER-50597 Fix unionWith nReturned stat in sharded cluster
+ // assert.eq(unionStage.nReturned, docsPerColl, unionStage);
} else {
- assert(result.stages[1].$unionWith.pipeline[0].$cursor.executionStats.nreturned != 0, result);
+ assert.eq(unionStage.nReturned, docsPerColl * 2, unionStage);
+ assert.eq(unionStage.$unionWith.pipeline[0].$cursor.executionStats.nReturned,
+ docsPerColl,
+ unionStage);
+}
+
+// Test explain with executionStats when the $unionWith stage doesn't need to read from its
+// sub-pipeline.
+result = collA.explain("executionStats").aggregate([{"$unionWith": collB.getName()}, {$limit: 1}]);
+assert.commandWorked(result);
+unionStage = getUnionWithStage(result);
+assert(unionStage, result);
+if (!FixtureHelpers.isSharded(collB)) {
+ assert.eq(unionStage.nReturned, 1, unionStage);
+ assert.eq(unionStage.$unionWith, {coll: "B", pipeline: []}, unionStage);
+}
+
+// Test explain with executionStats when the $unionWith stage partially reads from its
+// sub-pipeline.
+result = collA.explain("executionStats")
+ .aggregate([{"$unionWith": collB.getName()}, {$limit: docsPerColl + 1}]);
+assert.commandWorked(result);
+unionStage = getUnionWithStage(result);
+assert(unionStage, result);
+if (!FixtureHelpers.isSharded(collB)) {
+ assert.eq(unionStage.nReturned, docsPerColl + 1, unionStage);
+    // TODO SERVER-50597 Fix the executionStats of the $unionWith sub-pipeline; the actual
+    // result should be 1 instead of docsPerColl.
+ assert.eq(unionStage.$unionWith.pipeline[0].$cursor.executionStats.nReturned,
+ docsPerColl,
+ unionStage);
}
// Test an index scan.
const indexedColl = testDB.indexed;
assert.commandWorked(indexedColl.createIndex({val: 1}));
indexedColl.insert([{val: 0}, {val: 1}, {val: 2}, {val: 3}]);
+
result = collA.explain("executionStats").aggregate([
{$unionWith: {coll: indexedColl.getName(), pipeline: [{$match: {val: {$gt: 2}}}]}}
]);
expectedResult = indexedColl.explain("executionStats").aggregate([{$match: {val: {$gt: 2}}}]);
-
assertExplainEq(result, expectedResult);
+
+// Test a nested $unionWith which itself should perform an index scan.
+testPipeline([{$unionWith: {coll: indexedColl.getName(), pipeline: [{$match: {val: {$gt: 0}}}]}}]);
})();
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 63d7e124a64..294b16accc3 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -66,7 +66,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns);
if (resolvedNs.pipeline.empty()) {
- return Pipeline::parse(std::move(currentPipeline), unionExpCtx, validatorCallback);
+ return Pipeline::parse(currentPipeline, unionExpCtx, validatorCallback);
}
auto resolvedPipeline = std::move(resolvedNs.pipeline);
resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size());
@@ -251,18 +251,40 @@ void DocumentSourceUnionWith::doDispose() {
Value DocumentSourceUnionWith::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
+    // There are several possible states, depending on the explain verbosity and the other
+    // stages in the pipeline:
+    // * If verbosity is queryPlanner, then the sub-pipeline is untouched and we can explain
+    // it directly.
+    // * If verbosity is execStats or allPlansExecution, then whether to explain the
+    // sub-pipeline depends on whether we've started reading from it. For instance, a $limit
+    // stage after the $unionWith may be satisfied by the base collection branch alone, in
+    // which case the sub-pipeline is never read.
+ Pipeline* pipeCopy = nullptr;
+ if (*explain == ExplainOptions::Verbosity::kQueryPlanner) {
+ pipeCopy = Pipeline::create(_pipeline->getSources(), _pipeline->getContext()).release();
+ } else if (*explain >= ExplainOptions::Verbosity::kExecStats &&
+ _executionState > ExecutionProgress::kIteratingSource) {
+        // We've either exhausted the sub-pipeline or at least started iterating it. Use the
+        // cached pipeline to get the explain output, since '_pipeline' may have been modified
+        // by optimizations or pushdowns into the initial $cursor stage.
+ pipeCopy = _cachedPipeline;
+ } else {
+ // The plan does not require reading from the sub-pipeline, so just include the
+ // serialization in the explain output.
+ BSONArrayBuilder bab;
+ for (auto&& stage : _pipeline->serialize())
+ bab << stage;
+ return Value(DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll()
+ << "pipeline" << bab.arr())));
+ }
- auto pipeCopy = Pipeline::create(_pipeline->getSources(), _pipeline->getContext());
-
- // If we have already started getting documents from the sub-pipeline, this is an explain
- // that has done some execution. We don't want to serialize the mergeCursors stage, so if
- // we have a cursor stage we tell the process interface to remove it in the case it is a
- // mergeCursors stage.
+ invariant(pipeCopy);
BSONObj explainLocal =
- pExpCtx->mongoProcessInterface->preparePipelineAndExplain(pipeCopy.release(), *explain);
+ pExpCtx->mongoProcessInterface->preparePipelineAndExplain(pipeCopy, *explain);
LOGV2_DEBUG(4553501, 3, "$unionWith attached cursor to pipeline for explain");
// We expect this to be an explanation of a pipeline -- there should only be one field.
invariant(explainLocal.nFields() == 1);
+
return Value(
DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline"
<< explainLocal.firstElement())));
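
The three states described in the serialize() comment above correspond to
observably different explain outputs. A shell sketch, reusing the illustrative
collections from the commit message:

    // 1. queryPlanner: the untouched sub-pipeline is explained directly.
    db.A.explain("queryPlanner").aggregate([{$unionWith: "B"}]);

    // 2. executionStats with the sub-pipeline iterated: the cached pipeline is
    //    explained, so the pushed-down $cursor stage carries real stats.
    db.A.explain("executionStats").aggregate([{$unionWith: "B"}]);

    // 3. executionStats without reading the sub-pipeline (the $limit is
    //    satisfied by the base collection alone): only the serialized
    //    sub-pipeline appears, e.g. {$unionWith: {coll: "B", pipeline: []}}.
    db.A.explain("executionStats").aggregate([{$unionWith: "B"}, {$limit: 1}]);
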
diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h
index 05c3d5d8505..5686f569374 100644
--- a/src/mongo/db/pipeline/document_source_union_with.h
+++ b/src/mongo/db/pipeline/document_source_union_with.h
@@ -62,7 +62,14 @@ public:
DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
- : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {}
+ : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {
+ // If this pipeline is being run as part of explain, then cache a copy to use later during
+ // serialization.
+ if (expCtx->explain >= ExplainOptions::Verbosity::kExecStats) {
+ _cachedPipeline =
+ Pipeline::create(_pipeline->getSources(), _pipeline->getContext()).release();
+ }
+ }
~DocumentSourceUnionWith();
@@ -155,6 +162,7 @@ private:
void addViewDefinition(NamespaceString nss, std::vector<BSONObj> viewPipeline);
std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
+ Pipeline* _cachedPipeline = nullptr;
bool _usedDisk = false;
ExecutionProgress _executionState = ExecutionProgress::kIteratingSource;
};
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index 3cae0550907..537d89749d1 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -187,6 +187,11 @@ BSONObj NonShardServerProcessInterface::preparePipelineAndExplain(
ownedPipeline = nullptr;
} else {
auto pipelineWithCursor = attachCursorSourceToPipelineForLocalRead(ownedPipeline);
+        // If we need execution stats, run the plan to completion in order to gather them.
+ if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
+ while (pipelineWithCursor->getNext()) {
+ }
+ }
pipelineVec = pipelineWithCursor->writeExplainOps(verbosity);
}
BSONArrayBuilder bab;
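
The drain loop above is what makes the execution-stats case meaningful on a
non-shardsvr node: the sub-pipeline's plan must actually run before
writeExplainOps() can report its stats. The jstest above checks the observable
effect; in shell terms (same illustrative collections):

    const res = db.A.explain("executionStats").aggregate([{$unionWith: "B"}]);
    // The sub-pipeline's $cursor stage reports nReturned equal to the number
    // of documents in B, rather than the plan of an un-executed cursor.
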
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index c6a166b942c..58ac2b0129e 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -215,7 +215,6 @@ bool runAggregationMapReduce(OperationContext* opCtx,
// a pointer to the constructed ExpressionContext.
uassertStatusOK(cluster_aggregation_planner::dispatchPipelineAndMerge(
opCtx,
- expCtx->mongoProcessInterface->taskExecutor,
std::move(targeter),
std::move(serialized),
std::numeric_limits<long long>::max(),
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index d85252e309c..279c425ec68 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -335,7 +335,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kAnyShard: {
return cluster_aggregation_planner::dispatchPipelineAndMerge(
opCtx,
- expCtx->mongoProcessInterface->taskExecutor,
std::move(targeter),
request.serializeToCommandObj(),
request.getBatchSize(),
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 25b7a4e71f2..3a703dba1f7 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -698,7 +698,6 @@ Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces,
}
Status dispatchPipelineAndMerge(OperationContext* opCtx,
- std::shared_ptr<executor::TaskExecutor> executor,
AggregationTargeter targeter,
Document serializedCommand,
long long batchSize,
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index 5beaed7b8f2..b1257914099 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -117,7 +117,6 @@ Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces,
* necessary on either mongos or a randomly designated shard.
*/
Status dispatchPipelineAndMerge(OperationContext* opCtx,
- std::shared_ptr<executor::TaskExecutor>,
AggregationTargeter targeter,
Document serializedCommand,
long long batchSize,