diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 163 |
1 files changed, 91 insertions, 72 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index c8da8a74e76..3beb901383f 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -50,11 +50,16 @@ const char* DocumentSourceGroup::getSourceName() const { return "$group"; } -boost::optional<Document> DocumentSourceGroup::getNext() { +DocumentSource::GetNextResult DocumentSourceGroup::getNext() { pExpCtx->checkForInterrupt(); - if (!_initialized) - initialize(); + if (!_initialized) { + const auto initializationResult = initialize(); + if (initializationResult.isPaused()) { + return initializationResult; + } + invariant(initializationResult.isEOF()); + } for (auto&& accum : _currentAccumulators) { accum->reset(); // Prep accumulators for a new group. @@ -69,10 +74,10 @@ boost::optional<Document> DocumentSourceGroup::getNext() { } } -boost::optional<Document> DocumentSourceGroup::getNextSpilled() { +DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() { // We aren't streaming, and we have spilled to disk. if (!_sorterIterator) - return boost::none; + return GetNextResult::makeEOF(); _currentId = _firstPartOfNextGroup.first; const size_t numAccumulators = vpAccumulatorFactory.size(); @@ -103,24 +108,27 @@ boost::optional<Document> DocumentSourceGroup::getNextSpilled() { return makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard); } -boost::optional<Document> DocumentSourceGroup::getNextStandard() { +DocumentSource::GetNextResult DocumentSourceGroup::getNextStandard() { // Not spilled, and not streaming. if (_groups->empty()) - return boost::none; + return GetNextResult::makeEOF(); Document out = makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->inShard); if (++groupsIterator == _groups->end()) dispose(); - return out; + return std::move(out); } -boost::optional<Document> DocumentSourceGroup::getNextStreaming() { +DocumentSource::GetNextResult DocumentSourceGroup::getNextStreaming() { // Streaming optimization is active. if (!_firstDocOfNextGroup) { - dispose(); - return boost::none; + auto nextInput = pSource->getNext(); + if (!nextInput.isAdvanced()) { + return nextInput; + } + _firstDocOfNextGroup = nextInput.releaseDocument(); } Value id; @@ -134,14 +142,15 @@ boost::optional<Document> DocumentSourceGroup::getNextStreaming() { // Release our references to the previous input document before asking for the next. This // makes operations like $unwind more efficient. _variables->clearRoot(); - _firstDocOfNextGroup = boost::none; // Retrieve the next document. - _firstDocOfNextGroup = pSource->getNext(); - if (!_firstDocOfNextGroup) { - break; + auto nextInput = pSource->getNext(); + if (!nextInput.isAdvanced()) { + return nextInput; } + _firstDocOfNextGroup = nextInput.releaseDocument(); + _variables->setRoot(*_firstDocOfNextGroup); // Compute the id. If it does not match _currentId, we will exit the loop, leaving @@ -152,7 +161,7 @@ boost::optional<Document> DocumentSourceGroup::getNextStreaming() { Document out = makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard); _currentId = std::move(id); - return out; + return std::move(out); } void DocumentSourceGroup::dispose() { @@ -416,8 +425,7 @@ void getFieldPathListForSpilled(ExpressionObject* expressionObj, } } // namespace -void DocumentSourceGroup::initialize() { - _initialized = true; +DocumentSource::GetNextResult DocumentSourceGroup::initialize() { const size_t numAccumulators = vpAccumulatorFactory.size(); boost::optional<BSONObj> inputSort = findRelevantInputSort(); @@ -434,51 +442,47 @@ void DocumentSourceGroup::initialize() { } // We only need to load the first document. - _firstDocOfNextGroup = pSource->getNext(); - - if (!_firstDocOfNextGroup) { - return; + auto firstInput = pSource->getNext(); + if (!firstInput.isAdvanced()) { + // Leave '_firstDocOfNextGroup' uninitialized and return. + return firstInput; } - + _firstDocOfNextGroup = firstInput.releaseDocument(); _variables->setRoot(*_firstDocOfNextGroup); // Compute the _id value. _currentId = computeId(_variables.get()); - return; + _initialized = true; + return DocumentSource::GetNextResult::makeEOF(); } dassert(numAccumulators == vpExpression.size()); - // pushed to on spill() - vector<shared_ptr<Sorter<Value, Value>::Iterator>> sortedFiles; - int memoryUsageBytes = 0; - - // This loop consumes all input from pSource and buckets it based on pIdExpression. - while (boost::optional<Document> input = pSource->getNext()) { - if (memoryUsageBytes > _maxMemoryUsageBytes) { + // Barring any pausing, this loop exhausts 'pSource' and populates '_groups'. + GetNextResult input = pSource->getNext(); + for (; input.isAdvanced(); input = pSource->getNext()) { + if (_memoryUsageBytes > _maxMemoryUsageBytes) { uassert(16945, "Exceeded memory limit for $group, but didn't allow external sort." " Pass allowDiskUse:true to opt in.", _extSortAllowed); - sortedFiles.push_back(spill()); - memoryUsageBytes = 0; + _sortedFiles.push_back(spill()); + _memoryUsageBytes = 0; } - _variables->setRoot(*input); + _variables->setRoot(input.releaseDocument()); - /* get the _id value */ Value id = computeId(_variables.get()); - /* - Look for the _id value in the map; if it's not there, add a - new entry with a blank accumulator. - */ + // Look for the _id value in the map. If it's not there, add a new entry with a blank + // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and + // looking it up in '_groups' multiple times. const size_t oldSize = _groups->size(); vector<intrusive_ptr<Accumulator>>& group = (*_groups)[id]; const bool inserted = _groups->size() != oldSize; if (inserted) { - memoryUsageBytes += id.getApproximateSize(); + _memoryUsageBytes += id.getApproximateSize(); // Add the accumulators group.reserve(numAccumulators); @@ -489,7 +493,7 @@ void DocumentSourceGroup::initialize() { } else { for (size_t i = 0; i < numAccumulators; i++) { // subtract old mem usage. New usage added back after processing. - memoryUsageBytes -= group[i]->memUsageForSorter(); + _memoryUsageBytes -= group[i]->memUsageForSorter(); } } @@ -497,7 +501,7 @@ void DocumentSourceGroup::initialize() { dassert(numAccumulators == group.size()); for (size_t i = 0; i < numAccumulators; i++) { group[i]->process(vpExpression[i]->evaluate(_variables.get()), _doingMerge); - memoryUsageBytes += group[i]->memUsageForSorter(); + _memoryUsageBytes += group[i]->memUsageForSorter(); } // We are done with the ROOT document so release it. @@ -505,45 +509,58 @@ void DocumentSourceGroup::initialize() { if (kDebugBuild && !storageGlobalParams.readOnly) { // In debug mode, spill every time we have a duplicate id to stress merge logic. - if (!inserted // is a dup - && - !pExpCtx->inRouter // can't spill to disk in router - && - !_extSortAllowed // don't change behavior when testing external sort - && - sortedFiles.size() < 20 // don't open too many FDs - ) { - sortedFiles.push_back(spill()); + if (!inserted && // is a dup + !pExpCtx->inRouter && // can't spill to disk in router + !_extSortAllowed && // don't change behavior when testing external sort + _sortedFiles.size() < 20) { // don't open too many FDs + + _sortedFiles.push_back(spill()); } } } - // These blocks do any final steps necessary to prepare to output results. - if (!sortedFiles.empty()) { - _spilled = true; - if (!_groups->empty()) { - sortedFiles.push_back(spill()); + switch (input.getStatus()) { + case DocumentSource::GetNextResult::ReturnStatus::kAdvanced: { + MONGO_UNREACHABLE; // We consumed all advances above. } + case DocumentSource::GetNextResult::ReturnStatus::kPauseExecution: { + return input; // Propagate pause. + } + case DocumentSource::GetNextResult::ReturnStatus::kEOF: { + // Do any final steps necessary to prepare to output results. + if (!_sortedFiles.empty()) { + _spilled = true; + if (!_groups->empty()) { + _sortedFiles.push_back(spill()); + } - // We won't be using groups again so free its memory. - _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); + // We won't be using groups again so free its memory. + _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); - _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge( - sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator()))); + _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge( + _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator()))); - // prepare current to accumulate data - _currentAccumulators.reserve(numAccumulators); - for (size_t i = 0; i < numAccumulators; i++) { - _currentAccumulators.push_back(vpAccumulatorFactory[i]()); - _currentAccumulators.back()->injectExpressionContext(pExpCtx); - } + // prepare current to accumulate data + _currentAccumulators.reserve(numAccumulators); + for (size_t i = 0; i < numAccumulators; i++) { + _currentAccumulators.push_back(vpAccumulatorFactory[i]()); + _currentAccumulators.back()->injectExpressionContext(pExpCtx); + } - verify(_sorterIterator->more()); // we put data in, we should get something out. - _firstPartOfNextGroup = _sorterIterator->next(); - } else { - // start the group iterator - groupsIterator = _groups->begin(); + verify(_sorterIterator->more()); // we put data in, we should get something out. + _firstPartOfNextGroup = _sorterIterator->next(); + } else { + // start the group iterator + groupsIterator = _groups->begin(); + } + + // This must happen last so that, unless control gets here, we will re-enter + // initialization after getting a GetNextResult::ResultState::kPauseExecution. + _initialized = true; + return input; + } } + MONGO_UNREACHABLE; } shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { @@ -654,7 +671,9 @@ boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const { BSONObjSet DocumentSourceGroup::getOutputSorts() { if (!_initialized) { - initialize(); + initialize(); // Note this might not finish initializing, but that's OK. We just want to + // do some initialization to try to determine if we are streaming or spilled. + // False negatives are OK. } if (!(_streaming || _spilled)) { |