author     Charlie Swanson <charlie.swanson@mongodb.com>  2019-11-09 01:19:40 +0000
committer  evergreen <evergreen@mongodb.com>              2019-11-09 01:19:40 +0000
commit     504b518b9bd432a1d614d06f004712e70a1a754b
tree       8c481a964588f62c4a3a8717bb2ed4c5af42a58b
parent     ccbad450aec9759834dafe2bd5f8fb0032704335
SERVER-44174 $push and $addToSet should restrict memory usage
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml |   2
-rw-r--r--  jstests/aggregation/bugs/server9444.js                                            |  69
-rw-r--r--  jstests/aggregation/spill_to_disk.js                                              | 116
-rw-r--r--  src/mongo/db/pipeline/accumulator.h                                               |  12
-rw-r--r--  src/mongo/db/pipeline/accumulator_add_to_set.cpp                                  |  40
-rw-r--r--  src/mongo/db/pipeline/accumulator_push.cpp                                        |  37
-rw-r--r--  src/mongo/db/pipeline/accumulator_test.cpp                                        |  22
7 files changed, 194 insertions, 104 deletions
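In short: stages that can spill (sorts, and $group with single-value accumulators) keep working under allowDiskUse, while $push and $addToSet now enforce a hard in-memory cap (kDefaultMaxMemoryUsageBytes, 100MB by default) because their entire accumulated array or set must stay resident. A minimal shell sketch of the new failure mode, distilled from the spill_to_disk.js test added below — the collection name and filler data are invented for illustration, and 16945 is the pre-existing $group memory error that debug builds may hit first while spilling:

// Sketch only: 'push_memory_demo' and the ~110MB of filler are hypothetical.
const demo = db.push_memory_demo;
demo.drop();
const filler = Array(1024 * 1024 + 1).toString();  // ~1MB of ','
for (let i = 0; i < 110; i++)
    demo.insert({_id: i, payload: i + filler});

// $push holds its entire result array in memory with no spill path, so
// crossing the 100MB default cap fails the aggregation even when
// allowDiskUse is true ($addToSet behaves the same way).
const res = db.runCommand({
    aggregate: demo.getName(),
    pipeline: [{$group: {_id: null, all: {$push: '$payload'}}}],
    cursor: {},
    allowDiskUse: true
});
assert.commandFailedWithCode(res, [ErrorCodes.ExceededMemoryLimit, 16945]);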
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
index 53520f6a460..ab768bb7116 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
@@ -15,7 +15,7 @@ selector:
   - jstests/aggregation/bugs/lookup_unwind_killcursor.js
   # This test makes assertions about whether aggregations will need to spill to disk, which assumes
   # all the data is located on a single shard.
-  - jstests/aggregation/bugs/server9444.js
+  - jstests/aggregation/spill_to_disk.js
   # TODO SERVER-32311: These tests use getAggPlanStage(), which can't handle sharded explain output.
   - jstests/aggregation/match_swapping_renamed_fields.js
   - jstests/aggregation/use_query_project_and_sort.js
diff --git a/jstests/aggregation/bugs/server9444.js b/jstests/aggregation/bugs/server9444.js
deleted file mode 100644
index 101e29269e7..00000000000
--- a/jstests/aggregation/bugs/server9444.js
+++ /dev/null
@@ -1,69 +0,0 @@
-// server-9444 support disk storage of intermediate results in aggregation
-//
-// Run only when pipeline optimization is enabled, otherwise the type of
-// sorter being used can be different (NoLimitSort vs TopKSort) causing
-// an aggregation request to fail with different error codes.
-// @tags: [requires_pipeline_optimization]
-(function() {
-'use strict';
-
-load('jstests/libs/fixture_helpers.js');  // For 'FixtureHelpers'
-
-const t = db.server9444;
-t.drop();
-
-const sharded = FixtureHelpers.isSharded(t);
-
-var memoryLimitMB = sharded ? 200 : 100;
-
-function loadData() {
-    var bigStr = Array(1024 * 1024 + 1).toString();  // 1MB of ','
-    for (var i = 0; i < memoryLimitMB + 1; i++)
-        t.insert({_id: i, bigStr: i + bigStr, random: Math.random()});
-
-    assert.gt(t.stats().size, memoryLimitMB * 1024 * 1024);
-}
-loadData();
-
-function test(pipeline, outOfMemoryCode) {
-    // ensure by default we error out if exceeding memory limit
-    var res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}});
-    assert.commandFailed(res);
-    assert.eq(res.code, outOfMemoryCode);
-
-    // ensure allowDiskUse: false does what it says
-    res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}, allowDiskUse: false});
-    assert.commandFailed(res);
-    assert.eq(res.code, outOfMemoryCode);
-
-    // allowDiskUse only supports bool. In particular, numbers aren't allowed.
-    res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}, allowDiskUse: 1});
-    assert.commandFailed(res);
-
-    // ensure we work when allowDiskUse === true
-    res = t.aggregate(pipeline, {allowDiskUse: true});
-    assert.eq(res.itcount(), t.count());  // all tests output one doc per input doc
-}
-
-var groupCode = 16945;
-var sortCode = 16819;
-var sortLimitCode = 16820;
-
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}], groupCode);
-
-// sorting with _id would use index which doesn't require extsort
-test([{$sort: {random: 1}}], sortCode);
-test([{$sort: {bigStr: 1}}], sortCode);  // big key and value
-
-// make sure sort + large limit won't crash the server (SERVER-10136)
-test([{$sort: {bigStr: 1}}, {$limit: 1000 * 1000 * 1000}], sortLimitCode);
-
-// test combining two extSorts in both same and different orders
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: 1}}], groupCode);
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: -1}}], groupCode);
-test([{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {random: 1}}], groupCode);
-test([{$sort: {random: 1}}, {$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}], sortCode);
-
-// don't leave large collection laying around
-t.drop();
-})();
diff --git a/jstests/aggregation/spill_to_disk.js b/jstests/aggregation/spill_to_disk.js
new file mode 100644
index 00000000000..cc58149b881
--- /dev/null
+++ b/jstests/aggregation/spill_to_disk.js
@@ -0,0 +1,116 @@
+// Tests the support for disk storage of intermediate results in aggregation.
+//
+// Run only when pipeline optimization is enabled, otherwise the type of sorter being used can be
+// different (NoLimitSort vs TopKSort) causing an aggregation request to fail with different error
+// codes.
+// @tags: [requires_pipeline_optimization]
+(function() {
+'use strict';
+
+load('jstests/libs/fixture_helpers.js');  // For 'FixtureHelpers'
+
+const coll = db.spill_to_disk;
+coll.drop();
+
+const sharded = FixtureHelpers.isSharded(coll);
+
+const memoryLimitMB = sharded ? 200 : 100;
+
+const bigStr = Array(1024 * 1024 + 1).toString();  // 1MB of ','
+for (let i = 0; i < memoryLimitMB + 1; i++)
+    coll.insert({_id: i, bigStr: i + bigStr, random: Math.random()});
+
+assert.gt(coll.stats().size, memoryLimitMB * 1024 * 1024);
+
+function test({pipeline, expectedCodes, canSpillToDisk}) {
+    // Test that by default we error out if exceeding memory limit.
+    assert.commandFailedWithCode(
+        db.runCommand({aggregate: coll.getName(), pipeline: pipeline, cursor: {}}), expectedCodes);
+
+    // Test that 'allowDiskUse: false' does indeed prevent spilling to disk.
+    assert.commandFailedWithCode(
+        db.runCommand(
+            {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: false}),
+        expectedCodes);
+
+    // Test that allowDiskUse only supports bool. In particular, numbers aren't allowed.
+    assert.commandFailed(db.runCommand(
+        {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: 1}));
+
+    // If this command supports spilling to disk, ensure that it will succeed when disk use is
+    // allowed.
+    let res = db.runCommand(
+        {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true});
+    if (canSpillToDisk) {
+        assert.eq(new DBCommandCursor(coll.getDB(), res).itcount(),
+                  coll.count());  // all tests output one doc per input doc
+    } else {
+        assert.commandFailedWithCode(res, [ErrorCodes.ExceededMemoryLimit, expectedCodes]);
+    }
+}
+
+const groupCode = 16945;
+const sortCode = 16819;
+const sortLimitCode = 16820;
+
+test({
+    pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}],
+    expectedCodes: groupCode,
+    canSpillToDisk: true
+});
+
+// Sorting with _id would use index which doesn't require external sort, so sort by 'random'
+// instead.
+test({pipeline: [{$sort: {random: 1}}], expectedCodes: sortCode, canSpillToDisk: true});
+test({
+    pipeline: [{$sort: {bigStr: 1}}],  // big key and value
+    expectedCodes: sortCode,
+    canSpillToDisk: true
+});
+
+// Test that sort + large limit won't crash the server (SERVER-10136)
+test({
+    pipeline: [{$sort: {bigStr: 1}}, {$limit: 1000 * 1000 * 1000}],
+    expectedCodes: sortLimitCode,
+    canSpillToDisk: true
+});
+
+// Test combining two external sorts in both same and different orders.
+test({
+    pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: 1}}],
+    expectedCodes: groupCode,
+    canSpillToDisk: true
+});
+test({
+    pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: -1}}],
+    expectedCodes: groupCode,
+    canSpillToDisk: true
+});
+test({
+    pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {random: 1}}],
+    expectedCodes: groupCode,
+    canSpillToDisk: true
+});
+test({
+    pipeline: [{$sort: {random: 1}}, {$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}],
+    expectedCodes: sortCode,
+    canSpillToDisk: true
+});
+
+// Test accumulating all values into one array. On debug builds we will spill to disk for $group and
+// so may hit the group error code before we hit ExceededMemoryLimit.
+test({
+    pipeline: [{$group: {_id: null, bigArray: {$push: '$bigStr'}}}],
+    expectedCodes: [groupCode, ErrorCodes.ExceededMemoryLimit],
+    canSpillToDisk: false
+});
+test({
+    pipeline:
+        [{$group: {_id: null, bigArray: {$addToSet: {$concat: ['$bigStr', {$toString: "$_id"}]}}}}],
+    expectedCodes: [groupCode, ErrorCodes.ExceededMemoryLimit],
+    canSpillToDisk: false
+});
+
+// don't leave large collection laying around
+coll.drop();
+})();
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 2bd9ed9f482..12c0c922ef2 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -148,7 +148,9 @@ genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext>
 
 class AccumulatorAddToSet final : public Accumulator {
 public:
-    explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+    static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+    explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                 int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
 
     void processInternal(const Value& input, bool merging) final;
     Value getValue(bool toBeMerged) final;
@@ -168,6 +170,7 @@ public:
 
 private:
     ValueUnorderedSet _set;
+    int _maxMemUsageBytes;
 };
 
 class AccumulatorFirst final : public Accumulator {
@@ -282,7 +285,9 @@ public:
 
 class AccumulatorPush final : public Accumulator {
 public:
-    explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+    static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+    explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                             int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
 
     void processInternal(const Value& input, bool merging) final;
     Value getValue(bool toBeMerged) final;
@@ -293,7 +298,8 @@ public:
         const boost::intrusive_ptr<ExpressionContext>& expCtx);
 
 private:
-    std::vector<Value> vpValue;
+    std::vector<Value> _array;
+    int _maxMemUsageBytes;
 };
 
 class AccumulatorAvg final : public Accumulator {
diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
index b8f366f6c4f..a899944a761 100644
--- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp
+++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
@@ -47,26 +47,29 @@ const char* AccumulatorAddToSet::getOpName() const {
 }
 
 void AccumulatorAddToSet::processInternal(const Value& input, bool merging) {
+    auto addValue = [this](auto&& val) {
+        bool inserted = _set.insert(val).second;
+        if (inserted) {
+            _memUsageBytes += val.getApproximateSize();
+            uassert(ErrorCodes::ExceededMemoryLimit,
+                    str::stream()
+                        << "$addToSet used too much memory and cannot spill to disk. Memory limit: "
+                        << _maxMemUsageBytes << " bytes",
+                    _memUsageBytes < _maxMemUsageBytes);
+        }
+    };
     if (!merging) {
         if (!input.missing()) {
-            bool inserted = _set.insert(input).second;
-            if (inserted) {
-                _memUsageBytes += input.getApproximateSize();
-            }
+            addValue(input);
         }
     } else {
-        // If we're merging, we need to take apart the arrays we
-        // receive and put their elements into the array we are collecting.
-        // If we didn't, then we'd get an array of arrays, with one array
-        // from each merge source.
-        verify(input.getType() == Array);
+        // If we're merging, we need to take apart the arrays we receive and put their elements into
+        // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+        // array from each merge source.
+        invariant(input.getType() == Array);
 
-        const vector<Value>& array = input.getArray();
-        for (size_t i = 0; i < array.size(); i++) {
-            bool inserted = _set.insert(array[i]).second;
-            if (inserted) {
-                _memUsageBytes += array[i].getApproximateSize();
-            }
+        for (auto&& val : input.getArray()) {
+            addValue(val);
         }
     }
 }
@@ -75,8 +78,11 @@ Value AccumulatorAddToSet::getValue(bool toBeMerged) {
     return Value(vector<Value>(_set.begin(), _set.end()));
 }
 
-AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx)
-    : Accumulator(expCtx), _set(expCtx->getValueComparator().makeUnorderedValueSet()) {
+AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                         int maxMemoryUsageBytes)
+    : Accumulator(expCtx),
+      _set(expCtx->getValueComparator().makeUnorderedValueSet()),
+      _maxMemUsageBytes(maxMemoryUsageBytes) {
     _memUsageBytes = sizeof(*this);
 }
diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp
index b0d93ff9a4f..4a0704de869 100644
--- a/src/mongo/db/pipeline/accumulator_push.cpp
+++ b/src/mongo/db/pipeline/accumulator_push.cpp
@@ -49,36 +49,45 @@ const char* AccumulatorPush::getOpName() const {
 
 void AccumulatorPush::processInternal(const Value& input, bool merging) {
     if (!merging) {
         if (!input.missing()) {
-            vpValue.push_back(input);
+            _array.push_back(input);
             _memUsageBytes += input.getApproximateSize();
+            uassert(ErrorCodes::ExceededMemoryLimit,
+                    str::stream()
+                        << "$push used too much memory and cannot spill to disk. Memory limit: "
+                        << _maxMemUsageBytes << " bytes",
+                    _memUsageBytes < _maxMemUsageBytes);
         }
     } else {
-        // If we're merging, we need to take apart the arrays we
-        // receive and put their elements into the array we are collecting.
-        // If we didn't, then we'd get an array of arrays, with one array
-        // from each merge source.
-        verify(input.getType() == Array);
+        // If we're merging, we need to take apart the arrays we receive and put their elements into
+        // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+        // array from each merge source.
+        invariant(input.getType() == Array);
 
         const vector<Value>& vec = input.getArray();
-        vpValue.insert(vpValue.end(), vec.begin(), vec.end());
-
-        for (size_t i = 0; i < vec.size(); i++) {
-            _memUsageBytes += vec[i].getApproximateSize();
+        for (auto&& val : vec) {
+            _memUsageBytes += val.getApproximateSize();
+            uassert(ErrorCodes::ExceededMemoryLimit,
+                    str::stream()
+                        << "$push used too much memory and cannot spill to disk. Memory limit: "
+                        << _maxMemUsageBytes << " bytes",
+                    _memUsageBytes < _maxMemUsageBytes);
        }
+        _array.insert(_array.end(), vec.begin(), vec.end());
     }
 }
 
 Value AccumulatorPush::getValue(bool toBeMerged) {
-    return Value(vpValue);
+    return Value(_array);
 }
 
-AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx)
-    : Accumulator(expCtx) {
+AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                 int maxMemoryUsageBytes)
+    : Accumulator(expCtx), _maxMemUsageBytes(maxMemoryUsageBytes) {
     _memUsageBytes = sizeof(*this);
 }
 
 void AccumulatorPush::reset() {
-    vector<Value>().swap(vpValue);
+    vector<Value>().swap(_array);
     _memUsageBytes = sizeof(*this);
 }
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index 14f9f74eda8..373a24a4155 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -340,6 +340,28 @@ TEST(Accumulators, AddToSetRespectsCollation) {
                         Value(std::vector<Value>{Value("a"_sd)})}});
 }
 
+TEST(Accumulators, AddToSetRespectsMaxMemoryConstraint) {
+    intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+    const int maxMemoryBytes = 20ull;
+    auto addToSet = AccumulatorAddToSet(expCtx, maxMemoryBytes);
+    ASSERT_THROWS_CODE(
+        addToSet.process(
+            Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), false),
+        AssertionException,
+        ErrorCodes::ExceededMemoryLimit);
+}
+
+TEST(Accumulators, PushRespectsMaxMemoryConstraint) {
+    intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+    const int maxMemoryBytes = 20ull;
+    auto addToSet = AccumulatorPush(expCtx, maxMemoryBytes);
+    ASSERT_THROWS_CODE(
+        addToSet.process(
+            Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), false),
+        AssertionException,
+        ErrorCodes::ExceededMemoryLimit);
+}
+
 /* ------------------------- AccumulatorMergeObjects -------------------------- */
 
 TEST(AccumulatorMergeObjects, MergingZeroObjectsShouldReturnEmptyDocument) {
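For contrast, under the same hypothetical setup as the sketch above the diff: an accumulator that keeps only one value per group leaves $group free to spill, so the identical oversized input still succeeds once disk use is allowed — the canSpillToDisk: true path exercised by spill_to_disk.js.

// Spillable $group: succeeds with allowDiskUse: true despite exceeding the
// in-memory limit, and emits one output document per input document.
const spillRes = db.runCommand({
    aggregate: demo.getName(),
    pipeline: [{$group: {_id: '$_id', payload: {$min: '$payload'}}}],
    cursor: {},
    allowDiskUse: true
});
assert.commandWorked(spillRes);
assert.eq(new DBCommandCursor(db, spillRes).itcount(), demo.count());
demo.drop();  // clean up the hypothetical collection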