summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_lookup.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_lookup.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp160
1 files changed, 101 insertions, 59 deletions
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 95c33761454..25fe9668ce0 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_merge_gen.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/query/query_knobs_gen.h"
@@ -50,6 +51,66 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
+namespace {
+
+/**
+ * Constructs a query of the following shape:
+ * {$or: [
+ * {'fieldName': {$eq: 'values[0]'}},
+ * {'fieldName': {$eq: 'values[1]'}},
+ * ...
+ * ]}
+ */
+BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) {
+ BSONObjBuilder orBuilder;
+ {
+ BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or"));
+ for (auto&& value : values) {
+ orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value)));
+ }
+ }
+ return orBuilder.obj();
+}
+
+void lookupPipeValidator(const Pipeline& pipeline) {
+ const auto& sources = pipeline.getSources();
+ std::for_each(sources.begin(), sources.end(), [](auto& src) {
+ uassert(51047,
+ str::stream() << src->getSourceName()
+ << " is not allowed within a $lookup's sub-pipeline",
+ src->constraints().isAllowedInLookupPipeline());
+ });
+}
+
+bool foreignShardedLookupAllowed() {
+ return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
+}
+
+// Parses $lookup 'from' field. The 'from' field must be a string or an object in the form of
+// {from: {db: "config", coll: "cache.chunks.*}, ...}.
+NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, StringData defaultDb) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$lookup 'from' field must be either a string or an object, but found "
+ << typeName(elem.type()),
+ elem.type() == BSONType::String || elem.type() == BSONType::Object);
+
+ if (elem.type() == BSONType::String) {
+ return NamespaceString(defaultDb, elem.valueStringData());
+ }
+
+ // Valdate the db and coll names.
+ auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject());
+ auto nss = NamespaceString(spec.getDb().value_or(""), spec.getColl().value_or(""));
+ uassert(
+ ErrorCodes::FailedToParse,
+ str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: "
+ << nss.db() << " and coll: " << nss.coll(),
+ nss.isConfigDotCacheDotChunks());
+ return nss;
+}
+
+} // namespace
+
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
@@ -120,12 +181,7 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars
uassert(ErrorCodes::FailedToParse,
str::stream() << "missing 'from' option to $lookup stage specification: " << specObj,
fromElement);
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "'from' option to $lookup must be a string, but was type "
- << typeName(specObj["from"].type()),
- fromElement.type() == BSONType::String);
-
- NamespaceString fromNss(nss.db(), fromElement.valueStringData());
+ auto fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db());
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "invalid $lookup namespace: " << fromNss.ns(),
fromNss.isValid());
@@ -176,18 +232,19 @@ const char* DocumentSourceLookUp::getSourceName() const {
}
StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
- // If executing on mongos and the foreign collection is sharded, then this stage can run on
- // mongos or any shard.
- HostTypeRequirement hostRequirement =
- (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
- ? HostTypeRequirement::kNone
- : HostTypeRequirement::kPrimaryShard;
-
- const bool foreignShardedAllowed =
- getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
- if (!foreignShardedAllowed) {
- // Always run on the primary shard.
- hostRequirement = HostTypeRequirement::kPrimaryShard;
+ HostTypeRequirement hostRequirement;
+ if (_fromNs.isConfigDotCacheDotChunks()) {
+ // $lookup from config.cache.chunks* namespaces is permitted to run on each individual
+ // shard, rather than just the primary, since each shard should have an identical copy of
+ // the namespace.
+ hostRequirement = HostTypeRequirement::kAnyShard;
+ } else {
+ // When $lookup on sharded foreign collections is allowed, the foreign collection is
+ // sharded, and the stage is executing on mongos, the stage can run on mongos or any shard.
+ hostRequirement = (foreignShardedLookupAllowed() && pExpCtx->inMongos &&
+ pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
+ ? HostTypeRequirement::kNone
+ : HostTypeRequirement::kPrimaryShard;
}
// By default, $lookup is allowed in a transaction and does not use disk.
@@ -213,42 +270,6 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
return constraints;
}
-namespace {
-
-/**
- * Constructs a query of the following shape:
- * {$or: [
- * {'fieldName': {$eq: 'values[0]'}},
- * {'fieldName': {$eq: 'values[1]'}},
- * ...
- * ]}
- */
-BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) {
- BSONObjBuilder orBuilder;
- {
- BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or"));
- for (auto&& value : values) {
- orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value)));
- }
- }
- return orBuilder.obj();
-}
-
-void lookupPipeValidator(const Pipeline& pipeline) {
- const auto& sources = pipeline.getSources();
- std::for_each(sources.begin(), sources.end(), [](auto& src) {
- uassert(51047,
- str::stream() << src->getSourceName()
- << " is not allowed within a $lookup's sub-pipeline",
- src->constraints().isAllowedInLookupPipeline());
- });
-}
-
-bool foreignShardedLookupAllowed() {
- return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
-}
-} // namespace
-
DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() {
if (_unwindSrc) {
return unwindResult();
@@ -685,6 +706,12 @@ void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() {
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
Document doc;
+
+ // Support alternative $lookup from config.cache.chunks* namespaces.
+ auto fromValue = (pExpCtx->ns.db() == _fromNs.db())
+ ? Value(_fromNs.coll())
+ : Value(Document{{"db", _fromNs.db()}, {"coll", _fromNs.coll()}});
+
if (wasConstructedWithPipelineSyntax()) {
MutableDocument exprList;
for (auto letVar : _letVariables) {
@@ -698,13 +725,13 @@ void DocumentSourceLookUp::serializeToArray(
}
doc = Document{{getSourceName(),
- Document{{"from", _fromNs.coll()},
+ Document{{"from", fromValue},
{"as", _as.fullPath()},
{"let", exprList.freeze()},
{"pipeline", pipeline}}}};
} else {
doc = Document{{getSourceName(),
- {Document{{"from", _fromNs.coll()},
+ {Document{{"from", fromValue},
{"as", _as.fullPath()},
{"localField", _localField->fullPath()},
{"foreignField", _foreignField->fullPath()}}}}};
@@ -781,6 +808,18 @@ DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) cons
return DepsTracker::State::SEE_NEXT;
}
+boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceLookUp::distributedPlanLogic() {
+ if (_fromExpCtx->ns.isConfigDotCacheDotChunks()) {
+ // When $lookup reads from config.cache.chunks.* namespaces, it should run on each
+ // individual shard in parallel. This is a special case, and atypical for standard $lookup
+ // since a full copy of config.cache.chunks.* collections exists on all shards.
+ return boost::none;
+ }
+
+ // {shardsStage, mergingStage, sortPattern}
+ return DistributedPlanLogic{nullptr, this, boost::none};
+}
+
void DocumentSourceLookUp::detachFromOperationContext() {
if (_pipeline) {
// We have a pipeline we're going to be executing across multiple calls to getNext(), so we
@@ -845,14 +884,17 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
continue;
}
+ if (argName == "from") {
+ fromNs = parseLookupFromAndResolveNamespace(argument, pExpCtx->ns.db());
+ continue;
+ }
+
uassert(ErrorCodes::FailedToParse,
str::stream() << "$lookup argument '" << argName << "' must be a string, found "
<< argument << ": " << argument.type(),
argument.type() == BSONType::String);
- if (argName == "from") {
- fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String());
- } else if (argName == "as") {
+ if (argName == "as") {
as = argument.String();
} else if (argName == "localField") {
localField = argument.String();