summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_lookup.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <cswanson310@gmail.com>2016-08-29 10:06:41 -0400
committerCharlie Swanson <cswanson310@gmail.com>2016-09-01 14:08:25 -0400
commitb1014fe1b40a69cd90b27cb336a170317eecc6b7 (patch)
treec189d2d931cdfbdec83359f08cbb1639c1d5e254 /src/mongo/db/pipeline/document_source_lookup.cpp
parentd289e240b653e70a7d90be885a3ad6de21b7c6cb (diff)
downloadmongo-b1014fe1b40a69cd90b27cb336a170317eecc6b7.tar.gz
SERVER-24153 Allow pipelines within $facet stage to process in batches.
This approach removes the need to buffer all documents in memory, thus removing concerns about spilling intermediate results to disk.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_lookup.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp31
1 files changed, 18 insertions, 13 deletions
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 73d274eef06..5c9856f93ab 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -85,7 +85,7 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const vector<Value>&
} // namespace
-boost::optional<Document> DocumentSourceLookUp::getNext() {
+DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
pExpCtx->checkForInterrupt();
uassert(4567, "from collection cannot be sharded", !_mongod->isSharded(_fromExpCtx->ns));
@@ -102,23 +102,25 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
return unwindResult();
}
- boost::optional<Document> input = pSource->getNext();
- if (!input)
- return {};
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced()) {
+ return nextInput;
+ }
+ auto inputDoc = nextInput.releaseDocument();
// If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind,
// '_handlingUnwind' would be set to true, and we would not have made it here.
invariant(!_matchSrc);
auto matchStage =
- makeMatchStageFromInput(*input, _localField, _foreignFieldFieldName, BSONObj());
+ makeMatchStageFromInput(inputDoc, _localField, _foreignFieldFieldName, BSONObj());
// We've already allocated space for the trailing $match stage in '_fromPipeline'.
_fromPipeline.back() = matchStage;
auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
std::vector<Value> results;
int objsize = 0;
- while (auto result = pipeline->output()->getNext()) {
+ while (auto result = pipeline->getNext()) {
objsize += result->getApproximateSize();
uassert(4568,
str::stream() << "Total size of documents in " << _fromNs.coll() << " matching "
@@ -128,7 +130,7 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
results.emplace_back(std::move(*result));
}
- MutableDocument output(std::move(*input));
+ MutableDocument output(std::move(inputDoc));
output.setNestedField(_as, Value(std::move(results)));
return output.freeze();
}
@@ -345,16 +347,19 @@ BSONObj DocumentSourceLookUp::makeMatchStageFromInput(const Document& input,
return match.obj();
}
-boost::optional<Document> DocumentSourceLookUp::unwindResult() {
+DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
const boost::optional<FieldPath> indexPath(_unwindSrc->indexPath());
// Loop until we get a document that has at least one match.
// Note we may return early from this loop if our source stage is exhausted or if the unwind
// source was asked to return empty arrays and we get a document without a match.
while (!_pipeline || !_nextValue) {
- _input = pSource->getNext();
- if (!_input)
- return {};
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced()) {
+ return nextInput;
+ }
+
+ _input = nextInput.releaseDocument();
BSONObj filter = _additionalFilter.value_or(BSONObj());
auto matchStage =
@@ -364,7 +369,7 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() {
_pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
_cursorIndex = 0;
- _nextValue = _pipeline->output()->getNext();
+ _nextValue = _pipeline->getNext();
if (_unwindSrc->preserveNullAndEmptyArrays() && !_nextValue) {
// There were no results for this cursor, but the $unwind was asked to preserve empty
@@ -382,7 +387,7 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() {
invariant(bool(_input) && bool(_nextValue));
auto currentValue = *_nextValue;
- _nextValue = _pipeline->output()->getNext();
+ _nextValue = _pipeline->getNext();
// Move input document into output if this is the last or only result, otherwise perform a copy.
MutableDocument output(_nextValue ? *_input : std::move(*_input));