summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlyssa Wagenmaker <alyssa.wagenmaker@mongodb.com>2023-01-09 17:58:28 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-09 20:12:49 +0000
commit9634296064c60f12d3d8055fbb101109143967a5 (patch)
tree680046eb0a73b778379d9fc65c0874d806724c69
parent0a997440edbb4bb546068acf2396ba5d254f3df0 (diff)
downloadmongo-9634296064c60f12d3d8055fbb101109143967a5.tar.gz
SERVER-71691 handle paused execution in setWindowFields
-rw-r--r--jstests/aggregation/bugs/window_inside_facet.js77
-rw-r--r--jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js2
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.cpp6
-rw-r--r--src/mongo/db/pipeline/window_function/partition_iterator.cpp12
-rw-r--r--src/mongo/db/pipeline/window_function/partition_iterator.h10
6 files changed, 104 insertions, 7 deletions
diff --git a/jstests/aggregation/bugs/window_inside_facet.js b/jstests/aggregation/bugs/window_inside_facet.js
new file mode 100644
index 00000000000..baf9fcd5150
--- /dev/null
+++ b/jstests/aggregation/bugs/window_inside_facet.js
@@ -0,0 +1,77 @@
+// Test that $setWindowFields inside $facet correctly propagates its state when it encounters paused
+// execution.
+(function() {
+"use strict";
+
+const coll = db.window_inside_facet;
+coll.drop();
+
+assert.commandWorked(coll.insert([
+ {_id: 'a', n: 0},
+ {_id: 'b', n: 1},
+ {_id: 'c', n: 2},
+ {_id: 'd', n: 3},
+ {_id: 'e', n: 4},
+ {_id: 'f', n: 5}
+]));
+
+// Test window with a sort within $facet alongside another pipeline that will cause it to pause
+// execution. The sort will cause the window to hit paused execution immediately, before an advance.
+let result =
+ coll.aggregate({
+ $facet: {
+ facet1: [{
+ $setWindowFields: {
+ output: {prevId: {$shift: {by: -1, default: null, output: "$_id"}}},
+ sortBy: {_id: 1}
+ }
+ }],
+ facet2: [{$count: "count"}]
+ }
+ })
+ .toArray()[0];
+let expected = {
+ facet1: [
+ {_id: 'a', n: 0, prevId: null},
+ {_id: 'b', n: 1, prevId: 'a'},
+ {_id: 'c', n: 2, prevId: 'b'},
+ {_id: 'd', n: 3, prevId: 'c'},
+ {_id: 'e', n: 4, prevId: 'd'},
+ {_id: 'f', n: 5, prevId: 'e'}
+ ],
+ facet2: [{count: 6}]
+};
+assert.docEq(expected, result, "$setWindowFields with sort failed.");
+
+// Test window with no sort within $facet alongside another pipeline that will cause it to pause
+// execution. Having no sort will cause the window to hit paused execution after advancing.
+result = coll.aggregate({
+ $facet: {
+ facet1: [
+ {
+ $setWindowFields: {
+ output: {
+ min: {$min: "$n"},
+ max: {$max: "$n"},
+ }
+ }
+ },
+ {$sort: {_id: 1}}
+ ],
+ facet2: [{$count: "count"}]
+ }
+ })
+ .toArray()[0];
+expected = {
+ facet1: [
+ {_id: 'a', n: 0, min: 0, max: 5},
+ {_id: 'b', n: 1, min: 0, max: 5},
+ {_id: 'c', n: 2, min: 0, max: 5},
+ {_id: 'd', n: 3, min: 0, max: 5},
+ {_id: 'e', n: 4, min: 0, max: 5},
+ {_id: 'f', n: 5, min: 0, max: 5}
+ ],
+ facet2: [{count: 6}]
+};
+assert.docEq(expected, result, "$setWindowFields without sort failed.");
+}());
diff --git a/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js b/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js
index 92199015faf..0d4e0158b3e 100644
--- a/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js
+++ b/jstests/libs/override_methods/implicitly_wrap_pipelines_in_facets.js
@@ -67,7 +67,7 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) {
}
cmdObj.pipeline = [
- {$facet: {originalPipeline: originalPipeline}},
+ {$facet: {originalPipeline: originalPipeline, extraPipeline: [{$count: "count"}]}},
{$unwind: '$originalPipeline'},
{$replaceRoot: {newRoot: '$originalPipeline'}},
];
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 5ec204d1fa9..24e86b38cba 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -52,8 +52,10 @@ REGISTER_DOCUMENT_SOURCE(sample,
AllowedWithApiStrict::kAlways);
DocumentSource::GetNextResult DocumentSourceSample::doGetNext() {
- if (_size == 0)
+ if (_size == 0) {
+ pSource->dispose();
return GetNextResult::makeEOF();
+ }
if (!_sortStage->isPopulated()) {
// Exhaust source stage, add random metadata, and push all into sorter.
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
index 45223bb1cf4..73806992561 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
@@ -464,8 +464,12 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
return DocumentSource::GetNextResult::makeEOF();
auto curDoc = _iterator.current();
- // The only way we hit this case is if there are no documents, since otherwise _eof will be set.
if (!curDoc) {
+ if (_iterator.isPaused()) {
+ return DocumentSource::GetNextResult::makePauseExecution();
+ }
+ // The only way we hit this case is if there are no documents, since otherwise _eof will be
+ // set.
_eof = true;
return DocumentSource::GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.cpp b/src/mongo/db/pipeline/window_function/partition_iterator.cpp
index d8992aec28c..7962fee00fc 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator.cpp
+++ b/src/mongo/db/pipeline/window_function/partition_iterator.cpp
@@ -116,8 +116,8 @@ optional<Document> PartitionIterator::operator[](int index) {
for (int i = _cache->getHighestIndex(); i < docDesired; i++) {
// Pull in document from prior stage.
getNextDocument();
- // Check for EOF or the next partition.
- if (_state == IteratorState::kAwaitingAdvanceToNext ||
+ // Check whether the next document is available.
+ if (isPaused() || _state == IteratorState::kAwaitingAdvanceToNext ||
_state == IteratorState::kAwaitingAdvanceToEOF) {
return boost::none;
}
@@ -163,6 +163,7 @@ PartitionIterator::AdvanceResult PartitionIterator::advanceInternal() {
// whether to pull from the prior stage.
switch (_state) {
case IteratorState::kNotInitialized:
+ case IteratorState::kPauseExecution:
case IteratorState::kIntraPartition:
// Pull in the next document and advance the pointer.
getNextDocument();
@@ -467,9 +468,12 @@ void PartitionIterator::getNextDocument() {
return;
}
- if (!getNextRes.isAdvanced())
+ if (getNextRes.isPaused()) {
+ _state = IteratorState::kPauseExecution;
return;
+ }
+ tassert(7169100, "getNextResult must have advanced", getNextRes.isAdvanced());
auto doc = getNextRes.releaseDocument();
// Greedily populate the internal document cache to enable easier memory tracking versus
@@ -477,7 +481,7 @@ void PartitionIterator::getNextDocument() {
doc.fillCache();
if (_partitionExpr) {
- if (_state == IteratorState::kNotInitialized) {
+ if (!_partitionComparator) {
_partitionComparator =
std::make_unique<PartitionKeyComparator>(_expCtx, *_partitionExpr, doc);
_nextPartitionDoc = std::move(doc);
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h
index 128901834ee..28e0e6a6242 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator.h
+++ b/src/mongo/db/pipeline/window_function/partition_iterator.h
@@ -77,6 +77,13 @@ public:
return (*this)[0];
}
+ /**
+ * Returns true if iterator execution is paused.
+ */
+ bool isPaused() {
+ return _state == IteratorState::kPauseExecution;
+ }
+
enum class AdvanceResult {
kAdvanced,
kNewPartition,
@@ -287,6 +294,9 @@ private:
enum class IteratorState {
// Default state, no documents have been pulled into the cache.
kNotInitialized,
+ // Input sources do not have a result to be processed yet, but there may be more results in
+ // the future.
+ kPauseExecution,
// Iterating the current partition. We don't know where the current partition ends, or
// whether it's the last partition.
kIntraPartition,