diff options
author | Pol Pinol Castuera <pol.castuera@mongodb.com> | 2022-09-08 13:38:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-08 15:08:01 +0000 |
commit | a1e564b5da7107c5a21f53bbb2cae838709f0977 (patch) | |
tree | 6fc326b6d18d53d76e32e25f04995496f6153547 /src/mongo | |
parent | ad4a7ae485fd0189542e59b3350e798b42b42d1b (diff) | |
download | mongo-a1e564b5da7107c5a21f53bbb2cae838709f0977.tar.gz |
SERVER-67891 New design with $lookup
Diffstat (limited to 'src/mongo')
11 files changed, 451 insertions, 26 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 681aa0d2e16..5cfd942a8f3 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -170,6 +170,9 @@ const NamespaceString NamespaceString::kClusterParametersNamespace(NamespaceStri const NamespaceString NamespaceString::kConfigsvrShardsNamespace(NamespaceString::kConfigDb, "shards"); +const NamespaceString NamespaceString::kConfigsvrCollectionsNamespace(NamespaceString::kConfigDb, + "collections"); + const NamespaceString NamespaceString::kConfigsvrIndexCatalogNamespace(NamespaceString::kConfigDb, "csrs.indexes"); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 5045d925c92..79da7a0f8ae 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -237,6 +237,9 @@ public: // Namespace used for storing the list of shards on the CSRS. static const NamespaceString kConfigsvrShardsNamespace; + // Namespace used for storing the list of sharded collections on the CSRS. + static const NamespaceString kConfigsvrCollectionsNamespace; + // Namespace used for storing the index catalog on the CSRS. static const NamespaceString kConfigsvrIndexCatalogNamespace; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index dd55e020bc7..cb58ad3043c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -249,6 +249,7 @@ pipelineEnv.Library( 'document_source_graph_lookup.cpp', 'document_source_group.cpp', 'document_source_index_stats.cpp', + 'document_source_internal_all_collection_stats.cpp', 'document_source_internal_compute_geo_near_distance.cpp', 'document_source_internal_convert_bucket_index_stats.cpp', 'document_source_internal_inhibit_optimization.cpp', @@ -275,6 +276,7 @@ pipelineEnv.Library( 'document_source_sequential_document_cache.cpp', 'document_source_set_variable_from_subpipeline.cpp', 'document_source_set_window_fields.cpp', + 'document_source_sharded_data_distribution.cpp', 'document_source_single_document_transformation.cpp', 'document_source_skip.cpp', 'document_source_sort.cpp', diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 19619ed9021..3f40948ad9c 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -71,18 +71,15 @@ intrusive_ptr<DocumentSource> DocumentSourceCollStats::createFromBson( return make_intrusive<DocumentSourceCollStats>(pExpCtx, std::move(spec)); } -DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() { - if (_finished) { - return GetNextResult::makeEOF(); - } - - _finished = true; - +BSONObj DocumentSourceCollStats::makeStatsForNs( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + const DocumentSourceCollStatsSpec& spec) { BSONObjBuilder builder; - builder.append("ns", pExpCtx->ns.ns()); + builder.append("ns", nss.ns()); - auto shardName = pExpCtx->mongoProcessInterface->getShardName(pExpCtx->opCtx); + auto shardName = expCtx->mongoProcessInterface->getShardName(expCtx->opCtx); if (!shardName.empty()) { builder.append("shard", shardName); @@ -91,33 +88,42 @@ DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() { builder.append("host", getHostNameCachedAndPort()); builder.appendDate("localTime", jsTime()); - if (auto latencyStatsSpec = _collStatsSpec.getLatencyStats()) { - pExpCtx->mongoProcessInterface->appendLatencyStats( - pExpCtx->opCtx, pExpCtx->ns, latencyStatsSpec->getHistograms(), &builder); + if (auto latencyStatsSpec = spec.getLatencyStats()) { + expCtx->mongoProcessInterface->appendLatencyStats( + expCtx->opCtx, nss, latencyStatsSpec->getHistograms(), &builder); } - if (auto storageStats = _collStatsSpec.getStorageStats()) { + if (auto storageStats = spec.getStorageStats()) { // If the storageStats field exists, it must have been validated as an object when parsing. BSONObjBuilder storageBuilder(builder.subobjStart("storageStats")); - uassertStatusOKWithContext(pExpCtx->mongoProcessInterface->appendStorageStats( - pExpCtx->opCtx, pExpCtx->ns, *storageStats, &storageBuilder), + uassertStatusOKWithContext(expCtx->mongoProcessInterface->appendStorageStats( + expCtx->opCtx, nss, *storageStats, &storageBuilder), "Unable to retrieve storageStats in $collStats stage"); storageBuilder.doneFast(); } - if (_collStatsSpec.getCount()) { - uassertStatusOKWithContext(pExpCtx->mongoProcessInterface->appendRecordCount( - pExpCtx->opCtx, pExpCtx->ns, &builder), - "Unable to retrieve count in $collStats stage"); + if (spec.getCount()) { + uassertStatusOKWithContext( + expCtx->mongoProcessInterface->appendRecordCount(expCtx->opCtx, nss, &builder), + "Unable to retrieve count in $collStats stage"); } - if (_collStatsSpec.getQueryExecStats()) { - uassertStatusOKWithContext(pExpCtx->mongoProcessInterface->appendQueryExecStats( - pExpCtx->opCtx, pExpCtx->ns, &builder), - "Unable to retrieve queryExecStats in $collStats stage"); + if (spec.getQueryExecStats()) { + uassertStatusOKWithContext( + expCtx->mongoProcessInterface->appendQueryExecStats(expCtx->opCtx, nss, &builder), + "Unable to retrieve queryExecStats in $collStats stage"); } + return builder.obj(); +} + +DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() { + if (_finished) { + return GetNextResult::makeEOF(); + } + + _finished = true; - return {Document(builder.obj())}; + return {Document(makeStatsForNs(pExpCtx, pExpCtx->ns, _collStatsSpec))}; } Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 99d2a66f20f..f860a1222ee 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -85,6 +85,10 @@ public: const DocumentSourceCollStatsSpec _spec; }; + static BSONObj makeStatsForNs(const boost::intrusive_ptr<ExpressionContext>&, + const NamespaceString&, + const DocumentSourceCollStatsSpec&); + DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, DocumentSourceCollStatsSpec spec) : DocumentSource(kStageName, pExpCtx), _collStatsSpec(std::move(spec)) {} diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp new file mode 100644 index 00000000000..f4ccb6a7699 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/document_source_internal_all_collection_stats.h" + +#include "mongo/db/pipeline/document_source_coll_stats.h" +#include "mongo/db/pipeline/document_source_coll_stats_gen.h" + + +namespace mongo { + +using boost::intrusive_ptr; + +DocumentSourceInternalAllCollectionStats::DocumentSourceInternalAllCollectionStats( + const intrusive_ptr<ExpressionContext>& pExpCtx) + : DocumentSource(kStageNameInternal, pExpCtx) {} + +REGISTER_DOCUMENT_SOURCE(_internalAllCollectionStats, + DocumentSourceInternalAllCollectionStats::LiteParsed::parse, + DocumentSourceInternalAllCollectionStats::createFromBsonInternal, + AllowedWithApiStrict::kAlways); + +PrivilegeVector DocumentSourceInternalAllCollectionStats::LiteParsed::requiredPrivileges( + bool isMongos, bool bypassDocumentValidation) const { + + // TODO: SERVER-68249 + + return PrivilegeVector{Privilege(ResourcePattern::forAnyNormalResource(), ActionType::find)}; +} + +DocumentSource::GetNextResult DocumentSourceInternalAllCollectionStats::doGetNext() { + if (!_catalogDocs) { + _catalogDocs = pExpCtx->mongoProcessInterface->listCatalog(pExpCtx->opCtx); + } + + while (!_catalogDocs->empty()) { + BSONObj obj(std::move(_catalogDocs->front())); + NamespaceString nss(obj["ns"].String()); + + DocumentSourceCollStatsSpec collStatsSpec; + collStatsSpec.setStorageStats(StorageStatsSpec{}); + + _catalogDocs->pop_front(); + try { + return {Document{DocumentSourceCollStats::makeStatsForNs(pExpCtx, nss, collStatsSpec)}}; + } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { + // We don't want to retrieve data for views, only for collections. + continue; + } + } + + return GetNextResult::makeEOF(); +} + +intrusive_ptr<DocumentSource> DocumentSourceInternalAllCollectionStats::createFromBsonInternal( + BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { + uassert(6789103, + "The $_internalAllCollectionStats stage specification must be an empty object", + elem.type() == Object && elem.Obj().isEmpty()); + + uassert(6789104, + "The $_internalAllCollectionStats stage must be run on the admin database", + pExpCtx->ns.isAdminDB() && pExpCtx->ns.isCollectionlessAggregateNS()); + + return new DocumentSourceInternalAllCollectionStats(pExpCtx); +} + +const char* DocumentSourceInternalAllCollectionStats::getSourceName() const { + return kStageNameInternal.rawData(); +} + +Value DocumentSourceInternalAllCollectionStats::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + return Value(DOC(getSourceName() << Document())); +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h new file mode 100644 index 00000000000..206851d5fc5 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * This aggregation stage is the ‘$_internalAllCollectionStats´. It takes no arguments. Its + * response will be a cursor, each document of which represents the collection statistics for a + * single collection for all the existing collections. + * + * When executing the '$_internalAllCollectionStats' aggregation stage, we will need to obtain the + * catalog containing all collections namespaces. + * + * Then, for each collection, we will call `makeStatsForNs` method from DocumentSourceCollStats that + * will retrieve all storage stats for that particular collection. + */ +class DocumentSourceInternalAllCollectionStats final : public DocumentSource { +public: + static constexpr StringData kStageNameInternal = "$_internalAllCollectionStats"_sd; + + class LiteParsed final : public LiteParsedDocumentSource { + public: + static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, + const BSONElement& spec) { + return std::make_unique<LiteParsed>(spec.fieldName()); + } + + explicit LiteParsed(std::string parseTimeName) + : LiteParsedDocumentSource(std::move(parseTimeName)) {} + + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { + return stdx::unordered_set<NamespaceString>(); + } + + PrivilegeVector requiredPrivileges(bool isMongos, + bool bypassDocumentValidation) const final; + + bool isInitialSource() const final { + return true; + } + }; + + const char* getSourceName() const final; + + void addVariableRefs(std::set<Variables::Id>* refs) const final{}; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); + + constraints.isIndependentOfAnyCollection = true; + constraints.requiresInputDocSource = false; + return constraints; + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + return boost::none; + } + + static boost::intrusive_ptr<DocumentSource> createFromBsonInternal( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceInternalAllCollectionStats( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + GetNextResult doGetNext() final; + + boost::optional<std::deque<BSONObj>> _catalogDocs; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index ae3e227a214..e222d0ea7ce 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -34,6 +34,7 @@ #include "mongo/db/exec/document_value/value.h" #include "mongo/db/jsobj.h" #include "mongo/db/matcher/expression_algo.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_documents.h" @@ -115,7 +116,8 @@ NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: " << nss.db() << " and coll: " << nss.coll(), nss.isConfigDotCacheDotChunks() || nss == NamespaceString::kRsOplogNamespace || - nss == NamespaceString::kTenantMigrationOplogView); + nss == NamespaceString::kTenantMigrationOplogView || + nss == NamespaceString::kConfigsvrCollectionsNamespace); return nss; } @@ -377,6 +379,11 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState pipeStat // This stage will only be on the shards pipeline if $lookup on sharded foreign collections // is allowed. hostRequirement = HostTypeRequirement::kAnyShard; + } else if (_fromNs == NamespaceString::kConfigsvrCollectionsNamespace) { + // This is an unsharded collection, but the primary shard would be the config server, and + // the config servers are not prepared to take queries. Instead, we'll merge on any of the + // other shards. + hostRequirement = HostTypeRequirement::kAnyShard; } else { // If the pipeline is unsplit or this stage is on the merging part of the pipeline, // when $lookup on sharded foreign collections is allowed, the foreign collection is diff --git a/src/mongo/db/pipeline/document_source_sharded_data_distribution.cpp b/src/mongo/db/pipeline/document_source_sharded_data_distribution.cpp new file mode 100644 index 00000000000..a2c4b231931 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sharded_data_distribution.cpp @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/document_source_sharded_data_distribution.h" + +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_internal_all_collection_stats.h" +#include "mongo/db/pipeline/document_source_lookup.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/s/catalog/type_collection.h" + +#include "mongo/logv2/log.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::list; + +REGISTER_DOCUMENT_SOURCE(shardedDataDistribution, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceShardedDataDistribution::createFromBson, + AllowedWithApiStrict::kAlways); + +list<intrusive_ptr<DocumentSource>> DocumentSourceShardedDataDistribution::createFromBson( + BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { + uassert(6789100, + "The $shardedDataDistribution stage specification must be an empty object", + elem.type() == Object && elem.Obj().isEmpty()); + + uassert( + 6789101, "The $shardedDataDistribution stage can only be run on mongoS", expCtx->inMongos); + + uassert(6789102, + "The $shardedDataDistribution stage must be run on the admin database", + expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS()); + + // Add 'config.collections' as a resolved namespace + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces[NamespaceString::kConfigsvrCollectionsNamespace.coll()] = { + NamespaceString::kConfigsvrCollectionsNamespace, std::vector<BSONObj>{}}; + expCtx->setResolvedNamespaces(resolvedNamespaces); + + static const BSONObj kAllCollStatsObj = fromjson("{$_internalAllCollectionStats: {}}"); + static const BSONObj kGroupObj = fromjson(R"({ + $group: { + _id: "$ns", + shards: { + $push: { + $let: { + vars: { + nOwnedDocs: { + $subtract: [ + "$storageStats.count", + "$storageStats.numOrphanDocs" + ] + } + }, + in: { + shardName: "$shard", + numOrphanedDocs: "$storageStats.numOrphanDocs", + numOwnedDocuments: "$$nOwnedDocs", + ownedSizeBytes: { + $multiply: [ + "$storageStats.avgObjSize", + "$$nOwnedDocs" + ] + }, + orphanedSizeBytes: { + $multiply: [ + "$storageStats.avgObjSize", + "$storageStats.numOrphanDocs" + ] + } + } + } + } + } + } + })"); + static const BSONObj kLookupObj = fromjson(R"({ + $lookup: { + from: { + db: "config", + coll: "collections" + }, + localField: "_id", + foreignField: "_id", + as: "matchingShardedCollection" + } + })"); + static const BSONObj kMatchObj = fromjson("{$match: {matchingShardedCollection: {$ne: []}}}"); + static const BSONObj kProjectObj = fromjson(R"({ + $project: { + _id: 0, + ns: "$_id", + shards: "$shards" + } + })"); + + return {DocumentSourceInternalAllCollectionStats::createFromBsonInternal( + kAllCollStatsObj.firstElement(), expCtx), + DocumentSourceGroup::createFromBson(kGroupObj.firstElement(), expCtx), + DocumentSourceLookUp::createFromBson(kLookupObj.firstElement(), expCtx), + DocumentSourceMatch::createFromBson(kMatchObj.firstElement(), expCtx), + DocumentSourceProject::createFromBson(kProjectObj.firstElement(), expCtx)}; +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sharded_data_distribution.h b/src/mongo/db/pipeline/document_source_sharded_data_distribution.h new file mode 100644 index 00000000000..7cf96c85972 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sharded_data_distribution.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * This aggregation stage is an alias for ‘$shardedDataDistribution’. It takes no arguments. Its + * response will be a cursor, each document of which represents the data-distribution information + * for a particular collection. + */ +namespace DocumentSourceShardedDataDistribution { + +static constexpr StringData kStageName = "$shardedDataDistribution"_sd; + +static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + +}; // namespace DocumentSourceShardedDataDistribution + +} // namespace mongo diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index c836400728b..9444a694c81 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1562,11 +1562,14 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( [&](OperationContext* opCtx, const ChunkManager& cm) { auto pipelineToTarget = pipeline->clone(); - if (!cm.isSharded()) { + if (!cm.isSharded() && expCtx->ns != NamespaceString::kConfigsvrCollectionsNamespace) { // If the collection is unsharded and we are on the primary, we should be able to // do a local read. The primary may be moved right after the primary shard check, // but the local read path will do a db version check before it establishes a cursor // to catch this case and ensure we fail to read locally. + // There is the case where we are in config.collections (collection unsharded) and + // we want to broadcast to all shards. In this case we don't want to do a local read + // and we must target the config servers. try { auto expectUnshardedCollection( expCtx->mongoProcessInterface->expectUnshardedCollectionInScope( |