diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-12-10 19:38:18 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-10 19:38:18 +0000 |
commit | fba2959a87c424463fdc39ea256f2feee20fa34c (patch) | |
tree | b80623e011cf54c6bc0cfc0cc8649d728d059d97 /src | |
parent | 9803c2a3b02dada80cb846eb56b335d1d99825f7 (diff) | |
download | mongo-fba2959a87c424463fdc39ea256f2feee20fa34c.tar.gz |
SERVER-44174 $push and $addToSet should restrict memory usage
(cherry picked from commit 504b518b9bd432a1d614d06f004712e70a1a754b)
(cherry picked from commit 2d0ad29a5fa8b328610e69f34aa26802b5ec7cc9)
(cherry picked from commit 83b3c795e154b7456f43269b5d95ac8e2e00d96e)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_add_to_set.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_push.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_test.cpp | 22 |
4 files changed, 80 insertions, 34 deletions
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 517f46fd16c..4d10fc051b5 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -105,7 +105,9 @@ private: class AccumulatorAddToSet final : public Accumulator { public: - explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx); + static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; + explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx, + int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; @@ -125,6 +127,7 @@ public: private: ValueUnorderedSet _set; + int _maxMemUsageBytes; }; @@ -236,7 +239,9 @@ public: class AccumulatorPush final : public Accumulator { public: - explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx); + static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; + explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx, + int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; @@ -247,7 +252,8 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx); private: - std::vector<Value> vpValue; + std::vector<Value> _array; + int _maxMemUsageBytes; }; diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp index 06562d2de7c..fb591247866 100644 --- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp +++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp @@ -48,26 +48,30 @@ const char* AccumulatorAddToSet::getOpName() const { } void AccumulatorAddToSet::processInternal(const Value& input, bool merging) { + auto addValue = [this](auto&& val) { + bool inserted = _set.insert(val).second; + if (inserted) { + _memUsageBytes += val.getApproximateSize(); + uassert(ErrorCodes::ExceededMemoryLimit, + str::stream() + << "$addToSet used too much memory and cannot spill to disk. Memory limit: " + << _maxMemUsageBytes + << " bytes", + _memUsageBytes < _maxMemUsageBytes); + } + }; if (!merging) { if (!input.missing()) { - bool inserted = _set.insert(input).second; - if (inserted) { - _memUsageBytes += input.getApproximateSize(); - } + addValue(input); } } else { - // If we're merging, we need to take apart the arrays we - // receive and put their elements into the array we are collecting. - // If we didn't, then we'd get an array of arrays, with one array - // from each merge source. - verify(input.getType() == Array); + // If we're merging, we need to take apart the arrays we receive and put their elements into + // the array we are collecting. If we didn't, then we'd get an array of arrays, with one + // array from each merge source. + invariant(input.getType() == Array); - const vector<Value>& array = input.getArray(); - for (size_t i = 0; i < array.size(); i++) { - bool inserted = _set.insert(array[i]).second; - if (inserted) { - _memUsageBytes += array[i].getApproximateSize(); - } + for (auto&& val : input.getArray()) { + addValue(val); } } } @@ -76,8 +80,11 @@ Value AccumulatorAddToSet::getValue(bool toBeMerged) { return Value(vector<Value>(_set.begin(), _set.end())); } -AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : Accumulator(expCtx), _set(expCtx->getValueComparator().makeUnorderedValueSet()) { +AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx, + int maxMemoryUsageBytes) + : Accumulator(expCtx), + _set(expCtx->getValueComparator().makeUnorderedValueSet()), + _maxMemUsageBytes(maxMemoryUsageBytes) { _memUsageBytes = sizeof(*this); } diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp index 1be95ea098b..7e42d29546d 100644 --- a/src/mongo/db/pipeline/accumulator_push.cpp +++ b/src/mongo/db/pipeline/accumulator_push.cpp @@ -50,36 +50,47 @@ const char* AccumulatorPush::getOpName() const { void AccumulatorPush::processInternal(const Value& input, bool merging) { if (!merging) { if (!input.missing()) { - vpValue.push_back(input); + _array.push_back(input); _memUsageBytes += input.getApproximateSize(); + uassert(ErrorCodes::ExceededMemoryLimit, + str::stream() + << "$push used too much memory and cannot spill to disk. Memory limit: " + << _maxMemUsageBytes + << " bytes", + _memUsageBytes < _maxMemUsageBytes); } } else { - // If we're merging, we need to take apart the arrays we - // receive and put their elements into the array we are collecting. - // If we didn't, then we'd get an array of arrays, with one array - // from each merge source. - verify(input.getType() == Array); + // If we're merging, we need to take apart the arrays we receive and put their elements into + // the array we are collecting. If we didn't, then we'd get an array of arrays, with one + // array from each merge source. + invariant(input.getType() == Array); const vector<Value>& vec = input.getArray(); - vpValue.insert(vpValue.end(), vec.begin(), vec.end()); - - for (size_t i = 0; i < vec.size(); i++) { - _memUsageBytes += vec[i].getApproximateSize(); + for (auto&& val : vec) { + _memUsageBytes += val.getApproximateSize(); + uassert(ErrorCodes::ExceededMemoryLimit, + str::stream() + << "$push used too much memory and cannot spill to disk. Memory limit: " + << _maxMemUsageBytes + << " bytes", + _memUsageBytes < _maxMemUsageBytes); } + _array.insert(_array.end(), vec.begin(), vec.end()); } } Value AccumulatorPush::getValue(bool toBeMerged) { - return Value(vpValue); + return Value(_array); } -AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : Accumulator(expCtx) { +AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx, + int maxMemoryUsageBytes) + : Accumulator(expCtx), _maxMemUsageBytes(maxMemoryUsageBytes) { _memUsageBytes = sizeof(*this); } void AccumulatorPush::reset() { - vector<Value>().swap(vpValue); + vector<Value>().swap(_array); _memUsageBytes = sizeof(*this); } diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index 16f93d61a36..55519bec5c7 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -346,6 +346,28 @@ TEST(Accumulators, AddToSetRespectsCollation) { Value(std::vector<Value>{Value("a"_sd)})}}); } +TEST(Accumulators, AddToSetRespectsMaxMemoryConstraint) { + intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + const int maxMemoryBytes = 20; + AccumulatorAddToSet addToSet(expCtx, maxMemoryBytes); + ASSERT_THROWS_CODE( + addToSet.process( + Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), false), + AssertionException, + ErrorCodes::ExceededMemoryLimit); +} + +TEST(Accumulators, PushRespectsMaxMemoryConstraint) { + intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + const int maxMemoryBytes = 20; + AccumulatorPush push(expCtx, maxMemoryBytes); + ASSERT_THROWS_CODE( + push.process(Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), + false), + AssertionException, + ErrorCodes::ExceededMemoryLimit); +} + /* ------------------------- AccumulatorMergeObjects -------------------------- */ namespace AccumulatorMergeObjects { |