diff options
10 files changed, 107 insertions, 44 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml index 41902b5f51a..9597b8980d1 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml @@ -11,8 +11,6 @@ selector: # TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections. - jstests/aggregation/sources/lookup/lookup_subpipeline.js - jstests/aggregation/sources/graphLookup/variables.js - # TODO SERVER-50246 Investigate and remove. - - jstests/aggregation/sources/unionWith/unionWith_explain.js exclude_with_any_tags: # Tests tagged with the following will fail because they assume collections are not sharded. - assumes_against_mongod_not_mongos diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml index 7dae8488b00..4f685288fb3 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_causally_consistent_passthrough.yml @@ -24,9 +24,6 @@ selector: # TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections. - jstests/aggregation/sources/lookup/lookup_subpipeline.js - jstests/aggregation/sources/graphLookup/variables.js - # TODO SERVER-50246 Investigate and remove. - - jstests/aggregation/sources/unionWith/unionWith_explain.js - exclude_with_any_tags: # The next tag corresponds to the special error thrown by the set_read_preference_secondary.js # override when it refuses to replace the readPreference of a particular command. Above each tag diff --git a/jstests/aggregation/sources/unionWith/unionWith_explain.js b/jstests/aggregation/sources/unionWith/unionWith_explain.js index 4bc762c7c21..841ba8a31bb 100644 --- a/jstests/aggregation/sources/unionWith/unionWith_explain.js +++ b/jstests/aggregation/sources/unionWith/unionWith_explain.js @@ -11,6 +11,7 @@ "use strict"; load("jstests/aggregation/extras/utils.js"); // arrayEq, documentEq load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. +load("jstests/libs/analyze_plan.js"); // For getAggPlanStage. const testDB = db.getSiblingDB(jsTestName()); const collA = testDB.A; @@ -19,17 +20,25 @@ const collB = testDB.B; collB.drop(); const collC = testDB.C; collC.drop(); -for (let i = 0; i < 5; i++) { +const docsPerColl = 5; +for (let i = 0; i < docsPerColl; i++) { assert.commandWorked(collA.insert({a: i, val: i, groupKey: i})); assert.commandWorked(collB.insert({b: i, val: i * 2, groupKey: i})); assert.commandWorked(collC.insert({c: i, val: 10 - i, groupKey: i})); } -function getUnionWithStage(pipeline) { - for (let i = 0; i < pipeline.length; i++) { - const stage = pipeline[i]; - if (stage.hasOwnProperty("$unionWith")) { - return stage; +function getUnionWithStage(explain) { + if (explain.splitPipeline != null) { + // If there is only one shard, the whole pipeline will run on that shard. + const subAggPipe = explain.splitPipeline === null ? explain.shards["shard-rs0"].stages + : explain.splitPipeline.mergerPart; + for (let i = 0; i < subAggPipe.length; i++) { + const stage = subAggPipe[i]; + if (stage.hasOwnProperty("$unionWith")) { + return stage; + } } + } else { + return getAggPlanStage(explain, "$unionWith"); } } @@ -50,13 +59,11 @@ function docEqWithIgnoredFields(union, regular) { } function assertExplainEq(unionExplain, regularExplain) { + const unionStage = getUnionWithStage(unionExplain); + assert(unionStage); + const unionSubExplain = unionStage.$unionWith.pipeline; if (FixtureHelpers.isMongos(testDB)) { const splitPipe = unionExplain.splitPipeline; - // If there is only one shard, the whole pipeline will run on that shard. - const subAggPipe = - splitPipe === null ? unionExplain.shards["shard-rs0"].stages : splitPipe.mergerPart; - const unionStage = getUnionWithStage(subAggPipe); - const unionSubExplain = unionStage.$unionWith.pipeline; if (splitPipe === null) { assert.eq(unionSubExplain.splitPipeline, regularExplain.splitPipeline, @@ -72,8 +79,6 @@ function assertExplainEq(unionExplain, regularExplain) { assert(docEqWithIgnoredFields(unionSubExplain.shards, regularExplain.shards), buildErrorString(unionSubExplain, regularExplain, "shards")); } else { - const unionStage = getUnionWithStage(unionExplain.stages); - const unionSubExplain = unionStage.$unionWith.pipeline; if ("executionStats" in unionSubExplain[0].$cursor) { const unionSubStats = unionStage.$unionWith.pipeline[0].$cursor.executionStats.executionStages; @@ -143,7 +148,7 @@ assert.commandWorked(testDB.runCommand({ })); // Ensure that $unionWith can still execute explain if followed by a stage that calls dispose(). -var result = assert.commandWorked(testDB.runCommand({ +let result = assert.commandWorked(testDB.runCommand({ explain: { aggregate: collA.getName(), pipeline: [{$unionWith: collB.getName()}, {$limit: 1}], @@ -153,29 +158,61 @@ var result = assert.commandWorked(testDB.runCommand({ // Test that execution stats inner cursor is populated. result = collA.explain("executionStats").aggregate([{"$unionWith": collB.getName()}]); -var expectedResult = collB.explain("executionStats").aggregate([]); -assert(result.ok, result); -assert(expectedResult.ok, result); -// If we attached a fresh cursor stage, the number returned would still be zero. +assert.commandWorked(result); +let expectedResult = collB.explain("executionStats").aggregate([]); +assert.commandWorked(expectedResult); +let unionStage = getUnionWithStage(result); +assert(unionStage, result); if (FixtureHelpers.isMongos(testDB)) { - if (result.splitPipeline != null) { - const pipeline = result.splitPipeline.mergerPart; - const unionStage = getUnionWithStage(pipeline); - assert(docEqWithIgnoredFields(expectedResult.shards, unionStage.$unionWith.pipeline.shards), - buildErrorString(unionStage, expectedResult)); - } + assert(docEqWithIgnoredFields(expectedResult.shards, unionStage.$unionWith.pipeline.shards), + buildErrorString(unionStage, expectedResult)); + // TODO SERVER-50597 Fix unionWith nReturned stat in sharded cluster + // assert.eq(unionStage.nReturned, docsPerColl, unionStage); } else { - assert(result.stages[1].$unionWith.pipeline[0].$cursor.executionStats.nreturned != 0, result); + assert.eq(unionStage.nReturned, docsPerColl * 2, unionStage); + assert.eq(unionStage.$unionWith.pipeline[0].$cursor.executionStats.nReturned, + docsPerColl, + unionStage); +} + +// Test explain with executionStats when the $unionWith stage doesn't need to read from it's +// sub-pipeline. +result = collA.explain("executionStats").aggregate([{"$unionWith": collB.getName()}, {$limit: 1}]); +assert.commandWorked(result); +unionStage = getUnionWithStage(result); +assert(unionStage, result); +if (!FixtureHelpers.isSharded(collB)) { + assert.eq(unionStage.nReturned, 1, unionStage); + assert.eq(unionStage.$unionWith, {coll: "B", pipeline: []}, unionStage); +} + +// Test explain with executionStats when the $unionWith stage partially reads from it's +// sub-pipeline. +result = collA.explain("executionStats") + .aggregate([{"$unionWith": collB.getName()}, {$limit: docsPerColl + 1}]); +assert.commandWorked(result); +unionStage = getUnionWithStage(result); +assert(unionStage, result); +if (!FixtureHelpers.isSharded(collB)) { + assert.eq(unionStage.nReturned, docsPerColl + 1, unionStage); + // TODO SERVER-50597 Fix the executionStats of $unionWith sub-pipeline, the actual result should + // be 1 instead of docsPerColl. + assert.eq(unionStage.$unionWith.pipeline[0].$cursor.executionStats.nReturned, + docsPerColl, + unionStage); } // Test an index scan. const indexedColl = testDB.indexed; assert.commandWorked(indexedColl.createIndex({val: 1})); indexedColl.insert([{val: 0}, {val: 1}, {val: 2}, {val: 3}]); + result = collA.explain("executionStats").aggregate([ {$unionWith: {coll: indexedColl.getName(), pipeline: [{$match: {val: {$gt: 2}}}]}} ]); expectedResult = indexedColl.explain("executionStats").aggregate([{$match: {val: {$gt: 2}}}]); - assertExplainEq(result, expectedResult); + +// Test a nested $unionWith which itself should perform an index scan. +testPipeline([{$unionWith: {coll: indexedColl.getName(), pipeline: [{$match: {val: {$gt: 0}}}]}}]); })(); diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 63d7e124a64..294b16accc3 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -66,7 +66,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns); if (resolvedNs.pipeline.empty()) { - return Pipeline::parse(std::move(currentPipeline), unionExpCtx, validatorCallback); + return Pipeline::parse(currentPipeline, unionExpCtx, validatorCallback); } auto resolvedPipeline = std::move(resolvedNs.pipeline); resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size()); @@ -251,18 +251,40 @@ void DocumentSourceUnionWith::doDispose() { Value DocumentSourceUnionWith::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { + // There are several different possible states depending on the explain verbosity as well as + // the other stages in the pipeline: + // * If verbosity is queryPlanner, then the sub-pipeline should be untouched and we can + // explain it directly. + // * If verbosity is execStats or allPlansExecution, then whether or not to explain the + // sub-pipeline depends on if we've started reading from it. For instance, there could be a + // $limit stage after the $unionWith which results in only reading from the base collection + // branch and not the sub-pipeline. + Pipeline* pipeCopy = nullptr; + if (*explain == ExplainOptions::Verbosity::kQueryPlanner) { + pipeCopy = Pipeline::create(_pipeline->getSources(), _pipeline->getContext()).release(); + } else if (*explain >= ExplainOptions::Verbosity::kExecStats && + _executionState > ExecutionProgress::kIteratingSource) { + // We've either exhausted the sub-pipeline or at least started iterating it. Use the + // cached pipeline to get the explain output since the '_pipeline' may have been + // modified for any optimizations or pushdowns into the initial $cursor stage. + pipeCopy = _cachedPipeline; + } else { + // The plan does not require reading from the sub-pipeline, so just include the + // serialization in the explain output. + BSONArrayBuilder bab; + for (auto&& stage : _pipeline->serialize()) + bab << stage; + return Value(DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() + << "pipeline" << bab.arr()))); + } - auto pipeCopy = Pipeline::create(_pipeline->getSources(), _pipeline->getContext()); - - // If we have already started getting documents from the sub-pipeline, this is an explain - // that has done some execution. We don't want to serialize the mergeCursors stage, so if - // we have a cursor stage we tell the process interface to remove it in the case it is a - // mergeCursors stage. + invariant(pipeCopy); BSONObj explainLocal = - pExpCtx->mongoProcessInterface->preparePipelineAndExplain(pipeCopy.release(), *explain); + pExpCtx->mongoProcessInterface->preparePipelineAndExplain(pipeCopy, *explain); LOGV2_DEBUG(4553501, 3, "$unionWith attached cursor to pipeline for explain"); // We expect this to be an explanation of a pipeline -- there should only be one field. invariant(explainLocal.nFields() == 1); + return Value( DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline" << explainLocal.firstElement()))); diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index 05c3d5d8505..5686f569374 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -62,7 +62,14 @@ public: DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) - : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {} + : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) { + // If this pipeline is being run as part of explain, then cache a copy to use later during + // serialization. + if (expCtx->explain >= ExplainOptions::Verbosity::kExecStats) { + _cachedPipeline = + Pipeline::create(_pipeline->getSources(), _pipeline->getContext()).release(); + } + } ~DocumentSourceUnionWith(); @@ -155,6 +162,7 @@ private: void addViewDefinition(NamespaceString nss, std::vector<BSONObj> viewPipeline); std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; + Pipeline* _cachedPipeline = nullptr; bool _usedDisk = false; ExecutionProgress _executionState = ExecutionProgress::kIteratingSource; }; diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index 3cae0550907..537d89749d1 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -187,6 +187,11 @@ BSONObj NonShardServerProcessInterface::preparePipelineAndExplain( ownedPipeline = nullptr; } else { auto pipelineWithCursor = attachCursorSourceToPipelineForLocalRead(ownedPipeline); + // If we need execution stats, this runs the plan in order to gather the stats. + if (verbosity >= ExplainOptions::Verbosity::kExecStats) { + while (pipelineWithCursor->getNext()) { + } + } pipelineVec = pipelineWithCursor->writeExplainOps(verbosity); } BSONArrayBuilder bab; diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index c6a166b942c..58ac2b0129e 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -215,7 +215,6 @@ bool runAggregationMapReduce(OperationContext* opCtx, // a pointer to the constructed ExpressionContext. uassertStatusOK(cluster_aggregation_planner::dispatchPipelineAndMerge( opCtx, - expCtx->mongoProcessInterface->taskExecutor, std::move(targeter), std::move(serialized), std::numeric_limits<long long>::max(), diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index d85252e309c..279c425ec68 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -335,7 +335,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::kAnyShard: { return cluster_aggregation_planner::dispatchPipelineAndMerge( opCtx, - expCtx->mongoProcessInterface->taskExecutor, std::move(targeter), request.serializeToCommandObj(), request.getBatchSize(), diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 25b7a4e71f2..3a703dba1f7 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -698,7 +698,6 @@ Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces, } Status dispatchPipelineAndMerge(OperationContext* opCtx, - std::shared_ptr<executor::TaskExecutor> executor, AggregationTargeter targeter, Document serializedCommand, long long batchSize, diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index 5beaed7b8f2..b1257914099 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -117,7 +117,6 @@ Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces, * necessary on either mongos or a randomly designated shard. */ Status dispatchPipelineAndMerge(OperationContext* opCtx, - std::shared_ptr<executor::TaskExecutor>, AggregationTargeter targeter, Document serializedCommand, long long batchSize, |