diff options
author | samontea <merciers.merciers@gmail.com> | 2022-04-04 15:52:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-04 16:36:38 +0000 |
commit | 553be4ed9ad7ecd723d81b825361fb255624ed8b (patch) | |
tree | bfc7a776b4b8fbdd2e4ea6cf7cfd90a047a97fb5 /src/mongo | |
parent | 5448856fd7cf65bd7d480f2028f04aa2e568ee75 (diff) | |
download | mongo-553be4ed9ad7ecd723d81b825361fb255624ed8b.tar.gz |
SERVER-57000 Fix handling of correlated pipeline with facet
Diffstat (limited to 'src/mongo')
7 files changed, 130 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index de4b1a5fb4c..f1ba018554c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -516,6 +516,7 @@ env.CppUnitTest( target='pipeline_test', source=[ 'dependencies_test.cpp', + 'document_source_sequential_document_cache_test.cpp', 'pipeline_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp index 0fb88369117..237eda6355a 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp @@ -54,9 +54,17 @@ DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() { pExpCtx->checkForInterrupt(); + if (_cacheIsEOF) { + return GetNextResult::makeEOF(); + } + if (_cache->isServing()) { auto nextDoc = _cache->getNext(); - return (nextDoc ? std::move(*nextDoc) : GetNextResult::makeEOF()); + if (nextDoc) { + return std::move(*nextDoc); + } + _cacheIsEOF = true; + return GetNextResult::makeEOF(); } auto nextResult = pSource->getNext(); @@ -64,6 +72,7 @@ DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() { if (!_cache->isAbandoned()) { if (nextResult.isEOF()) { _cache->freeze(); + _cacheIsEOF = true; } else { _cache->add(nextResult.getDocument()); } @@ -111,11 +120,14 @@ Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOpt DepsTracker deps(DepsTracker::kAllMetadataAvailable); // Iterate through the pipeline stages until we find one which references an external variable. + DocumentSource* lastPtr = nullptr; for (; prefixSplit != container->end(); ++prefixSplit) { if (((*prefixSplit)->getDependencies(&deps) == DepsTracker::State::NOT_SUPPORTED) || deps.hasVariableReferenceTo(varIDs)) { break; } + + lastPtr = prefixSplit->get(); } // The 'prefixSplit' iterator is now pointing to the first stage of the correlated suffix. If @@ -128,6 +140,9 @@ Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOpt // If the cache has been populated and is serving results, remove the non-correlated prefix. if (_cache->isServing()) { + // Need to dispose last stage to be removed. + Pipeline::stitch(container); + lastPtr->dispose(); container->erase(container->begin(), prefixSplit); } diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index 0031ca8694b..d8d2ba90db8 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -96,6 +96,10 @@ private: SequentialDocumentCache* _cache; + // This flag is set to prevent the cache stage from immediately serving from the cache after it + // has exhausted input from its source for the first time. + bool _cacheIsEOF = false; + bool _hasOptimizedPos = false; }; diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp new file mode 100644 index 00000000000..fcead5d1f67 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache_test.cpp @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bson_depth.h" +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/json.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_sequential_document_cache.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using DocumentSourceSequentialDocumentCacheTest = AggregationContextFixture; + +const long long kDefaultMaxCacheSize = internalDocumentSourceLookupCacheSizeBytes.load(); + +TEST_F(DocumentSourceSequentialDocumentCacheTest, ReturnsEOFOnSubsequentCallsAfterSourceExhausted) { + SequentialDocumentCache cache(kDefaultMaxCacheSize); + auto documentCache = DocumentSourceSequentialDocumentCache::create(getExpCtx(), &cache); + + auto source = DocumentSourceMock::createForTest({"{a: 1, b: 2}", "{a: 3, b: 4}"}); + documentCache->setSource(source.get()); + + ASSERT(documentCache->getNext().isAdvanced()); + ASSERT(documentCache->getNext().isAdvanced()); + ASSERT(documentCache->getNext().isEOF()); + ASSERT(documentCache->getNext().isEOF()); +} + +TEST_F(DocumentSourceSequentialDocumentCacheTest, ReturnsEOFAfterCacheExhausted) { + SequentialDocumentCache cache(kDefaultMaxCacheSize); + cache.add(DOC("_id" << 0)); + cache.add(DOC("_id" << 1)); + cache.freeze(); + + auto documentCache = DocumentSourceSequentialDocumentCache::create(getExpCtx(), &cache); + + ASSERT(cache.isServing()); + ASSERT(documentCache->getNext().isAdvanced()); + ASSERT(documentCache->getNext().isAdvanced()); + ASSERT(documentCache->getNext().isEOF()); + ASSERT(documentCache->getNext().isEOF()); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index 4575e9fa02b..46ae082afe5 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -60,6 +60,10 @@ const char* DocumentSourceSingleDocumentTransformation::getSourceName() const { DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNext() { pExpCtx->checkForInterrupt(); + if (!_parsedTransform) { + return DocumentSource::GetNextResult::makeEOF(); + } + // Get the next input document. auto input = pSource->getNext(); if (!input.isAdvanced()) { @@ -71,7 +75,9 @@ DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNex } intrusive_ptr<DocumentSource> DocumentSourceSingleDocumentTransformation::optimize() { - _parsedTransform->optimize(); + if (_parsedTransform) { + _parsedTransform->optimize(); + } return this; } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index c4ac29cc21b..85c4beec2a6 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -476,6 +476,23 @@ void Pipeline::stitch() { } } +void Pipeline::stitch(SourceContainer* container) { + if (container->empty()) { + return; + } + + // Chain together all the stages. + DocumentSource* prevSource = container->front().get(); + prevSource->setSource(nullptr); + for (Pipeline::SourceContainer::iterator iter(++container->begin()), listEnd(container->end()); + iter != listEnd; + ++iter) { + intrusive_ptr<DocumentSource> pTemp(*iter); + pTemp->setSource(prevSource); + prevSource = pTemp.get(); + } +} + boost::optional<Document> Pipeline::getNext() { invariant(!_sources.empty()); auto nextResult = _sources.back()->getNext(); diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index b6dc48a9ccd..ad5cb37e0ff 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -285,6 +285,13 @@ public: } /** + * Stitch together the source pointers by calling setSource() for each source in 'container'. + * This function must be called any time the order of stages within the container changes, e.g. + * in optimizeContainer(). + */ + static void stitch(SourceContainer* container); + + /** * Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is * empty. */ |