diff options
author | Jacob Evans <jacob.evans@10gen.com> | 2018-10-31 14:47:41 -0400 |
---|---|---|
committer | Jacob Evans <jacob.evans@10gen.com> | 2018-11-07 11:54:09 -0500 |
commit | 7fadb5dc1ddecb37617b2d927960cd15da8172b2 (patch) | |
tree | e0500f06f2daa4e5b3991be2268c231cd8f29d51 /src/mongo/db | |
parent | 9fb109b1d55374353954b998bd11bcb0d17b6e91 (diff) | |
download | mongo-7fadb5dc1ddecb37617b2d927960cd15da8172b2.tar.gz |
SERVER-28592 Move $sample earlier in the aggregation pipeline
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/pipeline/document_source.cpp | 53 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_single_document_transformation.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stage_constraints.h | 9 |
7 files changed, 97 insertions, 22 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 28baeed3fce..bb822fc8189 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -34,6 +34,7 @@ #include "mongo/db/matcher/expression_algo.h" #include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/field_path.h" @@ -210,17 +211,8 @@ StringMap<std::string> computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified( } // namespace -Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( - Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { - invariant(*itr == this); - - // If we are at the end of the pipeline, only optimize in the special case of a cache stage. - if (std::next(itr) == container->end()) { - return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this) - ? doOptimizeAt(itr, container) - : container->end(); - } - +bool DocumentSource::pushMatchBefore(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) { // We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the @@ -241,12 +233,43 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( container->insert(std::next(itr), std::move(splitMatch.second)); } - // The stage before the new $match may be able to optimize further, if there is such a - // stage. - return std::prev(itr) == container->begin() ? std::prev(itr) - : std::prev(std::prev(itr)); + return true; } } + return false; +} + +bool DocumentSource::pushSampleBefore(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + auto nextSample = dynamic_cast<DocumentSourceSample*>((*std::next(itr)).get()); + if (constraints().canSwapWithLimitAndSample && nextSample) { + + container->insert(itr, std::move(nextSample)); + container->erase(std::next(itr)); + + return true; + } + return false; +} + +Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + // If we are at the end of the pipeline, only optimize in the special case of a cache stage. + if (std::next(itr) == container->end()) { + return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this) + ? doOptimizeAt(itr, container) + : container->end(); + } + + // Attempt to swap 'itr' with a subsequent $match or subsequent $sample. + if (pushMatchBefore(itr, container) || pushSampleBefore(itr, container)) { + // The stage before the pushed before stage may be able to optimize further, if there is + // such a stage. + return std::prev(itr) == container->begin() ? std::prev(itr) : std::prev(std::prev(itr)); + } + return doOptimizeAt(itr, container); } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index ec95df29d7e..1a4f4a43472 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -327,6 +327,22 @@ public: // stages. // +private: + /** + * Attempt to push a match stage from directly ahead of the current stage given by itr to before + * the current stage. Returns whether the optimization was performed. + */ + bool pushMatchBefore(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container); + + /** + * Attempt to push a sample stage from directly ahead of the current stage given by itr to before + * the current stage. Returns whether the optimization was performed. + */ + bool pushSampleBefore(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container); + +public: /** * The non-virtual public interface for optimization. Attempts to do some generic optimizations * such as pushing $matches as early in the pipeline as possible, then calls out to diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 405a3bd303c..8a94978ef15 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -129,8 +129,6 @@ StageConstraints DocumentSourceChangeStreamTransform::constraints( TransactionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); - constraints.canSwapWithMatch = true; - constraints.canSwapWithLimit = true; // This transformation could be part of a 'collectionless' change stream on an entire // database or cluster, mark as independent of any collection if so. constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection; diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 637cb41cbb0..1b9407b9d6b 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -65,7 +65,7 @@ public: TransactionRequirement::kAllowed, ChangeStreamRequirement::kWhitelist); constraints.canSwapWithMatch = true; - constraints.canSwapWithLimit = true; + constraints.canSwapWithLimitAndSample = true; // This transformation could be part of a 'collectionless' change stream on an entire // database or cluster, mark as independent of any collection if so. constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index c825a7b9165..6ba8c9e8e78 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -215,7 +215,7 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( container->erase(sortItr); sortItr = std::next(itr); skipSum = 0; - } else if (!nextStage->constraints().canSwapWithLimit) { + } else if (!nextStage->constraints().canSwapWithLimitAndSample) { return std::next(itr); } else { ++sortItr; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 733008f4c1b..422fa4ba266 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -132,6 +132,41 @@ TEST(PipelineOptimizationTest, LimitDoesNotMoveBeforeProject) { "[{$project: {_id: true, a : true}}, {$limit : 5}]"); } +TEST(PipelineOptimizationTest, SampleLegallyPushedBefore) { + string inputPipe = + "[{$replaceRoot: { newRoot: \"$a\" }}, " + "{$project: { b: 1 }}, " + "{$addFields: { c: 1 }}, " + "{$sample: { size: 4 }}]"; + + string outputPipe = + "[{$sample: {size: 4}}, " + "{$replaceRoot: {newRoot: \"$a\"}}, " + "{$project: {_id: true, b : true}}, " + "{$addFields: {c : {$const : 1}}}]"; + + assertPipelineOptimizesTo(inputPipe, outputPipe); +} + +TEST(PipelineOptimizationTest, SampleNotIllegallyPushedBefore) { + string inputPipe = + "[{$project: { a : 1 }}, " + "{$match: { a: 1 }}, " + "{$sample: { size: 4 }}]"; + + string outputPipe = + "[{$match: {a: {$eq: 1}}}, " + "{$sample : {size: 4}}, " + "{$project: {_id: true, a : true}}]"; + + string serializedPipe = + "[{$match: {a: 1}}, " + "{$sample : {size: 4}}, " + "{$project: {_id: true, a : true}}]"; + + assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe); +} + TEST(PipelineOptimizationTest, MoveMatchBeforeAddFieldsIfInvolvedFieldsNotRelated) { string inputPipe = "[{$addFields : {a : 1}}, {$match : {b : 1}}]"; diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h index 6388c0cf2ba..9de486e1d37 100644 --- a/src/mongo/db/pipeline/stage_constraints.h +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -284,8 +284,11 @@ struct StageConstraints { // $match predicates be swapped before itself. bool canSwapWithMatch = false; - // True if a subsequent $limit stage can be moved before this stage in the pipeline. This is - // true if this stage does not add or remove documents from the pipeline. - bool canSwapWithLimit = false; + // Neither a $sample nor a $limit can be moved before any stage which will possibly change the + // number of documents in the stream. Further, no stage which will change the order of documents + // can be swapped with a $limit or $sample, and no stage which will change behavior based on the + // order of documents can be swapped with a $sample because our implementation of sample will do + // a random sort which shuffles the order. + bool canSwapWithLimitAndSample = false; }; } // namespace mongo |