summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline_d.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp350
1 files changed, 164 insertions, 186 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index b6ddfdd7e12..6dbcfe4c812 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -45,219 +45,197 @@
namespace mongo {
- using boost::intrusive_ptr;
- using std::shared_ptr;
- using std::string;
+using boost::intrusive_ptr;
+using std::shared_ptr;
+using std::string;
namespace {
- class MongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface {
- public:
- MongodImplementation(const intrusive_ptr<ExpressionContext>& ctx)
- : _ctx(ctx)
- , _client(ctx->opCtx)
- {}
-
- DBClientBase* directClient() final {
- // opCtx may have changed since our last call
- invariant(_ctx->opCtx);
- _client.setOpCtx(_ctx->opCtx);
- return &_client;
- }
+class MongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface {
+public:
+ MongodImplementation(const intrusive_ptr<ExpressionContext>& ctx)
+ : _ctx(ctx), _client(ctx->opCtx) {}
+
+ DBClientBase* directClient() final {
+ // opCtx may have changed since our last call
+ invariant(_ctx->opCtx);
+ _client.setOpCtx(_ctx->opCtx);
+ return &_client;
+ }
- bool isSharded(const NamespaceString& ns) final {
- const ChunkVersion unsharded(0, 0, OID());
- return !(shardingState.getVersion(ns.ns()).isWriteCompatibleWith(unsharded));
- }
+ bool isSharded(const NamespaceString& ns) final {
+ const ChunkVersion unsharded(0, 0, OID());
+ return !(shardingState.getVersion(ns.ns()).isWriteCompatibleWith(unsharded));
+ }
- bool isCapped(const NamespaceString& ns) final {
- AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns());
- Collection* collection = ctx.getCollection();
- return collection && collection->isCapped();
- }
+ bool isCapped(const NamespaceString& ns) final {
+ AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns());
+ Collection* collection = ctx.getCollection();
+ return collection && collection->isCapped();
+ }
- BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (_ctx->bypassDocumentValidation)
- maybeDisableValidation.emplace(_ctx->opCtx);
+ BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
+ boost::optional<DisableDocumentValidation> maybeDisableValidation;
+ if (_ctx->bypassDocumentValidation)
+ maybeDisableValidation.emplace(_ctx->opCtx);
- _client.insert(ns.ns(), objs);
- return _client.getLastErrorDetailed();
- }
+ _client.insert(ns.ns(), objs);
+ return _client.getLastErrorDetailed();
+ }
- private:
- intrusive_ptr<ExpressionContext> _ctx;
- DBDirectClient _client;
- };
+private:
+ intrusive_ptr<ExpressionContext> _ctx;
+ DBDirectClient _client;
+};
}
- shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
- OperationContext* txn,
- Collection* collection,
- const intrusive_ptr<Pipeline>& pPipeline,
- const intrusive_ptr<ExpressionContext>& pExpCtx) {
- // get the full "namespace" name
- const string& fullName = pExpCtx->ns.ns();
-
- // We will be modifying the source vector as we go
- Pipeline::SourceContainer& sources = pPipeline->sources;
-
- // Inject a MongodImplementation to sources that need them.
- for (size_t i = 0; i < sources.size(); i++) {
- DocumentSourceNeedsMongod* needsMongod =
- dynamic_cast<DocumentSourceNeedsMongod*>(sources[i].get());
- if (needsMongod) {
- needsMongod->injectMongodInterface(
- std::make_shared<MongodImplementation>(pExpCtx));
- }
+shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
+ OperationContext* txn,
+ Collection* collection,
+ const intrusive_ptr<Pipeline>& pPipeline,
+ const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ // get the full "namespace" name
+ const string& fullName = pExpCtx->ns.ns();
+
+ // We will be modifying the source vector as we go
+ Pipeline::SourceContainer& sources = pPipeline->sources;
+
+ // Inject a MongodImplementation to sources that need them.
+ for (size_t i = 0; i < sources.size(); i++) {
+ DocumentSourceNeedsMongod* needsMongod =
+ dynamic_cast<DocumentSourceNeedsMongod*>(sources[i].get());
+ if (needsMongod) {
+ needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(pExpCtx));
}
+ }
- if (!sources.empty() && sources.front()->isValidInitialSource()) {
- if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) {
- // Enable the hooks for setting up authentication on the subsequent internal
- // connections we are going to create. This would normally have been done
- // when SetShardVersion was called, but since SetShardVersion is never called
- // on secondaries, this is needed.
- ShardedConnectionInfo::addHook();
- }
- return std::shared_ptr<PlanExecutor>(); // don't need a cursor
+ if (!sources.empty() && sources.front()->isValidInitialSource()) {
+ if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) {
+ // Enable the hooks for setting up authentication on the subsequent internal
+ // connections we are going to create. This would normally have been done
+ // when SetShardVersion was called, but since SetShardVersion is never called
+ // on secondaries, this is needed.
+ ShardedConnectionInfo::addHook();
}
+ return std::shared_ptr<PlanExecutor>(); // don't need a cursor
+ }
- // Look for an initial match. This works whether we got an initial query or not.
- // If not, it results in a "{}" query, which will be what we want in that case.
- const BSONObj queryObj = pPipeline->getInitialQuery();
- if (!queryObj.isEmpty()) {
- // This will get built in to the Cursor we'll create, so
- // remove the match from the pipeline
- sources.pop_front();
- }
-
- // Find the set of fields in the source documents depended on by this pipeline.
- const DepsTracker deps = pPipeline->getDependencies(queryObj);
-
- // Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
- // This will need to change to support covering indexes (SERVER-12015). There is an
- // exception for textScore since that can only be retrieved by a query projection.
- const BSONObj projectionForQuery = deps.needTextScore ? deps.toProjection() : BSONObj();
-
- /*
- Look for an initial sort; we'll try to add this to the
- Cursor we create. If we're successful in doing that (further down),
- we'll remove the $sort from the pipeline, because the documents
- will already come sorted in the specified order as a result of the
- index scan.
- */
- intrusive_ptr<DocumentSourceSort> sortStage;
- BSONObj sortObj;
- if (!sources.empty()) {
- sortStage = dynamic_cast<DocumentSourceSort*>(sources.front().get());
- if (sortStage) {
- // build the sort key
- sortObj = sortStage->serializeSortKey(/*explain*/false).toBson();
- }
- }
-
- // Create the PlanExecutor.
- //
- // If we try to create a PlanExecutor that includes both the match and the
- // sort, and the two are incompatible wrt the available indexes, then
- // we don't get a PlanExecutor back.
- //
- // So we try to use both first. If that fails, try again, without the
- // sort.
- //
- // If we don't have a sort, jump straight to just creating a PlanExecutor.
- // without the sort.
- //
- // If we are able to incorporate the sort into the PlanExecutor, remove it
- // from the head of the pipeline.
- //
- // LATER - we should be able to find this out before we create the
- // cursor. Either way, we can then apply other optimizations there
- // are tickets for, such as SERVER-4507.
- const size_t runnerOptions = QueryPlannerParams::DEFAULT
- | QueryPlannerParams::INCLUDE_SHARD_FILTER
- | QueryPlannerParams::NO_BLOCKING_SORT
- ;
- std::shared_ptr<PlanExecutor> exec;
- bool sortInRunner = false;
-
- const WhereCallbackReal whereCallback(pExpCtx->opCtx, pExpCtx->ns.db());
+ // Look for an initial match. This works whether we got an initial query or not.
+ // If not, it results in a "{}" query, which will be what we want in that case.
+ const BSONObj queryObj = pPipeline->getInitialQuery();
+ if (!queryObj.isEmpty()) {
+ // This will get built in to the Cursor we'll create, so
+ // remove the match from the pipeline
+ sources.pop_front();
+ }
+ // Find the set of fields in the source documents depended on by this pipeline.
+ const DepsTracker deps = pPipeline->getDependencies(queryObj);
+
+ // Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
+ // This will need to change to support covering indexes (SERVER-12015). There is an
+ // exception for textScore since that can only be retrieved by a query projection.
+ const BSONObj projectionForQuery = deps.needTextScore ? deps.toProjection() : BSONObj();
+
+ /*
+ Look for an initial sort; we'll try to add this to the
+ Cursor we create. If we're successful in doing that (further down),
+ we'll remove the $sort from the pipeline, because the documents
+ will already come sorted in the specified order as a result of the
+ index scan.
+ */
+ intrusive_ptr<DocumentSourceSort> sortStage;
+ BSONObj sortObj;
+ if (!sources.empty()) {
+ sortStage = dynamic_cast<DocumentSourceSort*>(sources.front().get());
if (sortStage) {
- CanonicalQuery* cq;
- Status status =
- CanonicalQuery::canonicalize(pExpCtx->ns,
- queryObj,
- sortObj,
- projectionForQuery,
- &cq,
- whereCallback);
-
- PlanExecutor* rawExec;
- if (status.isOK() && getExecutor(txn,
- collection,
- cq,
- PlanExecutor::YIELD_AUTO,
- &rawExec,
- runnerOptions).isOK()) {
- // success: The PlanExecutor will handle sorting for us using an index.
- exec.reset(rawExec);
- sortInRunner = true;
-
- sources.pop_front();
- if (sortStage->getLimitSrc()) {
- // need to reinsert coalesced $limit after removing $sort
- sources.push_front(sortStage->getLimitSrc());
- }
- }
+ // build the sort key
+ sortObj = sortStage->serializeSortKey(/*explain*/ false).toBson();
}
+ }
- if (!exec.get()) {
- const BSONObj noSort;
- CanonicalQuery* cq;
- uassertStatusOK(
- CanonicalQuery::canonicalize(pExpCtx->ns,
- queryObj,
- noSort,
- projectionForQuery,
- &cq,
- whereCallback));
-
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutor(txn,
- collection,
- cq,
- PlanExecutor::YIELD_AUTO,
- &rawExec,
- runnerOptions));
+ // Create the PlanExecutor.
+ //
+ // If we try to create a PlanExecutor that includes both the match and the
+ // sort, and the two are incompatible wrt the available indexes, then
+ // we don't get a PlanExecutor back.
+ //
+ // So we try to use both first. If that fails, try again, without the
+ // sort.
+ //
+ // If we don't have a sort, jump straight to just creating a PlanExecutor.
+ // without the sort.
+ //
+ // If we are able to incorporate the sort into the PlanExecutor, remove it
+ // from the head of the pipeline.
+ //
+ // LATER - we should be able to find this out before we create the
+ // cursor. Either way, we can then apply other optimizations there
+ // are tickets for, such as SERVER-4507.
+ const size_t runnerOptions = QueryPlannerParams::DEFAULT |
+ QueryPlannerParams::INCLUDE_SHARD_FILTER | QueryPlannerParams::NO_BLOCKING_SORT;
+ std::shared_ptr<PlanExecutor> exec;
+ bool sortInRunner = false;
+
+ const WhereCallbackReal whereCallback(pExpCtx->opCtx, pExpCtx->ns.db());
+
+ if (sortStage) {
+ CanonicalQuery* cq;
+ Status status = CanonicalQuery::canonicalize(
+ pExpCtx->ns, queryObj, sortObj, projectionForQuery, &cq, whereCallback);
+
+ PlanExecutor* rawExec;
+ if (status.isOK() &&
+ getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, runnerOptions)
+ .isOK()) {
+ // success: The PlanExecutor will handle sorting for us using an index.
exec.reset(rawExec);
+ sortInRunner = true;
+
+ sources.pop_front();
+ if (sortStage->getLimitSrc()) {
+ // need to reinsert coalesced $limit after removing $sort
+ sources.push_front(sortStage->getLimitSrc());
+ }
}
+ }
+ if (!exec.get()) {
+ const BSONObj noSort;
+ CanonicalQuery* cq;
+ uassertStatusOK(CanonicalQuery::canonicalize(
+ pExpCtx->ns, queryObj, noSort, projectionForQuery, &cq, whereCallback));
- // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
- // deregister the PlanExecutor so that it can be registered with ClientCursor.
- exec->deregisterExec();
- exec->saveState();
+ PlanExecutor* rawExec;
+ uassertStatusOK(
+ getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, runnerOptions));
+ exec.reset(rawExec);
+ }
- // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
- intrusive_ptr<DocumentSourceCursor> pSource =
- DocumentSourceCursor::create(fullName, exec, pExpCtx);
- // Note the query, sort, and projection for explain.
- pSource->setQuery(queryObj);
- if (sortInRunner)
- pSource->setSort(sortObj);
+ // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
+ // deregister the PlanExecutor so that it can be registered with ClientCursor.
+ exec->deregisterExec();
+ exec->saveState();
- pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
+ // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
+ intrusive_ptr<DocumentSourceCursor> pSource =
+ DocumentSourceCursor::create(fullName, exec, pExpCtx);
- while (!sources.empty() && pSource->coalesce(sources.front())) {
- sources.pop_front();
- }
+ // Note the query, sort, and projection for explain.
+ pSource->setQuery(queryObj);
+ if (sortInRunner)
+ pSource->setSort(sortObj);
- pPipeline->addInitialSource(pSource);
+ pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
- return exec;
+ while (!sources.empty() && pSource->coalesce(sources.front())) {
+ sources.pop_front();
}
-} // namespace mongo
+ pPipeline->addInitialSource(pSource);
+
+ return exec;
+}
+
+} // namespace mongo