summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2016-08-10 20:20:19 -0400
committerMax Hirschhorn <max.hirschhorn@mongodb.com>2016-08-10 20:20:19 -0400
commitfad11e0917e79e5cfa2bb744e9ec5d1bcb97a608 (patch)
tree32eba6f1cd9558d7f0aa2fe07a4135b9f6cbd445 /src
parenta0b7e4fc8cf224505267b2fe589975ba36f49078 (diff)
downloadmongo-fad11e0917e79e5cfa2bb744e9ec5d1bcb97a608.tar.gz
SERVER-24769 Add support for $lookup and $graphLookup on a view.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp74
-rw-r--r--src/mongo/db/pipeline/SConscript13
-rw-r--r--src/mongo/db/pipeline/document_source.h19
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp52
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp283
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp21
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp6
-rw-r--r--src/mongo/db/pipeline/expression_context.h13
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp12
9 files changed, 470 insertions, 23 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 5cb7b6ac887..2aa42c8f736 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
+#include <deque>
#include <vector>
#include "mongo/base/init.h"
@@ -62,6 +63,7 @@
#include "mongo/db/views/view_sharding_check.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
+#include "mongo/util/string_map.h"
namespace mongo {
@@ -162,6 +164,72 @@ bool handleCursorCommand(OperationContext* txn,
return static_cast<bool>(cursor);
}
+StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNamespaces(
+ OperationContext* txn,
+ const boost::intrusive_ptr<Pipeline>& pipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ // We intentionally do not drop and reacquire our DB lock after resolving the view definition in
+ // order to prevent the definition for any view namespaces we've already resolved from changing.
+ // This is necessary to prevent a cycle from being formed among the view definitions cached in
+ // 'resolvedNamespaces' because we won't re-resolve a view namespace we've already encountered.
+ AutoGetDb autoDb(txn, expCtx->ns.db(), MODE_IS);
+ ViewCatalog* viewCatalog = autoDb.getDb() ? autoDb.getDb()->getViewCatalog() : nullptr;
+
+ const auto& pipelineInvolvedNamespaces = pipeline->getInvolvedCollections();
+ std::deque<NamespaceString> involvedNamespacesQueue(pipelineInvolvedNamespaces.begin(),
+ pipelineInvolvedNamespaces.end());
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+
+ while (!involvedNamespacesQueue.empty()) {
+ auto involvedNs = std::move(involvedNamespacesQueue.front());
+ involvedNamespacesQueue.pop_front();
+
+ if (resolvedNamespaces.find(involvedNs.coll()) != resolvedNamespaces.end()) {
+ continue;
+ }
+
+ if (viewCatalog && viewCatalog->lookup(txn, involvedNs.ns())) {
+ // If the database exists and 'involvedNs' refers to a view namespace, then we resolve
+ // its definition.
+ auto resolvedView = viewCatalog->resolveView(txn, involvedNs);
+ if (!resolvedView.isOK()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "Failed to resolve view '" << involvedNs.ns() << "': "
+ << resolvedView.getStatus().toString()};
+ }
+
+ resolvedNamespaces[involvedNs.coll()] = {resolvedView.getValue().getNamespace(),
+ resolvedView.getValue().getPipeline()};
+
+ // We parse the pipeline corresponding to the resolved view in case we must resolve
+ // other view namespaces that are also involved.
+ auto resolvedViewPipeline =
+ Pipeline::parse(resolvedView.getValue().getPipeline(), expCtx);
+ if (!resolvedViewPipeline.isOK()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse definition for view '" << involvedNs.ns()
+ << "': "
+ << resolvedViewPipeline.getStatus().toString()};
+ }
+
+ const auto& resolvedViewInvolvedNamespaces =
+ resolvedViewPipeline.getValue()->getInvolvedCollections();
+ involvedNamespacesQueue.insert(involvedNamespacesQueue.end(),
+ resolvedViewInvolvedNamespaces.begin(),
+ resolvedViewInvolvedNamespaces.end());
+ } else {
+ // If the database exists and 'involvedNs' refers to a collection namespace, then we
+ // resolve it as an empty pipeline in order to read directly from the underlying
+ // collection. If the database doesn't exist, then we still resolve it as an empty
+ // pipeline because 'involvedNs' doesn't refer to a view namespace in our consistent
+ // snapshot of the view catalog.
+ resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}};
+ }
+ }
+
+ return resolvedNamespaces;
+}
+
/**
* Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse().
* fasserts if it fails to parse after being serialized.
@@ -265,6 +333,12 @@ public:
}
auto pipeline = std::move(statusWithPipeline.getValue());
+ auto resolvedNamespaces = resolveInvolvedNamespaces(txn, pipeline, expCtx);
+ if (!resolvedNamespaces.isOK()) {
+ return appendCommandStatus(result, resolvedNamespaces.getStatus());
+ }
+ expCtx->resolvedNamespaces = std::move(resolvedNamespaces.getValue());
+
unique_ptr<ClientCursorPin> pin; // either this OR the exec will be non-null
unique_ptr<PlanExecutor> exec;
auto curOp = CurOp::get(txn);
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index a6e03e82e58..e93717e96d9 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -274,6 +274,19 @@ env.Library(
],
)
+env.CppUnitTest(
+ target='document_source_graph_lookup_test',
+ source='document_source_graph_lookup_test.cpp',
+ LIBDEPS=[
+ 'document_source',
+ 'document_source_lookup',
+ 'document_value_test_util',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/query/query_test_service_context',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ ],
+)
+
env.Library(
target='document_source_facet',
source=[
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 32528244d87..a171281feb0 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -369,8 +369,6 @@ public:
virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) = 0;
- virtual bool hasUniqueIdIndex(const NamespaceString& ns) const = 0;
-
/**
* Appends operation latency statistics for collection "nss" to "builder"
*/
@@ -1744,6 +1742,9 @@ private:
// namespace.
boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+ // The aggregation pipeline to perform against the '_fromNs' namespace.
+ std::vector<BSONObj> _fromPipeline;
+
boost::intrusive_ptr<DocumentSourceMatch> _matchSrc;
boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc;
@@ -1789,6 +1790,17 @@ public:
void doReattachToOperationContext(OperationContext* opCtx) final;
+ static boost::intrusive_ptr<DocumentSourceGraphLookUp> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ NamespaceString fromNs,
+ std::string asField,
+ std::string connectFromField,
+ std::string connectToField,
+ boost::intrusive_ptr<Expression> startWith,
+ boost::optional<BSONObj> additionalFilter,
+ boost::optional<FieldPath> depthField,
+ boost::optional<long long> maxDepth);
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
@@ -1874,6 +1886,9 @@ private:
// namespace.
boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+ // The aggregation pipeline to perform against the '_from' namespace.
+ std::vector<BSONObj> _fromPipeline;
+
size_t _maxMemoryUsageBytes = 100 * 1024 * 1024;
// Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'.
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 5e75716e082..632165ffb51 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -57,9 +57,6 @@ const char* DocumentSourceGraphLookUp::getSourceName() const {
boost::optional<Document> DocumentSourceGraphLookUp::getNext() {
pExpCtx->checkForInterrupt();
- uassert(
- 40106, "from collection must have a unique _id index", _mongod->hasUniqueIdIndex(_from));
-
if (_unwind) {
return getNextUnwound();
}
@@ -171,8 +168,17 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
// Query for all keys that were in the frontier and not in the cache, populating
// '_frontier' for the next iteration of search.
- auto pipeline = uassertStatusOK(_mongod->makePipeline({*matchStage}, _fromExpCtx));
+ // We've already allocated space for the trailing $match stage in '_fromPipeline'.
+ _fromPipeline.back() = *matchStage;
+ auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
while (auto next = pipeline->output()->getNext()) {
+ uassert(40271,
+ str::stream()
+ << "Documents in the '"
+ << _from.ns()
+ << "' namespace must contain an _id for de-duplication in $graphLookup",
+ !(*next)["_id"].missing());
+
BSONObj result = next->toBson();
shouldPerformAnotherQuery =
addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery;
@@ -416,7 +422,17 @@ void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool
}
void DocumentSourceGraphLookUp::doInjectExpressionContext() {
- _fromExpCtx = pExpCtx->copyWith(_from);
+ auto it = pExpCtx->resolvedNamespaces.find(_from.coll());
+ invariant(it != pExpCtx->resolvedNamespaces.end());
+ const auto& resolvedNamespace = it->second;
+ _fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns);
+ _fromPipeline = resolvedNamespace.pipeline;
+
+ // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage
+ // we'll eventually construct from the input document.
+ _fromPipeline.reserve(_fromPipeline.size() + 1);
+ _fromPipeline.push_back(BSONObj());
+
_frontier = pExpCtx->getValueComparator().makeUnorderedValueSet();
_cache.setValueComparator(pExpCtx->getValueComparator());
}
@@ -451,6 +467,32 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
_visited(ValueComparator::kInstance.makeUnorderedValueMap<BSONObj>()),
_cache(expCtx->getValueComparator()) {}
+intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create(
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ NamespaceString fromNs,
+ std::string asField,
+ std::string connectFromField,
+ std::string connectToField,
+ intrusive_ptr<Expression> startWith,
+ boost::optional<BSONObj> additionalFilter,
+ boost::optional<FieldPath> depthField,
+ boost::optional<long long> maxDepth) {
+ intrusive_ptr<DocumentSourceGraphLookUp> source(
+ new DocumentSourceGraphLookUp(std::move(fromNs),
+ std::move(asField),
+ std::move(connectFromField),
+ std::move(connectToField),
+ std::move(startWith),
+ additionalFilter,
+ depthField,
+ maxDepth,
+ expCtx));
+ source->_variables.reset(new Variables());
+
+ source->injectExpressionContext(expCtx);
+ return std::move(source);
+}
+
intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
NamespaceString from;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
new file mode 100644
index 00000000000..24c5166a3bc
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -0,0 +1,283 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source.h"
+
+#include <algorithm>
+#include <deque>
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+// Crutch.
+bool isMongos() {
+ return false;
+}
+
+namespace {
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceGraphLookUpTest = AggregationContextFixture;
+
+//
+// Evaluation.
+//
+
+/**
+ * A MongodInterface use for testing that supports making pipelines with an initial
+ * DocumentSourceMock source.
+ */
+class MockMongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface {
+public:
+ MockMongodImplementation(std::deque<Document> documents) : _documents(documents) {}
+
+ void setOperationContext(OperationContext* opCtx) final {
+ MONGO_UNREACHABLE;
+ }
+
+ DBClientBase* directClient() final {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isSharded(const NamespaceString& ns) final {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
+ MONGO_UNREACHABLE;
+ }
+
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void appendLatencyStats(const NamespaceString& nss, BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj getCollectionOptions(const NamespaceString& nss) final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status renameIfOptionsAndIndexesHaveNotChanged(
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) final {
+ MONGO_UNREACHABLE;
+ }
+
+ StatusWith<boost::intrusive_ptr<Pipeline>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_documents));
+ pipeline.getValue()->injectExpressionContext(expCtx);
+ pipeline.getValue()->optimizePipeline();
+
+ return pipeline;
+ }
+
+private:
+ std::deque<Document> _documents;
+};
+
+TEST_F(DocumentSourceGraphLookUpTest,
+ ShouldErrorWhenDoingInitialMatchIfDocumentInFromCollectionIsMissingId) {
+ auto expCtx = getExpCtx();
+
+ std::deque<Document> inputs{Document{{"_id", 0}}};
+ auto inputMock = DocumentSourceMock::create(std::move(inputs));
+
+ std::deque<Document> fromContents{Document{{"to", 0}}};
+
+ NamespaceString fromNs("test", "graph_lookup");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+ auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx,
+ fromNs,
+ "results",
+ "from",
+ "to",
+ ExpressionFieldPath::create("_id"),
+ boost::none,
+ boost::none,
+ boost::none);
+ graphLookupStage->setSource(inputMock.get());
+ graphLookupStage->injectMongodInterface(
+ std::make_shared<MockMongodImplementation>(std::move(fromContents)));
+
+ ASSERT_THROWS_CODE(graphLookupStage->getNext(), UserException, 40271);
+}
+
+TEST_F(DocumentSourceGraphLookUpTest,
+ ShouldErrorWhenExploringGraphIfDocumentInFromCollectionIsMissingId) {
+ auto expCtx = getExpCtx();
+
+ std::deque<Document> inputs{Document{{"_id", 0}}};
+ auto inputMock = DocumentSourceMock::create(std::move(inputs));
+
+ std::deque<Document> fromContents{Document{{"_id", "a"}, {"to", 0}, {"from", 1}},
+ Document{{"to", 1}}};
+
+ NamespaceString fromNs("test", "graph_lookup");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+ auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx,
+ fromNs,
+ "results",
+ "from",
+ "to",
+ ExpressionFieldPath::create("_id"),
+ boost::none,
+ boost::none,
+ boost::none);
+ graphLookupStage->setSource(inputMock.get());
+ graphLookupStage->injectMongodInterface(
+ std::make_shared<MockMongodImplementation>(std::move(fromContents)));
+
+ ASSERT_THROWS_CODE(graphLookupStage->getNext(), UserException, 40271);
+}
+
+TEST_F(DocumentSourceGraphLookUpTest,
+ ShouldErrorWhenHandlingUnwindIfDocumentInFromCollectionIsMissingId) {
+ auto expCtx = getExpCtx();
+
+ std::deque<Document> inputs{Document{{"_id", 0}}};
+ auto inputMock = DocumentSourceMock::create(std::move(inputs));
+
+ std::deque<Document> fromContents{Document{{"to", 0}}};
+
+ NamespaceString fromNs("test", "graph_lookup");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+ auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx,
+ fromNs,
+ "results",
+ "from",
+ "to",
+ ExpressionFieldPath::create("_id"),
+ boost::none,
+ boost::none,
+ boost::none);
+ graphLookupStage->injectMongodInterface(
+ std::make_shared<MockMongodImplementation>(std::move(fromContents)));
+
+ auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none);
+ auto pipeline =
+ unittest::assertGet(Pipeline::create({inputMock, graphLookupStage, unwindStage}, expCtx));
+ pipeline->optimizePipeline();
+
+ ASSERT_THROWS_CODE(pipeline->output()->getNext(), UserException, 40271);
+}
+
+bool arrayContains(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const std::vector<Value>& arr,
+ const Value& elem) {
+ auto result = std::find_if(arr.begin(), arr.end(), [&expCtx, &elem](const Value& other) {
+ return expCtx->getValueComparator().evaluate(elem == other);
+ });
+ return result != arr.end();
+}
+
+TEST_F(DocumentSourceGraphLookUpTest,
+ ShouldTraverseSubgraphIfIdOfDocumentsInFromCollectionAreNonUnique) {
+ auto expCtx = getExpCtx();
+
+ std::deque<Document> inputs{Document{{"_id", 0}}};
+ auto inputMock = DocumentSourceMock::create(std::move(inputs));
+
+ Document to0from1{{"_id", "a"}, {"to", 0}, {"from", 1}};
+ Document to0from2{{"_id", "a"}, {"to", 0}, {"from", 2}};
+ Document to1{{"_id", "b"}, {"to", 1}};
+ Document to2{{"_id", "c"}, {"to", 2}};
+ std::deque<Document> fromContents{to1, to2, to0from1, to0from2};
+
+ NamespaceString fromNs("test", "graph_lookup");
+ expCtx->resolvedNamespaces[fromNs.coll()] = {fromNs, std::vector<BSONObj>{}};
+ auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx,
+ fromNs,
+ "results",
+ "from",
+ "to",
+ ExpressionFieldPath::create("_id"),
+ boost::none,
+ boost::none,
+ boost::none);
+ graphLookupStage->setSource(inputMock.get());
+ graphLookupStage->injectMongodInterface(
+ std::make_shared<MockMongodImplementation>(std::move(fromContents)));
+ auto pipeline = unittest::assertGet(Pipeline::create({inputMock, graphLookupStage}, expCtx));
+
+ auto next = pipeline->output()->getNext();
+ ASSERT(next);
+
+ ASSERT_EQ(2U, next->size());
+ ASSERT_VALUE_EQ(Value(0), next->getField("_id"));
+
+ auto resultsValue = next->getField("results");
+ ASSERT(resultsValue.isArray());
+ auto resultsArray = resultsValue.getArray();
+
+ // Since 'to0from1' and 'to0from2' have the same _id, we should end up only exploring the path
+ // through one of them.
+ if (arrayContains(expCtx, resultsArray, Value(to0from1))) {
+ // If 'to0from1' was returned, then we should see 'to1' and nothing else.
+ ASSERT(arrayContains(expCtx, resultsArray, Value(to1)));
+ ASSERT_EQ(2U, resultsArray.size());
+
+ next = pipeline->output()->getNext();
+ ASSERT(!next);
+ } else if (arrayContains(expCtx, resultsArray, Value(to0from2))) {
+ // If 'to0from2' was returned, then we should see 'to2' and nothing else.
+ ASSERT(arrayContains(expCtx, resultsArray, Value(to2)));
+ ASSERT_EQ(2U, resultsArray.size());
+
+ next = pipeline->output()->getNext();
+ ASSERT(!next);
+ } else {
+ FAIL(str::stream() << "Expected either [ " << to0from1.toString() << " ] or [ "
+ << to0from2.toString()
+ << " ] but found [ "
+ << next->toString()
+ << " ]");
+ }
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index dc413d8fff8..73d274eef06 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -88,7 +88,7 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const vector<Value>&
boost::optional<Document> DocumentSourceLookUp::getNext() {
pExpCtx->checkForInterrupt();
- uassert(4567, "from collection cannot be sharded", !_mongod->isSharded(_fromNs));
+ uassert(4567, "from collection cannot be sharded", !_mongod->isSharded(_fromExpCtx->ns));
if (!_additionalFilter && _matchSrc) {
// We have internalized a $match, but have not yet computed the descended $match that should
@@ -112,7 +112,9 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
auto matchStage =
makeMatchStageFromInput(*input, _localField, _foreignFieldFieldName, BSONObj());
- auto pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx));
+ // We've already allocated space for the trailing $match stage in '_fromPipeline'.
+ _fromPipeline.back() = matchStage;
+ auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
std::vector<Value> results;
int objsize = 0;
@@ -357,7 +359,9 @@ boost::optional<Document> DocumentSourceLookUp::unwindResult() {
BSONObj filter = _additionalFilter.value_or(BSONObj());
auto matchStage =
makeMatchStageFromInput(*_input, _localField, _foreignFieldFieldName, filter);
- _pipeline = uassertStatusOK(_mongod->makePipeline({matchStage}, _fromExpCtx));
+ // We've already allocated space for the trailing $match stage in '_fromPipeline'.
+ _fromPipeline.back() = matchStage;
+ _pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
_cursorIndex = 0;
_nextValue = _pipeline->output()->getNext();
@@ -440,7 +444,16 @@ DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker*
}
void DocumentSourceLookUp::doInjectExpressionContext() {
- _fromExpCtx = pExpCtx->copyWith(_fromNs);
+ auto it = pExpCtx->resolvedNamespaces.find(_fromNs.coll());
+ invariant(it != pExpCtx->resolvedNamespaces.end());
+ const auto& resolvedNamespace = it->second;
+ _fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns);
+ _fromPipeline = resolvedNamespace.pipeline;
+
+ // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage
+ // we'll eventually construct from the input document.
+ _fromPipeline.reserve(_fromPipeline.size() + 1);
+ _fromPipeline.push_back(BSONObj());
}
void DocumentSourceLookUp::doDetachFromOperationContext() {
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 494dbdb31ae..865bf08ec6c 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -35,6 +35,10 @@ namespace mongo {
using boost::intrusive_ptr;
+ExpressionContext::ResolvedNamespace::ResolvedNamespace(NamespaceString ns,
+ std::vector<BSONObj> pipeline)
+ : ns(std::move(ns)), pipeline(std::move(pipeline)) {}
+
ExpressionContext::ExpressionContext(OperationContext* opCtx, const AggregationRequest& request)
: isExplain(request.isExplain()),
inShard(request.isFromRouter()),
@@ -86,6 +90,8 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns)
expCtx->setCollator(_collator->clone());
}
+ expCtx->resolvedNamespaces = resolvedNamespaces;
+
// Note that we intentionally skip copying the value of 'interruptCounter' because 'expCtx' is
// intended to be used for executing a separate aggregation pipeline.
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 27708590b18..67114fa52b0 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -31,7 +31,9 @@
#include <boost/intrusive_ptr.hpp>
#include <memory>
#include <string>
+#include <vector>
+#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregation_request.h"
@@ -39,11 +41,20 @@
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/util/intrusive_counter.h"
+#include "mongo/util/string_map.h"
namespace mongo {
struct ExpressionContext : public IntrusiveCounterUnsigned {
public:
+ struct ResolvedNamespace {
+ ResolvedNamespace() = default;
+ ResolvedNamespace(NamespaceString ns, std::vector<BSONObj> pipeline);
+
+ NamespaceString ns;
+ std::vector<BSONObj> pipeline;
+ };
+
ExpressionContext() = default;
ExpressionContext(OperationContext* opCtx, const AggregationRequest& request);
@@ -89,6 +100,8 @@ public:
// collation.
BSONObj collation;
+ StringMap<ResolvedNamespace> resolvedNamespaces;
+
static const int kInterruptCheckPeriod = 128;
int interruptCounter = kInterruptCheckPeriod; // when 0, check interruptStatus
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index a1e77179f2f..cc870e04bb1 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -114,18 +114,6 @@ public:
return collection->infoCache()->getIndexUsageStats();
}
- bool hasUniqueIdIndex(const NamespaceString& ns) const final {
- AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns());
- Collection* collection = ctx.getCollection();
-
- if (!collection) {
- // Collection doesn't exist; the correct return value is questionable.
- return false;
- }
-
- return collection->getIndexCatalog()->findIdIndex(_ctx->opCtx);
- }
-
void appendLatencyStats(const NamespaceString& nss, BSONObjBuilder* builder) const final {
Top::get(_ctx->opCtx->getServiceContext()).appendLatencyStats(nss.ns(), builder);
}