diff options
author | Mihai Andrei <mihai.andrei@mongodb.com> | 2019-11-22 21:27:40 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-22 21:27:40 +0000 |
commit | 9f1e78ea0a51bae65b557db53eee1fb6489f31c4 (patch) | |
tree | 77b2a53955e4a0d2b5941d49c1381246300d6444 /src/mongo/db/pipeline | |
parent | 44497e0e5c0388f9ef790d9c95737816a377d29d (diff) | |
download | mongo-9f1e78ea0a51bae65b557db53eee1fb6489f31c4.tar.gz |
SERVER-44583 M/R Agg: Add explain support for mapReduce command
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source_writer.h | 88 |
1 files changed, 45 insertions, 43 deletions
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index dd5dd0c37a5..5110582faeb 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -170,54 +170,56 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { return GetNextResult::makeEOF(); } - if (!_initialized) { - // Explain should never try to actually execute any writes. We only ever expect - // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain - // modes. This assertion should not be triggered for 'queryPlanner' explain, which - // is perfectly legal. - uassert(51029, - "explain of {} is not allowed with verbosity {}"_format( - getSourceName(), ExplainOptions::verbosityString(*pExpCtx->explain)), - !pExpCtx->explain); - initialize(); - _initialized = true; - } - - BatchedObjects batch; - int bufferedBytes = 0; - - auto nextInput = pSource->getNext(); - for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { - waitWhileFailPointEnabled(); - - auto doc = nextInput.releaseDocument(); - auto [obj, objSize] = makeBatchObject(std::move(doc)); + // Ignore writes and exhaust input if we are in explain mode. + if (pExpCtx->explain) { + auto nextInput = pSource->getNext(); + for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { + } + _done = nextInput.getStatus() == GetNextResult::ReturnStatus::kEOF; + return nextInput; + } else { + if (!_initialized) { + initialize(); + _initialized = true; + } - bufferedBytes += objSize; - if (!batch.empty() && - (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { + BatchedObjects batch; + int bufferedBytes = 0; + + auto nextInput = pSource->getNext(); + for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { + waitWhileFailPointEnabled(); + + auto doc = nextInput.releaseDocument(); + auto [obj, objSize] = makeBatchObject(std::move(doc)); + + bufferedBytes += objSize; + if (!batch.empty() && + (bufferedBytes > BSONObjMaxUserSize || + batch.size() >= write_ops::kMaxWriteBatchSize)) { + spill(std::move(batch)); + batch.clear(); + bufferedBytes = objSize; + } + batch.push_back(obj); + } + if (!batch.empty()) { spill(std::move(batch)); batch.clear(); - bufferedBytes = objSize; } - batch.push_back(obj); - } - if (!batch.empty()) { - spill(std::move(batch)); - batch.clear(); - } - switch (nextInput.getStatus()) { - case GetNextResult::ReturnStatus::kAdvanced: { - MONGO_UNREACHABLE; // We consumed all advances above. - } - case GetNextResult::ReturnStatus::kPauseExecution: { - return nextInput; // Propagate the pause. - } - case GetNextResult::ReturnStatus::kEOF: { - _done = true; - finalize(); - return nextInput; + switch (nextInput.getStatus()) { + case GetNextResult::ReturnStatus::kAdvanced: { + MONGO_UNREACHABLE; // We consumed all advances above. + } + case GetNextResult::ReturnStatus::kPauseExecution: { + return nextInput; // Propagate the pause. + } + case GetNextResult::ReturnStatus::kEOF: { + _done = true; + finalize(); + return nextInput; + } } } MONGO_UNREACHABLE; |