summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorsamontea <merciers.merciers@gmail.com>2022-04-04 15:52:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-04 16:36:38 +0000
commit553be4ed9ad7ecd723d81b825361fb255624ed8b (patch)
treebfc7a776b4b8fbdd2e4ea6cf7cfd90a047a97fb5 /src/mongo
parent5448856fd7cf65bd7d480f2028f04aa2e568ee75 (diff)
downloadmongo-553be4ed9ad7ecd723d81b825361fb255624ed8b.tar.gz
SERVER-57000 Fix handling of correlated pipeline with facet
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.h4
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp78
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.cpp8
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp17
-rw-r--r--src/mongo/db/pipeline/pipeline.h7
7 files changed, 130 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index de4b1a5fb4c..f1ba018554c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -516,6 +516,7 @@ env.CppUnitTest(
target='pipeline_test',
source=[
'dependencies_test.cpp',
+ 'document_source_sequential_document_cache_test.cpp',
'pipeline_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
index 0fb88369117..237eda6355a 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
@@ -54,9 +54,17 @@ DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() {
pExpCtx->checkForInterrupt();
+ if (_cacheIsEOF) {
+ return GetNextResult::makeEOF();
+ }
+
if (_cache->isServing()) {
auto nextDoc = _cache->getNext();
- return (nextDoc ? std::move(*nextDoc) : GetNextResult::makeEOF());
+ if (nextDoc) {
+ return std::move(*nextDoc);
+ }
+ _cacheIsEOF = true;
+ return GetNextResult::makeEOF();
}
auto nextResult = pSource->getNext();
@@ -64,6 +72,7 @@ DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() {
if (!_cache->isAbandoned()) {
if (nextResult.isEOF()) {
_cache->freeze();
+ _cacheIsEOF = true;
} else {
_cache->add(nextResult.getDocument());
}
@@ -111,11 +120,14 @@ Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOpt
DepsTracker deps(DepsTracker::kAllMetadataAvailable);
// Iterate through the pipeline stages until we find one which references an external variable.
+ DocumentSource* lastPtr = nullptr;
for (; prefixSplit != container->end(); ++prefixSplit) {
if (((*prefixSplit)->getDependencies(&deps) == DepsTracker::State::NOT_SUPPORTED) ||
deps.hasVariableReferenceTo(varIDs)) {
break;
}
+
+ lastPtr = prefixSplit->get();
}
// The 'prefixSplit' iterator is now pointing to the first stage of the correlated suffix. If
@@ -128,6 +140,9 @@ Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOpt
// If the cache has been populated and is serving results, remove the non-correlated prefix.
if (_cache->isServing()) {
+ // Need to dispose last stage to be removed.
+ Pipeline::stitch(container);
+ lastPtr->dispose();
container->erase(container->begin(), prefixSplit);
}
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index 0031ca8694b..d8d2ba90db8 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -96,6 +96,10 @@ private:
SequentialDocumentCache* _cache;
+ // This flag is set to prevent the cache stage from immediately serving from the cache after it
+ // has exhausted input from its source for the first time.
+ bool _cacheIsEOF = false;
+
bool _hasOptimizedPos = false;
};
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp
new file mode 100644
index 00000000000..fcead5d1f67
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bson_depth.h"
+#include "mongo/bson/bsonelement.h"
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/json.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceSequentialDocumentCacheTest = AggregationContextFixture;
+
+const long long kDefaultMaxCacheSize = internalDocumentSourceLookupCacheSizeBytes.load();
+
+TEST_F(DocumentSourceSequentialDocumentCacheTest, ReturnsEOFOnSubsequentCallsAfterSourceExhausted) {
+ SequentialDocumentCache cache(kDefaultMaxCacheSize);
+ auto documentCache = DocumentSourceSequentialDocumentCache::create(getExpCtx(), &cache);
+
+ auto source = DocumentSourceMock::createForTest({"{a: 1, b: 2}", "{a: 3, b: 4}"});
+ documentCache->setSource(source.get());
+
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isEOF());
+ ASSERT(documentCache->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceSequentialDocumentCacheTest, ReturnsEOFAfterCacheExhausted) {
+ SequentialDocumentCache cache(kDefaultMaxCacheSize);
+ cache.add(DOC("_id" << 0));
+ cache.add(DOC("_id" << 1));
+ cache.freeze();
+
+ auto documentCache = DocumentSourceSequentialDocumentCache::create(getExpCtx(), &cache);
+
+ ASSERT(cache.isServing());
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isEOF());
+ ASSERT(documentCache->getNext().isEOF());
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index 4575e9fa02b..46ae082afe5 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -60,6 +60,10 @@ const char* DocumentSourceSingleDocumentTransformation::getSourceName() const {
DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNext() {
pExpCtx->checkForInterrupt();
+ if (!_parsedTransform) {
+ return DocumentSource::GetNextResult::makeEOF();
+ }
+
// Get the next input document.
auto input = pSource->getNext();
if (!input.isAdvanced()) {
@@ -71,7 +75,9 @@ DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNex
}
intrusive_ptr<DocumentSource> DocumentSourceSingleDocumentTransformation::optimize() {
- _parsedTransform->optimize();
+ if (_parsedTransform) {
+ _parsedTransform->optimize();
+ }
return this;
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index c4ac29cc21b..85c4beec2a6 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -476,6 +476,23 @@ void Pipeline::stitch() {
}
}
+void Pipeline::stitch(SourceContainer* container) {
+ if (container->empty()) {
+ return;
+ }
+
+ // Chain together all the stages.
+ DocumentSource* prevSource = container->front().get();
+ prevSource->setSource(nullptr);
+ for (Pipeline::SourceContainer::iterator iter(++container->begin()), listEnd(container->end());
+ iter != listEnd;
+ ++iter) {
+ intrusive_ptr<DocumentSource> pTemp(*iter);
+ pTemp->setSource(prevSource);
+ prevSource = pTemp.get();
+ }
+}
+
boost::optional<Document> Pipeline::getNext() {
invariant(!_sources.empty());
auto nextResult = _sources.back()->getNext();
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index b6dc48a9ccd..ad5cb37e0ff 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -285,6 +285,13 @@ public:
}
/**
+ * Stitch together the source pointers by calling setSource() for each source in 'container'.
+ * This function must be called any time the order of stages within the container changes, e.g.
+ * in optimizeContainer().
+ */
+ static void stitch(SourceContainer* container);
+
+ /**
* Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is
* empty.
*/