diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_js_reduce.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_js_reduce.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 27 |
5 files changed, 85 insertions, 29 deletions
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index e59ccb543da..ce21891a982 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -81,6 +81,14 @@ public: processInternal(input, merging); } + /** + * Finish processing all the pending operations, and clean up memory. Some accumulators + * ($accumulator for example) might do a batch processing in order to improve performace. In + * those cases, the memory consumption could spike up. Calling this function can help flush + * those batch. + */ + virtual void reduceMemoryConsumptionIfAble() {} + /** Marks the end of the evaluate() phase and return accumulated result. * toBeMerged should be true when the outputs will be merged by process(). */ diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp index ac7ff3f83dc..66059c3d6da 100644 --- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp +++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp @@ -330,7 +330,7 @@ Value AccumulatorJs::getValue(bool toBeMerged) { invariant(_state); // Ensure we've actually called accumulate/merge for every input document. - reducePendingCalls(); + reduceMemoryConsumptionIfAble(); invariant(_pendingCalls.empty()); // If toBeMerged then we return the current state, to be fed back in to accumulate / merge / @@ -418,13 +418,13 @@ void AccumulatorJs::processInternal(const Value& input, bool merging) { sizeof(std::pair<Value, bool>)); } -void AccumulatorJs::reducePendingCalls() { +void AccumulatorJs::reduceMemoryConsumptionIfAble() { // _state should be nonempty because we populate it in startNewGroup. invariant(_state); - // $group and $bucketAuto never create empty groups. The only time an accumulator is asked to - // accumulate an empty set is in ExpressionFromArray, but $accumulator is never used that way - // ($accumulator is not registered as an expression the way $sum and $avg and others are). - invariant(!_pendingCalls.empty()); + + if (_pendingCalls.empty()) { + return; + } auto& expCtx = getExpressionContext(); auto jsExec = expCtx->getJsExecWithScope(); diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.h b/src/mongo/db/pipeline/accumulator_js_reduce.h index 5843f893a18..a6a63bf0ef6 100644 --- a/src/mongo/db/pipeline/accumulator_js_reduce.h +++ b/src/mongo/db/pipeline/accumulator_js_reduce.h @@ -99,6 +99,8 @@ public: Value getValue(bool toBeMerged) final; void reset() final; void processInternal(const Value& input, bool merging) final; + void reduceMemoryConsumptionIfAble() final; + Document serialize(boost::intrusive_ptr<Expression> initializer, boost::intrusive_ptr<Expression> argument, @@ -140,9 +142,6 @@ private: // other instances of $accumulator. False means the elements of _pendingCalls should be // interpreted as inputs from accumulateArgs. bool _pendingCallsMerging; - - // Call the user's accumulate/merge function for each element of _pendingCalls. - void reducePendingCalls(); }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 230162d2f51..9cc8afaf937 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -131,6 +131,38 @@ const char* DocumentSourceGroup::getSourceName() const { return kStageName.rawData(); } +bool DocumentSourceGroup::MemoryUsageTracker::shouldSpillWithAttemptToSaveMemory( + std::function<int()> saveMemory) { + if (!allowDiskUse && (memoryUsageBytes > maxMemoryUsageBytes)) { + memoryUsageBytes -= saveMemory(); + } + + if (memoryUsageBytes > maxMemoryUsageBytes) { + uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, + "Exceeded memory limit for $group, but didn't allow external sort." + " Pass allowDiskUse:true to opt in.", + allowDiskUse); + memoryUsageBytes = 0; + return true; + } + return false; +} + +int DocumentSourceGroup::freeMemory() { + invariant(_groups); + int totalMemorySaved = 0; + for (auto&& group : *_groups) { + for (auto&& groupObj : group.second) { + auto prevMemUsage = groupObj->memUsageForSorter(); + groupObj->reduceMemoryConsumptionIfAble(); + + // Update the memory usage for this group. + totalMemorySaved += (prevMemUsage - groupObj->memUsageForSorter()); + } + } + return totalMemorySaved; +} + DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() { if (!_initialized) { const auto initializationResult = initialize(); @@ -342,12 +374,12 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& : DocumentSource(kStageName, pExpCtx), _usedDisk(false), _doingMerge(false), - _maxMemoryUsageBytes(maxMemoryUsageBytes ? *maxMemoryUsageBytes - : internalDocumentSourceGroupMaxMemoryBytes.load()), + _memoryTracker{pExpCtx->allowDiskUse && !pExpCtx->inMongos, + maxMemoryUsageBytes ? *maxMemoryUsageBytes + : internalDocumentSourceGroupMaxMemoryBytes.load()}, _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), - _spilled(false), - _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) { + _spilled(false) { if (!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild)) { // We spill to disk in debug mode, regardless of allowDiskUse, to stress the system. _fileName = pExpCtx->tempDir + "/" + nextFileName(); @@ -481,14 +513,10 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // Barring any pausing, this loop exhausts 'pSource' and populates '_groups'. GetNextResult input = pSource->getNext(); + for (; input.isAdvanced(); input = pSource->getNext()) { - if (_memoryUsageBytes > _maxMemoryUsageBytes) { - uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - "Exceeded memory limit for $group, but didn't allow external sort." - " Pass allowDiskUse:true to opt in.", - _allowDiskUse); + if (_memoryTracker.shouldSpillWithAttemptToSaveMemory([this]() { return freeMemory(); })) { _sortedFiles.push_back(spill()); - _memoryUsageBytes = 0; } // We release the result document here so that it does not outlive the end of this loop @@ -504,7 +532,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { const bool inserted = _groups->size() != oldSize; if (inserted) { - _memoryUsageBytes += id.getApproximateSize(); + _memoryTracker.memoryUsageBytes += id.getApproximateSize(); // Initialize and add the accumulators Value expandedId = expandId(id); @@ -521,7 +549,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { } else { for (auto&& groupObj : group) { // subtract old mem usage. New usage added back after processing. - _memoryUsageBytes -= groupObj->memUsageForSorter(); + _memoryTracker.memoryUsageBytes -= groupObj->memUsageForSorter(); } } @@ -533,15 +561,15 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables), _doingMerge); - _memoryUsageBytes += group[i]->memUsageForSorter(); + _memoryTracker.memoryUsageBytes += group[i]->memUsageForSorter(); } if (kDebugBuild && !storageGlobalParams.readOnly) { // In debug mode, spill every time we have a duplicate id to stress merge logic. - if (!inserted && // is a dup - !pExpCtx->inMongos && // can't spill to disk in mongos - !_allowDiskUse && // don't change behavior when testing external sort - _sortedFiles.size() < 20) { // don't open too many FDs + if (!inserted && // is a dup + !pExpCtx->inMongos && // can't spill to disk in mongos + !_memoryTracker.allowDiskUse && // don't change behavior when testing external sort + _sortedFiles.size() < 20) { // don't open too many FDs _sortedFiles.push_back(spill()); } diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 5d885e84a1f..ec202833ead 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -182,6 +182,21 @@ protected: void doDispose() final; private: + struct MemoryUsageTracker { + /** + * Cleans up any pending memory usage. Throws error, if memory usage is above + * 'maxMemoryUsageBytes' and cannot spill to disk. The 'saveMemory' function should return + * the amount of memory saved by the cleanup. + * + * Returns true, if the caller should spill to disk, false otherwise. + */ + bool shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory); + + const bool allowDiskUse; + const size_t maxMemoryUsageBytes; + size_t memoryUsageBytes = 0; + }; + explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, boost::optional<size_t> maxMemoryUsageBytes = boost::none); @@ -214,6 +229,12 @@ private: */ std::shared_ptr<Sorter<Value, Value>::Iterator> spill(); + /** + * If we ran out of memory, finish all the pending operations so that some memory + * can be freed. + */ + int freeMemory(); + Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput); /** @@ -236,8 +257,9 @@ private: bool _usedDisk; // Keeps track of whether this $group spilled to disk. bool _doingMerge; - size_t _memoryUsageBytes = 0; - size_t _maxMemoryUsageBytes; + + MemoryUsageTracker _memoryTracker; + std::string _fileName; std::streampos _nextSortedFileWriterOffset = 0; bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over. @@ -263,7 +285,6 @@ private: // Only used when '_spilled' is true. std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; - const bool _allowDiskUse; std::pair<Value, Value> _firstPartOfNextGroup; }; |