Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 1037 |
1 file changed, 506 insertions, 531 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index bba6bf9615f..9e9427190f1 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -46,597 +46,572 @@ namespace mongo { - using boost::intrusive_ptr; - using std::endl; - using std::ostringstream; - using std::string; - using std::vector; - - const char Pipeline::commandName[] = "aggregate"; - const char Pipeline::pipelineName[] = "pipeline"; - const char Pipeline::explainName[] = "explain"; - const char Pipeline::fromRouterName[] = "fromRouter"; - const char Pipeline::serverPipelineName[] = "serverPipeline"; - const char Pipeline::mongosPipelineName[] = "mongosPipeline"; - - Pipeline::Pipeline(const intrusive_ptr<ExpressionContext> &pTheCtx): - explain(false), - pCtx(pTheCtx) { - } - - - /* this structure is used to make a lookup table of operators */ - struct StageDesc { - const char *pName; - intrusive_ptr<DocumentSource> (*pFactory)( - BSONElement, const intrusive_ptr<ExpressionContext> &); - }; - - /* this table must be in alphabetical order by name for bsearch() */ - static const StageDesc stageDesc[] = { - {DocumentSourceGeoNear::geoNearName, - DocumentSourceGeoNear::createFromBson}, - {DocumentSourceGroup::groupName, - DocumentSourceGroup::createFromBson}, - {DocumentSourceLimit::limitName, - DocumentSourceLimit::createFromBson}, - {DocumentSourceMatch::matchName, - DocumentSourceMatch::createFromBson}, - {DocumentSourceMergeCursors::name, - DocumentSourceMergeCursors::createFromBson}, - {DocumentSourceOut::outName, - DocumentSourceOut::createFromBson}, - {DocumentSourceProject::projectName, - DocumentSourceProject::createFromBson}, - {DocumentSourceRedact::redactName, - DocumentSourceRedact::createFromBson}, - {DocumentSourceSkip::skipName, - DocumentSourceSkip::createFromBson}, - {DocumentSourceSort::sortName, - DocumentSourceSort::createFromBson}, - {DocumentSourceUnwind::unwindName, - DocumentSourceUnwind::createFromBson}, - }; - static const size_t nStageDesc = sizeof(stageDesc) / sizeof(StageDesc); - - static int stageDescCmp(const void *pL, const void *pR) { - return strcmp(((const StageDesc *)pL)->pName, - ((const StageDesc *)pR)->pName); - } - - intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg, - const BSONObj& cmdObj, - const intrusive_ptr<ExpressionContext>& pCtx) { - intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx)); - vector<BSONElement> pipeline; - - /* gather the specification for the aggregation */ - for(BSONObj::iterator cmdIterator = cmdObj.begin(); - cmdIterator.more(); ) { - BSONElement cmdElement(cmdIterator.next()); - const char *pFieldName = cmdElement.fieldName(); - - // ignore top-level fields prefixed with $. They are for the command processor, not us. - if (pFieldName[0] == '$') { - continue; - } - - // maxTimeMS is also for the command processor. - if (pFieldName == LiteParsedQuery::cmdOptionMaxTimeMS) { - continue; - } - - // ignore cursor options since they are handled externally. 
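Both sides of the hunk open with parseCommand's field-gathering loop: anything prefixed with $ or named maxTimeMS or cursor belongs to the command processor and is skipped, known options are consumed, and the first leftover field aborts parsing. A minimal stand-alone sketch of that filter-then-dispatch loop, with plain key/value pairs standing in for BSONElement (the Field type is invented; the field names are the ones in the code above):

    #include <string>
    #include <vector>

    struct Field { std::string name, value; };

    // Returns false and fills errmsg on the first unrecognized field,
    // mirroring the error path in parseCommand above.
    bool gatherSpec(const std::vector<Field>& cmd, std::string& errmsg) {
        for (const Field& f : cmd) {
            if (!f.name.empty() && f.name[0] == '$')
                continue;  // for the command processor, not us
            if (f.name == "maxTimeMS" || f.name == "cursor")
                continue;  // handled externally
            if (f.name == "aggregate" || f.name == "pipeline" || f.name == "explain" ||
                f.name == "fromRouter" || f.name == "allowDiskUse" ||
                f.name == "bypassDocumentValidation")
                continue;  // consumed by the real parser
            errmsg = "unrecognized field '" + f.name + "'";
            return false;
        }
        return true;
    }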
- if (str::equals(pFieldName, "cursor")) { - continue; - } +using boost::intrusive_ptr; +using std::endl; +using std::ostringstream; +using std::string; +using std::vector; + +const char Pipeline::commandName[] = "aggregate"; +const char Pipeline::pipelineName[] = "pipeline"; +const char Pipeline::explainName[] = "explain"; +const char Pipeline::fromRouterName[] = "fromRouter"; +const char Pipeline::serverPipelineName[] = "serverPipeline"; +const char Pipeline::mongosPipelineName[] = "mongosPipeline"; + +Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) + : explain(false), pCtx(pTheCtx) {} + + +/* this structure is used to make a lookup table of operators */ +struct StageDesc { + const char* pName; + intrusive_ptr<DocumentSource>(*pFactory)(BSONElement, const intrusive_ptr<ExpressionContext>&); +}; + +/* this table must be in alphabetical order by name for bsearch() */ +static const StageDesc stageDesc[] = { + {DocumentSourceGeoNear::geoNearName, DocumentSourceGeoNear::createFromBson}, + {DocumentSourceGroup::groupName, DocumentSourceGroup::createFromBson}, + {DocumentSourceLimit::limitName, DocumentSourceLimit::createFromBson}, + {DocumentSourceMatch::matchName, DocumentSourceMatch::createFromBson}, + {DocumentSourceMergeCursors::name, DocumentSourceMergeCursors::createFromBson}, + {DocumentSourceOut::outName, DocumentSourceOut::createFromBson}, + {DocumentSourceProject::projectName, DocumentSourceProject::createFromBson}, + {DocumentSourceRedact::redactName, DocumentSourceRedact::createFromBson}, + {DocumentSourceSkip::skipName, DocumentSourceSkip::createFromBson}, + {DocumentSourceSort::sortName, DocumentSourceSort::createFromBson}, + {DocumentSourceUnwind::unwindName, DocumentSourceUnwind::createFromBson}, +}; +static const size_t nStageDesc = sizeof(stageDesc) / sizeof(StageDesc); + +static int stageDescCmp(const void* pL, const void* pR) { + return strcmp(((const StageDesc*)pL)->pName, ((const StageDesc*)pR)->pName); +} + +intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg, + const BSONObj& cmdObj, + const intrusive_ptr<ExpressionContext>& pCtx) { + intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx)); + vector<BSONElement> pipeline; + + /* gather the specification for the aggregation */ + for (BSONObj::iterator cmdIterator = cmdObj.begin(); cmdIterator.more();) { + BSONElement cmdElement(cmdIterator.next()); + const char* pFieldName = cmdElement.fieldName(); + + // ignore top-level fields prefixed with $. They are for the command processor, not us. + if (pFieldName[0] == '$') { + continue; + } - /* look for the aggregation command */ - if (!strcmp(pFieldName, commandName)) { - continue; - } + // maxTimeMS is also for the command processor. + if (pFieldName == LiteParsedQuery::cmdOptionMaxTimeMS) { + continue; + } - /* check for the collection name */ - if (!strcmp(pFieldName, pipelineName)) { - pipeline = cmdElement.Array(); - continue; - } + // ignore cursor options since they are handled externally. 
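The stageDesc[] table and stageDescCmp comparator shown in both versions implement a classic sorted-array dispatch: because the table is kept in strict alphabetical order, parseCommand can resolve a stage name such as "$sort" with bsearch() in O(log n). A minimal stand-alone sketch of the same pattern (the names and factories here are invented for illustration, not MongoDB's):

    #include <cstdio>
    #include <cstdlib>
    #include <cstring>

    struct StageDesc {
        const char* name;
        const char* (*factory)();  // stand-in for the DocumentSource factory
    };

    static const char* makeLimit() { return "limit stage"; }
    static const char* makeMatch() { return "match stage"; }
    static const char* makeSort()  { return "sort stage"; }

    // Must stay sorted by name, or bsearch() silently misses entries,
    // which is why the comment above insists on alphabetical order.
    static const StageDesc kStages[] = {
        {"$limit", makeLimit}, {"$match", makeMatch}, {"$sort", makeSort},
    };

    static int cmp(const void* l, const void* r) {
        return strcmp(static_cast<const StageDesc*>(l)->name,
                      static_cast<const StageDesc*>(r)->name);
    }

    int main() {
        StageDesc key = {"$match", nullptr};
        const StageDesc* hit = static_cast<const StageDesc*>(bsearch(
            &key, kStages, sizeof(kStages) / sizeof(kStages[0]), sizeof(StageDesc), cmp));
        printf("%s\n", hit ? hit->factory() : "unrecognized stage");
        return 0;
    }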
+ if (str::equals(pFieldName, "cursor")) { + continue; + } - /* check for explain option */ - if (!strcmp(pFieldName, explainName)) { - pPipeline->explain = cmdElement.Bool(); - continue; - } + /* look for the aggregation command */ + if (!strcmp(pFieldName, commandName)) { + continue; + } - /* if the request came from the router, we're in a shard */ - if (!strcmp(pFieldName, fromRouterName)) { - pCtx->inShard = cmdElement.Bool(); - continue; - } + /* check for the collection name */ + if (!strcmp(pFieldName, pipelineName)) { + pipeline = cmdElement.Array(); + continue; + } - if (str::equals(pFieldName, "allowDiskUse")) { - uassert(16949, - str::stream() << "allowDiskUse must be a bool, not a " - << typeName(cmdElement.type()), - cmdElement.type() == Bool); - pCtx->extSortAllowed = cmdElement.Bool(); - continue; - } + /* check for explain option */ + if (!strcmp(pFieldName, explainName)) { + pPipeline->explain = cmdElement.Bool(); + continue; + } - if (pFieldName == bypassDocumentValidationCommandOption()) { - pCtx->bypassDocumentValidation = cmdElement.trueValue(); - continue; - } + /* if the request came from the router, we're in a shard */ + if (!strcmp(pFieldName, fromRouterName)) { + pCtx->inShard = cmdElement.Bool(); + continue; + } - /* we didn't recognize a field in the command */ - ostringstream sb; - sb << "unrecognized field '" << cmdElement.fieldName() << "'"; - errmsg = sb.str(); - return intrusive_ptr<Pipeline>(); + if (str::equals(pFieldName, "allowDiskUse")) { + uassert(16949, + str::stream() << "allowDiskUse must be a bool, not a " + << typeName(cmdElement.type()), + cmdElement.type() == Bool); + pCtx->extSortAllowed = cmdElement.Bool(); + continue; } - /* - If we get here, we've harvested the fields we expect for a pipeline. - - Set up the specified document source pipeline. - */ - SourceContainer& sources = pPipeline->sources; // shorthand - - /* iterate over the steps in the pipeline */ - const size_t nSteps = pipeline.size(); - for(size_t iStep = 0; iStep < nSteps; ++iStep) { - /* pull out the pipeline element as an object */ - BSONElement pipeElement(pipeline[iStep]); - uassert(15942, str::stream() << "pipeline element " << - iStep << " is not an object", - pipeElement.type() == Object); - BSONObj bsonObj(pipeElement.Obj()); - - // Parse a pipeline stage from 'bsonObj'. - uassert(16435, "A pipeline stage specification object must contain exactly one field.", - bsonObj.nFields() == 1); - BSONElement stageSpec = bsonObj.firstElement(); - const char* stageName = stageSpec.fieldName(); - - // Create a DocumentSource pipeline stage from 'stageSpec'. - StageDesc key; - key.pName = stageName; - const StageDesc* pDesc = (const StageDesc*) - bsearch(&key, stageDesc, nStageDesc, sizeof(StageDesc), - stageDescCmp); - - uassert(16436, - str::stream() << "Unrecognized pipeline stage name: '" << stageName << "'", - pDesc); - intrusive_ptr<DocumentSource> stage = pDesc->pFactory(stageSpec, pCtx); - verify(stage); - sources.push_back(stage); - - // TODO find a good general way to check stages that must be first syntactically - - if (dynamic_cast<DocumentSourceOut*>(stage.get())) { - uassert(16991, "$out can only be the final stage in the pipeline", - iStep == nSteps - 1); - } + if (pFieldName == bypassDocumentValidationCommandOption()) { + pCtx->bypassDocumentValidation = cmdElement.trueValue(); + continue; } - // The order in which optimizations are applied can have significant impact on the - // efficiency of the final pipeline. Be Careful! 
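The allowDiskUse handling above is stricter than the other flags: uassert 16949 rejects non-boolean values outright rather than coercing them. The same validate-then-set shape in miniature (the error helper and Element type are stand-ins, not the server's uassert machinery):

    #include <stdexcept>
    #include <string>

    enum class Type { Bool, String, Number };

    struct Element { Type type; bool boolValue; };

    void uassert16949(bool ok, const std::string& msg) {
        if (!ok) throw std::runtime_error(msg);  // stand-in for mongo's uassert
    }

    bool parseAllowDiskUse(const Element& e) {
        uassert16949(e.type == Type::Bool, "allowDiskUse must be a bool");
        return e.boolValue;  // becomes pCtx->extSortAllowed in the real code
    }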
- Optimizations::Local::moveMatchBeforeSort(pPipeline.get()); - Optimizations::Local::moveSkipAndLimitBeforeProject(pPipeline.get()); - Optimizations::Local::moveLimitBeforeSkip(pPipeline.get()); - Optimizations::Local::coalesceAdjacent(pPipeline.get()); - Optimizations::Local::optimizeEachDocumentSource(pPipeline.get()); - Optimizations::Local::duplicateMatchBeforeInitalRedact(pPipeline.get()); + /* we didn't recognize a field in the command */ + ostringstream sb; + sb << "unrecognized field '" << cmdElement.fieldName() << "'"; + errmsg = sb.str(); + return intrusive_ptr<Pipeline>(); + } - return pPipeline; + /* + If we get here, we've harvested the fields we expect for a pipeline. + + Set up the specified document source pipeline. + */ + SourceContainer& sources = pPipeline->sources; // shorthand + + /* iterate over the steps in the pipeline */ + const size_t nSteps = pipeline.size(); + for (size_t iStep = 0; iStep < nSteps; ++iStep) { + /* pull out the pipeline element as an object */ + BSONElement pipeElement(pipeline[iStep]); + uassert(15942, + str::stream() << "pipeline element " << iStep << " is not an object", + pipeElement.type() == Object); + BSONObj bsonObj(pipeElement.Obj()); + + // Parse a pipeline stage from 'bsonObj'. + uassert(16435, + "A pipeline stage specification object must contain exactly one field.", + bsonObj.nFields() == 1); + BSONElement stageSpec = bsonObj.firstElement(); + const char* stageName = stageSpec.fieldName(); + + // Create a DocumentSource pipeline stage from 'stageSpec'. + StageDesc key; + key.pName = stageName; + const StageDesc* pDesc = + (const StageDesc*)bsearch(&key, stageDesc, nStageDesc, sizeof(StageDesc), stageDescCmp); + + uassert(16436, + str::stream() << "Unrecognized pipeline stage name: '" << stageName << "'", + pDesc); + intrusive_ptr<DocumentSource> stage = pDesc->pFactory(stageSpec, pCtx); + verify(stage); + sources.push_back(stage); + + // TODO find a good general way to check stages that must be first syntactically + + if (dynamic_cast<DocumentSourceOut*>(stage.get())) { + uassert(16991, "$out can only be the final stage in the pipeline", iStep == nSteps - 1); + } } - void Pipeline::Optimizations::Local::moveMatchBeforeSort(Pipeline* pipeline) { - // TODO Keep moving matches across multiple sorts as moveLimitBeforeSkip does below. - // TODO Check sort for limit. Not an issue currently due to order optimizations are applied, - // but should be fixed. - SourceContainer& sources = pipeline->sources; - for (size_t srcn = sources.size(), srci = 1; srci < srcn; ++srci) { - intrusive_ptr<DocumentSource> &pSource = sources[srci]; - DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch *>(pSource.get()); - if (match && !match->isTextQuery()) { - intrusive_ptr<DocumentSource> &pPrevious = sources[srci - 1]; - if (dynamic_cast<DocumentSourceSort *>(pPrevious.get())) { - /* swap this item with the previous */ - intrusive_ptr<DocumentSource> pTemp(pPrevious); - pPrevious = pSource; - pSource = pTemp; - } + // The order in which optimizations are applied can have significant impact on the + // efficiency of the final pipeline. Be Careful! 
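Both versions end parseCommand by running the six Optimizations::Local passes in a fixed order, and the "Be Careful!" comment warns that this order matters: for instance, $match must be moved ahead of $sort before coalesceAdjacent runs, or the coalescing pass sees different neighbors. A sketch of that pass-pipeline shape (the Pipeline stand-in and lambdas are invented; the pass names in the comments are the real ones above):

    #include <functional>
    #include <string>
    #include <utility>
    #include <vector>

    struct Pipeline { std::vector<std::string> stages; };
    using Pass = std::function<void(Pipeline*)>;

    void runLocalOptimizations(Pipeline* p) {
        // Order is significant, exactly as the comment above warns.
        const std::vector<Pass> passes = {
            [](Pipeline* pl) {  // moveMatchBeforeSort, reduced to a sketch
                for (size_t i = 1; i < pl->stages.size(); ++i)
                    if (pl->stages[i] == "$match" && pl->stages[i - 1] == "$sort")
                        std::swap(pl->stages[i], pl->stages[i - 1]);
            },
            [](Pipeline*) { /* moveSkipAndLimitBeforeProject, moveLimitBeforeSkip, ... */ },
            [](Pipeline*) { /* coalesceAdjacent relies on the swaps already being done */ },
        };
        for (const Pass& pass : passes)
            pass(p);
    }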
+ Optimizations::Local::moveMatchBeforeSort(pPipeline.get()); + Optimizations::Local::moveSkipAndLimitBeforeProject(pPipeline.get()); + Optimizations::Local::moveLimitBeforeSkip(pPipeline.get()); + Optimizations::Local::coalesceAdjacent(pPipeline.get()); + Optimizations::Local::optimizeEachDocumentSource(pPipeline.get()); + Optimizations::Local::duplicateMatchBeforeInitalRedact(pPipeline.get()); + + return pPipeline; +} + +void Pipeline::Optimizations::Local::moveMatchBeforeSort(Pipeline* pipeline) { + // TODO Keep moving matches across multiple sorts as moveLimitBeforeSkip does below. + // TODO Check sort for limit. Not an issue currently due to order optimizations are applied, + // but should be fixed. + SourceContainer& sources = pipeline->sources; + for (size_t srcn = sources.size(), srci = 1; srci < srcn; ++srci) { + intrusive_ptr<DocumentSource>& pSource = sources[srci]; + DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(pSource.get()); + if (match && !match->isTextQuery()) { + intrusive_ptr<DocumentSource>& pPrevious = sources[srci - 1]; + if (dynamic_cast<DocumentSourceSort*>(pPrevious.get())) { + /* swap this item with the previous */ + intrusive_ptr<DocumentSource> pTemp(pPrevious); + pPrevious = pSource; + pSource = pTemp; } } } - - void Pipeline::Optimizations::Local::moveSkipAndLimitBeforeProject(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.empty()) return; - - for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) { - // This optimization only applies when a $project comes before a $skip or $limit. - auto project = dynamic_cast<DocumentSourceProject*>(sources[i-1].get()); - if (!project) continue; - - auto skip = dynamic_cast<DocumentSourceSkip*>(sources[i].get()); - auto limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get()); - if (!(skip || limit)) continue; - - swap(sources[i], sources[i-1]); - - // Start at back again. This is needed to handle cases with more than 1 $skip or - // $limit (S means skip, L means limit, P means project) +} + +void Pipeline::Optimizations::Local::moveSkipAndLimitBeforeProject(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + if (sources.empty()) + return; + + for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) { + // This optimization only applies when a $project comes before a $skip or $limit. + auto project = dynamic_cast<DocumentSourceProject*>(sources[i - 1].get()); + if (!project) + continue; + + auto skip = dynamic_cast<DocumentSourceSkip*>(sources[i].get()); + auto limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get()); + if (!(skip || limit)) + continue; + + swap(sources[i], sources[i - 1]); + + // Start at back again. 
This is needed to handle cases with more than 1 $skip or + // $limit (S means skip, L means limit, P means project) + // + // These would work without second pass (assuming back to front ordering) + // PS -> SP + // PL -> LP + // PPL -> LPP + // PPS -> SPP + // + // The following cases need a second pass to handle the second skip or limit + // PLL -> LLP + // PPLL -> LLPP + // PLPL -> LLPP + i = sources.size(); // decremented before next pass + } +} + +void Pipeline::Optimizations::Local::moveLimitBeforeSkip(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + if (sources.empty()) + return; + + for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) { + DocumentSourceLimit* limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get()); + DocumentSourceSkip* skip = dynamic_cast<DocumentSourceSkip*>(sources[i - 1].get()); + if (limit && skip) { + // Increase limit by skip since the skipped docs now pass through the $limit + limit->setLimit(limit->getLimit() + skip->getSkip()); + swap(sources[i], sources[i - 1]); + + // Start at back again. This is needed to handle cases with more than 1 $limit + // (S means skip, L means limit) // - // These would work without second pass (assuming back to front ordering) - // PS -> SP - // PL -> LP - // PPL -> LPP - // PPS -> SPP + // These two would work without second pass (assuming back to front ordering) + // SL -> LS + // SSL -> LSS // - // The following cases need a second pass to handle the second skip or limit - // PLL -> LLP - // PPLL -> LLPP - // PLPL -> LLPP - i = sources.size(); // decremented before next pass + // The following cases need a second pass to handle the second limit + // SLL -> LLS + // SSLL -> LLSS + // SLSL -> LLSS + i = sources.size(); // decremented before next pass } } - - void Pipeline::Optimizations::Local::moveLimitBeforeSkip(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.empty()) - return; - - for(int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) { - DocumentSourceLimit* limit = - dynamic_cast<DocumentSourceLimit*>(sources[i].get()); - DocumentSourceSkip* skip = - dynamic_cast<DocumentSourceSkip*>(sources[i-1].get()); - if (limit && skip) { - // Increase limit by skip since the skipped docs now pass through the $limit - limit->setLimit(limit->getLimit() + skip->getSkip()); - swap(sources[i], sources[i-1]); - - // Start at back again. This is needed to handle cases with more than 1 $limit - // (S means skip, L means limit) - // - // These two would work without second pass (assuming back to front ordering) - // SL -> LS - // SSL -> LSS - // - // The following cases need a second pass to handle the second limit - // SLL -> LLS - // SSLL -> LLSS - // SLSL -> LLSS - i = sources.size(); // decremented before next pass - } - } +} + +void Pipeline::Optimizations::Local::coalesceAdjacent(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + if (sources.empty()) + return; + + // move all sources to a temporary list + SourceContainer tempSources; + sources.swap(tempSources); + + // move the first one to the final list + sources.push_back(tempSources[0]); + + // run through the sources, coalescing them or keeping them + for (size_t tempn = tempSources.size(), tempi = 1; tempi < tempn; ++tempi) { + // If we can't coalesce the source with the last, then move it + // to the final list, and make it the new last. 
(If we succeeded, + // then we're still on the same last, and there's no need to move + // or do anything with the source -- the destruction of tempSources + // will take care of the rest.) + intrusive_ptr<DocumentSource>& pLastSource = sources.back(); + intrusive_ptr<DocumentSource>& pTemp = tempSources[tempi]; + verify(pTemp && pLastSource); + if (!pLastSource->coalesce(pTemp)) + sources.push_back(pTemp); } - - void Pipeline::Optimizations::Local::coalesceAdjacent(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.empty()) - return; - - // move all sources to a temporary list - SourceContainer tempSources; - sources.swap(tempSources); - - // move the first one to the final list - sources.push_back(tempSources[0]); - - // run through the sources, coalescing them or keeping them - for (size_t tempn = tempSources.size(), tempi = 1; tempi < tempn; ++tempi) { - // If we can't coalesce the source with the last, then move it - // to the final list, and make it the new last. (If we succeeded, - // then we're still on the same last, and there's no need to move - // or do anything with the source -- the destruction of tempSources - // will take care of the rest.) - intrusive_ptr<DocumentSource> &pLastSource = sources.back(); - intrusive_ptr<DocumentSource> &pTemp = tempSources[tempi]; - verify(pTemp && pLastSource); - if (!pLastSource->coalesce(pTemp)) - sources.push_back(pTemp); +} + +void Pipeline::Optimizations::Local::optimizeEachDocumentSource(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + SourceContainer newSources; + for (SourceContainer::iterator it(sources.begin()); it != sources.end(); ++it) { + if (auto out = (*it)->optimize()) { + newSources.push_back(std::move(out)); } } - - void Pipeline::Optimizations::Local::optimizeEachDocumentSource(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - SourceContainer newSources; - for (SourceContainer::iterator it(sources.begin()); it != sources.end(); ++it) { - if (auto out = (*it)->optimize()) { - newSources.push_back(std::move(out)); + pipeline->sources = std::move(newSources); +} + +void Pipeline::Optimizations::Local::duplicateMatchBeforeInitalRedact(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + if (sources.size() >= 2 && dynamic_cast<DocumentSourceRedact*>(sources[0].get())) { + if (DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources[1].get())) { + const BSONObj redactSafePortion = match->redactSafePortion(); + if (!redactSafePortion.isEmpty()) { + sources.push_front(DocumentSourceMatch::createFromBson( + BSON("$match" << redactSafePortion).firstElement(), pipeline->pCtx)); } } - pipeline->sources = std::move(newSources); } - - void Pipeline::Optimizations::Local::duplicateMatchBeforeInitalRedact(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.size() >= 2 && dynamic_cast<DocumentSourceRedact*>(sources[0].get())) { - if (DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources[1].get())) { - const BSONObj redactSafePortion = match->redactSafePortion(); - if (!redactSafePortion.isEmpty()) { - sources.push_front( - DocumentSourceMatch::createFromBson( - BSON("$match" << redactSafePortion).firstElement(), - pipeline->pCtx)); - } +} + +void Pipeline::addRequiredPrivileges(Command* commandTemplate, + const string& db, + BSONObj cmdObj, + vector<Privilege>* out) { + ResourcePattern inputResource(commandTemplate->parseResourcePattern(db, cmdObj)); + uassert(17138, + 
mongoutils::str::stream() << "Invalid input resource, " << inputResource.toString(), + inputResource.isExactNamespacePattern()); + + out->push_back(Privilege(inputResource, ActionType::find)); + + BSONObj pipeline = cmdObj.getObjectField("pipeline"); + BSONForEach(stageElem, pipeline) { + BSONObj stage = stageElem.embeddedObjectUserCheck(); + if (str::equals(stage.firstElementFieldName(), "$out")) { + NamespaceString outputNs(db, stage.firstElement().str()); + uassert(17139, + mongoutils::str::stream() << "Invalid $out target namespace, " << outputNs.ns(), + outputNs.isValid()); + + ActionSet actions; + actions.addAction(ActionType::remove); + actions.addAction(ActionType::insert); + if (shouldBypassDocumentValidationForCommand(cmdObj)) { + actions.addAction(ActionType::bypassDocumentValidation); } + + out->push_back(Privilege(ResourcePattern::forExactNamespace(outputNs), actions)); } } - - void Pipeline::addRequiredPrivileges(Command* commandTemplate, - const string& db, - BSONObj cmdObj, - vector<Privilege>* out) { - ResourcePattern inputResource(commandTemplate->parseResourcePattern(db, cmdObj)); - uassert(17138, - mongoutils::str::stream() << "Invalid input resource, " << inputResource.toString(), - inputResource.isExactNamespacePattern()); - - out->push_back(Privilege(inputResource, ActionType::find)); - - BSONObj pipeline = cmdObj.getObjectField("pipeline"); - BSONForEach(stageElem, pipeline) { - BSONObj stage = stageElem.embeddedObjectUserCheck(); - if (str::equals(stage.firstElementFieldName(), "$out")) { - NamespaceString outputNs(db, stage.firstElement().str()); - uassert(17139, - mongoutils::str::stream() << "Invalid $out target namespace, " << - outputNs.ns(), - outputNs.isValid()); - - ActionSet actions; - actions.addAction(ActionType::remove); - actions.addAction(ActionType::insert); - if (shouldBypassDocumentValidationForCommand(cmdObj)) { - actions.addAction(ActionType::bypassDocumentValidation); - } - - out->push_back(Privilege(ResourcePattern::forExactNamespace(outputNs), actions)); - } +} + +intrusive_ptr<Pipeline> Pipeline::splitForSharded() { + // Create and initialize the shard spec we'll return. We start with an empty pipeline on the + // shards and all work being done in the merger. Optimizations can move operations between + // the pipelines to be more efficient. + intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx)); + shardPipeline->explain = explain; + + // The order in which optimizations are applied can have significant impact on the + // efficiency of the final pipeline. Be Careful! 
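The moveLimitBeforeSkip pass above rests on an arithmetic identity: skipping s documents and then keeping l of them returns exactly the same documents as keeping l + s and then skipping s, which is why it calls setLimit(getLimit() + getSkip()) before swapping the two stages. A small stand-alone check of that identity over plain counts (no MongoDB types involved):

    #include <algorithm>
    #include <cassert>

    // documents returned by skip(s) then limit(l) over n inputs
    int skipThenLimit(int n, int s, int l) {
        return std::max(0, std::min(n - s, l));
    }

    // documents returned by limit(l2) then skip(s), the rewritten order
    int limitThenSkip(int n, int l2, int s) {
        return std::max(0, std::min(n, l2) - s);
    }

    int main() {
        const int skip = 10, limit = 5;
        for (int n = 0; n < 40; ++n)
            assert(skipThenLimit(n, skip, limit) ==
                   limitThenSkip(n, limit + skip, skip));  // limit grows by skip
        return 0;
    }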
+ Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this); + Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this); + Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this); + + return shardPipeline; +} + +void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { + while (!mergePipe->sources.empty()) { + intrusive_ptr<DocumentSource> current = mergePipe->sources.front(); + mergePipe->sources.pop_front(); + + // Check if this source is splittable + SplittableDocumentSource* splittable = + dynamic_cast<SplittableDocumentSource*>(current.get()); + + if (!splittable) { + // move the source from the merger sources to the shard sources + shardPipe->sources.push_back(current); + } else { + // split this source into Merge and Shard sources + intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); + intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource(); + if (shardSource) + shardPipe->sources.push_back(shardSource); + if (mergeSource) + mergePipe->sources.push_front(mergeSource); + + break; } } +} + +void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, + Pipeline* mergePipe) { + while (!shardPipe->sources.empty() && + dynamic_cast<DocumentSourceUnwind*>(shardPipe->sources.back().get())) { + mergePipe->sources.push_front(shardPipe->sources.back()); + shardPipe->sources.pop_back(); + } +} + +void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, + Pipeline* mergePipe) { + DepsTracker mergeDeps = mergePipe->getDependencies(shardPipe->getInitialQuery()); + if (mergeDeps.needWholeDocument) + return; // the merge needs all fields, so nothing we can do. + + // Empty project is "special" so if no fields are needed, we just ask for _id instead. + if (mergeDeps.fields.empty()) + mergeDeps.fields.insert("_id"); + + // Remove metadata from dependencies since it automatically flows through projection and we + // don't want to project it in to the document. + mergeDeps.needTextScore = false; + + // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of + // field dependencies. While this may not be 100% ideal in all cases, it is simple and + // avoids the worst cases by ensuring that: + // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of + // dependencies. This situation can happen when a $sort is before the first $project or + // $group. Without the optimization, the shards would have to reify and transmit full + // objects even though only a subset of fields are needed. + // 2) Optimization IS NOT applied immediately following a $project or $group since it would + // add an unnecessary project (and therefore a deep-copy). + for (size_t i = 0; i < shardPipe->sources.size(); i++) { + DepsTracker dt; // ignored + if (shardPipe->sources[i]->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) + return; + } - intrusive_ptr<Pipeline> Pipeline::splitForSharded() { - // Create and initialize the shard spec we'll return. We start with an empty pipeline on the - // shards and all work being done in the merger. Optimizations can move operations between - // the pipelines to be more efficient. - intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx)); - shardPipeline->explain = explain; - - // The order in which optimizations are applied can have significant impact on the - // efficiency of the final pipeline. 
Be Careful! - Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this); - Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this); - Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this); + // if we get here, add the project. + shardPipe->sources.push_back(DocumentSourceProject::createFromBson( + BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx)); +} - return shardPipeline; - } +BSONObj Pipeline::getInitialQuery() const { + if (sources.empty()) + return BSONObj(); - void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, - Pipeline* mergePipe) { - while (!mergePipe->sources.empty()) { - intrusive_ptr<DocumentSource> current = mergePipe->sources.front(); - mergePipe->sources.pop_front(); + /* look for an initial $match */ + DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources.front().get()); + if (!match) + return BSONObj(); - // Check if this source is splittable - SplittableDocumentSource* splittable = - dynamic_cast<SplittableDocumentSource*>(current.get()); + return match->getQuery(); +} - if (!splittable){ - // move the source from the merger sources to the shard sources - shardPipe->sources.push_back(current); - } - else { - // split this source into Merge and Shard sources - intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); - intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource(); - if (shardSource) shardPipe->sources.push_back(shardSource); - if (mergeSource) mergePipe->sources.push_front(mergeSource); - - break; - } - } +bool Pipeline::hasOutStage() const { + if (sources.empty()) { + return false; } - void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, - Pipeline* mergePipe) { - while (!shardPipe->sources.empty() - && dynamic_cast<DocumentSourceUnwind*>(shardPipe->sources.back().get())) { - mergePipe->sources.push_front(shardPipe->sources.back()); - shardPipe->sources.pop_back(); - } + // The $out stage must be the last one in the pipeline, so check if the last stage is $out. + return dynamic_cast<DocumentSourceOut*>(sources.back().get()); +} + +Document Pipeline::serialize() const { + MutableDocument serialized; + // create an array out of the pipeline operations + vector<Value> array; + for (SourceContainer::const_iterator iter(sources.begin()), listEnd(sources.end()); + iter != listEnd; + ++iter) { + intrusive_ptr<DocumentSource> pSource(*iter); + pSource->serializeToArray(array); } - void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, - Pipeline* mergePipe) { - DepsTracker mergeDeps = mergePipe->getDependencies(shardPipe->getInitialQuery()); - if (mergeDeps.needWholeDocument) - return; // the merge needs all fields, so nothing we can do. - - // Empty project is "special" so if no fields are needed, we just ask for _id instead. - if (mergeDeps.fields.empty()) - mergeDeps.fields.insert("_id"); - - // Remove metadata from dependencies since it automatically flows through projection and we - // don't want to project it in to the document. - mergeDeps.needTextScore = false; - - // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of - // field dependencies. While this may not be 100% ideal in all cases, it is simple and - // avoids the worst cases by ensuring that: - // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of - // dependencies. 
This situation can happen when a $sort is before the first $project or - // $group. Without the optimization, the shards would have to reify and transmit full - // objects even though only a subset of fields are needed. - // 2) Optimization IS NOT applied immediately following a $project or $group since it would - // add an unnecessary project (and therefore a deep-copy). - for (size_t i = 0; i < shardPipe->sources.size(); i++) { - DepsTracker dt; // ignored - if (shardPipe->sources[i]->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) - return; - } + // add the top-level items to the command + serialized.setField(commandName, Value(pCtx->ns.coll())); + serialized.setField(pipelineName, Value(array)); - // if we get here, add the project. - shardPipe->sources.push_back( - DocumentSourceProject::createFromBson( - BSON("$project" << mergeDeps.toProjection()).firstElement(), - shardPipe->pCtx)); + if (explain) { + serialized.setField(explainName, Value(explain)); } - BSONObj Pipeline::getInitialQuery() const { - if (sources.empty()) - return BSONObj(); - - /* look for an initial $match */ - DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources.front().get()); - if (!match) - return BSONObj(); - - return match->getQuery(); + if (pCtx->extSortAllowed) { + serialized.setField("allowDiskUse", Value(true)); } - bool Pipeline::hasOutStage() const { - if (sources.empty()) { - return false; - } - - // The $out stage must be the last one in the pipeline, so check if the last stage is $out. - return dynamic_cast<DocumentSourceOut*>(sources.back().get()); + if (pCtx->bypassDocumentValidation) { + serialized.setField(bypassDocumentValidationCommandOption(), Value(true)); } - Document Pipeline::serialize() const { - MutableDocument serialized; - // create an array out of the pipeline operations - vector<Value> array; - for(SourceContainer::const_iterator iter(sources.begin()), - listEnd(sources.end()); - iter != listEnd; - ++iter) { - intrusive_ptr<DocumentSource> pSource(*iter); - pSource->serializeToArray(array); - } - - // add the top-level items to the command - serialized.setField(commandName, Value(pCtx->ns.coll())); - serialized.setField(pipelineName, Value(array)); - - if (explain) { - serialized.setField(explainName, Value(explain)); - } - - if (pCtx->extSortAllowed) { - serialized.setField("allowDiskUse", Value(true)); - } + return serialized.freeze(); +} - if (pCtx->bypassDocumentValidation) { - serialized.setField(bypassDocumentValidationCommandOption(), Value(true)); - } +void Pipeline::stitch() { + massert(16600, "should not have an empty pipeline", !sources.empty()); - return serialized.freeze(); + /* chain together the sources we found */ + DocumentSource* prevSource = sources.front().get(); + for (SourceContainer::iterator iter(sources.begin() + 1), listEnd(sources.end()); + iter != listEnd; + ++iter) { + intrusive_ptr<DocumentSource> pTemp(*iter); + pTemp->setSource(prevSource); + prevSource = pTemp.get(); } - - void Pipeline::stitch() { - massert(16600, "should not have an empty pipeline", - !sources.empty()); - - /* chain together the sources we found */ - DocumentSource* prevSource = sources.front().get(); - for(SourceContainer::iterator iter(sources.begin() + 1), - listEnd(sources.end()); - iter != listEnd; - ++iter) { - intrusive_ptr<DocumentSource> pTemp(*iter); - pTemp->setSource(prevSource); - prevSource = pTemp.get(); - } +} + +void Pipeline::run(BSONObjBuilder& result) { + // should not get here in the explain case + verify(!explain); + + // the array 
in which the aggregation results reside + // cant use subArrayStart() due to error handling + BSONArrayBuilder resultArray; + DocumentSource* finalSource = sources.back().get(); + while (boost::optional<Document> next = finalSource->getNext()) { + // add the document to the result set + BSONObjBuilder documentBuilder(resultArray.subobjStart()); + next->toBson(&documentBuilder); + documentBuilder.doneFast(); + // object will be too large, assert. the extra 1KB is for headers + uassert(16389, + str::stream() << "aggregation result exceeds maximum document size (" + << BSONObjMaxUserSize / (1024 * 1024) << "MB)", + resultArray.len() < BSONObjMaxUserSize - 1024); } - void Pipeline::run(BSONObjBuilder& result) { - // should not get here in the explain case - verify(!explain); - - // the array in which the aggregation results reside - // cant use subArrayStart() due to error handling - BSONArrayBuilder resultArray; - DocumentSource* finalSource = sources.back().get(); - while (boost::optional<Document> next = finalSource->getNext()) { - // add the document to the result set - BSONObjBuilder documentBuilder (resultArray.subobjStart()); - next->toBson(&documentBuilder); - documentBuilder.doneFast(); - // object will be too large, assert. the extra 1KB is for headers - uassert(16389, - str::stream() << "aggregation result exceeds maximum document size (" - << BSONObjMaxUserSize / (1024 * 1024) << "MB)", - resultArray.len() < BSONObjMaxUserSize - 1024); - } + resultArray.done(); + result.appendArray("result", resultArray.arr()); +} - resultArray.done(); - result.appendArray("result", resultArray.arr()); +vector<Value> Pipeline::writeExplainOps() const { + vector<Value> array; + for (SourceContainer::const_iterator it = sources.begin(); it != sources.end(); ++it) { + (*it)->serializeToArray(array, /*explain=*/true); } - - vector<Value> Pipeline::writeExplainOps() const { - vector<Value> array; - for(SourceContainer::const_iterator it = sources.begin(); it != sources.end(); ++it) { - (*it)->serializeToArray(array, /*explain=*/true); + return array; +} + +void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) { + sources.push_front(source); +} + +DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const { + DepsTracker deps; + bool knowAllFields = false; + bool knowAllMeta = false; + for (size_t i = 0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) { + DepsTracker localDeps; + DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps); + + if (status == DocumentSource::NOT_SUPPORTED) { + // Assume this stage needs everything. We may still know something about our + // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or + // EXHAUSTIVE_META. + break; } - return array; - } - void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) { - sources.push_front(source); - } - - DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const { - DepsTracker deps; - bool knowAllFields = false; - bool knowAllMeta = false; - for (size_t i=0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) { - DepsTracker localDeps; - DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps); - - if (status == DocumentSource::NOT_SUPPORTED) { - // Assume this stage needs everything. We may still know something about our - // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or - // EXHAUSTIVE_META. 
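The limitFieldsSentFromShardsToMerger pass above converts the merger's field dependencies into a trailing $project on the shard pipeline, so shards serialize only what the merger will read; as the code notes, an empty dependency set degenerates to _id alone. A rough stand-alone analogue of building such a projection spec from a dependency set (string output standing in for the real BSON produced by toProjection(), which also has _id-suppression subtleties not modeled here):

    #include <iostream>
    #include <set>
    #include <string>

    // Something like mergeDeps.toProjection(): include each needed field;
    // an empty set falls back to {_id: 1}, as in the code above.
    std::string toProjection(std::set<std::string> fields) {
        if (fields.empty())
            fields.insert("_id");
        std::string spec = "{";
        for (const std::string& f : fields)
            spec += "\"" + f + "\": 1, ";
        spec.resize(spec.size() - 2);  // drop trailing ", "
        return spec + "}";
    }

    int main() {
        std::cout << toProjection({"a", "b.c"}) << "\n";  // {"a": 1, "b.c": 1}
    }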
- break; - } - - if (!knowAllFields) { - deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end()); - if (localDeps.needWholeDocument) - deps.needWholeDocument = true; - knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS; - } - - if (!knowAllMeta) { - if (localDeps.needTextScore) - deps.needTextScore = true; - - knowAllMeta = status & DocumentSource::EXHAUSTIVE_META; - } + if (!knowAllFields) { + deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end()); + if (localDeps.needWholeDocument) + deps.needWholeDocument = true; + knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS; } - if (!knowAllFields) - deps.needWholeDocument = true; // don't know all fields we need - - // NOTE This code assumes that textScore can only be generated by the initial query. - if (DocumentSourceMatch::isTextQuery(initialQuery)) { - // If doing a text query, assume we need the score if we can't prove we don't. - if (!knowAllMeta) + if (!knowAllMeta) { + if (localDeps.needTextScore) deps.needTextScore = true; + + knowAllMeta = status & DocumentSource::EXHAUSTIVE_META; } - else { - // If we aren't doing a text query, then we don't need to ask for the textScore since we - // know it will be missing anyway. - deps.needTextScore = false; - } + } - return deps; + if (!knowAllFields) + deps.needWholeDocument = true; // don't know all fields we need + + // NOTE This code assumes that textScore can only be generated by the initial query. + if (DocumentSourceMatch::isTextQuery(initialQuery)) { + // If doing a text query, assume we need the score if we can't prove we don't. + if (!knowAllMeta) + deps.needTextScore = true; + } else { + // If we aren't doing a text query, then we don't need to ask for the textScore since we + // know it will be missing anyway. + deps.needTextScore = false; } -} // namespace mongo + + return deps; +} +} // namespace mongo
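Finally, the run() loop earlier in the hunk enforces the 16 MB BSON response ceiling (BSONObjMaxUserSize) while draining the final stage, reserving roughly 1 KB for headers before asserting with code 16389. The same guard pattern in miniature (sizes and the string accumulator are illustrative stand-ins; the real code uses a BSONArrayBuilder from the BSON library):

    #include <cstddef>
    #include <stdexcept>
    #include <string>
    #include <vector>

    constexpr std::size_t kMaxUserSize = 16 * 1024 * 1024;  // stand-in for BSONObjMaxUserSize
    constexpr std::size_t kHeaderSlack = 1024;              // "the extra 1KB is for headers"

    std::string buildResultArray(const std::vector<std::string>& docs) {
        std::string result;
        for (const std::string& d : docs) {
            result += d;
            if (result.size() >= kMaxUserSize - kHeaderSlack)
                throw std::runtime_error("aggregation result exceeds maximum document size");
        }
        return result;
    }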