author    | Janna Golden <janna.golden@mongodb.com>          | 2019-12-17 16:41:48 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-13 20:50:43 +0000
commit    | 9aef163066b2d4197d328fb112307c40f840a541
tree      | d23eb12347e9509400f3a003099fa0f60eb63194
parent    | c7d191f056d41de04e5d21e1a182022226c9d56d
download  | mongo-9aef163066b2d4197d328fb112307c40f840a541.tar.gz
SERVER-43617 Add metrics on the mongos to indicate the number of shards targeted for CRUD and agg commands
(cherry picked from commit 1fe9dfb1ade548488831bf29cdcc57636e5e3b8a)
21 files changed, 688 insertions, 61 deletions
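For orientation before the diff: the patch adds a new `numHostsTargeted` section under `shardingStatistics` in the mongos `serverStatus` output. A sketch of its shape, with field names taken from `appendSection()` in `num_hosts_targeted_metrics.cpp` below; the counter values here are invented:

```js
// Illustrative only -- the counts are made up.
// mongos> db.serverStatus().shardingStatistics.numHostsTargeted
{
    "find":      {"allShards": 2, "manyShards": 1, "oneShard": 5, "unsharded": 1},
    "insert":    {"allShards": 1, "manyShards": 1, "oneShard": 9, "unsharded": 2},
    "update":    {"allShards": 2, "manyShards": 0, "oneShard": 1, "unsharded": 0},
    "delete":    {"allShards": 1, "manyShards": 0, "oneShard": 0, "unsharded": 0},
    "aggregate": {"allShards": 2, "manyShards": 1, "oneShard": 1, "unsharded": 1}
}
```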
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index ad28c13356e..392607b4af9 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -39,6 +39,7 @@ selector:
   - jstests/sharding/mongos_validate_writes.js
   - jstests/sharding/resume_change_stream_on_subset_of_shards.js
   - jstests/sharding/drop_sharded_db_tags_cleanup.js
+  - jstests/sharding/num_hosts_targeted_metrics.js
   # Can be un-blacklisted once the version of 3.6 used for 'last-stable' includes SERVER-34338's
   # backport.
   - jstests/sharding/explainFind_stale_mongos.js
diff --git a/jstests/sharding/num_hosts_targeted_metrics.js b/jstests/sharding/num_hosts_targeted_metrics.js
new file mode 100644
index 00000000000..b21f2ba288e
--- /dev/null
+++ b/jstests/sharding/num_hosts_targeted_metrics.js
@@ -0,0 +1,211 @@
+/*
+ * This test checks that mongos reports the number of hosts targeted for a given command in server
+ * status.
+ */
+
+(function() {
+    'use strict';
+
+    const st = new ShardingTest({shards: 3});
+    const mongos = st.s;
+    const testDb = mongos.getDB("test");
+
+    assert.commandWorked(mongos.adminCommand({enablesharding: "test"}));
+    st.ensurePrimaryShard("test", st.shard1.shardName);
+
+    // Set up "test.coll0".
+    assert.commandWorked(mongos.adminCommand({shardcollection: "test.coll0", key: {x: 1}}));
+    assert.commandWorked(testDb.adminCommand({split: "test.coll0", middle: {x: 5}}));
+    assert.commandWorked(testDb.adminCommand({split: "test.coll0", middle: {x: 200}}));
+
+    // Move chunks so each shard has one chunk.
+    assert.commandWorked(mongos.adminCommand(
+        {moveChunk: "test.coll0", find: {x: 2}, to: st.shard0.shardName, _waitForDelete: true}));
+    assert.commandWorked(mongos.adminCommand(
+        {moveChunk: "test.coll0", find: {x: 200}, to: st.shard2.shardName, _waitForDelete: true}));
+
+    // Set up "test.coll1".
+    assert.commandWorked(mongos.adminCommand({shardcollection: "test.coll1", key: {x: 1}}));
+    assert.commandWorked(testDb.adminCommand({split: "test.coll1", middle: {x: 5}}));
+
+    // Move a chunk so only shards 0 and 1 have chunks.
+    assert.commandWorked(mongos.adminCommand(
+        {moveChunk: "test.coll1", find: {x: 2}, to: st.shard0.shardName, _waitForDelete: true}));
+
+    function assertShardingStats(initialStats, updatedStats, expectedChanges) {
+        for (let cmd of Object.keys(expectedChanges)) {
+            let targetedHostsVals = expectedChanges[cmd];
+            for (let numTargetedHosts of Object.keys(targetedHostsVals)) {
+                let count = targetedHostsVals[numTargetedHosts];
+                if (!initialStats) {
+                    assert.eq(NumberInt(updatedStats[cmd][numTargetedHosts]), count);
+                } else {
+                    assert.eq(NumberInt(initialStats[cmd][numTargetedHosts]) + count,
+                              NumberInt(updatedStats[cmd][numTargetedHosts]));
+                }
+            }
+        }
+    }
+
+    // ----- Check write targeting stats -----
+
+    let serverStatusInitial = testDb.serverStatus();
+    assert.commandWorked(testDb.coll0.insert({x: 9}));
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"insert": {"oneShard": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    let bulk = testDb.coll0.initializeUnorderedBulkOp();
+    for (var x = 0; x < 10; x++) {
+        bulk.insert({x: x});
+    }
+    assert.commandWorked(bulk.execute());
+    assertShardingStats({"insert": {"manyShards": 0}},
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"insert": {"manyShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    bulk = testDb.unshardedCollection.initializeOrderedBulkOp();
+    bulk.insert({j: -25});
+    bulk.insert({j: 8});
+    assert.commandWorked(bulk.execute());
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"insert": {"unsharded": 1}});
+
+    bulk = testDb.coll0.initializeUnorderedBulkOp();
+    bulk.find({x: 200}).update({$set: {a: -21}});
+    bulk.find({x: -100}).update({$set: {a: -21}});
+    bulk.find({x: 45}).update({$set: {a: -21}});
+    assert.commandWorked(bulk.execute());
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"update": {"allShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    bulk = testDb.coll0.initializeUnorderedBulkOp();
+    bulk.insert({x: -20});
+    bulk.find({x: -20}).update({$set: {a: -21}});
+    bulk.insert({x: 40});
+    bulk.insert({x: 90});
+    assert.commandWorked(bulk.execute());
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"insert": {"oneShard": 2}, "update": {"oneShard": 1}});
+
+    // If a delete targets more than one shard, we broadcast to all shards.
+    serverStatusInitial = testDb.serverStatus();
+    assert.commandWorked(testDb.coll0.remove({x: {$lt: 9}}));
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"delete": {"allShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    bulk = testDb.coll1.initializeUnorderedBulkOp();
+    bulk.insert({x: -100, a: 100});
+    bulk.insert({x: 100, a: 100});
+    assert.commandWorked(bulk.execute());
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"insert": {"allShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    assert.commandWorked(testDb.coll1.update({a: 100}, {$set: {a: -21}}, {multi: true}));
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"update": {"allShards": 1}});
+
+    // ----- Check find targeting stats -----
+
+    serverStatusInitial = testDb.serverStatus();
+    let findRes = testDb.coll0.find({"x": {$gt: 8}}).itcount();
+    assert.gte(findRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"find": {"manyShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    findRes = testDb.coll0.find({"x": {$lt: 300}}).itcount();
+    assert.gte(findRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"find": {"allShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    findRes = testDb.coll0.find({"x": 40}).itcount();
+    assert.gte(findRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"find": {"oneShard": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    findRes = testDb.unshardedCollection.find({"j": 8}).itcount();
+    assert.gte(findRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"find": {"unsharded": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    findRes = testDb.coll1.find({a: -21}).itcount();
+    assert.gte(findRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"find": {"allShards": 1}});
+
+    // ----- Check aggregate targeting stats -----
+
+    serverStatusInitial = testDb.serverStatus();
+    let aggRes =
+        testDb.coll0.aggregate([{$match: {x: {$gte: 15, $lte: 100}}}].concat([{$sort: {x: 1}}]))
+            .itcount();
+    assert.gte(aggRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"aggregate": {"oneShard": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    aggRes =
+        testDb.coll0.aggregate([{$match: {x: {$gte: -150, $lte: 50}}}].concat([{$sort: {x: 1}}]))
+            .itcount();
+    assert.gte(aggRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"aggregate": {"manyShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    aggRes =
+        testDb.coll0.aggregate([{$match: {x: {$gte: -150, $lte: 500}}}].concat([{$sort: {x: 1}}]))
+            .itcount();
+    assert.gte(aggRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"aggregate": {"allShards": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    aggRes = testDb.unshardedCollection
+                 .aggregate([{$match: {j: {$gte: -150, $lte: 50}}}].concat([{$sort: {j: 1}}]))
+                 .itcount();
+    assert.gte(aggRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"aggregate": {"unsharded": 1}});
+
+    serverStatusInitial = testDb.serverStatus();
+    aggRes = testDb.coll1
+                 .aggregate([{
+                     $lookup: {
+                         from: "unshardedCollection",
+                         localField: "x",
+                         foreignField: "j",
+                         as: "lookup"
+                     }
+                 }])
+                 .itcount();
+    assert.gte(aggRes, 1);
+    assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted,
+                        testDb.serverStatus().shardingStatistics.numHostsTargeted,
+                        {"aggregate": {"allShards": 1}});
+
+    st.stop();
+})();
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 37e591e7d41..f9bd130f8eb 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -254,6 +254,10 @@ void RoutingTableHistory::getAllShardIds(std::set<ShardId>* all) const {
                    [](const ShardVersionMap::value_type& pair) { return pair.first; });
 }
 
+int RoutingTableHistory::getNShardsOwningChunks() const {
+    return _shardVersions.size();
+}
+
 std::pair<ChunkInfoMap::const_iterator, ChunkInfoMap::const_iterator>
 RoutingTableHistory::overlappingRanges(const BSONObj& min,
                                        const BSONObj& max,
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 2bc8e37df00..331dc569cd2 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -133,6 +133,11 @@ public:
     void getAllShardIds(std::set<ShardId>* all) const;
 
     /**
+     * Returns the number of shards on which the collection has any chunks
+     */
+    int getNShardsOwningChunks() const;
+
+    /**
      * Returns true if, for this shard, the chunks are identical in both chunk managers
      */
     bool compatibleWith(const RoutingTableHistory& other, const ShardId& shard) const;
@@ -378,6 +383,13 @@ public:
         _rt->getAllShardIds(all);
     }
 
+    /**
+     * Returns the number of shards on which the collection has any chunks
+     */
+    int getNShardsOwningChunks() {
+        return _rt->getNShardsOwningChunks();
+    }
+
     // Transforms query into bounds for each field in the shard key
     // for example :
     //   Key { a: 1, b: 1 },
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index 5dbae27a390..091defec1a6 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -8,6 +8,7 @@ env.Library(
     target='sharding_client',
     source=[
         'shard_remote.cpp',
+        'num_hosts_targeted_metrics.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/client/fetcher',
diff --git a/src/mongo/s/client/num_hosts_targeted_metrics.cpp b/src/mongo/s/client/num_hosts_targeted_metrics.cpp
new file mode 100644
index 00000000000..82188f4f300
--- /dev/null
+++ b/src/mongo/s/client/num_hosts_targeted_metrics.cpp
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
+
+#include "mongo/base/init.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+
+const auto getNumHostsTargeted = ServiceContext::declareDecoration<NumHostsTargetedMetrics>();
+
+namespace {
+std::string queryTypeToString(NumHostsTargetedMetrics::QueryType queryType) {
+    switch (queryType) {
+        case NumHostsTargetedMetrics::QueryType::kFindCmd:
+            return "find";
+        case NumHostsTargetedMetrics::QueryType::kInsertCmd:
+            return "insert";
+        case NumHostsTargetedMetrics::QueryType::kUpdateCmd:
+            return "update";
+        case NumHostsTargetedMetrics::QueryType::kDeleteCmd:
+            return "delete";
+        case NumHostsTargetedMetrics::QueryType::kAggregateCmd:
+            return "aggregate";
+        default:
+            return "";
+    }
+}
+}  // namespace
+
+void NumHostsTargetedMetrics::addNumHostsTargeted(NumHostsTargetedMetrics::QueryType queryType,
+                                                  NumHostsTargetedMetrics::TargetType targetType) {
+    switch (targetType) {
+        case TargetType::kAllShards:
+            _numHostsTargeted[queryType]->allShards.addAndFetch(1);
+            return;
+        case TargetType::kManyShards:
+            _numHostsTargeted[queryType]->manyShards.addAndFetch(1);
+            return;
+        case TargetType::kOneShard:
+            _numHostsTargeted[queryType]->oneShard.addAndFetch(1);
+            return;
+        case TargetType::kUnsharded:
+            _numHostsTargeted[queryType]->unsharded.addAndFetch(1);
+            return;
+    }
+}
+
+void NumHostsTargetedMetrics::appendSection(BSONObjBuilder* builder) {
+    BSONObjBuilder numHostsTargetedStatsBuilder(builder->subobjStart("numHostsTargeted"));
+    for (auto i = 0; i < kNumQueryType; i++) {
+        auto& targetStat = _numHostsTargeted[i];
+        auto queryType = static_cast<QueryType>(i);
+        BSONObjBuilder queryStatsBuilder(
+            numHostsTargetedStatsBuilder.subobjStart(queryTypeToString(queryType)));
+        queryStatsBuilder.appendNumber("allShards", targetStat->allShards.load());
+        queryStatsBuilder.appendNumber("manyShards", targetStat->manyShards.load());
+        queryStatsBuilder.appendNumber("oneShard", targetStat->oneShard.load());
+        queryStatsBuilder.appendNumber("unsharded", targetStat->unsharded.load());
+    }
+}
+
+NumHostsTargetedMetrics::TargetType NumHostsTargetedMetrics::parseTargetType(
+    OperationContext* opCtx, int nShardsTargeted, int nShardsOwningChunks) {
+    // If nShardsOwningChunks == 0, the routing info did not contain a chunk manager, so
+    // the collection is unsharded.
+    if (nShardsOwningChunks == 0)
+        return TargetType::kUnsharded;
+
+    if (nShardsTargeted == 1)
+        return TargetType::kOneShard;
+
+    // If an update or delete targets > 1 shard, it will be broadcast to all shards in the cluster
+    // *regardless* of whether each shard owns any data for that collection. We should count this
+    // as 'allShards'.
+    if (nShardsTargeted >= nShardsOwningChunks)
+        return TargetType::kAllShards;
+
+    return TargetType::kManyShards;
+}
+
+NumHostsTargetedMetrics& NumHostsTargetedMetrics::get(ServiceContext* serviceContext) {
+    return getNumHostsTargeted(serviceContext);
+}
+
+NumHostsTargetedMetrics& NumHostsTargetedMetrics::get(OperationContext* opCtx) {
+    return get(opCtx->getServiceContext());
+}
+
+void NumHostsTargetedMetrics::startup() {
+    _numHostsTargeted.reserve(kNumQueryType);
+    for (auto i = 0; i < kNumQueryType; i++) {
+        _numHostsTargeted.push_back(std::make_unique<TargetStats>());
+    }
+}
+
+}  // namespace mongo
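`parseTargetType()` above reduces the pair (number of shards targeted, number of shards owning chunks) to one of four buckets. The same decision procedure, restated as a standalone JavaScript sketch (the function name is ours):

```js
// JS restatement of NumHostsTargetedMetrics::parseTargetType().
function bucketFor(nShardsTargeted, nShardsOwningChunks) {
    // No chunk manager in the routing info means the collection is unsharded.
    if (nShardsOwningChunks === 0)
        return "unsharded";
    if (nShardsTargeted === 1)
        return "oneShard";
    // ">=" rather than "===": a broadcast write can target shards that own no
    // chunks for the collection, and it still counts as "allShards".
    if (nShardsTargeted >= nShardsOwningChunks)
        return "allShards";
    return "manyShards";
}

assert.eq("manyShards", bucketFor(2, 3));
assert.eq("allShards", bucketFor(3, 3));
```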
diff --git a/src/mongo/s/client/num_hosts_targeted_metrics.h b/src/mongo/s/client/num_hosts_targeted_metrics.h
new file mode 100644
index 00000000000..5f75cbbb069
--- /dev/null
+++ b/src/mongo/s/client/num_hosts_targeted_metrics.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "mongo/platform/atomic_word.h"
+
+
+namespace mongo {
+
+class BSONObjBuilder;
+class OperationContext;
+class ServiceContext;
+
+struct TargetStats {
+    // The op targeted all shards that own chunks for the collection, provided the cluster has
+    // more than one shard.
+    AtomicWord<int> allShards;
+
+    // The op targeted more than one shard that owns chunks for the collection, provided the
+    // cluster has more than one shard.
+    AtomicWord<int> manyShards;
+
+    // The op targeted one shard (if the cluster has only one shard, the op counts as 'oneShard').
+    AtomicWord<int> oneShard;
+
+    // The collection is unsharded.
+    AtomicWord<int> unsharded;
+};
+
+class NumHostsTargetedMetrics {
+public:
+    enum QueryType {
+        kFindCmd,
+        kInsertCmd,
+        kUpdateCmd,
+        kDeleteCmd,
+        kAggregateCmd,
+        kNumQueryType,
+    };
+
+    enum TargetType { kAllShards, kManyShards, kOneShard, kUnsharded };
+
+    void addNumHostsTargeted(QueryType queryType, TargetType targetType);
+
+    void appendSection(BSONObjBuilder* builder);
+
+    TargetType parseTargetType(OperationContext* opCtx,
+                               int nShardsTargeted,
+                               int nShardsOwningChunks);
+
+    static NumHostsTargetedMetrics& get(ServiceContext* serviceContext);
+    static NumHostsTargetedMetrics& get(OperationContext* opCtx);
+
+    void startup();
+
+private:
+    std::vector<std::unique_ptr<TargetStats>> _numHostsTargeted;
+};
+
+}  // namespace mongo
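Since the TargetStats counters are monotonically increasing AtomicWords with no reset hook, a monitoring script would typically sample and diff them. A sketch (the bucket and command names come from the header above; the one-minute window is arbitrary):

```js
// Sample the counters twice and print the per-bucket increase over the window.
function sampleNumHostsTargeted(db) {
    return db.serverStatus().shardingStatistics.numHostsTargeted;
}

const first = sampleNumHostsTargeted(db);
sleep(60 * 1000);  // arbitrary sampling interval
const second = sampleNumHostsTargeted(db);

["find", "insert", "update", "delete", "aggregate"].forEach(function(cmd) {
    ["allShards", "manyShards", "oneShard", "unsharded"].forEach(function(bucket) {
        print(cmd + "." + bucket + ": +" + (second[cmd][bucket] - first[cmd][bucket]));
    });
});
```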
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 7262819e5d1..4a48f8819dd 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -59,6 +59,7 @@
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/commands/cluster_commands_helpers.h"
 #include "mongo/s/commands/pipeline_s.h"
@@ -924,6 +925,54 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx,
     appendEmptyResultSet(opCtx, *result, status, nss.ns());
 }
 
+void updateHostsTargetedMetrics(OperationContext* opCtx,
+                                const NamespaceString& executionNss,
+                                boost::optional<CachedCollectionRoutingInfo> executionNsRoutingInfo,
+                                stdx::unordered_set<NamespaceString> involvedNamespaces) {
+    if (!executionNsRoutingInfo)
+        return;
+
+    // Create a set of ShardIds that own a chunk belonging to any of the collections involved in
+    // this pipeline. This will be used to determine whether the pipeline targeted all of the
+    // shards that own chunks for any involved collection.
+    std::set<ShardId> shardsOwningChunks = [&]() {
+        std::set<ShardId> shardsIds;
+
+        if (executionNsRoutingInfo->cm()) {
+            std::set<ShardId> shardIdsForNs;
+            executionNsRoutingInfo->cm()->getAllShardIds(&shardIdsForNs);
+            for (const auto& shardId : shardIdsForNs) {
+                shardsIds.insert(shardId);
+            }
+        }
+
+        for (const auto& nss : involvedNamespaces) {
+            if (nss == executionNss)
+                continue;
+
+            const auto resolvedNsRoutingInfo = uassertStatusOK(
+                Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+            if (resolvedNsRoutingInfo.cm()) {
+                std::set<ShardId> shardIdsForNs;
+                resolvedNsRoutingInfo.cm()->getAllShardIds(&shardIdsForNs);
+                for (const auto& shardId : shardIdsForNs) {
+                    shardsIds.insert(shardId);
+                }
+            }
+        }
+
+        return shardsIds;
+    }();
+
+    auto nShardsTargeted = CurOp::get(opCtx)->debug().nShards;
+    if (nShardsTargeted > 0) {
+        auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType(
+            opCtx, nShardsTargeted, shardsOwningChunks.size());
+        NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(
+            NumHostsTargetedMetrics::QueryType::kAggregateCmd, targetType);
+    }
+}
+
 }  // namespace
 
 Status ClusterAggregate::runAggregate(OperationContext* opCtx,
@@ -968,73 +1017,91 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
     // If we don't have a routing table, then this is a $changeStream which must run on all shards.
     invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
 
-    // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards, does
-    // not need to run on all shards, and doesn't need to go through DocumentSource::serialize(),
-    // then go ahead and pass it through to the owning shard unmodified. Note that we first call
-    // resolveInvolvedNamespaces to validate that none of the namespaces are sharded.
-    if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
-        litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
-        resolveInvolvedNamespaces(opCtx, litePipe);
-        const auto primaryShardId = routingInfo->db().primary()->getId();
-        return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result);
-    }
+    auto status = [&]() {
+        // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards,
+        // does not need to run on all shards, and doesn't need to go through
+        // DocumentSource::serialize(), then go ahead and pass it through to the owning shard
+        // unmodified. Note that we first call resolveInvolvedNamespaces to validate that none of
+        // the namespaces are sharded.
+        if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
+            litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
+            resolveInvolvedNamespaces(opCtx, litePipe);
+            const auto primaryShardId = routingInfo->db().primary()->getId();
+            return aggPassthrough(
+                opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result);
+        }
 
-    // Populate the collection UUID and the appropriate collation to use.
-    auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe);
-    BSONObj collationObj = collInfo.first;
-    boost::optional<UUID> uuid = collInfo.second;
+        // Populate the collection UUID and the appropriate collation to use.
+        auto collInfo =
+            getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe);
+        BSONObj collationObj = collInfo.first;
+        boost::optional<UUID> uuid = collInfo.second;
+
+        // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
+        // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by
+        // the pipeline's stages.
+        auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid);
+
+        // Parse and optimize the full pipeline.
+        auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
+        pipeline->optimizePipeline();
+
+        // Check whether the entire pipeline must be run on mongoS.
+        if (pipeline->requiredToRunOnMongos()) {
+            return runPipelineOnMongoS(
+                expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), {}, result);
+        }
 
-    // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
-    // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the
-    // pipeline's stages.
-    auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid);
+        // If not, split the pipeline as necessary and dispatch to the relevant shards.
+        auto shardDispatchResults = dispatchShardPipeline(expCtx,
+                                                          namespaces.executionNss,
+                                                          cmdObj,
+                                                          request,
+                                                          litePipe,
+                                                          std::move(pipeline),
+                                                          collationObj);
+
+        // If the operation is an explain, then we verify that it succeeded on all targeted shards,
+        // write the results to the output builder, and return immediately.
+        if (expCtx->explain) {
+            uassertAllShardsSupportExplain(shardDispatchResults.remoteExplainOutput);
+            return appendExplainResults(std::move(shardDispatchResults.remoteExplainOutput),
+                                        expCtx,
+                                        shardDispatchResults.pipelineForTargetedShards,
+                                        shardDispatchResults.pipelineForMerging,
+                                        result);
+        }
 
-    // Parse and optimize the full pipeline.
-    auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
-    pipeline->optimizePipeline();
+        // If this isn't an explain, then we must have established cursors on at least one shard.
+        invariant(shardDispatchResults.remoteCursors.size() > 0);
+
+        // If we sent the entire pipeline to a single shard, store the remote cursor and return.
+        if (!shardDispatchResults.pipelineForTargetedShards->isSplitForShards()) {
+            invariant(shardDispatchResults.remoteCursors.size() == 1);
+            auto& remoteCursor = shardDispatchResults.remoteCursors.front();
+            const auto reply = uassertStatusOK(storePossibleCursor(
+                opCtx, namespaces.requestedNss, remoteCursor, expCtx->tailableMode));
+            return appendCursorResponseToCommandResult(
+                remoteCursor.getShardId().toString(), reply, result);
+        }
 
-    // Check whether the entire pipeline must be run on mongoS.
-    if (pipeline->requiredToRunOnMongos()) {
-        return runPipelineOnMongoS(
-            expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), {}, result);
-    }
+        // If we reach here, we have a merge pipeline to dispatch.
+        return dispatchMergingPipeline(expCtx,
+                                       namespaces,
+                                       request,
+                                       cmdObj,
+                                       litePipe,
+                                       routingInfo,
+                                       shardDispatchResults,
+                                       result);
 
-    // If not, split the pipeline as necessary and dispatch to the relevant shards.
-    auto shardDispatchResults = dispatchShardPipeline(expCtx,
-                                                      namespaces.executionNss,
-                                                      cmdObj,
-                                                      request,
-                                                      litePipe,
-                                                      std::move(pipeline),
-                                                      collationObj);
-
-    // If the operation is an explain, then we verify that it succeeded on all targeted shards,
-    // write the results to the output builder, and return immediately.
-    if (expCtx->explain) {
-        uassertAllShardsSupportExplain(shardDispatchResults.remoteExplainOutput);
-        return appendExplainResults(std::move(shardDispatchResults.remoteExplainOutput),
-                                    expCtx,
-                                    shardDispatchResults.pipelineForTargetedShards,
-                                    shardDispatchResults.pipelineForMerging,
-                                    result);
-    }
+    }();
 
-    // If this isn't an explain, then we must have established cursors on at least one shard.
-    invariant(shardDispatchResults.remoteCursors.size() > 0);
-
-    // If we sent the entire pipeline to a single shard, store the remote cursor and return.
-    if (!shardDispatchResults.pipelineForTargetedShards->isSplitForShards()) {
-        invariant(shardDispatchResults.remoteCursors.size() == 1);
-        auto& remoteCursor = shardDispatchResults.remoteCursors.front();
-        const auto reply = uassertStatusOK(storePossibleCursor(
-            opCtx, namespaces.requestedNss, remoteCursor, expCtx->tailableMode));
-        return appendCursorResponseToCommandResult(
-            remoteCursor.getShardId().toString(), reply, result);
-    }
+    if (status.isOK())
+        updateHostsTargetedMetrics(
+            opCtx, namespaces.executionNss, routingInfo, litePipe.getInvolvedNamespaces());
 
-    // If we reach here, we have a merge pipeline to dispatch.
-    return dispatchMergingPipeline(
-        expCtx, namespaces, request, cmdObj, litePipe, routingInfo, shardDispatchResults, result);
+    return status;
 }
 
 void ClusterAggregate::uassertAllShardsSupportExplain(
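For aggregation, the denominator is not just the execution namespace: updateHostsTargetedMetrics() unions the chunk-owning shards of every namespace the pipeline touches (e.g. $lookup targets). A JavaScript restatement of that union (routingInfos and chunkOwningShards are hypothetical stand-ins for the resolved routing table entries; unsharded namespaces contribute nothing, mirroring the cm() null check in the C++):

```js
// JS restatement of the shard-union lambda in updateHostsTargetedMetrics().
function nShardsOwningChunksForPipeline(routingInfos) {
    const shards = new Set();
    routingInfos.forEach(function(info) {
        (info.chunkOwningShards || []).forEach(function(shardId) {
            shards.add(shardId);
        });
    });
    return shards.size;
}

// coll1 owns chunks on two shards; the unsharded $lookup target adds none,
// so a pipeline targeting both shards counts as "allShards" (2 >= 2).
assert.eq(2, nShardsOwningChunksForPipeline([
    {ns: "test.coll1", chunkOwningShards: ["shard0", "shard1"]},
    {ns: "test.unshardedCollection", chunkOwningShards: []}
]));
```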
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index d13f9a53cfc..75787a9e4b8 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -43,6 +43,7 @@
 #include "mongo/db/stats/counters.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/s/async_requests_sender.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/cluster_last_error_info.h"
 #include "mongo/s/commands/cluster_explain.h"
@@ -121,6 +122,30 @@ void batchErrorToLastError(const BatchedCommandRequest& request,
     }
 }
 
+void updateHostsTargetedMetrics(OperationContext* opCtx,
+                                BatchedCommandRequest::BatchType batchType,
+                                int nShardsOwningChunks,
+                                int nShardsTargeted) {
+    NumHostsTargetedMetrics::QueryType writeType;
+    switch (batchType) {
+        case BatchedCommandRequest::BatchType_Insert:
+            writeType = NumHostsTargetedMetrics::QueryType::kInsertCmd;
+            break;
+        case BatchedCommandRequest::BatchType_Update:
+            writeType = NumHostsTargetedMetrics::QueryType::kUpdateCmd;
+            break;
+        case BatchedCommandRequest::BatchType_Delete:
+            writeType = NumHostsTargetedMetrics::QueryType::kDeleteCmd;
+            break;
+
+            MONGO_UNREACHABLE;
+    }
+
+    auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType(
+        opCtx, nShardsTargeted, nShardsOwningChunks);
+    NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(writeType, targetType);
+}
+
 /**
  * Base class for mongos write commands.
  */
@@ -300,6 +325,12 @@ private:
         // Record the number of shards targeted by this write.
        CurOp::get(opCtx)->debug().nShards = stats.getTargetedShards().size();
 
+        if (stats.getNumShardsOwningChunks().is_initialized())
+            updateHostsTargetedMetrics(opCtx,
+                                       _batchedRequest.getBatchType(),
+                                       stats.getNumShardsOwningChunks().get(),
+                                       stats.getTargetedShards().size());
+
         result.appendElements(response.toBSON());
         return response.getOk();
     }
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index ddd2237de40..0e4bd5845dd 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -160,6 +160,11 @@ public:
      * Returns !OK with message if could not refresh
      */
     virtual Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) = 0;
+
+    /**
+     * Returns the number of shards that own one or more chunks for the targeted collection.
+     */
+    virtual int getNShardsOwningChunks() const = 0;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index aef247fcc48..5e8f1ecec9a 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -53,6 +53,7 @@
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/platform/overflow_arithmetic.h"
 #include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/commands/cluster_commands_helpers.h"
 #include "mongo/s/grid.h"
@@ -216,6 +217,20 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards(
     return requests;
 }
 
+void updateNumHostsTargetedMetrics(OperationContext* opCtx,
+                                   const CachedCollectionRoutingInfo& routingInfo,
+                                   int nTargetedShards) {
+    int nShardsOwningChunks = 0;
+    if (routingInfo.cm()) {
+        nShardsOwningChunks = routingInfo.cm()->getNShardsOwningChunks();
+    }
+
+    auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType(
+        opCtx, nTargetedShards, nShardsOwningChunks);
+    NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(
+        NumHostsTargetedMetrics::QueryType::kFindCmd, targetType);
+}
+
 CursorId runQueryWithoutRetrying(OperationContext* opCtx,
                                  const CanonicalQuery& query,
                                  const ReadPreferenceSetting& readPref,
@@ -355,6 +370,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
     // allocate a cursor id.
     if (cursorState == ClusterCursorManager::CursorState::Exhausted) {
         CurOp::get(opCtx)->debug().cursorExhausted = true;
+
+        if (shardIds.size() > 0) {
+            updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size());
+        }
         return CursorId(0);
     }
 
@@ -371,6 +390,11 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
     // Record the cursorID in CurOp.
     CurOp::get(opCtx)->debug().cursorid = cursorId;
 
+    if (shardIds.size() > 0) {
+        updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size());
+    }
+
     return cursorId;
 }
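On the find path the counters are updated at both exits of runQueryWithoutRetrying (cursor exhausted and cursor registered), and only when at least one shard was actually targeted. With the chunk layout the test sets up for test.coll0 (x < 5 on shard0, 5 <= x < 200 on shard1, x >= 200 on shard2), the buckets fall out as in this shell sketch:

```js
// Sketch against the test's chunk layout for test.coll0.
let before = db.serverStatus().shardingStatistics.numHostsTargeted.find;
db.coll0.find({x: {$gt: 8}}).itcount();    // shards 1 and 2   -> manyShards
db.coll0.find({x: {$lt: 300}}).itcount();  // all three shards -> allShards
db.coll0.find({x: 40}).itcount();          // single chunk     -> oneShard
let after = db.serverStatus().shardingStatistics.numHostsTargeted.find;
assert.eq(1, after.manyShards - before.manyShards);
assert.eq(1, after.allShards - before.allShards);
assert.eq(1, after.oneShard - before.oneShard);
```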
diff --git a/src/mongo/s/s_sharding_server_status.cpp b/src/mongo/s/s_sharding_server_status.cpp
index 01c14f9ca22..dadaae1aa8f 100644
--- a/src/mongo/s/s_sharding_server_status.cpp
+++ b/src/mongo/s/s_sharding_server_status.cpp
@@ -34,6 +34,7 @@
 #include "mongo/db/commands/server_status.h"
 #include "mongo/s/balancer_configuration.h"
 #include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 
@@ -81,8 +82,11 @@ public:
                             const BSONElement& configElement) const override {
         auto const grid = Grid::get(opCtx);
         auto const catalogCache = grid->catalogCache();
+        auto& numHostsTargetedMetrics = NumHostsTargetedMetrics::get(opCtx);
 
         BSONObjBuilder result;
+
+        numHostsTargetedMetrics.appendSection(&result);
         catalogCache->report(&result);
 
         return result.obj();
     }
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 493d160ba45..addb251dd4a 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -59,6 +59,7 @@
 #include "mongo/s/catalog/replset_dist_lock_manager.h"
 #include "mongo/s/catalog/sharding_catalog_client_impl.h"
 #include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/s/client/shard_factory.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/client/sharding_network_connection_hook.h"
@@ -228,6 +229,9 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
         std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize);
     executorPool->startup();
 
+    auto& numHostsTargetedMetrics = NumHostsTargetedMetrics::get(opCtx);
+    numHostsTargetedMetrics.startup();
+
     auto const grid = Grid::get(opCtx);
     grid->init(
         makeCatalogClient(opCtx->getServiceContext(), distLockProcessId),
diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp
index 38bf1b708ef..d74c03bed43 100644
--- a/src/mongo/s/sharding_router_test_fixture.cpp
+++ b/src/mongo/s/sharding_router_test_fixture.cpp
@@ -59,6 +59,7 @@
 #include "mongo/s/catalog/type_collection.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/s/client/shard_factory.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/client/shard_remote.h"
@@ -142,6 +143,8 @@ ShardingTestFixture::ShardingTestFixture() {
         stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager)));
     catalogClient->startup();
 
+    NumHostsTargetedMetrics::get(service).startup();
+
     ConnectionString configCS = ConnectionString::forReplicaSet(
         "configRS", {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}});
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index f30a063d671..54bfdf6572f 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -364,6 +364,10 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
         }
     }
 
+    auto nShardsOwningChunks = batchOp.getNShardsOwningChunks();
+    if (nShardsOwningChunks.is_initialized())
+        stats->noteNumShardsOwningChunks(nShardsOwningChunks.get());
+
     batchOp.buildClientResponse(clientResponse);
 
     LOG(4) << "Finished execution of write batch"
@@ -385,6 +389,10 @@ void BatchWriteExecStats::noteWriteAt(const HostAndPort& host,
     _writeOpTimes[ConnectionString(host)] = HostOpTime(opTime, electionId);
 }
 
+void BatchWriteExecStats::noteNumShardsOwningChunks(const int nShardsOwningChunks) {
+    _numShardsOwningChunks.emplace(nShardsOwningChunks);
+}
+
 const std::set<ShardId>& BatchWriteExecStats::getTargetedShards() const {
     return _targetedShards;
 }
@@ -393,4 +401,8 @@ const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const {
     return _writeOpTimes;
 }
 
-}  // namespace
+const boost::optional<int> BatchWriteExecStats::getNumShardsOwningChunks() const {
+    return _numShardsOwningChunks;
+}
+
+}  // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index d53a29c0d64..04519bad696 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -91,9 +91,11 @@ public:
     void noteWriteAt(const HostAndPort& host, repl::OpTime opTime, const OID& electionId);
     void noteTargetedShard(const ShardId& shardId);
+    void noteNumShardsOwningChunks(const int nShardsOwningChunks);
 
     const std::set<ShardId>& getTargetedShards() const;
     const HostOpTimeMap& getWriteOpTimes() const;
+    const boost::optional<int> getNumShardsOwningChunks() const;
 
     // Expose via helpers if this gets more complex
 
@@ -109,6 +111,7 @@ public:
 private:
     std::set<ShardId> _targetedShards;
     HostOpTimeMap _writeOpTimes;
+    boost::optional<int> _numShardsOwningChunks;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 14e3fce88c1..7a4b0f88efe 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -36,6 +36,7 @@
 
 #include "mongo/base/error_codes.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
 
@@ -378,6 +379,8 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
         targetedBatches->emplace(batch->getEndpoint().shardName, batch);
     }
 
+    _nShardsOwningChunks = targeter.getNShardsOwningChunks();
+
     return Status::OK();
 }
 
@@ -750,6 +753,10 @@ int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const {
     });
 }
 
+boost::optional<int> BatchWriteOp::getNShardsOwningChunks() {
+    return _nShardsOwningChunks;
+}
+
 void BatchWriteOp::_incBatchStats(const BatchedCommandResponse& response) {
     const auto batchType = _clientRequest.getBatchType();
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 5ea5f1261c5..8e36927ffa3 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -190,6 +190,8 @@ public:
      */
    int numWriteOpsIn(WriteOpState state) const;
 
+    boost::optional<int> getNShardsOwningChunks();
+
 private:
    /**
     * Maintains the batch execution statistics when a response is received.
@@ -228,6 +230,8 @@ private:
     int _numMatched{0};
     int _numModified{0};
     int _numDeleted{0};
+
+    boost::optional<int> _nShardsOwningChunks;
 };
 
 /**
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index 6cd063bf1f7..0bc433155bf 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -761,6 +761,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
     MONGO_UNREACHABLE;
 }
 
+int ChunkManagerTargeter::getNShardsOwningChunks() const {
+    if (_routingInfo->cm()) {
+        return _routingInfo->cm()->getNShardsOwningChunks();
+    }
+
+    return 0;
+}
+
 Status ChunkManagerTargeter::_refreshNow(OperationContext* opCtx) {
     Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*_routingInfo));
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index 28fb1300d62..151902b8087 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -100,6 +100,8 @@ public:
      */
     Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override;
 
+    int getNShardsOwningChunks() const override;
+
 private:
     using ShardVersionMap = std::map<ShardId, ChunkVersion>;
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index 1eee9dfc0ea..e456e67f24e 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -130,6 +130,11 @@ public:
         return Status::OK();
     }
 
+    int getNShardsOwningChunks() const override {
+        // No-op
+        return 0;
+    }
+
 private:
     static ChunkRange _parseRange(const BSONObj& query) {
         const StringData fieldName(query.firstElement().fieldName());
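To tie the write-side plumbing together: ChunkManagerTargeter::getNShardsOwningChunks() is recorded in BatchWriteOp::targetBatch(), copied by BatchWriteExec into BatchWriteExecStats, and finally fed to updateHostsTargetedMetrics() in cluster_write_cmd.cpp alongside the targeted-shard count. Observed end to end from the shell, under the chunk layout the test sets up:

```js
// A bulk insert spanning two of the three chunk-owning shards should land
// in insert.manyShards (2 targeted < 3 owning).
const pre = db.serverStatus().shardingStatistics.numHostsTargeted;
const bulk = db.coll0.initializeUnorderedBulkOp();
for (let x = 0; x < 10; x++) {
    bulk.insert({x: x});  // spans chunks on shard0 (x < 5) and shard1 (5 <= x < 200)
}
assert.commandWorked(bulk.execute());
const post = db.serverStatus().shardingStatistics.numHostsTargeted;
assert.eq(1, post.insert.manyShards - pre.insert.manyShards);
```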