diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_lookup.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 160 |
1 files changed, 101 insertions, 59 deletions
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 95c33761454..25fe9668ce0 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -39,6 +39,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/matcher/expression_algo.h" #include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source_merge_gen.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/query/query_knobs_gen.h" @@ -50,6 +51,66 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; +namespace { + +/** + * Constructs a query of the following shape: + * {$or: [ + * {'fieldName': {$eq: 'values[0]'}}, + * {'fieldName': {$eq: 'values[1]'}}, + * ... + * ]} + */ +BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) { + BSONObjBuilder orBuilder; + { + BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or")); + for (auto&& value : values) { + orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value))); + } + } + return orBuilder.obj(); +} + +void lookupPipeValidator(const Pipeline& pipeline) { + const auto& sources = pipeline.getSources(); + std::for_each(sources.begin(), sources.end(), [](auto& src) { + uassert(51047, + str::stream() << src->getSourceName() + << " is not allowed within a $lookup's sub-pipeline", + src->constraints().isAllowedInLookupPipeline()); + }); +} + +bool foreignShardedLookupAllowed() { + return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); +} + +// Parses $lookup 'from' field. The 'from' field must be a string or an object in the form of +// {from: {db: "config", coll: "cache.chunks.*"}, ...}. 
+NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, StringData defaultDb) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "$lookup 'from' field must be either a string or an object, but found " + << typeName(elem.type()), + elem.type() == BSONType::String || elem.type() == BSONType::Object); + + if (elem.type() == BSONType::String) { + return NamespaceString(defaultDb, elem.valueStringData()); + } + + // Validate the db and coll names. + auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject()); + auto nss = NamespaceString(spec.getDb().value_or(""), spec.getColl().value_or("")); + uassert( + ErrorCodes::FailedToParse, + str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: " + << nss.db() << " and coll: " << nss.coll(), + nss.isConfigDotCacheDotChunks()); + return nss; +} + +} // namespace + DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, const boost::intrusive_ptr<ExpressionContext>& expCtx) @@ -120,12 +181,7 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars uassert(ErrorCodes::FailedToParse, str::stream() << "missing 'from' option to $lookup stage specification: " << specObj, fromElement); - uassert(ErrorCodes::FailedToParse, - str::stream() << "'from' option to $lookup must be a string, but was type " - << typeName(specObj["from"].type()), - fromElement.type() == BSONType::String); - - NamespaceString fromNss(nss.db(), fromElement.valueStringData()); + auto fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db()); uassert(ErrorCodes::InvalidNamespace, str::stream() << "invalid $lookup namespace: " << fromNss.ns(), fromNss.isValid()); @@ -176,18 +232,19 @@ const char* DocumentSourceLookUp::getSourceName() const { } StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { - // If executing on mongos and the foreign collection is sharded, then this stage can run on 
- // mongos or any shard. - HostTypeRequirement hostRequirement = - (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)) - ? HostTypeRequirement::kNone - : HostTypeRequirement::kPrimaryShard; - - const bool foreignShardedAllowed = - getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); - if (!foreignShardedAllowed) { - // Always run on the primary shard. - hostRequirement = HostTypeRequirement::kPrimaryShard; + HostTypeRequirement hostRequirement; + if (_fromNs.isConfigDotCacheDotChunks()) { + // $lookup from config.cache.chunks* namespaces is permitted to run on each individual + // shard, rather than just the primary, since each shard should have an identical copy of + // the namespace. + hostRequirement = HostTypeRequirement::kAnyShard; + } else { + // When $lookup on sharded foreign collections is allowed, the foreign collection is + // sharded, and the stage is executing on mongos, the stage can run on mongos or any shard. + hostRequirement = (foreignShardedLookupAllowed() && pExpCtx->inMongos && + pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)) + ? HostTypeRequirement::kNone + : HostTypeRequirement::kPrimaryShard; } // By default, $lookup is allowed in a transaction and does not use disk. @@ -213,42 +270,6 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { return constraints; } -namespace { - -/** - * Constructs a query of the following shape: - * {$or: [ - * {'fieldName': {$eq: 'values[0]'}}, - * {'fieldName': {$eq: 'values[1]'}}, - * ... 
- * ]} - */ -BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) { - BSONObjBuilder orBuilder; - { - BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or")); - for (auto&& value : values) { - orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value))); - } - } - return orBuilder.obj(); -} - -void lookupPipeValidator(const Pipeline& pipeline) { - const auto& sources = pipeline.getSources(); - std::for_each(sources.begin(), sources.end(), [](auto& src) { - uassert(51047, - str::stream() << src->getSourceName() - << " is not allowed within a $lookup's sub-pipeline", - src->constraints().isAllowedInLookupPipeline()); - }); -} - -bool foreignShardedLookupAllowed() { - return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); -} -} // namespace - DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() { if (_unwindSrc) { return unwindResult(); @@ -685,6 +706,12 @@ void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() { void DocumentSourceLookUp::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { Document doc; + + // Support alternative $lookup from config.cache.chunks* namespaces. + auto fromValue = (pExpCtx->ns.db() == _fromNs.db()) + ? 
Value(_fromNs.coll()) + : Value(Document{{"db", _fromNs.db()}, {"coll", _fromNs.coll()}}); + if (wasConstructedWithPipelineSyntax()) { MutableDocument exprList; for (auto letVar : _letVariables) { @@ -698,13 +725,13 @@ void DocumentSourceLookUp::serializeToArray( } doc = Document{{getSourceName(), - Document{{"from", _fromNs.coll()}, + Document{{"from", fromValue}, {"as", _as.fullPath()}, {"let", exprList.freeze()}, {"pipeline", pipeline}}}}; } else { doc = Document{{getSourceName(), - {Document{{"from", _fromNs.coll()}, + {Document{{"from", fromValue}, {"as", _as.fullPath()}, {"localField", _localField->fullPath()}, {"foreignField", _foreignField->fullPath()}}}}}; @@ -781,6 +808,18 @@ DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) cons return DepsTracker::State::SEE_NEXT; } +boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceLookUp::distributedPlanLogic() { + if (_fromExpCtx->ns.isConfigDotCacheDotChunks()) { + // When $lookup reads from config.cache.chunks.* namespaces, it should run on each + // individual shard in parallel. This is a special case, and atypical for standard $lookup + // since a full copy of config.cache.chunks.* collections exists on all shards. 
+ return boost::none; + } + + // {shardsStage, mergingStage, sortPattern} + return DistributedPlanLogic{nullptr, this, boost::none}; +} + void DocumentSourceLookUp::detachFromOperationContext() { if (_pipeline) { // We have a pipeline we're going to be executing across multiple calls to getNext(), so we @@ -845,14 +884,17 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( continue; } + if (argName == "from") { + fromNs = parseLookupFromAndResolveNamespace(argument, pExpCtx->ns.db()); + continue; + } + uassert(ErrorCodes::FailedToParse, str::stream() << "$lookup argument '" << argName << "' must be a string, found " << argument << ": " << argument.type(), argument.type() == BSONType::String); - if (argName == "from") { - fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String()); - } else if (argName == "as") { + if (argName == "as") { as = argument.String(); } else if (argName == "localField") { localField = argument.String(); |