author     Pol Pinol Castuera <pol.castuera@mongodb.com>     2022-09-08 13:38:31 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-09-08 15:08:01 +0000
commit     a1e564b5da7107c5a21f53bbb2cae838709f0977 (patch)
tree       6fc326b6d18d53d76e32e25f04995496f6153547 /src/mongo/db/pipeline
parent     ad4a7ae485fd0189542e59b3350e798b42b42d1b (diff)
SERVER-67891 New design with $lookup
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--   src/mongo/db/pipeline/SConscript                                         |   2
-rw-r--r--   src/mongo/db/pipeline/document_source_coll_stats.cpp                     |  54
-rw-r--r--   src/mongo/db/pipeline/document_source_coll_stats.h                       |   4
-rw-r--r--   src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp  | 102
-rw-r--r--   src/mongo/db/pipeline/document_source_internal_all_collection_stats.h    | 109
-rw-r--r--   src/mongo/db/pipeline/document_source_lookup.cpp                         |   9
-rw-r--r--   src/mongo/db/pipeline/document_source_sharded_data_distribution.cpp      | 136
-rw-r--r--   src/mongo/db/pipeline/document_source_sharded_data_distribution.h        |  50
-rw-r--r--   src/mongo/db/pipeline/sharded_agg_helpers.cpp                            |   5
9 files changed, 445 insertions, 26 deletions
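
Taken together, this patch extracts the stats-building logic of $collStats into a reusable DocumentSourceCollStats::makeStatsForNs() helper, adds an internal $_internalAllCollectionStats stage built on top of it, and introduces a user-facing $shardedDataDistribution stage that desugars into $_internalAllCollectionStats followed by $group, $lookup (against config.collections), $match and $project. The $lookup and sharded_agg_helpers changes let that desugared pipeline read config.collections from a shard. As a rough sketch only (not part of the patch), inferred from the uasserts added below, the new stage is invoked as an empty object in a collectionless aggregate on the admin database of a mongos:

    // Illustrative command object; the stage spec must be an empty object and the
    // aggregate must be collectionless ("aggregate: 1") on the admin database.
    #include "mongo/bson/json.h"

    const mongo::BSONObj kShardedDataDistributionCmd = mongo::fromjson(R"({
        aggregate: 1,
        pipeline: [{$shardedDataDistribution: {}}],
        cursor: {}
    })");
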
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index dd55e020bc7..cb58ad3043c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -249,6 +249,7 @@ pipelineEnv.Library(
'document_source_graph_lookup.cpp',
'document_source_group.cpp',
'document_source_index_stats.cpp',
+ 'document_source_internal_all_collection_stats.cpp',
'document_source_internal_compute_geo_near_distance.cpp',
'document_source_internal_convert_bucket_index_stats.cpp',
'document_source_internal_inhibit_optimization.cpp',
@@ -275,6 +276,7 @@ pipelineEnv.Library(
'document_source_sequential_document_cache.cpp',
'document_source_set_variable_from_subpipeline.cpp',
'document_source_set_window_fields.cpp',
+ 'document_source_sharded_data_distribution.cpp',
'document_source_single_document_transformation.cpp',
'document_source_skip.cpp',
'document_source_sort.cpp',
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 19619ed9021..3f40948ad9c 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -71,18 +71,15 @@ intrusive_ptr<DocumentSource> DocumentSourceCollStats::createFromBson(
return make_intrusive<DocumentSourceCollStats>(pExpCtx, std::move(spec));
}
-DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() {
- if (_finished) {
- return GetNextResult::makeEOF();
- }
-
- _finished = true;
-
+BSONObj DocumentSourceCollStats::makeStatsForNs(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ const DocumentSourceCollStatsSpec& spec) {
BSONObjBuilder builder;
- builder.append("ns", pExpCtx->ns.ns());
+ builder.append("ns", nss.ns());
- auto shardName = pExpCtx->mongoProcessInterface->getShardName(pExpCtx->opCtx);
+ auto shardName = expCtx->mongoProcessInterface->getShardName(expCtx->opCtx);
if (!shardName.empty()) {
builder.append("shard", shardName);
@@ -91,33 +88,42 @@ DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() {
builder.append("host", getHostNameCachedAndPort());
builder.appendDate("localTime", jsTime());
- if (auto latencyStatsSpec = _collStatsSpec.getLatencyStats()) {
- pExpCtx->mongoProcessInterface->appendLatencyStats(
- pExpCtx->opCtx, pExpCtx->ns, latencyStatsSpec->getHistograms(), &builder);
+ if (auto latencyStatsSpec = spec.getLatencyStats()) {
+ expCtx->mongoProcessInterface->appendLatencyStats(
+ expCtx->opCtx, nss, latencyStatsSpec->getHistograms(), &builder);
}
- if (auto storageStats = _collStatsSpec.getStorageStats()) {
+ if (auto storageStats = spec.getStorageStats()) {
// If the storageStats field exists, it must have been validated as an object when parsing.
BSONObjBuilder storageBuilder(builder.subobjStart("storageStats"));
- uassertStatusOKWithContext(pExpCtx->mongoProcessInterface->appendStorageStats(
- pExpCtx->opCtx, pExpCtx->ns, *storageStats, &storageBuilder),
+ uassertStatusOKWithContext(expCtx->mongoProcessInterface->appendStorageStats(
+ expCtx->opCtx, nss, *storageStats, &storageBuilder),
"Unable to retrieve storageStats in $collStats stage");
storageBuilder.doneFast();
}
- if (_collStatsSpec.getCount()) {
- uassertStatusOKWithContext(pExpCtx->mongoProcessInterface->appendRecordCount(
- pExpCtx->opCtx, pExpCtx->ns, &builder),
- "Unable to retrieve count in $collStats stage");
+ if (spec.getCount()) {
+ uassertStatusOKWithContext(
+ expCtx->mongoProcessInterface->appendRecordCount(expCtx->opCtx, nss, &builder),
+ "Unable to retrieve count in $collStats stage");
}
- if (_collStatsSpec.getQueryExecStats()) {
- uassertStatusOKWithContext(pExpCtx->mongoProcessInterface->appendQueryExecStats(
- pExpCtx->opCtx, pExpCtx->ns, &builder),
- "Unable to retrieve queryExecStats in $collStats stage");
+ if (spec.getQueryExecStats()) {
+ uassertStatusOKWithContext(
+ expCtx->mongoProcessInterface->appendQueryExecStats(expCtx->opCtx, nss, &builder),
+ "Unable to retrieve queryExecStats in $collStats stage");
}
+ return builder.obj();
+}
+
+DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() {
+ if (_finished) {
+ return GetNextResult::makeEOF();
+ }
+
+ _finished = true;
- return {Document(builder.obj())};
+ return {Document(makeStatsForNs(pExpCtx, pExpCtx->ns, _collStatsSpec))};
}
Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 99d2a66f20f..f860a1222ee 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -85,6 +85,10 @@ public:
const DocumentSourceCollStatsSpec _spec;
};
+ static BSONObj makeStatsForNs(const boost::intrusive_ptr<ExpressionContext>&,
+ const NamespaceString&,
+ const DocumentSourceCollStatsSpec&);
+
DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
DocumentSourceCollStatsSpec spec)
: DocumentSource(kStageName, pExpCtx), _collStatsSpec(std::move(spec)) {}
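
The two changes above turn the body of $collStats' doGetNext() into a static makeStatsForNs() helper that can build the stats document for any namespace. A minimal sketch of reusing it, mirroring the call added in document_source_internal_all_collection_stats.cpp further down (this assumes an ExpressionContext 'expCtx' is in scope and uses a placeholder namespace):

    // Sketch only: build a storageStats-only $collStats document for an arbitrary namespace,
    // exactly as the new internal stage does below.
    DocumentSourceCollStatsSpec spec;
    spec.setStorageStats(StorageStatsSpec{});  // request just the storageStats section
    BSONObj stats = DocumentSourceCollStats::makeStatsForNs(
        expCtx, NamespaceString("test.coll"), spec);  // "test.coll" is a placeholder
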
diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp
new file mode 100644
index 00000000000..f4ccb6a7699
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/document_source_internal_all_collection_stats.h"
+
+#include "mongo/db/pipeline/document_source_coll_stats.h"
+#include "mongo/db/pipeline/document_source_coll_stats_gen.h"
+
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+
+DocumentSourceInternalAllCollectionStats::DocumentSourceInternalAllCollectionStats(
+ const intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSource(kStageNameInternal, pExpCtx) {}
+
+REGISTER_DOCUMENT_SOURCE(_internalAllCollectionStats,
+ DocumentSourceInternalAllCollectionStats::LiteParsed::parse,
+ DocumentSourceInternalAllCollectionStats::createFromBsonInternal,
+ AllowedWithApiStrict::kAlways);
+
+PrivilegeVector DocumentSourceInternalAllCollectionStats::LiteParsed::requiredPrivileges(
+ bool isMongos, bool bypassDocumentValidation) const {
+
+ // TODO: SERVER-68249
+
+ return PrivilegeVector{Privilege(ResourcePattern::forAnyNormalResource(), ActionType::find)};
+}
+
+DocumentSource::GetNextResult DocumentSourceInternalAllCollectionStats::doGetNext() {
+ if (!_catalogDocs) {
+ _catalogDocs = pExpCtx->mongoProcessInterface->listCatalog(pExpCtx->opCtx);
+ }
+
+ while (!_catalogDocs->empty()) {
+ BSONObj obj(std::move(_catalogDocs->front()));
+ NamespaceString nss(obj["ns"].String());
+
+ DocumentSourceCollStatsSpec collStatsSpec;
+ collStatsSpec.setStorageStats(StorageStatsSpec{});
+
+ _catalogDocs->pop_front();
+ try {
+ return {Document{DocumentSourceCollStats::makeStatsForNs(pExpCtx, nss, collStatsSpec)}};
+ } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ // We don't want to retrieve data for views, only for collections.
+ continue;
+ }
+ }
+
+ return GetNextResult::makeEOF();
+}
+
+intrusive_ptr<DocumentSource> DocumentSourceInternalAllCollectionStats::createFromBsonInternal(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(6789103,
+ "The $_internalAllCollectionStats stage specification must be an empty object",
+ elem.type() == Object && elem.Obj().isEmpty());
+
+ uassert(6789104,
+ "The $_internalAllCollectionStats stage must be run on the admin database",
+ pExpCtx->ns.isAdminDB() && pExpCtx->ns.isCollectionlessAggregateNS());
+
+ return new DocumentSourceInternalAllCollectionStats(pExpCtx);
+}
+
+const char* DocumentSourceInternalAllCollectionStats::getSourceName() const {
+ return kStageNameInternal.rawData();
+}
+
+Value DocumentSourceInternalAllCollectionStats::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ return Value(DOC(getSourceName() << Document()));
+}
+} // namespace mongo
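
Each document emitted by doGetNext() above is the output of makeStatsForNs() with only storageStats requested. Its approximate shape is sketched below; the values are invented, 'shard' is only present when a shard name is available, and storageStats is truncated to the fields consumed by the $group stage of $shardedDataDistribution further down in this patch:

    // Approximate shape of one $_internalAllCollectionStats output document (illustrative
    // values; a 'localTime' date field is also appended but omitted here for brevity).
    const mongo::BSONObj kExampleAllCollStatsDoc = mongo::fromjson(R"({
        ns: "db.coll",
        shard: "shard0",
        host: "node1:27018",
        storageStats: { count: 1000, avgObjSize: 57, numOrphanDocs: 3 }
    })");
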
diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h
new file mode 100644
index 00000000000..206851d5fc5
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h
@@ -0,0 +1,109 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * This aggregation stage is '$_internalAllCollectionStats'. It takes no arguments. Its
+ * response is a cursor, each document of which represents the collection statistics for a
+ * single collection, covering every existing collection.
+ *
+ * When executing the '$_internalAllCollectionStats' aggregation stage, we first obtain the
+ * catalog containing the namespaces of all collections.
+ *
+ * Then, for each collection, we call the `makeStatsForNs` method from DocumentSourceCollStats,
+ * which retrieves the storage stats for that particular collection.
+ */
+class DocumentSourceInternalAllCollectionStats final : public DocumentSource {
+public:
+ static constexpr StringData kStageNameInternal = "$_internalAllCollectionStats"_sd;
+
+ class LiteParsed final : public LiteParsedDocumentSource {
+ public:
+ static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
+ const BSONElement& spec) {
+ return std::make_unique<LiteParsed>(spec.fieldName());
+ }
+
+ explicit LiteParsed(std::string parseTimeName)
+ : LiteParsedDocumentSource(std::move(parseTimeName)) {}
+
+ stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ return stdx::unordered_set<NamespaceString>();
+ }
+
+ PrivilegeVector requiredPrivileges(bool isMongos,
+ bool bypassDocumentValidation) const final;
+
+ bool isInitialSource() const final {
+ return true;
+ }
+ };
+
+ const char* getSourceName() const final;
+
+ void addVariableRefs(std::set<Variables::Id>* refs) const final{};
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kAllowed,
+ UnionRequirement::kAllowed);
+
+ constraints.isIndependentOfAnyCollection = true;
+ constraints.requiresInputDocSource = false;
+ return constraints;
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ return boost::none;
+ }
+
+ static boost::intrusive_ptr<DocumentSource> createFromBsonInternal(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceInternalAllCollectionStats(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ GetNextResult doGetNext() final;
+
+ boost::optional<std::deque<BSONObj>> _catalogDocs;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index ae3e227a214..e222d0ea7ce 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/expression_algo.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_documents.h"
@@ -115,7 +116,8 @@ NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem,
str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: "
<< nss.db() << " and coll: " << nss.coll(),
nss.isConfigDotCacheDotChunks() || nss == NamespaceString::kRsOplogNamespace ||
- nss == NamespaceString::kTenantMigrationOplogView);
+ nss == NamespaceString::kTenantMigrationOplogView ||
+ nss == NamespaceString::kConfigsvrCollectionsNamespace);
return nss;
}
@@ -377,6 +379,11 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState pipeStat
// This stage will only be on the shards pipeline if $lookup on sharded foreign collections
// is allowed.
hostRequirement = HostTypeRequirement::kAnyShard;
+ } else if (_fromNs == NamespaceString::kConfigsvrCollectionsNamespace) {
+ // This is an unsharded collection, but the primary shard would be the config server, and
+ // the config servers are not prepared to take queries. Instead, we'll merge on any of the
+ // other shards.
+ hostRequirement = HostTypeRequirement::kAnyShard;
} else {
// If the pipeline is unsplit or this stage is on the merging part of the pipeline,
// when $lookup on sharded foreign collections is allowed, the foreign collection is
diff --git a/src/mongo/db/pipeline/document_source_sharded_data_distribution.cpp b/src/mongo/db/pipeline/document_source_sharded_data_distribution.cpp
new file mode 100644
index 00000000000..a2c4b231931
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sharded_data_distribution.cpp
@@ -0,0 +1,136 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/document_source_sharded_data_distribution.h"
+
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_internal_all_collection_stats.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/s/catalog/type_collection.h"
+
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+using std::list;
+
+REGISTER_DOCUMENT_SOURCE(shardedDataDistribution,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceShardedDataDistribution::createFromBson,
+ AllowedWithApiStrict::kAlways);
+
+list<intrusive_ptr<DocumentSource>> DocumentSourceShardedDataDistribution::createFromBson(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(6789100,
+ "The $shardedDataDistribution stage specification must be an empty object",
+ elem.type() == Object && elem.Obj().isEmpty());
+
+ uassert(
+ 6789101, "The $shardedDataDistribution stage can only be run on mongoS", expCtx->inMongos);
+
+ uassert(6789102,
+ "The $shardedDataDistribution stage must be run on the admin database",
+ expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS());
+
+ // Add 'config.collections' as a resolved namespace
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces[NamespaceString::kConfigsvrCollectionsNamespace.coll()] = {
+ NamespaceString::kConfigsvrCollectionsNamespace, std::vector<BSONObj>{}};
+ expCtx->setResolvedNamespaces(resolvedNamespaces);
+
+ static const BSONObj kAllCollStatsObj = fromjson("{$_internalAllCollectionStats: {}}");
+ static const BSONObj kGroupObj = fromjson(R"({
+ $group: {
+ _id: "$ns",
+ shards: {
+ $push: {
+ $let: {
+ vars: {
+ nOwnedDocs: {
+ $subtract: [
+ "$storageStats.count",
+ "$storageStats.numOrphanDocs"
+ ]
+ }
+ },
+ in: {
+ shardName: "$shard",
+ numOrphanedDocs: "$storageStats.numOrphanDocs",
+ numOwnedDocuments: "$$nOwnedDocs",
+ ownedSizeBytes: {
+ $multiply: [
+ "$storageStats.avgObjSize",
+ "$$nOwnedDocs"
+ ]
+ },
+ orphanedSizeBytes: {
+ $multiply: [
+ "$storageStats.avgObjSize",
+ "$storageStats.numOrphanDocs"
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ })");
+ static const BSONObj kLookupObj = fromjson(R"({
+ $lookup: {
+ from: {
+ db: "config",
+ coll: "collections"
+ },
+ localField: "_id",
+ foreignField: "_id",
+ as: "matchingShardedCollection"
+ }
+ })");
+ static const BSONObj kMatchObj = fromjson("{$match: {matchingShardedCollection: {$ne: []}}}");
+ static const BSONObj kProjectObj = fromjson(R"({
+ $project: {
+ _id: 0,
+ ns: "$_id",
+ shards: "$shards"
+ }
+ })");
+
+ return {DocumentSourceInternalAllCollectionStats::createFromBsonInternal(
+ kAllCollStatsObj.firstElement(), expCtx),
+ DocumentSourceGroup::createFromBson(kGroupObj.firstElement(), expCtx),
+ DocumentSourceLookUp::createFromBson(kLookupObj.firstElement(), expCtx),
+ DocumentSourceMatch::createFromBson(kMatchObj.firstElement(), expCtx),
+ DocumentSourceProject::createFromBson(kProjectObj.firstElement(), expCtx)};
+}
+} // namespace mongo
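
Putting the desugared stages together: $_internalAllCollectionStats emits one stats document per collection on each targeted shard, $group collapses them per namespace while computing owned/orphaned document counts and sizes, $lookup plus $match keep only the namespaces present in config.collections (i.e. sharded collections), and $project shapes the final result. Each document returned by $shardedDataDistribution should therefore look roughly like the sketch below (values carried over from the illustrative stats document above: 1000 - 3 owned documents, 57 bytes average object size):

    // Illustrative output document of $shardedDataDistribution; values are made up and follow
    // the $group expressions above (numOwnedDocuments = count - numOrphanDocs, sizes are
    // avgObjSize times the respective counts).
    const mongo::BSONObj kExampleShardedDataDistributionDoc = mongo::fromjson(R"({
        ns: "db.coll",
        shards: [{
            shardName: "shard0",
            numOrphanedDocs: 3,
            numOwnedDocuments: 997,
            ownedSizeBytes: 56829,
            orphanedSizeBytes: 171
        }]
    })");
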
diff --git a/src/mongo/db/pipeline/document_source_sharded_data_distribution.h b/src/mongo/db/pipeline/document_source_sharded_data_distribution.h
new file mode 100644
index 00000000000..7cf96c85972
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sharded_data_distribution.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * The '$shardedDataDistribution' aggregation stage is an alias: it takes no arguments and
+ * desugars into a pipeline of other stages. Its response is a cursor, each document of which
+ * represents the data-distribution information for a single sharded collection.
+ */
+namespace DocumentSourceShardedDataDistribution {
+
+static constexpr StringData kStageName = "$shardedDataDistribution"_sd;
+
+static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+}; // namespace DocumentSourceShardedDataDistribution
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index c836400728b..9444a694c81 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1562,11 +1562,14 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
[&](OperationContext* opCtx, const ChunkManager& cm) {
auto pipelineToTarget = pipeline->clone();
- if (!cm.isSharded()) {
+ if (!cm.isSharded() && expCtx->ns != NamespaceString::kConfigsvrCollectionsNamespace) {
// If the collection is unsharded and we are on the primary, we should be able to
// do a local read. The primary may be moved right after the primary shard check,
// but the local read path will do a db version check before it establishes a cursor
// to catch this case and ensure we fail to read locally.
+ // There is one exception: when the namespace is config.collections (an unsharded
+ // collection) we don't want to do a local read, and we must target the config
+ // servers instead.
try {
auto expectUnshardedCollection(
expCtx->mongoProcessInterface->expectUnshardedCollectionInScope(