/**
* Copyright (c) 2011 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
using boost::intrusive_ptr;
using std::endl;
using std::ostringstream;
using std::string;
using std::vector;
const char Pipeline::commandName[] = "aggregate";
const char Pipeline::pipelineName[] = "pipeline";
const char Pipeline::explainName[] = "explain";
const char Pipeline::fromRouterName[] = "fromRouter";
const char Pipeline::serverPipelineName[] = "serverPipeline";
const char Pipeline::mongosPipelineName[] = "mongosPipeline";
/**
 * Constructs an empty pipeline bound to the given expression context.
 * Explain mode defaults to off; stages are added later during parsing.
 */
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx)
    : explain(false), pCtx(pTheCtx) {}
/* this structure is used to make a lookup table of operators */
struct StageDesc {
    // Stage name as written in a pipeline spec, e.g. "$match".
    const char* pName;

    // Factory that parses the stage's BSON specification into a DocumentSource.
    intrusive_ptr<DocumentSource> (*pFactory)(BSONElement,
                                              const intrusive_ptr<ExpressionContext>&);
};
/* this table must be in alphabetical order by name for bsearch() */
static const StageDesc stageDesc[] = {
    {DocumentSourceGeoNear::geoNearName, DocumentSourceGeoNear::createFromBson},
    {DocumentSourceGroup::groupName, DocumentSourceGroup::createFromBson},
    {DocumentSourceLimit::limitName, DocumentSourceLimit::createFromBson},
    {DocumentSourceMatch::matchName, DocumentSourceMatch::createFromBson},
    {DocumentSourceMergeCursors::name, DocumentSourceMergeCursors::createFromBson},
    {DocumentSourceOut::outName, DocumentSourceOut::createFromBson},
    {DocumentSourceProject::projectName, DocumentSourceProject::createFromBson},
    {DocumentSourceRedact::redactName, DocumentSourceRedact::createFromBson},
    {DocumentSourceSkip::skipName, DocumentSourceSkip::createFromBson},
    {DocumentSourceSort::sortName, DocumentSourceSort::createFromBson},
    {DocumentSourceUnwind::unwindName, DocumentSourceUnwind::createFromBson},
};
// Number of entries in the stage lookup table above, used as bsearch()'s count.
static const size_t nStageDesc = sizeof(stageDesc) / sizeof(StageDesc);
static int stageDescCmp(const void* pL, const void* pR) {
return strcmp(((const StageDesc*)pL)->pName, ((const StageDesc*)pR)->pName);
}
/**
 * Parses the body of an "aggregate" command into a new Pipeline.
 *
 * Recognized top-level fields: the command name, "pipeline", "explain",
 * "fromRouter", "allowDiskUse", and the bypass-document-validation option.
 * $-prefixed fields, maxTimeMS, and "cursor" are handled elsewhere and
 * skipped here.
 *
 * @param errmsg set to a description of the problem when NULL is returned.
 * @param cmdObj the aggregate command object.
 * @param pCtx the expression context shared by all stages.
 * @return the parsed and locally-optimized pipeline, or NULL on an
 *         unrecognized field. Malformed stage specs trigger uasserts.
 */
intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg,
                                               const BSONObj& cmdObj,
                                               const intrusive_ptr<ExpressionContext>& pCtx) {
    intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx));
    vector<BSONElement> pipeline;

    /* gather the specification for the aggregation */
    for (BSONObj::iterator cmdIterator = cmdObj.begin(); cmdIterator.more();) {
        BSONElement cmdElement(cmdIterator.next());
        const char* pFieldName = cmdElement.fieldName();

        // ignore top-level fields prefixed with $. They are for the command processor, not us.
        if (pFieldName[0] == '$') {
            continue;
        }

        // maxTimeMS is also for the command processor.
        if (pFieldName == LiteParsedQuery::cmdOptionMaxTimeMS) {
            continue;
        }

        // ignore cursor options since they are handled externally.
        if (str::equals(pFieldName, "cursor")) {
            continue;
        }

        /* look for the aggregation command */
        if (!strcmp(pFieldName, commandName)) {
            continue;
        }

        /* check for the collection name */
        if (!strcmp(pFieldName, pipelineName)) {
            pipeline = cmdElement.Array();
            continue;
        }

        /* check for explain option */
        if (!strcmp(pFieldName, explainName)) {
            pPipeline->explain = cmdElement.Bool();
            continue;
        }

        /* if the request came from the router, we're in a shard */
        if (!strcmp(pFieldName, fromRouterName)) {
            pCtx->inShard = cmdElement.Bool();
            continue;
        }

        if (str::equals(pFieldName, "allowDiskUse")) {
            uassert(16949,
                    str::stream() << "allowDiskUse must be a bool, not a "
                                  << typeName(cmdElement.type()),
                    cmdElement.type() == Bool);
            pCtx->extSortAllowed = cmdElement.Bool();
            continue;
        }

        if (pFieldName == bypassDocumentValidationCommandOption()) {
            pCtx->bypassDocumentValidation = cmdElement.trueValue();
            continue;
        }

        /* we didn't recognize a field in the command */
        ostringstream sb;
        sb << "unrecognized field '" << cmdElement.fieldName() << "'";
        errmsg = sb.str();
        return intrusive_ptr<Pipeline>();
    }

    /*
      If we get here, we've harvested the fields we expect for a pipeline.

      Set up the specified document source pipeline.
    */
    SourceContainer& sources = pPipeline->sources;  // shorthand

    /* iterate over the steps in the pipeline */
    const size_t nSteps = pipeline.size();
    for (size_t iStep = 0; iStep < nSteps; ++iStep) {
        /* pull out the pipeline element as an object */
        BSONElement pipeElement(pipeline[iStep]);
        uassert(15942,
                str::stream() << "pipeline element " << iStep << " is not an object",
                pipeElement.type() == Object);
        BSONObj bsonObj(pipeElement.Obj());

        // Parse a pipeline stage from 'bsonObj'.
        uassert(16435,
                "A pipeline stage specification object must contain exactly one field.",
                bsonObj.nFields() == 1);
        BSONElement stageSpec = bsonObj.firstElement();
        const char* stageName = stageSpec.fieldName();

        // Create a DocumentSource pipeline stage from 'stageSpec' by looking the
        // stage name up in the (alphabetically sorted) stageDesc table.
        StageDesc key;
        key.pName = stageName;
        const StageDesc* pDesc = static_cast<const StageDesc*>(
            bsearch(&key, stageDesc, nStageDesc, sizeof(StageDesc), stageDescCmp));

        uassert(16436,
                str::stream() << "Unrecognized pipeline stage name: '" << stageName << "'",
                pDesc);
        intrusive_ptr<DocumentSource> stage = pDesc->pFactory(stageSpec, pCtx);
        verify(stage);
        sources.push_back(stage);

        // TODO find a good general way to check stages that must be first syntactically
        if (dynamic_cast<DocumentSourceOut*>(stage.get())) {
            uassert(16991, "$out can only be the final stage in the pipeline", iStep == nSteps - 1);
        }
    }

    // The order in which optimizations are applied can have significant impact on the
    // efficiency of the final pipeline. Be Careful!
    Optimizations::Local::moveMatchBeforeSort(pPipeline.get());
    Optimizations::Local::moveSkipAndLimitBeforeProject(pPipeline.get());
    Optimizations::Local::moveLimitBeforeSkip(pPipeline.get());
    Optimizations::Local::coalesceAdjacent(pPipeline.get());
    Optimizations::Local::optimizeEachDocumentSource(pPipeline.get());
    Optimizations::Local::duplicateMatchBeforeInitalRedact(pPipeline.get());

    return pPipeline;
}
/**
 * Swaps each non-text $match that immediately follows a $sort in front of it,
 * so the sort operates on fewer documents. Text-query matches are excluded
 * because they must stay where the text score is produced.
 */
