diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/s/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/s/balance.cpp | 56 | ||||
-rw-r--r-- | src/mongo/s/balance.h | 10 | ||||
-rw-r--r-- | src/mongo/s/balancer/cluster_statistics.cpp | 80 | ||||
-rw-r--r-- | src/mongo/s/balancer/cluster_statistics.h | 110 | ||||
-rw-r--r-- | src/mongo/s/balancer/cluster_statistics_impl.cpp | 151 | ||||
-rw-r--r-- | src/mongo/s/balancer/cluster_statistics_impl.h | 71 | ||||
-rw-r--r-- | src/mongo/s/balancer/cluster_statistics_test.cpp | 48 | ||||
-rw-r--r-- | src/mongo/s/balancer_policy.cpp | 198 | ||||
-rw-r--r-- | src/mongo/s/balancer_policy.h | 80 | ||||
-rw-r--r-- | src/mongo/s/balancer_policy_tests.cpp | 198 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 29 |
12 files changed, 648 insertions, 388 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index c858e9fe136..e8343dc4ee5 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -158,7 +158,10 @@ env.Library( target='coreshard', source=[ # This is only here temporarily for auto-split logic in chunk.cpp. + 'balance.cpp', 'balancer_policy.cpp', + 'balancer/cluster_statistics.cpp', + 'balancer/cluster_statistics_impl.cpp', 'chunk.cpp', 'chunk_manager.cpp', 'config.cpp', @@ -201,7 +204,6 @@ env.Library( env.Library( target='mongoscore', source=[ - 'balance.cpp', 'cluster_cursor_stats.cpp', 'request.cpp', 's_only.cpp', @@ -226,6 +228,7 @@ env.CppUnitTest( target='mongoscore_test', source=[ 'balancer_policy_tests.cpp', + 'balancer/cluster_statistics_test.cpp', 'shard_key_pattern_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index 2a2cbbd0804..cc1b71d804f 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -35,6 +35,7 @@ #include <algorithm> #include "mongo/base/status_with.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/client.h" @@ -45,6 +46,7 @@ #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/balancer/cluster_statistics_impl.h" #include "mongo/s/balancer_policy.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" @@ -52,13 +54,16 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_mongos.h" #include "mongo/s/catalog/type_settings.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/shard_util.h" #include 
"mongo/s/migration_secondary_throttle_options.h" +#include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -123,10 +128,10 @@ private: * Occasionally prints a log message with shard versions if the versions are not the same * in the cluster. */ -void warnOnMultiVersion(const ShardInfoMap& shardInfo) { +void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& clusterStats) { bool isMultiVersion = false; - for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { - if (!isSameMajorVersion(i->second.getMongoVersion().c_str())) { + for (const auto& stat : clusterStats) { + if (!isSameMajorVersion(stat.mongoVersion.c_str())) { isMultiVersion = true; break; } @@ -136,10 +141,15 @@ void warnOnMultiVersion(const ShardInfoMap& shardInfo) { if (!isMultiVersion) return; - warning() << "multiVersion cluster detected, my version is " << versionString; - for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { - log() << i->first << " is at version " << i->second.getMongoVersion(); + StringBuilder sb; + sb << "Multi version cluster detected. 
Local version: " << versionString + << ", shard versions: "; + + for (const auto& stat : clusterStats) { + sb << stat.shardId << " is at " << stat.mongoVersion << "; "; } + + warning() << sb.str(); } const Seconds kBalanceRoundDefaultInterval(10); @@ -152,7 +162,8 @@ MONGO_FP_DECLARE(balancerRoundIntervalSetting); } // namespace -Balancer::Balancer() : _balancedLastTime(0) {} +Balancer::Balancer() + : _balancedLastTime(0), _clusterStats(stdx::make_unique<ClusterStatisticsImpl>()) {} Balancer::~Balancer() = default; @@ -185,7 +196,7 @@ void Balancer::run() { try { // ping has to be first so we keep things in the config server in sync - _ping(txn.get()); + _ping(txn.get(), false); MONGO_FAIL_POINT_BLOCK(balancerRoundIntervalSetting, scopedBalancerRoundInterval) { const BSONObj& data = scopedBalancerRoundInterval.getData(); @@ -412,23 +423,12 @@ StatusWith<vector<MigrateInfo>> Balancer::_getCandidateChunks(OperationContext* return vector<MigrateInfo>(); } - // Get a list of all the shards that are participating in this balance round along with any - // maximum allowed quotas and current utilization. We get the latter by issuing - // db.serverStatus() (mem.mapped) to all shards. - // - // TODO: skip unresponsive shards and mark information as stale. 
- auto shardInfoStatus = DistributionStatus::populateShardInfoMap(txn); - if (!shardInfoStatus.isOK()) { - return shardInfoStatus.getStatus(); - } - - const ShardInfoMap shardInfo(std::move(shardInfoStatus.getValue())); - - if (shardInfo.size() < 2) { + const auto clusterStats = uassertStatusOK(_clusterStats->getStats(txn)); + if (clusterStats.size() < 2) { return vector<MigrateInfo>(); } - OCCASIONALLY warnOnMultiVersion(shardInfo); + OCCASIONALLY warnOnMultiVersion(clusterStats); std::vector<MigrateInfo> candidateChunks; @@ -459,9 +459,7 @@ StatusWith<vector<MigrateInfo>> Balancer::_getCandidateChunks(OperationContext* for (const ChunkType& chunk : allNsChunks) { allChunkMinimums.insert(chunk.getMin().getOwned()); - - vector<ChunkType>& chunksList = shardToChunksMap[chunk.getShard()]; - chunksList.push_back(chunk); + shardToChunksMap[chunk.getShard()].push_back(chunk); } if (shardToChunksMap.empty()) { @@ -469,12 +467,12 @@ StatusWith<vector<MigrateInfo>> Balancer::_getCandidateChunks(OperationContext* continue; } - for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { + for (const auto& stat : clusterStats) { // This loop just makes sure there is an entry in shardToChunksMap for every shard - shardToChunksMap[i->first]; + shardToChunksMap[stat.shardId]; } - DistributionStatus distStatus(shardInfo, shardToChunksMap); + DistributionStatus distStatus(clusterStats, shardToChunksMap); // TODO: TagRange contains all the information from TagsType except for the namespace, // so maybe the two can be merged at some point in order to avoid the @@ -610,7 +608,7 @@ int Balancer::_moveChunks(OperationContext* txn, << " was deleted while balancing was active. 
Aborting balancing round.", cm); - ChunkPtr c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); + shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); if (c->getMin().woCompare(migrateInfo.chunk.min) || c->getMax().woCompare(migrateInfo.chunk.max)) { diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index 17b58384590..3b808f485d9 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -36,6 +36,7 @@ namespace mongo { +class ClusterStatistics; struct MigrateInfo; class MigrationSecondaryThrottleOptions; class OperationContext; @@ -62,6 +63,10 @@ public: */ static Balancer* get(OperationContext* operationContext); + ClusterStatistics* getClusterStatistics() const { + return _clusterStats.get(); + } + // BackgroundJob methods virtual void run(); @@ -83,7 +88,7 @@ private: /** * Marks this balancer as being live on the config server(s). */ - void _ping(OperationContext* txn, bool waiting = false); + void _ping(OperationContext* txn, bool waiting); /** * Returns true if all the servers listed in configdb as being shards are reachable and are @@ -120,6 +125,9 @@ private: // number of moved chunks in last round int _balancedLastTime; + + // Source for cluster statistics + std::unique_ptr<ClusterStatistics> _clusterStats; }; } // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics.cpp b/src/mongo/s/balancer/cluster_statistics.cpp new file mode 100644 index 00000000000..0c66348ef36 --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics.cpp @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/cluster_statistics.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { + +ClusterStatistics::ClusterStatistics() = default; + +ClusterStatistics::~ClusterStatistics() = default; + +ClusterStatistics::ShardStatistics::ShardStatistics() = default; + +ClusterStatistics::ShardStatistics::ShardStatistics(ShardId inShardId, + uint64_t inMaxSizeMB, + uint64_t inCurrSizeMB, + bool inIsDraining, + const std::set<std::string> inShardTags, + const std::string inMongoVersion) + : shardId(std::move(inShardId)), + maxSizeMB(std::move(inMaxSizeMB)), + currSizeMB(std::move(inCurrSizeMB)), + isDraining(std::move(inIsDraining)), + shardTags(std::move(inShardTags)), + mongoVersion(std::move(inMongoVersion)) {} + +bool ClusterStatistics::ShardStatistics::isSizeMaxed() const { + if (!maxSizeMB || !currSizeMB) { + return false; + } + + return currSizeMB >= maxSizeMB; +} + +BSONObj ClusterStatistics::ShardStatistics::toBSON() const { + BSONObjBuilder builder; + builder.append("id", shardId); + builder.append("maxSizeMB", static_cast<long long>(maxSizeMB)); + builder.append("currSizeMB", static_cast<long long>(currSizeMB)); + builder.append("draining", isDraining); + if (!shardTags.empty()) { + BSONArrayBuilder arrayBuilder(builder.subarrayStart("tags")); + arrayBuilder.append(shardTags); + } + + builder.append("version", mongoVersion); + return builder.obj(); +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics.h b/src/mongo/s/balancer/cluster_statistics.h new file mode 100644 index 00000000000..748b80c2956 --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics.h @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <set> +#include <string> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/s/client/shard.h" + +namespace mongo { + +class BSONObj; +class OperationContext; +template <typename T> +class StatusWith; + +/** + * This interface serves as means for obtaining data distribution and shard utilization statistics + * for the entire sharded cluster. Implementations may choose whatever means necessary to perform + * the statistics collection. There should be one instance of this object per service context. + */ +class ClusterStatistics { + MONGO_DISALLOW_COPYING(ClusterStatistics); + +public: + /** + * Structure, which describes the statistics of a single shard host. 
+ */ + struct ShardStatistics { + public: + ShardStatistics(); + ShardStatistics(ShardId shardId, + uint64_t maxSizeMB, + uint64_t currSizeMB, + bool isDraining, + const std::set<std::string> shardTags, + const std::string mongoVersion); + + /** + * Returns if a shard cannot receive any new chunks because it has reached the per-shard + * data size limit. + */ + bool isSizeMaxed() const; + + /** + * Returns BSON representation of this shard's statistics, for reporting purposes. + */ + BSONObj toBSON() const; + + // The id of the shard for which this statistic applies + ShardId shardId; + + // The maximum size allowed for the shard + uint64_t maxSizeMB{0}; + + // The current size of the shard + uint64_t currSizeMB{0}; + + // Whether the shard is in draining mode + bool isDraining{false}; + + // Set of tags for the shard + std::set<std::string> shardTags; + + // Version of mongod, which runs on this shard's primary + std::string mongoVersion; + }; + + virtual ~ClusterStatistics(); + + /** + * Retrieves a snapshot of the current shard utilization state. The implementation of this + * method may block if necessary in order to refresh its state or may return a cached value. + */ + virtual StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) = 0; + +protected: + ClusterStatistics(); +}; + +} // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics_impl.cpp b/src/mongo/s/balancer/cluster_statistics_impl.cpp new file mode 100644 index 00000000000..a6e0b3dcf60 --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics_impl.cpp @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/cluster_statistics_impl.h" + +#include "mongo/base/status_with.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/read_preference.h" +#include "mongo/s/catalog/catalog_manager.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_util.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +using std::string; +using std::vector; + +namespace { + +const char kVersionField[] = "version"; + +/** + * Executes the serverStatus command against the specified shard and obtains the version of the + * running MongoD service. + * + * Returns the MongoD version in string format or an error. Known error codes are: + * ShardNotFound if shard by that id is not available on the registry + * NoSuchKey if the version could not be retrieved + */ +StatusWith<string> retrieveShardMongoDVersion(OperationContext* txn, ShardId shardId) { + auto shardRegistry = Grid::get(txn)->shardRegistry(); + auto commandStatus = shardRegistry->runIdempotentCommandOnShard( + txn, + shardId, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("serverStatus" << 1)); + if (!commandStatus.isOK()) { + return commandStatus.getStatus(); + } + + BSONObj serverStatus = std::move(commandStatus.getValue()); + + string version; + Status status = bsonExtractStringField(serverStatus, kVersionField, &version); + if (!status.isOK()) { + return status; + } + + return version; +} + +} // namespace + +using ShardStatistics = ClusterStatistics::ShardStatistics; + +ClusterStatisticsImpl::ClusterStatisticsImpl() = default; + +ClusterStatisticsImpl::~ClusterStatisticsImpl() = default; + +StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationContext* txn) { + try { + _refreshShardStats(txn); + } 
catch (const DBException& e) { + return e.toStatus(); + } + + vector<ShardStatistics> stats; + + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + for (const auto& stat : _shardStatsMap) { + stats.push_back(stat.second); + } + + return stats; +} + +void ClusterStatisticsImpl::_refreshShardStats(OperationContext* txn) { + // Get a list of all the shards that are participating in this balance round along with any + // maximum allowed quotas and current utilization. We get the latter by issuing + // db.serverStatus() (mem.mapped) to all shards. + // + // TODO: skip unresponsive shards and mark information as stale. + auto shardsStatus = Grid::get(txn)->catalogManager(txn)->getAllShards(txn); + uassertStatusOK(shardsStatus.getStatus()); + + const vector<ShardType> shards(std::move(shardsStatus.getValue().value)); + + for (const auto& shard : shards) { + auto shardSizeStatus = shardutil::retrieveTotalShardSize(txn, shard.getName()); + if (!shardSizeStatus.isOK()) { + continue; + } + + auto mongoDVersionStatus = retrieveShardMongoDVersion(txn, shard.getName()); + if (!mongoDVersionStatus.isOK()) { + continue; + } + const string mongoDVersion(std::move(mongoDVersionStatus.getValue())); + + std::set<string> shardTags; + for (const auto& shardTag : shard.getTags()) { + shardTags.insert(shardTag); + } + + ShardStatistics newShardStat(shard.getName(), + shard.getMaxSizeMB(), + shardSizeStatus.getValue() / 1024 / 1024, + shard.getDraining(), + shardTags, + mongoDVersion); + + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + _shardStatsMap[shard.getName()] = std::move(newShardStat); + } +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics_impl.h b/src/mongo/s/balancer/cluster_statistics_impl.h new file mode 100644 index 00000000000..a1ea1829ac1 --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics_impl.h @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2016 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <map> + +#include "mongo/s/balancer/cluster_statistics.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { + +class OperationContext; +class Status; + +/** + * Default implementation for the cluster statistics gathering utility. Uses a blocking method to + * fetch the statistics and does not perform any caching. 
+ */ +class ClusterStatisticsImpl final : public ClusterStatistics { +public: + ClusterStatisticsImpl(); + ~ClusterStatisticsImpl(); + + StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) override; + +private: + typedef std::map<ShardId, ShardStatistics> ShardStatisticsMap; + + /** + * Refreshes the list of available shards and loops through them in order to collect usage + * statistics. If any of the shards fails to report statistics, skips it and continues with the + * next. + * + * If the list of shards cannot be retrieved throws an exception. + */ + void _refreshShardStats(OperationContext* txn); + + // Mutex to protect the mutable state below + stdx::mutex _mutex; + + // The most up-to-date shard statistics + ShardStatisticsMap _shardStatsMap; +}; + +} // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics_test.cpp b/src/mongo/s/balancer/cluster_statistics_test.cpp new file mode 100644 index 00000000000..4fe4f0b27e5 --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics_test.cpp @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/cluster_statistics.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using ShardStatistics = ClusterStatistics::ShardStatistics; + +const auto emptyTagSet = std::set<std::string>(); + +TEST(ShardStatistics, SizeMaxedTest) { + ASSERT(!ShardStatistics("TestShardId", 0, 0, false, emptyTagSet, "3.2.0").isSizeMaxed()); + ASSERT(!ShardStatistics("TestShardId", 100LL, 80LL, false, emptyTagSet, "3.2.0").isSizeMaxed()); + ASSERT(ShardStatistics("TestShardId", 100LL, 110LL, false, emptyTagSet, "3.2.0").isSizeMaxed()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp index 1a307df60b2..8737dca75cd 100644 --- a/src/mongo/s/balancer_policy.cpp +++ b/src/mongo/s/balancer_policy.cpp @@ -56,52 +56,13 @@ using std::set; using std::string; using std::vector; -namespace { - -/** - * Executes the serverStatus command against the specified shard and obtains the version of the - * running MongoD service. - * - * The MongoD version or throws an exception. 
Known exception codes are: - * ShardNotFound if shard by that id is not available on the registry - * NoSuchKey if the version could not be retrieved - */ -std::string retrieveShardMongoDVersion(OperationContext* txn, - ShardId shardId, - ShardRegistry* shardRegistry) { - BSONObj serverStatus = uassertStatusOK(shardRegistry->runIdempotentCommandOnShard( - txn, - shardId, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - BSON("serverStatus" << 1))); - BSONElement versionElement = serverStatus["version"]; - if (versionElement.type() != String) { - uassertStatusOK({ErrorCodes::NoSuchKey, "version field not found in serverStatus"}); - } - - return versionElement.str(); -} - -} // namespace - string TagRange::toString() const { return str::stream() << min << " -->> " << max << " on " << tag; } -DistributionStatus::DistributionStatus(const ShardInfoMap& shardInfo, +DistributionStatus::DistributionStatus(ShardStatisticsVector shardInfo, const ShardToChunksMap& shardToChunksMap) - : _shardInfo(shardInfo), _shardChunks(shardToChunksMap) { - for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { - _shardIds.insert(i->first); - } -} - -const ShardInfo& DistributionStatus::shardInfo(const ShardId& shardId) const { - ShardInfoMap::const_iterator i = _shardInfo.find(shardId); - verify(i != _shardInfo.end()); - return i->second; -} + : _shardInfo(std::move(shardInfo)), _shardChunks(shardToChunksMap) {} unsigned DistributionStatus::totalChunks() const { unsigned total = 0; @@ -145,30 +106,30 @@ string DistributionStatus::getBestReceieverShard(const string& tag) const { string best; unsigned minChunks = numeric_limits<unsigned>::max(); - for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { - if (i->second.isSizeMaxed()) { - LOG(1) << i->first << " has already reached the maximum total chunk size."; + for (const auto& stat : _shardInfo) { + if (stat.isSizeMaxed()) { + LOG(1) << stat.shardId << " has 
already reached the maximum total chunk size."; continue; } - if (i->second.isDraining()) { - LOG(1) << i->first << " is currently draining."; + if (stat.isDraining) { + LOG(1) << stat.shardId << " is currently draining."; continue; } - if (!i->second.hasTag(tag)) { - LOG(1) << i->first << " doesn't have right tag"; + if (!tag.empty() && !stat.shardTags.count(tag)) { + LOG(1) << stat.shardId << " doesn't have right tag"; continue; } - unsigned myChunks = numberOfChunksInShard(i->first); + unsigned myChunks = numberOfChunksInShard(stat.shardId); if (myChunks >= minChunks) { - LOG(1) << i->first << " has more chunks me:" << myChunks << " best: " << best << ":" + LOG(1) << stat.shardId << " has more chunks me:" << myChunks << " best: " << best << ":" << minChunks; continue; } - best = i->first; + best = stat.shardId; minChunks = myChunks; } @@ -179,12 +140,12 @@ string DistributionStatus::getMostOverloadedShard(const string& tag) const { string worst; unsigned maxChunks = 0; - for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { - unsigned myChunks = numberOfChunksInShardWithTag(i->first, tag); + for (const auto& stat : _shardInfo) { + unsigned myChunks = numberOfChunksInShardWithTag(stat.shardId, tag); if (myChunks <= maxChunks) continue; - worst = i->first; + worst = stat.shardId; maxChunks = myChunks; } @@ -250,10 +211,10 @@ void DistributionStatus::dump() const { log() << "DistributionStatus"; log() << " shards"; - for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { - log() << " " << i->first << "\t" << i->second.toString(); + for (const auto& stat : _shardInfo) { + log() << " " << stat.shardId << "\t" << stat.toBSON(); - ShardToChunksMap::const_iterator j = _shardChunks.find(i->first); + ShardToChunksMap::const_iterator j = _shardChunks.find(stat.shardId); verify(j != _shardChunks.end()); const vector<ChunkType>& v = j->second; @@ -271,50 +232,12 @@ void DistributionStatus::dump() const { } } 
-StatusWith<ShardInfoMap> DistributionStatus::populateShardInfoMap(OperationContext* txn) { - try { - auto shardsStatus = grid.catalogManager(txn)->getAllShards(txn); - if (!shardsStatus.isOK()) { - return shardsStatus.getStatus(); - } - const vector<ShardType> shards(std::move(shardsStatus.getValue().value)); - - ShardInfoMap shardInfo; - - for (const ShardType& shardData : shards) { - std::set<std::string> dummy; - - const long long shardSizeBytes = - uassertStatusOK(shardutil::retrieveTotalShardSize(txn, shardData.getName())); - - const std::string shardMongodVersion = - retrieveShardMongoDVersion(txn, shardData.getName(), grid.shardRegistry()); - - ShardInfo newShardEntry(shardData.getMaxSizeMB(), - shardSizeBytes / 1024 / 1024, - shardData.getDraining(), - dummy, - shardMongodVersion); - - for (const string& shardTag : shardData.getTags()) { - newShardEntry.addTag(shardTag); - } - - shardInfo.insert(make_pair(shardData.getName(), newShardEntry)); - } - - return std::move(shardInfo); - } catch (const DBException& ex) { - return ex.toStatus(); - } -} - -void DistributionStatus::populateShardToChunksMap(const ShardInfoMap& allShards, +void DistributionStatus::populateShardToChunksMap(const ShardStatisticsVector& allShards, const ChunkManager& chunkMgr, ShardToChunksMap* shardToChunksMap) { // Makes sure there is an entry in shardToChunksMap for every shard. 
- for (ShardInfoMap::const_iterator it = allShards.begin(); it != allShards.end(); ++it) { - (*shardToChunksMap)[it->first]; + for (const auto& stat : allShards) { + (*shardToChunksMap)[stat.shardId]; } const ChunkMap& chunkMap = chunkMgr.getChunkMap(); @@ -346,18 +269,16 @@ MigrateInfo* BalancerPolicy::balance(const string& ns, // 1) check things we have to move { - for (const ShardId& shardId : distribution.shardIds()) { - const ShardInfo& info = distribution.shardInfo(shardId); - - if (!info.isDraining()) + for (const auto& stat : distribution.getStats()) { + if (!stat.isDraining) continue; - if (distribution.numberOfChunksInShard(shardId) == 0) + if (distribution.numberOfChunksInShard(stat.shardId) == 0) continue; // now we know we need to move to chunks off this shard // we will if we are allowed - const vector<ChunkType>& chunks = distribution.getChunks(shardId); + const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId); unsigned numJumboChunks = 0; // since we have to move all chunks, lets just do in order @@ -373,32 +294,32 @@ MigrateInfo* BalancerPolicy::balance(const string& ns, if (to.size() == 0) { warning() << "want to move chunk: " << chunkToMove << "(" << tag << ") " - << "from " << shardId << " but can't find anywhere to put it"; + << "from " << stat.shardId << " but can't find anywhere to put it"; continue; } - log() << "going to move " << chunkToMove << " from " << shardId << "(" << tag << ")" + log() << "going to move " << chunkToMove << " from " << stat.shardId << "(" << tag + << ")" << " to " << to; - return new MigrateInfo(ns, to, shardId, chunkToMove.toBSON()); + return new MigrateInfo(ns, to, stat.shardId, chunkToMove.toBSON()); } - warning() << "can't find any chunk to move from: " << shardId << " but we want to. " + warning() << "can't find any chunk to move from: " << stat.shardId + << " but we want to. 
" << " numJumboChunks: " << numJumboChunks; } } // 2) tag violations if (distribution.tags().size() > 0) { - for (const ShardId& shardId : distribution.shardIds()) { - const ShardInfo& info = distribution.shardInfo(shardId); - - const vector<ChunkType>& chunks = distribution.getChunks(shardId); + for (const auto& stat : distribution.getStats()) { + const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId); for (unsigned j = 0; j < chunks.size(); j++) { const ChunkType& chunk = chunks[j]; string tag = distribution.getTagForChunk(chunk); - if (info.hasTag(tag)) + if (tag.empty() || stat.shardTags.count(tag)) continue; // uh oh, this chunk is in the wrong place @@ -414,9 +335,10 @@ MigrateInfo* BalancerPolicy::balance(const string& ns, log() << "no where to put it :("; continue; } - verify(to != shardId); + + invariant(to != stat.shardId); log() << " going to move to: " << to; - return new MigrateInfo(ns, to, shardId, chunk.toBSON()); + return new MigrateInfo(ns, to, stat.shardId, chunk.toBSON()); } } } @@ -501,52 +423,6 @@ MigrateInfo* BalancerPolicy::balance(const string& ns, return NULL; } - -ShardInfo::ShardInfo(long long maxSizeMB, - long long currSizeMB, - bool draining, - const set<string>& tags, - const string& mongoVersion) - : _maxSizeMB(maxSizeMB), - _currSizeMB(currSizeMB), - _draining(draining), - _tags(tags), - _mongoVersion(mongoVersion) {} - -ShardInfo::ShardInfo() : _maxSizeMB(0), _currSizeMB(0), _draining(false) {} - -void ShardInfo::addTag(const string& tag) { - _tags.insert(tag); -} - - -bool ShardInfo::isSizeMaxed() const { - if (_maxSizeMB == 0 || _currSizeMB == 0) - return false; - - return _currSizeMB >= _maxSizeMB; -} - -bool ShardInfo::hasTag(const string& tag) const { - if (tag.size() == 0) - return true; - return _tags.count(tag) > 0; -} - -string ShardInfo::toString() const { - StringBuilder ss; - ss << " maxSizeMB: " << _maxSizeMB; - ss << " currSizeMB: " << _currSizeMB; - ss << " draining: " << _draining; - if (_tags.size() 
> 0) { - ss << "tags : "; - for (set<string>::const_iterator i = _tags.begin(); i != _tags.end(); ++i) - ss << *i << ","; - } - ss << " version: " << _mongoVersion; - return ss.str(); -} - string ChunkInfo::toString() const { StringBuilder buf; buf << " min: " << min; diff --git a/src/mongo/s/balancer_policy.h b/src/mongo/s/balancer_policy.h index 4d764b62135..96accafa482 100644 --- a/src/mongo/s/balancer_policy.h +++ b/src/mongo/s/balancer_policy.h @@ -32,6 +32,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/jsobj.h" +#include "mongo/s/balancer/cluster_statistics.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard.h" @@ -65,59 +66,6 @@ struct TagRange { std::string toString() const; }; - -class ShardInfo { -public: - ShardInfo(); - ShardInfo(long long maxSizeMB, - long long currSizeMB, - bool draining, - const std::set<std::string>& tags = std::set<std::string>(), - const std::string& _mongoVersion = std::string("")); - - void addTag(const std::string& tag); - - /** @return true if we have the tag OR if the tag is "" */ - bool hasTag(const std::string& tag) const; - - /** - * @return true if a shard cannot receive any new chunks because it reaches 'shardLimits'. - * Expects the optional fields "maxSize", can in size in MB, and "usedSize", currently used size - * in MB, on 'shardLimits'. - */ - bool isSizeMaxed() const; - - /** - * @return true if 'shardLimist' contains a field "draining". Expects the optional field - * "isDraining" on 'shrdLimits'. 
- */ - bool isDraining() const { - return _draining; - } - - long long getMaxSizeMB() const { - return _maxSizeMB; - } - - long long getCurrSizeMB() const { - return _currSizeMB; - } - - std::string getMongoVersion() const { - return _mongoVersion; - } - - std::string toString() const; - -private: - long long _maxSizeMB; - long long _currSizeMB; - bool _draining; - std::set<std::string> _tags; - std::string _mongoVersion; -}; - - struct MigrateInfo { MigrateInfo(const std::string& a_ns, const ShardId& a_to, @@ -131,15 +79,14 @@ struct MigrateInfo { const ChunkInfo chunk; }; -typedef std::map<ShardId, ShardInfo> ShardInfoMap; +typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector; typedef std::map<ShardId, std::vector<ChunkType>> ShardToChunksMap; - class DistributionStatus { MONGO_DISALLOW_COPYING(DistributionStatus); public: - DistributionStatus(const ShardInfoMap& shardInfo, const ShardToChunksMap& shardToChunksMap); + DistributionStatus(ShardStatisticsVector shardInfo, const ShardToChunksMap& shardToChunksMap); // only used when building @@ -185,36 +132,25 @@ public: /** @return the right tag for chunk, possibly "" */ std::string getTagForChunk(const ChunkType& chunk) const; - /** @return all shard ids we know about */ - const std::set<ShardId>& shardIds() const { - return _shardIds; - } - - /** @return the ShardInfo for the shard */ - const ShardInfo& shardInfo(const ShardId& shardId) const; - /** writes all state to log() */ void dump() const; - /** - * Retrieves shard metadata information from the config server as well as some stats - * from the shards. - */ - static StatusWith<ShardInfoMap> populateShardInfoMap(OperationContext* txn); + const ShardStatisticsVector& getStats() const { + return _shardInfo; + } /** * Note: jumbo and versions are not set. 
*/ - static void populateShardToChunksMap(const ShardInfoMap& allShards, + static void populateShardToChunksMap(const ShardStatisticsVector& allShards, const ChunkManager& chunkMgr, ShardToChunksMap* shardToChunksMap); private: - const ShardInfoMap& _shardInfo; + const ShardStatisticsVector _shardInfo; const ShardToChunksMap& _shardChunks; std::map<BSONObj, TagRange> _tagRanges; std::set<std::string> _allTags; - std::set<ShardId> _shardIds; }; diff --git a/src/mongo/s/balancer_policy_tests.cpp b/src/mongo/s/balancer_policy_tests.cpp index b200576ee7b..d3550cd3cf3 100644 --- a/src/mongo/s/balancer_policy_tests.cpp +++ b/src/mongo/s/balancer_policy_tests.cpp @@ -28,10 +28,11 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault +#include "mongo/platform/basic.h" + #include "mongo/platform/random.h" #include "mongo/s/balancer_policy.h" #include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/config.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -44,11 +45,18 @@ using std::string; using std::stringstream; using std::vector; +using ShardStatistics = ClusterStatistics::ShardStatistics; + +const auto emptyTagSet = std::set<std::string>(); +const std::string emptyShardVersion = ""; + +ShardStatistics& findStat(std::vector<ShardStatistics>& stats, const ShardId& shardId) { + for (auto& stat : stats) { + if (stat.shardId == shardId) + return stat; + } -TEST(BalancerPolicyTests, SizeMaxedShardTest) { - ASSERT(!ShardInfo(0, 0, false).isSizeMaxed()); - ASSERT(!ShardInfo(100LL, 80LL, false).isSizeMaxed()); - ASSERT(ShardInfo(100LL, 110LL, false).isSizeMaxed()); + MONGO_UNREACHABLE; } TEST(BalancerPolicyTests, BalanceNormalTest) { @@ -72,14 +80,13 @@ TEST(BalancerPolicyTests, BalanceNormalTest) { chunkMap["shard0"] = chunks; chunkMap["shard1"] = vector<ChunkType>(); - // no limits - ShardInfoMap info; - info["shard0"] = ShardInfo(0, 2, false); - info["shard1"] = ShardInfo(0, 0, false); + // No limits + DistributionStatus status( 
+ {ShardStatistics("shard0", 0, 2, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 0, 0, false, emptyTagSet, emptyShardVersion)}, + chunkMap); - DistributionStatus status(info, chunkMap); std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 1)); - ASSERT(c); } @@ -129,12 +136,11 @@ TEST(BalancerPolicyTests, BalanceJumbo) { chunkMap["shard0"] = chunks; chunkMap["shard1"] = vector<ChunkType>(); - // no limits - ShardInfoMap info; - info["shard0"] = ShardInfo(0, 2, false); - info["shard1"] = ShardInfo(0, 0, false); - - DistributionStatus status(info, chunkMap); + // No limits + DistributionStatus status( + {ShardStatistics("shard0", 0, 2, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 0, 0, false, emptyTagSet, emptyShardVersion)}, + chunkMap); std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 1)); ASSERT(c); @@ -163,11 +169,10 @@ TEST(BalanceNormalTests, BalanceDrainingTest) { chunkMap["shard1"] = vector<ChunkType>(); // shard0 is draining - ShardInfoMap limitsMap; - limitsMap["shard0"] = ShardInfo(0LL, 2LL, true); - limitsMap["shard1"] = ShardInfo(0LL, 0LL, false); - - DistributionStatus status(limitsMap, chunkMap); + DistributionStatus status( + {ShardStatistics("shard0", 0, 2, true, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 0, 0, false, emptyTagSet, emptyShardVersion)}, + chunkMap); std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 0)); ASSERT(c); @@ -197,12 +202,11 @@ TEST(BalancerPolicyTests, BalanceEndedDrainingTest) { chunkMap["shard0"] = chunks; chunkMap["shard1"] = vector<ChunkType>(); - // no limits - ShardInfoMap limitsMap; - limitsMap["shard0"] = ShardInfo(0, 2, false); - limitsMap["shard1"] = ShardInfo(0, 0, true); - - DistributionStatus status(limitsMap, chunkMap); + // shard1 is draining + DistributionStatus status( + {ShardStatistics("shard0", 0, 2, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 0, 0, true, emptyTagSet, 
emptyShardVersion)}, + chunkMap); std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 0)); ASSERT(!c); @@ -231,12 +235,11 @@ TEST(BalancerPolicyTests, BalanceImpasseTest) { chunkMap["shard2"] = vector<ChunkType>(); // shard0 is draining, shard1 is maxed out, shard2 has writebacks pending - ShardInfoMap limitsMap; - limitsMap["shard0"] = ShardInfo(0, 2, true); - limitsMap["shard1"] = ShardInfo(1, 1, false); - limitsMap["shard2"] = ShardInfo(0, 1, true); - - DistributionStatus status(limitsMap, chunkMap); + DistributionStatus status( + {ShardStatistics("shard0", 0, 2, true, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 1, 1, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard2", 0, 1, true, emptyTagSet, emptyShardVersion)}, + chunkMap); std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 0)); ASSERT(!c); @@ -297,12 +300,10 @@ TEST(BalancerPolicyTests, MultipleDraining) { addShard(chunks, 10, false); addShard(chunks, 5, true); - ShardInfoMap shards; - shards["shard0"] = ShardInfo(0, 5, true); - shards["shard1"] = ShardInfo(0, 5, true); - shards["shard2"] = ShardInfo(0, 5, false); - - DistributionStatus d(shards, chunks); + DistributionStatus d({ShardStatistics("shard0", 0, 5, true, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 0, 5, true, emptyTagSet, emptyShardVersion), + ShardStatistics("shard2", 0, 5, false, emptyTagSet, emptyShardVersion)}, + chunks); std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0)); ASSERT(m); @@ -316,18 +317,12 @@ TEST(BalancerPolicyTests, TagsDraining) { addShard(chunks, 5, false); addShard(chunks, 5, true); - ShardInfoMap shards; - shards["shard0"] = ShardInfo(0, 5, false); - shards["shard1"] = ShardInfo(0, 5, true); - shards["shard2"] = ShardInfo(0, 5, false); - - shards["shard0"].addTag("a"); - shards["shard1"].addTag("a"); - shards["shard1"].addTag("b"); - shards["shard2"].addTag("b"); - while (true) { - DistributionStatus d(shards, chunks); + 
DistributionStatus d({ShardStatistics("shard0", 0, 5, false, {"a"}, emptyShardVersion), + ShardStatistics("shard1", 0, 5, true, {"a", "b"}, emptyShardVersion), + ShardStatistics("shard2", 0, 5, false, {"b"}, emptyShardVersion)}, + chunks); + d.addTagRange(TagRange(BSON("x" << -1), BSON("x" << 7), "a")); d.addTagRange(TagRange(BSON("x" << 7), BSON("x" << 1000), "b")); @@ -357,16 +352,12 @@ TEST(BalancerPolicyTests, TagsPolicyChange) { addShard(chunks, 5, false); addShard(chunks, 5, true); - ShardInfoMap shards; - shards["shard0"] = ShardInfo(0, 5, false); - shards["shard1"] = ShardInfo(0, 5, false); - shards["shard2"] = ShardInfo(0, 5, false); - - shards["shard0"].addTag("a"); - shards["shard1"].addTag("a"); - while (true) { - DistributionStatus d(shards, chunks); + DistributionStatus d( + {ShardStatistics("shard0", 0, 5, false, {"a"}, emptyShardVersion), + ShardStatistics("shard1", 0, 5, false, {"a"}, emptyShardVersion), + ShardStatistics("shard2", 0, 5, false, emptyTagSet, emptyShardVersion)}, + chunks); d.addTagRange(TagRange(BSON("x" << -1), BSON("x" << 1000), "a")); std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0)); @@ -388,8 +379,7 @@ TEST(BalancerPolicyTests, TagsPolicyChange) { TEST(BalancerPolicyTests, TagsSelector) { ShardToChunksMap chunks; - ShardInfoMap shards; - DistributionStatus d(shards, chunks); + DistributionStatus d({}, chunks); ASSERT(d.addTagRange(TagRange(BSON("x" << 1), BSON("x" << 10), "a"))); ASSERT(d.addTagRange(TagRange(BSON("x" << 10), BSON("x" << 20), "b"))); @@ -455,13 +445,10 @@ TEST(BalancerPolicyTests, MaxSizeRespect) { // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3. // Other shards have maxSize = 0 = unset. 
- - ShardInfoMap shards; - shards["shard0"] = ShardInfo(1, 3, false); - shards["shard1"] = ShardInfo(0, 4, false); - shards["shard2"] = ShardInfo(0, 6, false); - - DistributionStatus d(shards, chunks); + DistributionStatus d({ShardStatistics("shard0", 1, 3, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 0, 4, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard2", 0, 6, false, emptyTagSet, emptyShardVersion)}, + chunks); std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0)); ASSERT(m); @@ -481,16 +468,12 @@ TEST(BalancerPolicyTests, MaxSizeNoDrain) { addShard(chunks, 4, false); addShard(chunks, 4, true); - // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4. - // Other shards have maxSize = 0 = unset. - - ShardInfoMap shards; - // ShardInfo(maxSize, currSize, draining, opsQueued) - shards["shard0"] = ShardInfo(1, 4, false); - shards["shard1"] = ShardInfo(0, 4, false); - shards["shard2"] = ShardInfo(0, 4, false); - - DistributionStatus d(shards, chunks); + // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4. Other + // shards have maxSize = 0 = unset. + DistributionStatus d({ShardStatistics("shard0", 1, 4, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard1", 0, 4, false, emptyTagSet, emptyShardVersion), + ShardStatistics("shard2", 0, 4, false, emptyTagSet, emptyShardVersion)}, + chunks); std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0)); ASSERT(!m); @@ -522,7 +505,7 @@ TEST(BalancerPolicyTests, Simulation) { int numChunks = 0; ShardToChunksMap chunks; - ShardInfoMap shards; + vector<ShardStatistics> shards; map<string, int> expected; @@ -542,12 +525,16 @@ TEST(BalancerPolicyTests, Simulation) { addShard(chunks, numShardChunks, false); numChunks += numShardChunks; - shards[str::stream() << "shard" << i] = - ShardInfo(maxed ? 
numShardChunks + 1 : 0, numShardChunks, draining); + shards.emplace_back(str::stream() << "shard" << i, + maxed ? numShardChunks + 1 : 0, + numShardChunks, + draining, + emptyTagSet, + emptyShardVersion); } - for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) { - log() << it->first << " : " << it->second.toString(); + for (const auto& stat : shards) { + log() << stat.shardId << " : " << stat.toBSON(); } // Perform migrations and increment data size as chunks move @@ -561,55 +548,44 @@ TEST(BalancerPolicyTests, Simulation) { moveChunk(chunks, m.get()); - { - ShardInfo& info = shards[m->from]; - shards[m->from] = - ShardInfo(info.getMaxSizeMB(), info.getCurrSizeMB() - 1, info.isDraining()); - } - - { - ShardInfo& info = shards[m->to]; - shards[m->to] = - ShardInfo(info.getMaxSizeMB(), info.getCurrSizeMB() + 1, info.isDraining()); - } + findStat(shards, m->from).currSizeMB -= 1; + findStat(shards, m->to).currSizeMB += 1; } // Make sure our balance is correct and our data size is low. // The balanced value is the count on the last shard, since it's not draining or // limited. 
- int balancedSize = (--shards.end())->second.getCurrSizeMB(); + const int64_t balancedSize = (--shards.end())->currSizeMB; - for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) { - log() << it->first << " : " << it->second.toString(); - } + for (const auto& stat : shards) { + log() << stat.shardId << " : " << stat.toBSON(); - for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) { - log() << it->first << " : " << it->second.toString(); + // Cast the size once and use it from here in order to avoid typecast errors + const int shardCurrSizeMB = static_cast<int>(stat.currSizeMB); - map<string, int>::iterator expectedIt = expected.find(it->first); + map<string, int>::iterator expectedIt = expected.find(stat.shardId); if (expectedIt == expected.end()) { - bool isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 && - it->second.getCurrSizeMB() <= balancedSize + 1; + const bool isInRange = + shardCurrSizeMB >= balancedSize - 1 && shardCurrSizeMB <= balancedSize + 1; if (!isInRange) { - warning() << "non-limited and non-draining shard had " - << it->second.getCurrSizeMB() << " chunks, expected near " - << balancedSize; + warning() << "non-limited and non-draining shard had " << shardCurrSizeMB + << " chunks, expected near " << balancedSize; } ASSERT(isInRange); } else { int expectedSize = expectedIt->second; - bool isInRange = it->second.getCurrSizeMB() <= expectedSize; + bool isInRange = shardCurrSizeMB <= expectedSize; if (isInRange && expectedSize >= balancedSize) { - isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 && - it->second.getCurrSizeMB() <= balancedSize + 1; + isInRange = + shardCurrSizeMB >= balancedSize - 1 && shardCurrSizeMB <= balancedSize + 1; } if (!isInRange) { - warning() << "limited or draining shard had " << it->second.getCurrSizeMB() + warning() << "limited or draining shard had " << shardCurrSizeMB << " chunks, expected less than " << expectedSize << " and (if less than expected) near " 
<< balancedSize; } diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 60ef9294cf9..cd3b226db65 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -33,7 +33,6 @@ #include "mongo/s/chunk.h" #include "mongo/client/connpool.h" -#include "mongo/client/dbclientcursor.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" @@ -42,7 +41,9 @@ #include "mongo/db/write_concern_options.h" #include "mongo/platform/random.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/balance.h" #include "mongo/s/balancer_policy.h" +#include "mongo/s/balancer/cluster_statistics.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_settings.h" @@ -78,25 +79,27 @@ const int kTooManySplitPoints = 4; bool tryMoveToOtherShard(OperationContext* txn, const ChunkManager& manager, const ChunkType& chunk) { - // reload sharding metadata before starting migration - ChunkManagerPtr chunkMgr = manager.reload(txn, false /* just reloaded in mulitsplit */); - - auto shardInfoMapStatus = DistributionStatus::populateShardInfoMap(txn); - if (!shardInfoMapStatus.isOK()) { - warning() << "failed to load shard metadata while trying to moveChunk after " - << "auto-splitting" << causedBy(shardInfoMapStatus.getStatus()); + auto clusterStatsStatus(Balancer::get(txn)->getClusterStatistics()->getStats(txn)); + if (!clusterStatsStatus.isOK()) { + warning() << "Could not get cluster statistics " + << causedBy(clusterStatsStatus.getStatus()); return false; } - const ShardInfoMap shardInfo(std::move(shardInfoMapStatus.getValue())); + const auto clusterStats(std::move(clusterStatsStatus.getValue())); - if (shardInfo.size() < 2) { + if (clusterStats.size() < 2) { LOG(0) << "no need to move top chunk since there's only 1 shard"; return false; } + // Reload sharding metadata before starting migration. 
Only reload the differences though, + // because the entire chunk manager was reloaded during the call to split, which immediately + // precedes this move logic + shared_ptr<ChunkManager> chunkMgr = manager.reload(txn, false); + map<string, vector<ChunkType>> shardToChunkMap; - DistributionStatus::populateShardToChunksMap(shardInfo, *chunkMgr, &shardToChunkMap); + DistributionStatus::populateShardToChunksMap(clusterStats, *chunkMgr, &shardToChunkMap); StatusWith<string> tagStatus = grid.catalogManager(txn)->getTagForChunk(txn, manager.getns(), chunk); @@ -106,7 +109,7 @@ bool tryMoveToOtherShard(OperationContext* txn, return false; } - DistributionStatus chunkDistribution(shardInfo, shardToChunkMap); + DistributionStatus chunkDistribution(clusterStats, shardToChunkMap); const string newLocation(chunkDistribution.getBestReceieverShard(tagStatus.getValue())); if (newLocation.empty()) { @@ -120,7 +123,7 @@ bool tryMoveToOtherShard(OperationContext* txn, return false; } - ChunkPtr toMove = chunkMgr->findIntersectingChunk(txn, chunk.getMin()); + shared_ptr<Chunk> toMove = chunkMgr->findIntersectingChunk(txn, chunk.getMin()); if (!(toMove->getMin() == chunk.getMin() && toMove->getMax() == chunk.getMax())) { LOG(1) << "recently split chunk: " << chunk << " modified before we could migrate " |