diff options
author | Hana Pearlman <hana.pearlman@mongodb.com> | 2021-01-26 16:58:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-10 17:32:15 +0000 |
commit | 461350791520a9f092630cbe3520e11dff09b746 (patch) | |
tree | 9fce7d7f94fd0cbad134a90b28c2f0f788fbc21e /src | |
parent | e09ce369e4912a945454a5494248046535c70460 (diff) | |
download | mongo-461350791520a9f092630cbe3520e11dff09b746.tar.gz |
SERVER-53486: Allow rewrite to push projections on bucketed fields into $unpackBucket
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 237 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 38 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp | 601 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 12 |
5 files changed, 701 insertions, 223 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index c5ce3957bc7..6326ec32419 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -44,6 +44,119 @@ REGISTER_DOCUMENT_SOURCE(_internalUnpackBucket, LiteParsedDocumentSourceDefault::parse, DocumentSourceInternalUnpackBucket::createFromBson); +namespace { +/** + * Removes metaField from the field set and returns a boolean indicating whether metaField should be + * included in the materialized measurements. Always returns false if metaField does not exist. + */ +auto eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior unpackerBehavior, + BucketSpec* bucketSpec) { + if (!bucketSpec->metaField) { + return false; + } else if (auto itr = bucketSpec->fieldSet.find(*bucketSpec->metaField); + itr != bucketSpec->fieldSet.end()) { + bucketSpec->fieldSet.erase(itr); + return unpackerBehavior == BucketUnpacker::Behavior::kInclude; + } else { + return unpackerBehavior == BucketUnpacker::Behavior::kExclude; + } +} + +/** + * Determine if timestamp values should be included in the materialized measurements. + */ +auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, BucketSpec* bucketSpec) { + return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == + (bucketSpec->fieldSet.find(bucketSpec->timeField) != bucketSpec->fieldSet.end()); +} + +/** + * A projection can be internalized if every field corresponds to a boolean value. Note that this + * correctly rejects dotted fieldnames, which are mapped to objects internally. + */ +bool canInternalizeProjectObj(const BSONObj& projObj) { + return std::all_of(projObj.begin(), projObj.end(), [](auto&& e) { return e.isBoolean(); }); +} + +/** + * If 'src' represents an inclusion or exclusion $project, return a BSONObj representing it and a + * bool indicating its type (true for inclusion, false for exclusion). Else return an empty BSONObj. + */ +auto getIncludeExcludeProjectAndType(DocumentSource* src) { + if (const auto proj = dynamic_cast<DocumentSourceSingleDocumentTransformation*>(src); proj && + (proj->getType() == TransformerInterface::TransformerType::kInclusionProjection || + proj->getType() == TransformerInterface::TransformerType::kExclusionProjection)) { + return std::pair{proj->getTransformer().serializeTransformation(boost::none).toBson(), + proj->getType() == + TransformerInterface::TransformerType::kInclusionProjection}; + } + return std::pair{BSONObj{}, false}; +} + +/** + * Determine which fields can be moved out of 'src', if it is a $project, and into + * $_internalUnpackBucket. Return the set of those field names, the remaining $project, and a bool + * indicating its type. + * + * For example, given {$project: {a: 1, b.c: 1, _id: 0}}, return the set ['a', 'b'], the project + * {a: 1, b.c: 1}, and 'true'. In this case, '_id' does not need to be included in either the set or + * the project, since the unpack will exclude any field not explicitly included in its field set. + */ +auto extractInternalizableFieldsRemainingProjectAndType( + const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSource* src) { + auto eraseIdIf = [](std::set<std::string>&& set, auto&& cond) { + if (cond) + set.erase("_id"); + return std::move(set); + }; + + if (auto [remainingProj, isInclusion] = getIncludeExcludeProjectAndType(src); + remainingProj.isEmpty()) { + // There is nothing to internalize. + return std::tuple{std::set<std::string>{}, remainingProj, isInclusion}; + } else if (canInternalizeProjectObj(remainingProj)) { + // We can internalize the whole object, so 'remainingProject' should be empty. + return std::tuple{eraseIdIf(remainingProj.getFieldNames<std::set<std::string>>(), + remainingProj.getBoolField("_id") != isInclusion), + BSONObj{}, + isInclusion}; + } else if (isInclusion) { + // We can't internalize the whole inclusion, so we must leave it unmodified in the pipeline + // for correctness. We do dependency analysis to get an internalizable $project to ensure + // we're handling dotted fields or fields referenced inside 'src'. + Pipeline::SourceContainer projectStage{src}; + auto dependencyProj = + Pipeline::getDependenciesForContainer(expCtx, projectStage, boost::none) + .toProjectionWithoutMetadata(DepsTracker::TruncateToRootLevel::yes); + return std::tuple{eraseIdIf(dependencyProj.getFieldNames<std::set<std::string>>(), + dependencyProj.getIntField("_id") != 1), + remainingProj, + isInclusion}; + } else { + // We can internalize any fields that are not dotted, and leave the rest in 'remainingProj'. + std::set<std::string> topLevelFields; + std::for_each(remainingProj.begin(), remainingProj.end(), [&topLevelFields](auto&& elem) { + // '_id' may be included in this exclusion. If so, don't add it to 'topLevelFields'. + if (elem.isBoolean() && !elem.Bool()) { + topLevelFields.emplace(elem.fieldName()); + } + }); + return std::tuple{topLevelFields, remainingProj.removeFields(topLevelFields), isInclusion}; + } +} + +// Optimize the given pipeline after the $_internalUnpackBucket stage. +void optimizeEndOfPipeline(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + // We must create a new SourceContainer representing the subsection of the pipeline we wish to + // optimize, since otherwise calls to optimizeAt() will overrun these limits. + auto endOfPipeline = Pipeline::SourceContainer(std::next(itr), container->end()); + Pipeline::optimizeContainer(&endOfPipeline); + container->erase(std::next(itr), container->end()); + container->splice(std::next(itr), endOfPipeline); +} +} // namespace + void BucketUnpacker::reset(BSONObj&& bucket) { _fieldIters.clear(); _timeFieldIter = boost::none; @@ -102,6 +215,13 @@ void BucketUnpacker::reset(BSONObj&& bucket) { } } +void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) { + _includeMetaField = eraseMetaFromFieldSetAndDetermineIncludeMeta(behavior, &bucketSpec); + _includeTimeField = determineIncludeTimeField(behavior, &bucketSpec); + _unpackerBehavior = behavior; + _spec = std::move(bucketSpec); +} + Document BucketUnpacker::getNext() { invariant(hasNext()); @@ -189,21 +309,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF "The $_internalUnpackBucket stage requires a timeField parameter", specElem[kTimeFieldName].ok()); - // Determine if timestamp values should be included in the materialized measurements. - auto includeTimeField = (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == - (bucketSpec.fieldSet.find(bucketSpec.timeField) != bucketSpec.fieldSet.end()); - - // Check the include/exclude set to determine if measurements should be materialized with - // metadata. - auto includeMetaField = false; - if (bucketSpec.metaField) { - const auto metaFieldIt = bucketSpec.fieldSet.find(*bucketSpec.metaField); - auto found = metaFieldIt != bucketSpec.fieldSet.end(); - if (found) { - bucketSpec.fieldSet.erase(metaFieldIt); - } - includeMetaField = (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == found; - } + auto includeTimeField = determineIncludeTimeField(unpackerBehavior, &bucketSpec); + + auto includeMetaField = + eraseMetaFromFieldSetAndDetermineIncludeMeta(unpackerBehavior, &bucketSpec); return make_intrusive<DocumentSourceInternalUnpackBucket>( expCtx, @@ -250,65 +359,64 @@ DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() { return nextResult; } -namespace { -/** - * A projection can be internalized if it does not include any dotted field names and if every field - * name corresponds to a boolean value. - */ -bool canInternalizeProjectObj(const BSONObj& projObj) { - const auto names = projObj.getFieldNames<std::set<std::string>>(); - return std::all_of(names.begin(), names.end(), [&projObj](auto&& name) { - return name.find('.') == std::string::npos && projObj.getField(name).isBoolean(); - }); -} - -/** - * If 'src' represents an inclusion or exclusion $project, return a BSONObj representing it, else - * return an empty BSONObj. If 'inclusionOnly' is true, 'src' must be an inclusion $project. - */ -auto getProjectObj(DocumentSource* src, bool inclusionOnly) { - if (const auto projStage = dynamic_cast<DocumentSourceSingleDocumentTransformation*>(src); - projStage && - (projStage->getType() == TransformerInterface::TransformerType::kInclusionProjection || - (!inclusionOnly && - projStage->getType() == TransformerInterface::TransformerType::kExclusionProjection))) { - return projStage->getTransformer().serializeTransformation(boost::none).toBson(); +void DocumentSourceInternalUnpackBucket::internalizeProject(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + if (std::next(itr) == container->end() || !_bucketUnpacker.bucketSpec().fieldSet.empty()) { + // There is no project to internalize or there are already fields being included/excluded. + return; + } + auto [fields, remainingProject, isInclusion] = + extractInternalizableFieldsRemainingProjectAndType(getContext(), std::next(itr)->get()); + if (fields.empty()) { + return; } - return BSONObj{}; + // Update 'bucketUnpacker' state with the new fields and behavior. Update 'container' state by + // removing the old $project and potentially replacing it with 'remainingProject'. + auto spec = _bucketUnpacker.bucketSpec(); + spec.fieldSet = std::move(fields); + _bucketUnpacker.setBucketSpecAndBehavior(std::move(spec), + isInclusion ? BucketUnpacker::Behavior::kInclude + : BucketUnpacker::Behavior::kExclude); + container->erase(std::next(itr)); + if (!remainingProject.isEmpty()) { + container->insert(std::next(itr), + DocumentSourceProject::createFromBson( + BSON("$project" << remainingProject).firstElement(), getContext())); + } } /** - * Given a source container and an iterator pointing to the $unpackBucket stage, builds a projection - * BSONObj that can be entirely moved into the $unpackBucket stage, following these rules: - * 1. If there is an inclusion projection immediately after the $unpackBucket which can be - * internalized, an empty BSONObj will be returned. + * Given a source container and an iterator pointing to the $_internalUnpackBucket, builds a + * projection that can be entirely moved into the $_internalUnpackBucket, following these rules: + * 1. If there is an inclusion projection immediately after which can be internalized, an empty + BSONObj will be returned. * 2. Otherwise, if there is a finite dependency set for the rest of the pipeline, an inclusion * $project representing it and containing only root-level fields will be returned. An * inclusion $project will be returned here even if there is a viable exclusion $project * next in the pipeline. * 3. Otherwise, an empty BSONObj will be returned. */ -auto buildProjectToInternalize(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) { - // Check for a viable inclusion $project after the $unpackBucket. This handles case 1. - if (auto projObj = getProjectObj(std::next(itr)->get(), true); - !projObj.isEmpty() && canInternalizeProjectObj(projObj)) { +BSONObj DocumentSourceInternalUnpackBucket::buildProjectToInternalize( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) const { + if (std::next(itr) == container->end()) { return BSONObj{}; } - // If there is a finite dependency set for the pipeline after the $unpackBucket, obtain an - // inclusion $project representing its root-level fields. Otherwise, we get an empty BSONObj. - Pipeline::SourceContainer restOfPipeline(std::next(itr), container->end()); - auto deps = Pipeline::getDependenciesForContainer(expCtx, restOfPipeline, boost::none); - auto dependencyProj = deps.toProjectionWithoutMetadata(DepsTracker::TruncateToRootLevel::yes); + // Check for a viable inclusion $project after the $_internalUnpackBucket. This handles case 1. + if (auto [project, isInclusion] = getIncludeExcludeProjectAndType(std::next(itr)->get()); + isInclusion && !project.isEmpty() && canInternalizeProjectObj(project)) { + return BSONObj{}; + } - // If 'dependencyProj' is not empty, we're in case 2. If it is empty, we're in case 3. There may - // be a viable exclusion $project in the pipeline, but we don't need to check for it here. - return dependencyProj; + // Attempt to get an inclusion $project representing the root-level dependencies of the pipeline + // after the $_internalUnpackBucket. If this $project is not empty, then the dependency set was + // finite, and we are in case 2. If it is empty, we're in case 3. There may be a viable + // exclusion $project in the pipeline, but we don't need to check for it here. + Pipeline::SourceContainer restOfPipeline(std::next(itr), container->end()); + auto deps = Pipeline::getDependenciesForContainer(getContext(), restOfPipeline, boost::none); + return deps.toProjectionWithoutMetadata(DepsTracker::TruncateToRootLevel::yes); } -} // namespace Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { @@ -319,15 +427,20 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi } // Attempt to build an internalizable $project based on dependency analysis. - if (auto projObj = buildProjectToInternalize(getContext(), itr, container); - !projObj.isEmpty()) { + if (auto projObj = buildProjectToInternalize(itr, container); !projObj.isEmpty()) { // Give the new $project a chance to be optimized before internalizing. container->insert(std::next(itr), DocumentSourceProject::createFromBson( BSON("$project" << projObj).firstElement(), pExpCtx)); - return std::next(itr); } - return std::next(itr); + // Optimize the pipeline after the $unpackBucket. + optimizeEndOfPipeline(std::next(itr), container); + + // If there is a $project following the $_internalUnpackBucket, internalize as much of it as + // possible, and update the state of 'container' and '_bucketUnpacker' to reflect this. + internalizeProject(itr, container); + + return container->end(); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index 7731e5032db..db7a880c589 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -97,18 +97,28 @@ public: return _bucket; } + bool includeMetaField() const { + return _includeMetaField; + } + + bool includeTimeField() const { + return _includeTimeField; + } + + void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); + private: - const BucketSpec _spec; - const Behavior _unpackerBehavior; + BucketSpec _spec; + Behavior _unpackerBehavior; // Iterates the timestamp section of the bucket to drive the unpacking iteration. boost::optional<BSONObjIterator> _timeFieldIter; // A flag used to mark that the timestamp value should be materialized in measurements. - const bool _includeTimeField; + bool _includeTimeField; - // A flag used to mark that a bucket's metadata element should be materialized in measurements. - const bool _includeMetaField; + // A flag used to mark that a bucket's metadata value should be materialized in measurements. + bool _includeMetaField; // The bucket being unpacked. BSONObj _bucket; @@ -141,6 +151,14 @@ public: return kStageName.rawData(); } + bool includeMetaField() const { + return _bucketUnpacker.includeMetaField(); + } + + bool includeTimeField() const { + return _bucketUnpacker.includeTimeField(); + } + StageConstraints constraints(Pipeline::SplitState pipeState) const final { return {StreamType::kStreaming, PositionRequirement::kNone, @@ -162,6 +180,16 @@ public: Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; + /* + * Given a source container and an iterator pointing to the unpack stage, attempt to internalize + * a following $project, and update the state for 'container' and '_bucketUnpacker'. + */ + void internalizeProject(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container); + + BSONObj buildProjectToInternalize(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) const; + private: GetNextResult doGetNext() final; diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp index fb34a31a234..d2318a69d44 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp @@ -741,227 +741,558 @@ TEST_F(InternalUnpackBucketStageTest, ParserRejectsBothIncludeAndExcludeParamete 5408000); } -TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectForGroupDependencies) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto groupSpecObj = fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}"); +/**************************** buildProjectToInternalize() tests ****************************/ +using InternalUnpackBucketBuildProjectToInternalizeTest = AggregationContextFixture; - auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx()); - ASSERT_EQ(2u, pipeline->getSources().size()); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + BuildsIncludeProjectForGroupDependencies) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); - pipeline->optimizePipeline(); - ASSERT_EQ(3u, pipeline->getSources().size()); - - auto serialized = pipeline->serializeToBson(); - ASSERT_EQ(3u, serialized.size()); + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); const UnorderedFieldsBSONObjComparator kComparator; - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_EQ( - kComparator.compare(fromjson("{$project: {_id: false, x: true, y: true}}"), serialized[1]), - 0); - ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[2]); + ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, x: 1, y: 1}"), project), 0); } -TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectForProjectDependencies) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto projectSpecObj = fromjson("{$project: {_id: true, x: {f: '$y'}}}"); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + BuildsIncludeProjectForProjectDependencies) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: {f: '$y'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); - auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, projectSpecObj), getExpCtx()); - ASSERT_EQ(2u, pipeline->getSources().size()); + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); - pipeline->optimizePipeline(); - ASSERT_EQ(3u, pipeline->getSources().size()); + const UnorderedFieldsBSONObjComparator kComparator; + ASSERT_EQ(kComparator.compare(fromjson("{_id: 1, x: 1, y: 1}"), project), 0); +} - auto serialized = pipeline->serializeToBson(); - ASSERT_EQ(3u, serialized.size()); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + BuildsIncludeProjectWhenInMiddleOfPipeline) { + auto pipeline = Pipeline::parse( + makeVector( + fromjson("{$match: {'meta.source': 'primary'}}"), + fromjson( + "{$_internalUnpackBucket: { exclude: [], timeField: 'foo', metaField: 'meta'}}"), + fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(3u, container.size()); + + auto project = + dynamic_cast<DocumentSourceInternalUnpackBucket*>(std::next(container.begin())->get()) + ->buildProjectToInternalize(std::next(container.begin()), &container); const UnorderedFieldsBSONObjComparator kComparator; - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_EQ( - kComparator.compare(fromjson("{$project: {_id: true, x: true, y: true}}"), serialized[1]), - 0); - ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[2]); + ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, x: 1, y: 1}"), project), 0); } -TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectWhenInMiddleOfPipeline) { - auto matchSpecObj = fromjson("{$match: {'meta.source': 'primary'}}"); - auto unpackSpecObj = - fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo', metaField: 'meta'}}"); - auto groupSpecObj = fromjson("{$group: {_id: '$x', f: {$first: '$y'}}}"); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + BuildsIncludeProjectWhenGroupDependenciesAreDotted) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$group: {_id: '$x.y', f: {$first: '$a.b'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); - auto pipeline = - Pipeline::parse(makeVector(matchSpecObj, unpackSpecObj, groupSpecObj), getExpCtx()); - ASSERT_EQ(3u, pipeline->getSources().size()); + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); - pipeline->optimizePipeline(); - ASSERT_EQ(4u, pipeline->getSources().size()); + const UnorderedFieldsBSONObjComparator kComparator; + ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, x: 1, a: 1}"), project), 0); +} - auto serialized = pipeline->serializeToBson(); - ASSERT_EQ(4u, serialized.size()); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + BuildsIncludeProjectWhenProjectDependenciesAreDotted) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {'_id.a': true}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); const UnorderedFieldsBSONObjComparator kComparator; - ASSERT_BSONOBJ_EQ(matchSpecObj, serialized[0]); - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[1]); - ASSERT_EQ( - kComparator.compare(fromjson("{$project: {_id: false, x: true, y: true}}"), serialized[2]), - 0); - ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[3]); + ASSERT_EQ(kComparator.compare(fromjson("{_id: 1}"), project), 0); } -TEST_F(InternalUnpackBucketStageTest, OptimizeAddsIncludeProjectWhenDependenciesAreDotted) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto groupSpecObj = fromjson("{$group: {_id: '$x.y', f: {$first: '$a.b'}}}"); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + DoesNotBuildProjectWhenThereAreNoDependencies) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$group: {_id: {$const: null}, count: { $sum: {$const: 1 }}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); + ASSERT(project.isEmpty()); +} - auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx()); - ASSERT_EQ(2u, pipeline->getSources().size()); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + DoesNotBuildProjectWhenSortDependenciesAreNotFinite) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$sort: {x: 1}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); + ASSERT(project.isEmpty()); +} - pipeline->optimizePipeline(); - ASSERT_EQ(3u, pipeline->getSources().size()); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + DoesNotBuildProjectWhenProjectDependenciesAreNotFinite) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$sort: {x: 1}}"), + fromjson("{$project: {_id: false, x: false}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(3u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); + ASSERT(project.isEmpty()); +} - auto serialized = pipeline->serializeToBson(); - ASSERT_EQ(3u, serialized.size()); +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + DoesNotBuildProjectWhenViableInclusionProjectExists) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: true, x: true}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); + ASSERT(project.isEmpty()); +} + +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + DoesNotBuildProjectWhenViableNonBoolInclusionProjectExists) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: 1, x: 1.0, y: 1.5}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); + ASSERT(project.isEmpty()); +} + +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + DoesNotBuildProjectWhenViableExclusionProjectExists) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: false, x: false}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(2u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); + ASSERT(project.isEmpty()); +} + +TEST_F(InternalUnpackBucketBuildProjectToInternalizeTest, + BuildsInclusionProjectInsteadOfViableExclusionProject) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: false, x: false}}"), + fromjson("{$sort: {y: 1}}"), + fromjson("{$group: {_id: '$y', f: {$first: '$z'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + ASSERT_EQ(4u, container.size()); + + auto project = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->buildProjectToInternalize(container.begin(), &container); const UnorderedFieldsBSONObjComparator kComparator; - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_EQ( - kComparator.compare(fromjson("{$project: {_id: false, x: true, a: true}}"), serialized[1]), - 0); - ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[2]); + ASSERT_EQ(kComparator.compare(fromjson("{_id: 0, y: 1, z: 1}"), project), 0); } -TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenThereAreNoDependencies) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto groupSpecObj = fromjson("{$group: {_id: {$const: null}, count: { $sum: {$const: 1 }}}}"); +/******************************* internalizeProject() tests *******************************/ +using InternalUnpackBucketInternalizeProjectTest = AggregationContextFixture; - auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx()); +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesInclusionProject) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: true, y: true, _id: true}}")), + getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - pipeline->optimizePipeline(); - ASSERT_EQ(2u, pipeline->getSources().size()); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); auto serialized = pipeline->serializeToBson(); - ASSERT_EQ(2u, serialized.size()); - - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[1]); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['_id', 'x', 'y'], timeField: 'foo'}}"), + serialized[0]); } -TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenSortDependenciesAreNotFinite) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto sortSpecObj = fromjson("{$sort: {x: 1}}"); +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesInclusionButExcludesId) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: true, y: true, _id: false}}")), + getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, sortSpecObj), getExpCtx()); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['x', 'y'], timeField: 'foo'}}"), + serialized[0]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesInclusionThatImplicitlyIncludesId) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: true, y: true}}")), + getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - pipeline->optimizePipeline(); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['_id', 'x', 'y'], timeField: 'foo'}}"), + serialized[0]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesPartOfInclusionProject) { + auto projectSpecObj = fromjson("{$project: {_id: true, x: {y: true}, a: {b: '$c'}}}"); + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + projectSpecObj), + getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); auto serialized = pipeline->serializeToBson(); ASSERT_EQ(2u, serialized.size()); - - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[1]); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['_id', 'a', 'c', 'x'], timeField: 'foo'}}"), + serialized[0]); + ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]); } -TEST_F(InternalUnpackBucketStageTest, - OptimizeDoesNotAddProjectWhenProjectDependenciesAreNotFinite) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto sortSpecObj = fromjson("{$sort: {x: 1}}"); - auto projectSpecObj = fromjson("{$project: {_id: false, x: false}}"); - - auto pipeline = - Pipeline::parse(makeVector(unpackSpecObj, sortSpecObj, projectSpecObj), getExpCtx()); - ASSERT_EQ(3u, pipeline->getSources().size()); +TEST_F(InternalUnpackBucketInternalizeProjectTest, + InternalizesPartOfInclusionProjectButExcludesId) { + auto projectSpecObj = fromjson("{$project: {x: {y: true}, a: {b: '$c'}, _id: false}}"); + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + projectSpecObj), + getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - pipeline->optimizePipeline(); - ASSERT_EQ(3u, pipeline->getSources().size()); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); auto serialized = pipeline->serializeToBson(); - ASSERT_EQ(3u, serialized.size()); + ASSERT_EQ(2u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['a', 'c', 'x'], timeField: 'foo'}}"), + serialized[0]); + ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesExclusionProject) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: false, x: false}}")), + getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[1]); - ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[2]); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { exclude: ['_id', 'x'], timeField: 'foo'}}"), + serialized[0]); } -TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenViableInclusionProjectExists) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto projectSpecObj = fromjson("{$project: {_id: true, x: true}}"); +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesExclusionProjectButIncludesId) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: true, x: false}}")), + getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); - auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, projectSpecObj), getExpCtx()); + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['x'], timeField: 'foo'}}"), + serialized[0]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, + InternalizesExclusionProjectThatImplicitlyIncludesId) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: false}}")), + getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - pipeline->optimizePipeline(); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['x'], timeField: 'foo'}}"), + serialized[0]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesPartOfExclusionProjectExcludesId) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: {y: false}, _id: false}}")), + getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); auto serialized = pipeline->serializeToBson(); ASSERT_EQ(2u, serialized.size()); - - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]); + ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['_id'], timeField: 'foo'}}"), + serialized[0]); + ASSERT_BSONOBJ_EQ(fromjson("{$project: {x: {y: false}, _id: true}}"), serialized[1]); } -TEST_F(InternalUnpackBucketStageTest, - OptimizeDoesNotAddProjectWhenViableNonBoolInclusionProjectExists) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); +TEST_F(InternalUnpackBucketInternalizeProjectTest, + InternalizesPartOfExclusionProjectImplicitlyIncludesId) { auto pipeline = Pipeline::parse( - makeVector(unpackSpecObj, fromjson("{$project: {_id: 1, x: 1.0, y: 1.5}}")), getExpCtx()); + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: {y: false}, z: false}}")), + getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - pipeline->optimizePipeline(); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(2u, serialized.size()); + ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['z'], timeField: 'foo'}}"), + serialized[0]); + ASSERT_BSONOBJ_EQ(fromjson("{$project: {x: {y: false}, _id: true}}"), serialized[1]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, + InternalizesPartOfExclusionProjectIncludesNestedId) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {x: false, _id: {y: false}}}")), + getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); auto serialized = pipeline->serializeToBson(); ASSERT_EQ(2u, serialized.size()); + ASSERT_BSONOBJ_EQ(fromjson("{$_internalUnpackBucket: { exclude: ['x'], timeField: 'foo'}}"), + serialized[0]); + ASSERT_BSONOBJ_EQ(fromjson("{$project: {_id: {y: false}}}"), serialized[1]); +} - const UnorderedFieldsBSONObjComparator kComparator; - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_EQ( - kComparator.compare(fromjson("{$project: {_id: true, x: true, y: true}}"), serialized[1]), - 0); +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesNonBoolInclusionProject) { + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: 1, x: 1.0, y: 1.5}}")), + getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['_id', 'x', 'y'], timeField: 'foo'}}"), + serialized[0]); } -TEST_F(InternalUnpackBucketStageTest, OptimizeDoesNotAddProjectWhenViableExclusionProjectExists) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto projectSpecObj = fromjson("{$project: {_id: false, x: false}}"); +TEST_F(InternalUnpackBucketInternalizeProjectTest, InternalizesWhenInMiddleOfPipeline) { + auto matchSpecObj = fromjson("{$match: {'meta.source': 'primary'}}"); + auto pipeline = Pipeline::parse( + makeVector(matchSpecObj, + fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: false, x: true, y: true}}")), + getExpCtx()); + ASSERT_EQ(3u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + dynamic_cast<DocumentSourceInternalUnpackBucket*>(std::next(container.begin())->get()) + ->internalizeProject(std::next(container.begin()), &container); - auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, projectSpecObj), getExpCtx()); + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(2u, serialized.size()); + ASSERT_BSONOBJ_EQ(matchSpecObj, serialized[0]); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['x', 'y'], timeField: 'foo'}}"), + serialized[1]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, DoesNotInternalizeWhenNoProjectFollows) { + auto unpackBucketSpecObj = + fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); + auto groupSpecObj = fromjson("{$group: {_id: {$const: null}, count: { $sum: {$const: 1 }}}}"); + auto pipeline = Pipeline::parse(makeVector(unpackBucketSpecObj, groupSpecObj), getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - pipeline->optimizePipeline(); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(2u, serialized.size()); + ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]); + ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[1]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, + DoesNotInternalizeWhenUnpackBucketAlreadyExcludes) { + auto unpackBucketSpecObj = + fromjson("{$_internalUnpackBucket: { exclude: ['a'], timeField: 'foo'}}"); + auto projectSpecObj = fromjson("{$project: {_id: true}}"); + auto pipeline = Pipeline::parse(makeVector(unpackBucketSpecObj, projectSpecObj), getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); auto serialized = pipeline->serializeToBson(); ASSERT_EQ(2u, serialized.size()); + ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]); + ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, + DoesNotInternalizeWhenUnpackBucketAlreadyIncludes) { + auto unpackBucketSpecObj = + fromjson("{$_internalUnpackBucket: { include: ['a'], timeField: 'foo'}}"); + auto projectSpecObj = fromjson("{$project: {_id: true}}"); + auto pipeline = Pipeline::parse(makeVector(unpackBucketSpecObj, projectSpecObj), getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); + dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()) + ->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(2u, serialized.size()); + ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]); ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[1]); } -TEST_F(InternalUnpackBucketStageTest, OptimizeAddsInclusionProjectInsteadOfViableExclusionProject) { - auto unpackSpecObj = fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"); - auto projectSpecObj = fromjson("{$project: {_id: false, x: false}}"); - auto sortSpecObj = fromjson("{$sort: {y: 1}}"); - auto groupSpecObj = fromjson("{$group: {_id: '$y', f: {$first: '$z'}}}"); +TEST_F(InternalUnpackBucketInternalizeProjectTest, + InternalizeProjectUpdatesMetaAndTimeFieldStateInclusionProj) { + auto pipeline = Pipeline::parse( + makeVector( + fromjson( + "{$_internalUnpackBucket: { exclude: [], timeField: 'time', metaField: 'meta'}}"), + fromjson("{$project: {meta: true, _id: true}}")), + getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()); + unpack->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson( + "{$_internalUnpackBucket: { include: ['_id'], timeField: 'time', metaField: 'meta'}}"), + serialized[0]); + ASSERT_TRUE(unpack->includeMetaField()); + ASSERT_FALSE(unpack->includeTimeField()); +} + +TEST_F(InternalUnpackBucketInternalizeProjectTest, + InternalizeProjectUpdatesMetaAndTimeFieldStateExclusionProj) { + auto unpackBucketSpecObj = fromjson( + "{$_internalUnpackBucket: { exclude: [], timeField: 'time', metaField: 'myMeta'}}"); + auto pipeline = Pipeline::parse( + makeVector(unpackBucketSpecObj, fromjson("{$project: {myMeta: false}}")), getExpCtx()); + ASSERT_EQ(2u, pipeline->getSources().size()); + auto& container = pipeline->getSources(); + + auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.begin()->get()); + unpack->internalizeProject(container.begin(), &container); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1u, serialized.size()); + ASSERT_BSONOBJ_EQ(unpackBucketSpecObj, serialized[0]); + ASSERT_FALSE(unpack->includeMetaField()); + ASSERT_TRUE(unpack->includeTimeField()); +} +TEST_F(InternalUnpackBucketStageTest, OptimizeInternalizesAndOptimizesEndOfPipeline) { + auto sortSpecObj = fromjson("{$sort: {'a': 1}}"); + auto matchSpecObj = fromjson("{$match: {x: {$gt: 1}}}"); auto pipeline = Pipeline::parse( - makeVector(unpackSpecObj, projectSpecObj, sortSpecObj, groupSpecObj), getExpCtx()); + makeVector(fromjson("{$_internalUnpackBucket: { exclude: [], timeField: 'foo'}}"), + fromjson("{$project: {_id: false, a: true, x: true}}"), + sortSpecObj, + matchSpecObj), + getExpCtx()); ASSERT_EQ(4u, pipeline->getSources().size()); pipeline->optimizePipeline(); - ASSERT_EQ(5u, pipeline->getSources().size()); auto serialized = pipeline->serializeToBson(); - ASSERT_EQ(5u, serialized.size()); - - const UnorderedFieldsBSONObjComparator kComparator; - ASSERT_BSONOBJ_EQ(unpackSpecObj, serialized[0]); - ASSERT_EQ( - kComparator.compare(fromjson("{$project: {_id: false, y: true, z: true}}"), serialized[1]), - 0); - ASSERT_BSONOBJ_EQ(projectSpecObj, serialized[2]); - ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[3]); - ASSERT_BSONOBJ_EQ(groupSpecObj, serialized[4]); + ASSERT_EQ(3u, serialized.size()); + ASSERT_BSONOBJ_EQ( + fromjson("{$_internalUnpackBucket: { include: ['a', 'x'], timeField: 'foo'}}"), + serialized[0]); + ASSERT_BSONOBJ_EQ(matchSpecObj, serialized[1]); + ASSERT_BSONOBJ_EQ(sortSpecObj, serialized[2]); } } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index b54b76ab0a8..875c3e9e957 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -241,31 +241,32 @@ void Pipeline::optimizePipeline() { return; } - SourceContainer optimizedSources; + optimizeContainer(&_sources); +} - SourceContainer::iterator itr = _sources.begin(); +void Pipeline::optimizeContainer(SourceContainer* container) { + SourceContainer optimizedSources; - // We could be swapping around stages during this process, so disconnect the pipeline to prevent - // us from entering a state with dangling pointers. - unstitch(); + SourceContainer::iterator itr = container->begin(); try { - while (itr != _sources.end()) { + while (itr != container->end()) { invariant((*itr).get()); - itr = (*itr).get()->optimizeAt(itr, &_sources); + itr = (*itr).get()->optimizeAt(itr, container); } // Once we have reached our final number of stages, optimize each individually. - for (auto&& source : _sources) { + for (auto&& source : *container) { if (auto out = source->optimize()) { optimizedSources.push_back(out); } } - _sources.swap(optimizedSources); + container->swap(optimizedSources); } catch (DBException& ex) { ex.addContext("Failed to optimize pipeline"); throw; } - stitch(); + + stitch(container); } bool Pipeline::aggHasWriteStage(const BSONObj& cmd) { @@ -421,20 +422,19 @@ vector<BSONObj> Pipeline::serializeToBson() const { return asBson; } -void Pipeline::unstitch() { - for (auto&& stage : _sources) { - stage->setSource(nullptr); - } +void Pipeline::stitch() { + stitch(&_sources); } -void Pipeline::stitch() { - if (_sources.empty()) { +void Pipeline::stitch(SourceContainer* container) { + if (container->empty()) { return; } + // Chain together all the stages. - DocumentSource* prevSource = _sources.front().get(); + DocumentSource* prevSource = container->front().get(); prevSource->setSource(nullptr); - for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end()); + for (Pipeline::SourceContainer::iterator iter(++container->begin()), listEnd(container->end()); iter != listEnd; ++iter) { intrusive_ptr<DocumentSource> pTemp(*iter); diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index fa3b3d6167a..e3ff4714577 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -238,6 +238,11 @@ public: void optimizePipeline(); /** + * Modifies the container, optimizing it by combining and swapping stages. + */ + static void optimizeContainer(SourceContainer* container); + + /** * Returns any other collections involved in the pipeline in addition to the collection the * aggregation is run on. All namespaces returned are the names of collections, after views have * been resolved. @@ -353,10 +358,11 @@ private: void stitch(); /** - * Reset all stages' child pointers to nullptr. Used to prevent dangling pointers during the - * optimization process, where we might swap or destroy stages. + * Stitch together the source pointers by calling setSource() for each source in 'container'. + * This function must be called any time the order of stages within the container changes, e.g. + * in optimizeContainer(). */ - void unstitch(); + static void stitch(SourceContainer* container); /** * Performs common validation for top-level or facet pipelines. Throws if the pipeline is |