| author | Arun Banala <arun.banala@mongodb.com> | 2020-08-13 21:51:25 +0100 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-14 18:58:38 +0000 |
| commit | 296feba1b776611cdab7719f6a8c784de964fd85 (patch) | |
| tree | a5e2d8c96ae1047ea09e21347abb1df124f4db9f /src/mongo/db/pipeline | |
| parent | df35b8c64d3c502b25f358a2538fc202bbc20c76 (diff) | |
| download | mongo-296feba1b776611cdab7719f6a8c784de964fd85.tar.gz | |
SERVER-49539 Populate a bucket in $bucketAuto only when required
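For context, $bucketAuto distributes a pipeline's documents into a requested number of buckets over the sorted 'groupBy' key, keeping bucket sizes as even as possible; an optional granularity (for example the Renard series 'R5') snaps bucket boundaries to a preferred-number sequence. A stage of the kind exercised by the unit tests in this commit looks like this (the `fromjson` idiom and the spec string are taken from the tests below):

```cpp
// A $bucketAuto stage requesting five buckets over the '$x' field, written in
// the fromjson() style used by the unit tests in this commit.
auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 5}}");
```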
Diffstat (limited to 'src/mongo/db/pipeline')
3 files changed, 153 insertions, 145 deletions
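The substance of the change is visible in doGetNext() below: previously populateBuckets() eagerly computed every bucket into a _buckets vector that doGetNext() then iterated, whereas now initalizeBucketIteration() only finalizes the sorter and computes the approximate bucket size, and each doGetNext() call materializes at most one bucket through populateNextBucket(). Here is a minimal sketch of that control-flow shape, using hypothetical stand-in types rather than MongoDB's real classes:

```cpp
#include <optional>

// Hypothetical stand-ins: only the control flow mirrors the patch.
struct Bucket { /* boundaries plus accumulator state */ };
struct OutDoc { /* document produced from one finished bucket */ };

class LazyBucketStage {
public:
    // Analogue of DocumentSourceBucketAuto::doGetNext() after this commit.
    std::optional<OutDoc> getNext() {
        if (!_populated) {
            initalizeBucketIteration();  // finish sorting, compute approx bucket size
            _populated = true;
        }
        // Build at most one bucket per call instead of materializing all of them.
        if (_currentBucketNum++ < _nBuckets) {
            if (std::optional<Bucket> bucket = populateNextBucket()) {
                return makeDocument(*bucket);
            }
        }
        dispose();  // no more buckets: release the sorted input eagerly
        return std::nullopt;
    }

private:
    // Stubs standing in for the real implementations.
    void initalizeBucketIteration() {}
    std::optional<Bucket> populateNextBucket() { return std::nullopt; }
    OutDoc makeDocument(const Bucket&) { return {}; }
    void dispose() {}

    bool _populated = false;
    int _currentBucketNum = 0;
    int _nBuckets = 0;
};
```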
```diff
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 29199b451c9..06717200cda 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -95,18 +95,17 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() {
         }
         invariant(populationResult.isEOF());
 
-        populateBuckets();
-
+        initalizeBucketIteration();
         _populated = true;
-        _bucketsIterator = _buckets.begin();
     }
 
-    if (_bucketsIterator == _buckets.end()) {
-        dispose();
-        return GetNextResult::makeEOF();
+    if (_currentBucketDetails.currentBucketNum++ < _nBuckets) {
+        if (auto bucket = populateNextBucket()) {
+            return makeDocument(*bucket);
+        }
     }
-
-    return makeDocument(*(_bucketsIterator++));
+    dispose();
+    return GetNextResult::makeEOF();
 }
 
 boost::intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::optimize() {
@@ -157,7 +156,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() {
     for (; next.isAdvanced(); next = pSource->getNext()) {
         auto nextDoc = next.releaseDocument();
         _sorter->add(extractKey(nextDoc), nextDoc);
-        _nDocuments++;
+        ++_nDocuments;
     }
     return next;
 }
@@ -206,7 +205,8 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>&
     }
 }
 
-void DocumentSourceBucketAuto::populateBuckets() {
+void DocumentSourceBucketAuto::initalizeBucketIteration() {
+    // Initialize the iterator on '_sorter'.
     invariant(_sorter);
     _sortedInput.reset(_sorter->done());
     _sorter.reset();
@@ -218,112 +218,110 @@
     // Calculate the approximate bucket size. We attempt to fill each bucket with this many
     // documents.
-    long long approxBucketSize = std::round(double(_nDocuments) / double(_nBuckets));
+    _currentBucketDetails.approxBucketSize = std::round(double(_nDocuments) / double(_nBuckets));
 
-    if (approxBucketSize < 1) {
-        // If the number of buckets is larger than the number of documents, then we try to make as
-        // many buckets as possible by placing each document in its own bucket.
-        approxBucketSize = 1;
+    if (_currentBucketDetails.approxBucketSize < 1) {
+        // If the number of buckets is larger than the number of documents, then we try to make
+        // as many buckets as possible by placing each document in its own bucket.
+        _currentBucketDetails.approxBucketSize = 1;
     }
+}
 
-    boost::optional<pair<Value, Document>> firstEntryInNextBucket;
+boost::optional<pair<Value, Document>>
+DocumentSourceBucketAuto::adjustBoundariesAndGetMinForNextBucket(Bucket* currentBucket) {
+    auto getNextValIfPresent = [this]() {
+        return _sortedInput->more() ? boost::optional<pair<Value, Document>>(_sortedInput->next())
+                                    : boost::none;
+    };
 
-    // Start creating and populating the buckets.
-    for (int i = 0; i < _nBuckets; i++) {
-        bool isLastBucket = (i == _nBuckets - 1);
+    auto nextValue = getNextValIfPresent();
+    if (_granularityRounder) {
+        Value boundaryValue = _granularityRounder->roundUp(currentBucket->_max);
+
+        // If there are any values that now fall into this bucket after we round the
+        // boundary, absorb them into this bucket too.
+        while (nextValue &&
+               pExpCtx->getValueComparator().evaluate(boundaryValue > nextValue->first)) {
+            addDocumentToBucket(*nextValue, *currentBucket);
+            nextValue = getNextValIfPresent();
+        }
 
-        // Get the first value to place in this bucket.
-        pair<Value, Document> currentValue;
-        if (firstEntryInNextBucket) {
-            currentValue = *firstEntryInNextBucket;
-            firstEntryInNextBucket = boost::none;
-        } else if (_sortedInput->more()) {
-            currentValue = _sortedInput->next();
+        // Handle the special case where the largest value in the first bucket is zero. In this
+        // case, we take the minimum boundary of the next bucket and round it down. We then set the
+        // maximum boundary of the current bucket to be the rounded down value. This maintains that
+        // the maximum boundary of the current bucket is exclusive and the minimum boundary of the
+        // next bucket is inclusive.
+        double currentMax = boundaryValue.coerceToDouble();
+        if (currentMax == 0.0 && nextValue) {
+            currentBucket->_max = _granularityRounder->roundDown(nextValue->first);
         } else {
-            // No more values to process.
-            break;
+            currentBucket->_max = boundaryValue;
+        }
+    } else {
+        // If there are any more values that are equal to the boundary value, then absorb
+        // them into the current bucket too.
+        while (nextValue &&
+               pExpCtx->getValueComparator().evaluate(currentBucket->_max == nextValue->first)) {
+            addDocumentToBucket(*nextValue, *currentBucket);
+            nextValue = getNextValIfPresent();
         }
 
-        // Initialize the current bucket.
-        Bucket currentBucket(pExpCtx, currentValue.first, currentValue.first, _accumulatedFields);
-
-        // Evaluate each initializer against an empty document. Normally the
-        // initializer can refer to the group key, but in $bucketAuto there is no single
-        // group key per bucket.
-        Document emptyDoc;
-        for (size_t k = 0; k < _accumulatedFields.size(); ++k) {
-            Value initializerValue =
-                _accumulatedFields[k].expr.initializer->evaluate(emptyDoc, &pExpCtx->variables);
-            currentBucket._accums[k]->startNewGroup(initializerValue);
+        // If there is a bucket that comes after the current bucket, then the current bucket's max
+        // boundary is updated to the next bucket's min. This makes it so that buckets' min
+        // boundaries are inclusive and max boundaries are exclusive (except for the last bucket,
+        // which has an inclusive max).
+        if (nextValue) {
+            currentBucket->_max = nextValue->first;
         }
+    }
+    return nextValue;
+}
 
-        // Add the first value into the current bucket.
-        addDocumentToBucket(currentValue, currentBucket);
+boost::optional<DocumentSourceBucketAuto::Bucket> DocumentSourceBucketAuto::populateNextBucket() {
+    // If there was a bucket before this, the 'currentMin' should be populated, or there are no more
+    // documents.
+    if (!_currentBucketDetails.currentMin && !_sortedInput->more()) {
+        return {};
+    }
 
-        if (isLastBucket) {
-            // If this is the last bucket allowed, we need to put any remaining documents in
-            // the current bucket.
-            while (_sortedInput->more()) {
-                addDocumentToBucket(_sortedInput->next(), currentBucket);
-            }
-        } else {
-            // We go to approxBucketSize - 1 because we already added the first value in order
-            // to keep track of the minimum value.
-            for (long long j = 0; j < approxBucketSize - 1; j++) {
-                if (_sortedInput->more()) {
-                    addDocumentToBucket(_sortedInput->next(), currentBucket);
-                } else {
-                    // No more values to process.
-                    break;
-                }
-            }
+    std::pair<Value, Document> currentValue =
+        _currentBucketDetails.currentMin ? *_currentBucketDetails.currentMin : _sortedInput->next();
 
-            boost::optional<pair<Value, Document>> nextValue = _sortedInput->more()
-                ? boost::optional<pair<Value, Document>>(_sortedInput->next())
-                : boost::none;
-
-            if (_granularityRounder) {
-                Value boundaryValue = _granularityRounder->roundUp(currentBucket._max);
-                // If there are any values that now fall into this bucket after we round the
-                // boundary, absorb them into this bucket too.
-                while (nextValue &&
-                       pExpCtx->getValueComparator().evaluate(boundaryValue > nextValue->first)) {
-                    addDocumentToBucket(*nextValue, currentBucket);
-                    nextValue = _sortedInput->more()
-                        ? boost::optional<pair<Value, Document>>(_sortedInput->next())
-                        : boost::none;
-                }
-                if (nextValue) {
-                    currentBucket._max = boundaryValue;
-                }
-            } else {
-                // If there are any more values that are equal to the boundary value, then absorb
-                // them into the current bucket too.
-                while (nextValue &&
-                       pExpCtx->getValueComparator().evaluate(currentBucket._max ==
-                                                              nextValue->first)) {
-                    addDocumentToBucket(*nextValue, currentBucket);
-                    nextValue = _sortedInput->more()
-                        ? boost::optional<pair<Value, Document>>(_sortedInput->next())
-                        : boost::none;
-                }
-            }
-            firstEntryInNextBucket = nextValue;
-        }
+    Bucket currentBucket(pExpCtx, currentValue.first, currentValue.first, _accumulatedFields);
+
+    // If we have a granularity specified and if there is a bucket that came before the current
+    // bucket being added, then the current bucket's min boundary is updated to be the previous
+    // bucket's max boundary. This makes it so that bucket boundaries follow the granularity, have
+    // inclusive minimums, and have exclusive maximums.
+    if (_granularityRounder) {
+        currentBucket._min = _currentBucketDetails.previousMax.value_or(
+            _granularityRounder->roundDown(currentValue.first));
+    }
 
-        // Add the current bucket to the vector of buckets.
-        addBucket(currentBucket);
+    // Evaluate each initializer against an empty document. Normally the initializer can refer to
+    // the group key, but in $bucketAuto there is no single group key per bucket.
+    Document emptyDoc;
+    for (size_t k = 0; k < _accumulatedFields.size(); ++k) {
+        Value initializerValue =
+            _accumulatedFields[k].expr.initializer->evaluate(emptyDoc, &pExpCtx->variables);
+        currentBucket._accums[k]->startNewGroup(initializerValue);
     }
-    if (!_buckets.empty() && _granularityRounder) {
-        // If we we have a granularity, we round the first bucket's minimum down and the last
-        // bucket's maximum up. This way all of the bucket boundaries are rounded to numbers in the
-        // granularity specification.
-        Bucket& firstBucket = _buckets.front();
-        Bucket& lastBucket = _buckets.back();
-        firstBucket._min = _granularityRounder->roundDown(firstBucket._min);
-        lastBucket._max = _granularityRounder->roundUp(lastBucket._max);
+
+    // Add 'approxBucketSize' number of documents to the current bucket. If this is the last bucket,
+    // add all the remaining documents.
+    addDocumentToBucket(currentValue, currentBucket);
+    const auto isLastBucket = (_currentBucketDetails.currentBucketNum == _nBuckets);
+    for (long long i = 1;
+         _sortedInput->more() && (i < _currentBucketDetails.approxBucketSize || isLastBucket);
+         i++) {
+        addDocumentToBucket(_sortedInput->next(), currentBucket);
     }
+
+    // Modify the bucket details for next bucket.
+    _currentBucketDetails.currentMin = adjustBoundariesAndGetMinForNextBucket(&currentBucket);
+    _currentBucketDetails.previousMax = currentBucket._max;
+    return currentBucket;
 }
 
 DocumentSourceBucketAuto::Bucket::Bucket(
@@ -338,37 +336,6 @@ DocumentSourceBucketAuto::Bucket::Bucket(
     }
 }
 
-void DocumentSourceBucketAuto::addBucket(Bucket& newBucket) {
-    if (!_buckets.empty()) {
-        Bucket& previous = _buckets.back();
-        if (_granularityRounder) {
-            // If we have a granularity specified and if there is a bucket that comes before the new
-            // bucket being added, then the new bucket's min boundary is updated to be the
-            // previous bucket's max boundary. This makes it so that bucket boundaries follow the
-            // granularity, have inclusive minimums, and have exclusive maximums.
-
-            double prevMax = previous._max.coerceToDouble();
-            if (prevMax == 0.0) {
-                // Handle the special case where the largest value in the first bucket is zero. In
-                // this case, we take the minimum boundary of the second bucket and round it down.
-                // We then set the maximum boundary of the first bucket to be the rounded down
-                // value. This maintains that the maximum boundary of the first bucket is exclusive
-                // and the minimum boundary of the second bucket is inclusive.
-                previous._max = _granularityRounder->roundDown(newBucket._min);
-            }
-
-            newBucket._min = previous._max;
-        } else {
-            // If there is a bucket that comes before the new bucket being added, then the previous
-            // bucket's max boundary is updated to the new bucket's min. This makes it so that
-            // buckets' min boundaries are inclusive and max boundaries are exclusive (except for
-            // the last bucket, which has an inclusive max).
-            previous._max = newBucket._min;
-        }
-    }
-    _buckets.push_back(newBucket);
-}
-
 Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) {
     const size_t nAccumulatedFields = _accumulatedFields.size();
     MutableDocument out(1 + nAccumulatedFields);
@@ -389,7 +356,6 @@ Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) {
 
 void DocumentSourceBucketAuto::doDispose() {
     _sortedInput.reset();
-    _bucketsIterator = _buckets.end();
 }
 
 Value DocumentSourceBucketAuto::serialize(
@@ -451,11 +417,11 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto(
     const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
     uint64_t maxMemoryUsageBytes)
     : DocumentSource(kStageName, pExpCtx),
-      _nBuckets(numBuckets),
       _maxMemoryUsageBytes(maxMemoryUsageBytes),
       _groupByExpression(groupByExpression),
-      _granularityRounder(granularityRounder) {
-
+      _granularityRounder(granularityRounder),
+      _nBuckets(numBuckets),
+      _currentBucketDetails{0} {
     invariant(!accumulationStatements.empty());
     for (auto&& accumulationStatement : accumulationStatements) {
         _accumulatedFields.push_back(accumulationStatement);
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index fcc35a130d3..44bd1e3f8f0 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -118,6 +118,13 @@ private:
         std::vector<boost::intrusive_ptr<AccumulatorState>> _accums;
     };
 
+    struct BucketDetails {
+        int currentBucketNum;
+        long long approxBucketSize = 0;
+        boost::optional<Value> previousMax;
+        boost::optional<std::pair<Value, Document>> currentMin;
+    };
+
     /**
      * Consumes all of the documents from the source in the pipeline and sorts them by their
      * 'groupBy' value. This method might not be able to finish populating the sorter in a single
@@ -126,27 +133,26 @@
      */
     GetNextResult populateSorter();
 
+    void initalizeBucketIteration();
+
     /**
      * Computes the 'groupBy' expression value for 'doc'.
      */
     Value extractKey(const Document& doc);
 
     /**
-     * Calculates the bucket boundaries for the input documents and places them into buckets.
+     * Returns the next bucket if exists. boost::none if none exist.
      */
-    void populateBuckets();
+    boost::optional<Bucket> populateNextBucket();
+    boost::optional<std::pair<Value, Document>> adjustBoundariesAndGetMinForNextBucket(
+        Bucket* currentBucket);
 
     /**
      * Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'.
      */
     void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket);
 
     /**
-     * Adds 'newBucket' to _buckets and updates any boundaries if necessary.
-     */
-    void addBucket(Bucket& newBucket);
-
-    /**
      * Makes a document using the information from bucket. This is what is returned when getNext()
      * is called.
      */
@@ -157,14 +163,13 @@ private:
 
     std::vector<AccumulationStatement> _accumulatedFields;
 
-    int _nBuckets;
     uint64_t _maxMemoryUsageBytes;
     bool _populated = false;
-    std::vector<Bucket> _buckets;
-    std::vector<Bucket>::iterator _bucketsIterator;
     boost::intrusive_ptr<Expression> _groupByExpression;
     boost::intrusive_ptr<GranularityRounder> _granularityRounder;
+    int _nBuckets;
     long long _nDocuments = 0;
+    BucketDetails _currentBucketDetails;
 };
 
 } // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index 09db9c459b4..96014e572b9 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -277,6 +277,24 @@ TEST_F(BucketAutoTests, Returns3Of10RequestedBucketsWhen3ValuesInSource) {
     ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 2, max : 2}, count : 1}")));
 }
 
+TEST_F(BucketAutoTests, PopulatesLastBucketWithRemainingDocuments) {
+    auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 5}}");
+    auto results = getResults(bucketAutoSpec,
+                              {Document{{"x", 0}},
+                               Document{{"x", 1}},
+                               Document{{"x", 2}},
+                               Document{{"x", 3}},
+                               Document{{"x", 4}},
+                               Document{{"x", 5}},
+                               Document{{"x", 6}}});
+    ASSERT_EQUALS(results.size(), 5UL);
+    ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 1}, count : 1}")));
+    ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 1, max : 2}, count : 1}")));
+    ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 2, max : 3}, count : 1}")));
+    ASSERT_DOCUMENT_EQ(results[3], Document(fromjson("{_id : {min : 3, max : 4}, count : 1}")));
+    ASSERT_DOCUMENT_EQ(results[4], Document(fromjson("{_id : {min : 4, max : 6}, count : 3}")));
+}
+
 TEST_F(BucketAutoTests, EvaluatesAccumulatorsInOutputField) {
     auto bucketAutoSpec =
         fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, output : {avg : {$avg : '$x'}}}}");
@@ -733,6 +751,25 @@ TEST_F(BucketAutoTests, ShouldRoundDownFirstMinimumBoundaryWithGranularitySpecif
     ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 25, max : 63}, count : 2}")));
 }
 
+TEST_F(BucketAutoTests, PopulatesLastBucketWithRemainingDocumentsWithGranularitySpecified) {
+    auto bucketAutoSpec =
+        fromjson("{$bucketAuto : {groupBy : '$x', buckets : 5, granularity : 'R5'}}");
+    auto results = getResults(bucketAutoSpec,
+                              {Document{{"x", 24}},
+                               Document{{"x", 15}},
+                               Document{{"x", 30}},
+                               Document{{"x", 9}},
+                               Document{{"x", 3}},
+                               Document{{"x", 7}},
+                               Document{{"x", 101}}});
+    ASSERT_EQUALS(results.size(), 5UL);
+    ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min: 2.5, max: 4.0}, count : 1}")));
+    ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 4.0, max : 10}, count : 2}")));
+    ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 10, max : 16}, count : 1}")));
+    ASSERT_DOCUMENT_EQ(results[3], Document(fromjson("{_id : {min : 16, max : 25}, count : 1}")));
+    ASSERT_DOCUMENT_EQ(results[4], Document(fromjson("{_id : {min : 25, max : 160}, count : 2}")));
+}
+
 TEST_F(BucketAutoTests, ShouldAbsorbAllValuesSmallerThanAdjustedBoundaryWithGranularitySpecified) {
     auto bucketAutoSpec =
         fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, granularity : 'R5'}}");
```
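The two new tests pin down the arithmetic that replaces the old per-bucket loop: the target size is std::round(double(nDocuments) / double(nBuckets)) clamped to at least 1, and only the final bucket is allowed to absorb everything that remains. With 7 documents and 5 buckets that gives a target of 1, so the expected counts are 1, 1, 1, 1, 3. Below is a standalone sketch of just this base distribution (the boundary-absorption logic in the real stage can still move equal or rounded-in values between neighboring buckets):

```cpp
#include <algorithm>
#include <cmath>
#include <iostream>

// Base bucket-size arithmetic mirrored from the patch: each bucket targets
// round(nDocuments / nBuckets) documents (at least 1), and the last bucket
// absorbs all remaining documents.
int main() {
    const long long nDocuments = 7;  // as in PopulatesLastBucketWithRemainingDocuments
    const int nBuckets = 5;

    long long approxBucketSize =
        static_cast<long long>(std::round(double(nDocuments) / double(nBuckets)));
    approxBucketSize = std::max<long long>(approxBucketSize, 1);

    long long remaining = nDocuments;
    for (int bucketNum = 1; bucketNum <= nBuckets && remaining > 0; ++bucketNum) {
        const bool isLastBucket = (bucketNum == nBuckets);
        const long long taken =
            isLastBucket ? remaining : std::min<long long>(approxBucketSize, remaining);
        remaining -= taken;
        std::cout << "bucket " << bucketNum << ": " << taken << " document(s)\n";
    }
    // Prints counts 1, 1, 1, 1, 3, matching the expected buckets in the new test.
    return 0;
}
```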