summaryrefslogtreecommitdiff
path: root/src/mongo/s/balancer_policy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/balancer_policy.cpp')
-rw-r--r--src/mongo/s/balancer_policy.cpp126
1 files changed, 110 insertions, 16 deletions
diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp
index 207de2b6680..2b1faa88006 100644
--- a/src/mongo/s/balancer_policy.cpp
+++ b/src/mongo/s/balancer_policy.cpp
@@ -34,6 +34,7 @@
#include "mongo/s/balancer_policy.h"
#include "mongo/s/chunk.h"
#include "mongo/s/config.h"
+#include "mongo/s/type_shard.h"
#include "mongo/s/type_tags.h"
#include "mongo/util/log.h"
#include "mongo/util/stringutils.h"
@@ -42,6 +43,52 @@
namespace mongo {
+ namespace {
+
+ Status extractShardInfo(const std::string& shardHost,
+ std::string* version,
+ bool* hasOpsQueued,
+ long long* mappedMB) {
+ BSONObj serverStatus;
+
+ try {
+ ScopedDbConnection conn(shardHost);
+
+ bool ok = conn->runCommand("admin", BSON("serverStatus" << 1), serverStatus);
+ conn.done();
+
+ uassert(28598,
+ str::stream() << "call to serverStatus on " << shardHost
+ << " failed: " << serverStatus,
+ ok);
+ }
+ catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ BSONElement versionElement = serverStatus["version"];
+ if (versionElement.type() != String) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ "supports only string type for version field in serverStatus");
+ }
+
+ *version = serverStatus["version"].String();
+
+ *hasOpsQueued = serverStatus["writeBacksQueued"].booleanSafe();
+
+ BSONElement memMappedElement = serverStatus.getFieldDotted("mem.mapped");
+
+ if (!memMappedElement.isNumber()) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ "supports only numeric type for mem.mapped field in serverStatus");
+ }
+
+ *mappedMB = memMappedElement.numberLong();
+
+ return Status::OK();
+ }
+ }
+
string TagRange::toString() const {
return str::stream() << min << " -->> " << max << " on " << tag;
}
@@ -241,31 +288,78 @@ namespace mongo {
}
}
- void DistributionStatus::populateShardInfoMap(const vector<Shard> allShards,
- ShardInfoMap* shardInfo) {
- for (vector<Shard>::const_iterator it = allShards.begin();
- it != allShards.end(); ++it ) {
- const Shard& shard = *it;
- ShardStatus status = shard.getStatus();
- shardInfo->insert(make_pair(shard.getName(),
- ShardInfo(shard.getMaxSize(),
- status.mapped(),
- shard.isDraining(),
- status.hasOpsQueued(),
- shard.tags(),
- status.mongoVersion())));
+ Status DistributionStatus::populateShardInfoMap(ShardInfoMap* shardInfo) {
+ try {
+ ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
+
+ auto_ptr<DBClientCursor> cursor(conn->query(ShardType::ConfigNS , Query()));
+ uassert(28597, "Failed to load shard config", cursor.get() != NULL);
+
+ while (cursor->more()) {
+ ShardType shard;
+ std::string errMsg;
+ bool parseOk = shard.parseBSON(cursor->next(), &errMsg);
+
+ if (!parseOk) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ errMsg);
+ }
+
+ std::string mongoShardVersion;
+ bool hasWriteBacksQueued = false;
+ long long mMappedMB = 0;
+ Status shardStatus = extractShardInfo(shard.getHost(),
+ &mongoShardVersion,
+ &hasWriteBacksQueued,
+ &mMappedMB);
+
+ if (!shardStatus.isOK()) {
+ return shardStatus;
+ }
+
+ std::set<std::string> dummy;
+ ShardInfo newShardEntry(shard.getMaxSize(),
+ mMappedMB,
+ shard.getDraining(),
+ hasWriteBacksQueued,
+ dummy,
+ mongoShardVersion);
+
+ if (shard.isTagsSet()) {
+ BSONArrayIteratorSorted tagIter(shard.getTags());
+ while (tagIter.more()) {
+ BSONElement tagElement = tagIter.next();
+ if (tagElement.type() != String) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ str::stream() << "shard tags only supports strings, "
+ << "found " << typeName(tagElement.type()));
+ }
+
+ newShardEntry.addTag(tagElement.String());
+ }
+ }
+
+ shardInfo->insert(make_pair(shard.getName(), newShardEntry));
+ }
+
+ conn.done();
}
+ catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ return Status::OK();
}
- void DistributionStatus::populateShardToChunksMap(const vector<Shard>& allShards,
+ void DistributionStatus::populateShardToChunksMap(const ShardInfoMap& allShards,
const ChunkManager& chunkMgr,
ShardToChunksMap* shardToChunksMap) {
// Makes sure there is an entry in shardToChunksMap for every shard.
- for (vector<Shard>::const_iterator it = allShards.begin();
+ for (ShardInfoMap::const_iterator it = allShards.begin();
it != allShards.end(); ++it) {
OwnedPointerVector<ChunkType>*& chunkList =
- (*shardToChunksMap)[it->getName()];
+ (*shardToChunksMap)[it->first];
if (chunkList == NULL) {
chunkList = new OwnedPointerVector<ChunkType>();