author    | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-12-09 18:51:42 +0000
committer | evergreen <evergreen@mongodb.com>             | 2019-12-09 18:51:42 +0000
commit    | 83b3c795e154b7456f43269b5d95ac8e2e00d96e (patch)
tree      | 4cbd988f1b157e7ea63305990640b93036a4c97c
parent    | 36a8fa18355d0c2bd9c7372da9999f8f1440805b (diff)
download  | mongo-83b3c795e154b7456f43269b5d95ac8e2e00d96e.tar.gz
SERVER-44174 $push and $addToSet should restrict memory usage
(cherry picked from commit 504b518b9bd432a1d614d06f004712e70a1a754b)
(cherry picked from commit 2d0ad29a5fa8b328610e69f34aa26802b5ec7cc9)
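
In short: $push and $addToSet now track their memory footprint and fail fast with ExceededMemoryLimit once the accumulated array or set exceeds a 100 MB default cap (kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024), because neither accumulator can spill to disk. Previously they tracked _memUsageBytes but never enforced a limit, so such a pipeline could grow without bound. A minimal mongo-shell sketch of the new behavior; the collection name and document contents are illustrative, not part of the patch:

// Hypothetical collection: ~101 documents of ~1MB each, enough to push the
// accumulator past its 100MB cap.
const c = db.push_memory_demo;
c.drop();
const oneMB = 'x'.repeat(1024 * 1024);
for (let i = 0; i < 101; i++) {
    c.insert({_id: i, payload: i + oneMB});
}

// Accumulating every payload into one array now fails up front instead of
// exhausting server memory. allowDiskUse does not help: $push cannot spill.
// (On debug builds the $group memory check, code 16945, may fire first.)
const res = db.runCommand({
    aggregate: c.getName(),
    pipeline: [{$group: {_id: null, all: {$push: '$payload'}}}],
    cursor: {},
    allowDiskUse: true
});
assert.commandFailedWithCode(res, [16945, ErrorCodes.ExceededMemoryLimit]);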
-rw-r--r-- | buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml |   2
-rw-r--r-- | jstests/aggregation/bugs/server9444.js                                            |  79
-rw-r--r-- | jstests/aggregation/spill_to_disk.js                                              | 120
-rw-r--r-- | jstests/aggregation/testshard1.js                                                 |   2
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h                                               |  12
-rw-r--r-- | src/mongo/db/pipeline/accumulator_add_to_set.cpp                                  |  41
-rw-r--r-- | src/mongo/db/pipeline/accumulator_push.cpp                                        |  39
-rw-r--r-- | src/mongo/db/pipeline/accumulator_test.cpp                                        |  22
8 files changed, 201 insertions, 116 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
index a7493f8bad8..b49d46735d0 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
@@ -15,7 +15,7 @@ selector:
   - jstests/aggregation/bugs/lookup_unwind_killcursor.js
   # This test makes assertions about whether aggregations will need to spill to disk, which assumes
   # all the data is located on a single shard.
-  - jstests/aggregation/bugs/server9444.js
+  - jstests/aggregation/spill_to_disk.js
   # TODO SERVER-32311: These tests use getAggPlanStage(), which can't handle sharded explain output.
   - jstests/aggregation/match_swapping_renamed_fields.js
   - jstests/aggregation/use_query_project_and_sort.js
diff --git a/jstests/aggregation/bugs/server9444.js b/jstests/aggregation/bugs/server9444.js
deleted file mode 100644
index b246c3e1ffd..00000000000
--- a/jstests/aggregation/bugs/server9444.js
+++ /dev/null
@@ -1,79 +0,0 @@
-// server-9444 support disk storage of intermediate results in aggregation
-(function() {
-    'use strict';
-
-    var t = db.server9444;
-    t.drop();
-
-    var sharded = (typeof(RUNNING_IN_SHARDED_AGG_TEST) != 'undefined');  // see end of testshard1.js
-    if (sharded) {
-        assert.commandWorked(
-            db.adminCommand({shardcollection: t.getFullName(), key: {"_id": 'hashed'}}));
-    }
-
-    var memoryLimitMB = sharded ? 200 : 100;
-
-    function loadData() {
-        var bigStr = Array(1024 * 1024 + 1).toString();  // 1MB of ','
-        for (var i = 0; i < memoryLimitMB + 1; i++)
-            t.insert({_id: i, bigStr: i + bigStr, random: Math.random()});
-
-        assert.gt(t.stats().size, memoryLimitMB * 1024 * 1024);
-    }
-    loadData();
-
-    function test(pipeline, outOfMemoryCode) {
-        // ensure by default we error out if exceeding memory limit
-        var res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}});
-        assert.commandFailed(res);
-        assert.eq(res.code, outOfMemoryCode);
-
-        // ensure allowDiskUse: false does what it says
-        res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}, allowDiskUse: false});
-        assert.commandFailed(res);
-        assert.eq(res.code, outOfMemoryCode);
-
-        // allowDiskUse only supports bool. In particular, numbers aren't allowed.
-        res = t.runCommand('aggregate', {pipeline: pipeline, cursor: {}, allowDiskUse: 1});
-        assert.commandFailed(res);
-
-        // ensure we work when allowDiskUse === true
-        res = t.aggregate(pipeline, {allowDiskUse: true});
-        assert.eq(res.itcount(), t.count());  // all tests output one doc per input doc
-    }
-
-    var groupCode = 16945;
-    var sortCode = 16819;
-    var sortLimitCode = 16820;
-
-    test([{$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}], groupCode);
-
-    // sorting with _id would use index which doesn't require extsort
-    test([{$sort: {random: 1}}], sortCode);
-    test([{$sort: {bigStr: 1}}], sortCode);  // big key and value
-
-    // make sure sort + large limit won't crash the server (SERVER-10136)
-    test([{$sort: {bigStr: 1}}, {$limit: 1000 * 1000 * 1000}], sortLimitCode);
-
-    // test combining two extSorts in both same and different orders
-    test([{$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}, {$sort: {_id: 1}}], groupCode);
-    test([{$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}, {$sort: {_id: -1}}], groupCode);
-    test([{$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}, {$sort: {random: 1}}], groupCode);
-    test([{$sort: {random: 1}}, {$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}], sortCode);
-
-    var origDB = db;
-    if (sharded) {
-        // Stop balancer first before dropping so there will be no contention on the ns lock.
-        // It's alright to modify the global db variable since sharding tests never run in parallel.
-        db = db.getSiblingDB('config');
-        sh.stopBalancer();
-    }
-
-    // don't leave large collection laying around
-    t.drop();
-
-    if (sharded) {
-        sh.startBalancer();
-        db = origDB;
-    }
-})();
diff --git a/jstests/aggregation/spill_to_disk.js b/jstests/aggregation/spill_to_disk.js
new file mode 100644
index 00000000000..0c3ab5e703d
--- /dev/null
+++ b/jstests/aggregation/spill_to_disk.js
@@ -0,0 +1,120 @@
+// Tests the support for disk storage of intermediate results in aggregation.
+//
+// Run only when pipeline optimization is enabled, otherwise the type of sorter being used can be
+// different (NoLimitSort vs TopKSort) causing an aggregation request to fail with different error
+// codes.
+// @tags: [requires_pipeline_optimization]
+(function() {
+    'use strict';
+
+    load('jstests/libs/fixture_helpers.js');  // For 'FixtureHelpers'
+
+    const coll = db.spill_to_disk;
+    coll.drop();
+
+    const sharded = FixtureHelpers.isSharded(coll);
+
+    const memoryLimitMB = sharded ? 200 : 100;
+
+    const bigStr = Array(1024 * 1024 + 1).toString();  // 1MB of ','
+    for (let i = 0; i < memoryLimitMB + 1; i++)
+        coll.insert({_id: i, bigStr: i + bigStr, random: Math.random()});
+
+    assert.gt(coll.stats().size, memoryLimitMB * 1024 * 1024);
+
+    function test({pipeline, expectedCodes, canSpillToDisk}) {
+        // Test that by default we error out if exceeding memory limit.
+        assert.commandFailedWithCode(
+            db.runCommand({aggregate: coll.getName(), pipeline: pipeline, cursor: {}}),
+            expectedCodes);
+
+        // Test that 'allowDiskUse: false' does indeed prevent spilling to disk.
+        assert.commandFailedWithCode(
+            db.runCommand(
+                {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: false}),
+            expectedCodes);
+
+        // Test that allowDiskUse only supports bool. In particular, numbers aren't allowed.
+        assert.commandFailed(db.runCommand(
+            {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: 1}));
+
+        // If this command supports spilling to disk, ensure that it will succeed when disk use is
+        // allowed.
+        let res = db.runCommand(
+            {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true});
+        if (canSpillToDisk) {
+            assert.eq(new DBCommandCursor(coll.getDB(), res).itcount(),
+                      coll.count());  // all tests output one doc per input doc
+        } else {
+            assert.commandFailedWithCode(res, [ErrorCodes.ExceededMemoryLimit, expectedCodes]);
+        }
+    }
+
+    const groupCode = 16945;
+    const sortCode = 16819;
+    const sortLimitCode = 16820;
+
+    test({
+        pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}],
+        expectedCodes: groupCode,
+        canSpillToDisk: true
+    });
+
+    // Sorting with _id would use index which doesn't require external sort, so sort by 'random'
+    // instead.
+    test({pipeline: [{$sort: {random: 1}}], expectedCodes: sortCode, canSpillToDisk: true});
+    test({
+        pipeline: [{$sort: {bigStr: 1}}],  // big key and value
+        expectedCodes: sortCode,
+        canSpillToDisk: true
+    });
+
+    // Test that sort + large limit won't crash the server (SERVER-10136)
+    test({
+        pipeline: [{$sort: {bigStr: 1}}, {$limit: 1000 * 1000 * 1000}],
+        expectedCodes: sortLimitCode,
+        canSpillToDisk: true
+    });
+
+    // Test combining two external sorts in both same and different orders.
+    test({
+        pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: 1}}],
+        expectedCodes: groupCode,
+        canSpillToDisk: true
+    });
+    test({
+        pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {_id: -1}}],
+        expectedCodes: groupCode,
+        canSpillToDisk: true
+    });
+    test({
+        pipeline: [{$group: {_id: '$_id', bigStr: {$min: '$bigStr'}}}, {$sort: {random: 1}}],
+        expectedCodes: groupCode,
+        canSpillToDisk: true
+    });
+    test({
+        pipeline: [{$sort: {random: 1}}, {$group: {_id: '$_id', bigStr: {$first: '$bigStr'}}}],
+        expectedCodes: sortCode,
+        canSpillToDisk: true
+    });
+
+    // Test accumulating all values into one array. On debug builds we will spill to disk for
+    // $group and so may hit the group error code before we hit ExceededMemoryLimit.
+    test({
+        pipeline: [{$group: {_id: null, bigArray: {$push: '$bigStr'}}}],
+        expectedCodes: [groupCode, ErrorCodes.ExceededMemoryLimit],
+        canSpillToDisk: false
+    });
+
+    test({
+        pipeline: [{
+            $group:
+                {_id: null, bigArray: {$addToSet: {$concat: ['$bigStr', {$toString: "$_id"}]}}}
+        }],
+        expectedCodes: [groupCode, ErrorCodes.ExceededMemoryLimit],
+        canSpillToDisk: false
+    });
+
+    // don't leave large collection laying around
+    coll.drop();
+})();
diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js
index 4ae3091b0e1..f500b90f81a 100644
--- a/jstests/aggregation/testshard1.js
+++ b/jstests/aggregation/testshard1.js
@@ -290,8 +290,6 @@ for (var shardName in res.shards) {
 
 // Call sub-tests designed to work sharded and unsharded.
 // They check for this variable to know to shard their collections.
 RUNNING_IN_SHARDED_AGG_TEST = true;  // global
-jsTestLog('running jstests/aggregation/bugs/server9444.js');
-load("jstests/aggregation/bugs/server9444.js");  // external sort
 jsTestLog('running jstests/aggregation/bugs/server11675.js');
 load("jstests/aggregation/bugs/server11675.js");  // text support
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 517f46fd16c..4d10fc051b5 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -105,7 +105,9 @@ private:
 
 class AccumulatorAddToSet final : public Accumulator {
 public:
-    explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+    static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+    explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                 int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
 
     void processInternal(const Value& input, bool merging) final;
     Value getValue(bool toBeMerged) final;
@@ -125,6 +127,7 @@ public:
 
 private:
     ValueUnorderedSet _set;
+    int _maxMemUsageBytes;
 };
 
 
@@ -236,7 +239,9 @@ public:
 
 class AccumulatorPush final : public Accumulator {
 public:
-    explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+    static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+    explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                             int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
 
     void processInternal(const Value& input, bool merging) final;
     Value getValue(bool toBeMerged) final;
@@ -247,7 +252,8 @@ public:
         const boost::intrusive_ptr<ExpressionContext>& expCtx);
 
 private:
-    std::vector<Value> vpValue;
+    std::vector<Value> _array;
+    int _maxMemUsageBytes;
 };
 
diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
index 06562d2de7c..fb591247866 100644
--- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp
+++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
@@ -48,26 +48,30 @@ const char* AccumulatorAddToSet::getOpName() const {
 }
 
 void AccumulatorAddToSet::processInternal(const Value& input, bool merging) {
+    auto addValue = [this](auto&& val) {
+        bool inserted = _set.insert(val).second;
+        if (inserted) {
+            _memUsageBytes += val.getApproximateSize();
+            uassert(ErrorCodes::ExceededMemoryLimit,
+                    str::stream()
+                        << "$addToSet used too much memory and cannot spill to disk. Memory limit: "
+                        << _maxMemUsageBytes
+                        << " bytes",
+                    _memUsageBytes < _maxMemUsageBytes);
+        }
+    };
     if (!merging) {
        if (!input.missing()) {
-            bool inserted = _set.insert(input).second;
-            if (inserted) {
-                _memUsageBytes += input.getApproximateSize();
-            }
+            addValue(input);
        }
    } else {
-        // If we're merging, we need to take apart the arrays we
-        // receive and put their elements into the array we are collecting.
-        // If we didn't, then we'd get an array of arrays, with one array
-        // from each merge source.
-        verify(input.getType() == Array);
+        // If we're merging, we need to take apart the arrays we receive and put their elements into
+        // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+        // array from each merge source.
+        invariant(input.getType() == Array);
 
-        const vector<Value>& array = input.getArray();
-        for (size_t i = 0; i < array.size(); i++) {
-            bool inserted = _set.insert(array[i]).second;
-            if (inserted) {
-                _memUsageBytes += array[i].getApproximateSize();
-            }
+        for (auto&& val : input.getArray()) {
+            addValue(val);
        }
    }
 }
@@ -76,8 +80,11 @@ Value AccumulatorAddToSet::getValue(bool toBeMerged) {
     return Value(vector<Value>(_set.begin(), _set.end()));
 }
 
-AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx)
-    : Accumulator(expCtx), _set(expCtx->getValueComparator().makeUnorderedValueSet()) {
+AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                         int maxMemoryUsageBytes)
+    : Accumulator(expCtx),
+      _set(expCtx->getValueComparator().makeUnorderedValueSet()),
+      _maxMemUsageBytes(maxMemoryUsageBytes) {
     _memUsageBytes = sizeof(*this);
 }
diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp
index 1be95ea098b..7e42d29546d 100644
--- a/src/mongo/db/pipeline/accumulator_push.cpp
+++ b/src/mongo/db/pipeline/accumulator_push.cpp
@@ -50,36 +50,47 @@ const char* AccumulatorPush::getOpName() const {
 
 void AccumulatorPush::processInternal(const Value& input, bool merging) {
     if (!merging) {
         if (!input.missing()) {
-            vpValue.push_back(input);
+            _array.push_back(input);
             _memUsageBytes += input.getApproximateSize();
+            uassert(ErrorCodes::ExceededMemoryLimit,
+                    str::stream()
+                        << "$push used too much memory and cannot spill to disk. Memory limit: "
+                        << _maxMemUsageBytes
+                        << " bytes",
+                    _memUsageBytes < _maxMemUsageBytes);
        }
    } else {
-        // If we're merging, we need to take apart the arrays we
-        // receive and put their elements into the array we are collecting.
-        // If we didn't, then we'd get an array of arrays, with one array
-        // from each merge source.
-        verify(input.getType() == Array);
+        // If we're merging, we need to take apart the arrays we receive and put their elements into
+        // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+        // array from each merge source.
+        invariant(input.getType() == Array);
 
         const vector<Value>& vec = input.getArray();
-        vpValue.insert(vpValue.end(), vec.begin(), vec.end());
-
-        for (size_t i = 0; i < vec.size(); i++) {
-            _memUsageBytes += vec[i].getApproximateSize();
+        for (auto&& val : vec) {
+            _memUsageBytes += val.getApproximateSize();
+            uassert(ErrorCodes::ExceededMemoryLimit,
+                    str::stream()
+                        << "$push used too much memory and cannot spill to disk. Memory limit: "
+                        << _maxMemUsageBytes
+                        << " bytes",
+                    _memUsageBytes < _maxMemUsageBytes);
        }
+        _array.insert(_array.end(), vec.begin(), vec.end());
    }
 }
 
 Value AccumulatorPush::getValue(bool toBeMerged) {
-    return Value(vpValue);
+    return Value(_array);
 }
 
-AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx)
-    : Accumulator(expCtx) {
+AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                 int maxMemoryUsageBytes)
+    : Accumulator(expCtx), _maxMemUsageBytes(maxMemoryUsageBytes) {
     _memUsageBytes = sizeof(*this);
 }
 
 void AccumulatorPush::reset() {
-    vector<Value>().swap(vpValue);
+    vector<Value>().swap(_array);
     _memUsageBytes = sizeof(*this);
 }
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index 16f93d61a36..55519bec5c7 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -346,6 +346,28 @@ TEST(Accumulators, AddToSetRespectsCollation) {
                        Value(std::vector<Value>{Value("a"_sd)})}});
 }
 
+TEST(Accumulators, AddToSetRespectsMaxMemoryConstraint) {
+    intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+    const int maxMemoryBytes = 20;
+    AccumulatorAddToSet addToSet(expCtx, maxMemoryBytes);
+    ASSERT_THROWS_CODE(
+        addToSet.process(
+            Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), false),
+        AssertionException,
+        ErrorCodes::ExceededMemoryLimit);
+}
+
+TEST(Accumulators, PushRespectsMaxMemoryConstraint) {
+    intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+    const int maxMemoryBytes = 20;
+    AccumulatorPush push(expCtx, maxMemoryBytes);
+    ASSERT_THROWS_CODE(
+        push.process(Value("This is a large string. Certainly we must be over 20 bytes by now"_sd),
+                     false),
+        AssertionException,
+        ErrorCodes::ExceededMemoryLimit);
+}
+
 /* ------------------------- AccumulatorMergeObjects -------------------------- */
 
 namespace AccumulatorMergeObjects {
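
For contrast, operators that can spill still succeed once disk use is allowed, as the canSpillToDisk: true cases in spill_to_disk.js exercise. A sketch against the same hypothetical collection from the example above:

// $group with $min can spill to disk, so the same oversized input succeeds
// when run with allowDiskUse: true.
const cur = db.push_memory_demo.aggregate([{$group: {_id: '$_id', m: {$min: '$payload'}}}],
                                          {allowDiskUse: true});
assert.eq(cur.itcount(), db.push_memory_demo.count());  // one output doc per input doc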