summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Hana Pearlman <hana.pearlman@mongodb.com> 2021-01-26 16:58:20 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-02-10 17:32:15 +0000
commit: 461350791520a9f092630cbe3520e11dff09b746 (patch)
tree: 9fce7d7f94fd0cbad134a90b28c2f0f788fbc21e /src
parent: e09ce369e4912a945454a5494248046535c70460 (diff)
download: mongo-461350791520a9f092630cbe3520e11dff09b746.tar.gz
SERVER-53486: Allow rewrite to push projections on bucketed fields into $unpackBucket
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp 237
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_unpack_bucket.h 38
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp 601
-rw-r--r-- src/mongo/db/pipeline/pipeline.cpp 36
-rw-r--r-- src/mongo/db/pipeline/pipeline.h 12
5 files changed, 701 insertions, 223 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index c5ce3957bc7..6326ec32419 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -44,6 +44,119 @@ REGISTER_DOCUMENT_SOURCE(_internalUnpackBucket,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceInternalUnpackBucket::createFromBson);
+namespace {
+/**
+ * Removes metaField from the field set and returns a boolean indicating whether metaField should be
+ * included in the materialized measurements. Always returns false if metaField does not exist.
+ */
+auto eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior unpackerBehavior,
+ BucketSpec* bucketSpec) {
+ if (!bucketSpec->metaField) {
+ return false;
+ } else if (auto itr = bucketSpec->fieldSet.find(*bucketSpec->metaField);
+ itr != bucketSpec->fieldSet.end()) {
+ bucketSpec->fieldSet.erase(itr);
+ return unpackerBehavior == BucketUnpacker::Behavior::kInclude;
+ } else {
+ return unpackerBehavior == BucketUnpacker::Behavior::kExclude;
+ }
+}
+
+/**
+ * Determine if timestamp values should be included in the materialized measurements.
+ */
+auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, BucketSpec* bucketSpec) {
+ return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
+ (bucketSpec->fieldSet.find(bucketSpec->timeField) != bucketSpec->fieldSet.end());
+}
+
+/**
+ * A projection can be internalized if every field corresponds to a boolean value. Note that this
+ * correctly rejects dotted fieldnames, which are mapped to objects internally.
+ */
+bool canInternalizeProjectObj(const BSONObj& projObj) {
+ return std::all_of(projObj.begin(), projObj.end(), [](auto&& e) { return e.isBoolean(); });
+}
+
+/**
+ * If 'src' represents an inclusion or exclusion $project, return a BSONObj representing it and a
+ * bool indicating its type (true for inclusion, false for exclusion). Else return an empty BSONObj.
+ */
+auto getIncludeExcludeProjectAndType(DocumentSource* src) {
+ if (const auto proj = dynamic_cast<DocumentSourceSingleDocumentTransformation*>(src); proj &&
+ (proj->getType() == TransformerInterface::TransformerType::kInclusionProjection ||
+ proj->getType() == TransformerInterface::TransformerType::kExclusionProjection)) {
+ return std::pair{proj->getTransformer().serializeTransformation(boost::none).toBson(),
+ proj->getType() ==
+ TransformerInterface::TransformerType::kInclusionProjection};
+ }
+ return std::pair{BSONObj{}, false};
+}
+
+/**
+ * Determine which fields can be moved out of 'src', if it is a $project, and into
+ * $_internalUnpackBucket. Return the set of those field names, the remaining $project, and a bool
+ * indicating its type.
+ *
+ * For example, given {$project: {a: 1, 'b.c': 1, _id: 0}}, return the set ['a', 'b'], the project
+ * {a: 1, 'b.c': 1}, and 'true'. In this case, '_id' does not need to be included in either the set or
+ * the project, since the unpack will exclude any field not explicitly included in its field set.
+ */
+auto extractInternalizableFieldsRemainingProjectAndType(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSource* src) {
+ auto eraseIdIf = [](std::set<std::string>&& set, auto&& cond) {
+ if (cond)
+ set.erase("_id");
+ return std::move(set);
+ };
+
+ if (auto [remainingProj, isInclusion] = getIncludeExcludeProjectAndType(src);
+ remainingProj.isEmpty()) {
+ // There is nothing to internalize.
+ return std::tuple{std::set<std::string>{}, remainingProj, isInclusion};
+ } else if (canInternalizeProjectObj(remainingProj)) {
+ // We can internalize the whole object, so the remaining $project we return is empty.
+ return std::tuple{eraseIdIf(remainingProj.getFieldNames<std::set<std::string>>(),
+ remainingProj.getBoolField("_id") != isInclusion),
+ BSONObj{},
+ isInclusion};
+ } else if (isInclusion) {
+ // We can't internalize the whole inclusion, so we must leave it unmodified in the pipeline
+ // for correctness. We do dependency analysis to get an internalizable $project to ensure
+ // we're handling dotted fields or fields referenced inside 'src'.
+ Pipeline::SourceContainer projectStage{src};
+ auto dependencyProj =
+ Pipeline::getDependenciesForContainer(expCtx, projectStage, boost::none)
+ .toProjectionWithoutMetadata(DepsTracker::TruncateToRootLevel::yes);
+ return std::tuple{eraseIdIf(dependencyProj.getFieldNames<std::set<std::string>>(),
+ dependencyProj.getIntField("_id") != 1),
+ remainingProj,
+ isInclusion};
+ } else {
+ // We can internalize any fields that are not dotted, and leave the rest in 'remainingProj'.
+ std::set<std::string> topLevelFields;
+ std::for_each(remainingProj.begin(), remainingProj.end(), [&topLevelFields](auto&& elem) {
+ // '_id' may be included in this exclusion. If so, don't add it to 'topLevelFields'.
+ if (elem.isBoolean() && !elem.Bool()) {
+ topLevelFields.emplace(elem.fieldName());
+ }
+ });
+ return std::tuple{topLevelFields, remainingProj.removeFields(topLevelFields), isInclusion};
+ }
+}
+
+// Optimize the given pipeline after the $_internalUnpackBucket stage.
+void optimizeEndOfPipeline(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ // We must create a new SourceContainer representing the subsection of the pipeline we wish to
+ // optimize, since otherwise calls to optimizeAt() will overrun these limits.
+ auto endOfPipeline = Pipeline::SourceContainer(std::next(itr), container->end());
+ Pipeline::optimizeContainer(&endOfPipeline);
+ container->erase(std::next(itr), container->end());
+ container->splice(std::next(itr), endOfPipeline);
+}
+} // namespace
+
void BucketUnpacker::reset(BSONObj&& bucket) {
_fieldIters.clear();
_timeFieldIter = boost::none;
@@ -102,6 +215,13 @@ void BucketUnpacker::reset(BSONObj&& bucket) {
}
}
+void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) {
+ _includeMetaField = eraseMetaFromFieldSetAndDetermineIncludeMeta(behavior, &bucketSpec);
+ _includeTimeField = determineIncludeTimeField(behavior, &bucketSpec);
+ _unpackerBehavior = behavior;
+ _spec = std::move(bucketSpec);
+}
+
Document BucketUnpacker::getNext() {
invariant(hasNext());
@@ -189,21 +309,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF
"The $_internalUnpackBucket stage requires a timeField parameter",
specElem[kTimeFieldName].ok());
- // Determine if timestamp values should be included in the materialized measurements.
- auto includeTimeField = (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
- (bucketSpec.fieldSet.find(bucketSpec.timeField) != bucketSpec.fieldSet.end());
-
- // Check the include/exclude set to determine if measurements should be materialized with
- // metadata.
- auto includeMetaField = false;
- if (bucketSpec.metaField) {
- const auto metaFieldIt = bucketSpec.fieldSet.find(*bucketSpec.metaField);
- auto found = metaFieldIt != bucketSpec.fieldSet.end();
- if (found) {
- bucketSpec.fieldSet.erase(metaFieldIt);
- }
- includeMetaField = (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == found;
- }
+ auto includeTimeField = determineIncludeTimeField(unpackerBehavior, &bucketSpec);
+
+ auto includeMetaField =
+ eraseMetaFromFieldSetAndDetermineIncludeMeta(unpackerBehavior, &bucketSpec);
return make_intrusive<DocumentSourceInternalUnpackBucket>(
expCtx,
@@ -250,65 +359,64 @@ DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
return nextResult;
}
-namespace {
-/**
- * A projection can be internalized if it does not include any dotted field names and if every field
- * name corresponds to a boolean value.
- */
-bool canInternalizeProjectObj(const BSONObj& projObj) {
- const auto names = projObj.getFieldNames<std::set<std::string>>();
- return std::all_of(names.begin(), names.end(), [&projObj](auto&& name) {
- return name.find('.') == std::string::npos && projObj.getField(name).isBoolean();
- });
-}
-
-/**
- * If 'src' represents an inclusion or exclusion $project, return a BSONObj representing it, else
- * return an empty BSONObj. If 'inclusionOnly' is true, 'src' must be an inclusion $project.
- */
-auto getProjectObj(DocumentSource* src, bool inclusionOnly) {
- if (const auto projStage = dynamic_cast<DocumentSourceSingleDocumentTransformation*>(src);
- projStage &&
- (projStage->getType() == TransformerInterface::TransformerType::kInclusionProjection ||
- (!inclusionOnly &&
- projStage->getType() == TransformerInterface::TransformerType::kExclusionProjection))) {
- return projStage->getTransformer().serializeTransformation(boost::none).toBson();
+void DocumentSourceInternalUnpackBucket::internalizeProject(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ if (std::next(itr) == container->end() || !_bucketUnpacker.bucketSpec().fieldSet.empty()) {
+ // There is no project to internalize or there are already fields being included/excluded.
+ return;
+ }
+ auto [fields, remainingProject, isInclusion] =
+ extractInternalizableFieldsRemainingProjectAndType(getContext(), std::next(itr)->get());
+ if (fields.empty()) {
+ return;
}
- return BSONObj{};
+ // Update '_bucketUnpacker' state with the new fields and behavior. Update 'container' state by
+ // removing the old $project and potentially replacing it with 'remainingProject'.
+ auto spec = _bucketUnpacker.bucketSpec();
+ spec.fieldSet = std::move(fields);
+ _bucketUnpacker.setBucketSpecAndBehavior(std::move(spec),
+ isInclusion ? BucketUnpacker::Behavior::kInclude
+ : BucketUnpacker::Behavior::kExclude);
+ container->erase(std::next(itr));
+ if (!remainingProject.isEmpty()) {
+ container->insert(std::next(itr),
+ DocumentSourceProject::createFromBson(
+ BSON("$project" << remainingProject).firstElement(), getContext()));
+ }
}
/**
- * Given a source container and an iterator pointing to the $unpackBucket stage, builds a projection
- * BSONObj that can be entirely moved into the $unpackBucket stage, following these rules:
- * 1. If there is an inclusion projection immediately after the $unpackBucket which can be
- * internalized, an empty BSONObj will be returned.
+ * Given a source container and an iterator pointing to the $_internalUnpackBucket, builds a
+ * projection that can be entirely moved into the $_internalUnpackBucket, following these rules:
+ * 1. If there is an inclusion projection immediately after which can be internalized, an empty
+ * BSONObj will be returned.
* 2. Otherwise, if there is a finite dependency set for the rest of the pipeline, an inclusion
* $project representing it and containing only root-level fields will be returned. An
* inclusion $project will be returned here even if there is a viable exclusion $project
* next in the pipeline.
* 3. Otherwise, an empty BSONObj will be returned.
*/
-auto buildProjectToInternalize(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) {
- // Check for a viable inclusion $project after the $unpackBucket. This handles case 1.
- if (auto projObj = getProjectObj(std::next(itr)->get(), true);
- !projObj.isEmpty() && canInternalizeProjectObj(projObj)) {
+BSONObj DocumentSourceInternalUnpackBucket::buildProjectToInternalize(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) const {
+ if (std::next(itr) == container->end()) {
return BSONObj{};
}
- // If there is a finite dependency set for the pipeline after the $unpackBucket, obtain an
- // inclusion $project representing its root-level fields. Otherwise, we get an empty BSONObj.
- Pipeline::SourceContainer restOfPipeline(std::next(itr), container->end());
- auto deps = Pipeline::getDependenciesForContainer(expCtx, restOfPipeline, boost::none);
- auto dependencyProj = deps.toProjectionWithoutMetadata(DepsTracker::TruncateToRootLevel::yes);
+ // Check for a viable inclusion $project after the $_internalUnpackBucket. This handles case 1.
+ if (auto [project, isInclusion] = getIncludeExcludeProjectAndType(std::next(itr)->get());
+ isInclusion && !project.isEmpty() && canInternalizeProjectObj(project)) {
+ return BSONObj{};
+ }
- // If 'dependencyProj' is not empty, we're in case 2. If it is empty, we're in case 3. There may
- // be a viable exclusion $project in the pipeline, but we don't need to check for it here.
- return dependencyProj;
+ // Attempt to get an inclusion $project representing the root-level dependencies of the pipeline
+ // after the $_internalUnpackBucket. If this $project is not empty, then the dependency set was
+ // finite, and we are in case 2. If it is empty, we're in case 3. There may be a viable
+ // exclusion $project in the pipeline, but we don't need to check for it here.
+ Pipeline::SourceContainer restOfPipeline(std::next(itr), container->end());
+ auto deps = Pipeline::getDependenciesForContainer(getContext(), restOfPipeline, boost::none);
+ return deps.toProjectionWithoutMetadata(DepsTracker::TruncateToRootLevel::yes);
}
-} // namespace
Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
@@ -319,15 +427,20 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi
}
// Attempt to build an internalizable $project based on dependency analysis.
- if (auto projObj = buildProjectToInternalize(getContext(), itr, container);
- !projObj.isEmpty()) {
+ if (auto projObj = buildProjectToInternalize(itr, container); !projObj.isEmpty()) {
// Give the new $project a chance to be optimized before internalizing.
container->insert(std::next(itr),
DocumentSourceProject::createFromBson(
BSON("$project" << projObj).firstElement(), pExpCtx));
- return std::next(itr);
}
- return std::next(itr);
+ // Optimize the pipeline after the $_internalUnpackBucket.
+ optimizeEndOfPipeline(std::next(itr), container);
+
+ // If there is a $project following the $_internalUnpackBucket, internalize as much of it as
+ // possible, and update the state of 'container' and '_bucketUnpacker' to reflect this.
+ internalizeProject(itr, container);
+
+ return container->end();
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index 7731e5032db..db7a880c589 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -97,18 +97,28 @@ public:
return _bucket;
}
+ bool includeMetaField() const {
+ return _includeMetaField;
+ }
+
+ bool includeTimeField() const {
+ return _includeTimeField;
+ }
+
+ void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
+
private:
- const BucketSpec _spec;
- const Behavior _unpackerBehavior;
+ BucketSpec _spec;
+ Behavior _unpackerBehavior;
// Iterates the timestamp section of the bucket to drive the unpacking iteration.
boost::optional<BSONObjIterator> _timeFieldIter;
// A flag used to mark that the timestamp value should be materialized in measurements.
- const bool _includeTimeField;
+ bool _includeTimeField;
- // A flag used to mark that a bucket's metadata element should be materialized in measurements.
- const bool _includeMetaField;
+ // A flag used to mark that a bucket's metadata value should be materialized in measurements.
+ bool _includeMetaField;
// The bucket being unpacked.
BSONObj _bucket;
@@ -141,6 +151,14 @@ public:
return kStageName.rawData();
}
+ bool includeMetaField() const {
+ return _bucketUnpacker.includeMetaField();
+ }
+
+ bool includeTimeField() const {
+ return _bucketUnpacker.includeTimeField();
+ }
+
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
return {StreamType::kStreaming,
PositionRequirement::kNone,
@@ -162,6 +180,16 @@ public:
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
+ /*
+ * Given a source container and an iterator pointing to the unpack stage, attempt to internalize
+ * a following $project, and update the state for 'container' and '_bucketUnpacker'.
+ */
+ void internalizeProject(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container);
+
+ BSONObj buildProjectToInternalize(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) const;
+
private:
GetNextResult doGetNext() final;
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp
index fb34a31a234..d2318a69d44 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp
@@ -741,227 +741,558 @@ TEST_F(InternalUnpackBucketStageTest, ParserRejectsBothIncludeAndExcludeParamete
5408000);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectForGroupDependencies) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto groupSpecObj = fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}");
+/**************************** buildProjectToInternalize() tests ****************************/
+using InternalUnpackBucketBuildProjectToInternalizeTest = AggregationContextFixture;
- auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx());
- ASSERT_EQ(2u, pipeline->getSources().size());
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ BuildsIncludeProjectForGroupDependencies) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
- pipeline->optimizePipeline();
- ASSERT_EQ(3u, pipeline->getSources().size());
-
- auto serialized = pipeline->serializeToBson();
- ASSERT_EQ(3u, serialized.size());
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
const UnorderedFieldsBSONObjComparator kComparator;
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_EQ(
- kComparator.compare(fromjson("{$project: {_id: false, x: true, y: true}}"), serialized[1]),
- 0);
- ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[2]);
+ ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, x: 1, y: 1}"), project), 0);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectForProjectDependencies) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto projectSpecObj = fromjson("{$project: {_id: true, x: {f: '$y'}}}");
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ BuildsIncludeProjectForProjectDependencies) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: {f: '$y'}}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
- auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, projectSpecObj), getExpCtx());
- ASSERT_EQ(2u, pipeline->getSources().size());
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
- pipeline->optimizePipeline();
- ASSERT_EQ(3u, pipeline->getSources().size());
+ const UnorderedFieldsBSONObjComparator kComparator;
+ ASSERT_EQ(kComparator.compare(fromjson("{_id: 1, x: 1, y: 1}"), project), 0);
+}
- auto serialized = pipeline->serializeToBson();
- ASSERT_EQ(3u, serialized.size());
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ BuildsIncludeProjectWhenInMiddleOfPipeline) {
+ auto pipeline = Pipeline::parse(
+ makeVector(
+ fromjson("{$match: {'meta.source': 'primary'}}"),
+ fromjson(
+ "{$_internalUnpackBucket: { exclude: [], timeField: 'foo', metaField: 'meta'}}"),
+ fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(3u, container.size());
+
+ auto project =
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(std::next(container.begin())->get())
+ ->buildProjectToInternalize(std::next(container.begin()), &container);
const UnorderedFieldsBSONObjComparator kComparator;
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_EQ(
- kComparator.compare(fromjson("{$project: {_id: true, x: true, y: true}}"), serialized[1]),
- 0);
- ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[2]);
+ ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, x: 1, y: 1}"), project), 0);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectWhenInMiddleOfPipeline) {
- auto matchSpecObj = fromjson("{$match: {'meta.source': 'primary'}}");
- auto unpackSpecObj =
- fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo', metaField: 'meta'}}");
- auto groupSpecObj = fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}");
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ BuildsIncludeProjectWhenGroupDependenciesAreDotted) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$group: {_id: '$x.y', f: {$first: '$a.b'}}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
- auto pipeline =
- Pipeline::parse(makeVector(matchSpecObj, unpackSpecObj, groupSpecObj), getExpCtx());
- ASSERT_EQ(3u, pipeline->getSources().size());
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
- pipeline->optimizePipeline();
- ASSERT_EQ(4u, pipeline->getSources().size());
+ const UnorderedFieldsBSONObjComparator kComparator;
+ ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, x: 1, a: 1}"), project), 0);
+}
- auto serialized = pipeline->serializeToBson();
- ASSERT_EQ(4u, serialized.size());
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ BuildsIncludeProjectWhenProjectDependenciesAreDotted) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {'_id.a': true}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
const UnorderedFieldsBSONObjComparator kComparator;
- ASSERT_BSONOBJ_EQ(matchSpecObj, serialized[0]);
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[1]);
- ASSERT_EQ(
- kComparator.compare(fromjson("{$project: {_id: false, x: true, y: true}}"), serialized[2]),
- 0);
- ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[3]);
+ ASSERT_EQ(kComparator.compare(fromjson("{_id: 1}"), project), 0);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectWhenDependenciesAreDotted) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto groupSpecObj = fromjson("{$group: {_id: '$x.y', f: {$first: '$a.b'}}}");
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ DoesNotBuildProjectWhenThereAreNoDependencies) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$group: {_id: {$const: null}, count: { $sum: {$const: 1 }}}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
+ ASSERT(project.isEmpty());
+}
- auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx());
- ASSERT_EQ(2u, pipeline->getSources().size());
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ DoesNotBuildProjectWhenSortDependenciesAreNotFinite) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$sort: {x: 1}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
+ ASSERT(project.isEmpty());
+}
- pipeline->optimizePipeline();
- ASSERT_EQ(3u, pipeline->getSources().size());
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ DoesNotBuildProjectWhenProjectDependenciesAreNotFinite) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$sort: {x: 1}}"),
+ fromjson("{$project: {_id: false, x: false}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(3u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
+ ASSERT(project.isEmpty());
+}
- auto serialized = pipeline->serializeToBson();
- ASSERT_EQ(3u, serialized.size());
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ DoesNotBuildProjectWhenViableInclusionProjectExists) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: true, x: true}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
+ ASSERT(project.isEmpty());
+}
+
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ DoesNotBuildProjectWhenViableNonBoolInclusionProjectExists) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: 1, x: 1.0, y: 1.5}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
+ ASSERT(project.isEmpty());
+}
+
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ DoesNotBuildProjectWhenViableExclusionProjectExists) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: false, x: false}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(2u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
+ ASSERT(project.isEmpty());
+}
+
+TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest,
+ BuildsInclusionProjectInsteadOfViableExclusionProject) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: false, x: false}}"),
+ fromjson("{$sort: {y: 1}}"),
+ fromjson("{$group: {_id: '$y', f: {$first: '$z'}}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+ ASSERT_EQ(4u, container.size());
+
+ auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->buildProjectToInternalize(container.begin(), &container);
const UnorderedFieldsBSONObjComparator kComparator;
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_EQ(
- kComparator.compare(fromjson("{$project: {_id: false, x: true, a: true}}"), serialized[1]),
- 0);
- ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[2]);
+ ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, y: 1, z: 1}"), project), 0);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenThereAreNoDependencies) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto groupSpecObj = fromjson("{$group: {_id: {$const: null}, count: { $sum: {$const: 1 }}}}");
+/******************************* internalizeProject() tests *******************************/
+using InternalUnpackBucketInternalizeProjectTest = AggregationContextFixture;
- auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx());
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesInclusionProject) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: true, y: true, _id: true}}")),
+ getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- pipeline->optimizePipeline();
- ASSERT_EQ(2u, pipeline->getSources().size());
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
auto serialized = pipeline->serializeToBson();
- ASSERT_EQ(2u, serialized.size());
-
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[1]);
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['_id', 'x', 'y'], timeField: 'foo'}}"),
+ serialized[0]);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenSortDependenciesAreNotFinite) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto sortSpecObj = fromjson("{$sort: {x: 1}}");
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesInclusionButExcludesId) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: true, y: true, _id: false}}")),
+ getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, sortSpecObj), getExpCtx());
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['x', 'y'], timeField: 'foo'}}"),
+ serialized[0]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesInclusionThatImplicitlyIncludesId) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: true, y: true}}")),
+ getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- pipeline->optimizePipeline();
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['_id', 'x', 'y'], timeField: 'foo'}}"),
+ serialized[0]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesPartOfInclusionProject) {
+ auto projectSpecObj = fromjson("{$project: {_id: true, x: {y: true}, a: {b: '$c'}}}");
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ projectSpecObj),
+ getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
auto serialized = pipeline->serializeToBson();
ASSERT_EQ(2u, serialized.size());
-
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[1]);
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['_id', 'a', 'c', 'x'], timeField: 'foo'}}"),
+ serialized[0]);
+ ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]);
}
-TEST_F(InternalUnpackBucketStageTest,
- OptimizeDoesNotAddProjectWhenProjectDependenciesAreNotFinite) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto sortSpecObj = fromjson("{$sort: {x: 1}}");
- auto projectSpecObj = fromjson("{$project: {_id: false, x: false}}");
-
- auto pipeline =
- Pipeline::parse(makeVector(unpackSpecObj, sortSpecObj, projectSpecObj), getExpCtx());
- ASSERT_EQ(3u, pipeline->getSources().size());
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ InternalizesPartOfInclusionProjectButExcludesId) {
+ auto projectSpecObj = fromjson("{$project: {x: {y: true}, a: {b: '$c'}, _id: false}}");
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ projectSpecObj),
+ getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- pipeline->optimizePipeline();
- ASSERT_EQ(3u, pipeline->getSources().size());
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
auto serialized = pipeline->serializeToBson();
- ASSERT_EQ(3u, serialized.size());
+ ASSERT_EQ(2u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['a', 'c', 'x'], timeField: 'foo'}}"),
+ serialized[0]);
+ ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesExclusionProject) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: false, x: false}}")),
+ getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[1]);
- ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[2]);
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { exclude: ['_id', 'x'], timeField: 'foo'}}"),
+ serialized[0]);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenViableInclusionProjectExists) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto projectSpecObj = fromjson("{$project: {_id: true, x: true}}");
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesExclusionProjectButIncludesId) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: true, x: false}}")),
+ getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
- auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, projectSpecObj), getExpCtx());
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['x'], timeField: 'foo'}}"),
+ serialized[0]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ InternalizesExclusionProjectThatImplicitlyIncludesId) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: false}}")),
+ getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- pipeline->optimizePipeline();
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['x'], timeField: 'foo'}}"),
+ serialized[0]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesPartOfExclusionProjectExcludesId) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: {y: false}, _id: false}}")),
+ getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
auto serialized = pipeline->serializeToBson();
ASSERT_EQ(2u, serialized.size());
-
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]);
+ ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['_id'], timeField: 'foo'}}"),
+ serialized[0]);
+ ASSERT_BSONOBJ_EQ(fromjson("{$project: {x: {y: false}, _id: true}}"), serialized[1]);
}
-TEST_F(InternalUnpackBucketStageTest,
- OptimizeDoesNotAddProjectWhenViableNonBoolInclusionProjectExists) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ InternalizesPartOfExclusionProjectImplicitlyIncludesId) {
auto pipeline = Pipeline::parse(
- makeVector(unpackSpecObj, fromjson("{$project: {_id: 1, x: 1.0, y: 1.5}}")), getExpCtx());
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: {y: false}, z: false}}")),
+ getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- pipeline->optimizePipeline();
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(2u, serialized.size());
+ ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['z'], timeField: 'foo'}}"),
+ serialized[0]);
+ ASSERT_BSONOBJ_EQ(fromjson("{$project: {x: {y: false}, _id: true}}"), serialized[1]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ InternalizesPartOfExclusionProjectIncludesNestedId) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {x: false, _id: {y: false}}}")),
+ getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
auto serialized = pipeline->serializeToBson();
ASSERT_EQ(2u, serialized.size());
+ ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['x'], timeField: 'foo'}}"),
+ serialized[0]);
+ ASSERT_BSONOBJ_EQ(fromjson("{$project: {_id: {y: false}}}"), serialized[1]);
+}
- const UnorderedFieldsBSONObjComparator kComparator;
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_EQ(
- kComparator.compare(fromjson("{$project: {_id: true, x: true, y: true}}"), serialized[1]),
- 0);
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesNonBoolInclusionProject) {
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: 1, x: 1.0, y: 1.5}}")),
+ getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['_id', 'x', 'y'], timeField: 'foo'}}"),
+ serialized[0]);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenViableExclusionProjectExists) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto projectSpecObj = fromjson("{$project: {_id: false, x: false}}");
+TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesWhenInMiddleOfPipeline) {
+ auto matchSpecObj = fromjson("{$match: {'meta.source': 'primary'}}");
+ auto pipeline = Pipeline::parse(
+ makeVector(matchSpecObj,
+ fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: false, x: true, y: true}}")),
+ getExpCtx());
+ ASSERT_EQ(3u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(std::next(container.begin())->get())
+ ->internalizeProject(std::next(container.begin()), &container);
- auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, projectSpecObj), getExpCtx());
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(2u, serialized.size());
+ ASSERT_BSONOBJ_EQ(matchSpecObj, serialized[0]);
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['x', 'y'], timeField: 'foo'}}"),
+ serialized[1]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest, DoesNotInternalizeWhenNoProjectFollows) {
+ auto unpackBucketSpecObj =
+ fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
+ auto groupSpecObj = fromjson("{$group: {_id: {$const: null}, count: { $sum: {$const: 1 }}}}");
+ auto pipeline = Pipeline::parse(makeVector(unpackBucketSpecObj, groupSpecObj), getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- pipeline->optimizePipeline();
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(2u, serialized.size());
+ ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]);
+ ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[1]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ DoesNotInternalizeWhenUnpackBucketAlreadyExcludes) {
+ auto unpackBucketSpecObj =
+ fromjson("{$_internalUnpackBucket: { exclude: ['a'], timeField: 'foo'}}");
+ auto projectSpecObj = fromjson("{$project: {_id: true}}");
+ auto pipeline = Pipeline::parse(makeVector(unpackBucketSpecObj, projectSpecObj), getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
auto serialized = pipeline->serializeToBson();
ASSERT_EQ(2u, serialized.size());
+ ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]);
+ ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]);
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ DoesNotInternalizeWhenUnpackBucketAlreadyIncludes) {
+ auto unpackBucketSpecObj =
+ fromjson("{$_internalUnpackBucket: { include: ['a'], timeField: 'foo'}}");
+ auto projectSpecObj = fromjson("{$project: {_id: true}}");
+ auto pipeline = Pipeline::parse(makeVector(unpackBucketSpecObj, projectSpecObj), getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
+ dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get())
+ ->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(2u, serialized.size());
+ ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]);
ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]);
}
-TEST_F(InternalUnpackBucketStageTest, OptimizeAddsInclusionProjectInsteadOfViableExclusionProject) {
- auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}");
- auto projectSpecObj = fromjson("{$project: {_id: false, x: false}}");
- auto sortSpecObj = fromjson("{$sort: {y: 1}}");
- auto groupSpecObj = fromjson("{$group: {_id: '$y', f: {$first: '$z'}}}");
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ InternalizeProjectUpdatesMetaAndTimeFieldStateInclusionProj) {
+ auto pipeline = Pipeline::parse(
+ makeVector(
+ fromjson(
+ "{$_internalUnpackBucket: { exclude: [], timeField: 'time', metaField: 'meta'}}"),
+ fromjson("{$project: {meta: true, _id: true}}")),
+ getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get());
+ unpack->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson(
+ "{$_internalUnpackBucket: { include: ['_id'], timeField: 'time', metaField: 'meta'}}"),
+ serialized[0]);
+ ASSERT_TRUE(unpack->includeMetaField());
+ ASSERT_FALSE(unpack->includeTimeField());
+}
+
+TEST_F(InternalUnpackBucketInternalizeProjectTest,
+ InternalizeProjectUpdatesMetaAndTimeFieldStateExclusionProj) {
+ auto unpackBucketSpecObj = fromjson(
+ "{$_internalUnpackBucket: { exclude: [], timeField: 'time', metaField: 'myMeta'}}");
+ auto pipeline = Pipeline::parse(
+ makeVector(unpackBucketSpecObj, fromjson("{$project: {myMeta: false}}")), getExpCtx());
+ ASSERT_EQ(2u, pipeline->getSources().size());
+ auto& container = pipeline->getSources();
+
+ auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get());
+ unpack->internalizeProject(container.begin(), &container);
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1u, serialized.size());
+ ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]);
+ ASSERT_FALSE(unpack->includeMetaField());
+ ASSERT_TRUE(unpack->includeTimeField());
+}
+TEST_F(InternalUnpackBucketStageTest, OptimizeInternalizesAndOptimizesEndOfPipeline) {
+ auto sortSpecObj = fromjson("{$sort: {'a': 1}}");
+ auto matchSpecObj = fromjson("{$match: {x: {$gt: 1}}}");
auto pipeline = Pipeline::parse(
- makeVector(unpackSpecObj, projectSpecObj, sortSpecObj, groupSpecObj), getExpCtx());
+ makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"),
+ fromjson("{$project: {_id: false, a: true, x: true}}"),
+ sortSpecObj,
+ matchSpecObj),
+ getExpCtx());
ASSERT_EQ(4u, pipeline->getSources().size());
pipeline->optimizePipeline();
- ASSERT_EQ(5u, pipeline->getSources().size());
auto serialized = pipeline->serializeToBson();
- ASSERT_EQ(5u, serialized.size());
-
- const UnorderedFieldsBSONObjComparator kComparator;
- ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]);
- ASSERT_EQ(
- kComparator.compare(fromjson("{$project: {_id: false, y: true, z: true}}"), serialized[1]),
- 0);
- ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[2]);
- ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[3]);
- ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[4]);
+ ASSERT_EQ(3u, serialized.size());
+ ASSERT_BSONOBJ_EQ(
+ fromjson("{$_internalUnpackBucket: { include: ['a', 'x'], timeField: 'foo'}}"),
+ serialized[0]);
+ ASSERT_BSONOBJ_EQ(matchSpecObj, serialized[1]);
+ ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[2]);
}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index b54b76ab0a8..875c3e9e957 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -241,31 +241,32 @@ void Pipeline::optimizePipeline() {
return;
}
- SourceContainer optimizedSources;
+ optimizeContainer(&_sources);
+}
- SourceContainer::iterator itr = _sources.begin();
+void Pipeline::optimizeContainer(SourceContainer* container) {
+ SourceContainer optimizedSources;
- // We could be swapping around stages during this process, so disconnect the pipeline to prevent
- // us from entering a state with dangling pointers.
- unstitch();
+ SourceContainer::iterator itr = container->begin();
try {
- while (itr != _sources.end()) {
+ while (itr != container->end()) {
invariant((*itr).get());
- itr = (*itr).get()->optimizeAt(itr, &_sources);
+ itr = (*itr).get()->optimizeAt(itr, container);
}
// Once we have reached our final number of stages, optimize each individually.
- for (auto&& source : _sources) {
+ for (auto&& source : *container) {
if (auto out = source->optimize()) {
optimizedSources.push_back(out);
}
}
- _sources.swap(optimizedSources);
+ container->swap(optimizedSources);
} catch (DBException& ex) {
ex.addContext("Failed to optimize pipeline");
throw;
}
- stitch();
+
+ stitch(container);
}
bool Pipeline::aggHasWriteStage(const BSONObj& cmd) {
@@ -421,20 +422,19 @@ vector<BSONObj> Pipeline::serializeToBson() const {
return asBson;
}
-void Pipeline::unstitch() {
- for (auto&& stage : _sources) {
- stage->setSource(nullptr);
- }
+void Pipeline::stitch() {
+ stitch(&_sources);
}
-void Pipeline::stitch() {
- if (_sources.empty()) {
+void Pipeline::stitch(SourceContainer* container) {
+ if (container->empty()) {
return;
}
+
// Chain together all the stages.
- DocumentSource* prevSource = _sources.front().get();
+ DocumentSource* prevSource = container->front().get();
prevSource->setSource(nullptr);
- for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end());
+ for (Pipeline::SourceContainer::iterator iter(++container->begin()), listEnd(container->end());
iter != listEnd;
++iter) {
intrusive_ptr<DocumentSource> pTemp(*iter);
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index fa3b3d6167a..e3ff4714577 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -238,6 +238,11 @@ public:
void optimizePipeline();
/**
+ * Modifies the container, optimizing it by combining and swapping stages.
+ */
+ static void optimizeContainer(SourceContainer* container);
+
+ /**
* Returns any other collections involved in the pipeline in addition to the collection the
* aggregation is run on. All namespaces returned are the names of collections, after views have
* been resolved.
@@ -353,10 +358,11 @@ private:
void stitch();
/**
- * Reset all stages' child pointers to nullptr. Used to prevent dangling pointers during the
- * optimization process, where we might swap or destroy stages.
+ * Stitch together the source pointers by calling setSource() for each source in 'container'.
+ * This function must be called any time the order of stages within the container changes, e.g.
+ * in optimizeContainer().
*/
- void unstitch();
+ static void stitch(SourceContainer* container);
/**
* Performs common validation for top-level or facet pipelines. Throws if the pipeline is