diff options
author | Mihai Andrei <mihai.andrei@10gen.com> | 2022-01-12 10:11:47 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-27 14:59:17 +0000 |
commit | 7e54e175e5e1301d825f93e0ae8372d30b0896a0 (patch) | |
tree | 8bc36d8ed740d3e3368ab2a3c6e3329d5c47a00d | |
parent | 441dfbc1c41912bc505e46e4258b93bed040fda7 (diff) | |
download | mongo-7e54e175e5e1301d825f93e0ae8372d30b0896a0.tar.gz |
SERVER-62351 Introduce structs for query planner to be aware of multiple collections
-rw-r--r-- | jstests/noPassthrough/query_knobs_validation.js | 6 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 56 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_document_source.h | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.h | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 79 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 223 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.h | 53 | ||||
-rw-r--r-- | src/mongo/db/query/multi_collection.h | 95 | ||||
-rw-r--r-- | src/mongo/db/query/query_knobs.idl | 7 | ||||
-rw-r--r-- | src/mongo/db/query/query_planner_params.h | 18 |
14 files changed, 489 insertions, 122 deletions
diff --git a/jstests/noPassthrough/query_knobs_validation.js b/jstests/noPassthrough/query_knobs_validation.js index a647bc24df1..119d2029c23 100644 --- a/jstests/noPassthrough/query_knobs_validation.js +++ b/jstests/noPassthrough/query_knobs_validation.js @@ -47,7 +47,8 @@ const expectedParamDefaults = { internalQueryPlannerGenerateCoveredWholeIndexScans: false, internalQueryIgnoreUnknownJSONSchemaKeywords: false, internalQueryProhibitBlockingMergeOnMongoS: false, - internalQuerySlotBasedExecutionMaxStaticIndexScanIntervals: 1000 + internalQuerySlotBasedExecutionMaxStaticIndexScanIntervals: 1000, + internalEnableMultipleAutoGetCollections: false }; function assertDefaultParameterValues() { @@ -202,5 +203,8 @@ assertSetParameterFails("internalQuerySlotBasedExecutionMaxStaticIndexScanInterv assertSetParameterSucceeds("internalQueryForceClassicEngine", true); assertSetParameterSucceeds("internalQueryForceClassicEngine", false); +assertSetParameterSucceeds("internalEnableMultipleAutoGetCollections", true); +assertSetParameterSucceeds("internalEnableMultipleAutoGetCollections", false); + MongoRunner.stopMongod(conn); })(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index e8aacc935bd..2bb3d6f9d3f 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -559,6 +559,13 @@ Status runAggregate(OperationContext* opCtx, // For operations on views, this will be the underlying namespace. NamespaceString nss = request.getNamespace(); + stdx::unordered_set<NamespaceString> secondaryExecNssList; + + // Determine if this aggregation has foreign collections that the execution subsystem needs + // to be aware of. + if (internalEnableMultipleAutoGetCollections.load()) { + liteParsedPipeline.getForeignExecutionNamespaces(secondaryExecNssList); + } // The collation to use for this aggregation. boost::optional to distinguish between the case // where the collation has not yet been resolved, and where it has been resolved to nullptr. @@ -573,6 +580,27 @@ Status runAggregate(OperationContext* opCtx, // re-running the expanded aggregation. boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> ctx; + // Vector of AutoGets for secondary collections. At the moment, this is internal to testing + // only because eventually, this will be replaced by 'AutoGetCollectionMulti'. + // TODO SERVER-62798: Replace this and the above AutoGet with 'AutoGetCollectionMulti'. + std::vector<std::unique_ptr<AutoGetCollectionForReadCommandMaybeLockFree>> secondaryCtx; + MultiCollection collections; + + auto initContext = [&](AutoGetCollectionViewMode m) -> void { + ctx.emplace(opCtx, nss, m); + for (const auto& ns : secondaryExecNssList) { + secondaryCtx.emplace_back( + std::make_unique<AutoGetCollectionForReadCommandMaybeLockFree>(opCtx, ns, m)); + } + collections = MultiCollection(ctx, secondaryCtx); + }; + + auto resetContext = [&]() -> void { + ctx.reset(); + secondaryCtx.clear(); + collections.clear(); + }; + std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; boost::intrusive_ptr<ExpressionContext> expCtx; auto curOp = CurOp::get(opCtx); @@ -629,7 +657,7 @@ Status runAggregate(OperationContext* opCtx, collatorToUseMatchesDefault = match; // Obtain collection locks on the execution namespace; that is, the oplog. - ctx.emplace(opCtx, nss, AutoGetCollectionViewMode::kViewsForbidden); + initContext(AutoGetCollectionViewMode::kViewsForbidden); } else if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) { uassert(4928901, str::stream() << AggregateCommandRequest::kCollectionUUIDFieldName @@ -646,20 +674,24 @@ Status runAggregate(OperationContext* opCtx, opCtx, request.getCollation().get_value_or(BSONObj()), nullptr); collatorToUse.emplace(std::move(collator)); collatorToUseMatchesDefault = match; + tassert(6235101, "A collection-less aggregate should not take any locks", !ctx); + tassert(6235102, + "A collection-less aggregate should not take any secondary locks", + secondaryCtx.empty()); } else { // This is a regular aggregation. Lock the collection or view. - ctx.emplace(opCtx, nss, AutoGetCollectionViewMode::kViewsPermitted); - auto [collator, match] = PipelineD::resolveCollator( - opCtx, request.getCollation().get_value_or(BSONObj()), ctx->getCollection()); + initContext(AutoGetCollectionViewMode::kViewsPermitted); + auto [collator, match] = + PipelineD::resolveCollator(opCtx, + request.getCollation().get_value_or(BSONObj()), + collections.getMainCollection()); collatorToUse.emplace(std::move(collator)); collatorToUseMatchesDefault = match; - if (ctx->getCollection()) { - uuid = ctx->getCollection()->uuid(); + if (collections.hasMainCollection()) { + uuid = collections.getMainCollection()->uuid(); } } - const auto& collection = ctx ? ctx->getCollection() : CollectionPtr::null; - // If this is a view, resolve it by finding the underlying collection and stitching view // pipelines and this request's pipeline together. We then release our locks before // recursively calling runAggregate(), which will re-acquire locks on the underlying @@ -708,7 +740,7 @@ Status runAggregate(OperationContext* opCtx, uassertStatusOK(viewCatalog->resolveView(opCtx, nss, timeSeriesCollator)); // With the view & collation resolved, we can relinquish locks. - ctx.reset(); + resetContext(); // Set this operation's shard version for the underlying collection to unsharded. // This is prerequisite for future shard versioning checks. @@ -788,7 +820,7 @@ Status runAggregate(OperationContext* opCtx, // Prepare a PlanExecutor to provide input into the pipeline, if needed. auto attachExecutorCallback = - PipelineD::buildInnerQueryExecutor(collection, nss, &request, pipeline.get()); + PipelineD::buildInnerQueryExecutor(collections, nss, &request, pipeline.get()); if (canOptimizeAwayPipeline(pipeline.get(), attachExecutorCallback.second.get(), @@ -807,7 +839,7 @@ Status runAggregate(OperationContext* opCtx, // Mark that this query uses DocumentSource. curOp->debug().documentSourceUsed = true; // Complete creation of the initial $cursor stage, if needed. - PipelineD::attachInnerQueryExecutorToPipeline(collection, + PipelineD::attachInnerQueryExecutorToPipeline(collections, attachExecutorCallback.first, std::move(attachExecutorCallback.second), pipeline.get()); @@ -831,7 +863,7 @@ Status runAggregate(OperationContext* opCtx, // though, as we will be changing its lock policy to 'kLockExternally' (see details // below), and in order to execute the initial getNext() call in 'handleCursorCommand', // we need to hold the collection lock. - ctx.reset(); + resetContext(); } { diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 0047782344e..184659a45bc 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -137,6 +137,7 @@ void DocumentSourceCursor::loadBatch() { PlanExecutor::ExecState state; Document resultObj; + // TODO SERVER-62798: Replace this with 'AutoGetCollectionMulti'. boost::optional<AutoGetCollectionForReadMaybeLockFree> autoColl; tassert(5565800, "Expected PlanExecutor to use an external lock policy", diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 74b71070fbe..9b5f87803fc 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -96,6 +96,12 @@ public: } } + void getForeignExecutionNamespaces( + stdx::unordered_set<NamespaceString>& nssSet) const final { + tassert(6235100, "Expected foreignNss to be initialized for $lookup", _foreignNss); + nssSet.emplace(*_foreignNss); + } + PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const override final; diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index 3988bd3c289..378de678f6c 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -120,11 +120,20 @@ public: const BSONObj& spec); /** - * Returns the foreign collection(s) referenced by this stage, if any. + * Returns the foreign collection(s) referenced by this stage (that is, any collection that + * the pipeline references, but doesn't get locked), if any. */ virtual stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const = 0; /** + * Returns the foreign collections(s) referenced by this stage that potentially will be + * involved in query execution (that is, a collection that the pipeline references, and gets + * locked for the purposes of query execution), if any. + */ + virtual void getForeignExecutionNamespaces(stdx::unordered_set<NamespaceString>& nssSet) const { + } + + /** * Returns a list of the privileges required for this stage. */ virtual PrivilegeVector requiredPrivileges(bool isMongos, diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index 696c31be0ba..4b37ff52eef 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -77,6 +77,23 @@ public: } /** + * Inserts the foreign collections(s) referenced by this stage that potentially will be involved + * in query execution, if any, into 'nssSet'. For example, consider the pipeline: + * + * [{$lookup: {from: "bar", localField: "a", foreignField: "b", as: "output"}}, + * {$unionWith: {coll: "foo", pipeline: [...]}}]. + * + * Here, "foo" is not considered a foreign execution namespace because "$unionWith" cannot be + * pushed down into the execution subsystem underneath the leading cursor stage, while "bar" + * is considered one because "$lookup" can be pushed down in certain cases. + */ + void getForeignExecutionNamespaces(stdx::unordered_set<NamespaceString>& nssSet) const { + for (auto&& spec : _stageSpecs) { + spec->getForeignExecutionNamespaces(nssSet); + } + } + + /** * Returns a list of the priviliges required for this pipeline. */ PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index e6119eccc83..67d52f4e103 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -146,7 +146,7 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGr StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( const intrusive_ptr<ExpressionContext>& expCtx, - const CollectionPtr& collection, + const MultiCollection& collections, const NamespaceString& nss, BSONObj queryObj, BSONObj projectionObj, @@ -158,6 +158,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe const size_t plannerOpts, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, Pipeline* pipeline) { + const auto& collection = collections.getMainCollection(); auto findCommand = std::make_unique<FindCommandRequest>(nss); query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get()); findCommand->setFilter(queryObj.getOwned()); @@ -253,7 +254,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe auto permitYield = true; return getExecutorFind(expCtx->opCtx, - &collection, + collections, std::move(cq.getValue()), [&](auto* canonicalQuery) { canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown( @@ -619,10 +620,11 @@ PipelineD::buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, } std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> -PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection, +PipelineD::buildInnerQueryExecutor(const MultiCollection& collections, const NamespaceString& nss, const AggregateCommandRequest* aggRequest, Pipeline* pipeline) { + const auto& collection = collections.getMainCollection(); auto expCtx = pipeline->getContext(); // We will be modifying the source vector as we go. @@ -652,17 +654,18 @@ PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection, const auto geoNearStage = sources.empty() ? nullptr : dynamic_cast<DocumentSourceGeoNear*>(sources.front().get()); if (geoNearStage) { - return buildInnerQueryExecutorGeoNear(collection, nss, aggRequest, pipeline); + return buildInnerQueryExecutorGeoNear(collections, nss, aggRequest, pipeline); } else { - return buildInnerQueryExecutorGeneric(collection, nss, aggRequest, pipeline); + return buildInnerQueryExecutorGeneric(collections, nss, aggRequest, pipeline); } } void PipelineD::attachInnerQueryExecutorToPipeline( - const CollectionPtr& collection, + const MultiCollection& collections, PipelineD::AttachExecutorCallback attachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, Pipeline* pipeline) { + auto& collection = collections.getMainCollection(); // If the pipeline doesn't need a $cursor stage, there will be no callback function and // PlanExecutor provided in the 'attachExecutorCallback' object, so we don't need to do // anything. @@ -672,14 +675,14 @@ void PipelineD::attachInnerQueryExecutorToPipeline( } void PipelineD::buildAndAttachInnerQueryExecutorToPipeline( - const CollectionPtr& collection, + const MultiCollection& collections, const NamespaceString& nss, const AggregateCommandRequest* aggRequest, Pipeline* pipeline) { - auto callback = PipelineD::buildInnerQueryExecutor(collection, nss, aggRequest, pipeline); + auto callback = PipelineD::buildInnerQueryExecutor(collections, nss, aggRequest, pipeline); PipelineD::attachInnerQueryExecutorToPipeline( - collection, callback.first, std::move(callback.second), pipeline); + collections, callback.first, std::move(callback.second), pipeline); } namespace { @@ -808,7 +811,7 @@ auto buildProjectionForPushdown(const DepsTracker& deps, } // namespace std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> -PipelineD::buildInnerQueryExecutorGeneric(const CollectionPtr& collection, +PipelineD::buildInnerQueryExecutorGeneric(const MultiCollection& collections, const NamespaceString& nss, const AggregateCommandRequest* aggRequest, Pipeline* pipeline) { @@ -865,7 +868,7 @@ PipelineD::buildInnerQueryExecutorGeneric(const CollectionPtr& collection, // Create the PlanExecutor. bool shouldProduceEmptyDocs = false; auto exec = uassertStatusOK(prepareExecutor(expCtx, - collection, + collections, nss, pipeline, sortStage, @@ -899,10 +902,11 @@ PipelineD::buildInnerQueryExecutorGeneric(const CollectionPtr& collection, } std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> -PipelineD::buildInnerQueryExecutorGeoNear(const CollectionPtr& collection, +PipelineD::buildInnerQueryExecutorGeoNear(const MultiCollection& collections, const NamespaceString& nss, const AggregateCommandRequest* aggRequest, Pipeline* pipeline) { + const auto& collection = collections.getMainCollection(); uassert(ErrorCodes::NamespaceNotFound, str::stream() << "$geoNear requires a geo index to run, but " << nss.ns() << " does not exist", @@ -927,7 +931,7 @@ PipelineD::buildInnerQueryExecutorGeoNear(const CollectionPtr& collection, bool shouldProduceEmptyDocs = false; auto exec = uassertStatusOK( prepareExecutor(expCtx, - collection, + collections, nss, pipeline, nullptr, /* sortStage */ @@ -961,7 +965,7 @@ PipelineD::buildInnerQueryExecutorGeoNear(const CollectionPtr& collection, StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prepareExecutor( const intrusive_ptr<ExpressionContext>& expCtx, - const CollectionPtr& collection, + const MultiCollection& collections, const NamespaceString& nss, Pipeline* pipeline, const boost::intrusive_ptr<DocumentSourceSort>& sortStage, @@ -1053,7 +1057,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // See if the query system can handle the $group and $sort stage using a DISTINCT_SCAN // (SERVER-9507). auto swExecutorGrouped = attemptToGetExecutor(expCtx, - collection, + collections, nss, queryObj, projObj, @@ -1102,7 +1106,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep isChangeStream ? expCtx->temporarilyChangeCollator(std::move(collatorForCursor)) : nullptr; return attemptToGetExecutor(expCtx, - collection, + collections, nss, queryObj, projObj, diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 0116c9d5163..340ce818ccd 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -41,6 +41,7 @@ #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/query/multi_collection.h" #include "mongo/db/query/plan_executor.h" namespace mongo { @@ -79,11 +80,12 @@ public: * PlanExecutor. For example, an early $match can be removed and replaced with a * DocumentSourceCursor containing a PlanExecutor that will do an index scan. * - * Callers must take care to ensure that 'nss' is locked in at least IS-mode. + * Callers must take care to ensure that 'nss' and each collection referenced in + * 'collections' is locked in at least IS-mode. * * When not null, 'aggRequest' provides access to pipeline command options such as hint. * - * The 'collection' parameter is optional and can be passed as 'nullptr'. + * The 'collections' parameter can reference any number of collections. * * This method will not add a $cursor stage to the pipeline, but will create a PlanExecutor and * a callback function. The executor and the callback can later be used to create the $cursor @@ -92,7 +94,7 @@ public: * 'nullptr'. */ static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> - buildInnerQueryExecutor(const CollectionPtr& collection, + buildInnerQueryExecutor(const MultiCollection& collections, const NamespaceString& nss, const AggregateCommandRequest* aggRequest, Pipeline* pipeline); @@ -101,11 +103,11 @@ public: * Completes creation of the $cursor stage using the given callback pair obtained by calling * 'buildInnerQueryExecutor()' method. If the callback doesn't hold a valid PlanExecutor, the * method does nothing. Otherwise, a new $cursor stage is created using the given PlanExecutor, - * and added to the pipeline. The 'collection' parameter is optional and can be passed as - * 'nullptr'. + * and added to the pipeline. The 'collections' parameter can reference any number of + * collections. */ static void attachInnerQueryExecutorToPipeline( - const CollectionPtr& collection, + const MultiCollection& collection, AttachExecutorCallback attachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, Pipeline* pipeline); @@ -114,10 +116,10 @@ public: * This method combines 'buildInnerQueryExecutor()' and 'attachInnerQueryExecutorToPipeline()' * into a single call to support auto completion of the cursor stage creation process. Can be * used when the executor attachment phase doesn't need to be deferred and the $cursor stage - * can be created right after buiding the executor. + * can be created right after building the executor. */ static void buildAndAttachInnerQueryExecutorToPipeline( - const CollectionPtr& collection, + const MultiCollection& collections, const NamespaceString& nss, const AggregateCommandRequest* aggRequest, Pipeline* pipeline); @@ -166,39 +168,17 @@ private: * the 'pipeline'. */ static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> - buildInnerQueryExecutorGeneric(const CollectionPtr& collection, + buildInnerQueryExecutorGeneric(const MultiCollection& collections, const NamespaceString& nss, const AggregateCommandRequest* aggRequest, Pipeline* pipeline); /** - * Build a PlanExecutor and prepare a callback to create a special DocumentSourceGeoNearCursor - * for the 'pipeline'. Unlike 'buildInnerQueryExecutorGeneric()', throws if 'collection' does - * not exist, as the $geoNearCursor requires a 2d or 2dsphere index. - */ - static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> - buildInnerQueryExecutorGeoNear(const CollectionPtr& collection, - const NamespaceString& nss, - const AggregateCommandRequest* aggRequest, - Pipeline* pipeline); - - /** - * Build a PlanExecutor and prepare a callback to create a special DocumentSourceSample or a - * DocumentSourceInternalUnpackBucket stage that has been rewritten to sample buckets using a - * storage engine supplied random cursor if the heuristics used for the optimization allows. If - * the optimized $sample plan cannot or should not be produced, returns a null PlanExecutor - * pointer. - */ - static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> - buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, - DocumentSourceInternalUnpackBucket* unpackBucketStage, - const CollectionPtr& collection, - Pipeline* pipeline); - - /** * Creates a PlanExecutor to be used in the initial cursor source. This function will try to * push down the $sort, $project, $match and $limit stages into the PlanStage layer whenever - * possible. In this case, these stages will be incorporated into the PlanExecutor. + * possible. In this case, these stages will be incorporated into the PlanExecutor. Note that + * this function takes a 'MultiCollection' because certain $lookup stages that reference + * multiple collections may be eligible for pushdown in the PlanExecutor. * * Set 'rewrittenGroupStage' when the pipeline uses $match+$sort+$group stages that are * compatible with a DISTINCT_SCAN plan that visits the first document in each group @@ -209,7 +189,7 @@ private: */ static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> prepareExecutor( const boost::intrusive_ptr<ExpressionContext>& expCtx, - const CollectionPtr& collection, + const MultiCollection& collections, const NamespaceString& nss, Pipeline* pipeline, const boost::intrusive_ptr<DocumentSourceSort>& sortStage, @@ -222,6 +202,35 @@ private: bool* hasNoRequirements); /** + * Build a PlanExecutor and prepare a callback to create a special DocumentSourceGeoNearCursor + * for the 'pipeline'. Unlike 'buildInnerQueryExecutorGeneric()', throws if the main collection + * defined on 'collections' does not exist, as the $geoNearCursor requires a 2d or 2dsphere + * index. + * + * Note that this method takes a 'MultiCollection' even though DocumentSourceGeoNearCursor + * only operates over a single collection because the underlying execution API expects a + * 'MultiCollection'. + */ + static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> + buildInnerQueryExecutorGeoNear(const MultiCollection& collections, + const NamespaceString& nss, + const AggregateCommandRequest* aggRequest, + Pipeline* pipeline); + + /** + * Build a PlanExecutor and prepare a callback to create a special DocumentSourceSample or a + * DocumentSourceInternalUnpackBucket stage that has been rewritten to sample buckets using a + * storage engine supplied random cursor if the heuristics used for the optimization allows. If + * the optimized $sample plan cannot or should not be produced, returns a null PlanExecutor + * pointer. + */ + static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> + buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, + DocumentSourceInternalUnpackBucket* unpackBucketStage, + const CollectionPtr& collection, + Pipeline* pipeline); + + /** * Returns a 'PlanExecutor' which uses a random cursor to sample documents if successful as * determined by the boolean. Returns {} if the storage engine doesn't support random cursors, * or if 'sampleSize' is a large enough percentage of the collection. diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 47a318996ff..3e74385e31a 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -318,8 +318,9 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline* Date_t::max(), AutoStatsTracker::LogMode::kUpdateTop); + MultiCollection holder{autoColl->getCollection()}; PipelineD::buildAndAttachInnerQueryExecutorToPipeline( - autoColl->getCollection(), expCtx->ns, nullptr, pipeline.get()); + holder, expCtx->ns, nullptr, pipeline.get()); return pipeline; } diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 18c4ca9972a..a1a0716d29e 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -271,20 +271,17 @@ void applyIndexFilters(const CollectionPtr& collection, } } -void fillOutPlannerParams(OperationContext* opCtx, - const CollectionPtr& collection, - CanonicalQuery* canonicalQuery, - QueryPlannerParams* plannerParams) { - invariant(canonicalQuery); - bool apiStrict = APIParameters::get(opCtx).getAPIStrict().value_or(false); - // If it's not NULL, we may have indices. Access the catalog and fill out IndexEntry(s) - std::unique_ptr<IndexCatalog::IndexIterator> ii = - collection->getIndexCatalog()->getIndexIterator(opCtx, false); +void fillOutIndexEntries(OperationContext* opCtx, + bool apiStrict, + CanonicalQuery* canonicalQuery, + const CollectionPtr& collection, + std::vector<IndexEntry>& entries) { + auto ii = collection->getIndexCatalog()->getIndexIterator(opCtx, false); while (ii->more()) { const IndexCatalogEntry* ice = ii->next(); - // Indexes excluded from API version 1 should _not_ be used for planning if apiStrict is set - // to true. + // Indexes excluded from API version 1 should _not_ be used for planning if apiStrict is + // set to true. auto indexType = ice->descriptor()->getIndexType(); if (apiStrict && (indexType == IndexType::INDEX_HAYSTACK || indexType == IndexType::INDEX_TEXT || @@ -294,9 +291,20 @@ void fillOutPlannerParams(OperationContext* opCtx, // Skip the addition of hidden indexes to prevent use in query planning. if (ice->descriptor()->hidden()) continue; - plannerParams->indices.push_back( + entries.emplace_back( indexEntryFromIndexCatalogEntry(opCtx, collection, *ice, canonicalQuery)); } +} + +void fillOutPlannerParams(OperationContext* opCtx, + const CollectionPtr& collection, + CanonicalQuery* canonicalQuery, + QueryPlannerParams* plannerParams) { + invariant(canonicalQuery); + bool apiStrict = APIParameters::get(opCtx).getAPIStrict().value_or(false); + + // If it's not NULL, we may have indices. Access the catalog and fill out IndexEntry(s) + fillOutIndexEntries(opCtx, apiStrict, canonicalQuery, collection, plannerParams->indices); // If query supports index filters, filter params.indices by indices in query settings. // Ignore index filters when it is possible to use the id-hack. @@ -363,6 +371,34 @@ void fillOutPlannerParams(OperationContext* opCtx, } } +void fillOutSecondaryCollectionsInformation(OperationContext* opCtx, + const MultiCollection& collections, + CanonicalQuery* canonicalQuery, + QueryPlannerParams* plannerParams) { + bool apiStrict = APIParameters::get(opCtx).getAPIStrict().value_or(false); + for (auto& [collName, secondaryColl] : collections.getSecondaryCollections()) { + plannerParams->secondaryCollectionsInfo.emplace_back(SecondaryCollectionInfo()); + auto& info = plannerParams->secondaryCollectionsInfo.back(); + info.nss = collName; + if (secondaryColl) { + fillOutIndexEntries(opCtx, apiStrict, canonicalQuery, secondaryColl, info.indexes); + info.isSharded = secondaryColl.isSharded(); + info.approximateCollectionSizeBytes = secondaryColl.get()->dataSize(opCtx); + info.isView = !secondaryColl.get()->getCollectionOptions().viewOn.empty(); + } else { + info.exists = false; + } + } +} + +void fillOutPlannerParams(OperationContext* opCtx, + const MultiCollection& collections, + CanonicalQuery* canonicalQuery, + QueryPlannerParams* plannerParams) { + fillOutPlannerParams(opCtx, collections.getMainCollection(), canonicalQuery, plannerParams); + fillOutSecondaryCollectionsInformation(opCtx, collections, canonicalQuery, plannerParams); +} + bool shouldWaitForOplogVisibility(OperationContext* opCtx, const CollectionPtr& collection, bool tailable) { @@ -518,20 +554,26 @@ template <typename KeyType, typename PlanStageType, typename ResultType> class PrepareExecutionHelper { public: PrepareExecutionHelper(OperationContext* opCtx, - const CollectionPtr& collection, CanonicalQuery* cq, PlanYieldPolicy* yieldPolicy, size_t plannerOptions) - : _opCtx{opCtx}, - _collection{collection}, - _cq{cq}, - _yieldPolicy{yieldPolicy}, - _plannerOptions{plannerOptions} { + : _opCtx{opCtx}, _cq{cq}, _yieldPolicy{yieldPolicy}, _plannerOptions{plannerOptions} { invariant(_cq); } + /** + * Returns a reference to the main collection that is targetted by this query. + */ + virtual const CollectionPtr& getMainCollection() const = 0; + + /** + * Fills out planning params needed for the target execution engine. + */ + virtual void fillOutPlannerParamsHelper(QueryPlannerParams* plannerParams) = 0; + StatusWith<std::unique_ptr<ResultType>> prepare() { - if (!_collection) { + const auto& mainColl = getMainCollection(); + if (!mainColl) { LOGV2_DEBUG(20921, 2, "Collection does not exist. Using EOF plan", @@ -551,7 +593,7 @@ public: // Fill out the planning params. We use these for both cached solutions and non-cached. QueryPlannerParams plannerParams; plannerParams.options = _plannerOptions; - fillOutPlannerParams(_opCtx, _collection, _cq, &plannerParams); + fillOutPlannerParamsHelper(&plannerParams); tassert( 5842901, "Fast count queries aren't supported in SBE, therefore, should never lower parts of " @@ -561,14 +603,13 @@ public: // If the canonical query does not have a user-specified collation and no one has given the // CanonicalQuery a collation already, set it from the collection default. if (_cq->getFindCommandRequest().getCollation().isEmpty() && - _cq->getCollator() == nullptr && _collection->getDefaultCollator()) { - _cq->setCollator(_collection->getDefaultCollator()->clone()); + _cq->getCollator() == nullptr && mainColl->getDefaultCollator()) { + _cq->setCollator(mainColl->getDefaultCollator()->clone()); } // If we have an _id index we can use an idhack plan. - if (const IndexDescriptor* idIndexDesc = - _collection->getIndexCatalog()->findIdIndex(_opCtx); - idIndexDesc && isIdHackEligibleQuery(_collection, *_cq)) { + if (const IndexDescriptor* idIndexDesc = mainColl->getIndexCatalog()->findIdIndex(_opCtx); + idIndexDesc && isIdHackEligibleQuery(mainColl, *_cq)) { LOGV2_DEBUG( 20922, 2, "Using idhack", "canonicalQuery"_attr = redact(_cq->toStringShort())); // If an IDHACK plan is not supported, we will use the normal plan generation process @@ -580,13 +621,13 @@ public: } // Tailable: If the query requests tailable the collection must be capped. - if (_cq->getFindCommandRequest().getTailable() && !_collection->isCapped()) { + if (_cq->getFindCommandRequest().getTailable() && !mainColl->isCapped()) { return Status(ErrorCodes::BadValue, str::stream() << "error processing query: " << _cq->toString() << " tailable cursor requested on non capped collection"); } - auto planCacheKey = plan_cache_key_factory::make<KeyType>(*_cq, _collection); + auto planCacheKey = plan_cache_key_factory::make<KeyType>(*_cq, mainColl); // Fill in some opDebug information, unless it has already been filled by an outer pipeline. OpDebug& opDebug = CurOp::get(_opCtx)->debug(); if (!opDebug.queryHash) { @@ -709,7 +750,6 @@ protected: const QueryPlannerParams& plannerParams) = 0; OperationContext* _opCtx; - const CollectionPtr& _collection; CanonicalQuery* _cq; PlanYieldPolicy* _yieldPolicy; const size_t _plannerOptions; @@ -729,9 +769,18 @@ public: CanonicalQuery* cq, PlanYieldPolicy* yieldPolicy, size_t plannerOptions) - : PrepareExecutionHelper{opCtx, collection, std::move(cq), yieldPolicy, plannerOptions}, + : PrepareExecutionHelper{opCtx, std::move(cq), yieldPolicy, plannerOptions}, + _collection(collection), _ws{ws} {} + const CollectionPtr& getMainCollection() const override { + return _collection; + } + + virtual void fillOutPlannerParamsHelper(QueryPlannerParams* plannerParams) override { + fillOutPlannerParams(_opCtx, getMainCollection(), _cq, plannerParams); + } + protected: std::unique_ptr<PlanStage> buildExecutableTree(const QuerySolution& solution) const final { return stage_builder::buildClassicExecutableTree(_opCtx, _collection, *_cq, solution, _ws); @@ -880,6 +929,7 @@ protected: } private: + const CollectionPtr& _collection; WorkingSet* _ws; }; @@ -894,10 +944,29 @@ class SlotBasedPrepareExecutionHelper final public: using PrepareExecutionHelper::PrepareExecutionHelper; + SlotBasedPrepareExecutionHelper(OperationContext* opCtx, + const MultiCollection& collections, + CanonicalQuery* cq, + PlanYieldPolicy* yieldPolicy, + size_t plannerOptions) + : PrepareExecutionHelper{opCtx, std::move(cq), yieldPolicy, plannerOptions}, + _collections(collections) {} + + const CollectionPtr& getMainCollection() const override { + return _collections.getMainCollection(); + } + + void fillOutPlannerParamsHelper(QueryPlannerParams* plannerParams) override { + fillOutPlannerParams(_opCtx, _collections, _cq, plannerParams); + } + std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> buildExecutableTree( const QuerySolution& solution) const final { + // TODO SERVER-58437 We don't pass '_collections' to the function below because at the + // moment, no pushdown is actually happening. This should be changed once the logic for + // pushdown is implemented. return stage_builder::buildSlotBasedExecutableTree( - _opCtx, _collection, *_cq, solution, _yieldPolicy); + _opCtx, getMainCollection(), *_cq, solution, _yieldPolicy); } protected: @@ -916,9 +985,10 @@ protected: } invariant(descriptor->getEntry()); + const auto& mainColl = _collections.getMainCollection(); std::unique_ptr<QuerySolutionNode> root = [&]() { auto ixScan = std::make_unique<IndexScanNode>( - indexEntryFromIndexCatalogEntry(_opCtx, _collection, *descriptor->getEntry(), _cq)); + indexEntryFromIndexCatalogEntry(_opCtx, mainColl, *descriptor->getEntry(), _cq)); const auto bsonKey = IndexBoundsBuilder::objFromElement(_cq->getQueryObj()["_id"], _cq->getCollator()); @@ -1012,15 +1082,15 @@ protected: // TODO SERVER-61314: Remove this function when "featureFlagSbePlanCache" is removed. std::unique_ptr<SlotBasedPrepareExecutionResult> buildCachedPlanFromClassicCache( const QueryPlannerParams& plannerParams) { - auto planCacheKey = plan_cache_key_factory::make<PlanCacheKey>(*_cq, _collection); + const auto& mainColl = getMainCollection(); + auto planCacheKey = plan_cache_key_factory::make<PlanCacheKey>(*_cq, mainColl); OpDebug& opDebug = CurOp::get(_opCtx)->debug(); if (!opDebug.planCacheKey) { opDebug.planCacheKey = planCacheKey.planCacheKeyHash(); } // Try to look up a cached solution for the query. - if (auto cs = CollectionQueryInfo::get(_collection) - .getPlanCache() - ->getCacheEntryIfActive(planCacheKey)) { + if (auto cs = CollectionQueryInfo::get(mainColl).getPlanCache()->getCacheEntryIfActive( + planCacheKey)) { // We have a CachedSolution. Have the planner turn it into a QuerySolution. auto statusWithQs = QueryPlanner::planFromCache(*_cq, plannerParams, *cs); @@ -1067,11 +1137,14 @@ protected: } return result; } + +private: + const MultiCollection& _collections; }; StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getClassicExecutor( OperationContext* opCtx, - const CollectionPtr* collection, + const CollectionPtr& collection, std::unique_ptr<CanonicalQuery> canonicalQuery, PlanYieldPolicy::YieldPolicy yieldPolicy, size_t plannerOptions) { @@ -1082,7 +1155,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getClassicExecu } auto ws = std::make_unique<WorkingSet>(); ClassicPrepareExecutionHelper helper{ - opCtx, *collection, ws.get(), canonicalQuery.get(), nullptr, plannerOptions}; + opCtx, collection, ws.get(), canonicalQuery.get(), nullptr, plannerOptions}; auto executionResult = helper.prepare(); if (!executionResult.isOK()) { return executionResult.getStatus(); @@ -1095,7 +1168,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getClassicExecu return plan_executor_factory::make(std::move(canonicalQuery), std::move(ws), std::move(root), - collection, + &collection, yieldPolicy, plannerOptions, {}, @@ -1109,7 +1182,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getClassicExecu */ std::unique_ptr<sbe::RuntimePlanner> makeRuntimePlannerIfNeeded( OperationContext* opCtx, - const CollectionPtr& collection, + const MultiCollection& collections, CanonicalQuery* canonicalQuery, size_t numSolutions, boost::optional<size_t> decisionWorks, @@ -1117,11 +1190,13 @@ std::unique_ptr<sbe::RuntimePlanner> makeRuntimePlannerIfNeeded( PlanYieldPolicySBE* yieldPolicy, size_t plannerOptions, std::unique_ptr<plan_cache_debug_info::DebugInfoSBE> debugInfo) { + const auto& mainColl = collections.getMainCollection(); + // If we have multiple solutions, we always need to do the runtime planning. if (numSolutions > 1) { invariant(!needsSubplanning && !decisionWorks); return std::make_unique<sbe::MultiPlanner>( - opCtx, collection, *canonicalQuery, PlanCachingMode::AlwaysCache, yieldPolicy); + opCtx, mainColl, *canonicalQuery, PlanCachingMode::AlwaysCache, yieldPolicy); } // If the query can be run as sub-queries, the needSubplanning flag will be set to true and @@ -1132,10 +1207,10 @@ std::unique_ptr<sbe::RuntimePlanner> makeRuntimePlannerIfNeeded( QueryPlannerParams plannerParams; plannerParams.options = plannerOptions; - fillOutPlannerParams(opCtx, collection, canonicalQuery, &plannerParams); + fillOutPlannerParams(opCtx, collections, canonicalQuery, &plannerParams); return std::make_unique<sbe::SubPlanner>( - opCtx, collection, *canonicalQuery, plannerParams, yieldPolicy); + opCtx, mainColl, *canonicalQuery, plannerParams, yieldPolicy); } invariant(numSolutions == 1); @@ -1147,10 +1222,10 @@ std::unique_ptr<sbe::RuntimePlanner> makeRuntimePlannerIfNeeded( if (decisionWorks) { QueryPlannerParams plannerParams; plannerParams.options = plannerOptions; - fillOutPlannerParams(opCtx, collection, canonicalQuery, &plannerParams); + fillOutPlannerParams(opCtx, collections, canonicalQuery, &plannerParams); return std::make_unique<sbe::CachedSolutionPlanner>(opCtx, - collection, + mainColl, *canonicalQuery, plannerParams, *decisionWorks, @@ -1179,7 +1254,7 @@ std::unique_ptr<PlanYieldPolicySBE> makeSbeYieldPolicy( StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExecutor( OperationContext* opCtx, - const CollectionPtr* collection, + const MultiCollection& collections, std::unique_ptr<CanonicalQuery> cq, std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, PlanYieldPolicy::YieldPolicy requestedYieldPolicy, @@ -1193,11 +1268,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe if (!opDebug.classicEngineUsed) { opDebug.classicEngineUsed = false; } + + const auto mainColl = &collections.getMainCollection(); + // Analyze the provided query and build the list of candidate plans for it. auto nss = cq->nss(); - auto yieldPolicy = makeSbeYieldPolicy(opCtx, requestedYieldPolicy, collection, nss); + auto yieldPolicy = makeSbeYieldPolicy(opCtx, requestedYieldPolicy, mainColl, nss); SlotBasedPrepareExecutionHelper helper{ - opCtx, *collection, cq.get(), yieldPolicy.get(), plannerOptions}; + opCtx, collections, cq.get(), yieldPolicy.get(), plannerOptions}; auto planningResultWithStatus = helper.prepare(); if (!planningResultWithStatus.isOK()) { return planningResultWithStatus.getStatus(); @@ -1209,7 +1287,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe // In some circumstances (e.g. when have multiple candidate plans or using a cached one), we // might need to execute the plan(s) to pick the best one or to confirm the choice. if (auto planner = makeRuntimePlannerIfNeeded(opCtx, - *collection, + collections, cq.get(), solutions.size(), planningResult->decisionWorks(), @@ -1223,7 +1301,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe return plan_executor_factory::make(opCtx, std::move(cq), std::move(candidates), - collection, + mainColl, plannerOptions, std::move(nss), std::move(yieldPolicy)); @@ -1240,7 +1318,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe std::move(cq), std::move(solutions[0]), std::move(roots[0]), - collection, + mainColl, plannerOptions, std::move(nss), std::move(yieldPolicy)); @@ -1249,22 +1327,39 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( OperationContext* opCtx, - const CollectionPtr* collection, + const MultiCollection& collections, std::unique_ptr<CanonicalQuery> canonicalQuery, std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, PlanYieldPolicy::YieldPolicy yieldPolicy, size_t plannerOptions) { + const auto& mainColl = collections.getMainCollection(); canonicalQuery->setSbeCompatible( - sbe::isQuerySbeCompatible(collection, canonicalQuery.get(), plannerOptions)); + sbe::isQuerySbeCompatible(&mainColl, canonicalQuery.get(), plannerOptions)); return !canonicalQuery->getForceClassicEngine() && canonicalQuery->isSbeCompatible() ? getSlotBasedExecutor(opCtx, - collection, + collections, std::move(canonicalQuery), extractAndAttachPipelineStages, yieldPolicy, plannerOptions) : getClassicExecutor( - opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions); + opCtx, mainColl, std::move(canonicalQuery), yieldPolicy, plannerOptions); +} + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( + OperationContext* opCtx, + const CollectionPtr* collection, + std::unique_ptr<CanonicalQuery> canonicalQuery, + std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, + PlanYieldPolicy::YieldPolicy yieldPolicy, + size_t plannerOptions) { + MultiCollection multi{collection}; + return getExecutor(opCtx, + multi, + std::move(canonicalQuery), + extractAndAttachPipelineStages, + yieldPolicy, + plannerOptions); } // @@ -1273,7 +1368,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( OperationContext* opCtx, - const CollectionPtr* collection, + const MultiCollection& collections, std::unique_ptr<CanonicalQuery> canonicalQuery, std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, bool permitYield, @@ -1287,13 +1382,29 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind } return getExecutor(opCtx, - collection, + collections, std::move(canonicalQuery), extractAndAttachPipelineStages, yieldPolicy, plannerOptions); } +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( + OperationContext* opCtx, + const CollectionPtr* coll, + std::unique_ptr<CanonicalQuery> canonicalQuery, + std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, + bool permitYield, + size_t plannerOptions) { + MultiCollection multi{*coll}; + return getExecutorFind(opCtx, + multi, + std::move(canonicalQuery), + extractAndAttachPipelineStages, + permitYield, + plannerOptions); +} + namespace { /** diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 84d1ca7f82a..c06caf23f9d 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -38,6 +38,7 @@ #include "mongo/db/ops/update_request.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/count_command_gen.h" +#include "mongo/db/query/multi_collection.h" #include "mongo/db/query/parsed_distinct.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_planner_params.h" @@ -72,6 +73,23 @@ void filterAllowedIndexEntries(const AllowedIndicesFilter& allowedIndicesFilter, std::vector<IndexEntry>* indexEntries); /** + * Fills out 'entries' with information about the indexes on 'collection'. + */ +void fillOutIndexEntries(OperationContext* opCtx, + bool apiStrict, + CanonicalQuery* canonicalQuery, + const CollectionPtr& collection, + std::vector<IndexEntry>& entries); + +/** + * Fills out information about secondary collections held by 'collections' in 'plannerParams'. + */ +void fillOutSecondaryCollectionsInformation(OperationContext* opCtx, + const MultiCollection& collections, + CanonicalQuery* canonicalQuery, + QueryPlannerParams* plannerParams); + +/** * Fill out the provided 'plannerParams' for the 'canonicalQuery' operating on the collection * 'collection'. Exposed for testing. */ @@ -79,6 +97,17 @@ void fillOutPlannerParams(OperationContext* opCtx, const CollectionPtr& collection, CanonicalQuery* canonicalQuery, QueryPlannerParams* plannerParams); +/** + * Overload of the above function that does two things: + * - Calls the single collection overload of 'fillOutPlannerParams' on the main collection held + * by 'collections' + * - Calls 'fillOutSecondaryCollectionsInformation' to store information about the set of + * secondary collections held by 'collections' on 'plannerParams'. + */ +void fillOutPlannerParams(OperationContext* opCtx, + const MultiCollection& collections, + CanonicalQuery* canonicalQuery, + QueryPlannerParams* plannerParams); /** * Return whether or not any component of the path 'path' is multikey given an index key pattern @@ -125,9 +154,21 @@ bool shouldWaitForOplogVisibility(OperationContext* opCtx, * pushdown into the find layer this function will be invoked to extract pipeline stages and * attach them to the provided 'CanonicalQuery'. This function should capture the Pipeline that * stages should be extracted from. + * + * Note that the first overload takes a 'MultiCollection' and can construct a PlanExecutor over + * multiple collections, while the second overload takes a single 'CollectionPtr' and can only + * construct a PlanExecutor over a single collection. */ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( OperationContext* opCtx, + const MultiCollection& collections, + std::unique_ptr<CanonicalQuery> canonicalQuery, + std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, + PlanYieldPolicy::YieldPolicy yieldPolicy, + size_t plannerOptions = 0); + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( + OperationContext* opCtx, const CollectionPtr* collection, std::unique_ptr<CanonicalQuery> canonicalQuery, std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, @@ -148,9 +189,21 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( * pushdown into the find layer this function will be invoked to extract pipeline stages and * attach them to the provided 'CanonicalQuery'. This function should capture the Pipeline that * stages should be extracted from. + * + * Note that the first overload takes a 'MultiCollection' and can construct a PlanExecutor over + * multiple collections, while the second overload takes a single 'CollectionPtr' and can only + * construct a PlanExecutor over a single collection. */ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( OperationContext* opCtx, + const MultiCollection& collections, + std::unique_ptr<CanonicalQuery> canonicalQuery, + std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, + bool permitYield = false, + size_t plannerOptions = QueryPlannerParams::DEFAULT); + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( + OperationContext* opCtx, const CollectionPtr* collection, std::unique_ptr<CanonicalQuery> canonicalQuery, std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, diff --git a/src/mongo/db/query/multi_collection.h b/src/mongo/db/query/multi_collection.h new file mode 100644 index 00000000000..77368d13682 --- /dev/null +++ b/src/mongo/db/query/multi_collection.h @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/catalog/collection.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/query/query_planner_params.h" + +namespace mongo { + +/** + * Class which holds a set of pointers to multiple collections. This class distinguishes between + * a 'main collection' and 'secondary collections'. While the former represents the collection a + * given command is run against, the latter represents other collections that the execution + * engine may need to be made aware of. + */ +class MultiCollection final { +public: + MultiCollection() = default; + + MultiCollection(boost::optional<AutoGetCollectionForReadCommandMaybeLockFree>& mainCollCtx, + std::vector<std::unique_ptr<AutoGetCollectionForReadCommandMaybeLockFree>>& + secondaryCollCtxes) { + if (mainCollCtx) { + _mainColl = &mainCollCtx->getCollection(); + _mainCollName = mainCollCtx->getNss(); + } + + for (auto& secondaryColl : secondaryCollCtxes) { + if (*secondaryColl) { + // Even if 'secondaryColl' doesn't exist, we still want to include it. It is the + // responsibility of consumers of this class to verify that a collection exists + // before accessing it. + _secondaryColls.emplace(secondaryColl->getNss(), secondaryColl->getCollection()); + } + } + } + + explicit MultiCollection(const CollectionPtr* mainColl) + : _mainColl(mainColl), _secondaryColls({}) {} + + explicit MultiCollection(const CollectionPtr& mainColl) : MultiCollection(&mainColl) {} + + bool hasMainCollection() const { + return _mainColl->get(); + } + + const CollectionPtr& getMainCollection() const { + return *_mainColl; + } + + const std::map<NamespaceString, const CollectionPtr&>& getSecondaryCollections() const { + return _secondaryColls; + } + + void clear() { + _mainColl = &CollectionPtr::null; + _secondaryColls.clear(); + } + +private: + const CollectionPtr* _mainColl{&CollectionPtr::null}; + boost::optional<NamespaceString> _mainCollName = boost::none; + + // Map from namespace to a corresponding CollectionPtr. + std::map<NamespaceString, const CollectionPtr&> _secondaryColls{}; +}; +} // namespace mongo diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl index 528b6b0e2cf..d5f2e260cb6 100644 --- a/src/mongo/db/query/query_knobs.idl +++ b/src/mongo/db/query/query_knobs.idl @@ -620,3 +620,10 @@ server_parameters: expr: 100 * 1024 * 1024 validator: gt: 0 + + internalEnableMultipleAutoGetCollections: + description: "Test only parameter to enable taking multiple AutoGetCollections in runAggregate" + set_at: [ startup, runtime ] + cpp_varname: "internalEnableMultipleAutoGetCollections" + cpp_vartype: AtomicWord<bool> + default: false diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index ad34bdd1ee6..54697f2cc9a 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -39,6 +39,21 @@ namespace mongo { +/** + * Struct containing information about secondary collections (such as the 'from' collection in + * $lookup) useful for query planning. + */ +struct SecondaryCollectionInfo { + NamespaceString nss; + std::vector<IndexEntry> indexes{}; + bool exists{true}; + bool isSharded{false}; + bool isView{false}; + + // The approximate size of the collection in bytes. + long long approximateCollectionSizeBytes{0}; +}; + struct QueryPlannerParams { QueryPlannerParams() : options(DEFAULT), @@ -147,6 +162,9 @@ struct QueryPlannerParams { // Specifies the collator information necessary to utilize the cluster key in bounded // collection scans and other query operations. const CollatorInterface* clusteredCollectionCollator; + + // List of information about any secondary collections that can be executed against. + std::vector<SecondaryCollectionInfo> secondaryCollectionsInfo; }; } // namespace mongo |