diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2020-02-28 17:23:24 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-28 23:08:42 +0000 |
commit | f9c85d489e1d0dd7e9458d67af3a7675c0f2200c (patch) | |
tree | e6704a2b057ce51eb18f0cdc58c3962b8646c487 | |
parent | 3c5a2527afba733f4b358d28d0128033ba56d7cd (diff) | |
download | mongo-f9c85d489e1d0dd7e9458d67af3a7675c0f2200c.tar.gz |
Revert "SERVER-45535 Support explain in $unionWith"
This reverts commit d1429a5a66efa56fba4ab01dc4fa925b29371fdc.
15 files changed, 88 insertions, 351 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml index 85013bdfd16..b1cab5449a9 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_mongos_passthrough.yml @@ -15,9 +15,6 @@ selector: # Mongos does not support runtimeConstants. - jstests/aggregation/accumulators/internal_js_reduce_with_scope.js - jstests/aggregation/expressions/internal_js_emit_with_scope.js - # $unionWith explain output does not check whether the collection is sharded in a sharded - # cluster. - - jstests/aggregation/sources/unionWith/unionWith_explain.js exclude_with_any_tags: - assumes_against_mongod_not_mongos - requires_profiling diff --git a/jstests/aggregation/sources/unionWith/unionWith_explain.js b/jstests/aggregation/sources/unionWith/unionWith_explain.js deleted file mode 100644 index 111b78a3697..00000000000 --- a/jstests/aggregation/sources/unionWith/unionWith_explain.js +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Test that $unionWith's pipeline argument returns the same explain as an equivalent normal - * pipeline. - * @tags: [do_not_wrap_aggregations_in_facets] - */ - -(function() { -"use strict"; -load("jstests/aggregation/extras/utils.js"); // arrayEq, documentEq -load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - -const testDB = db.getSiblingDB(jsTestName()); -const collA = testDB.A; -collA.drop(); -const collB = testDB.B; -collB.drop(); -const collC = testDB.C; -collC.drop(); -for (let i = 0; i < 5; i++) { - assert.commandWorked(collA.insert({a: i, val: i, groupKey: i})); - assert.commandWorked(collB.insert({b: i, val: i * 2, groupKey: i})); - assert.commandWorked(collC.insert({c: i, val: 10 - i, groupKey: i})); -} -function getUnionWithStage(pipeline) { - for (let i = 0; i < pipeline.length; i++) { - const stage = pipeline[i]; - if (stage.hasOwnProperty("$unionWith")) { - return stage; - } - } -} - -function buildErrorString(unionExplain, realExplain, field) { - return "Explains did not match in field " + field + ". Union:\n" + tojson(unionExplain) + - "\nRegular:\n" + tojson(realExplain); -} - -function assertExplainEq(unionExplain, regularExplain) { - if (FixtureHelpers.isMongos(testDB)) { - const splitPipe = unionExplain.splitPipeline; - // If there is only one shard, the whole pipeline will run on that shard. - const subAggPipe = - splitPipe === null ? unionExplain.shards["shard-rs0"].stages : splitPipe.mergerPart; - const unionStage = getUnionWithStage(subAggPipe); - const unionSubExplain = unionStage.$unionWith.pipeline; - if (splitPipe === null) { - assert.eq(unionSubExplain.splitPipeline, - regularExplain.splitPipeline, - buildErrorString(unionSubExplain, regularExplain, "splitPipeline")); - } else { - assert(documentEq(unionSubExplain.splitPipeline, regularExplain.splitPipeline), - buildErrorString(unionSubExplain, regularExplain, "splitPipeline")); - } - assert.eq(unionSubExplain.mergeType, - regularExplain.mergeType, - buildErrorString(unionSubExplain, regularExplain, "mergeType")); - assert(documentEq(unionSubExplain.shards, regularExplain.shards), - buildErrorString(unionSubExplain, regularExplain, "shards")); - } else { - const unionStage = getUnionWithStage(unionExplain.stages); - const unionSubExplain = unionStage.$unionWith.pipeline; - const realExplain = regularExplain.stages; - assert(arrayEq(unionSubExplain, realExplain), - buildErrorString(unionSubExplain, realExplain)); - } -} -function testPipeline(pipeline) { - let unionResult = collA.aggregate([{$unionWith: {coll: collB.getName(), pipeline: pipeline}}], - {explain: true}); - let queryResult = collB.aggregate(pipeline, {explain: true}); - assertExplainEq(unionResult, queryResult); -} - -testPipeline([{$addFields: {bump: true}}]); - -testPipeline([{$group: {_id: "$groupKey", sum: {$sum: "$val"}}}]); - -testPipeline([{$group: {_id: "$groupKey", sum: {$sum: "$val"}}}, {$addFields: {bump: true}}]); - -testPipeline([{$unionWith: {coll: collC.getName()}}]); - -testPipeline([{$unionWith: {coll: collC.getName(), pipeline: [{$addFields: {bump: true}}]}}]); - -testPipeline([ - {$project: {firstProj: false}}, - {$group: {_id: "$groupKey", sum: {$sum: "$val"}}}, - {$match: {_id: 2}} -]); - -testPipeline([{$limit: 3}, {$sort: {_id: 1}}, {$addFields: {bump: true}}]); - -testPipeline([{ - $addFields: { - value: { - $function: { - body: function(base, pow) { - return Math.pow(base, pow); - }, - args: [2, 3], - lang: "js" - } - } - } -}]); -})(); diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 4b6f2c580db..43a6e22403d 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -226,28 +226,12 @@ void DocumentSourceUnionWith::doDispose() { void DocumentSourceUnionWith::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { - if (explain) { - auto ctx = _pipeline->getContext(); - auto containers = _pipeline->getSources(); - auto pipeCopy = Pipeline::create(containers, ctx); - auto explainObj = pExpCtx->mongoProcessInterface->attachCursorSourceAndExplain( - ctx, pipeCopy.release(), *explain); - LOGV2_DEBUG(4553501, 3, "$unionWith attached cursor to pipeline for explain"); - // We expect this to be an explanation of a pipeline -- there should only be one field. - invariant(explainObj.nFields() == 1); - Document doc = - DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline" - << explainObj.firstElement())); - array.push_back(Value(doc)); - return; - } else { - BSONArrayBuilder bab; - for (auto&& stage : _pipeline->serialize()) - bab << stage; - Document doc = DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() - << "pipeline" << bab.arr())); - array.push_back(Value(doc)); - } + BSONArrayBuilder bab; + for (auto&& stage : _pipeline->serialize()) + bab << stage; + Document doc = DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() + << "pipeline" << bab.arr())); + array.push_back(Value(doc)); } DepsTracker::State DocumentSourceUnionWith::getDependencies(DepsTracker* deps) const { diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 095ae355f79..870717b4624 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -87,24 +87,6 @@ BSONObj pipelineFromJsonArray(const std::string& jsonArray) { return fromjson("{pipeline: " + jsonArray + "}"); } -class StubExplainInterface : public StubMongoProcessInterface { - BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) override { - BSONArrayBuilder bab; - auto pipelineVec = ownedPipeline->writeExplainOps(verbosity); - for (auto&& stage : pipelineVec) { - bab << stage; - } - return BSON("pipeline" << bab.arr()); - } - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { - std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, - PipelineDeleter(expCtx->opCtx)); - return pipeline; - } -}; void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson, std::string outputPipeJson, std::string serializedPipeJson) { @@ -124,7 +106,6 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson, AggregationRequest request(kTestNss, rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(opCtx.get(), request); - ctx->mongoProcessInterface = std::make_shared<StubExplainInterface>(); TempDir tempDir("PipelineTest"); ctx->tempDir = tempDir.path(); @@ -1987,7 +1968,7 @@ TEST(PipelineOptimizationTest, MatchGetsPushedIntoBothChildrenOfUnion) { " pipeline: [" " {$match: {x: {$eq: 2}}}," " {$project: {y: false}}," - " {$sort: {sortKey: {score: 1}}}" + " {$sort: {score: 1}}" " ]" " }}" "]", diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index f9db8dffcdc..25210681f9d 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -265,16 +265,6 @@ public: bool allowTargetingShards = true) = 0; /** - * Accepts a pipeline and attaches a cursor source to it. Returns a BSONObj of the form - * {"pipeline": <explainOutput>}. Note that <explainOutput> can be an object (shardsvr) or an - * array (non_shardsvr). - */ - virtual BSONObj attachCursorSourceAndExplain( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) = 0; - - /** * Accepts a pipeline and returns a new one which will draw input from the underlying * collection _locally_. Trying to run this method on mongos is a programming error. Running * this method on a shard server will only return results which match the pipeline on that diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 0d1526bac3f..09fc0fc00d0 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -103,21 +103,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - bool allowTargetingShards) { - // On mongos we can't have local cursors. - return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards); -} - -BSONObj MongosProcessInterface::attachCursorSourceAndExplain( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) { - return sharded_agg_helpers::targetShardsForExplain(expCtx, ownedPipeline); -} - boost::optional<Document> MongosProcessInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -356,4 +341,11 @@ MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( targetCollectionVersion}; } +std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 423b9cfb620..16bfc0dfe80 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -148,10 +148,6 @@ public: MONGO_UNREACHABLE; } - BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) final; - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { // It is not meaningful to perform a "local read" on mongos. diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index 6a6ab6d1582..7a8c52966fa 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -172,19 +172,4 @@ void NonShardServerProcessInterface::dropCollection(OperationContext* opCtx, uassertStatusOK(mongo::dropCollectionForApplyOps( opCtx, ns, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops)); } - -BSONObj NonShardServerProcessInterface::attachCursorSourceAndExplain( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) { - auto pipelineWithCursor = attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); - BSONArrayBuilder bab; - auto pipelineVec = pipelineWithCursor->writeExplainOps(verbosity); - for (auto&& stage : pipelineVec) { - bab << stage; - } - - return BSON("pipeline" << bab.arr()); -} - } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index 53d6aa8bda9..d2dec9f412e 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -56,10 +56,6 @@ public: Pipeline* pipeline, bool allowTargetingShards) override; - BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity); - std::unique_ptr<ShardFilterer> getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { // We'll never do shard filtering on a standalone. diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 8fab1b4d000..e8ea0b8521e 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -156,13 +156,6 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd return {{response.getN(), response.getNModified()}}; } -BSONObj ShardServerProcessInterface::attachCursorSourceAndExplain( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) { - return sharded_agg_helpers::targetShardsForExplain(expCtx, ownedPipeline); -} - std::unique_ptr<ShardFilterer> ShardServerProcessInterface::getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const { auto collectionFilter = diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index 6aab2103f97..592d8043017 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -87,10 +87,6 @@ public: bool multi, boost::optional<OID> targetEpoch) final; - BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) final; - std::unique_ptr<ShardFilterer> getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final; diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 9d5dac4a503..f6a19dd9851 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -149,12 +149,6 @@ public: MONGO_UNREACHABLE; } - BSONObj attachCursorSourceAndExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - ExplainOptions::Verbosity verbosity) override { - MONGO_UNREACHABLE; - } - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override { MONGO_UNREACHABLE; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 7f66213b15d..aace58f534b 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -49,7 +49,6 @@ #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/semantic_analysis.h" #include "mongo/logv2/log.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/cluster_query_knobs_gen.h" @@ -582,9 +581,6 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) { } } - -} // namespace - /** * For a sharded collection, establishes remote cursors on each shard that may have results, and * creates a DocumentSourceMergeCursors stage to merge the remote cursors. Returns a pipeline @@ -651,6 +647,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( return mergePipeline; } +} // namespace + boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx, const Pipeline* mergePipeline) { if (internalQueryDisableExchange.load()) { @@ -1003,113 +1001,6 @@ void addMergeCursorsSource(Pipeline* mergePipeline, mergePipeline->addInitialSource(std::move(mergeCursorsStage)); } -Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, - const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - BSONObjBuilder* result) { - if (dispatchResults.splitPipeline) { - auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get(); - const char* mergeType = [&]() { - if (mergePipeline->canRunOnMongos()) { - if (mergeCtx->inMongos) { - return "mongos"; - } - return "local"; - } else if (dispatchResults.exchangeSpec) { - return "exchange"; - } else if (mergePipeline->needsPrimaryShardMerger()) { - return "primaryShard"; - } else { - return "anyShard"; - } - }(); - - *result << "mergeType" << mergeType; - - MutableDocument pipelinesDoc; - // We specify "queryPlanner" verbosity when building the output for "shardsPart" because - // execution stats are reported by each shard individually. - pipelinesDoc.addField("shardsPart", - Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps( - ExplainOptions::Verbosity::kQueryPlanner))); - if (dispatchResults.exchangeSpec) { - BSONObjBuilder bob; - dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob); - bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards); - pipelinesDoc.addField("exchange", Value(bob.obj())); - } - // We specify "queryPlanner" verbosity because execution stats are not currently - // supported when building the output for "mergerPart". - pipelinesDoc.addField( - "mergerPart", - Value(mergePipeline->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner))); - - *result << "splitPipeline" << pipelinesDoc.freeze(); - } else { - *result << "splitPipeline" << BSONNULL; - } - - BSONObjBuilder shardExplains(result->subobjStart("shards")); - for (const auto& shardResult : dispatchResults.remoteExplainOutput) { - invariant(shardResult.shardHostAndPort); - - uassertStatusOK(shardResult.swResponse.getStatus()); - uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data)); - - auto shardId = shardResult.shardId.toString(); - const auto& data = shardResult.swResponse.getValue().data; - BSONObjBuilder explain(shardExplains.subobjStart(shardId)); - explain << "host" << shardResult.shardHostAndPort->toString(); - if (auto stagesElement = data["stages"]) { - explain << "stages" << stagesElement; - } else { - auto queryPlannerElement = data["queryPlanner"]; - uassert(51157, - str::stream() << "Malformed explain response received from shard " << shardId - << ": " << data.toString(), - queryPlannerElement); - explain << "queryPlanner" << queryPlannerElement; - if (auto executionStatsElement = data["executionStats"]) { - explain << "executionStats" << executionStatsElement; - } - } - } - return Status::OK(); -} - -BSONObj targetShardsForExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline) { - - std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, - PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); - invariant(expCtx->explain); - // Generate the command object for the targeted shards. - auto rawStages = [&pipeline]() { - auto serialization = pipeline->serialize(); - std::vector<BSONObj> stages; - stages.reserve(serialization.size()); - - for (const auto& stageObj : serialization) { - invariant(stageObj.getType() == BSONType::Object); - stages.push_back(stageObj.getDocument().toBson()); - } - - return stages; - }(); - - AggregationRequest aggRequest(expCtx->ns, rawStages); - LiteParsedPipeline liteParsedPipeline(aggRequest); - auto hasChangeStream = liteParsedPipeline.hasChangeStream(); - auto shardDispatchResults = dispatchShardPipeline( - aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline)); - BSONObjBuilder explainBuilder; - auto appendStatus = - appendExplainResults(std::move(shardDispatchResults), expCtx, &explainBuilder); - uassertStatusOK(appendStatus); - return BSON("pipeline" << explainBuilder.done()); -} - StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, const NamespaceString& execNss) { // First, verify that there are shards present in the cluster. If not, then we return the diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 6001a0050ba..c8bea34d92c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -152,29 +152,6 @@ void addMergeCursorsSource(Pipeline* mergePipeline, bool hasChangeStream); /** - * For a sharded collection, establishes remote cursors on each shard that may have results, and - * creates a DocumentSourceMergeCursors stage to merge the remove cursors. Returns a pipeline - * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might - * be this node itself. - */ -std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline); - -/** - * Targets the shards with an aggregation command built from `ownedPipeline` and explain set to - * true. Returns a BSONObj of the form {"pipeline": {<pipelineExplainOutput>}}. - */ -BSONObj targetShardsForExplain(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline); - -/** - * Appends the explain output of `dispatchResults` to `result`. - */ -Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, - const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - BSONObjBuilder* result); - -/** * Returns the proper routing table to use for targeting shards: either a historical routing table * based on the global read timestamp if there is an active transaction with snapshot level read * concern or the latest routing table otherwise. diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 0988e6fff7f..f47f3744196 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -461,6 +461,77 @@ ClusterClientCursorGuard convertPipelineToRouterStages( std::move(cursorParams)); } +Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, + const boost::intrusive_ptr<ExpressionContext>& mergeCtx, + BSONObjBuilder* result) { + if (dispatchResults.splitPipeline) { + auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get(); + const char* mergeType = [&]() { + if (mergePipeline->canRunOnMongos()) { + return "mongos"; + } else if (dispatchResults.exchangeSpec) { + return "exchange"; + } else if (mergePipeline->needsPrimaryShardMerger()) { + return "primaryShard"; + } else { + return "anyShard"; + } + }(); + + *result << "mergeType" << mergeType; + + MutableDocument pipelinesDoc; + // We specify "queryPlanner" verbosity when building the output for "shardsPart" because + // execution stats are reported by each shard individually. + pipelinesDoc.addField("shardsPart", + Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps( + ExplainOptions::Verbosity::kQueryPlanner))); + if (dispatchResults.exchangeSpec) { + BSONObjBuilder bob; + dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob); + bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards); + pipelinesDoc.addField("exchange", Value(bob.obj())); + } + // We specify "queryPlanner" verbosity because execution stats are not currently + // supported when building the output for "mergerPart". + pipelinesDoc.addField( + "mergerPart", + Value(mergePipeline->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner))); + + *result << "splitPipeline" << pipelinesDoc.freeze(); + } else { + *result << "splitPipeline" << BSONNULL; + } + + BSONObjBuilder shardExplains(result->subobjStart("shards")); + for (const auto& shardResult : dispatchResults.remoteExplainOutput) { + invariant(shardResult.shardHostAndPort); + + uassertStatusOK(shardResult.swResponse.getStatus()); + uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data)); + + auto shardId = shardResult.shardId.toString(); + const auto& data = shardResult.swResponse.getValue().data; + BSONObjBuilder explain(shardExplains.subobjStart(shardId)); + explain << "host" << shardResult.shardHostAndPort->toString(); + if (auto stagesElement = data["stages"]) { + explain << "stages" << stagesElement; + } else { + auto queryPlannerElement = data["queryPlanner"]; + uassert(51157, + str::stream() << "Malformed explain response received from shard " << shardId + << ": " << data.toString(), + queryPlannerElement); + explain << "queryPlanner" << queryPlannerElement; + if (auto executionStatsElement = data["executionStats"]) { + explain << "executionStats" << executionStatsElement; + } + } + } + + return Status::OK(); +} + /** * Returns the output of the listCollections command filtered to the namespace 'nss'. */ @@ -697,8 +768,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, // If the operation is an explain, then we verify that it succeeded on all targeted // shards, write the results to the output builder, and return immediately. if (expCtx->explain) { - return sharded_agg_helpers::appendExplainResults( - std::move(shardDispatchResults), expCtx, result); + return appendExplainResults(std::move(shardDispatchResults), expCtx, result); } // If this isn't an explain, then we must have established cursors on at least one |