summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2019-12-10 19:38:18 +0000
committerevergreen <evergreen@mongodb.com>2019-12-10 19:38:18 +0000
commitfba2959a87c424463fdc39ea256f2feee20fa34c (patch)
treeb80623e011cf54c6bc0cfc0cc8649d728d059d97 /src
parent9803c2a3b02dada80cb846eb56b335d1d99825f7 (diff)
downloadmongo-fba2959a87c424463fdc39ea256f2feee20fa34c.tar.gz
SERVER-44174 $push and $addToSet should restrict memory usage
(cherry picked from commit 504b518b9bd432a1d614d06f004712e70a1a754b) (cherry picked from commit 2d0ad29a5fa8b328610e69f34aa26802b5ec7cc9) (cherry picked from commit 83b3c795e154b7456f43269b5d95ac8e2e00d96e)
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/accumulator.h12
-rw-r--r--src/mongo/db/pipeline/accumulator_add_to_set.cpp41
-rw-r--r--src/mongo/db/pipeline/accumulator_push.cpp39
-rw-r--r--src/mongo/db/pipeline/accumulator_test.cpp22
4 files changed, 80 insertions, 34 deletions
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 517f46fd16c..4d10fc051b5 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -105,7 +105,9 @@ private:
class AccumulatorAddToSet final : public Accumulator {
public:
- explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+ explicit AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
void processInternal(const Value& input, bool merging) final;
Value getValue(bool toBeMerged) final;
@@ -125,6 +127,7 @@ public:
private:
ValueUnorderedSet _set;
+ int _maxMemUsageBytes;
};
@@ -236,7 +239,9 @@ public:
class AccumulatorPush final : public Accumulator {
public:
- explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static constexpr int kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+ explicit AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
void processInternal(const Value& input, bool merging) final;
Value getValue(bool toBeMerged) final;
@@ -247,7 +252,8 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
- std::vector<Value> vpValue;
+ std::vector<Value> _array;
+ int _maxMemUsageBytes;
};
diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
index 06562d2de7c..fb591247866 100644
--- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp
+++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
@@ -48,26 +48,30 @@ const char* AccumulatorAddToSet::getOpName() const {
}
void AccumulatorAddToSet::processInternal(const Value& input, bool merging) {
+ auto addValue = [this](auto&& val) {
+ bool inserted = _set.insert(val).second;
+ if (inserted) {
+ _memUsageBytes += val.getApproximateSize();
+ uassert(ErrorCodes::ExceededMemoryLimit,
+ str::stream()
+ << "$addToSet used too much memory and cannot spill to disk. Memory limit: "
+ << _maxMemUsageBytes
+ << " bytes",
+ _memUsageBytes < _maxMemUsageBytes);
+ }
+ };
if (!merging) {
if (!input.missing()) {
- bool inserted = _set.insert(input).second;
- if (inserted) {
- _memUsageBytes += input.getApproximateSize();
- }
+ addValue(input);
}
} else {
- // If we're merging, we need to take apart the arrays we
- // receive and put their elements into the array we are collecting.
- // If we didn't, then we'd get an array of arrays, with one array
- // from each merge source.
- verify(input.getType() == Array);
+ // If we're merging, we need to take apart the arrays we receive and put their elements into
+ // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+ // array from each merge source.
+ invariant(input.getType() == Array);
- const vector<Value>& array = input.getArray();
- for (size_t i = 0; i < array.size(); i++) {
- bool inserted = _set.insert(array[i]).second;
- if (inserted) {
- _memUsageBytes += array[i].getApproximateSize();
- }
+ for (auto&& val : input.getArray()) {
+ addValue(val);
}
}
}
@@ -76,8 +80,11 @@ Value AccumulatorAddToSet::getValue(bool toBeMerged) {
return Value(vector<Value>(_set.begin(), _set.end()));
}
-AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : Accumulator(expCtx), _set(expCtx->getValueComparator().makeUnorderedValueSet()) {
+AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes)
+ : Accumulator(expCtx),
+ _set(expCtx->getValueComparator().makeUnorderedValueSet()),
+ _maxMemUsageBytes(maxMemoryUsageBytes) {
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp
index 1be95ea098b..7e42d29546d 100644
--- a/src/mongo/db/pipeline/accumulator_push.cpp
+++ b/src/mongo/db/pipeline/accumulator_push.cpp
@@ -50,36 +50,47 @@ const char* AccumulatorPush::getOpName() const {
void AccumulatorPush::processInternal(const Value& input, bool merging) {
if (!merging) {
if (!input.missing()) {
- vpValue.push_back(input);
+ _array.push_back(input);
_memUsageBytes += input.getApproximateSize();
+ uassert(ErrorCodes::ExceededMemoryLimit,
+ str::stream()
+ << "$push used too much memory and cannot spill to disk. Memory limit: "
+ << _maxMemUsageBytes
+ << " bytes",
+ _memUsageBytes < _maxMemUsageBytes);
}
} else {
- // If we're merging, we need to take apart the arrays we
- // receive and put their elements into the array we are collecting.
- // If we didn't, then we'd get an array of arrays, with one array
- // from each merge source.
- verify(input.getType() == Array);
+ // If we're merging, we need to take apart the arrays we receive and put their elements into
+ // the array we are collecting. If we didn't, then we'd get an array of arrays, with one
+ // array from each merge source.
+ invariant(input.getType() == Array);
const vector<Value>& vec = input.getArray();
- vpValue.insert(vpValue.end(), vec.begin(), vec.end());
-
- for (size_t i = 0; i < vec.size(); i++) {
- _memUsageBytes += vec[i].getApproximateSize();
+ for (auto&& val : vec) {
+ _memUsageBytes += val.getApproximateSize();
+ uassert(ErrorCodes::ExceededMemoryLimit,
+ str::stream()
+ << "$push used too much memory and cannot spill to disk. Memory limit: "
+ << _maxMemUsageBytes
+ << " bytes",
+ _memUsageBytes < _maxMemUsageBytes);
}
+ _array.insert(_array.end(), vec.begin(), vec.end());
}
}
Value AccumulatorPush::getValue(bool toBeMerged) {
- return Value(vpValue);
+ return Value(_array);
}
-AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : Accumulator(expCtx) {
+AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int maxMemoryUsageBytes)
+ : Accumulator(expCtx), _maxMemUsageBytes(maxMemoryUsageBytes) {
_memUsageBytes = sizeof(*this);
}
void AccumulatorPush::reset() {
- vector<Value>().swap(vpValue);
+ vector<Value>().swap(_array);
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index 16f93d61a36..55519bec5c7 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -346,6 +346,28 @@ TEST(Accumulators, AddToSetRespectsCollation) {
Value(std::vector<Value>{Value("a"_sd)})}});
}
+TEST(Accumulators, AddToSetRespectsMaxMemoryConstraint) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const int maxMemoryBytes = 20;
+ AccumulatorAddToSet addToSet(expCtx, maxMemoryBytes);
+ ASSERT_THROWS_CODE(
+ addToSet.process(
+ Value("This is a large string. Certainly we must be over 20 bytes by now"_sd), false),
+ AssertionException,
+ ErrorCodes::ExceededMemoryLimit);
+}
+
+TEST(Accumulators, PushRespectsMaxMemoryConstraint) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const int maxMemoryBytes = 20;
+ AccumulatorPush push(expCtx, maxMemoryBytes);
+ ASSERT_THROWS_CODE(
+ push.process(Value("This is a large string. Certainly we must be over 20 bytes by now"_sd),
+ false),
+ AssertionException,
+ ErrorCodes::ExceededMemoryLimit);
+}
+
/* ------------------------- AccumulatorMergeObjects -------------------------- */
namespace AccumulatorMergeObjects {