summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Evans <jacob.evans@10gen.com>2018-10-31 14:47:41 -0400
committerJacob Evans <jacob.evans@10gen.com>2018-11-07 11:54:09 -0500
commit7fadb5dc1ddecb37617b2d927960cd15da8172b2 (patch)
treee0500f06f2daa4e5b3991be2268c231cd8f29d51
parent9fb109b1d55374353954b998bd11bcb0d17b6e91 (diff)
downloadmongo-7fadb5dc1ddecb37617b2d927960cd15da8172b2.tar.gz
SERVER-28592 Move $sample earlier in the aggregation pipeline
-rw-r--r--src/mongo/db/pipeline/document_source.cpp53
-rw-r--r--src/mongo/db/pipeline/document_source.h16
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h2
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp2
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp35
-rw-r--r--src/mongo/db/pipeline/stage_constraints.h9
7 files changed, 97 insertions, 22 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 28baeed3fce..bb822fc8189 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/field_path.h"
@@ -210,17 +211,8 @@ StringMap<std::string> computeNewNamesAssumingAnyPathsNotRenamedAreUnmodified(
} // namespace
-Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
- Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
- invariant(*itr == this);
-
- // If we are at the end of the pipeline, only optimize in the special case of a cache stage.
- if (std::next(itr) == container->end()) {
- return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this)
- ? doOptimizeAt(itr, container)
- : container->end();
- }
-
+bool DocumentSource::pushMatchBefore(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) {
// We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the
@@ -241,12 +233,43 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
container->insert(std::next(itr), std::move(splitMatch.second));
}
- // The stage before the new $match may be able to optimize further, if there is such a
- // stage.
- return std::prev(itr) == container->begin() ? std::prev(itr)
- : std::prev(std::prev(itr));
+ return true;
}
}
+ return false;
+}
+
+bool DocumentSource::pushSampleBefore(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ auto nextSample = dynamic_cast<DocumentSourceSample*>((*std::next(itr)).get());
+ if (constraints().canSwapWithLimitAndSample && nextSample) {  // null: next is not a $sample
+ // This stage allows a following $sample to run first, so place the $sample ahead of 'itr'.
+ container->insert(itr, std::move(nextSample));  // NOTE(review): raw pointer; move is a copy
+ container->erase(std::next(itr));  // drop the $sample's original slot after 'itr'
+
+ return true;
+ }
+ return false;
+}
+
+Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ // If we are at the end of the pipeline, only optimize in the special case of a cache stage.
+ if (std::next(itr) == container->end()) {
+ return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this)
+ ? doOptimizeAt(itr, container)
+ : container->end();
+ }
+
+ // Try to move the immediately following $match or, failing that, $sample ahead of this stage.
+ if (pushMatchBefore(itr, container) || pushSampleBefore(itr, container)) {
+ // Return the stage preceding the moved $match/$sample, which may now be able to optimize
+ // further; if the moved stage is now first in the pipeline, return the moved stage itself.
+ return std::prev(itr) == container->begin() ? std::prev(itr) : std::prev(std::prev(itr));
+ }
+ // Nothing was moved, so fall through to doOptimizeAt().
 return doOptimizeAt(itr, container);
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index ec95df29d7e..1a4f4a43472 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -327,6 +327,22 @@ public:
// stages.
//
+private:
+ /**
+ * Attempt to move a $match stage that immediately follows the current stage (given by 'itr') to
+ * before the current stage. Returns whether the optimization was performed.
+ */
+ bool pushMatchBefore(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container);
+
+ /**
+ * Attempt to move a $sample stage that immediately follows the current stage (given by 'itr') to
+ * before the current stage. Returns whether the optimization was performed.
+ */
+ bool pushSampleBefore(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container);
+
+public:
/**
* The non-virtual public interface for optimization. Attempts to do some generic optimizations
* such as pushing $matches as early in the pipeline as possible, then calls out to
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 405a3bd303c..8a94978ef15 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -129,8 +129,6 @@ StageConstraints DocumentSourceChangeStreamTransform::constraints(
TransactionRequirement::kNotAllowed,
ChangeStreamRequirement::kChangeStreamStage);
- constraints.canSwapWithMatch = true;
- constraints.canSwapWithLimit = true;
// This transformation could be part of a 'collectionless' change stream on an entire
// database or cluster, mark as independent of any collection if so.
constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection;
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 637cb41cbb0..1b9407b9d6b 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -65,7 +65,7 @@ public:
TransactionRequirement::kAllowed,
ChangeStreamRequirement::kWhitelist);
constraints.canSwapWithMatch = true;
- constraints.canSwapWithLimit = true;
+ constraints.canSwapWithLimitAndSample = true;
// This transformation could be part of a 'collectionless' change stream on an entire
// database or cluster, mark as independent of any collection if so.
constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection;
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index c825a7b9165..6ba8c9e8e78 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -215,7 +215,7 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
container->erase(sortItr);
sortItr = std::next(itr);
skipSum = 0;
- } else if (!nextStage->constraints().canSwapWithLimit) {
+ } else if (!nextStage->constraints().canSwapWithLimitAndSample) {
return std::next(itr);
} else {
++sortItr;
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 733008f4c1b..422fa4ba266 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -132,6 +132,41 @@ TEST(PipelineOptimizationTest, LimitDoesNotMoveBeforeProject) {
"[{$project: {_id: true, a : true}}, {$limit : 5}]");
}
+TEST(PipelineOptimizationTest, SampleLegallyPushedBefore) {
+ string inputPipe =
+ "[{$replaceRoot: { newRoot: \"$a\" }}, "
+ "{$project: { b: 1 }}, "
+ "{$addFields: { c: 1 }}, "
+ "{$sample: { size: 4 }}]";
+ // Expected result: the $sample has moved ahead of all three stages that preceded it.
+ string outputPipe =
+ "[{$sample: {size: 4}}, "
+ "{$replaceRoot: {newRoot: \"$a\"}}, "
+ "{$project: {_id: true, b : true}}, "
+ "{$addFields: {c : {$const : 1}}}]";
+
+ assertPipelineOptimizesTo(inputPipe, outputPipe);
+}
+
+TEST(PipelineOptimizationTest, SampleNotIllegallyPushedBefore) {
+ string inputPipe =
+ "[{$project: { a : 1 }}, "
+ "{$match: { a: 1 }}, "
+ "{$sample: { size: 4 }}]";
+ // Expected result: $match and $sample both precede $project, with $sample still after $match.
+ string outputPipe =
+ "[{$match: {a: {$eq: 1}}}, "
+ "{$sample : {size: 4}}, "
+ "{$project: {_id: true, a : true}}]";
+ // Same stage order, with $match in its serialized form ({a: 1} rather than {a: {$eq: 1}}).
+ string serializedPipe =
+ "[{$match: {a: 1}}, "
+ "{$sample : {size: 4}}, "
+ "{$project: {_id: true, a : true}}]";
+
+ assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe);
+}
+
TEST(PipelineOptimizationTest, MoveMatchBeforeAddFieldsIfInvolvedFieldsNotRelated) {
string inputPipe = "[{$addFields : {a : 1}}, {$match : {b : 1}}]";
diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h
index 6388c0cf2ba..9de486e1d37 100644
--- a/src/mongo/db/pipeline/stage_constraints.h
+++ b/src/mongo/db/pipeline/stage_constraints.h
@@ -284,8 +284,11 @@ struct StageConstraints {
// $match predicates be swapped before itself.
bool canSwapWithMatch = false;
- // True if a subsequent $limit stage can be moved before this stage in the pipeline. This is
- // true if this stage does not add or remove documents from the pipeline.
- bool canSwapWithLimit = false;
+ // True if a subsequent $limit or $sample stage can be moved before this stage. Neither can be
+ // moved before a stage which may change the number of documents in the stream. Further, no stage
+ // which changes the order of documents can be swapped with a $limit or $sample, and no stage whose
+ // behavior depends on the order of documents can be swapped with a $sample, because our
+ // implementation of $sample does a random sort which shuffles the order.
+ bool canSwapWithLimitAndSample = false;
};
} // namespace mongo