summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/s/SConscript5
-rw-r--r--src/mongo/s/balance.cpp56
-rw-r--r--src/mongo/s/balance.h10
-rw-r--r--src/mongo/s/balancer/cluster_statistics.cpp80
-rw-r--r--src/mongo/s/balancer/cluster_statistics.h110
-rw-r--r--src/mongo/s/balancer/cluster_statistics_impl.cpp151
-rw-r--r--src/mongo/s/balancer/cluster_statistics_impl.h71
-rw-r--r--src/mongo/s/balancer/cluster_statistics_test.cpp48
-rw-r--r--src/mongo/s/balancer_policy.cpp198
-rw-r--r--src/mongo/s/balancer_policy.h80
-rw-r--r--src/mongo/s/balancer_policy_tests.cpp198
-rw-r--r--src/mongo/s/chunk.cpp29
12 files changed, 648 insertions, 388 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index c858e9fe136..e8343dc4ee5 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -158,7 +158,10 @@ env.Library(
target='coreshard',
source=[
# This is only here temporarily for auto-split logic in chunk.cpp.
+ 'balance.cpp',
'balancer_policy.cpp',
+ 'balancer/cluster_statistics.cpp',
+ 'balancer/cluster_statistics_impl.cpp',
'chunk.cpp',
'chunk_manager.cpp',
'config.cpp',
@@ -201,7 +204,6 @@ env.Library(
env.Library(
target='mongoscore',
source=[
- 'balance.cpp',
'cluster_cursor_stats.cpp',
'request.cpp',
's_only.cpp',
@@ -226,6 +228,7 @@ env.CppUnitTest(
target='mongoscore_test',
source=[
'balancer_policy_tests.cpp',
+ 'balancer/cluster_statistics_test.cpp',
'shard_key_pattern_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index 2a2cbbd0804..cc1b71d804f 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -35,6 +35,7 @@
#include <algorithm>
#include "mongo/base/status_with.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/client.h"
@@ -45,6 +46,7 @@
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer/cluster_statistics_impl.h"
#include "mongo/s/balancer_policy.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
@@ -52,13 +54,16 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_mongos.h"
#include "mongo/s/catalog/type_settings.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/shard_util.h"
#include "mongo/s/migration_secondary_throttle_options.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -123,10 +128,10 @@ private:
* Occasionally prints a log message with shard versions if the versions are not the same
* in the cluster.
*/
-void warnOnMultiVersion(const ShardInfoMap& shardInfo) {
+void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& clusterStats) {
bool isMultiVersion = false;
- for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
- if (!isSameMajorVersion(i->second.getMongoVersion().c_str())) {
+ for (const auto& stat : clusterStats) {
+ if (!isSameMajorVersion(stat.mongoVersion.c_str())) {
isMultiVersion = true;
break;
}
@@ -136,10 +141,15 @@ void warnOnMultiVersion(const ShardInfoMap& shardInfo) {
if (!isMultiVersion)
return;
- warning() << "multiVersion cluster detected, my version is " << versionString;
- for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
- log() << i->first << " is at version " << i->second.getMongoVersion();
+ StringBuilder sb;
+ sb << "Multi version cluster detected. Local version: " << versionString
+ << ", shard versions: ";
+
+ for (const auto& stat : clusterStats) {
+ sb << stat.shardId << " is at " << stat.mongoVersion << "; ";
}
+
+ warning() << sb.str();
}
const Seconds kBalanceRoundDefaultInterval(10);
@@ -152,7 +162,8 @@ MONGO_FP_DECLARE(balancerRoundIntervalSetting);
} // namespace
-Balancer::Balancer() : _balancedLastTime(0) {}
+Balancer::Balancer()
+ : _balancedLastTime(0), _clusterStats(stdx::make_unique<ClusterStatisticsImpl>()) {}
Balancer::~Balancer() = default;
@@ -185,7 +196,7 @@ void Balancer::run() {
try {
// ping has to be first so we keep things in the config server in sync
- _ping(txn.get());
+ _ping(txn.get(), false);
MONGO_FAIL_POINT_BLOCK(balancerRoundIntervalSetting, scopedBalancerRoundInterval) {
const BSONObj& data = scopedBalancerRoundInterval.getData();
@@ -412,23 +423,12 @@ StatusWith<vector<MigrateInfo>> Balancer::_getCandidateChunks(OperationContext*
return vector<MigrateInfo>();
}
- // Get a list of all the shards that are participating in this balance round along with any
- // maximum allowed quotas and current utilization. We get the latter by issuing
- // db.serverStatus() (mem.mapped) to all shards.
- //
- // TODO: skip unresponsive shards and mark information as stale.
- auto shardInfoStatus = DistributionStatus::populateShardInfoMap(txn);
- if (!shardInfoStatus.isOK()) {
- return shardInfoStatus.getStatus();
- }
-
- const ShardInfoMap shardInfo(std::move(shardInfoStatus.getValue()));
-
- if (shardInfo.size() < 2) {
+ const auto clusterStats = uassertStatusOK(_clusterStats->getStats(txn));
+ if (clusterStats.size() < 2) {
return vector<MigrateInfo>();
}
- OCCASIONALLY warnOnMultiVersion(shardInfo);
+ OCCASIONALLY warnOnMultiVersion(clusterStats);
std::vector<MigrateInfo> candidateChunks;
@@ -459,9 +459,7 @@ StatusWith<vector<MigrateInfo>> Balancer::_getCandidateChunks(OperationContext*
for (const ChunkType& chunk : allNsChunks) {
allChunkMinimums.insert(chunk.getMin().getOwned());
-
- vector<ChunkType>& chunksList = shardToChunksMap[chunk.getShard()];
- chunksList.push_back(chunk);
+ shardToChunksMap[chunk.getShard()].push_back(chunk);
}
if (shardToChunksMap.empty()) {
@@ -469,12 +467,12 @@ StatusWith<vector<MigrateInfo>> Balancer::_getCandidateChunks(OperationContext*
continue;
}
- for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
+ for (const auto& stat : clusterStats) {
// This loop just makes sure there is an entry in shardToChunksMap for every shard
- shardToChunksMap[i->first];
+ shardToChunksMap[stat.shardId];
}
- DistributionStatus distStatus(shardInfo, shardToChunksMap);
+ DistributionStatus distStatus(clusterStats, shardToChunksMap);
// TODO: TagRange contains all the information from TagsType except for the namespace,
// so maybe the two can be merged at some point in order to avoid the
@@ -610,7 +608,7 @@ int Balancer::_moveChunks(OperationContext* txn,
<< " was deleted while balancing was active. Aborting balancing round.",
cm);
- ChunkPtr c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min);
+ shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min);
if (c->getMin().woCompare(migrateInfo.chunk.min) ||
c->getMax().woCompare(migrateInfo.chunk.max)) {
diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h
index 17b58384590..3b808f485d9 100644
--- a/src/mongo/s/balance.h
+++ b/src/mongo/s/balance.h
@@ -36,6 +36,7 @@
namespace mongo {
+class ClusterStatistics;
struct MigrateInfo;
class MigrationSecondaryThrottleOptions;
class OperationContext;
@@ -62,6 +63,10 @@ public:
*/
static Balancer* get(OperationContext* operationContext);
+ ClusterStatistics* getClusterStatistics() const {
+ return _clusterStats.get();
+ }
+
// BackgroundJob methods
virtual void run();
@@ -83,7 +88,7 @@ private:
/**
* Marks this balancer as being live on the config server(s).
*/
- void _ping(OperationContext* txn, bool waiting = false);
+ void _ping(OperationContext* txn, bool waiting);
/**
* Returns true if all the servers listed in configdb as being shards are reachable and are
@@ -120,6 +125,9 @@ private:
// number of moved chunks in last round
int _balancedLastTime;
+
+ // Source for cluster statistics
+ std::unique_ptr<ClusterStatistics> _clusterStats;
};
} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics.cpp b/src/mongo/s/balancer/cluster_statistics.cpp
new file mode 100644
index 00000000000..0c66348ef36
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics.cpp
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/cluster_statistics.h"
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+
+ClusterStatistics::ClusterStatistics() = default;
+
+ClusterStatistics::~ClusterStatistics() = default;
+
+ClusterStatistics::ShardStatistics::ShardStatistics() = default;
+
+ClusterStatistics::ShardStatistics::ShardStatistics(ShardId inShardId,
+ uint64_t inMaxSizeMB,
+ uint64_t inCurrSizeMB,
+ bool inIsDraining,
+ const std::set<std::string> inShardTags,
+ const std::string inMongoVersion)
+ : shardId(std::move(inShardId)),
+ maxSizeMB(std::move(inMaxSizeMB)),
+ currSizeMB(std::move(inCurrSizeMB)),
+ isDraining(std::move(inIsDraining)),
+ shardTags(std::move(inShardTags)),
+ mongoVersion(std::move(inMongoVersion)) {}
+
+bool ClusterStatistics::ShardStatistics::isSizeMaxed() const {
+ if (!maxSizeMB || !currSizeMB) {
+ return false;
+ }
+
+ return currSizeMB >= maxSizeMB;
+}
+
+BSONObj ClusterStatistics::ShardStatistics::toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("id", shardId);
+ builder.append("maxSizeMB", static_cast<long long>(maxSizeMB));
+ builder.append("currSizeMB", static_cast<long long>(currSizeMB));
+ builder.append("draining", isDraining);
+ if (!shardTags.empty()) {
+ BSONArrayBuilder arrayBuilder(builder.subarrayStart("tags"));
+ arrayBuilder.append(shardTags);
+ }
+
+ builder.append("version", mongoVersion);
+ return builder.obj();
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics.h b/src/mongo/s/balancer/cluster_statistics.h
new file mode 100644
index 00000000000..748b80c2956
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics.h
@@ -0,0 +1,110 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/client/shard.h"
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+template <typename T>
+class StatusWith;
+
+/**
+ * This interface serves as means for obtaining data distribution and shard utilization statistics
+ * for the entire sharded cluster. Implementations may choose whatever means necessary to perform
+ * the statistics collection. There should be one instance of this object per service context.
+ */
+class ClusterStatistics {
+ MONGO_DISALLOW_COPYING(ClusterStatistics);
+
+public:
+ /**
+ * Structure, which describes the statistics of a single shard host.
+ */
+ struct ShardStatistics {
+ public:
+ ShardStatistics();
+ ShardStatistics(ShardId shardId,
+ uint64_t maxSizeMB,
+ uint64_t currSizeMB,
+ bool isDraining,
+ const std::set<std::string> shardTags,
+ const std::string mongoVersion);
+
+ /**
+ * Returns if a shard cannot receive any new chunks because it has reached the per-shard
+ * data size limit.
+ */
+ bool isSizeMaxed() const;
+
+ /**
+ * Returns BSON representation of this shard's statistics, for reporting purposes.
+ */
+ BSONObj toBSON() const;
+
+ // The id of the shard for which this statistic applies
+ ShardId shardId;
+
+ // The maximum size allowed for the shard
+ uint64_t maxSizeMB{0};
+
+ // The current size of the shard
+ uint64_t currSizeMB{0};
+
+ // Whether the shard is in draining mode
+ bool isDraining{false};
+
+ // Set of tags for the shard
+ std::set<std::string> shardTags;
+
+ // Version of mongod, which runs on this shard's primary
+ std::string mongoVersion;
+ };
+
+ virtual ~ClusterStatistics();
+
+ /**
+ * Retrieves a snapshot of the current shard utilization state. The implementation of this
+ * method may block if necessary in order to refresh its state or may return a cached value.
+ */
+ virtual StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) = 0;
+
+protected:
+ ClusterStatistics();
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics_impl.cpp b/src/mongo/s/balancer/cluster_statistics_impl.cpp
new file mode 100644
index 00000000000..a6e0b3dcf60
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_impl.cpp
@@ -0,0 +1,151 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/cluster_statistics_impl.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/s/catalog/catalog_manager.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using std::string;
+using std::vector;
+
+namespace {
+
+const char kVersionField[] = "version";
+
+/**
+ * Executes the serverStatus command against the specified shard and obtains the version of the
+ * running MongoD service.
+ *
+ * Returns the MongoD version in strig format or an error. Known error codes are:
+ * ShardNotFound if shard by that id is not available on the registry
+ * NoSuchKey if the version could not be retrieved
+ */
+StatusWith<string> retrieveShardMongoDVersion(OperationContext* txn, ShardId shardId) {
+ auto shardRegistry = Grid::get(txn)->shardRegistry();
+ auto commandStatus = shardRegistry->runIdempotentCommandOnShard(
+ txn,
+ shardId,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("serverStatus" << 1));
+ if (!commandStatus.isOK()) {
+ return commandStatus.getStatus();
+ }
+
+ BSONObj serverStatus = std::move(commandStatus.getValue());
+
+ string version;
+ Status status = bsonExtractStringField(serverStatus, kVersionField, &version);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return version;
+}
+
+} // namespace
+
+using ShardStatistics = ClusterStatistics::ShardStatistics;
+
+ClusterStatisticsImpl::ClusterStatisticsImpl() = default;
+
+ClusterStatisticsImpl::~ClusterStatisticsImpl() = default;
+
+StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationContext* txn) {
+ try {
+ _refreshShardStats(txn);
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
+
+ vector<ShardStatistics> stats;
+
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ for (const auto& stat : _shardStatsMap) {
+ stats.push_back(stat.second);
+ }
+
+ return stats;
+}
+
+void ClusterStatisticsImpl::_refreshShardStats(OperationContext* txn) {
+ // Get a list of all the shards that are participating in this balance round along with any
+ // maximum allowed quotas and current utilization. We get the latter by issuing
+ // db.serverStatus() (mem.mapped) to all shards.
+ //
+ // TODO: skip unresponsive shards and mark information as stale.
+ auto shardsStatus = Grid::get(txn)->catalogManager(txn)->getAllShards(txn);
+ uassertStatusOK(shardsStatus.getStatus());
+
+ const vector<ShardType> shards(std::move(shardsStatus.getValue().value));
+
+ for (const auto& shard : shards) {
+ auto shardSizeStatus = shardutil::retrieveTotalShardSize(txn, shard.getName());
+ if (!shardSizeStatus.isOK()) {
+ continue;
+ }
+
+ auto mongoDVersionStatus = retrieveShardMongoDVersion(txn, shard.getName());
+ if (!mongoDVersionStatus.isOK()) {
+ continue;
+ }
+ const string mongoDVersion(std::move(mongoDVersionStatus.getValue()));
+
+ std::set<string> shardTags;
+ for (const auto& shardTag : shard.getTags()) {
+ shardTags.insert(shardTag);
+ }
+
+ ShardStatistics newShardStat(shard.getName(),
+ shard.getMaxSizeMB(),
+ shardSizeStatus.getValue() / 1024 / 1024,
+ shard.getDraining(),
+ shardTags,
+ mongoDVersion);
+
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ _shardStatsMap[shard.getName()] = std::move(newShardStat);
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics_impl.h b/src/mongo/s/balancer/cluster_statistics_impl.h
new file mode 100644
index 00000000000..a1ea1829ac1
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_impl.h
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <map>
+
+#include "mongo/s/balancer/cluster_statistics.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class OperationContext;
+class Status;
+
+/**
+ * Default implementation for the cluster statistics gathering utility. Uses a blocking method to
+ * fetch the statistics and does not perform any caching.
+ */
+class ClusterStatisticsImpl final : public ClusterStatistics {
+public:
+ ClusterStatisticsImpl();
+ ~ClusterStatisticsImpl();
+
+ StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) override;
+
+private:
+ typedef std::map<ShardId, ShardStatistics> ShardStatisticsMap;
+
+ /**
+ * Refreshes the list of available shards and loops through them in order to collect usage
+ * statistics. If any of the shards fails to report statistics, skips it and continues with the
+ * next.
+ *
+ * If the list of shards cannot be retrieved throws an exception.
+ */
+ void _refreshShardStats(OperationContext* txn);
+
+ // Mutex to protect the mutable state below
+ stdx::mutex _mutex;
+
+ // The most up-to-date shard statistics
+ ShardStatisticsMap _shardStatsMap;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics_test.cpp b/src/mongo/s/balancer/cluster_statistics_test.cpp
new file mode 100644
index 00000000000..4fe4f0b27e5
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_test.cpp
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/cluster_statistics.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using ShardStatistics = ClusterStatistics::ShardStatistics;
+
+const auto emptyTagSet = std::set<std::string>();
+
+TEST(ShardStatistics, SizeMaxedTest) {
+ ASSERT(!ShardStatistics("TestShardId", 0, 0, false, emptyTagSet, "3.2.0").isSizeMaxed());
+ ASSERT(!ShardStatistics("TestShardId", 100LL, 80LL, false, emptyTagSet, "3.2.0").isSizeMaxed());
+ ASSERT(ShardStatistics("TestShardId", 100LL, 110LL, false, emptyTagSet, "3.2.0").isSizeMaxed());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp
index 1a307df60b2..8737dca75cd 100644
--- a/src/mongo/s/balancer_policy.cpp
+++ b/src/mongo/s/balancer_policy.cpp
@@ -56,52 +56,13 @@ using std::set;
using std::string;
using std::vector;
-namespace {
-
-/**
- * Executes the serverStatus command against the specified shard and obtains the version of the
- * running MongoD service.
- *
- * The MongoD version or throws an exception. Known exception codes are:
- * ShardNotFound if shard by that id is not available on the registry
- * NoSuchKey if the version could not be retrieved
- */
-std::string retrieveShardMongoDVersion(OperationContext* txn,
- ShardId shardId,
- ShardRegistry* shardRegistry) {
- BSONObj serverStatus = uassertStatusOK(shardRegistry->runIdempotentCommandOnShard(
- txn,
- shardId,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- BSON("serverStatus" << 1)));
- BSONElement versionElement = serverStatus["version"];
- if (versionElement.type() != String) {
- uassertStatusOK({ErrorCodes::NoSuchKey, "version field not found in serverStatus"});
- }
-
- return versionElement.str();
-}
-
-} // namespace
-
string TagRange::toString() const {
return str::stream() << min << " -->> " << max << " on " << tag;
}
-DistributionStatus::DistributionStatus(const ShardInfoMap& shardInfo,
+DistributionStatus::DistributionStatus(ShardStatisticsVector shardInfo,
const ShardToChunksMap& shardToChunksMap)
- : _shardInfo(shardInfo), _shardChunks(shardToChunksMap) {
- for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
- _shardIds.insert(i->first);
- }
-}
-
-const ShardInfo& DistributionStatus::shardInfo(const ShardId& shardId) const {
- ShardInfoMap::const_iterator i = _shardInfo.find(shardId);
- verify(i != _shardInfo.end());
- return i->second;
-}
+ : _shardInfo(std::move(shardInfo)), _shardChunks(shardToChunksMap) {}
unsigned DistributionStatus::totalChunks() const {
unsigned total = 0;
@@ -145,30 +106,30 @@ string DistributionStatus::getBestReceieverShard(const string& tag) const {
string best;
unsigned minChunks = numeric_limits<unsigned>::max();
- for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
- if (i->second.isSizeMaxed()) {
- LOG(1) << i->first << " has already reached the maximum total chunk size.";
+ for (const auto& stat : _shardInfo) {
+ if (stat.isSizeMaxed()) {
+ LOG(1) << stat.shardId << " has already reached the maximum total chunk size.";
continue;
}
- if (i->second.isDraining()) {
- LOG(1) << i->first << " is currently draining.";
+ if (stat.isDraining) {
+ LOG(1) << stat.shardId << " is currently draining.";
continue;
}
- if (!i->second.hasTag(tag)) {
- LOG(1) << i->first << " doesn't have right tag";
+ if (!tag.empty() && !stat.shardTags.count(tag)) {
+ LOG(1) << stat.shardId << " doesn't have right tag";
continue;
}
- unsigned myChunks = numberOfChunksInShard(i->first);
+ unsigned myChunks = numberOfChunksInShard(stat.shardId);
if (myChunks >= minChunks) {
- LOG(1) << i->first << " has more chunks me:" << myChunks << " best: " << best << ":"
+ LOG(1) << stat.shardId << " has more chunks me:" << myChunks << " best: " << best << ":"
<< minChunks;
continue;
}
- best = i->first;
+ best = stat.shardId;
minChunks = myChunks;
}
@@ -179,12 +140,12 @@ string DistributionStatus::getMostOverloadedShard(const string& tag) const {
string worst;
unsigned maxChunks = 0;
- for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
- unsigned myChunks = numberOfChunksInShardWithTag(i->first, tag);
+ for (const auto& stat : _shardInfo) {
+ unsigned myChunks = numberOfChunksInShardWithTag(stat.shardId, tag);
if (myChunks <= maxChunks)
continue;
- worst = i->first;
+ worst = stat.shardId;
maxChunks = myChunks;
}
@@ -250,10 +211,10 @@ void DistributionStatus::dump() const {
log() << "DistributionStatus";
log() << " shards";
- for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
- log() << " " << i->first << "\t" << i->second.toString();
+ for (const auto& stat : _shardInfo) {
+ log() << " " << stat.shardId << "\t" << stat.toBSON();
- ShardToChunksMap::const_iterator j = _shardChunks.find(i->first);
+ ShardToChunksMap::const_iterator j = _shardChunks.find(stat.shardId);
verify(j != _shardChunks.end());
const vector<ChunkType>& v = j->second;
@@ -271,50 +232,12 @@ void DistributionStatus::dump() const {
}
}
-StatusWith<ShardInfoMap> DistributionStatus::populateShardInfoMap(OperationContext* txn) {
- try {
- auto shardsStatus = grid.catalogManager(txn)->getAllShards(txn);
- if (!shardsStatus.isOK()) {
- return shardsStatus.getStatus();
- }
- const vector<ShardType> shards(std::move(shardsStatus.getValue().value));
-
- ShardInfoMap shardInfo;
-
- for (const ShardType& shardData : shards) {
- std::set<std::string> dummy;
-
- const long long shardSizeBytes =
- uassertStatusOK(shardutil::retrieveTotalShardSize(txn, shardData.getName()));
-
- const std::string shardMongodVersion =
- retrieveShardMongoDVersion(txn, shardData.getName(), grid.shardRegistry());
-
- ShardInfo newShardEntry(shardData.getMaxSizeMB(),
- shardSizeBytes / 1024 / 1024,
- shardData.getDraining(),
- dummy,
- shardMongodVersion);
-
- for (const string& shardTag : shardData.getTags()) {
- newShardEntry.addTag(shardTag);
- }
-
- shardInfo.insert(make_pair(shardData.getName(), newShardEntry));
- }
-
- return std::move(shardInfo);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-}
-
-void DistributionStatus::populateShardToChunksMap(const ShardInfoMap& allShards,
+void DistributionStatus::populateShardToChunksMap(const ShardStatisticsVector& allShards,
const ChunkManager& chunkMgr,
ShardToChunksMap* shardToChunksMap) {
// Makes sure there is an entry in shardToChunksMap for every shard.
- for (ShardInfoMap::const_iterator it = allShards.begin(); it != allShards.end(); ++it) {
- (*shardToChunksMap)[it->first];
+ for (const auto& stat : allShards) {
+ (*shardToChunksMap)[stat.shardId];
}
const ChunkMap& chunkMap = chunkMgr.getChunkMap();
@@ -346,18 +269,16 @@ MigrateInfo* BalancerPolicy::balance(const string& ns,
// 1) check things we have to move
{
- for (const ShardId& shardId : distribution.shardIds()) {
- const ShardInfo& info = distribution.shardInfo(shardId);
-
- if (!info.isDraining())
+ for (const auto& stat : distribution.getStats()) {
+ if (!stat.isDraining)
continue;
- if (distribution.numberOfChunksInShard(shardId) == 0)
+ if (distribution.numberOfChunksInShard(stat.shardId) == 0)
continue;
// now we know we need to move to chunks off this shard
// we will if we are allowed
- const vector<ChunkType>& chunks = distribution.getChunks(shardId);
+ const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
unsigned numJumboChunks = 0;
// since we have to move all chunks, lets just do in order
@@ -373,32 +294,32 @@ MigrateInfo* BalancerPolicy::balance(const string& ns,
if (to.size() == 0) {
warning() << "want to move chunk: " << chunkToMove << "(" << tag << ") "
- << "from " << shardId << " but can't find anywhere to put it";
+ << "from " << stat.shardId << " but can't find anywhere to put it";
continue;
}
- log() << "going to move " << chunkToMove << " from " << shardId << "(" << tag << ")"
+ log() << "going to move " << chunkToMove << " from " << stat.shardId << "(" << tag
+ << ")"
<< " to " << to;
- return new MigrateInfo(ns, to, shardId, chunkToMove.toBSON());
+ return new MigrateInfo(ns, to, stat.shardId, chunkToMove.toBSON());
}
- warning() << "can't find any chunk to move from: " << shardId << " but we want to. "
+ warning() << "can't find any chunk to move from: " << stat.shardId
+ << " but we want to. "
<< " numJumboChunks: " << numJumboChunks;
}
}
// 2) tag violations
if (distribution.tags().size() > 0) {
- for (const ShardId& shardId : distribution.shardIds()) {
- const ShardInfo& info = distribution.shardInfo(shardId);
-
- const vector<ChunkType>& chunks = distribution.getChunks(shardId);
+ for (const auto& stat : distribution.getStats()) {
+ const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
for (unsigned j = 0; j < chunks.size(); j++) {
const ChunkType& chunk = chunks[j];
string tag = distribution.getTagForChunk(chunk);
- if (info.hasTag(tag))
+ if (tag.empty() || stat.shardTags.count(tag))
continue;
// uh oh, this chunk is in the wrong place
@@ -414,9 +335,10 @@ MigrateInfo* BalancerPolicy::balance(const string& ns,
log() << "no where to put it :(";
continue;
}
- verify(to != shardId);
+
+ invariant(to != stat.shardId);
log() << " going to move to: " << to;
- return new MigrateInfo(ns, to, shardId, chunk.toBSON());
+ return new MigrateInfo(ns, to, stat.shardId, chunk.toBSON());
}
}
}
@@ -501,52 +423,6 @@ MigrateInfo* BalancerPolicy::balance(const string& ns,
return NULL;
}
-
-ShardInfo::ShardInfo(long long maxSizeMB,
- long long currSizeMB,
- bool draining,
- const set<string>& tags,
- const string& mongoVersion)
- : _maxSizeMB(maxSizeMB),
- _currSizeMB(currSizeMB),
- _draining(draining),
- _tags(tags),
- _mongoVersion(mongoVersion) {}
-
-ShardInfo::ShardInfo() : _maxSizeMB(0), _currSizeMB(0), _draining(false) {}
-
-void ShardInfo::addTag(const string& tag) {
- _tags.insert(tag);
-}
-
-
-bool ShardInfo::isSizeMaxed() const {
- if (_maxSizeMB == 0 || _currSizeMB == 0)
- return false;
-
- return _currSizeMB >= _maxSizeMB;
-}
-
-bool ShardInfo::hasTag(const string& tag) const {
- if (tag.size() == 0)
- return true;
- return _tags.count(tag) > 0;
-}
-
-string ShardInfo::toString() const {
- StringBuilder ss;
- ss << " maxSizeMB: " << _maxSizeMB;
- ss << " currSizeMB: " << _currSizeMB;
- ss << " draining: " << _draining;
- if (_tags.size() > 0) {
- ss << "tags : ";
- for (set<string>::const_iterator i = _tags.begin(); i != _tags.end(); ++i)
- ss << *i << ",";
- }
- ss << " version: " << _mongoVersion;
- return ss.str();
-}
-
string ChunkInfo::toString() const {
StringBuilder buf;
buf << " min: " << min;
diff --git a/src/mongo/s/balancer_policy.h b/src/mongo/s/balancer_policy.h
index 4d764b62135..96accafa482 100644
--- a/src/mongo/s/balancer_policy.h
+++ b/src/mongo/s/balancer_policy.h
@@ -32,6 +32,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/jsobj.h"
+#include "mongo/s/balancer/cluster_statistics.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard.h"
@@ -65,59 +66,6 @@ struct TagRange {
std::string toString() const;
};
-
-class ShardInfo {
-public:
- ShardInfo();
- ShardInfo(long long maxSizeMB,
- long long currSizeMB,
- bool draining,
- const std::set<std::string>& tags = std::set<std::string>(),
- const std::string& _mongoVersion = std::string(""));
-
- void addTag(const std::string& tag);
-
- /** @return true if we have the tag OR if the tag is "" */
- bool hasTag(const std::string& tag) const;
-
- /**
- * @return true if a shard cannot receive any new chunks because it reaches 'shardLimits'.
- * Expects the optional fields "maxSize", can in size in MB, and "usedSize", currently used size
- * in MB, on 'shardLimits'.
- */
- bool isSizeMaxed() const;
-
- /**
- * @return true if 'shardLimist' contains a field "draining". Expects the optional field
- * "isDraining" on 'shrdLimits'.
- */
- bool isDraining() const {
- return _draining;
- }
-
- long long getMaxSizeMB() const {
- return _maxSizeMB;
- }
-
- long long getCurrSizeMB() const {
- return _currSizeMB;
- }
-
- std::string getMongoVersion() const {
- return _mongoVersion;
- }
-
- std::string toString() const;
-
-private:
- long long _maxSizeMB;
- long long _currSizeMB;
- bool _draining;
- std::set<std::string> _tags;
- std::string _mongoVersion;
-};
-
-
struct MigrateInfo {
MigrateInfo(const std::string& a_ns,
const ShardId& a_to,
@@ -131,15 +79,14 @@ struct MigrateInfo {
const ChunkInfo chunk;
};
-typedef std::map<ShardId, ShardInfo> ShardInfoMap;
+typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector;
typedef std::map<ShardId, std::vector<ChunkType>> ShardToChunksMap;
-
class DistributionStatus {
MONGO_DISALLOW_COPYING(DistributionStatus);
public:
- DistributionStatus(const ShardInfoMap& shardInfo, const ShardToChunksMap& shardToChunksMap);
+ DistributionStatus(ShardStatisticsVector shardInfo, const ShardToChunksMap& shardToChunksMap);
// only used when building
@@ -185,36 +132,25 @@ public:
/** @return the right tag for chunk, possibly "" */
std::string getTagForChunk(const ChunkType& chunk) const;
- /** @return all shard ids we know about */
- const std::set<ShardId>& shardIds() const {
- return _shardIds;
- }
-
- /** @return the ShardInfo for the shard */
- const ShardInfo& shardInfo(const ShardId& shardId) const;
-
/** writes all state to log() */
void dump() const;
- /**
- * Retrieves shard metadata information from the config server as well as some stats
- * from the shards.
- */
- static StatusWith<ShardInfoMap> populateShardInfoMap(OperationContext* txn);
+ const ShardStatisticsVector& getStats() const {
+ return _shardInfo;
+ }
/**
* Note: jumbo and versions are not set.
*/
- static void populateShardToChunksMap(const ShardInfoMap& allShards,
+ static void populateShardToChunksMap(const ShardStatisticsVector& allShards,
const ChunkManager& chunkMgr,
ShardToChunksMap* shardToChunksMap);
private:
- const ShardInfoMap& _shardInfo;
+ const ShardStatisticsVector _shardInfo;
const ShardToChunksMap& _shardChunks;
std::map<BSONObj, TagRange> _tagRanges;
std::set<std::string> _allTags;
- std::set<ShardId> _shardIds;
};
diff --git a/src/mongo/s/balancer_policy_tests.cpp b/src/mongo/s/balancer_policy_tests.cpp
index b200576ee7b..d3550cd3cf3 100644
--- a/src/mongo/s/balancer_policy_tests.cpp
+++ b/src/mongo/s/balancer_policy_tests.cpp
@@ -28,10 +28,11 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+#include "mongo/platform/basic.h"
+
#include "mongo/platform/random.h"
#include "mongo/s/balancer_policy.h"
#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/config.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
@@ -44,11 +45,18 @@ using std::string;
using std::stringstream;
using std::vector;
+using ShardStatistics = ClusterStatistics::ShardStatistics;
+
+const auto emptyTagSet = std::set<std::string>();
+const std::string emptyShardVersion = "";
+
+ShardStatistics& findStat(std::vector<ShardStatistics>& stats, const ShardId& shardId) {
+ for (auto& stat : stats) {
+ if (stat.shardId == shardId)
+ return stat;
+ }
-TEST(BalancerPolicyTests, SizeMaxedShardTest) {
- ASSERT(!ShardInfo(0, 0, false).isSizeMaxed());
- ASSERT(!ShardInfo(100LL, 80LL, false).isSizeMaxed());
- ASSERT(ShardInfo(100LL, 110LL, false).isSizeMaxed());
+ MONGO_UNREACHABLE;
}
TEST(BalancerPolicyTests, BalanceNormalTest) {
@@ -72,14 +80,13 @@ TEST(BalancerPolicyTests, BalanceNormalTest) {
chunkMap["shard0"] = chunks;
chunkMap["shard1"] = vector<ChunkType>();
- // no limits
- ShardInfoMap info;
- info["shard0"] = ShardInfo(0, 2, false);
- info["shard1"] = ShardInfo(0, 0, false);
+ // No limits
+ DistributionStatus status(
+ {ShardStatistics("shard0", 0, 2, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 0, 0, false, emptyTagSet, emptyShardVersion)},
+ chunkMap);
- DistributionStatus status(info, chunkMap);
std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 1));
-
ASSERT(c);
}
@@ -129,12 +136,11 @@ TEST(BalancerPolicyTests, BalanceJumbo) {
chunkMap["shard0"] = chunks;
chunkMap["shard1"] = vector<ChunkType>();
- // no limits
- ShardInfoMap info;
- info["shard0"] = ShardInfo(0, 2, false);
- info["shard1"] = ShardInfo(0, 0, false);
-
- DistributionStatus status(info, chunkMap);
+ // No limits
+ DistributionStatus status(
+ {ShardStatistics("shard0", 0, 2, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 0, 0, false, emptyTagSet, emptyShardVersion)},
+ chunkMap);
std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 1));
ASSERT(c);
@@ -163,11 +169,10 @@ TEST(BalanceNormalTests, BalanceDrainingTest) {
chunkMap["shard1"] = vector<ChunkType>();
// shard0 is draining
- ShardInfoMap limitsMap;
- limitsMap["shard0"] = ShardInfo(0LL, 2LL, true);
- limitsMap["shard1"] = ShardInfo(0LL, 0LL, false);
-
- DistributionStatus status(limitsMap, chunkMap);
+ DistributionStatus status(
+ {ShardStatistics("shard0", 0, 2, true, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 0, 0, false, emptyTagSet, emptyShardVersion)},
+ chunkMap);
std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 0));
ASSERT(c);
@@ -197,12 +202,11 @@ TEST(BalancerPolicyTests, BalanceEndedDrainingTest) {
chunkMap["shard0"] = chunks;
chunkMap["shard1"] = vector<ChunkType>();
- // no limits
- ShardInfoMap limitsMap;
- limitsMap["shard0"] = ShardInfo(0, 2, false);
- limitsMap["shard1"] = ShardInfo(0, 0, true);
-
- DistributionStatus status(limitsMap, chunkMap);
+ // shard1 is draining
+ DistributionStatus status(
+ {ShardStatistics("shard0", 0, 2, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 0, 0, true, emptyTagSet, emptyShardVersion)},
+ chunkMap);
std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 0));
ASSERT(!c);
@@ -231,12 +235,11 @@ TEST(BalancerPolicyTests, BalanceImpasseTest) {
chunkMap["shard2"] = vector<ChunkType>();
// shard0 is draining, shard1 is maxed out, shard2 has writebacks pending
- ShardInfoMap limitsMap;
- limitsMap["shard0"] = ShardInfo(0, 2, true);
- limitsMap["shard1"] = ShardInfo(1, 1, false);
- limitsMap["shard2"] = ShardInfo(0, 1, true);
-
- DistributionStatus status(limitsMap, chunkMap);
+ DistributionStatus status(
+ {ShardStatistics("shard0", 0, 2, true, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 1, 1, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard2", 0, 1, true, emptyTagSet, emptyShardVersion)},
+ chunkMap);
std::unique_ptr<MigrateInfo> c(BalancerPolicy::balance("ns", status, 0));
ASSERT(!c);
@@ -297,12 +300,10 @@ TEST(BalancerPolicyTests, MultipleDraining) {
addShard(chunks, 10, false);
addShard(chunks, 5, true);
- ShardInfoMap shards;
- shards["shard0"] = ShardInfo(0, 5, true);
- shards["shard1"] = ShardInfo(0, 5, true);
- shards["shard2"] = ShardInfo(0, 5, false);
-
- DistributionStatus d(shards, chunks);
+ DistributionStatus d({ShardStatistics("shard0", 0, 5, true, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 0, 5, true, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard2", 0, 5, false, emptyTagSet, emptyShardVersion)},
+ chunks);
std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0));
ASSERT(m);
@@ -316,18 +317,12 @@ TEST(BalancerPolicyTests, TagsDraining) {
addShard(chunks, 5, false);
addShard(chunks, 5, true);
- ShardInfoMap shards;
- shards["shard0"] = ShardInfo(0, 5, false);
- shards["shard1"] = ShardInfo(0, 5, true);
- shards["shard2"] = ShardInfo(0, 5, false);
-
- shards["shard0"].addTag("a");
- shards["shard1"].addTag("a");
- shards["shard1"].addTag("b");
- shards["shard2"].addTag("b");
-
while (true) {
- DistributionStatus d(shards, chunks);
+ DistributionStatus d({ShardStatistics("shard0", 0, 5, false, {"a"}, emptyShardVersion),
+ ShardStatistics("shard1", 0, 5, true, {"a", "b"}, emptyShardVersion),
+ ShardStatistics("shard2", 0, 5, false, {"b"}, emptyShardVersion)},
+ chunks);
+
d.addTagRange(TagRange(BSON("x" << -1), BSON("x" << 7), "a"));
d.addTagRange(TagRange(BSON("x" << 7), BSON("x" << 1000), "b"));
@@ -357,16 +352,12 @@ TEST(BalancerPolicyTests, TagsPolicyChange) {
addShard(chunks, 5, false);
addShard(chunks, 5, true);
- ShardInfoMap shards;
- shards["shard0"] = ShardInfo(0, 5, false);
- shards["shard1"] = ShardInfo(0, 5, false);
- shards["shard2"] = ShardInfo(0, 5, false);
-
- shards["shard0"].addTag("a");
- shards["shard1"].addTag("a");
-
while (true) {
- DistributionStatus d(shards, chunks);
+ DistributionStatus d(
+ {ShardStatistics("shard0", 0, 5, false, {"a"}, emptyShardVersion),
+ ShardStatistics("shard1", 0, 5, false, {"a"}, emptyShardVersion),
+ ShardStatistics("shard2", 0, 5, false, emptyTagSet, emptyShardVersion)},
+ chunks);
d.addTagRange(TagRange(BSON("x" << -1), BSON("x" << 1000), "a"));
std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0));
@@ -388,8 +379,7 @@ TEST(BalancerPolicyTests, TagsPolicyChange) {
TEST(BalancerPolicyTests, TagsSelector) {
ShardToChunksMap chunks;
- ShardInfoMap shards;
- DistributionStatus d(shards, chunks);
+ DistributionStatus d({}, chunks);
ASSERT(d.addTagRange(TagRange(BSON("x" << 1), BSON("x" << 10), "a")));
ASSERT(d.addTagRange(TagRange(BSON("x" << 10), BSON("x" << 20), "b")));
@@ -455,13 +445,10 @@ TEST(BalancerPolicyTests, MaxSizeRespect) {
// Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3.
// Other shards have maxSize = 0 = unset.
-
- ShardInfoMap shards;
- shards["shard0"] = ShardInfo(1, 3, false);
- shards["shard1"] = ShardInfo(0, 4, false);
- shards["shard2"] = ShardInfo(0, 6, false);
-
- DistributionStatus d(shards, chunks);
+ DistributionStatus d({ShardStatistics("shard0", 1, 3, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 0, 4, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard2", 0, 6, false, emptyTagSet, emptyShardVersion)},
+ chunks);
std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0));
ASSERT(m);
@@ -481,16 +468,12 @@ TEST(BalancerPolicyTests, MaxSizeNoDrain) {
addShard(chunks, 4, false);
addShard(chunks, 4, true);
- // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4.
- // Other shards have maxSize = 0 = unset.
-
- ShardInfoMap shards;
- // ShardInfo(maxSize, currSize, draining, opsQueued)
- shards["shard0"] = ShardInfo(1, 4, false);
- shards["shard1"] = ShardInfo(0, 4, false);
- shards["shard2"] = ShardInfo(0, 4, false);
-
- DistributionStatus d(shards, chunks);
+ // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4. Other
+ // shards have maxSize = 0 = unset.
+ DistributionStatus d({ShardStatistics("shard0", 1, 4, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard1", 0, 4, false, emptyTagSet, emptyShardVersion),
+ ShardStatistics("shard2", 0, 4, false, emptyTagSet, emptyShardVersion)},
+ chunks);
std::unique_ptr<MigrateInfo> m(BalancerPolicy::balance("ns", d, 0));
ASSERT(!m);
@@ -522,7 +505,7 @@ TEST(BalancerPolicyTests, Simulation) {
int numChunks = 0;
ShardToChunksMap chunks;
- ShardInfoMap shards;
+ vector<ShardStatistics> shards;
map<string, int> expected;
@@ -542,12 +525,16 @@ TEST(BalancerPolicyTests, Simulation) {
addShard(chunks, numShardChunks, false);
numChunks += numShardChunks;
- shards[str::stream() << "shard" << i] =
- ShardInfo(maxed ? numShardChunks + 1 : 0, numShardChunks, draining);
+ shards.emplace_back(str::stream() << "shard" << i,
+ maxed ? numShardChunks + 1 : 0,
+ numShardChunks,
+ draining,
+ emptyTagSet,
+ emptyShardVersion);
}
- for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
- log() << it->first << " : " << it->second.toString();
+ for (const auto& stat : shards) {
+ log() << stat.shardId << " : " << stat.toBSON();
}
// Perform migrations and increment data size as chunks move
@@ -561,55 +548,44 @@ TEST(BalancerPolicyTests, Simulation) {
moveChunk(chunks, m.get());
- {
- ShardInfo& info = shards[m->from];
- shards[m->from] =
- ShardInfo(info.getMaxSizeMB(), info.getCurrSizeMB() - 1, info.isDraining());
- }
-
- {
- ShardInfo& info = shards[m->to];
- shards[m->to] =
- ShardInfo(info.getMaxSizeMB(), info.getCurrSizeMB() + 1, info.isDraining());
- }
+ findStat(shards, m->from).currSizeMB -= 1;
+ findStat(shards, m->to).currSizeMB += 1;
}
// Make sure our balance is correct and our data size is low.
// The balanced value is the count on the last shard, since it's not draining or
// limited.
- int balancedSize = (--shards.end())->second.getCurrSizeMB();
+ const int64_t balancedSize = (--shards.end())->currSizeMB;
- for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
- log() << it->first << " : " << it->second.toString();
- }
+ for (const auto& stat : shards) {
+ log() << stat.shardId << " : " << stat.toBSON();
- for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
- log() << it->first << " : " << it->second.toString();
+ // Cast the size once and use it from here in order to avoid typecast errors
+ const int shardCurrSizeMB = static_cast<int>(stat.currSizeMB);
- map<string, int>::iterator expectedIt = expected.find(it->first);
+ map<string, int>::iterator expectedIt = expected.find(stat.shardId);
if (expectedIt == expected.end()) {
- bool isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 &&
- it->second.getCurrSizeMB() <= balancedSize + 1;
+ const bool isInRange =
+ shardCurrSizeMB >= balancedSize - 1 && shardCurrSizeMB <= balancedSize + 1;
if (!isInRange) {
- warning() << "non-limited and non-draining shard had "
- << it->second.getCurrSizeMB() << " chunks, expected near "
- << balancedSize;
+ warning() << "non-limited and non-draining shard had " << shardCurrSizeMB
+ << " chunks, expected near " << balancedSize;
}
ASSERT(isInRange);
} else {
int expectedSize = expectedIt->second;
- bool isInRange = it->second.getCurrSizeMB() <= expectedSize;
+ bool isInRange = shardCurrSizeMB <= expectedSize;
if (isInRange && expectedSize >= balancedSize) {
- isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 &&
- it->second.getCurrSizeMB() <= balancedSize + 1;
+ isInRange =
+ shardCurrSizeMB >= balancedSize - 1 && shardCurrSizeMB <= balancedSize + 1;
}
if (!isInRange) {
- warning() << "limited or draining shard had " << it->second.getCurrSizeMB()
+ warning() << "limited or draining shard had " << shardCurrSizeMB
<< " chunks, expected less than " << expectedSize
<< " and (if less than expected) near " << balancedSize;
}
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 60ef9294cf9..cd3b226db65 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -33,7 +33,6 @@
#include "mongo/s/chunk.h"
#include "mongo/client/connpool.h"
-#include "mongo/client/dbclientcursor.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
@@ -42,7 +41,9 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/platform/random.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balance.h"
#include "mongo/s/balancer_policy.h"
+#include "mongo/s/balancer/cluster_statistics.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_settings.h"
@@ -78,25 +79,27 @@ const int kTooManySplitPoints = 4;
bool tryMoveToOtherShard(OperationContext* txn,
const ChunkManager& manager,
const ChunkType& chunk) {
- // reload sharding metadata before starting migration
- ChunkManagerPtr chunkMgr = manager.reload(txn, false /* just reloaded in mulitsplit */);
-
- auto shardInfoMapStatus = DistributionStatus::populateShardInfoMap(txn);
- if (!shardInfoMapStatus.isOK()) {
- warning() << "failed to load shard metadata while trying to moveChunk after "
- << "auto-splitting" << causedBy(shardInfoMapStatus.getStatus());
+ auto clusterStatsStatus(Balancer::get(txn)->getClusterStatistics()->getStats(txn));
+ if (!clusterStatsStatus.isOK()) {
+ warning() << "Could not get cluster statistics "
+ << causedBy(clusterStatsStatus.getStatus());
return false;
}
- const ShardInfoMap shardInfo(std::move(shardInfoMapStatus.getValue()));
+ const auto clusterStats(std::move(clusterStatsStatus.getValue()));
- if (shardInfo.size() < 2) {
+ if (clusterStats.size() < 2) {
LOG(0) << "no need to move top chunk since there's only 1 shard";
return false;
}
+ // Reload sharding metadata before starting migration. Only reload the differences though,
+ // because the entire chunk manager was reloaded during the call to split, which immediately
+ // precedes this move logic
+ shared_ptr<ChunkManager> chunkMgr = manager.reload(txn, false);
+
map<string, vector<ChunkType>> shardToChunkMap;
- DistributionStatus::populateShardToChunksMap(shardInfo, *chunkMgr, &shardToChunkMap);
+ DistributionStatus::populateShardToChunksMap(clusterStats, *chunkMgr, &shardToChunkMap);
StatusWith<string> tagStatus =
grid.catalogManager(txn)->getTagForChunk(txn, manager.getns(), chunk);
@@ -106,7 +109,7 @@ bool tryMoveToOtherShard(OperationContext* txn,
return false;
}
- DistributionStatus chunkDistribution(shardInfo, shardToChunkMap);
+ DistributionStatus chunkDistribution(clusterStats, shardToChunkMap);
const string newLocation(chunkDistribution.getBestReceieverShard(tagStatus.getValue()));
if (newLocation.empty()) {
@@ -120,7 +123,7 @@ bool tryMoveToOtherShard(OperationContext* txn,
return false;
}
- ChunkPtr toMove = chunkMgr->findIntersectingChunk(txn, chunk.getMin());
+ shared_ptr<Chunk> toMove = chunkMgr->findIntersectingChunk(txn, chunk.getMin());
if (!(toMove->getMin() == chunk.getMin() && toMove->getMax() == chunk.getMax())) {
LOG(1) << "recently split chunk: " << chunk << " modified before we could migrate "