summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp39
1 files changed, 35 insertions, 4 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 8c486e9ca4d..01171b86e6d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_unwind.h"
@@ -79,10 +80,11 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse(
pipeline->_sources.end(), parsedSources.begin(), parsedSources.end());
}
- auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ auto status = pipeline->validate();
if (!status.isOK()) {
return status;
}
+
pipeline->stitch();
return std::move(pipeline);
}
@@ -91,18 +93,47 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create(
SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(stages, expCtx),
Pipeline::Deleter(expCtx->opCtx));
- auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ auto status = pipeline->validate();
if (!status.isOK()) {
return status;
}
+
pipeline->stitch();
return std::move(pipeline);
}
-Status Pipeline::ensureAllStagesAreInLegalPositions() const {
+Status Pipeline::validate() const {
+ // Verify that the specified namespace is valid for the initial stage of this pipeline.
+ const NamespaceString& nss = pCtx->ns;
+
+ if (_sources.empty()) {
+ if (nss.isCollectionlessAggregateNS()) {
+ return {ErrorCodes::InvalidNamespace,
+ "{aggregate: 1} is not valid for an empty pipeline."};
+ }
+ } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
+ // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
+ // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
+ const auto firstStage = _sources.front().get();
+
+ if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "{aggregate: 1} is not valid for '"
+ << firstStage->getSourceName()
+ << "'; a collection is required."};
+ }
+
+ if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "'" << firstStage->getSourceName()
+ << "' can only be run with {aggregate: 1}"};
+ }
+ }
+
+ // Verify that all stages of the pipeline are in legal positions.
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->isValidInitialSource() && i != 0) {
+ if (stage->isInitialSource() && i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline."};