void Pipeline::Optimizations::Local::moveMatchBeforeSort(Pipeline* pipeline) {
    // TODO Keep moving matches across multiple sorts as moveLimitBeforeSkip does below.
    // TODO Check sort for limit. Not an issue currently due to order optimizations are applied,
    // but should be fixed.
    SourceContainer& sources = pipeline->sources;
    for (size_t srcn = sources.size(), srci = 1; srci < srcn; ++srci) {
        intrusive_ptr<DocumentSource>& pSource = sources[srci];
        DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(pSource.get());
        if (match && !match->isTextQuery()) {
            intrusive_ptr<DocumentSource>& pPrevious = sources[srci - 1];
            if (dynamic_cast<DocumentSourceSort*>(pPrevious.get())) {
                /* swap this item with the previous */
                intrusive_ptr<DocumentSource> pTemp(pPrevious);
                pPrevious = pSource;
                pSource = pTemp;
            }
        }
    }
}
/**
 * Moves $skip and $limit stages in front of an immediately preceding $project,
 * since projecting documents that will be skipped or cut off is wasted work.
 * Restarts from the back after each swap to handle chains of skips/limits.
 */
void Pipeline::Optimizations::Local::moveSkipAndLimitBeforeProject(Pipeline* pipeline) {
    SourceContainer& sources = pipeline->sources;
    if (sources.empty())
        return;

    for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) {
        // This optimization only applies when a $project comes before a $skip or $limit.
        auto project = dynamic_cast<DocumentSourceProject*>(sources[i - 1].get());
        if (!project)
            continue;

        auto skip = dynamic_cast<DocumentSourceSkip*>(sources[i].get());
        auto limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get());
        if (!(skip || limit))
            continue;

        swap(sources[i], sources[i - 1]);

        // Start at back again. This is needed to handle cases with more than 1 $skip or
        // $limit (S means skip, L means limit, P means project)
        //
        // These would work without second pass (assuming back to front ordering)
        // PS -> SP
        // PL -> LP
        // PPL -> LPP
        // PPS -> SPP
        //
        // The following cases need a second pass to handle the second skip or limit
        // PLL -> LLP
        // PPLL -> LLPP
        // PLPL -> LLPP
        i = sources.size();  // decremented before next pass
    }
}
/**
 * Swaps a $limit in front of an immediately preceding $skip, growing the limit
 * by the skip amount so the combined result set is unchanged. This enables the
 * limit to be pushed toward the shards. Restarts from the back after each swap
 * to handle chains with multiple limits.
 */
void Pipeline::Optimizations::Local::moveLimitBeforeSkip(Pipeline* pipeline) {
    SourceContainer& sources = pipeline->sources;
    if (sources.empty())
        return;

    for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) {
        DocumentSourceLimit* limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get());
        DocumentSourceSkip* skip = dynamic_cast<DocumentSourceSkip*>(sources[i - 1].get());
        if (limit && skip) {
            // Increase limit by skip since the skipped docs now pass through the $limit
            limit->setLimit(limit->getLimit() + skip->getSkip());
            swap(sources[i], sources[i - 1]);

            // Start at back again. This is needed to handle cases with more than 1 $limit
            // (S means skip, L means limit)
            //
            // These two would work without second pass (assuming back to front ordering)
            // SL -> LS
            // SSL -> LSS
            //
            // The following cases need a second pass to handle the second limit
            // SLL -> LLS
            // SSLL -> LLSS
            // SLSL -> LLSS
            i = sources.size();  // decremented before next pass
        }
    }
}
/**
 * Merges adjacent stages where possible by asking each stage to coalesce()
 * with its successor (e.g. $sort absorbing a following $limit). Stages that
 * cannot be absorbed are kept in order.
 */
void Pipeline::Optimizations::Local::coalesceAdjacent(Pipeline* pipeline) {
    SourceContainer& sources = pipeline->sources;
    if (sources.empty())
        return;

    // move all sources to a temporary list
    SourceContainer tempSources;
    sources.swap(tempSources);

    // move the first one to the final list
    sources.push_back(tempSources[0]);

    // run through the sources, coalescing them or keeping them
    for (size_t tempn = tempSources.size(), tempi = 1; tempi < tempn; ++tempi) {
        // If we can't coalesce the source with the last, then move it
        // to the final list, and make it the new last. (If we succeeded,
        // then we're still on the same last, and there's no need to move
        // or do anything with the source -- the destruction of tempSources
        // will take care of the rest.)
        intrusive_ptr<DocumentSource>& pLastSource = sources.back();
        intrusive_ptr<DocumentSource>& pTemp = tempSources[tempi];
        verify(pTemp && pLastSource);
        if (!pLastSource->coalesce(pTemp))
            sources.push_back(pTemp);
    }
}
/**
 * Gives every stage a chance to rewrite itself via optimize(); a stage that
 * optimizes to nothing (returns an empty pointer) is dropped from the
 * pipeline entirely.
 */
