author    Charlie Swanson <charlie.swanson@mongodb.com>  2019-11-09 01:19:40 +0000
committer evergreen <evergreen@mongodb.com>              2019-11-09 01:19:40 +0000
commit    504b518b9bd432a1d614d06f004712e70a1a754b (patch)
tree      8c481a964588f62c4a3a8717bb2ed4c5af42a58b
parent    ccbad450aec9759834dafe2bd5f8fb0032704335 (diff)
download  mongo-504b518b9bd432a1d614d06f004712e70a1a754b.tar.gz
SERVER-44174 $push and $addToSet should restrict memory usage
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml     2
-rw-r--r--  jstests/aggregation/bugs/server9444.js                                                69
-rw-r--r--  jstests/aggregation/spill_to_disk.js                                                 116
-rw-r--r--  src/mongo/db/pipeline/accumulator.h                                                   12
-rw-r--r--  src/mongo/db/pipeline/accumulator_add_to_set.cpp                                      40
-rw-r--r--  src/mongo/db/pipeline/accumulator_push.cpp                                            37
-rw-r--r--  src/mongo/db/pipeline/accumulator_test.cpp                                            22
7 files changed, 194 insertions, 104 deletions
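
The change gives $push and $addToSet a hard per-accumulator memory cap (100 MB by default, via kDefaultMaxMemoryUsageBytes below) and fails the aggregation with ExceededMemoryLimit when it is hit, since neither accumulator can spill its partial result to disk. A minimal mongo-shell sketch of the user-visible behavior, mirroring the new spill_to_disk.js test below; the collection name and document count here are illustrative assumptions, not part of the commit:

    // Build ~150 MB of data, enough to exceed the 100 MB default accumulator cap.
    const coll = db.push_memory_repro;  // illustrative collection name
    coll.drop();
    const bigStr = Array(1024 * 1024 + 1).toString();  // ~1 MB of ','
    for (let i = 0; i < 150; i++) {
        coll.insert({_id: i, bigStr: i + bigStr});
    }

    // Accumulating every value into a single array blows past the cap. Because
    // $push cannot spill to disk, this fails even with allowDiskUse: true; on
    // debug builds the $group stage may report its own out-of-memory code (16945)
    // before the accumulator's ExceededMemoryLimit.
    const res = db.runCommand({
        aggregate: coll.getName(),
        pipeline: [{$group: {_id: null, bigArray: {$push: '$bigStr'}}}],
        cursor: {},
        allowDiskUse: true
    });
    assert.commandFailedWithCode(res, [ErrorCodes.ExceededMemoryLimit, 16945]);
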
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
index 53520f6a460..ab768bb7116 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
@@ -15,7 +15,7 @@ selector:
- jstests/aggregation/bugs/lookup_unwind_killcursor.js
# This test makes assertions about whether aggregations will need to spill to disk, which assumes
# all the data is located on a single shard.
- - jstests/aggregation/bugs/server9444.js
+ - jstests/aggregation/spill_to_disk.js
# TODO SERVER-32311: These tests use getAggPlanStage(), which can't handle sharded explain output.
- jstests/aggregation/match_swapping_renamed_fields.js
- jstests/aggregation/use_query_project_and_sort.js
diff --git a/jstests/aggregation/bugs/server9444.js b/jstests/aggregation/bugs/server9444.js
deleted file mode 100644
index 101e29269e7..00000000000
--- a/jstests/aggregation/bugs/server9444.js
+++ /dev/null
@@ -1,69 +0,0 @@
-// server-9444 support disk storage of intermediate results in aggregation
-//
-// Run only when pipeline optimization is enabled, otherwise the type of
-// sorter being used can be different (NoLimitSort vs TopKSort) causing
-// an aggregation request to fail with different error codes.
-// @tags: [requires_pipeline_optimization]
-(function() {
-'use strict';
-
-load('jstests/libs/fixture_helpers.js'); // For 'FixtureHelpers'
-
-const t = db.server9444;
-t.drop();
-
-const sharded = FixtureHelpers.isSharded(t);
-
-var memoryLimitMB = sharded ? 200 : 100;
-
-function loadData() {
- var bigStr = Array(1024 * 1024 + 1).toString(); // 1MB of ','
- for (var i = 0; i < memoryLimitMB + 1; i++)
- t.insert({_id: i, bigStr: i + bigStr, random: Math.random()});
-
- assert.gt(t.stats().size, memoryLimitMB * 1024 * 1024);
-}
-loadData();
-
-function test(pipeline, outOfMemoryCode) {
- // ensure by default we error out if exceeding memory limit
- var res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}});
- assert.commandFailed(res);
- assert.eq(res.code, outOfMemoryCode);
-
- // ensure allowDiskUse: false does what it says
- res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}, allowDiskUse: false});
- assert.commandFailed(res);
- assert.eq(res.code, outOfMemoryCode);
-
- // allowDiskUse only supports bool. In particular, numbers aren't allowed.
- res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}, allowDiskUse: 1});
- assert.commandFailed(res);
-
- // ensure we work when allowDiskUse === true
- res = t.aggregate(pipeline, {allowDiskUse: true});
- assert.eq(res.itcount(), t.count()); // all tests output one doc per input doc
-}
-
-var groupCode = 16945;
-var sortCode = 16819;
-var sortLimitCode = 16820;
-
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}], groupCode);
-
-// sorting with _id would use index which doesn't require extsort
-test([{$sort: {random: 1}}], sortCode);
-test([{$sort: {bigStr: 1}}], sortCode); // big key and value
-
-// make sure sort + large limit won't crash the server (SERVER-10136)
-test([{$sort: {bigStr: 1}}, {$limit: 1000 * 1000 * 1000}], sortLimitCode);
-
-// test combining two extSorts in both same and different orders
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: 1}}], groupCode);
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: -1}}], groupCode);
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {random: 1}}], groupCode);
-test([{$sort: {random: 1}}, {$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}], sortCode);
-
-// don't leave large collection laying around
-t.drop();
-})();
diff --git a/jstests/aggregation/spill_to_disk.js b/jstests/aggregation/spill_to_disk.js
new file mode 100644
index 00000000000..cc58149b881
--- /dev/null
+++ b/jstests/aggregation/spill_to_disk.js
@@ -0,0 +1,116 @@
+// Tests the support for disk storage of intermediate results in aggregation.
+//
+// Run only when pipeline optimization is enabled, otherwise the type of sorter being used can be
+// different (NoLimitSort vs TopKSort) causing an aggregation request to fail with different error
+// codes.
+// @tags: [requires_pipeline_optimization]
+(function() {
+'use strict';
+
+load('jstests/libs/fixture_helpers.js'); // For 'FixtureHelpers'
+
+const coll = db.spill_to_disk;
+coll.drop();
+
+const sharded = FixtureHelpers.isSharded(coll);
+
+const memoryLimitMB = sharded ? 200 : 100;
+
+const bigStr = Array(1024 * 1024 + 1).toString(); // 1MB of ','
+for (let i = 0; i < memoryLimitMB + 1; i++)
+ coll.insert({_id: i, bigStr: i + bigStr, random: Math.random()});
+
+assert.gt(coll.stats().size, memoryLimitMB * 1024 * 1024);
+
+function test({pipeline, expectedCodes, canSpillToDisk}) {
+ // Test that by default we error out if exceeding memory limit.
+ assert.commandFailedWithCode(
+ db.runCommand({aggregate: coll.getName(), pipeline: pipeline, cursor: {}}), expectedCodes);
+
+ // Test that 'allowDiskUse: false' does indeed prevent spilling to disk.
+ assert.commandFailedWithCode(
+ db.runCommand(
+ {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: false}),
+ expectedCodes);
+
+ // Test that allowDiskUse only supports bool. In particular, numbers aren't allowed.
+ assert.commandFailed(db.runCommand(
+ {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: 1}));
+
+ // If this command supports spilling to disk, ensure that it will succeed when disk use is
+ // allowed.
+ let res = db.runCommand(
+ {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true});
+ if (canSpillToDisk) {
+ assert.eq(new DBCommandCursor(coll.getDB(), res).itcount(),
+ coll.count()); // all tests output one doc per input doc
+ } else {
+ assert.commandFailedWithCode(res, [ErrorCodes.ExceededMemoryLimit, expectedCodes]);
+ }
+}
+
+const groupCode = 16945;
+const sortCode = 16819;
+const sortLimitCode = 16820;
+
+test({
+ pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}],
+ expectedCodes: groupCode,
+ canSpillToDisk: true
+});
+
+// Sorting with _id would use index which doesn't require external sort, so sort by 'random'
+// instead.
+test({pipeline: [{$sort: {random: 1}}], expectedCodes: sortCode, canSpillToDisk: true});
+test({
+ pipeline: [{$sort: {bigStr: 1}}], // big key and value
+ expectedCodes: sortCode,
+ canSpillToDisk: true
+});
+
+// Test that sort + large limit won't crash the server (SERVER-10136)
+test({
+ pipeline: [{$sort: {bigStr: 1}}, {$limit: 1000 * 1000 * 1000}],
+ expectedCodes: sortLimitCode,
+ canSpillToDisk: true
+});
+
+// Test combining two external sorts in both same and different orders.
+test({
+ pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: 1}}],
+ expectedCodes: groupCode,
+ canSpillToDisk: true
+});
+test({
+ pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: -1}}],
+ expectedCodes: groupCode,
+ canSpillToDisk: true
+});
+test({
+ pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {random: 1}}],
+ expectedCodes: groupCode,
+ canSpillToDisk: true
+});
+test({
+ pipeline: [{$sort: {random: 1}}, {$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}],
+ expectedCodes: sortCode,
+ canSpillToDisk: true
+});
+
+// Test accumulating all values into one array. On debug builds we will spill to disk for $group and
+// so may hit the group error code before we hit ExceededMemoryLimit.
+test({
+ pipeline: [{$group: {_id: null, bigArray: {$push: '$bigStr'}}}],
+ expectedCodes: [groupCode, ErrorCodes.ExceededMemoryLimit],
+ canSpillToDisk: false
+});
+test({
+ pipeline:
+ [{$group: {_id: null, bigArray: {$addToSet: {$concat: ['$bigStr', {$toString: "$_id"}]}}}}],
+ expectedCodes: [groupCode, ErrorCodes.ExceededMemoryLimit],
+ canSpillToDisk: false
+});
+
+// don't leave large collection laying around
+coll.drop();
+})();
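
For contrast, accumulators with bounded per-group state are unaffected: once allowDiskUse is true the $group stage itself spills, so the same oversized data set aggregates cleanly as long as no $push or $addToSet has to hold it all. A follow-on sketch under the same assumptions as the one above (it reuses that collection and data):

    // $min keeps only one value per group key, so memory stays bounded; with
    // allowDiskUse: true the $group stage spills its per-key state and succeeds.
    const coll = db.push_memory_repro;  // data loaded in the earlier sketch
    const ok = db.runCommand({
        aggregate: coll.getName(),
        pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}],
        cursor: {},
        allowDiskUse: true
    });
    assert.commandWorked(ok);
    assert.eq(new DBCommandCursor(coll.getDB(), ok).itcount(), coll.count());
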
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 2bd9ed9f482..12c0c922ef2 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -148,7 +148,9 @@ genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext>
class AccumulatorAddToSet final : public Accumulator {
public:
- explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+ explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
void processInternal(const Value& input, bool merging) final;
Value getValue(bool toBeMerged) final;
@@ -168,6 +170,7 @@ public:
private:
ValueUnorderedSet _set;
+ int _maxMemUsageBytes;
};
class AccumulatorFirst final : public Accumulator {
@@ -282,7 +285,9 @@ public:
class AccumulatorPush final : public Accumulator {
public:
- explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+ explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
void processInternal(const Value& input, bool merging) final;
Value getValue(bool toBeMerged) final;
@@ -293,7 +298,8 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
- std::vector<Value> vpValue;
+ std::vector<Value> _array;
+ int _maxMemUsageBytes;
};
class AccumulatorAvg final : public Accumulator {
diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
index b8f366f6c4f..a899944a761 100644
--- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp
+++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
@@ -47,26 +47,29 @@ const char* AccumulatorAddToSet::getOpName() const {
}
void AccumulatorAddToSet::processInternal(const Value& input, bool merging) {
+ auto addValue = [this](auto&& val) {
+ bool inserted = _set.insert(val).second;
+ if (inserted) {
+ _memUsageBytes += val.getApproximateSize();
+ uassert(ErrorCodes::ExceededMemoryLimit,
+ str::stream()
+ << "$addToSet used too much memory and cannot spill to disk. Memory limit: "
+ << _maxMemUsageBytes << " bytes",
+ _memUsageBytes < _maxMemUsageBytes);
+ }
+ };
if (!merging) {
if (!input.missing()) {
- bool inserted = _set.insert(input).second;
- if (inserted) {
- _memUsageBytes += input.getApproximateSize();
- }
+ addValue(input);
}
} else {
- // If we're merging, we need to take apart the arrays we
- // receive and put their elements into the array we are collecting.
- // If we didn't, then we'd get an array of arrays, with one array
- // from each merge source.
- verify(input.getType() == Array);
+ // If we're merging, we need to take apart the arrays we receive and put their elements into
+ // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+ // array from each merge source.
+ invariant(input.getType() == Array);
- const vector<Value>& array = input.getArray();
- for (size_t i = 0; i < array.size(); i++) {
- bool inserted = _set.insert(array[i]).second;
- if (inserted) {
- _memUsageBytes += array[i].getApproximateSize();
- }
+ for (auto&& val : input.getArray()) {
+ addValue(val);
}
}
}
@@ -75,8 +78,11 @@ Value AccumulatorAddToSet::getValue(bool toBeMerged) {
return Value(vector<Value>(_set.begin(), _set.end()));
}
-AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : Accumulator(expCtx), _set(expCtx->getValueComparator().makeUnorderedValueSet()) {
+AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes)
+ : Accumulator(expCtx),
+ _set(expCtx->getValueComparator().makeUnorderedValueSet()),
+ _maxMemUsageBytes(maxMemoryUsageBytes) {
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp
index b0d93ff9a4f..4a0704de869 100644
--- a/src/mongo/db/pipeline/accumulator_push.cpp
+++ b/src/mongo/db/pipeline/accumulator_push.cpp
@@ -49,36 +49,45 @@ const char* AccumulatorPush::getOpName() const {
void AccumulatorPush::processInternal(const Value& input, bool merging) {
if (!merging) {
if (!input.missing()) {
- vpValue.push_back(input);
+ _array.push_back(input);
_memUsageBytes += input.getApproximateSize();
+ uassert(ErrorCodes::ExceededMemoryLimit,
+ str::stream()
+ << "$push used too much memory and cannot spill to disk. Memory limit: "
+ << _maxMemUsageBytes << " bytes",
+ _memUsageBytes < _maxMemUsageBytes);
}
} else {
- // If we're merging, we need to take apart the arrays we
- // receive and put their elements into the array we are collecting.
- // If we didn't, then we'd get an array of arrays, with one array
- // from each merge source.
- verify(input.getType() == Array);
+ // If we're merging, we need to take apart the arrays we receive and put their elements into
+ // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+ // array from each merge source.
+ invariant(input.getType() == Array);
const vector<Value>& vec = input.getArray();
- vpValue.insert(vpValue.end(), vec.begin(), vec.end());
-
- for (size_t i = 0; i < vec.size(); i++) {
- _memUsageBytes += vec[i].getApproximateSize();
+ for (auto&& val : vec) {
+ _memUsageBytes += val.getApproximateSize();
+ uassert(ErrorCodes::ExceededMemoryLimit,
+ str::stream()
+ << "$push used too much memory and cannot spill to disk. Memory limit: "
+ << _maxMemUsageBytes << " bytes",
+ _memUsageBytes < _maxMemUsageBytes);
}
+ _array.insert(_array.end(), vec.begin(), vec.end());
}
}
Value AccumulatorPush::getValue(bool toBeMerged) {
- return Value(vpValue);
+ return Value(_array);
}
-AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : Accumulator(expCtx) {
+AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes)
+ : Accumulator(expCtx), _maxMemUsageBytes(maxMemoryUsageBytes) {
_memUsageBytes = sizeof(*this);
}
void AccumulatorPush::reset() {
- vector<Value>().swap(vpValue);
+ vector<Value>().swap(_array);
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index 14f9f74eda8..373a24a4155 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -340,6 +340,28 @@ TEST(Accumulators, AddToSetRespectsCollation) {
Value(std::vector<Value>{Value("a"_sd)})}});
}
+TEST(Accumulators, AddToSetRespectsMaxMemoryConstraint) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const int maxMemoryBytes = 20ull;
+ auto addToSet = AccumulatorAddToSet(expCtx, maxMemoryBytes);
+ ASSERT_THROWS_CODE(
+ addToSet.process(
+ Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), false),
+ AssertionException,
+ ErrorCodes::ExceededMemoryLimit);
+}
+
+TEST(Accumulators, PushRespectsMaxMemoryConstraint) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const int maxMemoryBytes = 20ull;
+ auto addToSet = AccumulatorPush(expCtx, maxMemoryBytes);
+ ASSERT_THROWS_CODE(
+ addToSet.process(
+ Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), false),
+ AssertionException,
+ ErrorCodes::ExceededMemoryLimit);
+}
+
/* ------------------------- AccumulatorMergeObjects -------------------------- */
TEST(AccumulatorMergeObjects, MergingZeroObjectsShouldReturnEmptyDocument) {