diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-08-01 23:15:53 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-08-02 14:15:37 +0100 |
commit | 27d43e300e292043fefd7634de99160157955a17 (patch) | |
tree | 31844a3949fd1c900b3f7e93929faea1671b0c9b /src/mongo/db/pipeline/pipeline.cpp | |
parent | 20c85d4848b4e4b3c88e1788eaff362143fffd20 (diff) | |
download | mongo-27d43e300e292043fefd7634de99160157955a17.tar.gz |
SERVER-18940 Optimise sharded aggregations that are targeted to a single shard
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 31cd015ea97..e5cd62a4670 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -289,21 +289,53 @@ void Pipeline::dispose(OperationContext* opCtx) { } std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() { + invariant(!_splitForSharded); + invariant(!_splitForMerge); + invariant(!_unsplitSources); + // Create and initialize the shard spec we'll return. We start with an empty pipeline on the // shards and all work being done in the merger. Optimizations can move operations between // the pipelines to be more efficient. std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipeline(new Pipeline(pCtx), Pipeline::Deleter(pCtx->opCtx)); + // Keep a copy of the original source list in case we need to reset the pipeline from split to + // unsplit later. + shardPipeline->_unsplitSources.emplace(_sources); + // The order in which optimizations are applied can have significant impact on the // efficiency of the final pipeline. Be Careful! Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this); Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this); Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this); + shardPipeline->_splitForSharded = true; + _splitForMerge = true; + return shardPipeline; } +void Pipeline::unsplitFromSharded( + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard) { + invariant(_splitForSharded); + invariant(!_splitForMerge); + invariant(pipelineForMergingShard); + invariant(_unsplitSources); + + // Clear the merge source list so that destroying the pipeline object won't dispose of the + // stages. We still have a reference to each of the stages which will be moved back to the shard + // pipeline via '_unsplitSources'. + pipelineForMergingShard->_sources.clear(); + pipelineForMergingShard.reset(); + + // Set '_sources' to its original state, re-stitch, and clear the '_unsplitSources' optional. + _sources = *_unsplitSources; + _unsplitSources.reset(); + + _splitForSharded = false; + stitch(); +} + void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { while (!mergePipe->_sources.empty()) { intrusive_ptr<DocumentSource> current = mergePipe->_sources.front(); |