diff options
author | Janna Golden <janna.golden@mongodb.com> | 2019-12-17 16:41:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-11 19:44:06 +0000 |
commit | c6533b269898f6cd077ffca7b2cc9545863313e9 (patch) | |
tree | 045e5caa5e58cd6916e8204d35af5a5a67f93b38 | |
parent | 93c36da06fa36454a7f8ff77ce2c86a07ba97e4f (diff) | |
download | mongo-c6533b269898f6cd077ffca7b2cc9545863313e9.tar.gz |
SERVER-43617 Add metrics on the mongos to indicate the number of shards targeted for CRUD and agg commands
(cherry picked from commit 1fe9dfb1ade548488831bf29cdcc57636e5e3b8a)
20 files changed, 710 insertions, 80 deletions
diff --git a/jstests/sharding/num_hosts_targeted_metrics.js b/jstests/sharding/num_hosts_targeted_metrics.js new file mode 100644 index 00000000000..be5cc88e2e2 --- /dev/null +++ b/jstests/sharding/num_hosts_targeted_metrics.js @@ -0,0 +1,220 @@ +/* + * This test checks that mongos reports the number of hosts targeted for a given command in + * serverStatus. + * @tags: [uses_transactions, uses_multi_shard_transaction] + */ + +(function() { +'use strict'; + +const st = new ShardingTest({shards: 3}); +const mongos = st.s; +const testDb = mongos.getDB("test"); + +assert.commandWorked(mongos.adminCommand({enablesharding: "test"})); +st.ensurePrimaryShard("test", st.shard1.shardName); + +// Set up "test.coll0" +assert.commandWorked(mongos.adminCommand({shardcollection: "test.coll0", key: {x: 1}})); +assert.commandWorked(testDb.adminCommand({split: "test.coll0", middle: {x: 5}})); +assert.commandWorked(testDb.adminCommand({split: "test.coll0", middle: {x: 200}})); + +// // Move chunks so each shard has one chunk. +assert.commandWorked(mongos.adminCommand( + {moveChunk: "test.coll0", find: {x: 2}, to: st.shard0.shardName, _waitForDelete: true})); +assert.commandWorked(mongos.adminCommand( + {moveChunk: "test.coll0", find: {x: 200}, to: st.shard2.shardName, _waitForDelete: true})); + +// Set up "test.coll1" +assert.commandWorked(mongos.adminCommand({shardcollection: "test.coll1", key: {x: 1}})); +assert.commandWorked(testDb.adminCommand({split: "test.coll1", middle: {x: 5}})); + +// // Move chunk so only shards 0 and 1 have chunks. +assert.commandWorked(mongos.adminCommand( + {moveChunk: "test.coll1", find: {x: 2}, to: st.shard0.shardName, _waitForDelete: true})); + +function assertShardingStats(initialStats, updatedStats, expectedChanges) { + for (let [cmd, targetedHostsVals] of Object.entries(expectedChanges)) { + for (let [numTargetedHosts, count] of Object.entries(targetedHostsVals)) { + if (!initialStats) { + assert.eq(NumberInt(updatedStats[cmd][numTargetedHosts]), count); + } else { + assert.eq(NumberInt(initialStats[cmd][numTargetedHosts]) + count, + NumberInt(updatedStats[cmd][numTargetedHosts])); + } + } + } +} + +// ----- Check write targeting stats ----- + +let serverStatusInitial = testDb.serverStatus(); +assert.commandWorked(testDb.coll0.insert({x: 9})); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"insert": {"oneShard": 1}}); + +serverStatusInitial = testDb.serverStatus(); +let bulk = testDb.coll0.initializeUnorderedBulkOp(); +for (var x = 0; x < 10; x++) { + bulk.insert({x: x}); +} +assert.commandWorked(bulk.execute()); +assertShardingStats({"insert": {"manyShards": 0}}, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"insert": {"manyShards": 1}}); + +serverStatusInitial = testDb.serverStatus(); +bulk = testDb.unshardedCollection.initializeOrderedBulkOp(); +bulk.insert({j: -25}); +bulk.insert({j: 8}); +assert.commandWorked(bulk.execute()); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"insert": {"unsharded": 1}}); + +bulk = testDb.coll0.initializeUnorderedBulkOp(); +bulk.find({x: 200}).update({$set: {a: -21}}); +bulk.find({x: -100}).update({$set: {a: -21}}); +bulk.find({x: 45}).update({$set: {a: -21}}); +assert.commandWorked(bulk.execute()); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"update": {"allShards": 1}}); + +serverStatusInitial = testDb.serverStatus(); +bulk = testDb.coll0.initializeUnorderedBulkOp(); +bulk.insert({x: -20}); +bulk.find({x: -20}).update({$set: {a: -21}}); +bulk.insert({x: 40}); +bulk.insert({x: 90}); +assert.commandWorked(bulk.execute()); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"insert": {"oneShard": 2}, "update": {"oneShard": 1}}); + +// If delete targets more than one shard, we broadcast to all shards if the write is not in a +// transaction +serverStatusInitial = testDb.serverStatus(); +assert.commandWorked(testDb.coll0.remove({x: {$lt: 9}})); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"delete": {"allShards": 1}}); + +// If delete targets more than one shard, we *do not* broadcast to all shards when the write is in a +// transaction +if (jsTestOptions().mongosBinVersion !== 'last-lts' && + jsTestOptions().mongosBinVersion !== 'last-stable') { + let session = mongos.startSession(); + let sessionDB = session.getDatabase("test"); + session.startTransaction(); + serverStatusInitial = testDb.serverStatus(); + assert.commandWorked(sessionDB.coll0.remove({x: {$lt: 9}})); + session.commitTransaction(); + assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"delete": {"manyShards": 1}}); +} + +serverStatusInitial = testDb.serverStatus(); +bulk = testDb.coll1.initializeUnorderedBulkOp(); +bulk.insert({x: -100, a: 100}); +bulk.insert({x: 100, a: 100}); +assert.commandWorked(bulk.execute()); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"insert": {"allShards": 1}}); + +serverStatusInitial = testDb.serverStatus(); +assert.commandWorked(testDb.coll1.update({a: 100}, {$set: {a: -21}}, {multi: true})); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"update": {"allShards": 1}}); + +// ----- Check find targeting stats ----- + +serverStatusInitial = testDb.serverStatus(); +let findRes = testDb.coll0.find({"x": {$gt: 8}}).itcount(); +assert.gte(findRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"find": {"manyShards": 1}}); + +serverStatusInitial = testDb.serverStatus(); +findRes = testDb.coll0.find({"x": {$lt: 300}}).itcount(); +assert.gte(findRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"find": {"allShards": 1}}); + +serverStatusInitial = testDb.serverStatus(); +findRes = testDb.coll0.find({"x": 40}).itcount(); +assert.gte(findRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"find": {"oneShard": 1}}); + +serverStatusInitial = testDb.serverStatus(); +findRes = testDb.unshardedCollection.find({"j": 8}).itcount(); +assert.gte(findRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"find": {"unsharded": 1}}); + +serverStatusInitial = testDb.serverStatus(); +findRes = testDb.coll1.find({a: -21}).itcount(); +assert.gte(findRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"find": {"allShards": 1}}); + +// ----- Check aggregate targeting stats ----- + +serverStatusInitial = testDb.serverStatus(); +let aggRes = + testDb.coll0.aggregate([{$match: {x: {$gte: 15, $lte: 100}}}].concat([{$sort: {x: 1}}])) + .itcount(); +assert.gte(aggRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"aggregate": {"oneShard": 1}}); + +serverStatusInitial = testDb.serverStatus(); +aggRes = testDb.coll0.aggregate([{$match: {x: {$gte: -150, $lte: 50}}}].concat([{$sort: {x: 1}}])) + .itcount(); +assert.gte(aggRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"aggregate": {"manyShards": 1}}); + +serverStatusInitial = testDb.serverStatus(); +aggRes = testDb.coll0.aggregate([{$match: {x: {$gte: -150, $lte: 500}}}].concat([{$sort: {x: 1}}])) + .itcount(); +assert.gte(aggRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"aggregate": {"allShards": 1}}); + +serverStatusInitial = testDb.serverStatus(); +aggRes = testDb.unshardedCollection + .aggregate([{$match: {j: {$gte: -150, $lte: 50}}}].concat([{$sort: {j: 1}}])) + .itcount(); +assert.gte(aggRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"aggregate": {"unsharded": 1}}); + +serverStatusInitial = testDb.serverStatus(); +aggRes = + testDb.coll1 + .aggregate([{ + $lookup: {from: "unshardedCollection", localField: "x", foreignField: "j", as: "lookup"} + }]) + .itcount(); +assert.gte(aggRes, 1); +assertShardingStats(serverStatusInitial.shardingStatistics.numHostsTargeted, + testDb.serverStatus().shardingStatistics.numHostsTargeted, + {"aggregate": {"allShards": 1}}); + +st.stop(); +})(); diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index bd8a4cf2e72..2c79b51be75 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -258,6 +258,10 @@ void RoutingTableHistory::getAllShardIds(std::set<ShardId>* all) const { [](const ShardVersionMap::value_type& pair) { return pair.first; }); } +int RoutingTableHistory::getNShardsOwningChunks() const { + return _shardVersions.size(); +} + std::pair<ChunkInfoMap::const_iterator, ChunkInfoMap::const_iterator> RoutingTableHistory::overlappingRanges(const BSONObj& min, const BSONObj& max, diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index 8544eccf878..c6ece9146e8 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -132,6 +132,11 @@ public: void getAllShardIds(std::set<ShardId>* all) const; /** + * Returns the number of shards on which the collection has any chunks + */ + int getNShardsOwningChunks() const; + + /** * Returns true if, for this shard, the chunks are identical in both chunk managers */ bool compatibleWith(const RoutingTableHistory& other, const ShardId& shard) const; @@ -361,6 +366,13 @@ public: _rt->getAllShardIds(all); } + /** + * Returns the number of shards on which the collection has any chunks + */ + int getNShardsOwningChunks() { + return _rt->getNShardsOwningChunks(); + } + // Transforms query into bounds for each field in the shard key // for example : // Key { a: 1, b: 1 }, diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index 6f978b359b7..e099979a909 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -8,6 +8,7 @@ env.Library( target='sharding_client', source=[ 'shard_remote.cpp', + 'num_hosts_targeted_metrics.cpp', env.Idlc('shard_remote.idl')[0], ], LIBDEPS=[ diff --git a/src/mongo/s/client/num_hosts_targeted_metrics.cpp b/src/mongo/s/client/num_hosts_targeted_metrics.cpp new file mode 100644 index 00000000000..82188f4f300 --- /dev/null +++ b/src/mongo/s/client/num_hosts_targeted_metrics.cpp @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/client/num_hosts_targeted_metrics.h" + +#include "mongo/base/init.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/curop.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/s/grid.h" + +namespace mongo { + +const auto getNumHostsTargeted = ServiceContext::declareDecoration<NumHostsTargetedMetrics>(); + +namespace { +std::string queryTypeToString(NumHostsTargetedMetrics::QueryType queryType) { + switch (queryType) { + case NumHostsTargetedMetrics::QueryType::kFindCmd: + return "find"; + case NumHostsTargetedMetrics::QueryType::kInsertCmd: + return "insert"; + case NumHostsTargetedMetrics::QueryType::kUpdateCmd: + return "update"; + case NumHostsTargetedMetrics::QueryType::kDeleteCmd: + return "delete"; + case NumHostsTargetedMetrics::QueryType::kAggregateCmd: + return "aggregate"; + default: + return ""; + } +} +} // namespace + +void NumHostsTargetedMetrics::addNumHostsTargeted(NumHostsTargetedMetrics::QueryType queryType, + NumHostsTargetedMetrics::TargetType targetType) { + switch (targetType) { + case TargetType::kAllShards: + _numHostsTargeted[queryType]->allShards.addAndFetch(1); + return; + case TargetType::kManyShards: + _numHostsTargeted[queryType]->manyShards.addAndFetch(1); + return; + case TargetType::kOneShard: + _numHostsTargeted[queryType]->oneShard.addAndFetch(1); + return; + case TargetType::kUnsharded: + _numHostsTargeted[queryType]->unsharded.addAndFetch(1); + return; + } +} + +void NumHostsTargetedMetrics::appendSection(BSONObjBuilder* builder) { + BSONObjBuilder numHostsTargetedStatsBuilder(builder->subobjStart("numHostsTargeted")); + for (auto i = 0; i < kNumQueryType; i++) { + auto& targetStat = _numHostsTargeted[i]; + auto queryType = static_cast<QueryType>(i); + BSONObjBuilder queryStatsBuilder( + numHostsTargetedStatsBuilder.subobjStart(queryTypeToString(queryType))); + queryStatsBuilder.appendNumber("allShards", targetStat->allShards.load()); + queryStatsBuilder.appendNumber("manyShards", targetStat->manyShards.load()); + queryStatsBuilder.appendNumber("oneShard", targetStat->oneShard.load()); + queryStatsBuilder.appendNumber("unsharded", targetStat->unsharded.load()); + } +} + +NumHostsTargetedMetrics::TargetType NumHostsTargetedMetrics::parseTargetType( + OperationContext* opCtx, int nShardsTargeted, int nShardsOwningChunks) { + // If nShardsOwningChunks == 0, this means the routing info did not contain a chunk manager so + // the collection is unsharded + if (nShardsOwningChunks == 0) + return TargetType::kUnsharded; + + if (nShardsTargeted == 1) + return TargetType::kOneShard; + + // If an update or delete targets > 1 shard, it will be broadcast to all shards in the cluster + // *regardless* of whether the shard owns any data for that collection. We should count this as + // 'allShards'. + if (nShardsTargeted >= nShardsOwningChunks) + return TargetType::kAllShards; + + return TargetType::kManyShards; +} + +NumHostsTargetedMetrics& NumHostsTargetedMetrics::get(ServiceContext* serviceContext) { + return getNumHostsTargeted(serviceContext); +} + +NumHostsTargetedMetrics& NumHostsTargetedMetrics::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +void NumHostsTargetedMetrics::startup() { + _numHostsTargeted.reserve(kNumQueryType); + for (auto i = 0; i < kNumQueryType; i++) { + _numHostsTargeted.push_back(std::make_unique<TargetStats>()); + } +} + +} // namespace mongo diff --git a/src/mongo/s/client/num_hosts_targeted_metrics.h b/src/mongo/s/client/num_hosts_targeted_metrics.h new file mode 100644 index 00000000000..5f75cbbb069 --- /dev/null +++ b/src/mongo/s/client/num_hosts_targeted_metrics.h @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <vector> + +#include "mongo/platform/atomic_word.h" + + +namespace mongo { + +class BSONObjBuilder; +class OperationContext; +class ServiceContext; + +struct TargetStats { + // the op targeted all shards that own chunks for the collection as long as the total number of + // shards in the cluster is > 1 + AtomicWord<int> allShards; + + // the op targeted > 1 shard that own chunks for the collection as long as the total number of + // shards in the cluster is > 1 + AtomicWord<int> manyShards; + + // the op targeted 1 shard (if there exists only one shard, the metric will count as 'oneShard') + AtomicWord<int> oneShard; + + // the collection is unsharded + AtomicWord<int> unsharded; +}; + +class NumHostsTargetedMetrics { + +public: + enum QueryType { + kFindCmd, + kInsertCmd, + kUpdateCmd, + kDeleteCmd, + kAggregateCmd, + kNumQueryType, + }; + + enum TargetType { kAllShards, kManyShards, kOneShard, kUnsharded }; + + void addNumHostsTargeted(QueryType queryType, TargetType targetType); + + void appendSection(BSONObjBuilder* builder); + + TargetType parseTargetType(OperationContext* opCtx, + int nShardsTargeted, + int nShardsOwningChunks); + + static NumHostsTargetedMetrics& get(ServiceContext* serviceContext); + static NumHostsTargetedMetrics& get(OperationContext* opCtx); + + void startup(); + +private: + std::vector<std::unique_ptr<TargetStats>> _numHostsTargeted; +}; + +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 0be5d7c0af4..fce0171ed69 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -46,6 +46,7 @@ #include "mongo/db/storage/duplicate_key_error_info.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/cluster_last_error_info.h" @@ -320,6 +321,30 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, return updatedShardKey; } +void updateHostsTargetedMetrics(OperationContext* opCtx, + BatchedCommandRequest::BatchType batchType, + int nShardsOwningChunks, + int nShardsTargeted) { + NumHostsTargetedMetrics::QueryType writeType; + switch (batchType) { + case BatchedCommandRequest::BatchType_Insert: + writeType = NumHostsTargetedMetrics::QueryType::kInsertCmd; + break; + case BatchedCommandRequest::BatchType_Update: + writeType = NumHostsTargetedMetrics::QueryType::kUpdateCmd; + break; + case BatchedCommandRequest::BatchType_Delete: + writeType = NumHostsTargetedMetrics::QueryType::kDeleteCmd; + break; + + MONGO_UNREACHABLE; + } + + auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType( + opCtx, nShardsTargeted, nShardsOwningChunks); + NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted(writeType, targetType); +} + /** * Base class for mongos write commands. */ @@ -534,6 +559,13 @@ private: CurOp::get(opCtx)->debug().nShards = stats.getTargetedShards().size() + (updatedShardKey ? 1 : 0); + if (stats.getNumShardsOwningChunks().is_initialized()) + updateHostsTargetedMetrics(opCtx, + _batchedRequest.getBatchType(), + stats.getNumShardsOwningChunks().get(), + stats.getTargetedShards().size() + + (updatedShardKey ? 1 : 0)); + if (auto txnRouter = TransactionRouter::get(opCtx)) { auto writeCmdStatus = response.toStatus(); if (!writeCmdStatus.isOK()) { diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 92e29547705..e7f01f3139a 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -159,6 +159,11 @@ public: * Returns !OK with message if could not refresh */ virtual Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) = 0; + + /** + * Returns the number of shards that own one or more chunks for the targeted collection. + */ + virtual int getNShardsOwningChunks() const = 0; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index bc4b9191c90..74f2132b4b0 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -60,6 +60,7 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" @@ -742,6 +743,54 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx, appendEmptyResultSet(opCtx, *result, status, nss.ns()); } +void updateHostsTargetedMetrics(OperationContext* opCtx, + const NamespaceString& executionNss, + boost::optional<CachedCollectionRoutingInfo> executionNsRoutingInfo, + stdx::unordered_set<NamespaceString> involvedNamespaces) { + if (!executionNsRoutingInfo) + return; + + // Create a set of ShardIds that own a chunk belonging to any of the collections involved in + // this pipeline. This will be used to determine whether the pipeline targeted all of the shards + // that own chunks for any collection involved or not. + std::set<ShardId> shardsOwningChunks = [&]() { + std::set<ShardId> shardsIds; + + if (executionNsRoutingInfo->cm()) { + std::set<ShardId> shardIdsForNs; + executionNsRoutingInfo->cm()->getAllShardIds(&shardIdsForNs); + for (const auto& shardId : shardIdsForNs) { + shardsIds.insert(shardId); + } + } + + for (const auto& nss : involvedNamespaces) { + if (nss == executionNss) + continue; + + const auto resolvedNsRoutingInfo = + uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + if (resolvedNsRoutingInfo.cm()) { + std::set<ShardId> shardIdsForNs; + resolvedNsRoutingInfo.cm()->getAllShardIds(&shardIdsForNs); + for (const auto& shardId : shardIdsForNs) { + shardsIds.insert(shardId); + } + } + } + + return shardsIds; + }(); + + auto nShardsTargeted = CurOp::get(opCtx)->debug().nShards; + if (nShardsTargeted > 0) { + auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType( + opCtx, nShardsTargeted, shardsOwningChunks.size()); + NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted( + NumHostsTargetedMetrics::QueryType::kAggregateCmd, targetType); + } +} + } // namespace Status ClusterAggregate::runAggregate(OperationContext* opCtx, @@ -813,94 +862,103 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto resolvedNamespaces = resolveInvolvedNamespaces(opCtx, litePipe, request.getNamespaceString()); - // A pipeline is allowed to passthrough to the primary shard iff the following conditions are - // met: - // - // 1. The namespace of the aggregate and any other involved namespaces are unsharded. - // 2. Is allowed to be forwarded to shards. - // 3. Does not need to run on all shards. - // 4. Doesn't need transformation via DocumentSource::serialize(). - if (routingInfo && !routingInfo->cm() && !mustRunOnAll && - litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) { - const auto primaryShardId = routingInfo->db().primary()->getId(); - return aggPassthrough( - opCtx, namespaces, primaryShardId, request, litePipe, privileges, result); - } - - // Populate the collection UUID and the appropriate collation to use. - auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe); - BSONObj collationObj = collInfo.first; - boost::optional<UUID> uuid = collInfo.second; - - // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, - // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the - // pipeline's stages. - auto expCtx = makeExpressionContext( - opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces)); - - // Parse and optimize the full pipeline. - auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); - pipeline->optimizePipeline(); - - // Check whether the entire pipeline must be run on mongoS. - if (pipeline->requiredToRunOnMongos()) { - // If this is an explain write the explain output and return. - if (expCtx->explain) { - *result << "splitPipeline" << BSONNULL << "mongos" - << Document{{"host", getHostNameCachedAndPort()}, - {"stages", pipeline->writeExplainOps(*expCtx->explain)}}; - return Status::OK(); + auto status = [&]() { + // A pipeline is allowed to passthrough to the primary shard iff the following conditions + // are met: + // + // 1. The namespace of the aggregate and any other involved namespaces are unsharded. + // 2. Is allowed to be forwarded to shards. + // 3. Does not need to run on all shards. + // 4. Doesn't need transformation via DocumentSource::serialize(). + if (routingInfo && !routingInfo->cm() && !mustRunOnAll && + litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) { + const auto primaryShardId = routingInfo->db().primary()->getId(); + return aggPassthrough( + opCtx, namespaces, primaryShardId, request, litePipe, privileges, result); } - return runPipelineOnMongoS( - expCtx, namespaces, request, litePipe, std::move(pipeline), result, privileges); - } + // Populate the collection UUID and the appropriate collation to use. + auto collInfo = + getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe); + BSONObj collationObj = collInfo.first; + boost::optional<UUID> uuid = collInfo.second; + + // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, + // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by + // the pipeline's stages. + auto expCtx = makeExpressionContext( + opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces)); + + // Parse and optimize the full pipeline. + auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); + pipeline->optimizePipeline(); + + // Check whether the entire pipeline must be run on mongoS. + if (pipeline->requiredToRunOnMongos()) { + // If this is an explain write the explain output and return. + if (expCtx->explain) { + *result << "splitPipeline" << BSONNULL << "mongos" + << Document{{"host", getHostNameCachedAndPort()}, + {"stages", pipeline->writeExplainOps(*expCtx->explain)}}; + return Status::OK(); + } - // If not, split the pipeline as necessary and dispatch to the relevant shards. - auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline( - expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj); + return runPipelineOnMongoS( + expCtx, namespaces, request, litePipe, std::move(pipeline), result, privileges); + } - // If the operation is an explain, then we verify that it succeeded on all targeted shards, - // write the results to the output builder, and return immediately. - if (expCtx->explain) { - return appendExplainResults(std::move(shardDispatchResults), expCtx, result); - } + // If not, split the pipeline as necessary and dispatch to the relevant shards. + auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline( + expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj); - // If this isn't an explain, then we must have established cursors on at least one shard. - invariant(shardDispatchResults.remoteCursors.size() > 0); + // If the operation is an explain, then we verify that it succeeded on all targeted shards, + // write the results to the output builder, and return immediately. + if (expCtx->explain) { + return appendExplainResults(std::move(shardDispatchResults), expCtx, result); + } - // If we sent the entire pipeline to a single shard, store the remote cursor and return. - if (!shardDispatchResults.splitPipeline) { - invariant(shardDispatchResults.remoteCursors.size() == 1); - auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front()); - const auto shardId = remoteCursor->getShardId().toString(); - const auto reply = uassertStatusOK(storePossibleCursor(opCtx, - namespaces.requestedNss, - std::move(remoteCursor), - privileges, - expCtx->tailableMode)); - return appendCursorResponseToCommandResult(shardId, reply, result); - } + // If this isn't an explain, then we must have established cursors on at least one shard. + invariant(shardDispatchResults.remoteCursors.size() > 0); + + // If we sent the entire pipeline to a single shard, store the remote cursor and return. + if (!shardDispatchResults.splitPipeline) { + invariant(shardDispatchResults.remoteCursors.size() == 1); + auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front()); + const auto shardId = remoteCursor->getShardId().toString(); + const auto reply = uassertStatusOK(storePossibleCursor(opCtx, + namespaces.requestedNss, + std::move(remoteCursor), + privileges, + expCtx->tailableMode)); + return appendCursorResponseToCommandResult(shardId, reply, result); + } - // If we have the exchange spec then dispatch all consumers. - if (shardDispatchResults.exchangeSpec) { - shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx, - namespaces.executionNss, - request, - litePipe, - collationObj, - &shardDispatchResults); - } + // If we have the exchange spec then dispatch all consumers. + if (shardDispatchResults.exchangeSpec) { + shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx, + namespaces.executionNss, + request, + litePipe, + collationObj, + &shardDispatchResults); + } - // If we reach here, we have a merge pipeline to dispatch. - return dispatchMergingPipeline(expCtx, - namespaces, - request, - litePipe, - routingInfo, - std::move(shardDispatchResults), - result, - privileges); + // If we reach here, we have a merge pipeline to dispatch. + return dispatchMergingPipeline(expCtx, + namespaces, + request, + litePipe, + routingInfo, + std::move(shardDispatchResults), + result, + privileges); + }(); + + if (status.isOK()) + updateHostsTargetedMetrics( + opCtx, namespaces.executionNss, routingInfo, litePipe.getInvolvedNamespaces()); + + return status; } Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 567110ae134..ad19dc32da3 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -52,6 +52,7 @@ #include "mongo/executor/task_executor_pool.h" #include "mongo/platform/overflow_arithmetic.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" @@ -203,6 +204,20 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( return requests; } +void updateNumHostsTargetedMetrics(OperationContext* opCtx, + const CachedCollectionRoutingInfo& routingInfo, + int nTargetedShards) { + int nShardsOwningChunks = 0; + if (routingInfo.cm()) { + nShardsOwningChunks = routingInfo.cm()->getNShardsOwningChunks(); + } + + auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType( + opCtx, nTargetedShards, nShardsOwningChunks); + NumHostsTargetedMetrics::get(opCtx).addNumHostsTargeted( + NumHostsTargetedMetrics::QueryType::kFindCmd, targetType); +} + CursorId runQueryWithoutRetrying(OperationContext* opCtx, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, @@ -337,6 +352,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // allocate a cursor id. if (cursorState == ClusterCursorManager::CursorState::Exhausted) { CurOp::get(opCtx)->debug().cursorExhausted = true; + + if (shardIds.size() > 0) { + updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size()); + } return CursorId(0); } @@ -354,6 +373,11 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // Record the cursorID in CurOp. CurOp::get(opCtx)->debug().cursorid = cursorId; + + if (shardIds.size() > 0) { + updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size()); + } + return cursorId; } diff --git a/src/mongo/s/s_sharding_server_status.cpp b/src/mongo/s/s_sharding_server_status.cpp index f06df21aff6..0bb94bc5775 100644 --- a/src/mongo/s/s_sharding_server_status.cpp +++ b/src/mongo/s/s_sharding_server_status.cpp @@ -33,6 +33,7 @@ #include "mongo/db/commands/server_status.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -80,8 +81,11 @@ public: const BSONElement& configElement) const override { auto const grid = Grid::get(opCtx); auto const catalogCache = grid->catalogCache(); + auto& numHostsTargetedMetrics = NumHostsTargetedMetrics::get(opCtx); BSONObjBuilder result; + + numHostsTargetedMetrics.appendSection(&result); catalogCache->report(&result); return result.obj(); } diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 7f21ec10c84..29da54a9a30 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -57,6 +57,7 @@ #include "mongo/s/catalog/replset_dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/sharding_network_connection_hook.h" #include "mongo/s/cluster_identity_loader.h" @@ -179,6 +180,9 @@ Status initializeGlobalShardingState(OperationContext* opCtx, std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize); executorPool->startup(); + auto& numHostsTargetedMetrics = NumHostsTargetedMetrics::get(opCtx); + numHostsTargetedMetrics.startup(); + const auto service = opCtx->getServiceContext(); auto const grid = Grid::get(service); diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp index c533f14a125..a48437febfd 100644 --- a/src/mongo/s/sharding_router_test_fixture.cpp +++ b/src/mongo/s/sharding_router_test_fixture.cpp @@ -58,6 +58,7 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_remote.h" @@ -140,6 +141,8 @@ ShardingTestFixture::ShardingTestFixture() { stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager))); catalogClient->startup(); + NumHostsTargetedMetrics::get(service).startup(); + ConnectionString configCS = ConnectionString::forReplicaSet( "configRS", {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}}); diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index b06b0c1c63b..86ced587670 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -435,6 +435,10 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, } } + auto nShardsOwningChunks = batchOp.getNShardsOwningChunks(); + if (nShardsOwningChunks.is_initialized()) + stats->noteNumShardsOwningChunks(nShardsOwningChunks.get()); + batchOp.buildClientResponse(clientResponse); LOG(4) << "Finished execution of write batch" @@ -456,6 +460,10 @@ void BatchWriteExecStats::noteWriteAt(const HostAndPort& host, _writeOpTimes[ConnectionString(host)] = HostOpTime(opTime, electionId); } +void BatchWriteExecStats::noteNumShardsOwningChunks(const int nShardsOwningChunks) { + _numShardsOwningChunks.emplace(nShardsOwningChunks); +} + const std::set<ShardId>& BatchWriteExecStats::getTargetedShards() const { return _targetedShards; } @@ -464,4 +472,8 @@ const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const { return _writeOpTimes; } +const boost::optional<int> BatchWriteExecStats::getNumShardsOwningChunks() const { + return _numShardsOwningChunks; +} + } // namespace mongo diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h index f61926889b8..846668967a6 100644 --- a/src/mongo/s/write_ops/batch_write_exec.h +++ b/src/mongo/s/write_ops/batch_write_exec.h @@ -90,9 +90,11 @@ public: void noteWriteAt(const HostAndPort& host, repl::OpTime opTime, const OID& electionId); void noteTargetedShard(const ShardId& shardId); + void noteNumShardsOwningChunks(const int nShardsOwningChunks); const std::set<ShardId>& getTargetedShards() const; const HostOpTimeMap& getWriteOpTimes() const; + const boost::optional<int> getNumShardsOwningChunks() const; // Expose via helpers if this gets more complex @@ -108,6 +110,7 @@ public: private: std::set<ShardId> _targetedShards; HostOpTimeMap _writeOpTimes; + boost::optional<int> _numShardsOwningChunks; }; } // namespace mongo diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 08f40961c07..5a822387116 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -36,6 +36,10 @@ #include "mongo/base/error_codes.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/write_ops_parsers.h" +#include "mongo/db/s/database_sharding_state.h" +#include "mongo/s/client/num_hosts_targeted_metrics.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/grid.h" #include "mongo/s/transaction_router.h" #include "mongo/stdx/memory.h" #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" @@ -432,6 +436,8 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, targetedBatches->emplace(batch->getEndpoint().shardName, batch); } + _nShardsOwningChunks = targeter.getNShardsOwningChunks(); + return Status::OK(); } @@ -813,6 +819,10 @@ int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const { }); } +boost::optional<int> BatchWriteOp::getNShardsOwningChunks() { + return _nShardsOwningChunks; +} + void BatchWriteOp::_incBatchStats(const BatchedCommandResponse& response) { const auto batchType = _clientRequest.getBatchType(); diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 41525d1dcc7..718e6de75e7 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -201,6 +201,8 @@ public: */ int numWriteOpsIn(WriteOpState state) const; + boost::optional<int> getNShardsOwningChunks(); + private: /** * Maintains the batch execution statistics when a response is received. @@ -242,6 +244,8 @@ private: // Set to true if this write is part of a transaction. const bool _inTransaction{false}; + + boost::optional<int> _nShardsOwningChunks; }; /** diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index 39bb70a734b..44415122efa 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -797,6 +797,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC MONGO_UNREACHABLE; } +int ChunkManagerTargeter::getNShardsOwningChunks() const { + if (_routingInfo->cm()) { + return _routingInfo->cm()->getNShardsOwningChunks(); + } + + return 0; +} + Status ChunkManagerTargeter::_refreshNow(OperationContext* opCtx) { Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*_routingInfo)); diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index 2ace173e625..1c6b527d54a 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h @@ -108,6 +108,8 @@ public: */ Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override; + int getNShardsOwningChunks() const override; + private: using ShardVersionMap = std::map<ShardId, ChunkVersion>; diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h index 2e368fadf74..1223af8be82 100644 --- a/src/mongo/s/write_ops/mock_ns_targeter.h +++ b/src/mongo/s/write_ops/mock_ns_targeter.h @@ -129,6 +129,11 @@ public: return Status::OK(); } + int getNShardsOwningChunks() const override { + // No-op + return 0; + } + private: static ChunkRange _parseRange(const BSONObj& query) { const StringData fieldName(query.firstElement().fieldName()); |