void Pipeline::Optimizations::Local::optimizeEachDocumentSource(Pipeline* pipeline) {
    SourceContainer optimizedSources;
    for (auto&& source : pipeline->sources) {
        auto replacement = source->optimize();
        if (replacement) {
            optimizedSources.push_back(std::move(replacement));
        }
    }
    pipeline->sources = std::move(optimizedSources);
}
/**
 * When the pipeline starts with $redact followed by $match, inserts a copy of
 * the redact-safe portion of that match in front of the $redact so documents
 * that could never match are filtered before the (expensive) redaction.
 * (NOTE: "Inital" typo is part of the established name; not renamed here.)
 */
void Pipeline::Optimizations::Local::duplicateMatchBeforeInitalRedact(Pipeline* pipeline) {
    SourceContainer& sources = pipeline->sources;
    if (sources.size() >= 2 && dynamic_cast<DocumentSourceRedact*>(sources[0].get())) {
        if (DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources[1].get())) {
            // Only the portion of the predicate unaffected by redaction may be hoisted.
            const BSONObj redactSafePortion = match->redactSafePortion();
            if (!redactSafePortion.isEmpty()) {
                sources.push_front(DocumentSourceMatch::createFromBson(
                    BSON("$match" << redactSafePortion).firstElement(), pipeline->pCtx));
            }
        }
    }
}
void Pipeline::addRequiredPrivileges(Command* commandTemplate,
const string& db,
BSONObj cmdObj,
vector* out) {
ResourcePattern inputResource(commandTemplate->parseResourcePattern(db, cmdObj));
uassert(17138,
mongoutils::str::stream() << "Invalid input resource, " << inputResource.toString(),
inputResource.isExactNamespacePattern());
out->push_back(Privilege(inputResource, ActionType::find));
BSONObj pipeline = cmdObj.getObjectField("pipeline");
BSONForEach(stageElem, pipeline) {
BSONObj stage = stageElem.embeddedObjectUserCheck();
if (str::equals(stage.firstElementFieldName(), "$out")) {
NamespaceString outputNs(db, stage.firstElement().str());
uassert(17139,
mongoutils::str::stream() << "Invalid $out target namespace, " << outputNs.ns(),
outputNs.isValid());
ActionSet actions;
actions.addAction(ActionType::remove);
actions.addAction(ActionType::insert);
if (shouldBypassDocumentValidationForCommand(cmdObj)) {
actions.addAction(ActionType::bypassDocumentValidation);
}
out->push_back(Privilege(ResourcePattern::forExactNamespace(outputNs), actions));
}
}
}
/**
 * Splits this pipeline for a sharded execution: returns the pipeline the
 * shards should run, while this object is mutated in place to become the
 * merging pipeline run on the merger (mongos or a primary shard).
 */
intrusive_ptr<Pipeline> Pipeline::splitForSharded() {
    // Create and initialize the shard spec we'll return. We start with an empty pipeline on the
    // shards and all work being done in the merger. Optimizations can move operations between
    // the pipelines to be more efficient.
    intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx));
    shardPipeline->explain = explain;

    // The order in which optimizations are applied can have significant impact on the
    // efficiency of the final pipeline. Be Careful!
    Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
    Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
    Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);

    return shardPipeline;
}
/**
 * Moves stages from the front of the merge pipeline into the shard pipeline
 * until a splittable stage is found; that stage is itself split into a shard
 * half and a merge half. Everything after the split point stays in the merger.
 */
