/**
* Copyright (c) 2011 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"
#include <algorithm>
#include "mongo/base/error_codes.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
using boost::intrusive_ptr;
using std::endl;
using std::ostringstream;
using std::string;
using std::vector;
namespace dps = ::mongo::dotted_path_support;
using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement;
using StreamType = DocumentSource::StageConstraints::StreamType;
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx)
: _sources(std::move(stages)), pCtx(expCtx) {}
Pipeline::~Pipeline() {
invariant(_disposed);
}
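// Factory methods for creating a Pipeline. Pipeline::parse() builds one from raw BSON stage
// specifications, while Pipeline::create() builds one from already-parsed stages; the "Facet"
// variants apply the stricter validation rules for sub-pipelines inside a $facet stage. All of
// them return an owned, validated, and stitched pipeline, or a non-OK status. A hypothetical
// caller (names illustrative) might do:
//
//     auto pipeline = uassertStatusOK(Pipeline::parse(rawStages, expCtx));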
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parse(
const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false);
}
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseFacetPipeline(
const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true);
}
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOrFacetPipeline(
const std::vector<BSONObj>& rawPipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
const bool isFacetPipeline) {
SourceContainer stages;
for (auto&& stageObj : rawPipeline) {
auto parsedSources = DocumentSource::parse(expCtx, stageObj);
stages.insert(stages.end(), parsedSources.begin(), parsedSources.end());
}
return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline);
}
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::create(
SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false);
}
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createFacetPipeline(
SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true);
}
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelOrFacetPipeline(
SourceContainer stages,
const intrusive_ptr<ExpressionContext>& expCtx,
const bool isFacetPipeline) {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx),
PipelineDeleter(expCtx->opCtx));
try {
if (isFacetPipeline) {
pipeline->validateFacetPipeline();
} else {
pipeline->validatePipeline();
}
} catch (const DBException& ex) {
return ex.toStatus();
}
pipeline->stitch();
return std::move(pipeline);
}
void Pipeline::validatePipeline() const {
// Verify that the specified namespace is valid for the initial stage of this pipeline.
const NamespaceString& nss = pCtx->ns;
if (_sources.empty()) {
if (nss.isCollectionlessAggregateNS()) {
uasserted(ErrorCodes::InvalidNamespace,
"{aggregate: 1} is not valid for an empty pipeline.");
}
} else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
// The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
const auto firstStageConstraints = _sources.front()->constraints(_splitState);
if (nss.isCollectionlessAggregateNS() &&
!firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
str::stream() << "{aggregate: 1} is not valid for '"
<< _sources.front()->getSourceName()
<< "'; a collection is required.");
}
if (!nss.isCollectionlessAggregateNS() &&
firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
str::stream() << "'" << _sources.front()->getSourceName()
<< "' can only be run with {aggregate: 1}");
}
// If the first stage is a $changeStream stage, then all stages in the pipeline must be
// either $changeStream stages or whitelisted as being able to run in a change stream.
if (firstStageConstraints.isChangeStreamStage()) {
for (auto&& source : _sources) {
uassert(ErrorCodes::IllegalOperation,
str::stream() << source->getSourceName()
<< " is not permitted in a $changeStream pipeline",
source->constraints(_splitState).isAllowedInChangeStream());
}
}
}
// Verify that each stage is in a legal position within the pipeline.
ensureAllStagesAreInLegalPositions();
}
void Pipeline::validateFacetPipeline() const {
if (_sources.empty()) {
uasserted(ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty");
}
for (auto&& stage : _sources) {
auto stageConstraints = stage->constraints(_splitState);
if (!stageConstraints.isAllowedInsideFacetStage()) {
uasserted(40600,
str::stream() << stage->getSourceName()
<< " is not allowed to be used within a $facet stage");
}
// We expect a stage within a $facet stage to have these properties.
invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
invariant(!stageConstraints.isIndependentOfAnyCollection);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated
// the first stage, and the 'ensureAllStagesAreInLegalPositions' method checks that there are
// no initial sources in positions 1...N, so calling it here completes the validation.
ensureAllStagesAreInLegalPositions();
}
void Pipeline::ensureAllStagesAreInLegalPositions() const {
size_t i = 0;
for (auto&& stage : _sources) {
auto constraints = stage->constraints(_splitState);
// Verify that all stages adhere to their PositionRequirement constraints.
if (constraints.requiredPosition == PositionRequirement::kFirst && i != 0) {
uasserted(40602,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline.");
}
auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
if (i != 0 && matchStage && matchStage->isTextQuery()) {
uasserted(17313, "$match with $text is only allowed as the first pipeline stage");
}
if (constraints.requiredPosition == PositionRequirement::kLast &&
i != _sources.size() - 1) {
uasserted(40601,
str::stream() << stage->getSourceName()
<< " can only be the final stage in the pipeline");
}
++i;
// Verify that we are not attempting to run a mongoS-only stage on mongoD.
uassert(40644,
str::stream() << stage->getSourceName() << " can only be run on mongoS",
!(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos));
}
}
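// Optimizes the pipeline in two phases. First, each stage may rewrite itself relative to its
// neighbors via optimizeAt(), which can splice stages in and out of '_sources' (e.g. a stage
// may swap places with, or absorb, an adjacent stage). Second, once the stage list has
// stabilized, each surviving stage is optimized individually via optimize(), which may return
// nullptr to remove itself from the pipeline entirely.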
void Pipeline::optimizePipeline() {
SourceContainer optimizedSources;
SourceContainer::iterator itr = _sources.begin();
// We could be swapping around stages during this process, so disconnect the pipeline to prevent
// us from entering a state with dangling pointers.
unstitch();
while (itr != _sources.end()) {
invariant((*itr).get());
itr = (*itr).get()->optimizeAt(itr, &_sources);
}
// Once we have reached our final number of stages, optimize each individually.
for (auto&& source : _sources) {
if (auto out = source->optimize()) {
optimizedSources.push_back(out);
}
}
_sources.swap(optimizedSources);
stitch();
}
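// Returns true only if 'cmd' has a well-formed pipeline containing a $out stage, since $out is
// the one stage here that writes documents and can therefore honor a writeConcern.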
bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) {
auto pipelineElement = cmd["pipeline"];
if (pipelineElement.type() != BSONType::Array) {
return false;
}
for (auto stage : pipelineElement.Obj()) {
if (stage.type() != BSONType::Object) {
return false;
}
if (stage.Obj().hasField("$out")) {
return true;
}
}
return false;
}
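// detachFromOperationContext() and reattachToOperationContext() exist because a pipeline can
// outlive the operation that created it, e.g. when a cursor survives between getMore commands.
// The ExpressionContext and every stage must agree on the current OperationContext, so both
// methods fan the change out to all of them.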
void Pipeline::detachFromOperationContext() {
pCtx->opCtx = nullptr;
pCtx->mongoProcessInterface->setOperationContext(nullptr);
for (auto&& source : _sources) {
source->detachFromOperationContext();
}
}
void Pipeline::reattachToOperationContext(OperationContext* opCtx) {
pCtx->opCtx = opCtx;
pCtx->mongoProcessInterface->setOperationContext(opCtx);
for (auto&& source : _sources) {
source->reattachToOperationContext(opCtx);
}
}
void Pipeline::dispose(OperationContext* opCtx) {
try {
pCtx->opCtx = opCtx;
// Make sure all stages are connected, in case we are being disposed via an error path and
// were not stitched at the time of the error.
stitch();
if (!_sources.empty()) {
_sources.back()->dispose();
}
_disposed = true;
} catch (...) {
std::terminate();
}
}
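// Splits this pipeline for execution in a sharded cluster. On return, 'this' becomes the
// merging half (kSplitForMerge), and the returned pipeline is the shards half
// (kSplitForShards) that each shard runs over its own data. The original stage list is stashed
// in the shard pipeline's '_unsplitSources' so that unsplitFromSharded() can undo the split.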
std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
invariant(!isSplitForShards());
invariant(!isSplitForMerge());
invariant(!_unsplitSources);
// Create and initialize the shard spec we'll return. We start with an empty pipeline on the
// shards and all work being done in the merger. Optimizations can move operations between
// the pipelines to be more efficient.
std::unique_ptr<Pipeline, PipelineDeleter> shardPipeline(new Pipeline(pCtx),
PipelineDeleter(pCtx->opCtx));
// Keep a copy of the original source list in case we need to reset the pipeline from split to
// unsplit later.
shardPipeline->_unsplitSources.emplace(_sources);
// The order in which optimizations are applied can have a significant impact on the
// efficiency of the final pipeline. Be careful!
Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
shardPipeline->_splitState = SplitState::kSplitForShards;
_splitState = SplitState::kSplitForMerge;
stitch();
return shardPipeline;
}
void Pipeline::unsplitFromSharded(
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard) {
invariant(isSplitForShards());
invariant(!isSplitForMerge());
invariant(pipelineForMergingShard);
invariant(_unsplitSources);
// Clear the merge source list so that destroying the pipeline object won't dispose of the
// stages. We still have a reference to each of the stages which will be moved back to the shard
// pipeline via '_unsplitSources'.
pipelineForMergingShard->_sources.clear();
pipelineForMergingShard.reset();
// Set '_sources' to its original state, re-stitch, and clear the '_unsplitSources' optional.
_sources = *_unsplitSources;
_unsplitSources.reset();
_splitState = SplitState::kUnsplit;
stitch();
}
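// Moves stages from the front of 'mergePipe' to the back of 'shardPipe' until reaching the
// first stage that implements SplittableDocumentSource. That stage is then itself split: its
// shard part is appended to 'shardPipe', and its merge parts are pushed onto the front of
// 'mergePipe'. Everything before the split point runs on the shards; everything after runs on
// the merger.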
void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
while (!mergePipe->_sources.empty()) {
intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
mergePipe->_sources.pop_front();
// Check if this source is splittable.
SplittableDocumentSource* splittable =
dynamic_cast<SplittableDocumentSource*>(current.get());
if (!splittable) {
// Move the source from the merger _sources to the shard _sources.
shardPipe->_sources.push_back(current);
} else {
// Split this source into 'merge' and 'shard' _sources.
intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
auto mergeSources = splittable->getMergeSources();
// A source may not simultaneously be present on both sides of the split.
invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
mergeSources.end());
if (shardSource)
shardPipe->_sources.push_back(shardSource);
// Add the stages in reverse order, so that they appear in the pipeline in the same
// order as they were returned by the stage.
for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
mergePipe->_sources.push_front(*it);
}
break;
}
}
}
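// Moves any $unwind stages sitting at the tail of the shard pipeline over to the front of the
// merge pipeline. $unwind fans each input document out into many output documents, so
// performing it after the network hop avoids inflating the amount of data the shards must send
// to the merger.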
void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
Pipeline* mergePipe) {
while (!shardPipe->_sources.empty() &&
dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
mergePipe->_sources.push_front(shardPipe->_sources.back());
shardPipe->_sources.pop_back();
}
}
void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
Pipeline* mergePipe) {
auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
? DepsTracker::MetadataAvailable::kTextScore
: DepsTracker::MetadataAvailable::kNoMetadata;
DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
if (mergeDeps.needWholeDocument)
return;  // The merge pipeline needs all fields, so there is nothing we can do.
// Empty project is "special" so if no fields are needed, we just ask for _id instead.
if (mergeDeps.fields.empty())
mergeDeps.fields.insert("_id");
// Remove metadata from dependencies since it automatically flows through projection and we
// don't want to project it into the document.
mergeDeps.setNeedTextScore(false);
// HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
// field dependencies. While this may not be 100% ideal in all cases, it is simple and
// avoids the worst cases by ensuring that:
// 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
// dependencies. This situation can happen when a $sort is before the first $project or
// $group. Without the optimization, the shards would have to reify and transmit full
// objects even though only a subset of fields are needed.
// 2) Optimization IS NOT applied immediately following a $project or $group since it would
// add an unnecessary project (and therefore a deep-copy).
for (auto&& source : shardPipe->_sources) {
DepsTracker dt(depsMetadata);
if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
return;
}
// If we get here, add the projection.
boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx);
shardPipe->_sources.push_back(project);
}
BSONObj Pipeline::getInitialQuery() const {
if (_sources.empty())
return BSONObj();
// Look for an initial $match.
DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(_sources.front().get());
if (match) {
return match->getQuery();
}
DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(_sources.front().get());
if (geoNear) {
return geoNear->getQuery();
}
return BSONObj();
}
bool Pipeline::needsPrimaryShardMerger() const {
return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
return stage->constraints(SplitState::kSplitForMerge).hostRequirement ==
HostTypeRequirement::kPrimaryShard;
});
}
bool Pipeline::needsMongosMerger() const {
return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
return stage->constraints(SplitState::kSplitForMerge).resolvedHostTypeRequirement(pCtx) ==
HostTypeRequirement::kMongoS;
});
}
bool Pipeline::needsShard() const {
return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
auto hostType = stage->constraints().resolvedHostTypeRequirement(pCtx);
return (hostType == HostTypeRequirement::kAnyShard ||
hostType == HostTypeRequirement::kPrimaryShard);
});
}
bool Pipeline::canRunOnMongos() const {
return _pipelineCanRunOnMongoS().isOK();
}
bool Pipeline::requiredToRunOnMongos() const {
invariant(!isSplitForShards());
for (auto&& stage : _sources) {
// If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline
// as a whole is not required to run on mongoS.
if (isUnsplit() && dynamic_cast<SplittableDocumentSource*>(stage.get())) {
return false;
}
auto hostRequirement = stage->constraints(_splitState).resolvedHostTypeRequirement(pCtx);
// If a mongoS-only stage occurs before a splittable stage, or if the pipeline is already
// split, this entire pipeline must run on mongoS.
if (hostRequirement == HostTypeRequirement::kMongoS) {
// Verify that the remainder of this pipeline can run on mongoS.
auto mongosRunStatus = _pipelineCanRunOnMongoS();
uassertStatusOKWithContext(mongosRunStatus,
str::stream() << stage->getSourceName()
<< " must run on mongoS, but cannot");
return true;
}
}
return false;
}
std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
std::vector<NamespaceString> collections;
for (auto&& source : _sources) {
source->addInvolvedCollections(&collections);
}
return collections;
}
vector<Value> Pipeline::serialize() const {
vector<Value> serializedSources;
for (auto&& source : _sources) {
source->serializeToArray(serializedSources);
}
return serializedSources;
}
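// unstitch() and stitch() manage the links between stages: each stage pulls its input from
// whatever stage is registered as its source. unstitch() severs every link, and stitch()
// re-chains the stages in list order, with the first stage's source set to nullptr.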
void Pipeline::unstitch() {
for (auto&& stage : _sources) {
stage->setSource(nullptr);
}
}
void Pipeline::stitch() {
if (_sources.empty()) {
return;
}
// Chain together all the stages.
DocumentSource* prevSource = _sources.front().get();
prevSource->setSource(nullptr);
for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end());
iter != listEnd;
++iter) {
intrusive_ptr<DocumentSource> pTemp(*iter);
pTemp->setSource(prevSource);
prevSource = pTemp.get();
}
}
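// Pulls the next result from the final stage, transparently looping past any "paused" results,
// and returns boost::none once the pipeline is exhausted.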
boost::optional<Document> Pipeline::getNext() {
invariant(!_sources.empty());
auto nextResult = _sources.back()->getNext();
while (nextResult.isPaused()) {
nextResult = _sources.back()->getNext();
}
return nextResult.isEOF() ? boost::none
: boost::optional<Document>{nextResult.releaseDocument()};
}
vector<Value> Pipeline::writeExplainOps(ExplainOptions::Verbosity verbosity) const {
vector<Value> array;
for (SourceContainer::const_iterator it = _sources.begin(); it != _sources.end(); ++it) {
(*it)->serializeToArray(array, verbosity);
}
return array;
}
void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) {
if (!_sources.empty()) {
_sources.front()->setSource(source.get());
}
_sources.push_front(source);
}
void Pipeline::addFinalSource(intrusive_ptr<DocumentSource> source) {
if (!_sources.empty()) {
source->setSource(_sources.back().get());
}
_sources.push_back(source);
}
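// Walks the pipeline from front to back, asking each stage which fields and metadata it
// depends on and unioning the answers into one DepsTracker. The walk may stop early once some
// stage reports both EXHAUSTIVE_FIELDS and EXHAUSTIVE_META, since later stages can only see
// what that stage produces, but it must keep going to collect variable dependencies whenever
// the scope defines variables. A stage reporting NOT_SUPPORTED conservatively forces
// 'needWholeDocument'.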
DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const {
DepsTracker deps(metadataAvailable);
const bool scopeHasVariables = pCtx->variablesParseState.hasDefinedVariables();
bool skipFieldsAndMetadataDeps = false;
bool knowAllFields = false;
bool knowAllMeta = false;
for (auto&& source : _sources) {
DepsTracker localDeps(deps.getMetadataAvailable());
DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps);
deps.vars.insert(localDeps.vars.begin(), localDeps.vars.end());
if ((skipFieldsAndMetadataDeps |= (status == DocumentSource::NOT_SUPPORTED))) {
// Assume this stage needs everything. We may still know something about our
// dependencies if an earlier stage returned EXHAUSTIVE_FIELDS or EXHAUSTIVE_META. If
// this scope has variables, we need to keep enumerating the remaining stages but will
// skip adding any further field or metadata dependencies.
if (scopeHasVariables) {
continue;
} else {
break;
}
}
if (!knowAllFields) {
deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end());
if (localDeps.needWholeDocument)
deps.needWholeDocument = true;
knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS;
}
if (!knowAllMeta) {
if (localDeps.getNeedTextScore())
deps.setNeedTextScore(true);
if (localDeps.getNeedSortKey())
deps.setNeedSortKey(true);
knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
}
// If there are variables defined at this pipeline's scope, there may be dependencies upon
// them in subsequent stages. Keep enumerating.
if (knowAllMeta && knowAllFields && !scopeHasVariables) {
break;
}
}
if (!knowAllFields)
deps.needWholeDocument = true; // don't know all fields we need
if (metadataAvailable & DepsTracker::MetadataAvailable::kTextScore) {
// If there is a text score, assume we need to keep it if we can't prove we don't. If we are
// the first half of a pipeline which has been split, future stages might need it.
if (!knowAllMeta)
deps.setNeedTextScore(true);
} else {
// If there is no text score available, then we don't need to ask for it.
deps.setNeedTextScore(false);
}
return deps;
}
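// Returns Status::OK() if every stage can execute on mongoS: no stage may require a shard,
// write persistent data, spill temporary data to disk while 'allowDiskUse' is set, or block
// while blocking merges on mongoS are disabled. Otherwise returns IllegalOperation naming the
// first offending stage and the reason it cannot run there.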
Status Pipeline::_pipelineCanRunOnMongoS() const {
for (auto&& stage : _sources) {
auto constraints = stage->constraints(_splitState);
auto hostRequirement = constraints.resolvedHostTypeRequirement(pCtx);
const bool needsShard = (hostRequirement == HostTypeRequirement::kAnyShard ||
hostRequirement == HostTypeRequirement::kPrimaryShard);
const bool mustWriteToDisk =
(constraints.diskRequirement == DiskUseRequirement::kWritesPersistentData);
const bool mayWriteTmpDataAndDiskUseIsAllowed =
(pCtx->allowDiskUse &&
constraints.diskRequirement == DiskUseRequirement::kWritesTmpData);
const bool needsDisk = (mustWriteToDisk || mayWriteTmpDataAndDiskUseIsAllowed);
const bool needsToBlock = (constraints.streamType == StreamType::kBlocking);
const bool blockingIsPermitted = !internalQueryProhibitBlockingMergeOnMongoS.load();
// If nothing prevents this stage from running on mongoS, continue to the next stage.
if (!needsShard && !needsDisk && (!needsToBlock || blockingIsPermitted)) {
continue;
}
// Otherwise, return an error with an explanation.
StringBuilder ss;
ss << stage->getSourceName();
if (needsShard) {
ss << " must run on a shard";
} else if (needsToBlock && !blockingIsPermitted) {
ss << " is a blocking stage; running these stages on mongoS is disabled";
} else if (mustWriteToDisk) {
ss << " must write to disk";
} else if (mayWriteTmpDataAndDiskUseIsAllowed) {
ss << " may write to disk when 'allowDiskUse' is enabled";
} else {
MONGO_UNREACHABLE;
}
return {ErrorCodes::IllegalOperation, ss.str()};
}
return Status::OK();
}
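// Removes and returns the pipeline's first stage if its name matches 'targetStageName' and the
// optional 'predicate' accepts it; otherwise leaves the pipeline untouched and returns
// nullptr. The remaining stages are re-stitched after a successful removal.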
boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
return nullptr;
}
auto targetStage = _sources.front();
if (predicate && !predicate(targetStage.get())) {
return nullptr;
}
_sources.pop_front();
stitch();
return targetStage;
}
} // namespace mongo