author    samontea <merciers.merciers@gmail.com>  2022-04-04 15:52:56 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-04 16:36:38 +0000
commit    553be4ed9ad7ecd723d81b825361fb255624ed8b
tree      bfc7a776b4b8fbdd2e4ea6cf7cfd90a047a97fb5
parent    5448856fd7cf65bd7d480f2028f04aa2e568ee75
SERVER-57000 Fix handling of correlated pipeline with facet
 jstests/aggregation/bugs/lookup_cache_safe_prefix_disposal.js            | 54
 jstests/aggregation/sources/lookup/lookup_non_correlated_prefix.js       | 98
 src/mongo/db/pipeline/SConscript                                         |  1
 src/mongo/db/pipeline/document_source_sequential_document_cache.cpp      | 17
 src/mongo/db/pipeline/document_source_sequential_document_cache.h        |  4
 src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp | 78
 src/mongo/db/pipeline/document_source_single_document_transformation.cpp |  8
 src/mongo/db/pipeline/pipeline.cpp                                       | 17
 src/mongo/db/pipeline/pipeline.h                                         |  7
 9 files changed, 282 insertions(+), 2 deletions(-)
diff --git a/jstests/aggregation/bugs/lookup_cache_safe_prefix_disposal.js b/jstests/aggregation/bugs/lookup_cache_safe_prefix_disposal.js
new file mode 100644
index 00000000000..7ffe9e3b372
--- /dev/null
+++ b/jstests/aggregation/bugs/lookup_cache_safe_prefix_disposal.js
@@ -0,0 +1,54 @@
+// This test was designed to reproduce SERVER-59435. A $lookup sub-pipeline which is entirely
+// uncorrelated can have its results cached and re-used for subsequent $lookups. To do this we
+// insert a special caching stage in the $lookup sub-pipeline at a place where everything "in front
+// of it" or "to the left" is uncorrelated (the same for each lookup). For the first lookup, the
+// cache just sits there and collects results. For the second lookup it is inserted in the same way
+// into the new pipeline (which may be different if there were correlated pieces that need to be
+// updated on each lookup), and then after being inserted it *deletes* the prefix of the pipeline,
+// which is no longer needed because the results will now come from the cache. There was a bug where
+// this deletion happened in a state where the pipeline was not prepared for destruction and
+// followed an old/invalid pointer. This test reproduces the scenario that uncovered that bug and is
+// simply designed to demonstrate that we no longer crash.
+//
+// @tags: [
+// # $lookup containing $facet is not allowed in $facet (because $facet is not allowed in $facet).
+// do_not_wrap_aggregations_in_facets,
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/fixture_helpers.js"); // For isSharded.
+
+const joinColl = db.lookup_non_correlated_prefix_join;
+const testColl = db.lookup_non_correlated_prefix;
+joinColl.drop();
+testColl.drop();
+
+// Do not run the rest of the tests if the foreign collection is implicitly sharded but the flag to
+// allow $lookup/$graphLookup into a sharded collection is disabled.
+const getShardedLookupParam = db.adminCommand({getParameter: 1, featureFlagShardedLookup: 1});
+const isShardedLookupEnabled = getShardedLookupParam.hasOwnProperty("featureFlagShardedLookup") &&
+ getShardedLookupParam.featureFlagShardedLookup.value;
+if (FixtureHelpers.isSharded(joinColl) && !isShardedLookupEnabled) {
+ return;
+}
+// We need two documents to make sure we have time to (1) populate the cache and (2) attempt to
+// re-use the cache.
+testColl.insert([{}, {}]);
+
+assert.doesNotThrow(() => testColl.aggregate([{
+ $lookup: {
+ as: 'items_check',
+ from: joinColl.getName(),
+ let: { id: '$_id' },
+ pipeline: [
+        // This pipeline is interesting: the $match stage will swap before the $addFields stage
+        // during optimization. In doing so, it creates a copy and destroys itself, which leaves
+        // $facet with a dangling pointer to the old $match stage, which is no longer valid.
+ { $addFields: { id: '$_id' } },
+ { $match: {} },
+ { $facet: { all: [{ $match: {} }] } }
+ ]
+ }
+}]));
+}());
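
The crash described in the test above is a plain use-after-free on the intrusive stage chain: each stage holds a raw, non-owning pointer to its source, so destroying a stage while a downstream neighbour still points at it leaves that neighbour with a dangling pointer. A minimal sketch of the failure mode, using toy stand-in types rather than the server's real DocumentSource class:

    #include <iostream>
    #include <memory>
    #include <vector>

    // Toy stand-in for a pipeline stage: 'source' is raw and non-owning, the
    // same shape as the pSource pointer in the real stages.
    struct Stage {
        const char* name;
        Stage* source = nullptr;
        explicit Stage(const char* n) : name(n) {}
    };

    int main() {
        // Build $addFields -> $match -> $facet, chained through 'source' pointers.
        std::vector<std::unique_ptr<Stage>> pipeline;
        pipeline.push_back(std::make_unique<Stage>("$addFields"));
        pipeline.push_back(std::make_unique<Stage>("$match"));
        pipeline.push_back(std::make_unique<Stage>("$facet"));
        pipeline[1]->source = pipeline[0].get();
        pipeline[2]->source = pipeline[1].get();  // $facet now points at $match

        // Optimization replaces the $match node (the swap creates a copy and
        // destroys the original). Erasing it here frees the old object:
        pipeline.erase(pipeline.begin() + 1);

        // pipeline[1] is now $facet, and pipeline[1]->source dangles; any later
        // dereference (e.g. during dispose()) is undefined behavior. The fix in
        // this commit re-runs the setSource() chain before touching old pointers.
        std::cout << pipeline[1]->name << " holds a stale source pointer\n";
        return 0;
    }
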
diff --git a/jstests/aggregation/sources/lookup/lookup_non_correlated_prefix.js b/jstests/aggregation/sources/lookup/lookup_non_correlated_prefix.js
new file mode 100644
index 00000000000..b4feed0ab31
--- /dev/null
+++ b/jstests/aggregation/sources/lookup/lookup_non_correlated_prefix.js
@@ -0,0 +1,98 @@
+// Cannot implicitly shard accessed collections as $lookup does not support a sharded target
+// collection. A $facet inside a $lookup cannot be wrapped in another $facet.
+// @tags: [assumes_unsharded_collection, do_not_wrap_aggregations_in_facets]
+
+/**
+ * Confirms that $lookup with a non-correlated prefix returns expected results.
+ */
+(function() {
+"use strict";
+
+const testColl = db.lookup_non_correlated_prefix;
+testColl.drop();
+const joinColl = db.lookup_non_correlated_prefix_join;
+joinColl.drop();
+
+const users = [
+ {
+ _id: "user_1",
+ },
+ {
+ _id: "user_2",
+ },
+];
+let res = assert.commandWorked(testColl.insert(users));
+
+const items = [
+ {_id: "item_1", owner: "user_1"},
+ {_id: "item_2", owner: "user_2"},
+];
+res = assert.commandWorked(joinColl.insert(items));
+
+// $lookup with non-correlated prefix followed by correlated pipeline suffix containing $facet
+// returns correct results. This test confirms the fix for SERVER-41714.
+let cursor = testColl.aggregate([
+ {
+ $lookup: {
+ as: 'items_check',
+ from: joinColl.getName(),
+ let : {id: '$_id'},
+ pipeline: [
+ {$addFields: {id: '$_id'}},
+ {$match: {$expr: {$eq: ['$$id', '$owner']}}},
+ {
+ $facet: {
+ all: [{$match: {}}],
+ },
+ },
+ ],
+ },
+ },
+]);
+assert(cursor.hasNext());
+cursor.toArray().forEach(user => {
+ const joinedDocs = user['items_check'][0]['all'];
+ assert.neq(null, joinedDocs);
+ assert.eq(1, joinedDocs.length);
+ assert.eq(user['_id'], joinedDocs[0].owner);
+});
+
+cursor = testColl.aggregate([
+ {
+ $lookup: {
+ as: 'items_check',
+ from: joinColl.getName(),
+ let : {id: '$_id'},
+ pipeline: [
+ {$addFields: {id: '$_id'}},
+ {$match: {$expr: {$eq: ['$$id', '$owner']}}},
+ ],
+ },
+ },
+]);
+assert(cursor.hasNext());
+cursor.toArray().forEach(user => {
+ const joinedDocs = user['items_check'];
+ assert.neq(null, joinedDocs);
+ assert.eq(1, joinedDocs.length);
+ assert.eq(user['_id'], joinedDocs[0].owner);
+});
+
+// SERVER-57000: Test handling of a complete lack of correlation (an $addFields with an empty
+// set of fields).
+assert.doesNotThrow(() => testColl.aggregate([
+ {
+ $lookup: {
+ as: 'items_check',
+ from: joinColl.getName(),
+ pipeline: [
+ {$addFields: {}},
+ {
+ $facet: {
+ all: [{$match: {}}],
+ },
+ },
+ ],
+ },
+ },
+]));
+})();
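
The SERVER-57000 case in the last test is worth spelling out: this $lookup has no "let" at all and {$addFields: {}} references nothing, so dependency analysis walks off the end of the sub-pipeline without ever finding a correlated stage, and the split point lands past every stage, $facet included. A hedged sketch of that split logic with hypothetical toy types (the real code uses DepsTracker and hasVariableReferenceTo):

    #include <cstddef>
    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    // Toy stage: records which $lookup 'let' variables it references.
    struct ToyStage {
        std::string name;
        std::set<std::string> referencedVars;
    };

    // Index of the first correlated stage; stages.size() when the whole pipeline
    // is non-correlated (the edge case this commit fixes).
    std::size_t findPrefixSplit(const std::vector<ToyStage>& stages,
                                const std::set<std::string>& letVars) {
        for (std::size_t i = 0; i < stages.size(); ++i)
            for (const auto& var : stages[i].referencedVars)
                if (letVars.count(var))
                    return i;  // first stage depending on the outer document
        return stages.size();
    }

    int main() {
        const std::set<std::string> letVars{"id"};

        // {$addFields: ...}, {$match: {$expr: {$eq: ['$$id', ...]}}}: the $match
        // references $$id, so the split lands at index 1.
        std::vector<ToyStage> correlated{{"$addFields", {}}, {"$match", {"id"}}};
        std::cout << findPrefixSplit(correlated, letVars) << "\n";  // 1

        // {$addFields: {}}, {$facet: ...}: nothing references a let variable, so
        // the split is past the end and the entire pipeline is cacheable.
        std::vector<ToyStage> uncorrelated{{"$addFields", {}}, {"$facet", {}}};
        std::cout << findPrefixSplit(uncorrelated, letVars) << "\n";  // 2
        return 0;
    }
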
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index de4b1a5fb4c..f1ba018554c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -516,6 +516,7 @@ env.CppUnitTest(
target='pipeline_test',
source=[
'dependencies_test.cpp',
+ 'document_source_sequential_document_cache_test.cpp',
'pipeline_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
index 0fb88369117..237eda6355a 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
@@ -54,9 +54,17 @@ DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() {
pExpCtx->checkForInterrupt();
+ if (_cacheIsEOF) {
+ return GetNextResult::makeEOF();
+ }
+
if (_cache->isServing()) {
auto nextDoc = _cache->getNext();
- return (nextDoc ? std::move(*nextDoc) : GetNextResult::makeEOF());
+ if (nextDoc) {
+ return std::move(*nextDoc);
+ }
+ _cacheIsEOF = true;
+ return GetNextResult::makeEOF();
}
auto nextResult = pSource->getNext();
@@ -64,6 +72,7 @@ DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() {
if (!_cache->isAbandoned()) {
if (nextResult.isEOF()) {
_cache->freeze();
+ _cacheIsEOF = true;
} else {
_cache->add(nextResult.getDocument());
}
@@ -111,11 +120,14 @@ Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOptimizeAt(
DepsTracker deps(DepsTracker::kAllMetadataAvailable);
// Iterate through the pipeline stages until we find one which references an external variable.
+ DocumentSource* lastPtr = nullptr;
for (; prefixSplit != container->end(); ++prefixSplit) {
if (((*prefixSplit)->getDependencies(&deps) == DepsTracker::State::NOT_SUPPORTED) ||
deps.hasVariableReferenceTo(varIDs)) {
break;
}
+
+ lastPtr = prefixSplit->get();
}
// The 'prefixSplit' iterator is now pointing to the first stage of the correlated suffix. If
@@ -128,6 +140,9 @@ Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOptimizeAt(
// If the cache has been populated and is serving results, remove the non-correlated prefix.
if (_cache->isServing()) {
+        // The prefix must be disposed before it is removed. dispose() propagates through the
+        // source pointers, so re-stitch the container first to guarantee they are all valid.
+ Pipeline::stitch(container);
+ lastPtr->dispose();
container->erase(container->begin(), prefixSplit);
}
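
The ordering in that last hunk is the heart of the fix: dispose() on a stage propagates upstream through its source pointer, so the container must be re-stitched first, and only after the prefix has been disposed is it safe to erase it. A simplified model of the three steps, with hypothetical types rather than the server's classes:

    #include <iostream>
    #include <iterator>
    #include <list>
    #include <memory>

    struct Stage {
        const char* name;
        Stage* source = nullptr;
        explicit Stage(const char* n) : name(n) {}
        // dispose() releases resources, then propagates upstream -- exactly why
        // every source pointer must be valid when it runs.
        void dispose() {
            std::cout << "disposing " << name << "\n";
            if (source)
                source->dispose();
        }
    };

    using Container = std::list<std::shared_ptr<Stage>>;

    // Analogue of Pipeline::stitch(container): re-chain front to back.
    void stitch(Container* container) {
        Stage* prev = nullptr;
        for (auto& stage : *container) {
            stage->source = prev;
            prev = stage.get();
        }
    }

    int main() {
        Container container{std::make_shared<Stage>("prefixA"),
                            std::make_shared<Stage>("prefixB"),
                            std::make_shared<Stage>("cache"),
                            std::make_shared<Stage>("suffix")};
        auto prefixSplit = std::next(container.begin(), 2);       // first stage kept
        Stage* lastPtr = std::next(container.begin(), 1)->get();  // end of prefix

        stitch(&container);  // 1. make all source pointers valid again
        lastPtr->dispose();  // 2. walks prefixB -> prefixA safely
        container.erase(container.begin(), prefixSplit);  // 3. destroy the prefix
        std::cout << container.size() << " stages remain\n";  // 2
        return 0;
    }
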
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index 0031ca8694b..d8d2ba90db8 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -96,6 +96,10 @@ private:
SequentialDocumentCache* _cache;
+ // This flag is set to prevent the cache stage from immediately serving from the cache after it
+ // has exhausted input from its source for the first time.
+ bool _cacheIsEOF = false;
+
bool _hasOptimizedPos = false;
};
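
The reason the latch is needed: on the first EOF from the source, the cache is frozen and isServing() flips to true, so without the flag a getNext() call made after the stage had already reported EOF would begin replaying the documents it had just finished collecting, instead of staying at EOF. A toy model of the latched behavior (hypothetical types, not the real DocumentSource interface):

    #include <cassert>
    #include <cstddef>
    #include <optional>
    #include <utility>
    #include <vector>

    // Toy cache stage: while building, documents stream through and are copied
    // into the cache; once the source runs dry, the EOF latch stops the stage
    // from immediately replaying the cache it just finished building.
    class ToyCacheStage {
    public:
        explicit ToyCacheStage(std::vector<int> sourceDocs)
            : _source(std::move(sourceDocs)) {}

        std::optional<int> getNext() {
            if (_eofLatched)
                return std::nullopt;  // mirrors the _cacheIsEOF early return
            if (_srcPos < _source.size()) {
                _cache.push_back(_source[_srcPos]);  // build the cache in passing
                return _source[_srcPos++];
            }
            _eofLatched = true;  // source exhausted: freeze and latch
            return std::nullopt;
        }

    private:
        std::vector<int> _source;
        std::vector<int> _cache;   // ready for the *next* $lookup, not this one
        std::size_t _srcPos = 0;
        bool _eofLatched = false;  // mirrors _cacheIsEOF
    };

    int main() {
        ToyCacheStage stage({1, 2});
        assert(stage.getNext() == 1);
        assert(stage.getNext() == 2);
        assert(!stage.getNext());  // EOF once the source is exhausted...
        assert(!stage.getNext());  // ...and it stays EOF; it never replays 1, 2
        return 0;
    }
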
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp
new file mode 100644
index 00000000000..fcead5d1f67
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bson_depth.h"
+#include "mongo/bson/bsonelement.h"
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/json.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceSequentialDocumentCacheTest = AggregationContextFixture;
+
+const long long kDefaultMaxCacheSize = internalDocumentSourceLookupCacheSizeBytes.load();
+
+TEST_F(DocumentSourceSequentialDocumentCacheTest, ReturnsEOFOnSubsequentCallsAfterSourceExhausted) {
+ SequentialDocumentCache cache(kDefaultMaxCacheSize);
+ auto documentCache = DocumentSourceSequentialDocumentCache::create(getExpCtx(), &cache);
+
+ auto source = DocumentSourceMock::createForTest({"{a: 1, b: 2}", "{a: 3, b: 4}"});
+ documentCache->setSource(source.get());
+
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isEOF());
+ ASSERT(documentCache->getNext().isEOF());
+}
+
+TEST_F(DocumentSourceSequentialDocumentCacheTest, ReturnsEOFAfterCacheExhausted) {
+ SequentialDocumentCache cache(kDefaultMaxCacheSize);
+ cache.add(DOC("_id" << 0));
+ cache.add(DOC("_id" << 1));
+ cache.freeze();
+
+ auto documentCache = DocumentSourceSequentialDocumentCache::create(getExpCtx(), &cache);
+
+ ASSERT(cache.isServing());
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isAdvanced());
+ ASSERT(documentCache->getNext().isEOF());
+ ASSERT(documentCache->getNext().isEOF());
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index 4575e9fa02b..46ae082afe5 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -60,6 +60,10 @@ const char* DocumentSourceSingleDocumentTransformation::getSourceName() const {
DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNext() {
pExpCtx->checkForInterrupt();
+ if (!_parsedTransform) {
+ return DocumentSource::GetNextResult::makeEOF();
+ }
+
// Get the next input document.
auto input = pSource->getNext();
if (!input.isAdvanced()) {
@@ -71,7 +75,9 @@ DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNext() {
}
intrusive_ptr<DocumentSource> DocumentSourceSingleDocumentTransformation::optimize() {
- _parsedTransform->optimize();
+ if (_parsedTransform) {
+ _parsedTransform->optimize();
+ }
return this;
}
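
These guards exist because the prefix disposal above can now run dispose() on a transformation stage, which releases its parsed transform while the stage object itself stays reachable; a later getNext() or optimize() call must degrade to EOF or a no-op instead of dereferencing null. The pattern, sketched with toy stand-in types rather than the real stage class:

    #include <memory>
    #include <optional>

    // Toy transform stage: dispose() releases the parsed transform, so every
    // later entry point has to tolerate the null state (the guard this commit
    // adds to the real stage).
    struct ToyTransform {
        int apply(int doc) const { return doc + 1; }
    };

    class ToyTransformStage {
    public:
        ToyTransformStage() : _transform(std::make_unique<ToyTransform>()) {}

        void dispose() { _transform.reset(); }  // stage stays reachable afterwards

        std::optional<int> getNext(std::optional<int> input) {
            if (!_transform)
                return std::nullopt;  // disposed: report EOF, don't crash
            if (!input)
                return std::nullopt;
            return _transform->apply(*input);
        }

        void optimize() {
            if (_transform) {
                // only touch the transform when it still exists
            }
        }

    private:
        std::unique_ptr<ToyTransform> _transform;
    };

    int main() {
        ToyTransformStage stage;
        stage.dispose();
        return stage.getNext(42) ? 1 : 0;  // safely EOF after disposal
    }
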
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index c4ac29cc21b..85c4beec2a6 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -476,6 +476,23 @@ void Pipeline::stitch() {
}
}
+void Pipeline::stitch(SourceContainer* container) {
+ if (container->empty()) {
+ return;
+ }
+
+ // Chain together all the stages.
+ DocumentSource* prevSource = container->front().get();
+ prevSource->setSource(nullptr);
+ for (Pipeline::SourceContainer::iterator iter(++container->begin()), listEnd(container->end());
+ iter != listEnd;
+ ++iter) {
+ intrusive_ptr<DocumentSource> pTemp(*iter);
+ pTemp->setSource(prevSource);
+ prevSource = pTemp.get();
+ }
+}
+
boost::optional<Document> Pipeline::getNext() {
invariant(!_sources.empty());
auto nextResult = _sources.back()->getNext();
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index b6dc48a9ccd..ad5cb37e0ff 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -285,6 +285,13 @@ public:
}
/**
+ * Stitch together the source pointers by calling setSource() for each source in 'container'.
+ * This function must be called any time the order of stages within the container changes, e.g.
+ * in optimizeContainer().
+ */
+ static void stitch(SourceContainer* container);
+
+ /**
* Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is
* empty.
*/