diff options
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 42 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto.cpp | 122 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto_test.cpp | 199 |
3 files changed, 263 insertions, 100 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 90f158aec1d..53ddc54332b 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -686,7 +686,7 @@ public: static boost::intrusive_ptr<DocumentSourceGroup> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, const boost::intrusive_ptr<Expression>& groupByExpression, - std::vector<AccumulationStatement> accumulationStatement, + std::vector<AccumulationStatement> accumulationStatements, Variables::Id numVariables, size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); @@ -2154,20 +2154,37 @@ public: return this; } - static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024; + static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; + /** + * Convenience method to create a $bucketAuto stage. + * + * If 'accumulationStatements' is the empty vector, it will be filled in with the statement + * 'count: {$sum: 1}'. + */ static boost::intrusive_ptr<DocumentSourceBucketAuto> create( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - int numBuckets = 0, - uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Expression>& groupByExpression, + Variables::Id numVariables, + int numBuckets, + std::vector<AccumulationStatement> accumulationStatements = {}, + const boost::intrusive_ptr<GranularityRounder>& granularityRounder = nullptr, + uint64_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); + /** + * Parses a $bucketAuto stage from the user-supplied BSON. 
+ */ static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); private: - explicit DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - int numBuckets, - uint64_t maxMemoryUsageBytes); + DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const boost::intrusive_ptr<Expression>& groupByExpression, + Variables::Id numVariables, + int numBuckets, + std::vector<AccumulationStatement> accumulationStatements, + const boost::intrusive_ptr<GranularityRounder>& granularityRounder, + uint64_t maxMemoryUsageBytes); // struct for holding information about a bucket. struct Bucket { @@ -2196,11 +2213,6 @@ private: void populateBuckets(); /** - * Add an accumulator, which will become a field in each output bucket. - */ - void addAccumulator(AccumulationStatement accumulationStatement); - - /** * Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'. 
*/ void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket); @@ -2216,10 +2228,6 @@ private: */ Document makeDocument(const Bucket& bucket); - void parseGroupByExpression(const BSONElement& groupByField, const VariablesParseState& vps); - - void setGranularity(std::string granularity); - std::unique_ptr<Sorter<Value, Document>> _sorter; std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput; diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index 1af3fe20795..3d89da9d790 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -88,6 +88,10 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() { if (!_sorter) { SortOptions opts; opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; + if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) { + opts.extSortAllowed = true; + opts.tempDir = pExpCtx->tempDir; + } const auto& valueCmp = pExpCtx->getValueComparator(); auto comparator = [valueCmp](const Sorter<Value, Document>::Data& lhs, const Sorter<Value, Document>::Data& rhs) { @@ -347,23 +351,65 @@ Value DocumentSourceBucketAuto::serialize(bool explain) const { } intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create( - const intrusive_ptr<ExpressionContext>& pExpCtx, int numBuckets, uint64_t maxMemoryUsageBytes) { - return new DocumentSourceBucketAuto(pExpCtx, numBuckets, maxMemoryUsageBytes); + const intrusive_ptr<ExpressionContext>& pExpCtx, + const boost::intrusive_ptr<Expression>& groupByExpression, + Variables::Id numVariables, + int numBuckets, + std::vector<AccumulationStatement> accumulationStatements, + const boost::intrusive_ptr<GranularityRounder>& granularityRounder, + uint64_t maxMemoryUsageBytes) { + uassert(40243, + str::stream() << "The $bucketAuto 'buckets' field must be greater than 0, but found: " + << numBuckets, + numBuckets > 0); + // If 
there is no output field specified, then add the default one. + if (accumulationStatements.empty()) { + accumulationStatements.emplace_back("count", + AccumulationStatement::getFactory("$sum"), + ExpressionConstant::create(pExpCtx, Value(1))); + } + return new DocumentSourceBucketAuto(pExpCtx, + groupByExpression, + numVariables, + numBuckets, + accumulationStatements, + granularityRounder, + maxMemoryUsageBytes); } -DocumentSourceBucketAuto::DocumentSourceBucketAuto(const intrusive_ptr<ExpressionContext>& pExpCtx, - int numBuckets, - uint64_t maxMemoryUsageBytes) - : DocumentSource(pExpCtx), _nBuckets(numBuckets), _maxMemoryUsageBytes(maxMemoryUsageBytes) {} +DocumentSourceBucketAuto::DocumentSourceBucketAuto( + const intrusive_ptr<ExpressionContext>& pExpCtx, + const boost::intrusive_ptr<Expression>& groupByExpression, + Variables::Id numVariables, + int numBuckets, + std::vector<AccumulationStatement> accumulationStatements, + const boost::intrusive_ptr<GranularityRounder>& granularityRounder, + uint64_t maxMemoryUsageBytes) + : DocumentSource(pExpCtx), + _nBuckets(numBuckets), + _maxMemoryUsageBytes(maxMemoryUsageBytes), + _variables(stdx::make_unique<Variables>(numVariables)), + _groupByExpression(groupByExpression), + _granularityRounder(granularityRounder) { + + invariant(!accumulationStatements.empty()); + for (auto&& accumulationStatement : accumulationStatements) { + _fieldNames.push_back(std::move(accumulationStatement.fieldName)); + _accumulatorFactories.push_back(accumulationStatement.factory); + _expressions.push_back(accumulationStatement.expression); + } +} -void DocumentSourceBucketAuto::parseGroupByExpression(const BSONElement& groupByField, - const VariablesParseState& vps) { +namespace { + +boost::intrusive_ptr<Expression> parseGroupByExpression(const BSONElement& groupByField, + const VariablesParseState& vps) { if (groupByField.type() == BSONType::Object && groupByField.embeddedObject().firstElementFieldName()[0] == '$') { - _groupByExpression 
= Expression::parseObject(groupByField.embeddedObject(), vps); + return Expression::parseObject(groupByField.embeddedObject(), vps); } else if (groupByField.type() == BSONType::String && groupByField.valueStringData()[0] == '$') { - _groupByExpression = ExpressionFieldPath::parse(groupByField.str(), vps); + return ExpressionFieldPath::parse(groupByField.str(), vps); } else { uasserted( 40239, @@ -372,16 +418,7 @@ void DocumentSourceBucketAuto::parseGroupByExpression(const BSONElement& groupBy << groupByField.toString(false, false)); } } - -void DocumentSourceBucketAuto::addAccumulator(AccumulationStatement accumulationStatement) { - _fieldNames.push_back(accumulationStatement.fieldName); - _accumulatorFactories.push_back(accumulationStatement.factory); - _expressions.push_back(accumulationStatement.expression); -} - -void DocumentSourceBucketAuto::setGranularity(string granularity) { - _granularityRounder = GranularityRounder::getGranularityRounder(std::move(granularity)); -} +} // namespace intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { @@ -390,17 +427,17 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( << typeName(elem.type()), elem.type() == BSONType::Object); - intrusive_ptr<DocumentSourceBucketAuto> bucketAuto(DocumentSourceBucketAuto::create(pExpCtx)); - - const BSONObj bucketAutoObj = elem.embeddedObject(); VariablesIdGenerator idGenerator; VariablesParseState vps(&idGenerator); - bool outputFieldSpecified = false; + vector<AccumulationStatement> accumulationStatements; + boost::intrusive_ptr<Expression> groupByExpression; + boost::optional<int> numBuckets; + boost::intrusive_ptr<GranularityRounder> granularityRounder; - for (auto&& argument : bucketAutoObj) { + for (auto&& argument : elem.Obj()) { const auto argName = argument.fieldNameStringData(); if ("groupBy" == argName) { - bucketAuto->parseGroupByExpression(argument, vps); + 
groupByExpression = parseGroupByExpression(argument, vps); } else if ("buckets" == argName) { Value bucketsValue = Value(argument); @@ -417,13 +454,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( << Value(argument).coerceToDouble(), bucketsValue.integral()); - int numBuckets = bucketsValue.coerceToInt(); - uassert(40243, - str::stream() - << "The $bucketAuto 'buckets' field must be greater than 0, but found: " - << numBuckets, - numBuckets > 0); - bucketAuto->_nBuckets = numBuckets; + numBuckets = bucketsValue.coerceToInt(); } else if ("output" == argName) { uassert(40244, str::stream() @@ -431,9 +462,8 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( << typeName(argument.type()), argument.type() == BSONType::Object); - outputFieldSpecified = true; for (auto&& outputField : argument.embeddedObject()) { - bucketAuto->addAccumulator( + accumulationStatements.push_back( AccumulationStatement::parseAccumulationStatement(outputField, vps)); } } else if ("granularity" == argName) { @@ -442,7 +472,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( << "The $bucketAuto 'granularity' field must be a string, but found type: " << typeName(argument.type()), argument.type() == BSONType::String); - bucketAuto->setGranularity(argument.str()); + granularityRounder = GranularityRounder::getGranularityRounder(argument.str()); } else { uasserted(40245, str::stream() << "Unrecognized option to $bucketAuto: " << argName); } @@ -450,18 +480,14 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( uassert(40246, "$bucketAuto requires 'groupBy' and 'buckets' to be specified", - bucketAuto->_groupByExpression && bucketAuto->_nBuckets > 0); - - // If there is no output field specified, then add the default one. 
- if (!outputFieldSpecified) { - bucketAuto->addAccumulator({"count", - AccumulationStatement::getFactory("$sum"), - ExpressionConstant::create(pExpCtx, Value(1))}); - } - - bucketAuto->_variables.reset(new Variables(idGenerator.getIdCount())); - - return bucketAuto; + groupByExpression && numBuckets); + + return DocumentSourceBucketAuto::create(pExpCtx, + groupByExpression, + idGenerator.getIdCount(), + numBuckets.get(), + accumulationStatements, + granularityRounder); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp index f572925b42f..484826aaea6 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/value.h" +#include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -341,6 +342,97 @@ TEST_F(BucketAutoTests, ShouldPropagatePauses) { ASSERT_TRUE(bucketAutoStage->getNext().isEOF()); } +TEST_F(BucketAutoTests, ShouldBeAbleToCorrectlySpillToDisk) { + auto expCtx = getExpCtx(); + unittest::TempDir tempDir("DocumentSourceBucketAutoTest"); + expCtx->tempDir = tempDir.path(); + expCtx->extSortAllowed = true; + const size_t maxMemoryUsageBytes = 1000; + + VariablesIdGenerator idGen; + VariablesParseState vps(&idGen); + auto groupByExpression = ExpressionFieldPath::parse("$a", vps); + + const int numBuckets = 2; + auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx, + groupByExpression, + idGen.getIdCount(), + numBuckets, + {}, + nullptr, + maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes, 'x'); + auto mock = DocumentSourceMock::create({Document{{"a", 0}, {"largeStr", largeStr}}, + Document{{"a", 1}, {"largeStr", largeStr}}, + Document{{"a", 2}, {"largeStr", largeStr}}, + 
Document{{"a", 3}, {"largeStr", largeStr}}}); + bucketAutoStage->setSource(mock.get()); + + auto next = bucketAutoStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + (Document{{"_id", Document{{"min", 0}, {"max", 2}}}, {"count", 2}})); + + next = bucketAutoStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + (Document{{"_id", Document{{"min", 2}, {"max", 3}}}, {"count", 2}})); + + ASSERT_TRUE(bucketAutoStage->getNext().isEOF()); +} + +TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) { + auto expCtx = getExpCtx(); + + // Allow the $bucketAuto stage to spill to disk. + unittest::TempDir tempDir("DocumentSourceBucketAutoTest"); + expCtx->tempDir = tempDir.path(); + expCtx->extSortAllowed = true; + const size_t maxMemoryUsageBytes = 1000; + + VariablesIdGenerator idGen; + VariablesParseState vps(&idGen); + auto groupByExpression = ExpressionFieldPath::parse("$a", vps); + + const int numBuckets = 2; + auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx, + groupByExpression, + idGen.getIdCount(), + numBuckets, + {}, + nullptr, + maxMemoryUsageBytes); + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes, 'x'); + auto mock = DocumentSourceMock::create({Document{{"a", 0}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 1}, {"largeStr", largeStr}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"a", 2}, {"largeStr", largeStr}}, + Document{{"a", 3}, {"largeStr", largeStr}}}); + bucketAutoStage->setSource(mock.get()); + + // There were 2 pauses, so we should expect 2 paused results before any results can be + // returned. + ASSERT_TRUE(bucketAutoStage->getNext().isPaused()); + ASSERT_TRUE(bucketAutoStage->getNext().isPaused()); + + // Now we expect to get the results back. 
+ auto next = bucketAutoStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + (Document{{"_id", Document{{"min", 0}, {"max", 2}}}, {"count", 2}})); + + next = bucketAutoStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), + (Document{{"_id", Document{{"min", 2}, {"max", 3}}}, {"count", 2}})); + + ASSERT_TRUE(bucketAutoStage->getNext().isEOF()); +} + TEST_F(BucketAutoTests, SourceNameIsBucketAuto) { auto bucketAuto = createBucketAuto(fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}")); ASSERT_EQUALS(string(bucketAuto->getSourceName()), "$bucketAuto"); @@ -441,14 +533,6 @@ TEST_F(BucketAutoTests, ShouldBeAbleToReParseSerializedStage) { ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); } -TEST_F(BucketAutoTests, ReturnsNoBucketsWhenNoBucketsAreSpecifiedInCreate) { - auto mock = DocumentSourceMock::create(Document{{"x", 1}}); - auto bucketAuto = DocumentSourceBucketAuto::create(getExpCtx()); - - bucketAuto->setSource(mock.get()); - ASSERT(bucketAuto->getNext().isEOF()); -} - TEST_F(BucketAutoTests, FailsWithInvalidNumberOfBuckets) { auto spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 'test'}}"); ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40241); @@ -464,6 +548,14 @@ TEST_F(BucketAutoTests, FailsWithInvalidNumberOfBuckets) { spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : -1}}"); ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40243); + + // Use the create() helper. 
+ const int numBuckets = 0; + ASSERT_THROWS_CODE( + DocumentSourceBucketAuto::create( + getExpCtx(), ExpressionConstant::create(getExpCtx(), Value(0)), 0, numBuckets), + UserException, + 40243); } TEST_F(BucketAutoTests, FailsWithNonExpressionGroupBy) { @@ -542,39 +634,76 @@ TEST_F(BucketAutoTests, FailsWithInvalidOutputFieldName) { ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40236); } +void assertCannotSpillToDisk(const boost::intrusive_ptr<ExpressionContext>& expCtx) { + const size_t maxMemoryUsageBytes = 1000; + + VariablesIdGenerator idGen; + VariablesParseState vps(&idGen); + auto groupByExpression = ExpressionFieldPath::parse("$a", vps); + + const int numBuckets = 2; + auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx, + groupByExpression, + idGen.getIdCount(), + numBuckets, + {}, + nullptr, + maxMemoryUsageBytes); + + string largeStr(maxMemoryUsageBytes, 'x'); + auto mock = DocumentSourceMock::create( + {Document{{"a", 0}, {"largeStr", largeStr}}, Document{{"a", 1}, {"largeStr", largeStr}}}); + bucketAutoStage->setSource(mock.get()); + + ASSERT_THROWS_CODE(bucketAutoStage->getNext(), UserException, 16819); +} + TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) { - const uint64_t maxMemoryUsageBytes = 1000; - deque<DocumentSource::GetNextResult> inputs; - auto largeStr = string(maxMemoryUsageBytes, 'b'); - auto inputDoc = Document{{"a", largeStr}}; - ASSERT_GTE(inputDoc.getApproximateSize(), maxMemoryUsageBytes); - inputs.emplace_back(std::move(inputDoc)); - inputs.emplace_back(Document{{"a", largeStr}}); - auto mock = DocumentSourceMock::create(inputs); - - const int numBuckets = 1; - auto bucketAuto = - DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes); - bucketAuto->setSource(mock.get()); - ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819); + auto expCtx = getExpCtx(); + + expCtx->extSortAllowed = false; + expCtx->inRouter = false; + assertCannotSpillToDisk(expCtx); + + 
expCtx->extSortAllowed = true; + expCtx->inRouter = true; + assertCannotSpillToDisk(expCtx); + + expCtx->extSortAllowed = false; + expCtx->inRouter = true; + assertCannotSpillToDisk(expCtx); } -TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocumentsEvenIfPaused) { - const uint64_t maxMemoryUsageBytes = 1000; - auto largeStr = string(maxMemoryUsageBytes / 2, 'b'); - auto inputDoc = Document{{"a", largeStr}}; - ASSERT_GT(inputDoc.getApproximateSize(), maxMemoryUsageBytes / 2); +TEST_F(BucketAutoTests, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { + auto expCtx = getExpCtx(); + expCtx->extSortAllowed = false; + const size_t maxMemoryUsageBytes = 1000; + + VariablesIdGenerator idGen; + VariablesParseState vps(&idGen); + auto groupByExpression = ExpressionFieldPath::parse("$a", vps); + + const int numBuckets = 2; + auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx, + groupByExpression, + idGen.getIdCount(), + numBuckets, + {}, + nullptr, + maxMemoryUsageBytes); - auto mock = DocumentSourceMock::create({std::move(inputDoc), + string largeStr(maxMemoryUsageBytes / 2, 'x'); + auto mock = DocumentSourceMock::create({Document{{"a", 0}, {"largeStr", largeStr}}, DocumentSource::GetNextResult::makePauseExecution(), - Document{{"a", largeStr}}}); + Document{{"a", 1}, {"largeStr", largeStr}}, + Document{{"a", 2}, {"largeStr", largeStr}}}); + bucketAutoStage->setSource(mock.get()); - const int numBuckets = 1; - auto bucketAuto = - DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes); - bucketAuto->setSource(mock.get()); - ASSERT_TRUE(bucketAuto->getNext().isPaused()); - ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819); + // The first getNext() should pause. + ASSERT_TRUE(bucketAutoStage->getNext().isPaused()); + + // The next should realize it's used too much memory. 
+ ASSERT_THROWS_CODE(bucketAutoStage->getNext(), UserException, 16819); } TEST_F(BucketAutoTests, ShouldRoundUpMaximumBoundariesWithGranularitySpecified) { |