diff options
author | Alyssa Wagenmaker <alyssa.wagenmaker@mongodb.com> | 2023-01-09 15:52:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-10 19:21:55 +0000 |
commit | 35832bd3cbfba591689688af6589558432a6bc85 (patch) | |
tree | 443892b9dedb13a45c411264a5218fb0161e5f7a | |
parent | 0f8ed3b2ae6fff6fe7f2050e2a80c571f4cb5c58 (diff) | |
download | mongo-35832bd3cbfba591689688af6589558432a6bc85.tar.gz |
SERVER-71691 handle paused execution in setWindowFields
6 files changed, 104 insertions, 7 deletions
diff --git a/jstests/aggregation/bugs/window_inside_facet.js b/jstests/aggregation/bugs/window_inside_facet.js new file mode 100644 index 00000000000..baf9fcd5150 --- /dev/null +++ b/jstests/aggregation/bugs/window_inside_facet.js @@ -0,0 +1,77 @@ +// Test that $setWindowFields inside $facet correctly propagates its state when it encounters paused +// execution. +(function() { +"use strict"; + +const coll = db.window_inside_facet; +coll.drop(); + +assert.commandWorked(coll.insert([ + {_id: 'a', n: 0}, + {_id: 'b', n: 1}, + {_id: 'c', n: 2}, + {_id: 'd', n: 3}, + {_id: 'e', n: 4}, + {_id: 'f', n: 5} +])); + +// Test window with a sort within $facet alongside another pipeline that will cause it to pause +// execution. The sort will cause the window to hit paused execution immediately, before an advance. +let result = + coll.aggregate({ + $facet: { + facet1: [{ + $setWindowFields: { + output: {prevId: {$shift: {by: -1, default: null, output: "$_id"}}}, + sortBy: {_id: 1} + } + }], + facet2: [{$count: "count"}] + } + }) + .toArray()[0]; +let expected = { + facet1: [ + {_id: 'a', n: 0, prevId: null}, + {_id: 'b', n: 1, prevId: 'a'}, + {_id: 'c', n: 2, prevId: 'b'}, + {_id: 'd', n: 3, prevId: 'c'}, + {_id: 'e', n: 4, prevId: 'd'}, + {_id: 'f', n: 5, prevId: 'e'} + ], + facet2: [{count: 6}] +}; +assert.docEq(expected, result, "$setWindowFields with sort failed."); + +// Test window with no sort within $facet alongside another pipeline that will cause it to pause +// execution. Having no sort will cause the window to hit paused execution after advancing. +result = coll.aggregate({ + $facet: { + facet1: [ + { + $setWindowFields: { + output: { + min: {$min: "$n"}, + max: {$max: "$n"}, + } + } + }, + {$sort: {_id: 1}} + ], + facet2: [{$count: "count"}] + } + }) + .toArray()[0]; +expected = { + facet1: [ + {_id: 'a', n: 0, min: 0, max: 5}, + {_id: 'b', n: 1, min: 0, max: 5}, + {_id: 'c', n: 2, min: 0, max: 5}, + {_id: 'd', n: 3, min: 0, max: 5}, + {_id: 'e', n: 4, min: 0, max: 5}, + {_id: 'f', n: 5, min: 0, max: 5} + ], + facet2: [{count: 6}] +}; +assert.docEq(expected, result, "$setWindowFields without sort failed."); +}()); diff --git a/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js b/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js index 92199015faf..0d4e0158b3e 100644 --- a/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js +++ b/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js @@ -67,7 +67,7 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) { } cmdObj.pipeline = [ - {$facet: {originalPipeline: originalPipeline}}, + {$facet: {originalPipeline: originalPipeline, extraPipeline: [{$count: "count"}]}}, {$unwind: '$originalPipeline'}, {$replaceRoot: {newRoot: '$originalPipeline'}}, ]; diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 5ec204d1fa9..24e86b38cba 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -52,8 +52,10 @@ REGISTER_DOCUMENT_SOURCE(sample, AllowedWithApiStrict::kAlways); DocumentSource::GetNextResult DocumentSourceSample::doGetNext() { - if (_size == 0) + if (_size == 0) { + pSource->dispose(); return GetNextResult::makeEOF(); + } if (!_sortStage->isPopulated()) { // Exhaust source stage, add random metadata, and push all into sorter. diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp index 0200ce21cc7..ead167946eb 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp +++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp @@ -458,8 +458,12 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext() return DocumentSource::GetNextResult::makeEOF(); auto curDoc = _iterator.current(); - // The only way we hit this case is if there are no documents, since otherwise _eof will be set. if (!curDoc) { + if (_iterator.isPaused()) { + return DocumentSource::GetNextResult::makePauseExecution(); + } + // The only way we hit this case is if there are no documents, since otherwise _eof will be + // set. _eof = true; return DocumentSource::GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.cpp b/src/mongo/db/pipeline/window_function/partition_iterator.cpp index ea59f5c6e09..e470103b764 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.cpp +++ b/src/mongo/db/pipeline/window_function/partition_iterator.cpp @@ -116,8 +116,8 @@ optional<Document> PartitionIterator::operator[](int index) { for (int i = _cache->getHighestIndex(); i < docDesired; i++) { // Pull in document from prior stage. getNextDocument(); - // Check for EOF or the next partition. - if (_state == IteratorState::kAwaitingAdvanceToNext || + // Check whether the next document is available. + if (isPaused() || _state == IteratorState::kAwaitingAdvanceToNext || _state == IteratorState::kAwaitingAdvanceToEOF) { return boost::none; } @@ -163,6 +163,7 @@ PartitionIterator::AdvanceResult PartitionIterator::advanceInternal() { // whether to pull from the prior stage. switch (_state) { case IteratorState::kNotInitialized: + case IteratorState::kPauseExecution: case IteratorState::kIntraPartition: // Pull in the next document and advance the pointer. getNextDocument(); @@ -467,9 +468,12 @@ void PartitionIterator::getNextDocument() { return; } - if (!getNextRes.isAdvanced()) + if (getNextRes.isPaused()) { + _state = IteratorState::kPauseExecution; return; + } + tassert(7169100, "getNextResult must have advanced", getNextRes.isAdvanced()); auto doc = getNextRes.releaseDocument(); // Greedily populate the internal document cache to enable easier memory tracking versus @@ -477,7 +481,7 @@ void PartitionIterator::getNextDocument() { doc.fillCache(); if (_partitionExpr) { - if (_state == IteratorState::kNotInitialized) { + if (!_partitionComparator) { _partitionComparator = std::make_unique<PartitionKeyComparator>(_expCtx, *_partitionExpr, doc); _nextPartitionDoc = std::move(doc); diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h index 128901834ee..28e0e6a6242 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.h +++ b/src/mongo/db/pipeline/window_function/partition_iterator.h @@ -77,6 +77,13 @@ public: return (*this)[0]; } + /** + * Returns true if iterator execution is paused. + */ + bool isPaused() { + return _state == IteratorState::kPauseExecution; + } + enum class AdvanceResult { kAdvanced, kNewPartition, @@ -287,6 +294,9 @@ private: enum class IteratorState { // Default state, no documents have been pulled into the cache. kNotInitialized, + // Input sources do not have a result to be processed yet, but there may be more results in + // the future. + kPauseExecution, // Iterating the current partition. We don't know where the current partition ends, or // whether it's the last partition. kIntraPartition, |