Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/pipeline/accumulator.h               8
-rw-r--r--  src/mongo/db/pipeline/accumulator_js_reduce.cpp  12
-rw-r--r--  src/mongo/db/pipeline/accumulator_js_reduce.h     5
-rw-r--r--  src/mongo/db/pipeline/document_source_group.cpp  62
-rw-r--r--  src/mongo/db/pipeline/document_source_group.h    27
5 files changed, 85 insertions, 29 deletions
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index e59ccb543da..ce21891a982 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -81,6 +81,14 @@ public:
processInternal(input, merging);
}
+ /**
+ * Finishes processing all pending operations and cleans up memory. Some accumulators
+ * ($accumulator, for example) may batch their processing to improve performance; in those
+ * cases, memory consumption can spike. Calling this function flushes those batches.
+ */
+ virtual void reduceMemoryConsumptionIfAble() {}
+
/** Marks the end of the evaluate() phase and returns the accumulated result.
* toBeMerged should be true when the outputs will be merged by process().
*/
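A note on the new hook: it defaults to a no-op, so only accumulators that buffer work need to override it. Below is a minimal standalone sketch of the intended pattern; AccumulatorBase and BufferedConcatAccumulator are hypothetical stand-ins for the real accumulator interface, not MongoDB code.

    #include <string>
    #include <vector>

    // Stand-in for the accumulator base class: the default implementation
    // does nothing, so existing accumulators are unaffected by the new hook.
    class AccumulatorBase {
    public:
        virtual ~AccumulatorBase() = default;
        virtual void reduceMemoryConsumptionIfAble() {}
    };

    // Hypothetical accumulator that batches its inputs to amortize per-call
    // cost, at the price of a temporary memory spike.
    class BufferedConcatAccumulator : public AccumulatorBase {
    public:
        void process(std::string input) {
            _pending.push_back(std::move(input));  // cheap: just buffer
        }

        // Flush the batch: fold every pending input into _state, then
        // actually release the buffer's memory.
        void reduceMemoryConsumptionIfAble() override {
            for (auto& s : _pending)
                _state += s;
            _pending.clear();
            _pending.shrink_to_fit();
        }

    private:
        std::vector<std::string> _pending;
        std::string _state;
    };

    int main() {
        BufferedConcatAccumulator acc;
        acc.process("a");
        acc.process("b");
        acc.reduceMemoryConsumptionIfAble();  // the caller flushes under memory pressure
    }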
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
index ac7ff3f83dc..66059c3d6da 100644
--- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp
+++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
@@ -330,7 +330,7 @@ Value AccumulatorJs::getValue(bool toBeMerged) {
invariant(_state);
// Ensure we've actually called accumulate/merge for every input document.
- reducePendingCalls();
+ reduceMemoryConsumptionIfAble();
invariant(_pendingCalls.empty());
// If toBeMerged then we return the current state, to be fed back in to accumulate / merge /
@@ -418,13 +418,13 @@ void AccumulatorJs::processInternal(const Value& input, bool merging) {
sizeof(std::pair<Value, bool>));
}
-void AccumulatorJs::reducePendingCalls() {
+void AccumulatorJs::reduceMemoryConsumptionIfAble() {
// _state should be nonempty because we populate it in startNewGroup.
invariant(_state);
- // $group and $bucketAuto never create empty groups. The only time an accumulator is asked to
- // accumulate an empty set is in ExpressionFromArray, but $accumulator is never used that way
- // ($accumulator is not registered as an expression the way $sum and $avg and others are).
- invariant(!_pendingCalls.empty());
+
+ if (_pendingCalls.empty()) {
+ return;
+ }
auto& expCtx = getExpressionContext();
auto jsExec = expCtx->getJsExecWithScope();
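The relaxed invariant above is the crux of the rename: reducePendingCalls() used to run only once at least one input had been accumulated for the group, so an empty _pendingCalls indicated a bug. Now that the hook can also be invoked opportunistically whenever the $group stage is under memory pressure (see freeMemory() below), an empty batch is a legitimate state and the function simply returns early.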
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.h b/src/mongo/db/pipeline/accumulator_js_reduce.h
index 5843f893a18..a6a63bf0ef6 100644
--- a/src/mongo/db/pipeline/accumulator_js_reduce.h
+++ b/src/mongo/db/pipeline/accumulator_js_reduce.h
@@ -99,6 +99,8 @@ public:
Value getValue(bool toBeMerged) final;
void reset() final;
void processInternal(const Value& input, bool merging) final;
+ void reduceMemoryConsumptionIfAble() final;
+
Document serialize(boost::intrusive_ptr<Expression> initializer,
boost::intrusive_ptr<Expression> argument,
@@ -140,9 +142,6 @@ private:
// other instances of $accumulator. False means the elements of _pendingCalls should be
// interpreted as inputs from accumulateArgs.
bool _pendingCallsMerging;
-
- // Call the user's accumulate/merge function for each element of _pendingCalls.
- void reducePendingCalls();
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 230162d2f51..9cc8afaf937 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -131,6 +131,38 @@ const char* DocumentSourceGroup::getSourceName() const {
return kStageName.rawData();
}
+bool DocumentSourceGroup::MemoryUsageTracker::shouldSpillWithAttemptToSaveMemory(
+ std::function<int()> saveMemory) {
+ if (!allowDiskUse && (memoryUsageBytes > maxMemoryUsageBytes)) {
+ memoryUsageBytes -= saveMemory();
+ }
+
+ if (memoryUsageBytes > maxMemoryUsageBytes) {
+ uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
+ "Exceeded memory limit for $group, but didn't allow external sort."
+ " Pass allowDiskUse:true to opt in.",
+ allowDiskUse);
+ memoryUsageBytes = 0;
+ return true;
+ }
+ return false;
+}
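The decision flow above, read in isolation: when disk use is not allowed and the limit is exceeded, first ask the caller to reclaim memory; if usage is still over the limit, either fail the query or direct the caller to spill, resetting the in-memory count. A self-contained sketch of the same logic follows; std::runtime_error stands in for uassert, and the numbers in main are illustrative only.

    #include <cstddef>
    #include <functional>
    #include <stdexcept>

    // Simplified model of DocumentSourceGroup::MemoryUsageTracker.
    struct MemoryUsageTracker {
        const bool allowDiskUse;
        const size_t maxMemoryUsageBytes;
        size_t memoryUsageBytes = 0;

        bool shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory) {
            // Without disk use, reclaiming buffered work is the only way back
            // under the limit, so try that before giving up.
            if (!allowDiskUse && (memoryUsageBytes > maxMemoryUsageBytes))
                memoryUsageBytes -= saveMemory();

            if (memoryUsageBytes > maxMemoryUsageBytes) {
                if (!allowDiskUse)
                    throw std::runtime_error(
                        "Exceeded memory limit for $group without allowDiskUse:true");
                memoryUsageBytes = 0;  // spilling empties the in-memory groups
                return true;
            }
            return false;
        }
    };

    int main() {
        MemoryUsageTracker tracker{/*allowDiskUse=*/false, /*maxMemoryUsageBytes=*/100};
        tracker.memoryUsageBytes = 120;
        // Reclaiming 30 bytes brings usage down to 90: no spill, no error.
        bool mustSpill = tracker.shouldSpillWithAttemptToSaveMemory([] { return 30; });
        return mustSpill ? 1 : 0;
    }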
+
+int DocumentSourceGroup::freeMemory() {
+ invariant(_groups);
+ int totalMemorySaved = 0;
+ for (auto&& group : *_groups) {
+ for (auto&& groupObj : group.second) {
+ auto prevMemUsage = groupObj->memUsageForSorter();
+ groupObj->reduceMemoryConsumptionIfAble();
+
+ // Update the memory usage for this group.
+ totalMemorySaved += (prevMemUsage - groupObj->memUsageForSorter());
+ }
+ }
+ return totalMemorySaved;
+}
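freeMemory() measures the saving indirectly: each accumulator is sized before and after the flush, and the deltas are summed so the stage can subtract them from its running total in one step. The same accounting pattern, sketched against a hypothetical minimal Accumulator interface:

    #include <cstddef>
    #include <vector>

    // Hypothetical minimal accumulator interface.
    struct Accumulator {
        virtual ~Accumulator() = default;
        virtual size_t memUsage() const = 0;
        virtual void reduceMemoryConsumptionIfAble() = 0;
    };

    // Sum the per-accumulator shrinkage; assumes flushing never grows usage.
    int freeMemory(std::vector<Accumulator*>& accumulators) {
        int totalSaved = 0;
        for (auto* acc : accumulators) {
            auto before = acc->memUsage();
            acc->reduceMemoryConsumptionIfAble();
            totalSaved += static_cast<int>(before) - static_cast<int>(acc->memUsage());
        }
        return totalSaved;
    }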
+
DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() {
if (!_initialized) {
const auto initializationResult = initialize();
@@ -342,12 +374,12 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
: DocumentSource(kStageName, pExpCtx),
_usedDisk(false),
_doingMerge(false),
- _maxMemoryUsageBytes(maxMemoryUsageBytes ? *maxMemoryUsageBytes
- : internalDocumentSourceGroupMaxMemoryBytes.load()),
+ _memoryTracker{pExpCtx->allowDiskUse && !pExpCtx->inMongos,
+ maxMemoryUsageBytes ? *maxMemoryUsageBytes
+ : internalDocumentSourceGroupMaxMemoryBytes.load()},
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
- _spilled(false),
- _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
+ _spilled(false) {
if (!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild)) {
// We spill to disk in debug mode, regardless of allowDiskUse, to stress the system.
_fileName = pExpCtx->tempDir + "/" + nextFileName();
@@ -481,14 +513,10 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
// Barring any pausing, this loop exhausts 'pSource' and populates '_groups'.
GetNextResult input = pSource->getNext();
+
for (; input.isAdvanced(); input = pSource->getNext()) {
- if (_memoryUsageBytes > _maxMemoryUsageBytes) {
- uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- "Exceeded memory limit for $group, but didn't allow external sort."
- " Pass allowDiskUse:true to opt in.",
- _allowDiskUse);
+ if (_memoryTracker.shouldSpillWithAttemptToSaveMemory([this]() { return freeMemory(); })) {
_sortedFiles.push_back(spill());
- _memoryUsageBytes = 0;
}
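This call site is where the two halves meet: the stage hands the tracker a [this]() { return freeMemory(); } callback, so the tracker can first flush pending accumulator work and only then choose between spilling and failing, keeping the memory policy in one place.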
// We release the result document here so that it does not outlive the end of this loop
@@ -504,7 +532,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
const bool inserted = _groups->size() != oldSize;
if (inserted) {
- _memoryUsageBytes += id.getApproximateSize();
+ _memoryTracker.memoryUsageBytes += id.getApproximateSize();
// Initialize and add the accumulators
Value expandedId = expandId(id);
@@ -521,7 +549,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
} else {
for (auto&& groupObj : group) {
// subtract old mem usage. New usage added back after processing.
- _memoryUsageBytes -= groupObj->memUsageForSorter();
+ _memoryTracker.memoryUsageBytes -= groupObj->memUsageForSorter();
}
}
@@ -533,15 +561,15 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
_accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables),
_doingMerge);
- _memoryUsageBytes += group[i]->memUsageForSorter();
+ _memoryTracker.memoryUsageBytes += group[i]->memUsageForSorter();
}
if (kDebugBuild && !storageGlobalParams.readOnly) {
// In debug mode, spill every time we have a duplicate id to stress merge logic.
- if (!inserted && // is a dup
- !pExpCtx->inMongos && // can't spill to disk in mongos
- !_allowDiskUse && // don't change behavior when testing external sort
- _sortedFiles.size() < 20) { // don't open too many FDs
+ if (!inserted && // is a dup
+ !pExpCtx->inMongos && // can't spill to disk in mongos
+ !_memoryTracker.allowDiskUse && // don't change behavior when testing external sort
+ _sortedFiles.size() < 20) { // don't open too many FDs
_sortedFiles.push_back(spill());
}
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 5d885e84a1f..ec202833ead 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -182,6 +182,21 @@ protected:
void doDispose() final;
private:
+ struct MemoryUsageTracker {
+ /**
+ * Attempts to reclaim memory via 'saveMemory' before deciding whether to spill. Throws an
+ * error if memory usage is above 'maxMemoryUsageBytes' and the query cannot spill to disk.
+ * The 'saveMemory' function should return the number of bytes freed by the cleanup.
+ *
+ * Returns true if the caller should spill to disk, false otherwise.
+ */
+ bool shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory);
+
+ const bool allowDiskUse;
+ const size_t maxMemoryUsageBytes;
+ size_t memoryUsageBytes = 0;
+ };
+
explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
boost::optional<size_t> maxMemoryUsageBytes = boost::none);
@@ -214,6 +229,12 @@ private:
*/
std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
+ /**
+ * Finishes any pending operations on the accumulators so that some memory can be
+ * freed. Returns the number of bytes saved.
+ */
+ int freeMemory();
+
Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
/**
@@ -236,8 +257,9 @@ private:
bool _usedDisk; // Keeps track of whether this $group spilled to disk.
bool _doingMerge;
- size_t _memoryUsageBytes = 0;
- size_t _maxMemoryUsageBytes;
+
+ MemoryUsageTracker _memoryTracker;
+
std::string _fileName;
std::streampos _nextSortedFileWriterOffset = 0;
bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over.
@@ -263,7 +285,6 @@ private:
// Only used when '_spilled' is true.
std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
- const bool _allowDiskUse;
std::pair<Value, Value> _firstPartOfNextGroup;
};