diff options
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 350 |
1 files changed, 164 insertions, 186 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index b6ddfdd7e12..6dbcfe4c812 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -45,219 +45,197 @@ namespace mongo { - using boost::intrusive_ptr; - using std::shared_ptr; - using std::string; +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; namespace { - class MongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface { - public: - MongodImplementation(const intrusive_ptr<ExpressionContext>& ctx) - : _ctx(ctx) - , _client(ctx->opCtx) - {} - - DBClientBase* directClient() final { - // opCtx may have changed since our last call - invariant(_ctx->opCtx); - _client.setOpCtx(_ctx->opCtx); - return &_client; - } +class MongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface { +public: + MongodImplementation(const intrusive_ptr<ExpressionContext>& ctx) + : _ctx(ctx), _client(ctx->opCtx) {} + + DBClientBase* directClient() final { + // opCtx may have changed since our last call + invariant(_ctx->opCtx); + _client.setOpCtx(_ctx->opCtx); + return &_client; + } - bool isSharded(const NamespaceString& ns) final { - const ChunkVersion unsharded(0, 0, OID()); - return !(shardingState.getVersion(ns.ns()).isWriteCompatibleWith(unsharded)); - } + bool isSharded(const NamespaceString& ns) final { + const ChunkVersion unsharded(0, 0, OID()); + return !(shardingState.getVersion(ns.ns()).isWriteCompatibleWith(unsharded)); + } - bool isCapped(const NamespaceString& ns) final { - AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns()); - Collection* collection = ctx.getCollection(); - return collection && collection->isCapped(); - } + bool isCapped(const NamespaceString& ns) final { + AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns()); + Collection* collection = ctx.getCollection(); + return collection && collection->isCapped(); + } - BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final { - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (_ctx->bypassDocumentValidation) - maybeDisableValidation.emplace(_ctx->opCtx); + BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final { + boost::optional<DisableDocumentValidation> maybeDisableValidation; + if (_ctx->bypassDocumentValidation) + maybeDisableValidation.emplace(_ctx->opCtx); - _client.insert(ns.ns(), objs); - return _client.getLastErrorDetailed(); - } + _client.insert(ns.ns(), objs); + return _client.getLastErrorDetailed(); + } - private: - intrusive_ptr<ExpressionContext> _ctx; - DBDirectClient _client; - }; +private: + intrusive_ptr<ExpressionContext> _ctx; + DBDirectClient _client; +}; } - shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( - OperationContext* txn, - Collection* collection, - const intrusive_ptr<Pipeline>& pPipeline, - const intrusive_ptr<ExpressionContext>& pExpCtx) { - // get the full "namespace" name - const string& fullName = pExpCtx->ns.ns(); - - // We will be modifying the source vector as we go - Pipeline::SourceContainer& sources = pPipeline->sources; - - // Inject a MongodImplementation to sources that need them. - for (size_t i = 0; i < sources.size(); i++) { - DocumentSourceNeedsMongod* needsMongod = - dynamic_cast<DocumentSourceNeedsMongod*>(sources[i].get()); - if (needsMongod) { - needsMongod->injectMongodInterface( - std::make_shared<MongodImplementation>(pExpCtx)); - } +shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( + OperationContext* txn, + Collection* collection, + const intrusive_ptr<Pipeline>& pPipeline, + const intrusive_ptr<ExpressionContext>& pExpCtx) { + // get the full "namespace" name + const string& fullName = pExpCtx->ns.ns(); + + // We will be modifying the source vector as we go + Pipeline::SourceContainer& sources = pPipeline->sources; + + // Inject a MongodImplementation to sources that need them. + for (size_t i = 0; i < sources.size(); i++) { + DocumentSourceNeedsMongod* needsMongod = + dynamic_cast<DocumentSourceNeedsMongod*>(sources[i].get()); + if (needsMongod) { + needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(pExpCtx)); } + } - if (!sources.empty() && sources.front()->isValidInitialSource()) { - if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) { - // Enable the hooks for setting up authentication on the subsequent internal - // connections we are going to create. This would normally have been done - // when SetShardVersion was called, but since SetShardVersion is never called - // on secondaries, this is needed. - ShardedConnectionInfo::addHook(); - } - return std::shared_ptr<PlanExecutor>(); // don't need a cursor + if (!sources.empty() && sources.front()->isValidInitialSource()) { + if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) { + // Enable the hooks for setting up authentication on the subsequent internal + // connections we are going to create. This would normally have been done + // when SetShardVersion was called, but since SetShardVersion is never called + // on secondaries, this is needed. + ShardedConnectionInfo::addHook(); } + return std::shared_ptr<PlanExecutor>(); // don't need a cursor + } - // Look for an initial match. This works whether we got an initial query or not. - // If not, it results in a "{}" query, which will be what we want in that case. - const BSONObj queryObj = pPipeline->getInitialQuery(); - if (!queryObj.isEmpty()) { - // This will get built in to the Cursor we'll create, so - // remove the match from the pipeline - sources.pop_front(); - } - - // Find the set of fields in the source documents depended on by this pipeline. - const DepsTracker deps = pPipeline->getDependencies(queryObj); - - // Passing query an empty projection since it is faster to use ParsedDeps::extractFields(). - // This will need to change to support covering indexes (SERVER-12015). There is an - // exception for textScore since that can only be retrieved by a query projection. - const BSONObj projectionForQuery = deps.needTextScore ? deps.toProjection() : BSONObj(); - - /* - Look for an initial sort; we'll try to add this to the - Cursor we create. If we're successful in doing that (further down), - we'll remove the $sort from the pipeline, because the documents - will already come sorted in the specified order as a result of the - index scan. - */ - intrusive_ptr<DocumentSourceSort> sortStage; - BSONObj sortObj; - if (!sources.empty()) { - sortStage = dynamic_cast<DocumentSourceSort*>(sources.front().get()); - if (sortStage) { - // build the sort key - sortObj = sortStage->serializeSortKey(/*explain*/false).toBson(); - } - } - - // Create the PlanExecutor. - // - // If we try to create a PlanExecutor that includes both the match and the - // sort, and the two are incompatible wrt the available indexes, then - // we don't get a PlanExecutor back. - // - // So we try to use both first. If that fails, try again, without the - // sort. - // - // If we don't have a sort, jump straight to just creating a PlanExecutor. - // without the sort. - // - // If we are able to incorporate the sort into the PlanExecutor, remove it - // from the head of the pipeline. - // - // LATER - we should be able to find this out before we create the - // cursor. Either way, we can then apply other optimizations there - // are tickets for, such as SERVER-4507. - const size_t runnerOptions = QueryPlannerParams::DEFAULT - | QueryPlannerParams::INCLUDE_SHARD_FILTER - | QueryPlannerParams::NO_BLOCKING_SORT - ; - std::shared_ptr<PlanExecutor> exec; - bool sortInRunner = false; - - const WhereCallbackReal whereCallback(pExpCtx->opCtx, pExpCtx->ns.db()); + // Look for an initial match. This works whether we got an initial query or not. + // If not, it results in a "{}" query, which will be what we want in that case. + const BSONObj queryObj = pPipeline->getInitialQuery(); + if (!queryObj.isEmpty()) { + // This will get built in to the Cursor we'll create, so + // remove the match from the pipeline + sources.pop_front(); + } + // Find the set of fields in the source documents depended on by this pipeline. + const DepsTracker deps = pPipeline->getDependencies(queryObj); + + // Passing query an empty projection since it is faster to use ParsedDeps::extractFields(). + // This will need to change to support covering indexes (SERVER-12015). There is an + // exception for textScore since that can only be retrieved by a query projection. + const BSONObj projectionForQuery = deps.needTextScore ? deps.toProjection() : BSONObj(); + + /* + Look for an initial sort; we'll try to add this to the + Cursor we create. If we're successful in doing that (further down), + we'll remove the $sort from the pipeline, because the documents + will already come sorted in the specified order as a result of the + index scan. + */ + intrusive_ptr<DocumentSourceSort> sortStage; + BSONObj sortObj; + if (!sources.empty()) { + sortStage = dynamic_cast<DocumentSourceSort*>(sources.front().get()); if (sortStage) { - CanonicalQuery* cq; - Status status = - CanonicalQuery::canonicalize(pExpCtx->ns, - queryObj, - sortObj, - projectionForQuery, - &cq, - whereCallback); - - PlanExecutor* rawExec; - if (status.isOK() && getExecutor(txn, - collection, - cq, - PlanExecutor::YIELD_AUTO, - &rawExec, - runnerOptions).isOK()) { - // success: The PlanExecutor will handle sorting for us using an index. - exec.reset(rawExec); - sortInRunner = true; - - sources.pop_front(); - if (sortStage->getLimitSrc()) { - // need to reinsert coalesced $limit after removing $sort - sources.push_front(sortStage->getLimitSrc()); - } - } + // build the sort key + sortObj = sortStage->serializeSortKey(/*explain*/ false).toBson(); } + } - if (!exec.get()) { - const BSONObj noSort; - CanonicalQuery* cq; - uassertStatusOK( - CanonicalQuery::canonicalize(pExpCtx->ns, - queryObj, - noSort, - projectionForQuery, - &cq, - whereCallback)); - - PlanExecutor* rawExec; - uassertStatusOK(getExecutor(txn, - collection, - cq, - PlanExecutor::YIELD_AUTO, - &rawExec, - runnerOptions)); + // Create the PlanExecutor. + // + // If we try to create a PlanExecutor that includes both the match and the + // sort, and the two are incompatible wrt the available indexes, then + // we don't get a PlanExecutor back. + // + // So we try to use both first. If that fails, try again, without the + // sort. + // + // If we don't have a sort, jump straight to just creating a PlanExecutor. + // without the sort. + // + // If we are able to incorporate the sort into the PlanExecutor, remove it + // from the head of the pipeline. + // + // LATER - we should be able to find this out before we create the + // cursor. Either way, we can then apply other optimizations there + // are tickets for, such as SERVER-4507. + const size_t runnerOptions = QueryPlannerParams::DEFAULT | + QueryPlannerParams::INCLUDE_SHARD_FILTER | QueryPlannerParams::NO_BLOCKING_SORT; + std::shared_ptr<PlanExecutor> exec; + bool sortInRunner = false; + + const WhereCallbackReal whereCallback(pExpCtx->opCtx, pExpCtx->ns.db()); + + if (sortStage) { + CanonicalQuery* cq; + Status status = CanonicalQuery::canonicalize( + pExpCtx->ns, queryObj, sortObj, projectionForQuery, &cq, whereCallback); + + PlanExecutor* rawExec; + if (status.isOK() && + getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, runnerOptions) + .isOK()) { + // success: The PlanExecutor will handle sorting for us using an index. exec.reset(rawExec); + sortInRunner = true; + + sources.pop_front(); + if (sortStage->getLimitSrc()) { + // need to reinsert coalesced $limit after removing $sort + sources.push_front(sortStage->getLimitSrc()); + } } + } + if (!exec.get()) { + const BSONObj noSort; + CanonicalQuery* cq; + uassertStatusOK(CanonicalQuery::canonicalize( + pExpCtx->ns, queryObj, noSort, projectionForQuery, &cq, whereCallback)); - // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We - // deregister the PlanExecutor so that it can be registered with ClientCursor. - exec->deregisterExec(); - exec->saveState(); + PlanExecutor* rawExec; + uassertStatusOK( + getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, runnerOptions)); + exec.reset(rawExec); + } - // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. - intrusive_ptr<DocumentSourceCursor> pSource = - DocumentSourceCursor::create(fullName, exec, pExpCtx); - // Note the query, sort, and projection for explain. - pSource->setQuery(queryObj); - if (sortInRunner) - pSource->setSort(sortObj); + // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We + // deregister the PlanExecutor so that it can be registered with ClientCursor. + exec->deregisterExec(); + exec->saveState(); - pSource->setProjection(deps.toProjection(), deps.toParsedDeps()); + // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. + intrusive_ptr<DocumentSourceCursor> pSource = + DocumentSourceCursor::create(fullName, exec, pExpCtx); - while (!sources.empty() && pSource->coalesce(sources.front())) { - sources.pop_front(); - } + // Note the query, sort, and projection for explain. + pSource->setQuery(queryObj); + if (sortInRunner) + pSource->setSort(sortObj); - pPipeline->addInitialSource(pSource); + pSource->setProjection(deps.toProjection(), deps.toParsedDeps()); - return exec; + while (!sources.empty() && pSource->coalesce(sources.front())) { + sources.pop_front(); } -} // namespace mongo + pPipeline->addInitialSource(pSource); + + return exec; +} + +} // namespace mongo |