summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-07-21 19:16:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-07 22:18:46 +0000
commit426ed301b2e61107e82822764a855feb27168742 (patch)
tree4f5a38722f2e658c80ba2be3da8935e9c84415e7 /src
parentf0bd888de22307dad1825323cadd3b708f670ae0 (diff)
downloadmongo-426ed301b2e61107e82822764a855feb27168742.tar.gz
SERVER-49290 Support running $lookup locally on shard for config.cache.chunks.* namespaces
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/namespace_string.cpp6
-rw-r--r--src/mongo/db/namespace_string.h5
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp160
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h5
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp176
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp50
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp11
7 files changed, 342 insertions, 71 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index f2b8f148779..6928d8da953 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -231,12 +231,16 @@ bool NamespaceString::isNamespaceAlwaysUnsharded() const {
return true;
if (ns() == "config.cache.databases" || ns() == "config.cache.collections" ||
- (db() == "config" && coll().startsWith("cache.chunks")))
+ isConfigDotCacheDotChunks())
return true;
return false;
}
+bool NamespaceString::isConfigDotCacheDotChunks() const {
+ return db() == "config" && coll().startsWith("cache.chunks.");
+}
+
bool NamespaceString::isReplicated() const {
if (isLocal()) {
return false;
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 8f5f3feb49b..7c65d4c8871 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -280,6 +280,11 @@ public:
bool isNamespaceAlwaysUnsharded() const;
/**
+ * Returns whether the specified namespace is config.cache.chunks.<>.
+ */
+ bool isConfigDotCacheDotChunks() const;
+
+ /**
* Returns whether a namespace is replicated, based only on its string value. One notable
* omission is that map reduce `tmp.mr` collections may or may not be replicated. Callers must
* decide how to handle that case separately.
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 95c33761454..25fe9668ce0 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_merge_gen.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/query/query_knobs_gen.h"
@@ -50,6 +51,66 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
+namespace {
+
+/**
+ * Constructs a query of the following shape:
+ * {$or: [
+ * {'fieldName': {$eq: 'values[0]'}},
+ * {'fieldName': {$eq: 'values[1]'}},
+ * ...
+ * ]}
+ */
+BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) {
+ BSONObjBuilder orBuilder;
+ {
+ BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or"));
+ for (auto&& value : values) {
+ orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value)));
+ }
+ }
+ return orBuilder.obj();
+}
+
+void lookupPipeValidator(const Pipeline& pipeline) {
+ const auto& sources = pipeline.getSources();
+ std::for_each(sources.begin(), sources.end(), [](auto& src) {
+ uassert(51047,
+ str::stream() << src->getSourceName()
+ << " is not allowed within a $lookup's sub-pipeline",
+ src->constraints().isAllowedInLookupPipeline());
+ });
+}
+
+bool foreignShardedLookupAllowed() {
+ return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
+}
+
+// Parses $lookup 'from' field. The 'from' field must be a string or an object in the form of
+// {from: {db: "config", coll: "cache.chunks.*}, ...}.
+NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, StringData defaultDb) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$lookup 'from' field must be either a string or an object, but found "
+ << typeName(elem.type()),
+ elem.type() == BSONType::String || elem.type() == BSONType::Object);
+
+ if (elem.type() == BSONType::String) {
+ return NamespaceString(defaultDb, elem.valueStringData());
+ }
+
+ // Valdate the db and coll names.
+ auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject());
+ auto nss = NamespaceString(spec.getDb().value_or(""), spec.getColl().value_or(""));
+ uassert(
+ ErrorCodes::FailedToParse,
+ str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: "
+ << nss.db() << " and coll: " << nss.coll(),
+ nss.isConfigDotCacheDotChunks());
+ return nss;
+}
+
+} // namespace
+
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
@@ -120,12 +181,7 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars
uassert(ErrorCodes::FailedToParse,
str::stream() << "missing 'from' option to $lookup stage specification: " << specObj,
fromElement);
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "'from' option to $lookup must be a string, but was type "
- << typeName(specObj["from"].type()),
- fromElement.type() == BSONType::String);
-
- NamespaceString fromNss(nss.db(), fromElement.valueStringData());
+ auto fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db());
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "invalid $lookup namespace: " << fromNss.ns(),
fromNss.isValid());
@@ -176,18 +232,19 @@ const char* DocumentSourceLookUp::getSourceName() const {
}
StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
- // If executing on mongos and the foreign collection is sharded, then this stage can run on
- // mongos or any shard.
- HostTypeRequirement hostRequirement =
- (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
- ? HostTypeRequirement::kNone
- : HostTypeRequirement::kPrimaryShard;
-
- const bool foreignShardedAllowed =
- getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
- if (!foreignShardedAllowed) {
- // Always run on the primary shard.
- hostRequirement = HostTypeRequirement::kPrimaryShard;
+ HostTypeRequirement hostRequirement;
+ if (_fromNs.isConfigDotCacheDotChunks()) {
+ // $lookup from config.cache.chunks* namespaces is permitted to run on each individual
+ // shard, rather than just the primary, since each shard should have an identical copy of
+ // the namespace.
+ hostRequirement = HostTypeRequirement::kAnyShard;
+ } else {
+ // When $lookup on sharded foreign collections is allowed, the foreign collection is
+ // sharded, and the stage is executing on mongos, the stage can run on mongos or any shard.
+ hostRequirement = (foreignShardedLookupAllowed() && pExpCtx->inMongos &&
+ pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
+ ? HostTypeRequirement::kNone
+ : HostTypeRequirement::kPrimaryShard;
}
// By default, $lookup is allowed in a transaction and does not use disk.
@@ -213,42 +270,6 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
return constraints;
}
-namespace {
-
-/**
- * Constructs a query of the following shape:
- * {$or: [
- * {'fieldName': {$eq: 'values[0]'}},
- * {'fieldName': {$eq: 'values[1]'}},
- * ...
- * ]}
- */
-BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) {
- BSONObjBuilder orBuilder;
- {
- BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or"));
- for (auto&& value : values) {
- orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value)));
- }
- }
- return orBuilder.obj();
-}
-
-void lookupPipeValidator(const Pipeline& pipeline) {
- const auto& sources = pipeline.getSources();
- std::for_each(sources.begin(), sources.end(), [](auto& src) {
- uassert(51047,
- str::stream() << src->getSourceName()
- << " is not allowed within a $lookup's sub-pipeline",
- src->constraints().isAllowedInLookupPipeline());
- });
-}
-
-bool foreignShardedLookupAllowed() {
- return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
-}
-} // namespace
-
DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() {
if (_unwindSrc) {
return unwindResult();
@@ -685,6 +706,12 @@ void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() {
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
Document doc;
+
+ // Support alternative $lookup from config.cache.chunks* namespaces.
+ auto fromValue = (pExpCtx->ns.db() == _fromNs.db())
+ ? Value(_fromNs.coll())
+ : Value(Document{{"db", _fromNs.db()}, {"coll", _fromNs.coll()}});
+
if (wasConstructedWithPipelineSyntax()) {
MutableDocument exprList;
for (auto letVar : _letVariables) {
@@ -698,13 +725,13 @@ void DocumentSourceLookUp::serializeToArray(
}
doc = Document{{getSourceName(),
- Document{{"from", _fromNs.coll()},
+ Document{{"from", fromValue},
{"as", _as.fullPath()},
{"let", exprList.freeze()},
{"pipeline", pipeline}}}};
} else {
doc = Document{{getSourceName(),
- {Document{{"from", _fromNs.coll()},
+ {Document{{"from", fromValue},
{"as", _as.fullPath()},
{"localField", _localField->fullPath()},
{"foreignField", _foreignField->fullPath()}}}}};
@@ -781,6 +808,18 @@ DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) cons
return DepsTracker::State::SEE_NEXT;
}
+boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceLookUp::distributedPlanLogic() {
+ if (_fromExpCtx->ns.isConfigDotCacheDotChunks()) {
+ // When $lookup reads from config.cache.chunks.* namespaces, it should run on each
+ // individual shard in parallel. This is a special case, and atypical for standard $lookup
+ // since a full copy of config.cache.chunks.* collections exists on all shards.
+ return boost::none;
+ }
+
+ // {shardsStage, mergingStage, sortPattern}
+ return DistributedPlanLogic{nullptr, this, boost::none};
+}
+
void DocumentSourceLookUp::detachFromOperationContext() {
if (_pipeline) {
// We have a pipeline we're going to be executing across multiple calls to getNext(), so we
@@ -845,14 +884,17 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
continue;
}
+ if (argName == "from") {
+ fromNs = parseLookupFromAndResolveNamespace(argument, pExpCtx->ns.db());
+ continue;
+ }
+
uassert(ErrorCodes::FailedToParse,
str::stream() << "$lookup argument '" << argName << "' must be a string, found "
<< argument << ": " << argument.type(),
argument.type() == BSONType::String);
- if (argName == "from") {
- fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String());
- } else if (argName == "as") {
+ if (argName == "as") {
as = argument.String();
} else if (argName == "localField") {
localField = argument.String();
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index e36be9f2762..63e17f643f5 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -105,10 +105,7 @@ public:
DepsTracker::State getDependencies(DepsTracker* deps) const final;
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- // {shardsStage, mergingStage, sortPattern}
- return DistributedPlanLogic{nullptr, this, boost::none};
- }
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final;
void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final;
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 781248b4930..baa1fd032d0 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -30,12 +30,14 @@
#include "mongo/platform/basic.h"
#include <boost/intrusive_ptr.hpp>
+#include <boost/optional/optional_io.hpp>
#include <deque>
#include <vector>
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/exec/document_value/value.h"
@@ -430,6 +432,180 @@ TEST_F(DocumentSourceLookUpTest, ShouldBeAbleToReParseSerializedStage) {
ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
}
+// Tests that $lookup with special 'from' syntax from: {db: config, coll: cache.chunks.*} can
+// be round tripped.
+TEST_F(DocumentSourceLookUpTest, LookupReParseSerializedStageWithFromDBAndColl) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("config", "cache.chunks.test.foo");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ auto originalBSON = BSON("$lookup" << BSON("from" << BSON("db"
+ << "config"
+ << "coll"
+ << "cache.chunks.test.foo")
+ << "localField"
+ << "x"
+ << "foreignField"
+ << "id"
+ << "as"
+ << "results"));
+ auto lookupStage = DocumentSourceLookUp::createFromBson(originalBSON.firstElement(), expCtx);
+
+ //
+ // Serialize the $lookup stage and confirm contents.
+ //
+ vector<Value> serialization;
+ static const UnorderedFieldsBSONObjComparator kComparator;
+ lookupStage->serializeToArray(serialization);
+ auto serializedBSON = serialization[0].getDocument().toBson();
+ ASSERT_EQ(kComparator.compare(serializedBSON, originalBSON), 0);
+
+ auto roundTripped = DocumentSourceLookUp::createFromBson(serializedBSON.firstElement(), expCtx);
+
+ vector<Value> newSerialization;
+ roundTripped->serializeToArray(newSerialization);
+
+ ASSERT_EQ(newSerialization.size(), 1UL);
+ ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
+}
+
+// Tests that $lookup with 'let' and special 'from' syntax from: {db: config, coll: cache.chunks.*}
+// can be round tripped.
+TEST_F(DocumentSourceLookUpTest, LookupWithLetReParseSerializedStageWithFromDBAndColl) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("config", "cache.chunks.test.foo");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ auto originalBSON =
+ BSON("$lookup" << BSON("from" << BSON("db"
+ << "config"
+ << "coll"
+ << "cache.chunks.test.foo")
+ << "let"
+ << BSON("local_x"
+ << "$x")
+ << "pipeline" << BSON_ARRAY(BSON("$match" << BSON("x" << 1)))
+ << "as"
+ << "as"));
+ auto lookupStage = DocumentSourceLookUp::createFromBson(originalBSON.firstElement(), expCtx);
+
+ //
+ // Serialize the $lookup stage and confirm contents.
+ //
+ vector<Value> serialization;
+ static const UnorderedFieldsBSONObjComparator kComparator;
+ lookupStage->serializeToArray(serialization);
+ auto serializedBSON = serialization[0].getDocument().toBson();
+ ASSERT_EQ(kComparator.compare(serializedBSON, originalBSON), 0);
+
+ auto roundTripped = DocumentSourceLookUp::createFromBson(serializedBSON.firstElement(), expCtx);
+
+ vector<Value> newSerialization;
+ roundTripped->serializeToArray(newSerialization);
+
+ ASSERT_EQ(newSerialization.size(), 1UL);
+ ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
+}
+
+// $lookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace that isn't
+// config.cache.chunks*.
+TEST_F(DocumentSourceLookUpTest, RejectsPipelineFromDBAndCollWithBadDBAndColl) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ ASSERT_THROWS_CODE(
+ DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {from: {db: 'test', coll: 'coll'}, as: 'as', pipeline: []}}")
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+}
+
+// $lookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace when "coll" is
+// "cache.chunks.*" but "db" is not "config".
+TEST_F(DocumentSourceLookUpTest, RejectsPipelineFromDBAndCollWithBadColl) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("config", "coll");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ ASSERT_THROWS_CODE(
+ DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {from: {db: 'config', coll: 'coll'}, as: 'as', pipeline: []}}")
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+}
+
+// $lookup : {from : {db: <>, coll: <>}} syntax fails when "db" is config but "coll" is
+// not "cache.chunks.*".
+TEST_F(DocumentSourceLookUpTest, RejectsPipelineFromDBAndCollWithBadDB) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "cache.chunks.test.foo");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ ASSERT_THROWS_CODE(DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {from: {db: 'test', coll: 'cache.chunks.test.foo'}, "
+ "as: 'as', pipeline: []}}")
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+}
+
+// Tests that $lookup distributedPlanLogic() is boost::none, allowing for the stage to run on each
+// shard, when it reads from config.cache.chunks.* namespaces using from: {db: <> , coll: <> }
+// syntax.
+TEST_F(DocumentSourceLookUpTest, FromDBAndCollDistributedPlanLogic) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("config", "cache.chunks.test.foo");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ auto lookupStage = DocumentSourceLookUp::createFromBson(
+ BSON("$lookup" << BSON("from" << BSON("db"
+ << "config"
+ << "coll"
+ << "cache.chunks.test.foo")
+ << "let"
+ << BSON("local_x"
+ << "$x")
+ << "pipeline" << BSON_ARRAY(BSON("$match" << BSON("x" << 1)))
+ << "as"
+ << "as"))
+ .firstElement(),
+ expCtx);
+
+ ASSERT(!lookupStage->distributedPlanLogic());
+}
+
+// Tests $lookup distributedPlanLogic() is prohibited from executing on the shardsStage for standard
+// $lookup with from: <string> syntax.
+TEST_F(DocumentSourceLookUpTest, LookupDistributedPlanLogic) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ auto lookupStage = DocumentSourceLookUp::createFromBson(
+ BSON("$lookup" << BSON("from"
+ << "coll"
+ << "pipeline" << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) << "as"
+ << "as"))
+ .firstElement(),
+ expCtx);
+ ASSERT(lookupStage->distributedPlanLogic());
+ ASSERT(lookupStage->distributedPlanLogic()->shardsStage == nullptr);
+ ASSERT(lookupStage->distributedPlanLogic()->mergingStage != nullptr);
+}
+
TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) {
auto input = Document{{"local", 1}};
BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 239924ce42e..1328409f11c 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -2321,6 +2321,11 @@ public:
virtual string shardPipeJson() = 0;
virtual string mergePipeJson() = 0;
+ // Allows tests to override the default resolvedNamespaces.
+ virtual NamespaceString getLookupCollNs() {
+ return NamespaceString("a", "lookupColl");
+ }
+
BSONObj pipelineFromJsonArray(const string& array) {
return fromjson("{pipeline: " + array + "}");
}
@@ -2342,7 +2347,7 @@ public:
// For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the
// operations will be able to have a resolved view definition.
- NamespaceString lookupCollNs("a", "lookupColl");
+ auto lookupCollNs = getLookupCollNs();
ctx->setResolvedNamespace(lookupCollNs, {lookupCollNs, std::vector<BSONObj>{}});
// Test that we can both split the pipeline and reassemble it into its original form.
@@ -2390,6 +2395,47 @@ class Empty : public Base {
}
};
+// Since each shard has an identical copy of config.cache.chunks.* namespaces, $lookup from
+// config.cache.chunks.* should run on each shard in parallel.
+namespace lookupFromShardsInParallel {
+class LookupWithDBAndColl : public Base {
+ string inputPipeJson() {
+ return "[{$lookup: {from: {db: 'config', coll: 'cache.chunks.test.foo'}, as: 'results', "
+ "localField: 'x', foreignField: '_id'}}]";
+ }
+ string shardPipeJson() {
+ return inputPipeJson();
+ }
+
+ string mergePipeJson() {
+ return "[]";
+ }
+
+ NamespaceString getLookupCollNs() override {
+ return {"config", "cache.chunks.test.foo"};
+ }
+};
+
+class LookupWithLetWithDBAndColl : public Base {
+ string inputPipeJson() {
+ return "[{$lookup: {from: {db: 'config', coll: 'cache.chunks.test.foo'}, as: 'results', "
+ "let: {x_field: '$x'}, pipeline: []}}]";
+ }
+ string shardPipeJson() {
+ return inputPipeJson();
+ }
+
+ string mergePipeJson() {
+ return "[]";
+ }
+
+ NamespaceString getLookupCollNs() override {
+ return {"config", "cache.chunks.test.foo"};
+ }
+};
+
+} // namespace lookupFromShardsInParallel
+
namespace moveFinalUnwindFromShardsToMerger {
class OneUnwind : public Base {
@@ -4030,6 +4076,8 @@ public:
add<Optimizations::Sharded::limitFieldsSentFromShardsToMerger::
ShardedMatchSortProjLimBecomesMatchTopKSortProj>();
add<Optimizations::Sharded::limitFieldsSentFromShardsToMerger::ShardAlreadyExhaustive>();
+ add<Optimizations::Sharded::lookupFromShardsInParallel::LookupWithDBAndColl>();
+ add<Optimizations::Sharded::lookupFromShardsInParallel::LookupWithLetWithDBAndColl>();
add<Optimizations::Sharded::needsPrimaryShardMerger::Out>();
add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithUnshardedCollection>();
add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithShardedCollection>();
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 1058c618cfd..e9b0e3f48ae 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1146,13 +1146,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
return shardVersionRetry(
expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() {
auto pipelineToTarget = pipeline->clone();
- if (!allowTargetingShards || expCtx->ns.db() == "local") {
- // If the db is local, this may be a change stream examining the oplog. We know the
- // oplog (and any other local collections) will not be sharded.
- return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
- pipelineToTarget.release());
+ if (allowTargetingShards && !expCtx->ns.isConfigDotCacheDotChunks() &&
+ expCtx->ns.db() != "local") {
+ return targetShardsAndAddMergeCursors(expCtx, pipelineToTarget.release());
}
- return targetShardsAndAddMergeCursors(expCtx, pipelineToTarget.release());
+ return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
+ pipelineToTarget.release());
});
}