Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
 src/mongo/db/pipeline/document_source_group.cpp | 163
 1 file changed, 91 insertions(+), 72 deletions(-)
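
For orientation before the hunks: the GetNextResult type this patch migrates to can be sketched roughly as below. This is inferred from the calls the diff itself makes (isAdvanced/isPaused/isEOF, getStatus, releaseDocument, makeEOF); the real definition lives in document_source.h and will differ in detail.

    class GetNextResult {
    public:
        enum class ReturnStatus { kAdvanced, kPauseExecution, kEOF };

        // Implicit conversion from Document, which is why plain documents can be
        // returned below (see 'return std::move(out);').
        GetNextResult(Document doc)
            : _status(ReturnStatus::kAdvanced), _document(std::move(doc)) {}

        static GetNextResult makeEOF() {
            return GetNextResult(ReturnStatus::kEOF);
        }
        // Presumably a makePauseExecution() factory exists as well; it is not
        // used in these hunks.

        bool isAdvanced() const { return _status == ReturnStatus::kAdvanced; }
        bool isPaused() const { return _status == ReturnStatus::kPauseExecution; }
        bool isEOF() const { return _status == ReturnStatus::kEOF; }
        ReturnStatus getStatus() const { return _status; }

        // Only meaningful when isAdvanced().
        Document releaseDocument() { return std::move(_document); }

    private:
        explicit GetNextResult(ReturnStatus status) : _status(status) {}
        ReturnStatus _status;
        Document _document;
    };
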
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index c8da8a74e76..3beb901383f 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -50,11 +50,16 @@ const char* DocumentSourceGroup::getSourceName() const {
return "$group";
}
-boost::optional<Document> DocumentSourceGroup::getNext() {
+DocumentSource::GetNextResult DocumentSourceGroup::getNext() {
pExpCtx->checkForInterrupt();
- if (!_initialized)
- initialize();
+ if (!_initialized) {
+ const auto initializationResult = initialize();
+ if (initializationResult.isPaused()) {
+ return initializationResult;
+ }
+ invariant(initializationResult.isEOF());
+ }
for (auto&& accum : _currentAccumulators) {
accum->reset(); // Prep accumulators for a new group.
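
A consequence visible in this first hunk: getNext() no longer conflates "no document yet" with EOF, and callers must treat a pause as "try again later" rather than as exhaustion. Note also that initialize() signals completion by returning EOF, hence the invariant above. A hypothetical driver loop ('process' is a placeholder, not MongoDB API):

    auto next = group->getNext();
    while (next.isAdvanced()) {
        process(next.releaseDocument());  // placeholder for downstream work
        next = group->getNext();
    }
    if (next.isPaused()) {
        // Yield; calling getNext() again later resumes initialization where it
        // left off, because all grouping state is held in member variables.
    }
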
@@ -69,10 +74,10 @@ boost::optional<Document> DocumentSourceGroup::getNext() {
}
}
-boost::optional<Document> DocumentSourceGroup::getNextSpilled() {
+DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() {
// We aren't streaming, and we have spilled to disk.
if (!_sorterIterator)
- return boost::none;
+ return GetNextResult::makeEOF();
_currentId = _firstPartOfNextGroup.first;
const size_t numAccumulators = vpAccumulatorFactory.size();
@@ -103,24 +108,27 @@ boost::optional<Document> DocumentSourceGroup::getNextSpilled() {
return makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard);
}
-boost::optional<Document> DocumentSourceGroup::getNextStandard() {
+DocumentSource::GetNextResult DocumentSourceGroup::getNextStandard() {
// Not spilled, and not streaming.
if (_groups->empty())
- return boost::none;
+ return GetNextResult::makeEOF();
Document out = makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->inShard);
if (++groupsIterator == _groups->end())
dispose();
- return out;
+ return std::move(out);
}
-boost::optional<Document> DocumentSourceGroup::getNextStreaming() {
+DocumentSource::GetNextResult DocumentSourceGroup::getNextStreaming() {
// Streaming optimization is active.
if (!_firstDocOfNextGroup) {
- dispose();
- return boost::none;
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced()) {
+ return nextInput;
+ }
+ _firstDocOfNextGroup = nextInput.releaseDocument();
}
Value id;
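
The substitutions in this hunk are mechanical: 'return boost::none;' becomes an explicit EOF result, and dereferencing the optional becomes a release after an isAdvanced() check. A before/after miniature ('done' and 'someDocument' are placeholders):

    // Before: EOF and "no result" were the same thing.
    boost::optional<Document> getNextOld() {
        if (done())
            return boost::none;
        return someDocument();
    }

    // After: EOF is one of three explicit states, and a pause becomes expressible.
    DocumentSource::GetNextResult getNextNew() {
        if (done())
            return DocumentSource::GetNextResult::makeEOF();
        return someDocument();  // implicit Document -> GetNextResult conversion
    }
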
@@ -134,14 +142,15 @@ boost::optional<Document> DocumentSourceGroup::getNextStreaming() {
// Release our references to the previous input document before asking for the next. This
// makes operations like $unwind more efficient.
_variables->clearRoot();
- _firstDocOfNextGroup = boost::none;
// Retrieve the next document.
- _firstDocOfNextGroup = pSource->getNext();
- if (!_firstDocOfNextGroup) {
- break;
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced()) {
+ return nextInput;
}
+ _firstDocOfNextGroup = nextInput.releaseDocument();
+
_variables->setRoot(*_firstDocOfNextGroup);
// Compute the id. If it does not match _currentId, we will exit the loop, leaving
@@ -152,7 +161,7 @@ boost::optional<Document> DocumentSourceGroup::getNextStreaming() {
Document out = makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard);
_currentId = std::move(id);
- return out;
+ return std::move(out);
}
void DocumentSourceGroup::dispose() {
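
An easy-to-miss detail in getNextStandard() and getNextStreaming(): 'return out;' became 'return std::move(out);'. Because 'out' is a Document while the function now returns GetNextResult, the return goes through a converting constructor, and under pre-C++20 implicit-move rules a bare 'return out;' would copy the document. In miniature:

    DocumentSource::GetNextResult example() {  // illustrative, not in the patch
        Document out = makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard);
        // 'return out;' would copy here: implicit move on return does not apply
        // when the local's type differs from the return type (relaxed in C++20).
        return std::move(out);
    }
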
@@ -416,8 +425,7 @@ void getFieldPathListForSpilled(ExpressionObject* expressionObj,
}
} // namespace
-void DocumentSourceGroup::initialize() {
- _initialized = true;
+DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
const size_t numAccumulators = vpAccumulatorFactory.size();
boost::optional<BSONObj> inputSort = findRelevantInputSort();
@@ -434,51 +442,47 @@ void DocumentSourceGroup::initialize() {
}
// We only need to load the first document.
- _firstDocOfNextGroup = pSource->getNext();
-
- if (!_firstDocOfNextGroup) {
- return;
+ auto firstInput = pSource->getNext();
+ if (!firstInput.isAdvanced()) {
+ // Leave '_firstDocOfNextGroup' uninitialized and return.
+ return firstInput;
}
-
+ _firstDocOfNextGroup = firstInput.releaseDocument();
_variables->setRoot(*_firstDocOfNextGroup);
// Compute the _id value.
_currentId = computeId(_variables.get());
- return;
+ _initialized = true;
+ return DocumentSource::GetNextResult::makeEOF();
}
dassert(numAccumulators == vpExpression.size());
- // pushed to on spill()
- vector<shared_ptr<Sorter<Value, Value>::Iterator>> sortedFiles;
- int memoryUsageBytes = 0;
-
- // This loop consumes all input from pSource and buckets it based on pIdExpression.
- while (boost::optional<Document> input = pSource->getNext()) {
- if (memoryUsageBytes > _maxMemoryUsageBytes) {
+ // Barring any pausing, this loop exhausts 'pSource' and populates '_groups'.
+ GetNextResult input = pSource->getNext();
+ for (; input.isAdvanced(); input = pSource->getNext()) {
+ if (_memoryUsageBytes > _maxMemoryUsageBytes) {
uassert(16945,
"Exceeded memory limit for $group, but didn't allow external sort."
" Pass allowDiskUse:true to opt in.",
_extSortAllowed);
- sortedFiles.push_back(spill());
- memoryUsageBytes = 0;
+ _sortedFiles.push_back(spill());
+ _memoryUsageBytes = 0;
}
- _variables->setRoot(*input);
+ _variables->setRoot(input.releaseDocument());
- /* get the _id value */
Value id = computeId(_variables.get());
- /*
- Look for the _id value in the map; if it's not there, add a
- new entry with a blank accumulator.
- */
+ // Look for the _id value in the map. If it's not there, add a new entry with a blank
+ // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and
+ // looking it up in '_groups' multiple times.
const size_t oldSize = _groups->size();
vector<intrusive_ptr<Accumulator>>& group = (*_groups)[id];
const bool inserted = _groups->size() != oldSize;
if (inserted) {
- memoryUsageBytes += id.getApproximateSize();
+ _memoryUsageBytes += id.getApproximateSize();
// Add the accumulators
group.reserve(numAccumulators);
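
The idiom the new comment describes ("avoid hashing 'id' and looking it up in '_groups' multiple times"), isolated into a self-contained example: operator[] default-constructs a missing entry, so comparing the container's size before and after detects insertion with a single hash/lookup.

    #include <string>
    #include <unordered_map>
    #include <vector>

    int main() {
        std::unordered_map<std::string, std::vector<int>> groups;

        const auto oldSize = groups.size();
        std::vector<int>& group = groups["someId"];  // inserts if absent: one lookup
        const bool inserted = groups.size() != oldSize;
        if (inserted) {
            group.reserve(2);  // set up the fresh entry, as the accumulators are above
        }
        return inserted ? 0 : 1;
    }
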
@@ -489,7 +493,7 @@ void DocumentSourceGroup::initialize() {
} else {
for (size_t i = 0; i < numAccumulators; i++) {
// subtract old mem usage. New usage added back after processing.
- memoryUsageBytes -= group[i]->memUsageForSorter();
+ _memoryUsageBytes -= group[i]->memUsageForSorter();
}
}
@@ -497,7 +501,7 @@ void DocumentSourceGroup::initialize() {
dassert(numAccumulators == group.size());
for (size_t i = 0; i < numAccumulators; i++) {
group[i]->process(vpExpression[i]->evaluate(_variables.get()), _doingMerge);
- memoryUsageBytes += group[i]->memUsageForSorter();
+ _memoryUsageBytes += group[i]->memUsageForSorter();
}
// We are done with the ROOT document so release it.
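
These two hunks also promote the local 'memoryUsageBytes' to the member '_memoryUsageBytes'. That is not cosmetic: a kPauseExecution return can now interrupt the loop, so the running total must survive across calls. The accounting discipline itself, isolated ('accum' is illustrative):

    // For an existing group, drop the stale estimate first...
    _memoryUsageBytes -= accum->memUsageForSorter();
    // ...let the accumulator absorb the new value...
    accum->process(vpExpression[i]->evaluate(_variables.get()), _doingMerge);
    // ...then add the updated estimate back.
    _memoryUsageBytes += accum->memUsageForSorter();
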
@@ -505,45 +509,58 @@ void DocumentSourceGroup::initialize() {
if (kDebugBuild && !storageGlobalParams.readOnly) {
// In debug mode, spill every time we have a duplicate id to stress merge logic.
- if (!inserted // is a dup
- &&
- !pExpCtx->inRouter // can't spill to disk in router
- &&
- !_extSortAllowed // don't change behavior when testing external sort
- &&
- sortedFiles.size() < 20 // don't open too many FDs
- ) {
- sortedFiles.push_back(spill());
+ if (!inserted && // is a dup
+ !pExpCtx->inRouter && // can't spill to disk in router
+ !_extSortAllowed && // don't change behavior when testing external sort
+ _sortedFiles.size() < 20) { // don't open too many FDs
+
+ _sortedFiles.push_back(spill());
}
}
}
- // These blocks do any final steps necessary to prepare to output results.
- if (!sortedFiles.empty()) {
- _spilled = true;
- if (!_groups->empty()) {
- sortedFiles.push_back(spill());
+ switch (input.getStatus()) {
+ case DocumentSource::GetNextResult::ReturnStatus::kAdvanced: {
+ MONGO_UNREACHABLE; // We consumed all advances above.
}
+ case DocumentSource::GetNextResult::ReturnStatus::kPauseExecution: {
+ return input; // Propagate pause.
+ }
+ case DocumentSource::GetNextResult::ReturnStatus::kEOF: {
+ // Do any final steps necessary to prepare to output results.
+ if (!_sortedFiles.empty()) {
+ _spilled = true;
+ if (!_groups->empty()) {
+ _sortedFiles.push_back(spill());
+ }
- // We won't be using groups again so free its memory.
- _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
+ // We won't be using groups again so free its memory.
+ _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
- _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
- sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
+ _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
+ _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
- // prepare current to accumulate data
- _currentAccumulators.reserve(numAccumulators);
- for (size_t i = 0; i < numAccumulators; i++) {
- _currentAccumulators.push_back(vpAccumulatorFactory[i]());
- _currentAccumulators.back()->injectExpressionContext(pExpCtx);
- }
+ // prepare current to accumulate data
+ _currentAccumulators.reserve(numAccumulators);
+ for (size_t i = 0; i < numAccumulators; i++) {
+ _currentAccumulators.push_back(vpAccumulatorFactory[i]());
+ _currentAccumulators.back()->injectExpressionContext(pExpCtx);
+ }
- verify(_sorterIterator->more()); // we put data in, we should get something out.
- _firstPartOfNextGroup = _sorterIterator->next();
- } else {
- // start the group iterator
- groupsIterator = _groups->begin();
+ verify(_sorterIterator->more()); // we put data in, we should get something out.
+ _firstPartOfNextGroup = _sorterIterator->next();
+ } else {
+ // start the group iterator
+ groupsIterator = _groups->begin();
+ }
+
+            // This must happen last so that, unless control gets here, we will re-enter
+            // initialization after getting a GetNextResult::ReturnStatus::kPauseExecution.
+ _initialized = true;
+ return input;
+ }
}
+ MONGO_UNREACHABLE;
}
shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
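
Taken together, the hunk above turns initialize() into a resumable operation: spill files, the group map, and the memory counter all live in members, and '_initialized' is only set once the EOF path completes. A simplified skeleton of the control flow (bodies elided):

    DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
        auto input = pSource->getNext();
        for (; input.isAdvanced(); input = pSource->getNext()) {
            // ...group the document, spilling to '_sortedFiles' when over budget...
        }
        if (input.isPaused())
            return input;     // '_initialized' stays false; the next call re-enters.

        // EOF: set up either the spill-merge iterator or the in-memory iterator.
        _initialized = true;  // Set last, only after initialization truly finished.
        return input;         // Propagate the EOF; getNext() asserts on this.
    }
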
@@ -654,7 +671,9 @@ boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const {
BSONObjSet DocumentSourceGroup::getOutputSorts() {
if (!_initialized) {
- initialize();
+ initialize(); // Note this might not finish initializing, but that's OK. We just want to
+ // do some initialization to try to determine if we are streaming or spilled.
+ // False negatives are OK.
}
if (!(_streaming || _spilled)) {