summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2016-09-13 17:36:52 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2016-09-13 17:46:37 -0400
commit4f211b037feac1d4c80d65df5a0fae12fcac1e4c (patch)
treee2b5ab9461919b265a7d929d0e22f457b7409671
parente41a6af318925f48e5626dd36a17531183e5aac2 (diff)
downloadmongo-4f211b037feac1d4c80d65df5a0fae12fcac1e4c.tar.gz
SERVER-25688 Allow $bucketAuto to spill to disk.
-rw-r--r--src/mongo/db/pipeline/document_source.h42
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp122
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto_test.cpp199
3 files changed, 263 insertions, 100 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 90f158aec1d..53ddc54332b 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -686,7 +686,7 @@ public:
static boost::intrusive_ptr<DocumentSourceGroup> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const boost::intrusive_ptr<Expression>& groupByExpression,
- std::vector<AccumulationStatement> accumulationStatement,
+ std::vector<AccumulationStatement> accumulationStatements,
Variables::Id numVariables,
size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
@@ -2154,20 +2154,37 @@ public:
return this;
}
- static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024;
+ static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+ /**
+ * Convenience method to create a $bucketAuto stage.
+ *
+ * If 'accumulationStatements' is the empty vector, it will be filled in with the statement
+ * 'count: {$sum: 1}'.
+ */
static boost::intrusive_ptr<DocumentSourceBucketAuto> create(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- int numBuckets = 0,
- uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Expression>& groupByExpression,
+ Variables::Id numVariables,
+ int numBuckets,
+ std::vector<AccumulationStatement> accumulationStatements = {},
+ const boost::intrusive_ptr<GranularityRounder>& granularityRounder = nullptr,
+ uint64_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
+ /**
+ * Parses a $bucketAuto stage from the user-supplied BSON.
+ */
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
- explicit DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- int numBuckets,
- uint64_t maxMemoryUsageBytes);
+ DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const boost::intrusive_ptr<Expression>& groupByExpression,
+ Variables::Id numVariables,
+ int numBuckets,
+ std::vector<AccumulationStatement> accumulationStatements,
+ const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
+ uint64_t maxMemoryUsageBytes);
// struct for holding information about a bucket.
struct Bucket {
@@ -2196,11 +2213,6 @@ private:
void populateBuckets();
/**
- * Add an accumulator, which will become a field in each output bucket.
- */
- void addAccumulator(AccumulationStatement accumulationStatement);
-
- /**
* Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'.
*/
void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket);
@@ -2216,10 +2228,6 @@ private:
*/
Document makeDocument(const Bucket& bucket);
- void parseGroupByExpression(const BSONElement& groupByField, const VariablesParseState& vps);
-
- void setGranularity(std::string granularity);
-
std::unique_ptr<Sorter<Value, Document>> _sorter;
std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput;
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 1af3fe20795..3d89da9d790 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -88,6 +88,10 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() {
if (!_sorter) {
SortOptions opts;
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
+ if (pExpCtx->extSortAllowed && !pExpCtx->inRouter) {
+ opts.extSortAllowed = true;
+ opts.tempDir = pExpCtx->tempDir;
+ }
const auto& valueCmp = pExpCtx->getValueComparator();
auto comparator = [valueCmp](const Sorter<Value, Document>::Data& lhs,
const Sorter<Value, Document>::Data& rhs) {
@@ -347,23 +351,65 @@ Value DocumentSourceBucketAuto::serialize(bool explain) const {
}
intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create(
- const intrusive_ptr<ExpressionContext>& pExpCtx, int numBuckets, uint64_t maxMemoryUsageBytes) {
- return new DocumentSourceBucketAuto(pExpCtx, numBuckets, maxMemoryUsageBytes);
+ const intrusive_ptr<ExpressionContext>& pExpCtx,
+ const boost::intrusive_ptr<Expression>& groupByExpression,
+ Variables::Id numVariables,
+ int numBuckets,
+ std::vector<AccumulationStatement> accumulationStatements,
+ const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
+ uint64_t maxMemoryUsageBytes) {
+ uassert(40243,
+ str::stream() << "The $bucketAuto 'buckets' field must be greater than 0, but found: "
+ << numBuckets,
+ numBuckets > 0);
+ // If there is no output field specified, then add the default one.
+ if (accumulationStatements.empty()) {
+ accumulationStatements.emplace_back("count",
+ AccumulationStatement::getFactory("$sum"),
+ ExpressionConstant::create(pExpCtx, Value(1)));
+ }
+ return new DocumentSourceBucketAuto(pExpCtx,
+ groupByExpression,
+ numVariables,
+ numBuckets,
+ accumulationStatements,
+ granularityRounder,
+ maxMemoryUsageBytes);
}
-DocumentSourceBucketAuto::DocumentSourceBucketAuto(const intrusive_ptr<ExpressionContext>& pExpCtx,
- int numBuckets,
- uint64_t maxMemoryUsageBytes)
- : DocumentSource(pExpCtx), _nBuckets(numBuckets), _maxMemoryUsageBytes(maxMemoryUsageBytes) {}
+DocumentSourceBucketAuto::DocumentSourceBucketAuto(
+ const intrusive_ptr<ExpressionContext>& pExpCtx,
+ const boost::intrusive_ptr<Expression>& groupByExpression,
+ Variables::Id numVariables,
+ int numBuckets,
+ std::vector<AccumulationStatement> accumulationStatements,
+ const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
+ uint64_t maxMemoryUsageBytes)
+ : DocumentSource(pExpCtx),
+ _nBuckets(numBuckets),
+ _maxMemoryUsageBytes(maxMemoryUsageBytes),
+ _variables(stdx::make_unique<Variables>(numVariables)),
+ _groupByExpression(groupByExpression),
+ _granularityRounder(granularityRounder) {
+
+ invariant(!accumulationStatements.empty());
+ for (auto&& accumulationStatement : accumulationStatements) {
+ _fieldNames.push_back(std::move(accumulationStatement.fieldName));
+ _accumulatorFactories.push_back(accumulationStatement.factory);
+ _expressions.push_back(accumulationStatement.expression);
+ }
+}
-void DocumentSourceBucketAuto::parseGroupByExpression(const BSONElement& groupByField,
- const VariablesParseState& vps) {
+namespace {
+
+boost::intrusive_ptr<Expression> parseGroupByExpression(const BSONElement& groupByField,
+ const VariablesParseState& vps) {
if (groupByField.type() == BSONType::Object &&
groupByField.embeddedObject().firstElementFieldName()[0] == '$') {
- _groupByExpression = Expression::parseObject(groupByField.embeddedObject(), vps);
+ return Expression::parseObject(groupByField.embeddedObject(), vps);
} else if (groupByField.type() == BSONType::String &&
groupByField.valueStringData()[0] == '$') {
- _groupByExpression = ExpressionFieldPath::parse(groupByField.str(), vps);
+ return ExpressionFieldPath::parse(groupByField.str(), vps);
} else {
uasserted(
40239,
@@ -372,16 +418,7 @@ void DocumentSourceBucketAuto::parseGroupByExpression(const BSONElement& groupBy
<< groupByField.toString(false, false));
}
}
-
-void DocumentSourceBucketAuto::addAccumulator(AccumulationStatement accumulationStatement) {
- _fieldNames.push_back(accumulationStatement.fieldName);
- _accumulatorFactories.push_back(accumulationStatement.factory);
- _expressions.push_back(accumulationStatement.expression);
-}
-
-void DocumentSourceBucketAuto::setGranularity(string granularity) {
- _granularityRounder = GranularityRounder::getGranularityRounder(std::move(granularity));
-}
+} // namespace
intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
@@ -390,17 +427,17 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
<< typeName(elem.type()),
elem.type() == BSONType::Object);
- intrusive_ptr<DocumentSourceBucketAuto> bucketAuto(DocumentSourceBucketAuto::create(pExpCtx));
-
- const BSONObj bucketAutoObj = elem.embeddedObject();
VariablesIdGenerator idGenerator;
VariablesParseState vps(&idGenerator);
- bool outputFieldSpecified = false;
+ vector<AccumulationStatement> accumulationStatements;
+ boost::intrusive_ptr<Expression> groupByExpression;
+ boost::optional<int> numBuckets;
+ boost::intrusive_ptr<GranularityRounder> granularityRounder;
- for (auto&& argument : bucketAutoObj) {
+ for (auto&& argument : elem.Obj()) {
const auto argName = argument.fieldNameStringData();
if ("groupBy" == argName) {
- bucketAuto->parseGroupByExpression(argument, vps);
+ groupByExpression = parseGroupByExpression(argument, vps);
} else if ("buckets" == argName) {
Value bucketsValue = Value(argument);
@@ -417,13 +454,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
<< Value(argument).coerceToDouble(),
bucketsValue.integral());
- int numBuckets = bucketsValue.coerceToInt();
- uassert(40243,
- str::stream()
- << "The $bucketAuto 'buckets' field must be greater than 0, but found: "
- << numBuckets,
- numBuckets > 0);
- bucketAuto->_nBuckets = numBuckets;
+ numBuckets = bucketsValue.coerceToInt();
} else if ("output" == argName) {
uassert(40244,
str::stream()
@@ -431,9 +462,8 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
<< typeName(argument.type()),
argument.type() == BSONType::Object);
- outputFieldSpecified = true;
for (auto&& outputField : argument.embeddedObject()) {
- bucketAuto->addAccumulator(
+ accumulationStatements.push_back(
AccumulationStatement::parseAccumulationStatement(outputField, vps));
}
} else if ("granularity" == argName) {
@@ -442,7 +472,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
<< "The $bucketAuto 'granularity' field must be a string, but found type: "
<< typeName(argument.type()),
argument.type() == BSONType::String);
- bucketAuto->setGranularity(argument.str());
+ granularityRounder = GranularityRounder::getGranularityRounder(argument.str());
} else {
uasserted(40245, str::stream() << "Unrecognized option to $bucketAuto: " << argName);
}
@@ -450,18 +480,14 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
uassert(40246,
"$bucketAuto requires 'groupBy' and 'buckets' to be specified",
- bucketAuto->_groupByExpression && bucketAuto->_nBuckets > 0);
-
- // If there is no output field specified, then add the default one.
- if (!outputFieldSpecified) {
- bucketAuto->addAccumulator({"count",
- AccumulationStatement::getFactory("$sum"),
- ExpressionConstant::create(pExpCtx, Value(1))});
- }
-
- bucketAuto->_variables.reset(new Variables(idGenerator.getIdCount()));
-
- return bucketAuto;
+ groupByExpression && numBuckets);
+
+ return DocumentSourceBucketAuto::create(pExpCtx,
+ groupByExpression,
+ idGenerator.getIdCount(),
+ numBuckets.get(),
+ accumulationStatements,
+ granularityRounder);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index f572925b42f..484826aaea6 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/value.h"
+#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -341,6 +342,97 @@ TEST_F(BucketAutoTests, ShouldPropagatePauses) {
ASSERT_TRUE(bucketAutoStage->getNext().isEOF());
}
+TEST_F(BucketAutoTests, ShouldBeAbleToCorrectlySpillToDisk) {
+ auto expCtx = getExpCtx();
+ unittest::TempDir tempDir("DocumentSourceBucketAutoTest");
+ expCtx->tempDir = tempDir.path();
+ expCtx->extSortAllowed = true;
+ const size_t maxMemoryUsageBytes = 1000;
+
+ VariablesIdGenerator idGen;
+ VariablesParseState vps(&idGen);
+ auto groupByExpression = ExpressionFieldPath::parse("$a", vps);
+
+ const int numBuckets = 2;
+ auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx,
+ groupByExpression,
+ idGen.getIdCount(),
+ numBuckets,
+ {},
+ nullptr,
+ maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"a", 0}, {"largeStr", largeStr}},
+ Document{{"a", 1}, {"largeStr", largeStr}},
+ Document{{"a", 2}, {"largeStr", largeStr}},
+ Document{{"a", 3}, {"largeStr", largeStr}}});
+ bucketAutoStage->setSource(mock.get());
+
+ auto next = bucketAutoStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ (Document{{"_id", Document{{"min", 0}, {"max", 2}}}, {"count", 2}}));
+
+ next = bucketAutoStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ (Document{{"_id", Document{{"min", 2}, {"max", 3}}}, {"count", 2}}));
+
+ ASSERT_TRUE(bucketAutoStage->getNext().isEOF());
+}
+
+TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) {
+ auto expCtx = getExpCtx();
+
+    // Allow the $bucketAuto stage's internal sorter to spill to disk.
+ unittest::TempDir tempDir("DocumentSourceBucketAutoTest");
+ expCtx->tempDir = tempDir.path();
+ expCtx->extSortAllowed = true;
+ const size_t maxMemoryUsageBytes = 1000;
+
+ VariablesIdGenerator idGen;
+ VariablesParseState vps(&idGen);
+ auto groupByExpression = ExpressionFieldPath::parse("$a", vps);
+
+ const int numBuckets = 2;
+ auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx,
+ groupByExpression,
+ idGen.getIdCount(),
+ numBuckets,
+ {},
+ nullptr,
+ maxMemoryUsageBytes);
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"a", 0}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 1}, {"largeStr", largeStr}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"a", 2}, {"largeStr", largeStr}},
+ Document{{"a", 3}, {"largeStr", largeStr}}});
+ bucketAutoStage->setSource(mock.get());
+
+ // There were 2 pauses, so we should expect 2 paused results before any results can be
+ // returned.
+ ASSERT_TRUE(bucketAutoStage->getNext().isPaused());
+ ASSERT_TRUE(bucketAutoStage->getNext().isPaused());
+
+ // Now we expect to get the results back.
+ auto next = bucketAutoStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ (Document{{"_id", Document{{"min", 0}, {"max", 2}}}, {"count", 2}}));
+
+ next = bucketAutoStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(),
+ (Document{{"_id", Document{{"min", 2}, {"max", 3}}}, {"count", 2}}));
+
+ ASSERT_TRUE(bucketAutoStage->getNext().isEOF());
+}
+
TEST_F(BucketAutoTests, SourceNameIsBucketAuto) {
auto bucketAuto = createBucketAuto(fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}"));
ASSERT_EQUALS(string(bucketAuto->getSourceName()), "$bucketAuto");
@@ -441,14 +533,6 @@ TEST_F(BucketAutoTests, ShouldBeAbleToReParseSerializedStage) {
ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
}
-TEST_F(BucketAutoTests, ReturnsNoBucketsWhenNoBucketsAreSpecifiedInCreate) {
- auto mock = DocumentSourceMock::create(Document{{"x", 1}});
- auto bucketAuto = DocumentSourceBucketAuto::create(getExpCtx());
-
- bucketAuto->setSource(mock.get());
- ASSERT(bucketAuto->getNext().isEOF());
-}
-
TEST_F(BucketAutoTests, FailsWithInvalidNumberOfBuckets) {
auto spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 'test'}}");
ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40241);
@@ -464,6 +548,14 @@ TEST_F(BucketAutoTests, FailsWithInvalidNumberOfBuckets) {
spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : -1}}");
ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40243);
+
+ // Use the create() helper.
+ const int numBuckets = 0;
+ ASSERT_THROWS_CODE(
+ DocumentSourceBucketAuto::create(
+ getExpCtx(), ExpressionConstant::create(getExpCtx(), Value(0)), 0, numBuckets),
+ UserException,
+ 40243);
}
TEST_F(BucketAutoTests, FailsWithNonExpressionGroupBy) {
@@ -542,39 +634,76 @@ TEST_F(BucketAutoTests, FailsWithInvalidOutputFieldName) {
ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40236);
}
+void assertCannotSpillToDisk(const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ const size_t maxMemoryUsageBytes = 1000;
+
+ VariablesIdGenerator idGen;
+ VariablesParseState vps(&idGen);
+ auto groupByExpression = ExpressionFieldPath::parse("$a", vps);
+
+ const int numBuckets = 2;
+ auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx,
+ groupByExpression,
+ idGen.getIdCount(),
+ numBuckets,
+ {},
+ nullptr,
+ maxMemoryUsageBytes);
+
+ string largeStr(maxMemoryUsageBytes, 'x');
+ auto mock = DocumentSourceMock::create(
+ {Document{{"a", 0}, {"largeStr", largeStr}}, Document{{"a", 1}, {"largeStr", largeStr}}});
+ bucketAutoStage->setSource(mock.get());
+
+ ASSERT_THROWS_CODE(bucketAutoStage->getNext(), UserException, 16819);
+}
+
TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) {
- const uint64_t maxMemoryUsageBytes = 1000;
- deque<DocumentSource::GetNextResult> inputs;
- auto largeStr = string(maxMemoryUsageBytes, 'b');
- auto inputDoc = Document{{"a", largeStr}};
- ASSERT_GTE(inputDoc.getApproximateSize(), maxMemoryUsageBytes);
- inputs.emplace_back(std::move(inputDoc));
- inputs.emplace_back(Document{{"a", largeStr}});
- auto mock = DocumentSourceMock::create(inputs);
-
- const int numBuckets = 1;
- auto bucketAuto =
- DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes);
- bucketAuto->setSource(mock.get());
- ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819);
+ auto expCtx = getExpCtx();
+
+ expCtx->extSortAllowed = false;
+ expCtx->inRouter = false;
+ assertCannotSpillToDisk(expCtx);
+
+ expCtx->extSortAllowed = true;
+ expCtx->inRouter = true;
+ assertCannotSpillToDisk(expCtx);
+
+ expCtx->extSortAllowed = false;
+ expCtx->inRouter = true;
+ assertCannotSpillToDisk(expCtx);
}
-TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocumentsEvenIfPaused) {
- const uint64_t maxMemoryUsageBytes = 1000;
- auto largeStr = string(maxMemoryUsageBytes / 2, 'b');
- auto inputDoc = Document{{"a", largeStr}};
- ASSERT_GT(inputDoc.getApproximateSize(), maxMemoryUsageBytes / 2);
+TEST_F(BucketAutoTests, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
+ auto expCtx = getExpCtx();
+ expCtx->extSortAllowed = false;
+ const size_t maxMemoryUsageBytes = 1000;
+
+ VariablesIdGenerator idGen;
+ VariablesParseState vps(&idGen);
+ auto groupByExpression = ExpressionFieldPath::parse("$a", vps);
+
+ const int numBuckets = 2;
+ auto bucketAutoStage = DocumentSourceBucketAuto::create(expCtx,
+ groupByExpression,
+ idGen.getIdCount(),
+ numBuckets,
+ {},
+ nullptr,
+ maxMemoryUsageBytes);
- auto mock = DocumentSourceMock::create({std::move(inputDoc),
+ string largeStr(maxMemoryUsageBytes / 2, 'x');
+ auto mock = DocumentSourceMock::create({Document{{"a", 0}, {"largeStr", largeStr}},
DocumentSource::GetNextResult::makePauseExecution(),
- Document{{"a", largeStr}}});
+ Document{{"a", 1}, {"largeStr", largeStr}},
+ Document{{"a", 2}, {"largeStr", largeStr}}});
+ bucketAutoStage->setSource(mock.get());
- const int numBuckets = 1;
- auto bucketAuto =
- DocumentSourceBucketAuto::create(getExpCtx(), numBuckets, maxMemoryUsageBytes);
- bucketAuto->setSource(mock.get());
- ASSERT_TRUE(bucketAuto->getNext().isPaused());
- ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819);
+ // The first getNext() should pause.
+ ASSERT_TRUE(bucketAutoStage->getNext().isPaused());
+
+ // The next should realize it's used too much memory.
+ ASSERT_THROWS_CODE(bucketAutoStage->getNext(), UserException, 16819);
}
TEST_F(BucketAutoTests, ShouldRoundUpMaximumBoundariesWithGranularitySpecified) {