summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source.h8
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.h13
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h15
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp6
-rw-r--r--src/mongo/db/pipeline/pipeline.h6
5 files changed, 34 insertions, 14 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 8f1c2fef4a0..134a8a86e3d 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -529,14 +529,18 @@ public:
/**
* Returns a source to be run on the shards, or NULL if no work should be done on the shards for
* this stage. Must not mutate the existing source object; if different behaviour is required in
- * the split-pipeline case, a new source should be created and configured appropriately.
+ * the split-pipeline case, a new source should be created and configured appropriately. It is
+ * an error for getShardSource() to return a pointer to the same object as getMergeSource(),
+ * since this can result in the source being stitched into both the shard and merge pipelines
+ * when the latter is executed on mongoS.
*/
virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
/**
* Returns a source that combines results from the shards, or NULL if no work should be done in
* the merge pipeline for this stage. Must not mutate the existing source object; if different
- * behaviour is required, a new source should be created and configured appropriately.
+ * behaviour is required, a new source should be created and configured appropriately. It is an
+ * error for getMergeSource() to return a pointer to the same object as getShardSource().
*/
virtual boost::intrusive_ptr<DocumentSource> getMergeSource() = 0;
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index c18a6d301a6..f9ac84c555f 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -49,9 +49,10 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
- DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- HostTypeRequirement mergeType)
- : DocumentSource(expCtx), _mergeType(mergeType) {}
+ static boost::intrusive_ptr<DocumentSource> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, HostTypeRequirement mergeType) {
+ return new DocumentSourceInternalSplitPipeline(expCtx, mergeType);
+ }
const char* getSourceName() const final {
return kStageName.rawData();
@@ -62,7 +63,7 @@ public:
}
boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
+ return DocumentSourceInternalSplitPipeline::create(pExpCtx, _mergeType);
}
StageConstraints constraints() const final {
@@ -74,6 +75,10 @@ public:
GetNextResult getNext() final;
private:
+ DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ HostTypeRequirement mergeType)
+ : DocumentSource(expCtx), _mergeType(mergeType) {}
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard;
};
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index 88acfa8b45a..f5f93fff1a4 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -68,13 +68,22 @@ public:
static boost::intrusive_ptr<DocumentSourceLimit> create(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
- // Virtuals for SplittableDocumentSource
- // Need to run on rounter. Running on shard as well is an optimization.
+ /**
+ * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on
+ * the shards is an optimization, but is not strictly necessary in order to produce correct
+ * pipeline output.
+ */
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return this;
}
+
+ /**
+ * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the
+ * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the
+ * merging host in order to produce correct pipeline output.
+ */
boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
+ return DocumentSourceLimit::create(pExpCtx, _limit);
}
long long getLimit() const {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 5c2ce92237f..89975fc4cf2 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -272,8 +272,7 @@ void Pipeline::dispose(OperationContext* opCtx) {
pCtx->opCtx = opCtx;
// Make sure all stages are connected, in case we are being disposed via an error path and
- // were
- // not stitched at the time of the error.
+ // were not stitched at the time of the error.
stitch();
if (!_sources.empty()) {
@@ -332,6 +331,7 @@ void Pipeline::unsplitFromSharded(
_unsplitSources.reset();
_splitForSharded = false;
+
stitch();
}
@@ -351,6 +351,8 @@ void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipel
// split this source into Merge and Shard _sources
intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource();
+ invariant(shardSource != mergeSource);
+
if (shardSource)
shardPipe->_sources.push_back(shardSource);
if (mergeSource)
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 6f46bf75476..35b18954fdd 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -196,7 +196,7 @@ public:
* Returns true if this pipeline is the part of a split pipeline which should be targeted to the
* shards.
*/
- bool isSplitForSharded() {
+ bool isSplitForSharded() const {
return _splitForSharded;
}
@@ -204,7 +204,7 @@ public:
* Returns true if this pipeline is the part of a split pipeline which is responsible for
* merging the results from the shards.
*/
- bool isSplitForMerge() {
+ bool isSplitForMerge() const {
return _splitForMerge;
}
@@ -259,7 +259,7 @@ public:
*/
DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const;
- const SourceContainer& getSources() {
+ const SourceContainer& getSources() const {
return _sources;
}