-rw-r--r--  src/mongo/db/pipeline/document_source_bucket_auto.cpp       | 236
-rw-r--r--  src/mongo/db/pipeline/document_source_bucket_auto.h         |  25
-rw-r--r--  src/mongo/db/pipeline/document_source_bucket_auto_test.cpp  |  37
3 files changed, 153 insertions(+), 145 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 29199b451c9..06717200cda 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -95,18 +95,17 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() {
}
invariant(populationResult.isEOF());
- populateBuckets();
-
+ initalizeBucketIteration();
_populated = true;
- _bucketsIterator = _buckets.begin();
}
- if (_bucketsIterator == _buckets.end()) {
- dispose();
- return GetNextResult::makeEOF();
+ if (_currentBucketDetails.currentBucketNum++ < _nBuckets) {
+ if (auto bucket = populateNextBucket()) {
+ return makeDocument(*bucket);
+ }
}
-
- return makeDocument(*(_bucketsIterator++));
+ dispose();
+ return GetNextResult::makeEOF();
}
boost::intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::optimize() {
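
The rewritten doGetNext() above streams buckets lazily: instead of materializing every bucket and
walking a vector iterator, it builds at most one bucket per call and disposes once all buckets are
emitted. A minimal standalone sketch of that pull-based pattern, using invented names
(LazyBucketSource, Bucket) that are not part of the MongoDB sources:

    #include <cstddef>
    #include <optional>
    #include <vector>

    struct Bucket {
        int min, max;
    };

    class LazyBucketSource {
    public:
        LazyBucketSource(std::vector<int> sorted, int nBuckets)
            : _input(std::move(sorted)), _nBuckets(nBuckets) {}

        // Mirrors the new control flow: at most one bucket per call, then EOF.
        std::optional<Bucket> getNext() {
            if (_currentBucketNum++ < _nBuckets) {
                if (auto bucket = populateNextBucket())
                    return bucket;
            }
            return std::nullopt;  // Corresponds to dispose() + makeEOF().
        }

    private:
        std::optional<Bucket> populateNextBucket() {
            if (_pos >= _input.size())
                return std::nullopt;
            // Grossly simplified sizing: one value per bucket, and the last
            // bucket takes everything that remains.
            std::size_t end = (_currentBucketNum == _nBuckets) ? _input.size() : _pos + 1;
            Bucket bucket{_input[_pos], _input[end - 1]};
            _pos = end;
            return bucket;
        }

        std::vector<int> _input;
        int _nBuckets;
        int _currentBucketNum = 0;
        std::size_t _pos = 0;
    };
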
@@ -157,7 +156,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() {
for (; next.isAdvanced(); next = pSource->getNext()) {
auto nextDoc = next.releaseDocument();
_sorter->add(extractKey(nextDoc), nextDoc);
- _nDocuments++;
+ ++_nDocuments;
}
return next;
}
@@ -206,7 +205,8 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>&
}
}
-void DocumentSourceBucketAuto::populateBuckets() {
+void DocumentSourceBucketAuto::initalizeBucketIteration() {
+ // Initialize the iterator on '_sorter'.
invariant(_sorter);
_sortedInput.reset(_sorter->done());
_sorter.reset();
@@ -218,112 +218,110 @@ void DocumentSourceBucketAuto::populateBuckets() {
// Calculate the approximate bucket size. We attempt to fill each bucket with this many
// documents.
- long long approxBucketSize = std::round(double(_nDocuments) / double(_nBuckets));
+ _currentBucketDetails.approxBucketSize = std::round(double(_nDocuments) / double(_nBuckets));
- if (approxBucketSize < 1) {
- // If the number of buckets is larger than the number of documents, then we try to make as
- // many buckets as possible by placing each document in its own bucket.
- approxBucketSize = 1;
+ if (_currentBucketDetails.approxBucketSize < 1) {
+ // If the number of buckets is larger than the number of documents, then we try to make
+ // as many buckets as possible by placing each document in its own bucket.
+ _currentBucketDetails.approxBucketSize = 1;
}
+}
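
As a worked check of the sizing above: with the 7 documents and 5 buckets used in the new test
further down, round(7 / 5) = round(1.4) = 1, so the first four buckets each start with a single
document and the final bucket absorbs the remainder; the clamp to 1 only matters when buckets
outnumber documents. A small self-contained verification, not part of the patch:

    #include <cassert>
    #include <cmath>

    int main() {
        // 7 documents across 5 buckets: round(1.4) == 1 document per bucket.
        long long approx = std::llround(7.0 / 5.0);
        assert(approx == 1);

        // 3 documents across 10 buckets: round(0.3) == 0, clamped up to 1 so
        // that each document can still land in its own bucket.
        approx = std::llround(3.0 / 10.0);
        if (approx < 1)
            approx = 1;
        assert(approx == 1);
        return 0;
    }
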
- boost::optional<pair<Value, Document>> firstEntryInNextBucket;
+boost::optional<pair<Value, Document>>
+DocumentSourceBucketAuto::adjustBoundariesAndGetMinForNextBucket(Bucket* currentBucket) {
+ auto getNextValIfPresent = [this]() {
+ return _sortedInput->more() ? boost::optional<pair<Value, Document>>(_sortedInput->next())
+ : boost::none;
+ };
- // Start creating and populating the buckets.
- for (int i = 0; i < _nBuckets; i++) {
- bool isLastBucket = (i == _nBuckets - 1);
+ auto nextValue = getNextValIfPresent();
+ if (_granularityRounder) {
+ Value boundaryValue = _granularityRounder->roundUp(currentBucket->_max);
+
+ // If there are any values that now fall into this bucket after we round the
+ // boundary, absorb them into this bucket too.
+ while (nextValue &&
+ pExpCtx->getValueComparator().evaluate(boundaryValue > nextValue->first)) {
+ addDocumentToBucket(*nextValue, *currentBucket);
+ nextValue = getNextValIfPresent();
+ }
- // Get the first value to place in this bucket.
- pair<Value, Document> currentValue;
- if (firstEntryInNextBucket) {
- currentValue = *firstEntryInNextBucket;
- firstEntryInNextBucket = boost::none;
- } else if (_sortedInput->more()) {
- currentValue = _sortedInput->next();
+ // Handle the special case where the largest value in the first bucket is zero. In this
+ // case, we take the minimum boundary of the next bucket and round it down. We then set the
+ // maximum boundary of the current bucket to be the rounded down value. This maintains that
+ // the maximum boundary of the current bucket is exclusive and the minimum boundary of the
+ // next bucket is inclusive.
+ double currentMax = boundaryValue.coerceToDouble();
+ if (currentMax == 0.0 && nextValue) {
+ currentBucket->_max = _granularityRounder->roundDown(nextValue->first);
} else {
- // No more values to process.
- break;
+ currentBucket->_max = boundaryValue;
+ }
+ } else {
+ // If there are any more values that are equal to the boundary value, then absorb
+ // them into the current bucket too.
+ while (nextValue &&
+ pExpCtx->getValueComparator().evaluate(currentBucket->_max == nextValue->first)) {
+ addDocumentToBucket(*nextValue, *currentBucket);
+ nextValue = getNextValIfPresent();
}
- // Initialize the current bucket.
- Bucket currentBucket(pExpCtx, currentValue.first, currentValue.first, _accumulatedFields);
-
- // Evaluate each initializer against an empty document. Normally the
- // initializer can refer to the group key, but in $bucketAuto there is no single
- // group key per bucket.
- Document emptyDoc;
- for (size_t k = 0; k < _accumulatedFields.size(); ++k) {
- Value initializerValue =
- _accumulatedFields[k].expr.initializer->evaluate(emptyDoc, &pExpCtx->variables);
- currentBucket._accums[k]->startNewGroup(initializerValue);
+ // If there is a bucket that comes after the current bucket, then the current bucket's max
+ // boundary is updated to the next bucket's min. This makes it so that buckets' min
+ // boundaries are inclusive and max boundaries are exclusive (except for the last bucket,
+ // which has an inclusive max).
+ if (nextValue) {
+ currentBucket->_max = nextValue->first;
}
+ }
+ return nextValue;
+}
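
The helper above maintains the boundary invariant: minimums are inclusive, maximums are exclusive
(except for the last bucket), and keys equal to the current maximum never straddle two buckets. A
simplified sketch of the non-granularity branch, substituting ints for Value/Document and a plain
queue for the sorter (illustrative only):

    #include <deque>
    #include <optional>
    #include <vector>

    struct Bucket {
        int min, max;
        std::vector<int> contents;
    };

    std::optional<int> adjustBoundariesAndGetMinForNextBucket(Bucket* current,
                                                              std::deque<int>& input) {
        auto nextIfPresent = [&]() -> std::optional<int> {
            if (input.empty())
                return std::nullopt;
            int value = input.front();
            input.pop_front();
            return value;
        };

        auto next = nextIfPresent();
        // Absorb keys equal to this bucket's max so equal keys stay together.
        while (next && *next == current->max) {
            current->contents.push_back(*next);
            next = nextIfPresent();
        }
        // Stretch the max to the next bucket's min: the exclusive max here
        // becomes the inclusive min of the bucket that follows.
        if (next)
            current->max = *next;
        return next;  // Carried over as 'currentMin' of the next bucket.
    }
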
- // Add the first value into the current bucket.
- addDocumentToBucket(currentValue, currentBucket);
+boost::optional<DocumentSourceBucketAuto::Bucket> DocumentSourceBucketAuto::populateNextBucket() {
+    // 'currentMin' holds the first entry of this bucket whenever a previous bucket handed one
+    // off. If it is empty and the sorter has no more input, there are no more documents.
+ if (!_currentBucketDetails.currentMin && !_sortedInput->more()) {
+ return {};
+ }
- if (isLastBucket) {
- // If this is the last bucket allowed, we need to put any remaining documents in
- // the current bucket.
- while (_sortedInput->more()) {
- addDocumentToBucket(_sortedInput->next(), currentBucket);
- }
- } else {
- // We go to approxBucketSize - 1 because we already added the first value in order
- // to keep track of the minimum value.
- for (long long j = 0; j < approxBucketSize - 1; j++) {
- if (_sortedInput->more()) {
- addDocumentToBucket(_sortedInput->next(), currentBucket);
- } else {
- // No more values to process.
- break;
- }
- }
+ std::pair<Value, Document> currentValue =
+ _currentBucketDetails.currentMin ? *_currentBucketDetails.currentMin : _sortedInput->next();
- boost::optional<pair<Value, Document>> nextValue = _sortedInput->more()
- ? boost::optional<pair<Value, Document>>(_sortedInput->next())
- : boost::none;
-
- if (_granularityRounder) {
- Value boundaryValue = _granularityRounder->roundUp(currentBucket._max);
- // If there are any values that now fall into this bucket after we round the
- // boundary, absorb them into this bucket too.
- while (nextValue &&
- pExpCtx->getValueComparator().evaluate(boundaryValue > nextValue->first)) {
- addDocumentToBucket(*nextValue, currentBucket);
- nextValue = _sortedInput->more()
- ? boost::optional<pair<Value, Document>>(_sortedInput->next())
- : boost::none;
- }
- if (nextValue) {
- currentBucket._max = boundaryValue;
- }
- } else {
- // If there are any more values that are equal to the boundary value, then absorb
- // them into the current bucket too.
- while (nextValue &&
- pExpCtx->getValueComparator().evaluate(currentBucket._max ==
- nextValue->first)) {
- addDocumentToBucket(*nextValue, currentBucket);
- nextValue = _sortedInput->more()
- ? boost::optional<pair<Value, Document>>(_sortedInput->next())
- : boost::none;
- }
- }
- firstEntryInNextBucket = nextValue;
- }
+ Bucket currentBucket(pExpCtx, currentValue.first, currentValue.first, _accumulatedFields);
+
+ // If we have a granularity specified and if there is a bucket that came before the current
+ // bucket being added, then the current bucket's min boundary is updated to be the previous
+ // bucket's max boundary. This makes it so that bucket boundaries follow the granularity, have
+ // inclusive minimums, and have exclusive maximums.
+ if (_granularityRounder) {
+ currentBucket._min = _currentBucketDetails.previousMax.value_or(
+ _granularityRounder->roundDown(currentValue.first));
+ }
- // Add the current bucket to the vector of buckets.
- addBucket(currentBucket);
+ // Evaluate each initializer against an empty document. Normally the initializer can refer to
+ // the group key, but in $bucketAuto there is no single group key per bucket.
+ Document emptyDoc;
+ for (size_t k = 0; k < _accumulatedFields.size(); ++k) {
+ Value initializerValue =
+ _accumulatedFields[k].expr.initializer->evaluate(emptyDoc, &pExpCtx->variables);
+ currentBucket._accums[k]->startNewGroup(initializerValue);
}
-    if (!_buckets.empty() && _granularityRounder) {
-        // If we have a granularity, we round the first bucket's minimum down and the last
-        // bucket's maximum up. This way all of the bucket boundaries are rounded to numbers in
-        // the granularity specification.
- Bucket& firstBucket = _buckets.front();
- Bucket& lastBucket = _buckets.back();
- firstBucket._min = _granularityRounder->roundDown(firstBucket._min);
- lastBucket._max = _granularityRounder->roundUp(lastBucket._max);
+
+    // Add 'approxBucketSize' documents to the current bucket. If this is the last bucket, add all
+    // of the remaining documents.
+ addDocumentToBucket(currentValue, currentBucket);
+ const auto isLastBucket = (_currentBucketDetails.currentBucketNum == _nBuckets);
+ for (long long i = 1;
+ _sortedInput->more() && (i < _currentBucketDetails.approxBucketSize || isLastBucket);
+ i++) {
+ addDocumentToBucket(_sortedInput->next(), currentBucket);
}
+
+    // Update '_currentBucketDetails' in preparation for the next bucket.
+ _currentBucketDetails.currentMin = adjustBoundariesAndGetMinForNextBucket(&currentBucket);
+ _currentBucketDetails.previousMax = currentBucket._max;
+ return currentBucket;
}
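
A hypothetical driver for the LazyBucketSource sketch shown after the doGetNext() hunk illustrates
how the pieces compose; each loop iteration pays only for the single bucket it emits:

    #include <iostream>
    // Assumes the LazyBucketSource sketch from earlier is in scope.
    int main() {
        LazyBucketSource source({0, 1, 2, 3, 4, 5, 6}, /*nBuckets=*/5);
        while (auto bucket = source.getNext()) {
            std::cout << "[" << bucket->min << ", " << bucket->max << "]\n";
        }
        // Prints [0, 0] [1, 1] [2, 2] [3, 3] [4, 6]; the final bucket absorbs
        // the remaining documents, matching the new test below.
        return 0;
    }
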
DocumentSourceBucketAuto::Bucket::Bucket(
@@ -338,37 +336,6 @@ DocumentSourceBucketAuto::Bucket::Bucket(
}
}
-void DocumentSourceBucketAuto::addBucket(Bucket& newBucket) {
- if (!_buckets.empty()) {
- Bucket& previous = _buckets.back();
- if (_granularityRounder) {
- // If we have a granularity specified and if there is a bucket that comes before the new
- // bucket being added, then the new bucket's min boundary is updated to be the
- // previous bucket's max boundary. This makes it so that bucket boundaries follow the
- // granularity, have inclusive minimums, and have exclusive maximums.
-
- double prevMax = previous._max.coerceToDouble();
- if (prevMax == 0.0) {
- // Handle the special case where the largest value in the first bucket is zero. In
- // this case, we take the minimum boundary of the second bucket and round it down.
- // We then set the maximum boundary of the first bucket to be the rounded down
- // value. This maintains that the maximum boundary of the first bucket is exclusive
- // and the minimum boundary of the second bucket is inclusive.
- previous._max = _granularityRounder->roundDown(newBucket._min);
- }
-
- newBucket._min = previous._max;
- } else {
- // If there is a bucket that comes before the new bucket being added, then the previous
- // bucket's max boundary is updated to the new bucket's min. This makes it so that
- // buckets' min boundaries are inclusive and max boundaries are exclusive (except for
- // the last bucket, which has an inclusive max).
- previous._max = newBucket._min;
- }
- }
- _buckets.push_back(newBucket);
-}
-
Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) {
const size_t nAccumulatedFields = _accumulatedFields.size();
MutableDocument out(1 + nAccumulatedFields);
@@ -389,7 +356,6 @@ Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) {
void DocumentSourceBucketAuto::doDispose() {
_sortedInput.reset();
- _bucketsIterator = _buckets.end();
}
Value DocumentSourceBucketAuto::serialize(
@@ -451,11 +417,11 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto(
const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
uint64_t maxMemoryUsageBytes)
: DocumentSource(kStageName, pExpCtx),
- _nBuckets(numBuckets),
_maxMemoryUsageBytes(maxMemoryUsageBytes),
_groupByExpression(groupByExpression),
- _granularityRounder(granularityRounder) {
-
+ _granularityRounder(granularityRounder),
+ _nBuckets(numBuckets),
+ _currentBucketDetails{0} {
invariant(!accumulationStatements.empty());
for (auto&& accumulationStatement : accumulationStatements) {
_accumulatedFields.push_back(accumulationStatement);
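
A side note on the constructor hunk above: the initializer list was reordered, presumably because
C++ initializes members in declaration order rather than initializer-list order (the header hunk
below moves _nBuckets after _granularityRounder to match). A minimal illustration of the pitfall
such reordering avoids:

    struct Example {
        // 'a' is declared first, so it is initialized first even though the
        // initializer list names 'b' first; 'a' reads 'b' before 'b' is set.
        Example() : b(1), a(b + 1) {}  // Compilers flag this with -Wreorder.
        int a;
        int b;
    };
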
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index fcc35a130d3..44bd1e3f8f0 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -118,6 +118,13 @@ private:
std::vector<boost::intrusive_ptr<AccumulatorState>> _accums;
};
+    struct BucketDetails {
+        // Number of buckets populated so far; incremented before each new bucket is built.
+        int currentBucketNum;
+        // Approximate number of documents to place in each bucket.
+        long long approxBucketSize = 0;
+        // Max boundary of the most recently populated bucket.
+        boost::optional<Value> previousMax;
+        // First entry belonging to the next bucket, if it has already been read off the sorter.
+        boost::optional<std::pair<Value, Document>> currentMin;
+    };
+
/**
* Consumes all of the documents from the source in the pipeline and sorts them by their
* 'groupBy' value. This method might not be able to finish populating the sorter in a single
@@ -126,27 +133,26 @@ private:
*/
GetNextResult populateSorter();
+    /**
+     * Prepares bucket iteration once all input has been consumed: readies the sorted stream and
+     * computes the approximate number of documents per bucket.
+     */
+    void initalizeBucketIteration();
+
/**
* Computes the 'groupBy' expression value for 'doc'.
*/
Value extractKey(const Document& doc);
/**
- * Calculates the bucket boundaries for the input documents and places them into buckets.
+     * Returns the next bucket, or boost::none if no more buckets exist.
*/
- void populateBuckets();
+ boost::optional<Bucket> populateNextBucket();
+
+    /**
+     * Absorbs into 'currentBucket' any remaining documents that fall inside its adjusted
+     * boundaries, and returns the first entry of the next bucket, if one exists.
+     */
+    boost::optional<std::pair<Value, Document>> adjustBoundariesAndGetMinForNextBucket(
+        Bucket* currentBucket);
/**
* Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'.
*/
void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket);
/**
- * Adds 'newBucket' to _buckets and updates any boundaries if necessary.
- */
- void addBucket(Bucket& newBucket);
-
- /**
* Makes a document using the information from bucket. This is what is returned when getNext()
* is called.
*/
@@ -157,14 +163,13 @@ private:
std::vector<AccumulationStatement> _accumulatedFields;
- int _nBuckets;
uint64_t _maxMemoryUsageBytes;
bool _populated = false;
- std::vector<Bucket> _buckets;
- std::vector<Bucket>::iterator _bucketsIterator;
boost::intrusive_ptr<Expression> _groupByExpression;
boost::intrusive_ptr<GranularityRounder> _granularityRounder;
+ int _nBuckets;
long long _nDocuments = 0;
+ BucketDetails _currentBucketDetails;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index 09db9c459b4..96014e572b9 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -277,6 +277,24 @@ TEST_F(BucketAutoTests, Returns3Of10RequestedBucketsWhen3ValuesInSource) {
ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 2, max : 2}, count : 1}")));
}
+TEST_F(BucketAutoTests, PopulatesLastBucketWithRemainingDocuments) {
+ auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 5}}");
+ auto results = getResults(bucketAutoSpec,
+ {Document{{"x", 0}},
+ Document{{"x", 1}},
+ Document{{"x", 2}},
+ Document{{"x", 3}},
+ Document{{"x", 4}},
+ Document{{"x", 5}},
+ Document{{"x", 6}}});
+ ASSERT_EQUALS(results.size(), 5UL);
+ ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 1}, count : 1}")));
+ ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 1, max : 2}, count : 1}")));
+ ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 2, max : 3}, count : 1}")));
+ ASSERT_DOCUMENT_EQ(results[3], Document(fromjson("{_id : {min : 3, max : 4}, count : 1}")));
+ ASSERT_DOCUMENT_EQ(results[4], Document(fromjson("{_id : {min : 4, max : 6}, count : 3}")));
+}
+
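
The expected counts in the test above follow directly from the sizing logic: round(7 / 5) = 1
document per bucket, so the first four buckets each take one document, with each max stretched to
the next bucket's min, and the fifth, final bucket absorbs the remaining three documents (4, 5,
and 6) with an inclusive max of 6.
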
TEST_F(BucketAutoTests, EvaluatesAccumulatorsInOutputField) {
auto bucketAutoSpec =
fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, output : {avg : {$avg : '$x'}}}}");
@@ -733,6 +751,25 @@ TEST_F(BucketAutoTests, ShouldRoundDownFirstMinimumBoundaryWithGranularitySpecif
ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 25, max : 63}, count : 2}")));
}
+TEST_F(BucketAutoTests, PopulatesLastBucketWithRemainingDocumentsWithGranularitySpecified) {
+ auto bucketAutoSpec =
+ fromjson("{$bucketAuto : {groupBy : '$x', buckets : 5, granularity : 'R5'}}");
+ auto results = getResults(bucketAutoSpec,
+ {Document{{"x", 24}},
+ Document{{"x", 15}},
+ Document{{"x", 30}},
+ Document{{"x", 9}},
+ Document{{"x", 3}},
+ Document{{"x", 7}},
+ Document{{"x", 101}}});
+ ASSERT_EQUALS(results.size(), 5UL);
+    ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 2.5, max : 4.0}, count : 1}")));
+ ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 4.0, max : 10}, count : 2}")));
+ ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 10, max : 16}, count : 1}")));
+ ASSERT_DOCUMENT_EQ(results[3], Document(fromjson("{_id : {min : 16, max : 25}, count : 1}")));
+ ASSERT_DOCUMENT_EQ(results[4], Document(fromjson("{_id : {min : 25, max : 160}, count : 2}")));
+}
+
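
For reference, R5 is the five-step Renard series, which rounds boundaries to values in {1.0, 1.6,
2.5, 4.0, 6.3} scaled by powers of ten: the smallest key 3 rounds down to a min of 2.5, and the
largest key 101 rounds up to a max of 160.
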
TEST_F(BucketAutoTests, ShouldAbsorbAllValuesSmallerThanAdjustedBoundaryWithGranularitySpecified) {
auto bucketAutoSpec =
fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, granularity : 'R5'}}");