void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
    while (!mergePipe->sources.empty()) {
        intrusive_ptr<DocumentSource> current = mergePipe->sources.front();
        mergePipe->sources.pop_front();

        // Check if this source is splittable
        SplittableDocumentSource* splittable =
            dynamic_cast<SplittableDocumentSource*>(current.get());

        if (!splittable) {
            // move the source from the merger sources to the shard sources
            shardPipe->sources.push_back(current);
        } else {
            // split this source into Merge and Shard sources
            intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
            intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource();
            if (shardSource)
                shardPipe->sources.push_back(shardSource);
            if (mergeSource)
                mergePipe->sources.push_front(mergeSource);

            break;
        }
    }
}
/**
 * Moves any trailing $unwind stages from the shard pipeline to the front of
 * the merge pipeline: unwinding multiplies documents, so doing it on the
 * merger reduces the amount of data shipped over the network.
 */
void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
                                                                         Pipeline* mergePipe) {
    while (!shardPipe->sources.empty() &&
           dynamic_cast<DocumentSourceUnwind*>(shardPipe->sources.back().get())) {
        mergePipe->sources.push_front(shardPipe->sources.back());
        shardPipe->sources.pop_back();
    }
}
/**
 * Appends a $project to the shard pipeline restricting output to the fields
 * the merge pipeline actually depends on, so shards ship less data to the
 * merger. Skipped when the merger needs whole documents or when a shard stage
 * already has an exhaustive field list (adding a project would only cost a
 * deep copy).
 */
void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
                                                                         Pipeline* mergePipe) {
    DepsTracker mergeDeps = mergePipe->getDependencies(shardPipe->getInitialQuery());
    if (mergeDeps.needWholeDocument)
        return;  // the merge needs all fields, so nothing we can do.

    // Empty project is "special" so if no fields are needed, we just ask for _id instead.
    if (mergeDeps.fields.empty())
        mergeDeps.fields.insert("_id");

    // Remove metadata from dependencies since it automatically flows through projection and we
    // don't want to project it in to the document.
    mergeDeps.needTextScore = false;

    // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
    // field dependencies. While this may not be 100% ideal in all cases, it is simple and
    // avoids the worst cases by ensuring that:
    // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
    //    dependencies. This situation can happen when a $sort is before the first $project or
    //    $group. Without the optimization, the shards would have to reify and transmit full
    //    objects even though only a subset of fields are needed.
    // 2) Optimization IS NOT applied immediately following a $project or $group since it would
    //    add an unnecessary project (and therefore a deep-copy).
    for (auto&& shardSource : shardPipe->sources) {
        DepsTracker scratchDeps;  // ignored; only the return flags matter here
        if (shardSource->getDependencies(&scratchDeps) & DocumentSource::EXHAUSTIVE_FIELDS)
            return;
    }

    // if we get here, add the project.
    shardPipe->sources.push_back(DocumentSourceProject::createFromBson(
        BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx));
}
/**
 * Returns the query of an initial $match stage, or an empty BSONObj when the
 * pipeline is empty or does not begin with a $match. Used to push the leading
 * filter down to the query system.
 */
BSONObj Pipeline::getInitialQuery() const {
    if (sources.empty())
        return BSONObj();

    /* look for an initial $match */
    DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources.front().get());
    if (!match)
        return BSONObj();

    return match->getQuery();
}
bool Pipeline::hasOutStage() const {
if (sources.empty()) {
return false;
}
// The $out stage must be the last one in the pipeline, so check if the last stage is $out.
return dynamic_cast(sources.back().get());
}
/**
 * Serializes this pipeline back into the shape of an aggregate command:
 * collection name, stage array, and any of the explain / allowDiskUse /
 * bypass-validation options that are set.
 */
Document Pipeline::serialize() const {
    MutableDocument serialized;

    // create an array out of the pipeline operations
    vector<Value> array;
    for (SourceContainer::const_iterator iter(sources.begin()), listEnd(sources.end());
         iter != listEnd;
         ++iter) {
        intrusive_ptr<DocumentSource> pSource(*iter);
        pSource->serializeToArray(array);
    }

    // add the top-level items to the command
    serialized.setField(commandName, Value(pCtx->ns.coll()));
    serialized.setField(pipelineName, Value(array));

    if (explain) {
        serialized.setField(explainName, Value(explain));
    }

    if (pCtx->extSortAllowed) {
        serialized.setField("allowDiskUse", Value(true));
    }

    if (pCtx->bypassDocumentValidation) {
        serialized.setField(bypassDocumentValidationCommandOption(), Value(true));
    }

    return serialized.freeze();
}
/**
 * Wires the stages together by pointing each stage at its predecessor as its
 * input source. Must be called before execution; asserts on an empty pipeline.
 */
