summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml3
-rw-r--r--jstests/aggregation/sources/unionWith/unionWith_explain.js105
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp28
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp21
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h10
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp22
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp15
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp7
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp113
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h23
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp74
15 files changed, 88 insertions, 351 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml
index 85013bdfd16..b1cab5449a9 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml
@@ -15,9 +15,6 @@ selector:
# Mongos does not support runtimeConstants.
- jstests/aggregation/accumulators/internal_js_reduce_with_scope.js
- jstests/aggregation/expressions/internal_js_emit_with_scope.js
- # $unionWith explain output does not check whether the collection is sharded in a sharded
- # cluster.
- - jstests/aggregation/sources/unionWith/unionWith_explain.js
exclude_with_any_tags:
- assumes_against_mongod_not_mongos
- requires_profiling
diff --git a/jstests/aggregation/sources/unionWith/unionWith_explain.js b/jstests/aggregation/sources/unionWith/unionWith_explain.js
deleted file mode 100644
index 111b78a3697..00000000000
--- a/jstests/aggregation/sources/unionWith/unionWith_explain.js
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Test that $unionWith's pipeline argument returns the same explain as an equivalent normal
- * pipeline.
- * @tags: [do_not_wrap_aggregations_in_facets]
- */
-
-(function() {
-"use strict";
-load("jstests/aggregation/extras/utils.js"); // arrayEq, documentEq
-load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-
-const testDB = db.getSiblingDB(jsTestName());
-const collA = testDB.A;
-collA.drop();
-const collB = testDB.B;
-collB.drop();
-const collC = testDB.C;
-collC.drop();
-for (let i = 0; i < 5; i++) {
- assert.commandWorked(collA.insert({a: i, val: i, groupKey: i}));
- assert.commandWorked(collB.insert({b: i, val: i * 2, groupKey: i}));
- assert.commandWorked(collC.insert({c: i, val: 10 - i, groupKey: i}));
-}
-function getUnionWithStage(pipeline) {
- for (let i = 0; i < pipeline.length; i++) {
- const stage = pipeline[i];
- if (stage.hasOwnProperty("$unionWith")) {
- return stage;
- }
- }
-}
-
-function buildErrorString(unionExplain, realExplain, field) {
- return "Explains did not match in field " + field + ". Union:\n" + tojson(unionExplain) +
- "\nRegular:\n" + tojson(realExplain);
-}
-
-function assertExplainEq(unionExplain, regularExplain) {
- if (FixtureHelpers.isMongos(testDB)) {
- const splitPipe = unionExplain.splitPipeline;
- // If there is only one shard, the whole pipeline will run on that shard.
- const subAggPipe =
- splitPipe === null ? unionExplain.shards["shard-rs0"].stages : splitPipe.mergerPart;
- const unionStage = getUnionWithStage(subAggPipe);
- const unionSubExplain = unionStage.$unionWith.pipeline;
- if (splitPipe === null) {
- assert.eq(unionSubExplain.splitPipeline,
- regularExplain.splitPipeline,
- buildErrorString(unionSubExplain, regularExplain, "splitPipeline"));
- } else {
- assert(documentEq(unionSubExplain.splitPipeline, regularExplain.splitPipeline),
- buildErrorString(unionSubExplain, regularExplain, "splitPipeline"));
- }
- assert.eq(unionSubExplain.mergeType,
- regularExplain.mergeType,
- buildErrorString(unionSubExplain, regularExplain, "mergeType"));
- assert(documentEq(unionSubExplain.shards, regularExplain.shards),
- buildErrorString(unionSubExplain, regularExplain, "shards"));
- } else {
- const unionStage = getUnionWithStage(unionExplain.stages);
- const unionSubExplain = unionStage.$unionWith.pipeline;
- const realExplain = regularExplain.stages;
- assert(arrayEq(unionSubExplain, realExplain),
- buildErrorString(unionSubExplain, realExplain));
- }
-}
-function testPipeline(pipeline) {
- let unionResult = collA.aggregate([{$unionWith: {coll: collB.getName(), pipeline: pipeline}}],
- {explain: true});
- let queryResult = collB.aggregate(pipeline, {explain: true});
- assertExplainEq(unionResult, queryResult);
-}
-
-testPipeline([{$addFields: {bump: true}}]);
-
-testPipeline([{$group: {_id: "$groupKey", sum: {$sum: "$val"}}}]);
-
-testPipeline([{$group: {_id: "$groupKey", sum: {$sum: "$val"}}}, {$addFields: {bump: true}}]);
-
-testPipeline([{$unionWith: {coll: collC.getName()}}]);
-
-testPipeline([{$unionWith: {coll: collC.getName(), pipeline: [{$addFields: {bump: true}}]}}]);
-
-testPipeline([
- {$project: {firstProj: false}},
- {$group: {_id: "$groupKey", sum: {$sum: "$val"}}},
- {$match: {_id: 2}}
-]);
-
-testPipeline([{$limit: 3}, {$sort: {_id: 1}}, {$addFields: {bump: true}}]);
-
-testPipeline([{
- $addFields: {
- value: {
- $function: {
- body: function(base, pow) {
- return Math.pow(base, pow);
- },
- args: [2, 3],
- lang: "js"
- }
- }
- }
-}]);
-})();
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 4b6f2c580db..43a6e22403d 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -226,28 +226,12 @@ void DocumentSourceUnionWith::doDispose() {
void DocumentSourceUnionWith::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
- if (explain) {
- auto ctx = _pipeline->getContext();
- auto containers = _pipeline->getSources();
- auto pipeCopy = Pipeline::create(containers, ctx);
- auto explainObj = pExpCtx->mongoProcessInterface->attachCursorSourceAndExplain(
- ctx, pipeCopy.release(), *explain);
- LOGV2_DEBUG(4553501, 3, "$unionWith attached cursor to pipeline for explain");
- // We expect this to be an explanation of a pipeline -- there should only be one field.
- invariant(explainObj.nFields() == 1);
- Document doc =
- DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline"
- << explainObj.firstElement()));
- array.push_back(Value(doc));
- return;
- } else {
- BSONArrayBuilder bab;
- for (auto&& stage : _pipeline->serialize())
- bab << stage;
- Document doc = DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll()
- << "pipeline" << bab.arr()));
- array.push_back(Value(doc));
- }
+ BSONArrayBuilder bab;
+ for (auto&& stage : _pipeline->serialize())
+ bab << stage;
+ Document doc = DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll()
+ << "pipeline" << bab.arr()));
+ array.push_back(Value(doc));
}
DepsTracker::State DocumentSourceUnionWith::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 095ae355f79..870717b4624 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -87,24 +87,6 @@ BSONObj pipelineFromJsonArray(const std::string& jsonArray) {
return fromjson("{pipeline: " + jsonArray + "}");
}
-class StubExplainInterface : public StubMongoProcessInterface {
- BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) override {
- BSONArrayBuilder bab;
- auto pipelineVec = ownedPipeline->writeExplainOps(verbosity);
- for (auto&& stage : pipelineVec) {
- bab << stage;
- }
- return BSON("pipeline" << bab.arr());
- }
- std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
- PipelineDeleter(expCtx->opCtx));
- return pipeline;
- }
-};
void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
std::string outputPipeJson,
std::string serializedPipeJson) {
@@ -124,7 +106,6 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
AggregationRequest request(kTestNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(opCtx.get(), request);
- ctx->mongoProcessInterface = std::make_shared<StubExplainInterface>();
TempDir tempDir("PipelineTest");
ctx->tempDir = tempDir.path();
@@ -1987,7 +1968,7 @@ TEST(PipelineOptimizationTest, MatchGetsPushedIntoBothChildrenOfUnion) {
" pipeline: ["
" {$match: {x: {$eq: 2}}},"
" {$project: {y: false}},"
- " {$sort: {sortKey: {score: 1}}}"
+ " {$sort: {score: 1}}"
" ]"
" }}"
"]",
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index f9db8dffcdc..25210681f9d 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -265,16 +265,6 @@ public:
bool allowTargetingShards = true) = 0;
/**
- * Accepts a pipeline and attaches a cursor source to it. Returns a BSONObj of the form
- * {"pipeline": <explainOutput>}. Note that <explainOutput> can be an object (shardsvr) or an
- * array (non_shardsvr).
- */
- virtual BSONObj attachCursorSourceAndExplain(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) = 0;
-
- /**
* Accepts a pipeline and returns a new one which will draw input from the underlying
* collection _locally_. Trying to run this method on mongos is a programming error. Running
* this method on a shard server will only return results which match the pipeline on that
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 0d1526bac3f..09fc0fc00d0 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -103,21 +103,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- bool allowTargetingShards) {
- // On mongos we can't have local cursors.
- return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards);
-}
-
-BSONObj MongosProcessInterface::attachCursorSourceAndExplain(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) {
- return sharded_agg_helpers::targetShardsForExplain(expCtx, ownedPipeline);
-}
-
boost::optional<Document> MongosProcessInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
@@ -356,4 +341,11 @@ MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey(
targetCollectionVersion};
}
+std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* ownedPipeline,
+ bool allowTargetingShards) {
+ return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 423b9cfb620..16bfc0dfe80 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -148,10 +148,6 @@ public:
MONGO_UNREACHABLE;
}
- BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) final;
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
// It is not meaningful to perform a "local read" on mongos.
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index 6a6ab6d1582..7a8c52966fa 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -172,19 +172,4 @@ void NonShardServerProcessInterface::dropCollection(OperationContext* opCtx,
uassertStatusOK(mongo::dropCollectionForApplyOps(
opCtx, ns, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops));
}
-
-BSONObj NonShardServerProcessInterface::attachCursorSourceAndExplain(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) {
- auto pipelineWithCursor = attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline);
- BSONArrayBuilder bab;
- auto pipelineVec = pipelineWithCursor->writeExplainOps(verbosity);
- for (auto&& stage : pipelineVec) {
- bab << stage;
- }
-
- return BSON("pipeline" << bab.arr());
-}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index 53d6aa8bda9..d2dec9f412e 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -56,10 +56,6 @@ public:
Pipeline* pipeline,
bool allowTargetingShards) override;
- BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity);
-
std::unique_ptr<ShardFilterer> getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const override {
// We'll never do shard filtering on a standalone.
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 8fab1b4d000..e8ea0b8521e 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -156,13 +156,6 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd
return {{response.getN(), response.getNModified()}};
}
-BSONObj ShardServerProcessInterface::attachCursorSourceAndExplain(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) {
- return sharded_agg_helpers::targetShardsForExplain(expCtx, ownedPipeline);
-}
-
std::unique_ptr<ShardFilterer> ShardServerProcessInterface::getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
auto collectionFilter =
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index 6aab2103f97..592d8043017 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -87,10 +87,6 @@ public:
bool multi,
boost::optional<OID> targetEpoch) final;
- BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) final;
-
std::unique_ptr<ShardFilterer> getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final;
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 9d5dac4a503..f6a19dd9851 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -149,12 +149,6 @@ public:
MONGO_UNREACHABLE;
}
- BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- ExplainOptions::Verbosity verbosity) override {
- MONGO_UNREACHABLE;
- }
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 7f66213b15d..aace58f534b 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -49,7 +49,6 @@
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/semantic_analysis.h"
#include "mongo/logv2/log.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/query/cluster_query_knobs_gen.h"
@@ -582,9 +581,6 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) {
}
}
-
-} // namespace
-
/**
* For a sharded collection, establishes remote cursors on each shard that may have results, and
* creates a DocumentSourceMergeCursors stage to merge the remote cursors. Returns a pipeline
@@ -651,6 +647,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
return mergePipeline;
}
+} // namespace
+
boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx,
const Pipeline* mergePipeline) {
if (internalQueryDisableExchange.load()) {
@@ -1003,113 +1001,6 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
mergePipeline->addInitialSource(std::move(mergeCursorsStage));
}
-Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
- const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- BSONObjBuilder* result) {
- if (dispatchResults.splitPipeline) {
- auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get();
- const char* mergeType = [&]() {
- if (mergePipeline->canRunOnMongos()) {
- if (mergeCtx->inMongos) {
- return "mongos";
- }
- return "local";
- } else if (dispatchResults.exchangeSpec) {
- return "exchange";
- } else if (mergePipeline->needsPrimaryShardMerger()) {
- return "primaryShard";
- } else {
- return "anyShard";
- }
- }();
-
- *result << "mergeType" << mergeType;
-
- MutableDocument pipelinesDoc;
- // We specify "queryPlanner" verbosity when building the output for "shardsPart" because
- // execution stats are reported by each shard individually.
- pipelinesDoc.addField("shardsPart",
- Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps(
- ExplainOptions::Verbosity::kQueryPlanner)));
- if (dispatchResults.exchangeSpec) {
- BSONObjBuilder bob;
- dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob);
- bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards);
- pipelinesDoc.addField("exchange", Value(bob.obj()));
- }
- // We specify "queryPlanner" verbosity because execution stats are not currently
- // supported when building the output for "mergerPart".
- pipelinesDoc.addField(
- "mergerPart",
- Value(mergePipeline->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)));
-
- *result << "splitPipeline" << pipelinesDoc.freeze();
- } else {
- *result << "splitPipeline" << BSONNULL;
- }
-
- BSONObjBuilder shardExplains(result->subobjStart("shards"));
- for (const auto& shardResult : dispatchResults.remoteExplainOutput) {
- invariant(shardResult.shardHostAndPort);
-
- uassertStatusOK(shardResult.swResponse.getStatus());
- uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data));
-
- auto shardId = shardResult.shardId.toString();
- const auto& data = shardResult.swResponse.getValue().data;
- BSONObjBuilder explain(shardExplains.subobjStart(shardId));
- explain << "host" << shardResult.shardHostAndPort->toString();
- if (auto stagesElement = data["stages"]) {
- explain << "stages" << stagesElement;
- } else {
- auto queryPlannerElement = data["queryPlanner"];
- uassert(51157,
- str::stream() << "Malformed explain response received from shard " << shardId
- << ": " << data.toString(),
- queryPlannerElement);
- explain << "queryPlanner" << queryPlannerElement;
- if (auto executionStatsElement = data["executionStats"]) {
- explain << "executionStats" << executionStatsElement;
- }
- }
- }
- return Status::OK();
-}
-
-BSONObj targetShardsForExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline) {
-
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
- PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
- invariant(expCtx->explain);
- // Generate the command object for the targeted shards.
- auto rawStages = [&pipeline]() {
- auto serialization = pipeline->serialize();
- std::vector<BSONObj> stages;
- stages.reserve(serialization.size());
-
- for (const auto& stageObj : serialization) {
- invariant(stageObj.getType() == BSONType::Object);
- stages.push_back(stageObj.getDocument().toBson());
- }
-
- return stages;
- }();
-
- AggregationRequest aggRequest(expCtx->ns, rawStages);
- LiteParsedPipeline liteParsedPipeline(aggRequest);
- auto hasChangeStream = liteParsedPipeline.hasChangeStream();
- auto shardDispatchResults = dispatchShardPipeline(
- aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline));
- BSONObjBuilder explainBuilder;
- auto appendStatus =
- appendExplainResults(std::move(shardDispatchResults), expCtx, &explainBuilder);
- uassertStatusOK(appendStatus);
- return BSON("pipeline" << explainBuilder.done());
-}
-
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
const NamespaceString& execNss) {
// First, verify that there are shards present in the cluster. If not, then we return the
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 6001a0050ba..c8bea34d92c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -152,29 +152,6 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
bool hasChangeStream);
/**
- * For a sharded collection, establishes remote cursors on each shard that may have results, and
- * creates a DocumentSourceMergeCursors stage to merge the remove cursors. Returns a pipeline
- * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might
- * be this node itself.
- */
-std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline);
-
-/**
- * Targets the shards with an aggregation command built from `ownedPipeline` and explain set to
- * true. Returns a BSONObj of the form {"pipeline": {<pipelineExplainOutput>}}.
- */
-BSONObj targetShardsForExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline);
-
-/**
- * Appends the explain output of `dispatchResults` to `result`.
- */
-Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
- const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- BSONObjBuilder* result);
-
-/**
* Returns the proper routing table to use for targeting shards: either a historical routing table
* based on the global read timestamp if there is an active transaction with snapshot level read
* concern or the latest routing table otherwise.
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 0988e6fff7f..f47f3744196 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -461,6 +461,77 @@ ClusterClientCursorGuard convertPipelineToRouterStages(
std::move(cursorParams));
}
+Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
+ const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
+ BSONObjBuilder* result) {
+ if (dispatchResults.splitPipeline) {
+ auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get();
+ const char* mergeType = [&]() {
+ if (mergePipeline->canRunOnMongos()) {
+ return "mongos";
+ } else if (dispatchResults.exchangeSpec) {
+ return "exchange";
+ } else if (mergePipeline->needsPrimaryShardMerger()) {
+ return "primaryShard";
+ } else {
+ return "anyShard";
+ }
+ }();
+
+ *result << "mergeType" << mergeType;
+
+ MutableDocument pipelinesDoc;
+ // We specify "queryPlanner" verbosity when building the output for "shardsPart" because
+ // execution stats are reported by each shard individually.
+ pipelinesDoc.addField("shardsPart",
+ Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps(
+ ExplainOptions::Verbosity::kQueryPlanner)));
+ if (dispatchResults.exchangeSpec) {
+ BSONObjBuilder bob;
+ dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob);
+ bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards);
+ pipelinesDoc.addField("exchange", Value(bob.obj()));
+ }
+ // We specify "queryPlanner" verbosity because execution stats are not currently
+ // supported when building the output for "mergerPart".
+ pipelinesDoc.addField(
+ "mergerPart",
+ Value(mergePipeline->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)));
+
+ *result << "splitPipeline" << pipelinesDoc.freeze();
+ } else {
+ *result << "splitPipeline" << BSONNULL;
+ }
+
+ BSONObjBuilder shardExplains(result->subobjStart("shards"));
+ for (const auto& shardResult : dispatchResults.remoteExplainOutput) {
+ invariant(shardResult.shardHostAndPort);
+
+ uassertStatusOK(shardResult.swResponse.getStatus());
+ uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data));
+
+ auto shardId = shardResult.shardId.toString();
+ const auto& data = shardResult.swResponse.getValue().data;
+ BSONObjBuilder explain(shardExplains.subobjStart(shardId));
+ explain << "host" << shardResult.shardHostAndPort->toString();
+ if (auto stagesElement = data["stages"]) {
+ explain << "stages" << stagesElement;
+ } else {
+ auto queryPlannerElement = data["queryPlanner"];
+ uassert(51157,
+ str::stream() << "Malformed explain response received from shard " << shardId
+ << ": " << data.toString(),
+ queryPlannerElement);
+ explain << "queryPlanner" << queryPlannerElement;
+ if (auto executionStatsElement = data["executionStats"]) {
+ explain << "executionStats" << executionStatsElement;
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
/**
* Returns the output of the listCollections command filtered to the namespace 'nss'.
*/
@@ -697,8 +768,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
// If the operation is an explain, then we verify that it succeeded on all targeted
// shards, write the results to the output builder, and return immediately.
if (expCtx->explain) {
- return sharded_agg_helpers::appendExplainResults(
- std::move(shardDispatchResults), expCtx, result);
+ return appendExplainResults(std::move(shardDispatchResults), expCtx, result);
}
// If this isn't an explain, then we must have established cursors on at least one