diff options
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 39 |
1 files changed, 35 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 8c486e9ca4d..01171b86e6d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -42,6 +42,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_unwind.h" @@ -79,10 +80,11 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse( pipeline->_sources.end(), parsedSources.begin(), parsedSources.end()); } - auto status = pipeline->ensureAllStagesAreInLegalPositions(); + auto status = pipeline->validate(); if (!status.isOK()) { return status; } + pipeline->stitch(); return std::move(pipeline); } @@ -91,18 +93,47 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create( SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(stages, expCtx), Pipeline::Deleter(expCtx->opCtx)); - auto status = pipeline->ensureAllStagesAreInLegalPositions(); + auto status = pipeline->validate(); if (!status.isOK()) { return status; } + pipeline->stitch(); return std::move(pipeline); } -Status Pipeline::ensureAllStagesAreInLegalPositions() const { +Status Pipeline::validate() const { + // Verify that the specified namespace is valid for the initial stage of this pipeline. + const NamespaceString& nss = pCtx->ns; + + if (_sources.empty()) { + if (nss.isCollectionlessAggregateNS()) { + return {ErrorCodes::InvalidNamespace, + "{aggregate: 1} is not valid for an empty pipeline."}; + } + } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) { + // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, + // {aggregate: 1} is only valid for collectionless sources, and vice-versa. + const auto firstStage = _sources.front().get(); + + if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "{aggregate: 1} is not valid for '" + << firstStage->getSourceName() + << "'; a collection is required."}; + } + + if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "'" << firstStage->getSourceName() + << "' can only be run with {aggregate: 1}"}; + } + } + + // Verify that all stages of the pipeline are in legal positions. size_t i = 0; for (auto&& stage : _sources) { - if (stage->isValidInitialSource() && i != 0) { + if (stage->isInitialSource() && i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() << " is only valid as the first stage in a pipeline."}; |