diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 19 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup_test.cpp | 283 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 12 |
9 files changed, 470 insertions, 23 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 5cb7b6ac887..2aa42c8f736 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" +#include <deque> #include <vector> #include "mongo/base/init.h" @@ -62,6 +63,7 @@ #include "mongo/db/views/view_sharding_check.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" +#include "mongo/util/string_map.h" namespace mongo { @@ -162,6 +164,72 @@ bool handleCursorCommand(OperationContext* txn, return static_cast<bool>(cursor); } +StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNamespaces( + OperationContext* txn, + const boost::intrusive_ptr<Pipeline>& pipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + // We intentionally do not drop and reacquire our DB lock after resolving the view definition in + // order to prevent the definition for any view namespaces we've already resolved from changing. + // This is necessary to prevent a cycle from being formed among the view definitions cached in + // 'resolvedNamespaces' because we won't re-resolve a view namespace we've already encountered. + AutoGetDb autoDb(txn, expCtx->ns.db(), MODE_IS); + ViewCatalog* viewCatalog = autoDb.getDb() ? 
autoDb.getDb()->getViewCatalog() : nullptr; + + const auto& pipelineInvolvedNamespaces = pipeline->getInvolvedCollections(); + std::deque<NamespaceString> involvedNamespacesQueue(pipelineInvolvedNamespaces.begin(), + pipelineInvolvedNamespaces.end()); + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + + while (!involvedNamespacesQueue.empty()) { + auto involvedNs = std::move(involvedNamespacesQueue.front()); + involvedNamespacesQueue.pop_front(); + + if (resolvedNamespaces.find(involvedNs.coll()) != resolvedNamespaces.end()) { + continue; + } + + if (viewCatalog && viewCatalog->lookup(txn, involvedNs.ns())) { + // If the database exists and 'involvedNs' refers to a view namespace, then we resolve + // its definition. + auto resolvedView = viewCatalog->resolveView(txn, involvedNs); + if (!resolvedView.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "Failed to resolve view '" << involvedNs.ns() << "': " + << resolvedView.getStatus().toString()}; + } + + resolvedNamespaces[involvedNs.coll()] = {resolvedView.getValue().getNamespace(), + resolvedView.getValue().getPipeline()}; + + // We parse the pipeline corresponding to the resolved view in case we must resolve + // other view namespaces that are also involved. 
+ auto resolvedViewPipeline = + Pipeline::parse(resolvedView.getValue().getPipeline(), expCtx); + if (!resolvedViewPipeline.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "Failed to parse definition for view '" << involvedNs.ns() + << "': " + << resolvedViewPipeline.getStatus().toString()}; + } + + const auto& resolvedViewInvolvedNamespaces = + resolvedViewPipeline.getValue()->getInvolvedCollections(); + involvedNamespacesQueue.insert(involvedNamespacesQueue.end(), + resolvedViewInvolvedNamespaces.begin(), + resolvedViewInvolvedNamespaces.end()); + } else { + // If the database exists and 'involvedNs' refers to a collection namespace, then we + // resolve it as an empty pipeline in order to read directly from the underlying + // collection. If the database doesn't exist, then we still resolve it as an empty + // pipeline because 'involvedNs' doesn't refer to a view namespace in our consistent + // snapshot of the view catalog. + resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}}; + } + } + + return resolvedNamespaces; +} + /** * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse(). * fasserts if it fails to parse after being serialized. 
@@ -265,6 +333,12 @@ public: } auto pipeline = std::move(statusWithPipeline.getValue()); + auto resolvedNamespaces = resolveInvolvedNamespaces(txn, pipeline, expCtx); + if (!resolvedNamespaces.isOK()) { + return appendCommandStatus(result, resolvedNamespaces.getStatus()); + } + expCtx->resolvedNamespaces = std::move(resolvedNamespaces.getValue()); + unique_ptr<ClientCursorPin> pin; // either this OR the exec will be non-null unique_ptr<PlanExecutor> exec; auto curOp = CurOp::get(txn); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index a6e03e82e58..e93717e96d9 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -274,6 +274,19 @@ env.Library( ], ) +env.CppUnitTest( + target='document_source_graph_lookup_test', + source='document_source_graph_lookup_test.cpp', + LIBDEPS=[ + 'document_source', + 'document_source_lookup', + 'document_value_test_util', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/query/query_test_service_context', + '$BUILD_DIR/mongo/db/service_context_noop_init', + ], +) + env.Library( target='document_source_facet', source=[ diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 32528244d87..a171281feb0 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -369,8 +369,6 @@ public: virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) = 0; - virtual bool hasUniqueIdIndex(const NamespaceString& ns) const = 0; - /** * Appends operation latency statistics for collection "nss" to "builder" */ @@ -1744,6 +1742,9 @@ private: // namespace. boost::intrusive_ptr<ExpressionContext> _fromExpCtx; + // The aggregation pipeline to perform against the '_fromNs' namespace. 
+ std::vector<BSONObj> _fromPipeline; + boost::intrusive_ptr<DocumentSourceMatch> _matchSrc; boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc; @@ -1789,6 +1790,17 @@ public: void doReattachToOperationContext(OperationContext* opCtx) final; + static boost::intrusive_ptr<DocumentSourceGraphLookUp> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + NamespaceString fromNs, + std::string asField, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<BSONObj> additionalFilter, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth); + static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); @@ -1874,6 +1886,9 @@ private: // namespace. boost::intrusive_ptr<ExpressionContext> _fromExpCtx; + // The aggregation pipeline to perform against the '_from' namespace. + std::vector<BSONObj> _fromPipeline; + size_t _maxMemoryUsageBytes = 100 * 1024 * 1024; // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'. diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 5e75716e082..632165ffb51 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -57,9 +57,6 @@ const char* DocumentSourceGraphLookUp::getSourceName() const { boost::optional<Document> DocumentSourceGraphLookUp::getNext() { pExpCtx->checkForInterrupt(); - uassert( - 40106, "from collection must have a unique _id index", _mongod->hasUniqueIdIndex(_from)); - if (_unwind) { return getNextUnwound(); } @@ -171,8 +168,17 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { // Query for all keys that were in the frontier and not in the cache, populating // '_frontier' for the next iteration of search. 
- auto pipeline = uassertStatusOK(_mongod->makePipeline({*matchStage}, _fromExpCtx)); + // We've already allocated space for the trailing $match stage in '_fromPipeline'. + _fromPipeline.back() = *matchStage; + auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx)); while (auto next = pipeline->output()->getNext()) { + uassert(40271, + str::stream() + << "Documents in the '" + << _from.ns() + << "' namespace must contain an _id for de-duplication in $graphLookup", + !(*next)["_id"].missing()); + BSONObj result = next->toBson(); shouldPerformAnotherQuery = addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery; @@ -416,7 +422,17 @@ void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool } void DocumentSourceGraphLookUp::doInjectExpressionContext() { - _fromExpCtx = pExpCtx->copyWith(_from); + auto it = pExpCtx->resolvedNamespaces.find(_from.coll()); + invariant(it != pExpCtx->resolvedNamespaces.end()); + const auto& resolvedNamespace = it->second; + _fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns); + _fromPipeline = resolvedNamespace.pipeline; + + // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage + // we'll eventually construct from the input document. 
+ _fromPipeline.reserve(_fromPipeline.size() + 1); + _fromPipeline.push_back(BSONObj()); + _frontier = pExpCtx->getValueComparator().makeUnorderedValueSet(); _cache.setValueComparator(pExpCtx->getValueComparator()); } @@ -451,6 +467,32 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( _visited(ValueComparator::kInstance.makeUnorderedValueMap<BSONObj>()), _cache(expCtx->getValueComparator()) {} +intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create( + const intrusive_ptr<ExpressionContext>& expCtx, + NamespaceString fromNs, + std::string asField, + std::string connectFromField, + std::string connectToField, + intrusive_ptr<Expression> startWith, + boost::optional<BSONObj> additionalFilter, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth) { + intrusive_ptr<DocumentSourceGraphLookUp> source( + new DocumentSourceGraphLookUp(std::move(fromNs), + std::move(asField), + std::move(connectFromField), + std::move(connectToField), + std::move(startWith), + additionalFilter, + depthField, + maxDepth, + expCtx)); + source->_variables.reset(new Variables()); + + source->injectExpressionContext(expCtx); + return std::move(source); +} + intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { NamespaceString from; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp new file mode 100644 index 00000000000..24c5166a3bc --- /dev/null +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -0,0 +1,283 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source.h" + +#include <algorithm> +#include <deque> + +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +// Crutch. +bool isMongos() { + return false; +} + +namespace { + +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using DocumentSourceGraphLookUpTest = AggregationContextFixture; + +// +// Evaluation. 
+// + +/** + * A MongodInterface used for testing that supports making pipelines with an initial + * DocumentSourceMock source. + */ +class MockMongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface { +public: + MockMongodImplementation(std::deque<Document> documents) : _documents(documents) {} + + void setOperationContext(OperationContext* opCtx) final { + MONGO_UNREACHABLE; + } + + DBClientBase* directClient() final { + MONGO_UNREACHABLE; + } + + bool isSharded(const NamespaceString& ns) final { + MONGO_UNREACHABLE; + } + + BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final { + MONGO_UNREACHABLE; + } + + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) final { + MONGO_UNREACHABLE; + } + + void appendLatencyStats(const NamespaceString& nss, BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + BSONObj getCollectionOptions(const NamespaceString& nss) final { + MONGO_UNREACHABLE; + } + + Status renameIfOptionsAndIndexesHaveNotChanged( + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) final { + MONGO_UNREACHABLE; + } + + StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_documents)); + pipeline.getValue()->injectExpressionContext(expCtx); + pipeline.getValue()->optimizePipeline(); + + return pipeline; + } + +private: + std::deque<Document> _documents; +}; + +TEST_F(DocumentSourceGraphLookUpTest, + ShouldErrorWhenDoingInitialMatchIfDocumentInFromCollectionIsMissingId) { + auto expCtx = getExpCtx(); + + std::deque<Document> inputs{Document{{"_id", 0}}}; + 
auto inputMock = DocumentSourceMock::create(std::move(inputs)); + + std::deque<Document> fromContents{Document{{"to", 0}}}; + + NamespaceString fromNs("test", "graph_lookup"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, + fromNs, + "results", + "from", + "to", + ExpressionFieldPath::create("_id"), + boost::none, + boost::none, + boost::none); + graphLookupStage->setSource(inputMock.get()); + graphLookupStage->injectMongodInterface( + std::make_shared<MockMongodImplementation>(std::move(fromContents))); + + ASSERT_THROWS_CODE(graphLookupStage->getNext(), UserException, 40271); +} + +TEST_F(DocumentSourceGraphLookUpTest, + ShouldErrorWhenExploringGraphIfDocumentInFromCollectionIsMissingId) { + auto expCtx = getExpCtx(); + + std::deque<Document> inputs{Document{{"_id", 0}}}; + auto inputMock = DocumentSourceMock::create(std::move(inputs)); + + std::deque<Document> fromContents{Document{{"_id", "a"}, {"to", 0}, {"from", 1}}, + Document{{"to", 1}}}; + + NamespaceString fromNs("test", "graph_lookup"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, + fromNs, + "results", + "from", + "to", + ExpressionFieldPath::create("_id"), + boost::none, + boost::none, + boost::none); + graphLookupStage->setSource(inputMock.get()); + graphLookupStage->injectMongodInterface( + std::make_shared<MockMongodImplementation>(std::move(fromContents))); + + ASSERT_THROWS_CODE(graphLookupStage->getNext(), UserException, 40271); +} + +TEST_F(DocumentSourceGraphLookUpTest, + ShouldErrorWhenHandlingUnwindIfDocumentInFromCollectionIsMissingId) { + auto expCtx = getExpCtx(); + + std::deque<Document> inputs{Document{{"_id", 0}}}; + auto inputMock = DocumentSourceMock::create(std::move(inputs)); + + std::deque<Document> fromContents{Document{{"to", 0}}}; + + NamespaceString fromNs("test", 
"graph_lookup"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, + fromNs, + "results", + "from", + "to", + ExpressionFieldPath::create("_id"), + boost::none, + boost::none, + boost::none); + graphLookupStage->injectMongodInterface( + std::make_shared<MockMongodImplementation>(std::move(fromContents))); + + auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none); + auto pipeline = + unittest::assertGet(Pipeline::create({inputMock, graphLookupStage, unwindStage}, expCtx)); + pipeline->optimizePipeline(); + + ASSERT_THROWS_CODE(pipeline->output()->getNext(), UserException, 40271); +} + +bool arrayContains(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const std::vector<Value>& arr, + const Value& elem) { + auto result = std::find_if(arr.begin(), arr.end(), [&expCtx, &elem](const Value& other) { + return expCtx->getValueComparator().evaluate(elem == other); + }); + return result != arr.end(); +} + +TEST_F(DocumentSourceGraphLookUpTest, + ShouldTraverseSubgraphIfIdOfDocumentsInFromCollectionAreNonUnique) { + auto expCtx = getExpCtx(); + + std::deque<Document> inputs{Document{{"_id", 0}}}; + auto inputMock = DocumentSourceMock::create(std::move(inputs)); + + Document to0from1{{"_id", "a"}, {"to", 0}, {"from", 1}}; + Document to0from2{{"_id", "a"}, {"to", 0}, {"from", 2}}; + Document to1{{"_id", "b"}, {"to", 1}}; + Document to2{{"_id", "c"}, {"to", 2}}; + std::deque<Document> fromContents{to1, to2, to0from1, to0from2}; + + NamespaceString fromNs("test", "graph_lookup"); + expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}}; + auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, + fromNs, + "results", + "from", + "to", + ExpressionFieldPath::create("_id"), + boost::none, + boost::none, + boost::none); + graphLookupStage->setSource(inputMock.get()); + graphLookupStage->injectMongodInterface( + 
std::make_shared<MockMongodImplementation>(std::move(fromContents))); + auto pipeline = unittest::assertGet(Pipeline::create({inputMock, graphLookupStage}, expCtx)); + + auto next = pipeline->output()->getNext(); + ASSERT(next); + + ASSERT_EQ(2U, next->size()); + ASSERT_VALUE_EQ(Value(0), next->getField("_id")); + + auto resultsValue = next->getField("results"); + ASSERT(resultsValue.isArray()); + auto resultsArray = resultsValue.getArray(); + + // Since 'to0from1' and 'to0from2' have the same _id, we should end up only exploring the path + // through one of them. + if (arrayContains(expCtx, resultsArray, Value(to0from1))) { + // If 'to0from1' was returned, then we should see 'to1' and nothing else. + ASSERT(arrayContains(expCtx, resultsArray, Value(to1))); + ASSERT_EQ(2U, resultsArray.size()); + + next = pipeline->output()->getNext(); + ASSERT(!next); + } else if (arrayContains(expCtx, resultsArray, Value(to0from2))) { + // If 'to0from2' was returned, then we should see 'to2' and nothing else. 
+ ASSERT(arrayContains(expCtx, resultsArray, Value(to2))); + ASSERT_EQ(2U, resultsArray.size()); + + next = pipeline->output()->getNext(); + ASSERT(!next); + } else { + FAIL(str::stream() << "Expected either [ " << to0from1.toString() << " ] or [ " + << to0from2.toString() + << " ] but found [ " + << next->toString() + << " ]"); + } +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index dc413d8fff8..73d274eef06 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -88,7 +88,7 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const vector<Value>& boost::optional<Document> DocumentSourceLookUp::getNext() { pExpCtx->checkForInterrupt(); - uassert(4567, "from collection cannot be sharded", !_mongod->isSharded(_fromNs)); + uassert(4567, "from collection cannot be sharded", !_mongod->isSharded(_fromExpCtx->ns)); if (!_additionalFilter && _matchSrc) { // We have internalized a $match, but have not yet computed the descended $match that should @@ -112,7 +112,9 @@ boost::optional<Document> DocumentSourceLookUp::getNext() { auto matchStage = makeMatchStageFromInput(*input, _localField, _foreignFieldFieldName, BSONObj()); - auto pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx)); + // We've already allocated space for the trailing $match stage in '_fromPipeline'. 
+ _fromPipeline.back() = matchStage; + auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx)); std::vector<Value> results; int objsize = 0; @@ -357,7 +359,9 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() { BSONObj filter = _additionalFilter.value_or(BSONObj()); auto matchStage = makeMatchStageFromInput(*_input, _localField, _foreignFieldFieldName, filter); - _pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx)); + // We've already allocated space for the trailing $match stage in '_fromPipeline'. + _fromPipeline.back() = matchStage; + _pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx)); _cursorIndex = 0; _nextValue = _pipeline->output()->getNext(); @@ -440,7 +444,16 @@ DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* } void DocumentSourceLookUp::doInjectExpressionContext() { - _fromExpCtx = pExpCtx->copyWith(_fromNs); + auto it = pExpCtx->resolvedNamespaces.find(_fromNs.coll()); + invariant(it != pExpCtx->resolvedNamespaces.end()); + const auto& resolvedNamespace = it->second; + _fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns); + _fromPipeline = resolvedNamespace.pipeline; + + // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage + // we'll eventually construct from the input document. 
+ _fromPipeline.reserve(_fromPipeline.size() + 1); + _fromPipeline.push_back(BSONObj()); } void DocumentSourceLookUp::doDetachFromOperationContext() { diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 494dbdb31ae..865bf08ec6c 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -35,6 +35,10 @@ namespace mongo { using boost::intrusive_ptr; +ExpressionContext::ResolvedNamespace::ResolvedNamespace(NamespaceString ns, + std::vector<BSONObj> pipeline) + : ns(std::move(ns)), pipeline(std::move(pipeline)) {} + ExpressionContext::ExpressionContext(OperationContext* opCtx, const AggregationRequest& request) : isExplain(request.isExplain()), inShard(request.isFromRouter()), @@ -86,6 +90,8 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) expCtx->setCollator(_collator->clone()); } + expCtx->resolvedNamespaces = resolvedNamespaces; + // Note that we intentionally skip copying the value of 'interruptCounter' because 'expCtx' is // intended to be used for executing a separate aggregation pipeline. 
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 27708590b18..67114fa52b0 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -31,7 +31,9 @@ #include <boost/intrusive_ptr.hpp> #include <memory> #include <string> +#include <vector> +#include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregation_request.h" @@ -39,11 +41,20 @@ #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/util/intrusive_counter.h" +#include "mongo/util/string_map.h" namespace mongo { struct ExpressionContext : public IntrusiveCounterUnsigned { public: + struct ResolvedNamespace { + ResolvedNamespace() = default; + ResolvedNamespace(NamespaceString ns, std::vector<BSONObj> pipeline); + + NamespaceString ns; + std::vector<BSONObj> pipeline; + }; + ExpressionContext() = default; ExpressionContext(OperationContext* opCtx, const AggregationRequest& request); @@ -89,6 +100,8 @@ public: // collation. BSONObj collation; + StringMap<ResolvedNamespace> resolvedNamespaces; + static const int kInterruptCheckPeriod = 128; int interruptCounter = kInterruptCheckPeriod; // when 0, check interruptStatus diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index a1e77179f2f..cc870e04bb1 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -114,18 +114,6 @@ public: return collection->infoCache()->getIndexUsageStats(); } - bool hasUniqueIdIndex(const NamespaceString& ns) const final { - AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns()); - Collection* collection = ctx.getCollection(); - - if (!collection) { - // Collection doesn't exist; the correct return value is questionable. 
- return false; - } - - return collection->getIndexCatalog()->findIdIndex(_ctx->opCtx); - } - void appendLatencyStats(const NamespaceString& nss, BSONObjBuilder* builder) const final { Top::get(_ctx->opCtx->getServiceContext()).appendLatencyStats(nss.ns(), builder); } |