summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-08-01 23:15:53 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-08-02 14:15:37 +0100
commit27d43e300e292043fefd7634de99160157955a17 (patch)
tree31844a3949fd1c900b3f7e93929faea1671b0c9b /src/mongo/db/pipeline/pipeline.cpp
parent20c85d4848b4e4b3c88e1788eaff362143fffd20 (diff)
downloadmongo-27d43e300e292043fefd7634de99160157955a17.tar.gz
SERVER-18940 Optimise sharded aggregations that are targeted to a single shard
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp32
1 files changed, 32 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 31cd015ea97..e5cd62a4670 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -289,21 +289,53 @@ void Pipeline::dispose(OperationContext* opCtx) {
}
std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
+ invariant(!_splitForSharded);
+ invariant(!_splitForMerge);
+ invariant(!_unsplitSources);
+
// Create and initialize the shard spec we'll return. We start with an empty pipeline on the
// shards and all work being done in the merger. Optimizations can move operations between
// the pipelines to be more efficient.
std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipeline(new Pipeline(pCtx),
Pipeline::Deleter(pCtx->opCtx));
+ // Keep a copy of the original source list in case we need to reset the pipeline from split to
+ // unsplit later.
+ shardPipeline->_unsplitSources.emplace(_sources);
+
// The order in which optimizations are applied can have significant impact on the
// efficiency of the final pipeline. Be Careful!
Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
+ shardPipeline->_splitForSharded = true;
+ _splitForMerge = true;
+
return shardPipeline;
}
+void Pipeline::unsplitFromSharded(
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard) {
+ invariant(_splitForSharded);
+ invariant(!_splitForMerge);
+ invariant(pipelineForMergingShard);
+ invariant(_unsplitSources);
+
+ // Clear the merge source list so that destroying the pipeline object won't dispose of the
+ // stages. We still have a reference to each of the stages which will be moved back to the shard
+ // pipeline via '_unsplitSources'.
+ pipelineForMergingShard->_sources.clear();
+ pipelineForMergingShard.reset();
+
+ // Set '_sources' to its original state, re-stitch, and clear the '_unsplitSources' optional.
+ _sources = *_unsplitSources;
+ _unsplitSources.reset();
+
+ _splitForSharded = false;
+ stitch();
+}
+
void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
while (!mergePipe->_sources.empty()) {
intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();