void Pipeline::stitch() {
    massert(16600, "should not have an empty pipeline", !sources.empty());

    /* chain together the sources we found */
    DocumentSource* prevSource = sources.front().get();
    for (SourceContainer::iterator iter(sources.begin() + 1), listEnd(sources.end());
         iter != listEnd;
         ++iter) {
        intrusive_ptr<DocumentSource> pTemp(*iter);
        pTemp->setSource(prevSource);
        prevSource = pTemp.get();
    }
}
/**
 * Executes the pipeline by draining the final stage, appending each result
 * document to a "result" array on the given builder. Asserts if the
 * accumulated result would exceed the maximum BSON document size.
 * Not used for explain (verify'd).
 */
void Pipeline::run(BSONObjBuilder& result) {
    // should not get here in the explain case
    verify(!explain);

    // the array in which the aggregation results reside
    // cant use subArrayStart() due to error handling
    BSONArrayBuilder resultArray;
    DocumentSource* finalSource = sources.back().get();
    while (boost::optional<Document> next = finalSource->getNext()) {
        // add the document to the result set
        BSONObjBuilder documentBuilder(resultArray.subobjStart());
        next->toBson(&documentBuilder);
        documentBuilder.doneFast();
        // object will be too large, assert. the extra 1KB is for headers
        uassert(16389,
                str::stream() << "aggregation result exceeds maximum document size ("
                              << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
                resultArray.len() < BSONObjMaxUserSize - 1024);
    }

    resultArray.done();
    result.appendArray("result", resultArray.arr());
}
/**
 * Serializes every stage in explain mode, returning the resulting array of
 * stage descriptions.
 */
vector<Value> Pipeline::writeExplainOps() const {
    vector<Value> array;
    for (SourceContainer::const_iterator it = sources.begin(); it != sources.end(); ++it) {
        (*it)->serializeToArray(array, /*explain=*/true);
    }
    return array;
}
void Pipeline::addInitialSource(intrusive_ptr source) {
sources.push_front(source);
}
/**
 * Computes the set of fields (and text-score metadata) this pipeline depends
 * on, walking the stages in order and accumulating each stage's reported
 * dependencies until a stage declares its list exhaustive. If field knowledge
 * never becomes exhaustive, the whole document is required.
 */
DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const {
    DepsTracker deps;
    bool fieldsExhaustive = false;
    bool metaExhaustive = false;

    for (size_t idx = 0; idx < sources.size(); ++idx) {
        // Once both field and metadata knowledge are complete, later stages
        // cannot add anything new.
        if (fieldsExhaustive && metaExhaustive)
            break;

        DepsTracker stageDeps;
        const DocumentSource::GetDepsReturn status = sources[idx]->getDependencies(&stageDeps);
        if (status == DocumentSource::NOT_SUPPORTED) {
            // Assume this stage needs everything. We may still know something about our
            // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
            // EXHAUSTIVE_META.
            break;
        }

        if (!fieldsExhaustive) {
            deps.fields.insert(stageDeps.fields.begin(), stageDeps.fields.end());
            if (stageDeps.needWholeDocument)
                deps.needWholeDocument = true;
            fieldsExhaustive = (status & DocumentSource::EXHAUSTIVE_FIELDS) != 0;
        }

        if (!metaExhaustive) {
            if (stageDeps.needTextScore)
                deps.needTextScore = true;
            metaExhaustive = (status & DocumentSource::EXHAUSTIVE_META) != 0;
        }
    }

    if (!fieldsExhaustive)
        deps.needWholeDocument = true;  // don't know all fields we need

    // NOTE This code assumes that textScore can only be generated by the initial query.
    if (DocumentSourceMatch::isTextQuery(initialQuery)) {
        // If doing a text query, assume we need the score if we can't prove we don't.
        if (!metaExhaustive)
            deps.needTextScore = true;
    } else {
        // If we aren't doing a text query, then we don't need to ask for the textScore since we
        // know it will be missing anyway.
        deps.needTextScore = false;
    }

    return deps;
}
} // namespace mongo