author     Janna Golden <janna.golden@mongodb.com>           2019-12-17 16:41:48 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-06-11 19:44:06 +0000
commit     c6533b269898f6cd077ffca7b2cc9545863313e9 (patch)
tree       045e5caa5e58cd6916e8204d35af5a5a67f93b38
parent     93c36da06fa36454a7f8ff77ce2c86a07ba97e4f (diff)
download   mongo-c6533b269898f6cd077ffca7b2cc9545863313e9.tar.gz
SERVER-43617 Add metrics on the mongos to indicate the number of shards targeted for CRUD and agg commands
(cherry picked from commit 1fe9dfb1ade548488831bf29cdcc57636e5e3b8a)
-rw-r--r--  jstests/sharding/num_hosts_targeted_metrics.js     | 220
-rw-r--r--  src/mongo/s/chunk_manager.cpp                       |   4
-rw-r--r--  src/mongo/s/chunk_manager.h                         |  12
-rw-r--r--  src/mongo/s/client/SConscript                       |   1
-rw-r--r--  src/mongo/s/client/num_hosts_targeted_metrics.cpp   | 128
-rw-r--r--  src/mongo/s/client/num_hosts_targeted_metrics.h     |  91
-rw-r--r--  src/mongo/s/commands/cluster_write_cmd.cpp          |  32
-rw-r--r--  src/mongo/s/ns_targeter.h                           |   5
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp             | 218
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                  |  24
-rw-r--r--  src/mongo/s/s_sharding_server_status.cpp            |   4
-rw-r--r--  src/mongo/s/sharding_initialization.cpp             |   4
-rw-r--r--  src/mongo/s/sharding_router_test_fixture.cpp        |   3
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.cpp          |  12
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.h            |   3
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp            |  10
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.h              |   4
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.cpp    |   8
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.h      |   2
-rw-r--r--  src/mongo/s/write_ops/mock_ns_targeter.h            |   5
20 files changed, 710 insertions, 80 deletions
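
The new counters surface in the mongos serverStatus output under shardingStatistics.numHostsTargeted (built by appendSection() in num_hosts_targeted_metrics.cpp below and exercised by the new jstest). A minimal shell sketch of reading them (illustrative only, not part of this patch; the counter values shown are made up):

    // Run against a mongos once this change is in place.
    const numHostsTargeted = db.serverStatus().shardingStatistics.numHostsTargeted;
    // One subdocument per command type: find, insert, update, delete, aggregate.
    printjson(numHostsTargeted.find);
    // e.g. { "allShards" : 2, "manyShards" : 1, "oneShard" : 5, "unsharded" : 3 }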
diff --git a/jstests/sharding/num_hosts_targeted_metrics.js b/jstests/sharding/num_hosts_targeted_metrics.js
new file mode 100644
index 00000000000..be5cc88e2e2
--- /dev/null
+++ b/jstests/sharding/num_hosts_targeted_metrics.js
@@ -0,0 +1,220 @@
+/*
+ * This test checks that mongos reports the number of hosts targeted for a given command in
+ * serverStatus.
+ * @tags: [uses_transactions, uses_multi_shard_transaction]
+ */
+
+(function() {
+'use strict';
+
+const st = new ShardingTest({shards: 3});
+const mongos = st.s;
+const testDb = mongos.getDB("test");
+
+assert.commandWorked(mongos.adminCommand({enablesharding: "test"}));
+st.ensurePrimaryShard("test", st.shard1.shardName);
+
+// Set up "test.coll0"
+assert.commandWorked(mongos.adminCommand({shardcollection: "test.coll0", key: {x: 1}}));
+assert.commandWorked(testDb.adminCommand({split: "test.coll0", middle: {x: 5}}));
+assert.commandWorked(testDb.adminCommand({split: "test.coll0", middle: {x: 200}}));
+
+// Move chunks so each shard has one chunk.
+assert.commandWorked(mongos.adminCommand(
+ {moveChunk: "test.coll0", find: {x: 2}, to: st.shard0.shardName, _waitForDelete: true}));
+assert.commandWorked(mongos.adminCommand(
+ {moveChunk: "test.coll0", find: {x: 200}, to: st.shard2.shardName, _waitForDelete: true}));
+
+// Set up "test.coll1"
+assert.commandWorked(mongos.adminCommand({shardcollection: "test.coll1", key: {x: 1}}));
+assert.commandWorked(testDb.adminCommand({split: "test.coll1", middle: {x: 5}}));
+
+// Move chunk so only shards 0 and 1 have chunks.
+assert.commandWorked(mongos.adminCommand(
+ {moveChunk: "test.coll1", find: {x: 2}, to: st.shard0.shardName, _waitForDelete: true}));
+
+function assertShardingStats(initialStats, updatedStats, expectedChanges) {
+ for (let [cmd, targetedHostsVals] of Object.entries(expectedChanges)) {
+ for (let [numTargetedHosts, count] of Object.entries(targetedHostsVals)) {
+ if (!initialStats) {
+ assert.eq(NumberInt(updatedStats[cmd][numTargetedHosts]), count);
+ } else {
+ assert.eq(NumberInt(initialStats[cmd][numTargetedHosts]) + count,
+ NumberInt(updatedStats[cmd][numTargetedHosts]));
+ }
+ }
+ }
+}
+
+// ----- Check write targeting stats -----
+
+let serverStatusInitial = testDb.serverStatus();
+assert.commandWorked(testDb.coll0.insert({x: 9}));
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"insert": {"oneShard": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+let bulk = testDb.coll0.initializeUnorderedBulkOp();
+for (var x = 0; x < 10; x++) {
+ bulk.insert({x: x});
+}
+assert.commandWorked(bulk.execute());
+assertShardingStats({"insert": {"manyShards": 0}},
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"insert": {"manyShards": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+bulk = testDb.unshardedCollection.initializeOrderedBulkOp();
+bulk.insert({j: -25});
+bulk.insert({j: 8});
+assert.commandWorked(bulk.execute());
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"insert": {"unsharded": 1}});
+
+bulk = testDb.coll0.initializeUnorderedBulkOp();
+bulk.find({x: 200}).update({$set: {a: -21}});
+bulk.find({x: -100}).update({$set: {a: -21}});
+bulk.find({x: 45}).update({$set: {a: -21}});
+assert.commandWorked(bulk.execute());
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"update": {"allShards": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+bulk = testDb.coll0.initializeUnorderedBulkOp();
+bulk.insert({x: -20});
+bulk.find({x: -20}).update({$set: {a: -21}});
+bulk.insert({x: 40});
+bulk.insert({x: 90});
+assert.commandWorked(bulk.execute());
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"insert": {"oneShard": 2}, "update": {"oneShard": 1}});
+
+// If delete targets more than one shard, we broadcast to all shards if the write is not in a
+// transaction
+serverStatusInitial = testDb.serverStatus();
+assert.commandWorked(testDb.coll0.remove({x: {$lt: 9}}));
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"delete": {"allShards": 1}});
+
+// If delete targets more than one shard, we *do not* broadcast to all shards when the write is in a
+// transaction
+if (jsTestOptions().mongosBinVersion !== 'last-lts' &&
+ jsTestOptions().mongosBinVersion !== 'last-stable') {
+ let session = mongos.startSession();
+ let sessionDB = session.getDatabase("test");
+ session.startTransaction();
+ serverStatusInitial = testDb.serverStatus();
+ assert.commandWorked(sessionDB.coll0.remove({x: {$lt: 9}}));
+ session.commitTransaction();
+ assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"delete": {"manyShards": 1}});
+}
+
+serverStatusInitial = testDb.serverStatus();
+bulk = testDb.coll1.initializeUnorderedBulkOp();
+bulk.insert({x: -100, a: 100});
+bulk.insert({x: 100, a: 100});
+assert.commandWorked(bulk.execute());
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"insert": {"allShards": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+assert.commandWorked(testDb.coll1.update({a: 100}, {$set: {a: -21}}, {multi: true}));
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"update": {"allShards": 1}});
+
+// ----- Check find targeting stats -----
+
+serverStatusInitial = testDb.serverStatus();
+let findRes = testDb.coll0.find({"x": {$gt: 8}}).itcount();
+assert.gte(findRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"find": {"manyShards": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+findRes = testDb.coll0.find({"x": {$lt: 300}}).itcount();
+assert.gte(findRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"find": {"allShards": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+findRes = testDb.coll0.find({"x": 40}).itcount();
+assert.gte(findRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"find": {"oneShard": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+findRes = testDb.unshardedCollection.find({"j": 8}).itcount();
+assert.gte(findRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"find": {"unsharded": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+findRes = testDb.coll1.find({a: -21}).itcount();
+assert.gte(findRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"find": {"allShards": 1}});
+
+// ----- Check aggregate targeting stats -----
+
+serverStatusInitial = testDb.serverStatus();
+let aggRes =
+ testDb.coll0.aggregate([{$match: {x: {$gte: 15, $lte: 100}}}].concat([{$sort: {x: 1}}]))
+ .itcount();
+assert.gte(aggRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"aggregate": {"oneShard": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+aggRes = testDb.coll0.aggregate([{$match: {x: {$gte: -150, $lte: 50}}}].concat([{$sort: {x: 1}}]))
+ .itcount();
+assert.gte(aggRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"aggregate": {"manyShards": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+aggRes = testDb.coll0.aggregate([{$match: {x: {$gte: -150, $lte: 500}}}].concat([{$sort: {x: 1}}]))
+ .itcount();
+assert.gte(aggRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"aggregate": {"allShards": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+aggRes = testDb.unshardedCollection
+ .aggregate([{$match: {j: {$gte: -150, $lte: 50}}}].concat([{$sort: {j: 1}}]))
+ .itcount();
+assert.gte(aggRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"aggregate": {"unsharded": 1}});
+
+serverStatusInitial = testDb.serverStatus();
+aggRes =
+ testDb.coll1
+ .aggregate([{
+ $lookup: {from: "unshardedCollection", localField: "x", foreignField: "j", as: "lookup"}
+ }])
+ .itcount();
+assert.gte(aggRes, 1);
+assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+ testDb.serverStatus().shardingStatistics.numHostsTargeted,
+ {"aggregate": {"allShards": 1}});
+
+st.stop();
+})();
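
The test above always compares a counter snapshot taken before an operation with one taken after it. A stripped-down sketch of that before/after pattern for a single counter, without the assertShardingStats() helper (illustrative only, assuming the same ShardingTest setup as the test):

    const before = testDb.serverStatus().shardingStatistics.numHostsTargeted.insert.oneShard;
    assert.commandWorked(testDb.coll0.insert({x: 1}));  // a single shard key value targets one shard
    const after = testDb.serverStatus().shardingStatistics.numHostsTargeted.insert.oneShard;
    assert.eq(NumberInt(before) + 1, NumberInt(after));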
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index bd8a4cf2e72..2c79b51be75 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -258,6 +258,10 @@ void RoutingTableHistory::getAllShardIds(std::set<ShardId>* all) const {
[](const ShardVersionMap::value_type& pair) { return pair.first; });
}
+int RoutingTableHistory::getNShardsOwningChunks() const {
+ return _shardVersions.size();
+}
+
std::pair<ChunkInfoMap::const_iterator, ChunkInfoMap::const_iterator>
RoutingTableHistory::overlappingRanges(const BSONObj& min,
const BSONObj& max,
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 8544eccf878..c6ece9146e8 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -132,6 +132,11 @@ public:
void getAllShardIds(std::set<ShardId>* all) const;
/**
+ * Returns the number of shards on which the collection has any chunks
+ */
+ int getNShardsOwningChunks() const;
+
+ /**
* Returns true if, for this shard, the chunks are identical in both chunk managers
*/
bool compatibleWith(const RoutingTableHistory& other, const ShardId& shard) const;
@@ -361,6 +366,13 @@ public:
_rt->getAllShardIds(all);
}
+ /**
+ * Returns the number of shards on which the collection has any chunks
+ */
+ int getNShardsOwningChunks() {
+ return _rt->getNShardsOwningChunks();
+ }
+
// Transforms query into bounds for each field in the shard key
// for example :
// Key { a: 1, b: 1 },
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index 6f978b359b7..e099979a909 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -8,6 +8,7 @@ env.Library(
target='sharding_client',
source=[
'shard_remote.cpp',
+ 'num_hosts_targeted_metrics.cpp',
env.Idlc('shard_remote.idl')[0],
],
LIBDEPS=[
diff --git a/src/mongo/s/client/num_hosts_targeted_metrics.cpp b/src/mongo/s/client/num_hosts_targeted_metrics.cpp
new file mode 100644
index 00000000000..82188f4f300
--- /dev/null
+++ b/src/mongo/s/client/num_hosts_targeted_metrics.cpp
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
+
+#include "mongo/base/init.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+
+const auto getNumHostsTargeted = ServiceContext::declareDecoration<NumHostsTargetedMetrics>();
+
+namespace {
+std::string queryTypeToString(NumHostsTargetedMetrics::QueryType queryType) {
+ switch (queryType) {
+ case NumHostsTargetedMetrics::QueryType::kFindCmd:
+ return "find";
+ case NumHostsTargetedMetrics::QueryType::kInsertCmd:
+ return "insert";
+ case NumHostsTargetedMetrics::QueryType::kUpdateCmd:
+ return "update";
+ case NumHostsTargetedMetrics::QueryType::kDeleteCmd:
+ return "delete";
+ case NumHostsTargetedMetrics::QueryType::kAggregateCmd:
+ return "aggregate";
+ default:
+ return "";
+ }
+}
+} // namespace
+
+void NumHostsTargetedMetrics::addNumHostsTargeted(NumHostsTargetedMetrics::QueryType queryType,
+ NumHostsTargetedMetrics::TargetType targetType) {
+ switch (targetType) {
+ case TargetType::kAllShards:
+ _numHostsTargeted[queryType]->allShards.addAndFetch(1);
+ return;
+ case TargetType::kManyShards:
+ _numHostsTargeted[queryType]->manyShards.addAndFetch(1);
+ return;
+ case TargetType::kOneShard:
+ _numHostsTargeted[queryType]->oneShard.addAndFetch(1);
+ return;
+ case TargetType::kUnsharded:
+ _numHostsTargeted[queryType]->unsharded.addAndFetch(1);
+ return;
+ }
+}
+
+void NumHostsTargetedMetrics::appendSection(BSONObjBuilder* builder) {
+ BSONObjBuilder numHostsTargetedStatsBuilder(builder->subobjStart("numHostsTargeted"));
+ for (auto i = 0; i < kNumQueryType; i++) {
+ auto& targetStat = _numHostsTargeted[i];
+ auto queryType = static_cast<QueryType>(i);
+ BSONObjBuilder queryStatsBuilder(
+ numHostsTargetedStatsBuilder.subobjStart(queryTypeToString(queryType)));
+ queryStatsBuilder.appendNumber("allShards", targetStat->allShards.load());
+ queryStatsBuilder.appendNumber("manyShards", targetStat->manyShards.load());
+ queryStatsBuilder.appendNumber("oneShard", targetStat->oneShard.load());
+ queryStatsBuilder.appendNumber("unsharded", targetStat->unsharded.load());
+ }
+}
+
+NumHostsTargetedMetrics::TargetType NumHostsTargetedMetrics::parseTargetType(
+ OperationContext* opCtx, int nShardsTargeted, int nShardsOwningChunks) {
+ // If nShardsOwningChunks == 0, this means the routing info did not contain a chunk manager so
+ // the collection is unsharded
+ if (nShardsOwningChunks == 0)
+ return TargetType::kUnsharded;
+
+ if (nShardsTargeted == 1)
+ return TargetType::kOneShard;
+
+ // If an update or delete targets > 1 shard, it will be broadcast to all shards in the cluster
+    // *regardless* of whether each shard owns any data for that collection. We should count this as
+ // 'allShards'.
+ if (nShardsTargeted >= nShardsOwningChunks)
+ return TargetType::kAllShards;
+
+ return TargetType::kManyShards;
+}
+
+NumHostsTargetedMetrics& NumHostsTargetedMetrics::get(ServiceContext* serviceContext) {
+ return getNumHostsTargeted(serviceContext);
+}
+
+NumHostsTargetedMetrics& NumHostsTargetedMetrics::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+void NumHostsTargetedMetrics::startup() {
+ _numHostsTargeted.reserve(kNumQueryType);
+ for (auto i = 0; i < kNumQueryType; i++) {
+ _numHostsTargeted.push_back(std::make_unique<TargetStats>());
+ }
+}
+
+} // namespace mongo
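
parseTargetType() above is the single place where a (nShardsTargeted, nShardsOwningChunks) pair is mapped to one of the four counters. The following shell-style sketch (illustrative only, not part of the patch) restates those rules with a few worked examples so the boundaries are explicit:

    // Mirrors NumHostsTargetedMetrics::parseTargetType(opCtx, nShardsTargeted, nShardsOwningChunks).
    function classifyTarget(nShardsTargeted, nShardsOwningChunks) {
        if (nShardsOwningChunks === 0)
            return "unsharded";  // routing info had no chunk manager, i.e. unsharded collection
        if (nShardsTargeted === 1)
            return "oneShard";
        if (nShardsTargeted >= nShardsOwningChunks)
            return "allShards";  // includes broadcasts that also hit shards owning no chunks
        return "manyShards";
    }
    assert.eq("oneShard", classifyTarget(1, 3));
    assert.eq("manyShards", classifyTarget(2, 3));
    assert.eq("allShards", classifyTarget(3, 3));
    assert.eq("unsharded", classifyTarget(2, 0));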
diff --git a/src/mongo/s/client/num_hosts_targeted_metrics.h b/src/mongo/s/client/num_hosts_targeted_metrics.h
new file mode 100644
index 00000000000..5f75cbbb069
--- /dev/null
+++ b/src/mongo/s/client/num_hosts_targeted_metrics.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "mongo/platform/atomic_word.h"
+
+
+namespace mongo {
+
+class BSONObjBuilder;
+class OperationContext;
+class ServiceContext;
+
+struct TargetStats {
+ // the op targeted all shards that own chunks for the collection as long as the total number of
+ // shards in the cluster is > 1
+ AtomicWord<int> allShards;
+
+    // the op targeted more than one, but not all, of the shards that own chunks for the
+    // collection, as long as the total number of shards in the cluster is > 1
+ AtomicWord<int> manyShards;
+
+ // the op targeted 1 shard (if there exists only one shard, the metric will count as 'oneShard')
+ AtomicWord<int> oneShard;
+
+ // the collection is unsharded
+ AtomicWord<int> unsharded;
+};
+
+class NumHostsTargetedMetrics {
+
+public:
+ enum QueryType {
+ kFindCmd,
+ kInsertCmd,
+ kUpdateCmd,
+ kDeleteCmd,
+ kAggregateCmd,
+ kNumQueryType,
+ };
+
+ enum TargetType { kAllShards, kManyShards, kOneShard, kUnsharded };
+
+ void addNumHostsTargeted(QueryType queryType, TargetType targetType);
+
+ void appendSection(BSONObjBuilder* builder);
+
+ TargetType parseTargetType(OperationContext* opCtx,
+ int nShardsTargeted,
+ int nShardsOwningChunks);
+
+ static NumHostsTargetedMetrics& get(ServiceContext* serviceContext);
+ static NumHostsTargetedMetrics& get(OperationContext* opCtx);
+
+ void startup();
+
+private:
+ std::vector<std::unique_ptr<TargetStats>> _numHostsTargeted;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 0be5d7c0af4..fce0171ed69 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/cluster_last_error_info.h"
@@ -320,6 +321,30 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
return updatedShardKey;
}
+void updateHostsTargetedMetrics(OperationContext* opCtx,
+ BatchedCommandRequest::BatchType batchType,
+ int nShardsOwningChunks,
+ int nShardsTargeted) {
+ NumHostsTargetedMetrics::QueryType writeType;
+ switch (batchType) {
+ case BatchedCommandRequest::BatchType_Insert:
+ writeType = NumHostsTargetedMetrics::QueryType::kInsertCmd;
+ break;
+ case BatchedCommandRequest::BatchType_Update:
+ writeType = NumHostsTargetedMetrics::QueryType::kUpdateCmd;
+ break;
+ case BatchedCommandRequest::BatchType_Delete:
+ writeType = NumHostsTargetedMetrics::QueryType::kDeleteCmd;
+ break;
+        default:
+            MONGO_UNREACHABLE;
+ }
+
+ auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType(
+ opCtx, nShardsTargeted, nShardsOwningChunks);
+ NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(writeType, targetType);
+}
+
/**
* Base class for mongos write commands.
*/
@@ -534,6 +559,13 @@ private:
CurOp::get(opCtx)->debug().nShards =
stats.getTargetedShards().size() + (updatedShardKey ? 1 : 0);
+ if (stats.getNumShardsOwningChunks().is_initialized())
+ updateHostsTargetedMetrics(opCtx,
+ _batchedRequest.getBatchType(),
+ stats.getNumShardsOwningChunks().get(),
+ stats.getTargetedShards().size() +
+ (updatedShardKey ? 1 : 0));
+
if (auto txnRouter = TransactionRouter::get(opCtx)) {
auto writeCmdStatus = response.toStatus();
if (!writeCmdStatus.isOK()) {
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 92e29547705..e7f01f3139a 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -159,6 +159,11 @@ public:
* Returns !OK with message if could not refresh
*/
virtual Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) = 0;
+
+ /**
+ * Returns the number of shards that own one or more chunks for the targeted collection.
+ */
+ virtual int getNShardsOwningChunks() const = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index bc4b9191c90..74f2132b4b0 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -60,6 +60,7 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
@@ -742,6 +743,54 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx,
appendEmptyResultSet(opCtx, *result, status, nss.ns());
}
+void updateHostsTargetedMetrics(OperationContext* opCtx,
+ const NamespaceString& executionNss,
+ boost::optional<CachedCollectionRoutingInfo> executionNsRoutingInfo,
+ stdx::unordered_set<NamespaceString> involvedNamespaces) {
+ if (!executionNsRoutingInfo)
+ return;
+
+ // Create a set of ShardIds that own a chunk belonging to any of the collections involved in
+    // this pipeline. This is used to determine whether the pipeline targeted all of the shards
+    // that own chunks for any of the involved collections.
+ std::set<ShardId> shardsOwningChunks = [&]() {
+ std::set<ShardId> shardsIds;
+
+ if (executionNsRoutingInfo->cm()) {
+ std::set<ShardId> shardIdsForNs;
+ executionNsRoutingInfo->cm()->getAllShardIds(&shardIdsForNs);
+ for (const auto& shardId : shardIdsForNs) {
+ shardsIds.insert(shardId);
+ }
+ }
+
+ for (const auto& nss : involvedNamespaces) {
+ if (nss == executionNss)
+ continue;
+
+ const auto resolvedNsRoutingInfo =
+ uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
+ if (resolvedNsRoutingInfo.cm()) {
+ std::set<ShardId> shardIdsForNs;
+ resolvedNsRoutingInfo.cm()->getAllShardIds(&shardIdsForNs);
+ for (const auto& shardId : shardIdsForNs) {
+ shardsIds.insert(shardId);
+ }
+ }
+ }
+
+ return shardsIds;
+ }();
+
+ auto nShardsTargeted = CurOp::get(opCtx)->debug().nShards;
+ if (nShardsTargeted > 0) {
+ auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType(
+ opCtx, nShardsTargeted, shardsOwningChunks.size());
+ NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(
+ NumHostsTargetedMetrics::QueryType::kAggregateCmd, targetType);
+ }
+}
+
} // namespace
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
@@ -813,94 +862,103 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto resolvedNamespaces =
resolveInvolvedNamespaces(opCtx, litePipe, request.getNamespaceString());
- // A pipeline is allowed to passthrough to the primary shard iff the following conditions are
- // met:
- //
- // 1. The namespace of the aggregate and any other involved namespaces are unsharded.
- // 2. Is allowed to be forwarded to shards.
- // 3. Does not need to run on all shards.
- // 4. Doesn't need transformation via DocumentSource::serialize().
- if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
- litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) {
- const auto primaryShardId = routingInfo->db().primary()->getId();
- return aggPassthrough(
- opCtx, namespaces, primaryShardId, request, litePipe, privileges, result);
- }
-
- // Populate the collection UUID and the appropriate collation to use.
- auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe);
- BSONObj collationObj = collInfo.first;
- boost::optional<UUID> uuid = collInfo.second;
-
- // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
- // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the
- // pipeline's stages.
- auto expCtx = makeExpressionContext(
- opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces));
-
- // Parse and optimize the full pipeline.
- auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
- pipeline->optimizePipeline();
-
- // Check whether the entire pipeline must be run on mongoS.
- if (pipeline->requiredToRunOnMongos()) {
- // If this is an explain write the explain output and return.
- if (expCtx->explain) {
- *result << "splitPipeline" << BSONNULL << "mongos"
- << Document{{"host", getHostNameCachedAndPort()},
- {"stages", pipeline->writeExplainOps(*expCtx->explain)}};
- return Status::OK();
+ auto status = [&]() {
+ // A pipeline is allowed to passthrough to the primary shard iff the following conditions
+ // are met:
+ //
+ // 1. The namespace of the aggregate and any other involved namespaces are unsharded.
+ // 2. Is allowed to be forwarded to shards.
+ // 3. Does not need to run on all shards.
+ // 4. Doesn't need transformation via DocumentSource::serialize().
+ if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
+ litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) {
+ const auto primaryShardId = routingInfo->db().primary()->getId();
+ return aggPassthrough(
+ opCtx, namespaces, primaryShardId, request, litePipe, privileges, result);
}
- return runPipelineOnMongoS(
- expCtx, namespaces, request, litePipe, std::move(pipeline), result, privileges);
- }
+ // Populate the collection UUID and the appropriate collation to use.
+ auto collInfo =
+ getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe);
+ BSONObj collationObj = collInfo.first;
+ boost::optional<UUID> uuid = collInfo.second;
+
+ // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
+ // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by
+ // the pipeline's stages.
+ auto expCtx = makeExpressionContext(
+ opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces));
+
+ // Parse and optimize the full pipeline.
+ auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
+ pipeline->optimizePipeline();
+
+ // Check whether the entire pipeline must be run on mongoS.
+ if (pipeline->requiredToRunOnMongos()) {
+ // If this is an explain write the explain output and return.
+ if (expCtx->explain) {
+ *result << "splitPipeline" << BSONNULL << "mongos"
+ << Document{{"host", getHostNameCachedAndPort()},
+ {"stages", pipeline->writeExplainOps(*expCtx->explain)}};
+ return Status::OK();
+ }
- // If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
- expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
+ return runPipelineOnMongoS(
+ expCtx, namespaces, request, litePipe, std::move(pipeline), result, privileges);
+ }
- // If the operation is an explain, then we verify that it succeeded on all targeted shards,
- // write the results to the output builder, and return immediately.
- if (expCtx->explain) {
- return appendExplainResults(std::move(shardDispatchResults), expCtx, result);
- }
+ // If not, split the pipeline as necessary and dispatch to the relevant shards.
+ auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
+ expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
- // If this isn't an explain, then we must have established cursors on at least one shard.
- invariant(shardDispatchResults.remoteCursors.size() > 0);
+ // If the operation is an explain, then we verify that it succeeded on all targeted shards,
+ // write the results to the output builder, and return immediately.
+ if (expCtx->explain) {
+ return appendExplainResults(std::move(shardDispatchResults), expCtx, result);
+ }
- // If we sent the entire pipeline to a single shard, store the remote cursor and return.
- if (!shardDispatchResults.splitPipeline) {
- invariant(shardDispatchResults.remoteCursors.size() == 1);
- auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front());
- const auto shardId = remoteCursor->getShardId().toString();
- const auto reply = uassertStatusOK(storePossibleCursor(opCtx,
- namespaces.requestedNss,
- std::move(remoteCursor),
- privileges,
- expCtx->tailableMode));
- return appendCursorResponseToCommandResult(shardId, reply, result);
- }
+ // If this isn't an explain, then we must have established cursors on at least one shard.
+ invariant(shardDispatchResults.remoteCursors.size() > 0);
+
+ // If we sent the entire pipeline to a single shard, store the remote cursor and return.
+ if (!shardDispatchResults.splitPipeline) {
+ invariant(shardDispatchResults.remoteCursors.size() == 1);
+ auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front());
+ const auto shardId = remoteCursor->getShardId().toString();
+ const auto reply = uassertStatusOK(storePossibleCursor(opCtx,
+ namespaces.requestedNss,
+ std::move(remoteCursor),
+ privileges,
+ expCtx->tailableMode));
+ return appendCursorResponseToCommandResult(shardId, reply, result);
+ }
- // If we have the exchange spec then dispatch all consumers.
- if (shardDispatchResults.exchangeSpec) {
- shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx,
- namespaces.executionNss,
- request,
- litePipe,
- collationObj,
- &shardDispatchResults);
- }
+ // If we have the exchange spec then dispatch all consumers.
+ if (shardDispatchResults.exchangeSpec) {
+ shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx,
+ namespaces.executionNss,
+ request,
+ litePipe,
+ collationObj,
+ &shardDispatchResults);
+ }
- // If we reach here, we have a merge pipeline to dispatch.
- return dispatchMergingPipeline(expCtx,
- namespaces,
- request,
- litePipe,
- routingInfo,
- std::move(shardDispatchResults),
- result,
- privileges);
+ // If we reach here, we have a merge pipeline to dispatch.
+ return dispatchMergingPipeline(expCtx,
+ namespaces,
+ request,
+ litePipe,
+ routingInfo,
+ std::move(shardDispatchResults),
+ result,
+ privileges);
+ }();
+
+ if (status.isOK())
+ updateHostsTargetedMetrics(
+ opCtx, namespaces.executionNss, routingInfo, litePipe.getInvolvedNamespaces());
+
+ return status;
}
Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
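
The union built in updateHostsTargetedMetrics() above is why the $lookup case at the end of the new jstest counts as "allShards": coll1 owns chunks on two shards, the unsharded foreign collection contributes no additional chunk-owning shards, so targeting those two shards covers every shard that owns chunks for the involved namespaces. A minimal sketch of checking that delta directly (illustrative only, assuming the test's setup):

    const before = testDb.serverStatus().shardingStatistics.numHostsTargeted.aggregate.allShards;
    testDb.coll1
        .aggregate(
            [{$lookup: {from: "unshardedCollection", localField: "x", foreignField: "j", as: "lookup"}}])
        .itcount();
    const after = testDb.serverStatus().shardingStatistics.numHostsTargeted.aggregate.allShards;
    assert.eq(NumberInt(before) + 1, NumberInt(after));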
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 567110ae134..ad19dc32da3 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -52,6 +52,7 @@
#include "mongo/executor/task_executor_pool.h"
#include "mongo/platform/overflow_arithmetic.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
@@ -203,6 +204,20 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards(
return requests;
}
+void updateNumHostsTargetedMetrics(OperationContext* opCtx,
+ const CachedCollectionRoutingInfo& routingInfo,
+ int nTargetedShards) {
+ int nShardsOwningChunks = 0;
+ if (routingInfo.cm()) {
+ nShardsOwningChunks = routingInfo.cm()->getNShardsOwningChunks();
+ }
+
+ auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType(
+ opCtx, nTargetedShards, nShardsOwningChunks);
+ NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(
+ NumHostsTargetedMetrics::QueryType::kFindCmd, targetType);
+}
+
CursorId runQueryWithoutRetrying(OperationContext* opCtx,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
@@ -337,6 +352,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// allocate a cursor id.
if (cursorState == ClusterCursorManager::CursorState::Exhausted) {
CurOp::get(opCtx)->debug().cursorExhausted = true;
+
+ if (shardIds.size() > 0) {
+ updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size());
+ }
return CursorId(0);
}
@@ -354,6 +373,11 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// Record the cursorID in CurOp.
CurOp::get(opCtx)->debug().cursorid = cursorId;
+
+ if (shardIds.size() > 0) {
+ updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size());
+ }
+
return cursorId;
}
diff --git a/src/mongo/s/s_sharding_server_status.cpp b/src/mongo/s/s_sharding_server_status.cpp
index f06df21aff6..0bb94bc5775 100644
--- a/src/mongo/s/s_sharding_server_status.cpp
+++ b/src/mongo/s/s_sharding_server_status.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
@@ -80,8 +81,11 @@ public:
const BSONElement& configElement) const override {
auto const grid = Grid::get(opCtx);
auto const catalogCache = grid->catalogCache();
+ auto& numHostsTargetedMetrics = NumHostsTargetedMetrics::get(opCtx);
BSONObjBuilder result;
+
+ numHostsTargetedMetrics.appendSection(&result);
catalogCache->report(&result);
return result.obj();
}
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 7f21ec10c84..29da54a9a30 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -57,6 +57,7 @@
#include "mongo/s/catalog/replset_dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/cluster_identity_loader.h"
@@ -179,6 +180,9 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize);
executorPool->startup();
+ auto& numHostsTargetedMetrics = NumHostsTargetedMetrics::get(opCtx);
+ numHostsTargetedMetrics.startup();
+
const auto service = opCtx->getServiceContext();
auto const grid = Grid::get(service);
diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp
index c533f14a125..a48437febfd 100644
--- a/src/mongo/s/sharding_router_test_fixture.cpp
+++ b/src/mongo/s/sharding_router_test_fixture.cpp
@@ -58,6 +58,7 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
@@ -140,6 +141,8 @@ ShardingTestFixture::ShardingTestFixture() {
stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager)));
catalogClient->startup();
+ NumHostsTargetedMetrics::get(service).startup();
+
ConnectionString configCS = ConnectionString::forReplicaSet(
"configRS", {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}});
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index b06b0c1c63b..86ced587670 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -435,6 +435,10 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
}
}
+ auto nShardsOwningChunks = batchOp.getNShardsOwningChunks();
+ if (nShardsOwningChunks.is_initialized())
+ stats->noteNumShardsOwningChunks(nShardsOwningChunks.get());
+
batchOp.buildClientResponse(clientResponse);
LOG(4) << "Finished execution of write batch"
@@ -456,6 +460,10 @@ void BatchWriteExecStats::noteWriteAt(const HostAndPort& host,
_writeOpTimes[ConnectionString(host)] = HostOpTime(opTime, electionId);
}
+void BatchWriteExecStats::noteNumShardsOwningChunks(const int nShardsOwningChunks) {
+ _numShardsOwningChunks.emplace(nShardsOwningChunks);
+}
+
const std::set<ShardId>& BatchWriteExecStats::getTargetedShards() const {
return _targetedShards;
}
@@ -464,4 +472,8 @@ const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const {
return _writeOpTimes;
}
+const boost::optional<int> BatchWriteExecStats::getNumShardsOwningChunks() const {
+ return _numShardsOwningChunks;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index f61926889b8..846668967a6 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -90,9 +90,11 @@ public:
void noteWriteAt(const HostAndPort& host, repl::OpTime opTime, const OID& electionId);
void noteTargetedShard(const ShardId& shardId);
+ void noteNumShardsOwningChunks(const int nShardsOwningChunks);
const std::set<ShardId>& getTargetedShards() const;
const HostOpTimeMap& getWriteOpTimes() const;
+ const boost::optional<int> getNumShardsOwningChunks() const;
// Expose via helpers if this gets more complex
@@ -108,6 +110,7 @@ public:
private:
std::set<ShardId> _targetedShards;
HostOpTimeMap _writeOpTimes;
+ boost::optional<int> _numShardsOwningChunks;
};
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 08f40961c07..5a822387116 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -36,6 +36,10 @@
#include "mongo/base/error_codes.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
#include "mongo/s/transaction_router.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
@@ -432,6 +436,8 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
targetedBatches->emplace(batch->getEndpoint().shardName, batch);
}
+ _nShardsOwningChunks = targeter.getNShardsOwningChunks();
+
return Status::OK();
}
@@ -813,6 +819,10 @@ int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const {
});
}
+boost::optional<int> BatchWriteOp::getNShardsOwningChunks() {
+ return _nShardsOwningChunks;
+}
+
void BatchWriteOp::_incBatchStats(const BatchedCommandResponse& response) {
const auto batchType = _clientRequest.getBatchType();
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 41525d1dcc7..718e6de75e7 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -201,6 +201,8 @@ public:
*/
int numWriteOpsIn(WriteOpState state) const;
+ boost::optional<int> getNShardsOwningChunks();
+
private:
/**
* Maintains the batch execution statistics when a response is received.
@@ -242,6 +244,8 @@ private:
// Set to true if this write is part of a transaction.
const bool _inTransaction{false};
+
+ boost::optional<int> _nShardsOwningChunks;
};
/**
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index 39bb70a734b..44415122efa 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -797,6 +797,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
MONGO_UNREACHABLE;
}
+int ChunkManagerTargeter::getNShardsOwningChunks() const {
+ if (_routingInfo->cm()) {
+ return _routingInfo->cm()->getNShardsOwningChunks();
+ }
+
+ return 0;
+}
+
Status ChunkManagerTargeter::_refreshNow(OperationContext* opCtx) {
Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*_routingInfo));
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index 2ace173e625..1c6b527d54a 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -108,6 +108,8 @@ public:
*/
Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override;
+ int getNShardsOwningChunks() const override;
+
private:
using ShardVersionMap = std::map<ShardId, ChunkVersion>;
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index 2e368fadf74..1223af8be82 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -129,6 +129,11 @@ public:
return Status::OK();
}
+ int getNShardsOwningChunks() const override {
+ // No-op
+ return 0;
+ }
+
private:
static ChunkRange _parseRange(const BSONObj& query) {
const StringData fieldName(query.firstElement().fieldName());