/**
* Copyright (c) 2011 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
using boost::intrusive_ptr;
using std::endl;
using std::ostringstream;
using std::string;
using std::vector;
const char Pipeline::commandName[] = "aggregate";
const char Pipeline::pipelineName[] = "pipeline";
const char Pipeline::explainName[] = "explain";
const char Pipeline::fromRouterName[] = "fromRouter";
const char Pipeline::serverPipelineName[] = "serverPipeline";
const char Pipeline::mongosPipelineName[] = "mongosPipeline";
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx)
    : explain(false), pCtx(pTheCtx) {}
intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg,
                                               const BSONObj& cmdObj,
                                               const intrusive_ptr<ExpressionContext>& pCtx) {
    intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx));
    vector<BSONElement> pipeline;
repl::ReadConcernArgs readConcernArgs;
/* gather the specification for the aggregation */
for (BSONObj::iterator cmdIterator = cmdObj.begin(); cmdIterator.more();) {
BSONElement cmdElement(cmdIterator.next());
const char* pFieldName = cmdElement.fieldName();
// ignore top-level fields prefixed with $. They are for the command processor, not us.
if (pFieldName[0] == '$') {
continue;
}
// maxTimeMS is also for the command processor.
if (str::equals(pFieldName, LiteParsedQuery::cmdOptionMaxTimeMS)) {
continue;
}
if (pFieldName == repl::ReadConcernArgs::kReadConcernFieldName) {
uassertStatusOK(readConcernArgs.initialize(cmdElement));
continue;
}
// ignore cursor options since they are handled externally.
if (str::equals(pFieldName, "cursor")) {
continue;
}
// ignore writeConcern since it's handled externally
if (str::equals(pFieldName, "writeConcern")) {
continue;
}
/* look for the aggregation command */
if (!strcmp(pFieldName, commandName)) {
continue;
}
        /* check for the pipeline specification */
if (!strcmp(pFieldName, pipelineName)) {
pipeline = cmdElement.Array();
continue;
}
/* check for explain option */
if (!strcmp(pFieldName, explainName)) {
pPipeline->explain = cmdElement.Bool();
continue;
}
/* if the request came from the router, we're in a shard */
if (!strcmp(pFieldName, fromRouterName)) {
pCtx->inShard = cmdElement.Bool();
continue;
}
if (str::equals(pFieldName, "allowDiskUse")) {
uassert(ErrorCodes::IllegalOperation,
"The 'allowDiskUse' option is not permitted in read-only mode.",
!storageGlobalParams.readOnly);
uassert(16949,
str::stream() << "allowDiskUse must be a bool, not a "
<< typeName(cmdElement.type()),
cmdElement.type() == Bool);
pCtx->extSortAllowed = cmdElement.Bool();
continue;
}
if (pFieldName == bypassDocumentValidationCommandOption()) {
pCtx->bypassDocumentValidation = cmdElement.trueValue();
continue;
}
/* we didn't recognize a field in the command */
ostringstream sb;
sb << "unrecognized field '" << cmdElement.fieldName() << "'";
errmsg = sb.str();
return intrusive_ptr();
}
/*
If we get here, we've harvested the fields we expect for a pipeline.
Set up the specified document source pipeline.
*/
SourceContainer& sources = pPipeline->sources; // shorthand
/* iterate over the steps in the pipeline */
const size_t nSteps = pipeline.size();
for (size_t iStep = 0; iStep < nSteps; ++iStep) {
/* pull out the pipeline element as an object */
BSONElement pipeElement(pipeline[iStep]);
uassert(15942,
str::stream() << "pipeline element " << iStep << " is not an object",
pipeElement.type() == Object);
sources.push_back(DocumentSource::parse(pCtx, pipeElement.Obj()));
if (sources.back()->isValidInitialSource()) {
uassert(28837,
str::stream() << sources.back()->getSourceName()
<< " is only valid as the first stage in a pipeline.",
iStep == 0);
}
        if (dynamic_cast<DocumentSourceOut*>(sources.back().get())) {
uassert(16991, "$out can only be the final stage in the pipeline", iStep == nSteps - 1);
uassert(ErrorCodes::InvalidOptions,
"$out can only be used with the 'local' read concern level",
readConcernArgs.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
}
}
pPipeline->optimizePipeline();
return pPipeline;
}
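// Optimization runs in two passes: first each stage gets a chance to rewrite itself together
// with its neighbors via optimizeAt() (e.g. coalescing with or swapping past an adjacent
// stage), then each surviving stage is optimized individually via optimize(). Stages whose
// optimize() returns nothing are dropped from the pipeline.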
void Pipeline::optimizePipeline() {
SourceContainer optimizedSources;
SourceContainer::iterator itr = sources.begin();
while (itr != sources.end() && std::next(itr) != sources.end()) {
invariant((*itr).get());
itr = (*itr).get()->optimizeAt(itr, &sources);
}
// Once we have reached our final number of stages, optimize each individually.
for (auto&& source : sources) {
if (auto out = source->optimize()) {
optimizedSources.push_back(out);
}
}
sources.swap(optimizedSources);
}
Status Pipeline::checkAuthForCommand(ClientBasic* client,
const std::string& db,
const BSONObj& cmdObj) {
NamespaceString inputNs(db, cmdObj.firstElement().str());
auto inputResource = ResourcePattern::forExactNamespace(inputNs);
uassert(17138,
mongoutils::str::stream() << "Invalid input namespace, " << inputNs.ns(),
inputNs.isValid());
    std::vector<Privilege> privileges;
if (cmdObj.getFieldDotted("pipeline.0.$indexStats")) {
Privilege::addPrivilegeToPrivilegeVector(
&privileges,
Privilege(ResourcePattern::forAnyNormalResource(), ActionType::indexStats));
} else {
// If no source requiring an alternative permission scheme is specified then default to
// requiring find() privileges on the given namespace.
Privilege::addPrivilegeToPrivilegeVector(&privileges,
Privilege(inputResource, ActionType::find));
}
BSONObj pipeline = cmdObj.getObjectField("pipeline");
BSONForEach(stageElem, pipeline) {
BSONObj stage = stageElem.embeddedObjectUserCheck();
StringData stageName = stage.firstElementFieldName();
if (stageName == "$out" && stage.firstElementType() == String) {
NamespaceString outputNs(db, stage.firstElement().str());
uassert(17139,
mongoutils::str::stream() << "Invalid $out target namespace, " << outputNs.ns(),
outputNs.isValid());
ActionSet actions;
actions.addAction(ActionType::remove);
actions.addAction(ActionType::insert);
if (shouldBypassDocumentValidationForCommand(cmdObj)) {
actions.addAction(ActionType::bypassDocumentValidation);
}
Privilege::addPrivilegeToPrivilegeVector(
&privileges, Privilege(ResourcePattern::forExactNamespace(outputNs), actions));
} else if (stageName == "$lookup" && stage.firstElementType() == Object) {
NamespaceString fromNs(db, stage.firstElement()["from"].str());
Privilege::addPrivilegeToPrivilegeVector(
&privileges,
Privilege(ResourcePattern::forExactNamespace(fromNs), ActionType::find));
}
}
if (AuthorizationSession::get(client)->isAuthorizedForPrivileges(privileges))
return Status::OK();
return Status(ErrorCodes::Unauthorized, "unauthorized");
}
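// An aggregation only performs writes when the pipeline contains a $out stage, so writeConcern
// is only meaningful in that case.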
bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) {
if (cmd.hasField("pipeline") == false) {
return false;
}
auto stages = cmd["pipeline"].Array();
for (auto stage : stages) {
if (stage.Obj().hasField("$out")) {
return true;
}
}
return false;
}
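// A Pipeline may be stashed and outlive the OperationContext it was created under (for example
// across getMore commands), so stages that hold a MongodInterface must drop their
// OperationContext pointer here and pick up the new one in reattachToOperationContext().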
void Pipeline::detachFromOperationContext() {
pCtx->opCtx = nullptr;
for (auto* source : sourcesNeedingMongod) {
source->setOperationContext(nullptr);
}
}
void Pipeline::reattachToOperationContext(OperationContext* opCtx) {
invariant(pCtx->opCtx == nullptr);
pCtx->opCtx = opCtx;
for (auto* source : sourcesNeedingMongod) {
source->setOperationContext(opCtx);
}
}
intrusive_ptr<Pipeline> Pipeline::splitForSharded() {
// Create and initialize the shard spec we'll return. We start with an empty pipeline on the
// shards and all work being done in the merger. Optimizations can move operations between
// the pipelines to be more efficient.
    intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx));
    shardPipeline->explain = explain;
    // The order in which optimizations are applied can have a significant impact on the
    // efficiency of the final pipeline. Be careful!
Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
return shardPipeline;
}
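// Walks the merge pipeline from the front, moving stages onto the shard pipeline until it
// reaches a splittable stage. That stage is then split into a shard half (run on each shard)
// and a merge half (run on the merger), and everything after it stays in the merge pipeline.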
void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
while (!mergePipe->sources.empty()) {
        intrusive_ptr<DocumentSource> current = mergePipe->sources.front();
mergePipe->sources.pop_front();
// Check if this source is splittable
        SplittableDocumentSource* splittable =
            dynamic_cast<SplittableDocumentSource*>(current.get());
if (!splittable) {
// move the source from the merger sources to the shard sources
shardPipe->sources.push_back(current);
} else {
// split this source into Merge and Shard sources
            intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
            intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource();
if (shardSource)
shardPipe->sources.push_back(shardSource);
if (mergeSource)
mergePipe->sources.push_front(mergeSource);
break;
}
}
}
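// Trailing $unwind stages would multiply the number of documents each shard sends to the
// merger, so they are moved to the front of the merge pipeline and run after merging instead.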
void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
Pipeline* mergePipe) {
while (!shardPipe->sources.empty() &&
           dynamic_cast<DocumentSourceUnwind*>(shardPipe->sources.back().get())) {
mergePipe->sources.push_front(shardPipe->sources.back());
shardPipe->sources.pop_back();
}
}
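// Appends a trailing $project to the shard pipeline so that shards only return the fields the
// merge pipeline actually depends on, doing nothing when those dependencies cannot be
// determined or the extra projection would not help.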
void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
Pipeline* mergePipe) {
DepsTracker mergeDeps = mergePipe->getDependencies(shardPipe->getInitialQuery());
if (mergeDeps.needWholeDocument)
return; // the merge needs all fields, so nothing we can do.
// Empty project is "special" so if no fields are needed, we just ask for _id instead.
if (mergeDeps.fields.empty())
mergeDeps.fields.insert("_id");
    // Remove metadata from dependencies since it automatically flows through projection and we
    // don't want to project it into the document.
mergeDeps.needTextScore = false;
// HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
// field dependencies. While this may not be 100% ideal in all cases, it is simple and
// avoids the worst cases by ensuring that:
// 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
// dependencies. This situation can happen when a $sort is before the first $project or
// $group. Without the optimization, the shards would have to reify and transmit full
// objects even though only a subset of fields are needed.
// 2) Optimization IS NOT applied immediately following a $project or $group since it would
// add an unnecessary project (and therefore a deep-copy).
for (auto&& source : shardPipe->sources) {
DepsTracker dt;
if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
return;
}
// if we get here, add the project.
shardPipe->sources.push_back(DocumentSourceProject::createFromBson(
BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx));
}
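// If the pipeline starts with a $match or $geoNear stage, returns that stage's query predicate;
// otherwise returns an empty object.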
BSONObj Pipeline::getInitialQuery() const {
if (sources.empty())
return BSONObj();
/* look for an initial $match */
    DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources.front().get());
if (match) {
return match->getQuery();
}
    DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(sources.front().get());
if (geoNear) {
return geoNear->getQuery();
}
return BSONObj();
}
bool Pipeline::needsPrimaryShardMerger() const {
for (auto&& source : sources) {
if (source->needsPrimaryShard()) {
return true;
}
}
return false;
}
std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
    std::vector<NamespaceString> collections;
for (auto&& source : sources) {
source->addInvolvedCollections(&collections);
}
return collections;
}
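// Re-serializes the pipeline into a Document equivalent to the original aggregate command,
// e.g. {aggregate: "coll", pipeline: [...], allowDiskUse: true}, preserving the explain,
// allowDiskUse, and bypassDocumentValidation options that were parsed from it.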
Document Pipeline::serialize() const {
MutableDocument serialized;
// create an array out of the pipeline operations
    vector<Value> array;
for (SourceContainer::const_iterator iter(sources.begin()), listEnd(sources.end());
iter != listEnd;
++iter) {
        intrusive_ptr<DocumentSource> pSource(*iter);
pSource->serializeToArray(array);
}
// add the top-level items to the command
serialized.setField(commandName, Value(pCtx->ns.coll()));
serialized.setField(pipelineName, Value(array));
if (explain) {
serialized.setField(explainName, Value(explain));
}
if (pCtx->extSortAllowed) {
serialized.setField("allowDiskUse", Value(true));
}
if (pCtx->bypassDocumentValidation) {
serialized.setField(bypassDocumentValidationCommandOption(), Value(true));
}
return serialized.freeze();
}
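// Wires the stages together by pointing each stage at its predecessor via setSource(), so that
// documents pulled from the final stage flow through the whole chain.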
void Pipeline::stitch() {
massert(16600, "should not have an empty pipeline", !sources.empty());
/* chain together the sources we found */
DocumentSource* prevSource = sources.front().get();
for (SourceContainer::iterator iter(++sources.begin()), listEnd(sources.end()); iter != listEnd;
++iter) {
        intrusive_ptr<DocumentSource> pTemp(*iter);
pTemp->setSource(prevSource);
prevSource = pTemp.get();
}
// Cache the document sources that have a MongodInterface to avoid the cost of a dynamic cast
// when updating the OperationContext of their DBDirectClients while executing the pipeline.
for (auto&& source : sources) {
        if (auto* needsMongod = dynamic_cast<DocumentSourceNeedsMongod*>(source.get())) {
sourcesNeedingMongod.push_back(needsMongod);
}
}
}
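// Drains the pipeline by repeatedly pulling from the final stage and appends each document to
// the "result" array of the command reply, asserting if the reply would exceed the maximum
// BSON document size.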
void Pipeline::run(BSONObjBuilder& result) {
// should not get here in the explain case
verify(!explain);
    // the array in which the aggregation results reside
    // can't use subArrayStart() due to error handling
BSONArrayBuilder resultArray;
DocumentSource* finalSource = sources.back().get();
    while (boost::optional<Document> next = finalSource->getNext()) {
// add the document to the result set
BSONObjBuilder documentBuilder(resultArray.subobjStart());
next->toBson(&documentBuilder);
documentBuilder.doneFast();
        // If the result array would grow too large, assert; the extra 1KB is for headers.
uassert(16389,
str::stream() << "aggregation result exceeds maximum document size ("
<< BSONObjMaxUserSize / (1024 * 1024) << "MB)",
resultArray.len() < BSONObjMaxUserSize - 1024);
}
resultArray.done();
result.appendArray("result", resultArray.arr());
}
vector<Value> Pipeline::writeExplainOps() const {
    vector<Value> array;
for (SourceContainer::const_iterator it = sources.begin(); it != sources.end(); ++it) {
(*it)->serializeToArray(array, /*explain=*/true);
}
return array;
}
void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) {
sources.push_front(source);
}
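// Computes which fields and metadata the pipeline needs from its input. Dependencies are
// accumulated stage by stage until some stage reports an EXHAUSTIVE_* answer (later stages can
// only see what earlier stages produce); if a stage cannot report its dependencies at all, the
// whole document is assumed to be needed.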
DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const {
DepsTracker deps;
bool knowAllFields = false;
bool knowAllMeta = false;
for (auto&& source : sources) {
DepsTracker localDeps;
DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps);
if (status == DocumentSource::NOT_SUPPORTED) {
// Assume this stage needs everything. We may still know something about our
// dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
// EXHAUSTIVE_META.
break;
}
if (!knowAllFields) {
deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end());
if (localDeps.needWholeDocument)
deps.needWholeDocument = true;
knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS;
}
if (!knowAllMeta) {
if (localDeps.needTextScore)
deps.needTextScore = true;
knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
}
if (knowAllMeta && knowAllFields) {
break;
}
}
if (!knowAllFields)
deps.needWholeDocument = true; // don't know all fields we need
// NOTE This code assumes that textScore can only be generated by the initial query.
if (DocumentSourceMatch::isTextQuery(initialQuery)) {
// If doing a text query, assume we need the score if we can't prove we don't.
if (!knowAllMeta)
deps.needTextScore = true;
} else {
// If we aren't doing a text query, then we don't need to ask for the textScore since we
// know it will be missing anyway.
deps.needTextScore = false;
}
return deps;
}
} // namespace mongo