diff options
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_split_pipeline.h | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_limit.h | 15 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 6 |
5 files changed, 34 insertions, 14 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 8f1c2fef4a0..134a8a86e3d 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -529,14 +529,18 @@ public: /** * Returns a source to be run on the shards, or NULL if no work should be done on the shards for * this stage. Must not mutate the existing source object; if different behaviour is required in - * the split-pipeline case, a new source should be created and configured appropriately. + * the split-pipeline case, a new source should be created and configured appropriately. It is + * an error for getShardSource() to return a pointer to the same object as getMergeSource(), + * since this can result in the source being stitched into both the shard and merge pipelines + * when the latter is executed on mongoS. */ virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0; /** * Returns a source that combines results from the shards, or NULL if no work should be done in * the merge pipeline for this stage. Must not mutate the existing source object; if different - * behaviour is required, a new source should be created and configured appropriately. + * behaviour is required, a new source should be created and configured appropriately. It is an + * error for getMergeSource() to return a pointer to the same object as getShardSource(). */ virtual boost::intrusive_ptr<DocumentSource> getMergeSource() = 0; diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index c18a6d301a6..f9ac84c555f 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -49,9 +49,10 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement, const boost::intrusive_ptr<ExpressionContext>&); - DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - HostTypeRequirement mergeType) - : DocumentSource(expCtx), _mergeType(mergeType) {} + static boost::intrusive_ptr<DocumentSource> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, HostTypeRequirement mergeType) { + return new DocumentSourceInternalSplitPipeline(expCtx, mergeType); + } const char* getSourceName() const final { return kStageName.rawData(); @@ -62,7 +63,7 @@ public: } boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; + return DocumentSourceInternalSplitPipeline::create(pExpCtx, _mergeType); } StageConstraints constraints() const final { @@ -74,6 +75,10 @@ public: GetNextResult getNext() final; private: + DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + HostTypeRequirement mergeType) + : DocumentSource(expCtx), _mergeType(mergeType) {} + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard; }; diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 88acfa8b45a..f5f93fff1a4 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -68,13 +68,22 @@ public: static boost::intrusive_ptr<DocumentSourceLimit> create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); - // Virtuals for SplittableDocumentSource - // Need to run on rounter. Running on shard as well is an optimization. + /** + * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on + * the shards is an optimization, but is not strictly necessary in order to produce correct + * pipeline output. + */ boost::intrusive_ptr<DocumentSource> getShardSource() final { return this; } + + /** + * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the + * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the + * merging host in order to produce correct pipeline output. + */ boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; + return DocumentSourceLimit::create(pExpCtx, _limit); } long long getLimit() const { diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 5c2ce92237f..89975fc4cf2 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -272,8 +272,7 @@ void Pipeline::dispose(OperationContext* opCtx) { pCtx->opCtx = opCtx; // Make sure all stages are connected, in case we are being disposed via an error path and - // were - // not stitched at the time of the error. + // were not stitched at the time of the error. stitch(); if (!_sources.empty()) { @@ -332,6 +331,7 @@ void Pipeline::unsplitFromSharded( _unsplitSources.reset(); _splitForSharded = false; + stitch(); } @@ -351,6 +351,8 @@ void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipel // split this source into Merge and Shard _sources intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource(); + invariant(shardSource != mergeSource); + if (shardSource) shardPipe->_sources.push_back(shardSource); if (mergeSource) diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 6f46bf75476..35b18954fdd 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -196,7 +196,7 @@ public: * Returns true if this pipeline is the part of a split pipeline which should be targeted to the * shards. */ - bool isSplitForSharded() { + bool isSplitForSharded() const { return _splitForSharded; } @@ -204,7 +204,7 @@ public: * Returns true if this pipeline is the part of a split pipeline which is responsible for * merging the results from the shards. */ - bool isSplitForMerge() { + bool isSplitForMerge() const { return _splitForMerge; } @@ -259,7 +259,7 @@ public: */ DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const; - const SourceContainer& getSources() { + const SourceContainer& getSources() const { return _sources; } |