diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/s/balance.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/s/balance.cpp')
-rw-r--r-- | src/mongo/s/balance.cpp | 895 |
1 files changed, 433 insertions, 462 deletions
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index 0ef5d3c42ce..dc429e85d80 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -64,588 +64,559 @@ namespace mongo { - using std::map; - using std::set; - using std::shared_ptr; - using std::string; - using std::unique_ptr; - using std::vector; +using std::map; +using std::set; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; - MONGO_FP_DECLARE(skipBalanceRound); +MONGO_FP_DECLARE(skipBalanceRound); - Balancer balancer; +Balancer balancer; - Balancer::Balancer() - : _balancedLastTime(0), - _policy(new BalancerPolicy()) { +Balancer::Balancer() : _balancedLastTime(0), _policy(new BalancerPolicy()) {} - } +Balancer::~Balancer() = default; - Balancer::~Balancer() = default; +int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks, + const WriteConcernOptions* writeConcern, + bool waitForDelete) { + int movedCount = 0; - int Balancer::_moveChunks(const vector<shared_ptr<MigrateInfo>>& candidateChunks, - const WriteConcernOptions* writeConcern, - bool waitForDelete) - { - int movedCount = 0; + for (const auto& migrateInfo : candidateChunks) { + // If the balancer was disabled since we started this round, don't start new chunks + // moves. + const auto balSettingsResult = + grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey); - for (const auto& migrateInfo : candidateChunks) { - // If the balancer was disabled since we started this round, don't start new chunks - // moves. - const auto balSettingsResult = - grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey); + const bool isBalSettingsAbsent = + balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; - const bool isBalSettingsAbsent = - balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; + if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { + warning() << balSettingsResult.getStatus(); + return movedCount; + } - if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { - warning() << balSettingsResult.getStatus(); - return movedCount; - } + const SettingsType& balancerConfig = + isBalSettingsAbsent ? SettingsType{} : balSettingsResult.getValue(); - const SettingsType& balancerConfig = isBalSettingsAbsent ? - SettingsType{} : balSettingsResult.getValue(); + if ((!isBalSettingsAbsent && !grid.shouldBalance(balancerConfig)) || + MONGO_FAIL_POINT(skipBalanceRound)) { + LOG(1) << "Stopping balancing round early as balancing was disabled"; + return movedCount; + } - if ((!isBalSettingsAbsent && !grid.shouldBalance(balancerConfig)) || - MONGO_FAIL_POINT(skipBalanceRound)) { - LOG(1) << "Stopping balancing round early as balancing was disabled"; - return movedCount; - } + // Changes to metadata, borked metadata, and connectivity problems between shards + // should cause us to abort this chunk move, but shouldn't cause us to abort the entire + // round of chunks. + // + // TODO(spencer): We probably *should* abort the whole round on issues communicating + // with the config servers, but its impossible to distinguish those types of failures + // at the moment. + // + // TODO: Handle all these things more cleanly, since they're expected problems - // Changes to metadata, borked metadata, and connectivity problems between shards - // should cause us to abort this chunk move, but shouldn't cause us to abort the entire - // round of chunks. - // - // TODO(spencer): We probably *should* abort the whole round on issues communicating - // with the config servers, but its impossible to distinguish those types of failures - // at the moment. - // - // TODO: Handle all these things more cleanly, since they're expected problems + const NamespaceString nss(migrateInfo->ns); - const NamespaceString nss(migrateInfo->ns); + try { + auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + fassert(28628, status.getStatus()); + + shared_ptr<DBConfig> cfg = status.getValue(); - try { - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); - fassert(28628, status.getStatus()); + // NOTE: We purposely do not reload metadata here, since _doBalanceRound already + // tried to do so once. + shared_ptr<ChunkManager> cm = cfg->getChunkManager(migrateInfo->ns); + invariant(cm); - shared_ptr<DBConfig> cfg = status.getValue(); + ChunkPtr c = cm->findIntersectingChunk(migrateInfo->chunk.min); - // NOTE: We purposely do not reload metadata here, since _doBalanceRound already - // tried to do so once. - shared_ptr<ChunkManager> cm = cfg->getChunkManager(migrateInfo->ns); + if (c->getMin().woCompare(migrateInfo->chunk.min) || + c->getMax().woCompare(migrateInfo->chunk.max)) { + // Likely a split happened somewhere, so force reload the chunk manager + cm = cfg->getChunkManager(migrateInfo->ns, true); invariant(cm); - ChunkPtr c = cm->findIntersectingChunk(migrateInfo->chunk.min); + c = cm->findIntersectingChunk(migrateInfo->chunk.min); if (c->getMin().woCompare(migrateInfo->chunk.min) || - c->getMax().woCompare(migrateInfo->chunk.max)) { - - // Likely a split happened somewhere, so force reload the chunk manager - cm = cfg->getChunkManager(migrateInfo->ns, true); - invariant(cm); - - c = cm->findIntersectingChunk(migrateInfo->chunk.min); + c->getMax().woCompare(migrateInfo->chunk.max)) { + log() << "chunk mismatch after reload, ignoring will retry issue " + << migrateInfo->chunk.toString(); - if (c->getMin().woCompare(migrateInfo->chunk.min) || - c->getMax().woCompare(migrateInfo->chunk.max)) { - - log() << "chunk mismatch after reload, ignoring will retry issue " - << migrateInfo->chunk.toString(); - - continue; - } - } - - BSONObj res; - if (c->moveAndCommit(migrateInfo->to, - Chunk::MaxChunkSize, - writeConcern, - waitForDelete, - 0, /* maxTimeMS */ - res)) { - - movedCount++; continue; } + } - // The move requires acquiring the collection metadata's lock, which can fail. - log() << "balancer move failed: " << res - << " from: " << migrateInfo->from - << " to: " << migrateInfo->to - << " chunk: " << migrateInfo->chunk; + BSONObj res; + if (c->moveAndCommit(migrateInfo->to, + Chunk::MaxChunkSize, + writeConcern, + waitForDelete, + 0, /* maxTimeMS */ + res)) { + movedCount++; + continue; + } - if (res["chunkTooBig"].trueValue()) { - // Reload just to be safe - cm = cfg->getChunkManager(migrateInfo->ns); - invariant(cm); + // The move requires acquiring the collection metadata's lock, which can fail. + log() << "balancer move failed: " << res << " from: " << migrateInfo->from + << " to: " << migrateInfo->to << " chunk: " << migrateInfo->chunk; - c = cm->findIntersectingChunk(migrateInfo->chunk.min); + if (res["chunkTooBig"].trueValue()) { + // Reload just to be safe + cm = cfg->getChunkManager(migrateInfo->ns); + invariant(cm); + + c = cm->findIntersectingChunk(migrateInfo->chunk.min); - log() << "performing a split because migrate failed for size reasons"; + log() << "performing a split because migrate failed for size reasons"; - Status status = c->split(Chunk::normal, NULL, NULL); - log() << "split results: " << status; + Status status = c->split(Chunk::normal, NULL, NULL); + log() << "split results: " << status; - if (!status.isOK()) { - log() << "marking chunk as jumbo: " << c->toString(); + if (!status.isOK()) { + log() << "marking chunk as jumbo: " << c->toString(); - c->markAsJumbo(); + c->markAsJumbo(); - // We increment moveCount so we do another round right away - movedCount++; - } + // We increment moveCount so we do another round right away + movedCount++; } } - catch (const DBException& ex) { - warning() << "could not move chunk " << migrateInfo->chunk.toString() - << ", continuing balancing round" << causedBy(ex); - } + } catch (const DBException& ex) { + warning() << "could not move chunk " << migrateInfo->chunk.toString() + << ", continuing balancing round" << causedBy(ex); } - - return movedCount; } - void Balancer::_ping(bool waiting) { - grid.catalogManager()->update( - MongosType::ConfigNS, - BSON(MongosType::name(_myid)), - BSON("$set" << BSON(MongosType::ping(jsTime()) << - MongosType::up(static_cast<int>(time(0) - _started)) << - MongosType::waiting(waiting) << - MongosType::mongoVersion(versionString))), - true, - false, - NULL); - } + return movedCount; +} + +void Balancer::_ping(bool waiting) { + grid.catalogManager()->update( + MongosType::ConfigNS, + BSON(MongosType::name(_myid)), + BSON("$set" << BSON(MongosType::ping(jsTime()) + << MongosType::up(static_cast<int>(time(0) - _started)) + << MongosType::waiting(waiting) + << MongosType::mongoVersion(versionString))), + true, + false, + NULL); +} + +/* +* Builds the details object for the actionlog. +* Current formats for detail are: +* Success: { +* "candidateChunks" : , +* "chunksMoved" : , +* "executionTimeMillis" : , +* "errorOccured" : false +* } +* Failure: { +* "executionTimeMillis" : , +* "errmsg" : , +* "errorOccured" : true +* } +* @param didError, did this round end in an error? +* @param executionTime, the time this round took to run +* @param candidateChunks, the number of chunks identified to be moved +* @param chunksMoved, the number of chunks moved +* @param errmsg, the error message for this round +*/ - /* - * Builds the details object for the actionlog. - * Current formats for detail are: - * Success: { - * "candidateChunks" : , - * "chunksMoved" : , - * "executionTimeMillis" : , - * "errorOccured" : false - * } - * Failure: { - * "executionTimeMillis" : , - * "errmsg" : , - * "errorOccured" : true - * } - * @param didError, did this round end in an error? - * @param executionTime, the time this round took to run - * @param candidateChunks, the number of chunks identified to be moved - * @param chunksMoved, the number of chunks moved - * @param errmsg, the error message for this round - */ - - static BSONObj _buildDetails( bool didError, int executionTime, - int candidateChunks, int chunksMoved, const std::string& errmsg ) { - - BSONObjBuilder builder; - builder.append("executionTimeMillis", executionTime); - builder.append("errorOccured", didError); - - if ( didError ) { - builder.append("errmsg", errmsg); - } else { - builder.append("candidateChunks", candidateChunks); - builder.append("chunksMoved", chunksMoved); - } - return builder.obj(); +static BSONObj _buildDetails(bool didError, + int executionTime, + int candidateChunks, + int chunksMoved, + const std::string& errmsg) { + BSONObjBuilder builder; + builder.append("executionTimeMillis", executionTime); + builder.append("errorOccured", didError); + + if (didError) { + builder.append("errmsg", errmsg); + } else { + builder.append("candidateChunks", candidateChunks); + builder.append("chunksMoved", chunksMoved); } + return builder.obj(); +} - bool Balancer::_checkOIDs() { - vector<ShardId> all; - grid.shardRegistry()->getAllShardIds(&all); +bool Balancer::_checkOIDs() { + vector<ShardId> all; + grid.shardRegistry()->getAllShardIds(&all); - // map of OID machine ID => shardId - map<int, string> oids; + // map of OID machine ID => shardId + map<int, string> oids; - for (const ShardId& shardId : all) { - const auto s = grid.shardRegistry()->getShard(shardId); - if (!s) { - continue; - } + for (const ShardId& shardId : all) { + const auto s = grid.shardRegistry()->getShard(shardId); + if (!s) { + continue; + } - BSONObj f = s->runCommand("admin", "features"); - if ( f["oidMachine"].isNumber() ) { - int x = f["oidMachine"].numberInt(); - if (oids.count(x) == 0) { - oids[x] = shardId; + BSONObj f = s->runCommand("admin", "features"); + if (f["oidMachine"].isNumber()) { + int x = f["oidMachine"].numberInt(); + if (oids.count(x) == 0) { + oids[x] = shardId; + } else { + log() << "error: 2 machines have " << x << " as oid machine piece: " << shardId + << " and " << oids[x]; + s->runCommand("admin", BSON("features" << 1 << "oidReset" << 1)); + + const auto otherShard = grid.shardRegistry()->getShard(oids[x]); + if (otherShard) { + otherShard->runCommand("admin", BSON("features" << 1 << "oidReset" << 1)); } - else { - log() << "error: 2 machines have " << x - << " as oid machine piece: " << shardId - << " and " << oids[x]; - s->runCommand("admin", BSON("features" << 1 << "oidReset" << 1)); - - const auto otherShard = grid.shardRegistry()->getShard(oids[x]); - if (otherShard) { - otherShard->runCommand("admin", BSON("features" << 1 << "oidReset" << 1)); - } - return false; - } - } - else { - log() << "warning: oidMachine not set on: " << s->toString(); + return false; } + } else { + log() << "warning: oidMachine not set on: " << s->toString(); } - return true; } - - /** - * Occasionally prints a log message with shard versions if the versions are not the same - * in the cluster. - */ - void warnOnMultiVersion( const ShardInfoMap& shardInfo ) { - - bool isMultiVersion = false; - for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) { - if ( !isSameMajorVersion( i->second.getMongoVersion().c_str() ) ) { - isMultiVersion = true; - break; - } - } + return true; +} - // If we're all the same version, don't message - if ( !isMultiVersion ) return; - - warning() << "multiVersion cluster detected, my version is " << versionString; - for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) { - log() << i->first << " is at version " << i->second.getMongoVersion(); - } +/** + * Occasionally prints a log message with shard versions if the versions are not the same + * in the cluster. + */ +void warnOnMultiVersion(const ShardInfoMap& shardInfo) { + bool isMultiVersion = false; + for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { + if (!isSameMajorVersion(i->second.getMongoVersion().c_str())) { + isMultiVersion = true; + break; + } } - void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) { - invariant(candidateChunks); + // If we're all the same version, don't message + if (!isMultiVersion) + return; - vector<CollectionType> collections; - Status collsStatus = grid.catalogManager()->getCollections(nullptr, &collections); - if (!collsStatus.isOK()) { - warning() << "Failed to retrieve the set of collections during balancing round " - << collsStatus; - return; - } + warning() << "multiVersion cluster detected, my version is " << versionString; + for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { + log() << i->first << " is at version " << i->second.getMongoVersion(); + } +} - if (collections.empty()) { - LOG(1) << "no collections to balance"; - return; - } +void Balancer::_doBalanceRound(vector<shared_ptr<MigrateInfo>>* candidateChunks) { + invariant(candidateChunks); - // Get a list of all the shards that are participating in this balance round along with any - // maximum allowed quotas and current utilization. We get the latter by issuing - // db.serverStatus() (mem.mapped) to all shards. - // - // TODO: skip unresponsive shards and mark information as stale. - ShardInfoMap shardInfo; - Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo); - if (!loadStatus.isOK()) { - warning() << "failed to load shard metadata" << causedBy(loadStatus); - return; - } + vector<CollectionType> collections; + Status collsStatus = grid.catalogManager()->getCollections(nullptr, &collections); + if (!collsStatus.isOK()) { + warning() << "Failed to retrieve the set of collections during balancing round " + << collsStatus; + return; + } - if (shardInfo.size() < 2) { - LOG(1) << "can't balance without more active shards"; - return; - } - - OCCASIONALLY warnOnMultiVersion( shardInfo ); + if (collections.empty()) { + LOG(1) << "no collections to balance"; + return; + } - // For each collection, check if the balancing policy recommends moving anything around. - for (const auto& coll : collections) { - // Skip collections for which balancing is disabled - const NamespaceString& ns = coll.getNs(); + // Get a list of all the shards that are participating in this balance round along with any + // maximum allowed quotas and current utilization. We get the latter by issuing + // db.serverStatus() (mem.mapped) to all shards. + // + // TODO: skip unresponsive shards and mark information as stale. + ShardInfoMap shardInfo; + Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo); + if (!loadStatus.isOK()) { + warning() << "failed to load shard metadata" << causedBy(loadStatus); + return; + } - if (!coll.getAllowBalance()) { - LOG(1) << "Not balancing collection " << ns << "; explicitly disabled."; - continue; - } + if (shardInfo.size() < 2) { + LOG(1) << "can't balance without more active shards"; + return; + } - std::vector<ChunkType> allNsChunks; - grid.catalogManager()->getChunks(Query(BSON(ChunkType::ns(ns))) - .sort(ChunkType::min()), - 0, // all chunks - &allNsChunks); + OCCASIONALLY warnOnMultiVersion(shardInfo); - set<BSONObj> allChunkMinimums; - map<string, vector<ChunkType>> shardToChunksMap; + // For each collection, check if the balancing policy recommends moving anything around. + for (const auto& coll : collections) { + // Skip collections for which balancing is disabled + const NamespaceString& ns = coll.getNs(); - for (const ChunkType& chunk : allNsChunks) { - allChunkMinimums.insert(chunk.getMin().getOwned()); + if (!coll.getAllowBalance()) { + LOG(1) << "Not balancing collection " << ns << "; explicitly disabled."; + continue; + } - vector<ChunkType>& chunksList = shardToChunksMap[chunk.getShard()]; - chunksList.push_back(chunk); - } + std::vector<ChunkType> allNsChunks; + grid.catalogManager()->getChunks(Query(BSON(ChunkType::ns(ns))).sort(ChunkType::min()), + 0, // all chunks + &allNsChunks); - if (shardToChunksMap.empty()) { - LOG(1) << "skipping empty collection (" << ns << ")"; - continue; - } + set<BSONObj> allChunkMinimums; + map<string, vector<ChunkType>> shardToChunksMap; - for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { - // This loop just makes sure there is an entry in shardToChunksMap for every shard - shardToChunksMap[i->first]; - } + for (const ChunkType& chunk : allNsChunks) { + allChunkMinimums.insert(chunk.getMin().getOwned()); - DistributionStatus status(shardInfo, shardToChunksMap); + vector<ChunkType>& chunksList = shardToChunksMap[chunk.getShard()]; + chunksList.push_back(chunk); + } - // TODO: TagRange contains all the information from TagsType except for the namespace, - // so maybe the two can be merged at some point in order to avoid the - // transformation below. - vector<TagRange> ranges; + if (shardToChunksMap.empty()) { + LOG(1) << "skipping empty collection (" << ns << ")"; + continue; + } - { - vector<TagsType> collectionTags; - uassertStatusOK(grid.catalogManager()->getTagsForCollection(ns.toString(), - &collectionTags)); - for (const auto& tt : collectionTags) { - ranges.push_back(TagRange(tt.getMinKey().getOwned(), - tt.getMaxKey().getOwned(), - tt.getTag())); - uassert(16356, - str::stream() << "tag ranges not valid for: " << ns.toString(), - status.addTagRange(ranges.back())); - } - } + for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { + // This loop just makes sure there is an entry in shardToChunksMap for every shard + shardToChunksMap[i->first]; + } - auto statusGetDb = grid.catalogCache()->getDatabase(ns.db().toString()); - if (!statusGetDb.isOK()) { - warning() << "could not load db config to balance collection [" << ns << "]: " - << statusGetDb.getStatus(); - continue; + DistributionStatus status(shardInfo, shardToChunksMap); + + // TODO: TagRange contains all the information from TagsType except for the namespace, + // so maybe the two can be merged at some point in order to avoid the + // transformation below. + vector<TagRange> ranges; + + { + vector<TagsType> collectionTags; + uassertStatusOK( + grid.catalogManager()->getTagsForCollection(ns.toString(), &collectionTags)); + for (const auto& tt : collectionTags) { + ranges.push_back( + TagRange(tt.getMinKey().getOwned(), tt.getMaxKey().getOwned(), tt.getTag())); + uassert(16356, + str::stream() << "tag ranges not valid for: " << ns.toString(), + status.addTagRange(ranges.back())); } + } - shared_ptr<DBConfig> cfg = statusGetDb.getValue(); + auto statusGetDb = grid.catalogCache()->getDatabase(ns.db().toString()); + if (!statusGetDb.isOK()) { + warning() << "could not load db config to balance collection [" << ns + << "]: " << statusGetDb.getStatus(); + continue; + } - // This line reloads the chunk manager once if this process doesn't know the collection - // is sharded yet. - shared_ptr<ChunkManager> cm = cfg->getChunkManagerIfExists(ns, true); - if (!cm) { - warning() << "could not load chunks to balance " << ns << " collection"; - continue; - } + shared_ptr<DBConfig> cfg = statusGetDb.getValue(); - // Loop through tags to make sure no chunk spans tags. Split on tag min for all chunks. - bool didAnySplits = false; + // This line reloads the chunk manager once if this process doesn't know the collection + // is sharded yet. + shared_ptr<ChunkManager> cm = cfg->getChunkManagerIfExists(ns, true); + if (!cm) { + warning() << "could not load chunks to balance " << ns << " collection"; + continue; + } - for (const TagRange& range : ranges) { - BSONObj min = - cm->getShardKeyPattern().getKeyPattern().extendRangeBound(range.min, false); + // Loop through tags to make sure no chunk spans tags. Split on tag min for all chunks. + bool didAnySplits = false; - if (allChunkMinimums.count(min) > 0) { - continue; - } + for (const TagRange& range : ranges) { + BSONObj min = + cm->getShardKeyPattern().getKeyPattern().extendRangeBound(range.min, false); - didAnySplits = true; + if (allChunkMinimums.count(min) > 0) { + continue; + } - log() << "ns: " << ns << " need to split on " << min - << " because there is a range there"; + didAnySplits = true; - ChunkPtr c = cm->findIntersectingChunk(min); + log() << "ns: " << ns << " need to split on " << min + << " because there is a range there"; - vector<BSONObj> splitPoints; - splitPoints.push_back( min ); + ChunkPtr c = cm->findIntersectingChunk(min); - Status status = c->multiSplit(splitPoints, NULL); - if (!status.isOK()) { - error() << "split failed: " << status; - } - else { - LOG(1) << "split worked"; - } + vector<BSONObj> splitPoints; + splitPoints.push_back(min); - break; + Status status = c->multiSplit(splitPoints, NULL); + if (!status.isOK()) { + error() << "split failed: " << status; + } else { + LOG(1) << "split worked"; } - if (didAnySplits) { - // State change, just wait till next round - continue; - } + break; + } - shared_ptr<MigrateInfo> migrateInfo(_policy->balance(ns, status, _balancedLastTime)); - if (migrateInfo) { - candidateChunks->push_back(migrateInfo); - } + if (didAnySplits) { + // State change, just wait till next round + continue; + } + + shared_ptr<MigrateInfo> migrateInfo(_policy->balance(ns, status, _balancedLastTime)); + if (migrateInfo) { + candidateChunks->push_back(migrateInfo); } } +} - bool Balancer::_init() { - try { - log() << "about to contact config servers and shards"; +bool Balancer::_init() { + try { + log() << "about to contact config servers and shards"; - // contact the config server and refresh shard information - // checks that each shard is indeed a different process (no hostname mixup) - // these checks are redundant in that they're redone at every new round but we want to do them initially here - // so to catch any problem soon - Shard::reloadShardInfo(); - _checkOIDs(); + // contact the config server and refresh shard information + // checks that each shard is indeed a different process (no hostname mixup) + // these checks are redundant in that they're redone at every new round but we want to do them initially here + // so to catch any problem soon + Shard::reloadShardInfo(); + _checkOIDs(); - log() << "config servers and shards contacted successfully"; + log() << "config servers and shards contacted successfully"; - StringBuilder buf; - buf << getHostNameCached() << ":" << serverGlobalParams.port; - _myid = buf.str(); - _started = time(0); + StringBuilder buf; + buf << getHostNameCached() << ":" << serverGlobalParams.port; + _myid = buf.str(); + _started = time(0); - log() << "balancer id: " << _myid << " started"; + log() << "balancer id: " << _myid << " started"; - return true; + return true; + } catch (std::exception& e) { + warning() << "could not initialize balancer, please check that all shards and config " + "servers are up: " << e.what(); + return false; + } +} + +void Balancer::run() { + Client::initThread("Balancer"); + + // This is the body of a BackgroundJob so if we throw here we're basically ending the + // balancer thread prematurely. + while (!inShutdown()) { + if (!_init()) { + log() << "will retry to initialize balancer in one minute"; + sleepsecs(60); + continue; } - catch ( std::exception& e ) { - warning() << "could not initialize balancer, please check that all shards and config servers are up: " << e.what(); - return false; - } + break; } - void Balancer::run() { - Client::initThread("Balancer"); + const int sleepTime = 10; - // This is the body of a BackgroundJob so if we throw here we're basically ending the - // balancer thread prematurely. - while (!inShutdown()) { - if (!_init()) { - log() << "will retry to initialize balancer in one minute"; - sleepsecs(60); - continue; - } + while (!inShutdown()) { + Timer balanceRoundTimer; + ActionLogType actionLog; - break; - } + actionLog.setServer(getHostNameCached()); + actionLog.setWhat("balancer.round"); + + try { + // ping has to be first so we keep things in the config server in sync + _ping(); - const int sleepTime = 10; + BSONObj balancerResult; - while (!inShutdown()) { - Timer balanceRoundTimer; - ActionLogType actionLog; + // use fresh shard state + Shard::reloadShardInfo(); - actionLog.setServer(getHostNameCached()); - actionLog.setWhat("balancer.round"); + // refresh chunk size (even though another balancer might be active) + Chunk::refreshChunkSize(); - try { - // ping has to be first so we keep things in the config server in sync - _ping(); + auto balSettingsResult = + grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey); + const bool isBalSettingsAbsent = + balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; + if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { + warning() << balSettingsResult.getStatus(); + return; + } + const SettingsType& balancerConfig = + isBalSettingsAbsent ? SettingsType{} : balSettingsResult.getValue(); - BSONObj balancerResult; + // now make sure we should even be running + if ((!isBalSettingsAbsent && !grid.shouldBalance(balancerConfig)) || + MONGO_FAIL_POINT(skipBalanceRound)) { + LOG(1) << "skipping balancing round because balancing is disabled"; - // use fresh shard state - Shard::reloadShardInfo(); + // Ping again so scripts can determine if we're active without waiting + _ping(true); - // refresh chunk size (even though another balancer might be active) - Chunk::refreshChunkSize(); + sleepsecs(sleepTime); + continue; + } - auto balSettingsResult = - grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey); - const bool isBalSettingsAbsent = - balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; - if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { - warning() << balSettingsResult.getStatus(); - return; - } - const SettingsType& balancerConfig = isBalSettingsAbsent ? - SettingsType{} : balSettingsResult.getValue(); + uassert(13258, "oids broken after resetting!", _checkOIDs()); - // now make sure we should even be running - if ((!isBalSettingsAbsent && !grid.shouldBalance(balancerConfig)) || - MONGO_FAIL_POINT(skipBalanceRound)) { + { + auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock( + "balancer", "doing balance round"); - LOG(1) << "skipping balancing round because balancing is disabled"; + if (!scopedDistLock.isOK()) { + LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus()); // Ping again so scripts can determine if we're active without waiting - _ping( true ); + _ping(true); - sleepsecs( sleepTime ); + sleepsecs(sleepTime); // no need to wake up soon continue; } - uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); - - { - auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock( - "balancer", "doing balance round"); - - if (!scopedDistLock.isOK()) { - LOG(1) << "skipping balancing round" - << causedBy(scopedDistLock.getStatus()); - - // Ping again so scripts can determine if we're active without waiting - _ping( true ); - - sleepsecs( sleepTime ); // no need to wake up soon - continue; - } - - const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ? - balancerConfig.getWaitForDelete() : false); - - std::unique_ptr<WriteConcernOptions> writeConcern; - if (balancerConfig.isKeySet()) { // if balancer doc exists. - writeConcern = std::move(balancerConfig.getWriteConcern()); - } - - LOG(1) << "*** start balancing round. " - << "waitForDelete: " << waitForDelete - << ", secondaryThrottle: " - << (writeConcern.get() ? writeConcern->toBSON().toString() : "default") - ; - - vector<shared_ptr<MigrateInfo>> candidateChunks; - _doBalanceRound(&candidateChunks); - - if ( candidateChunks.size() == 0 ) { - LOG(1) << "no need to move any chunk"; - _balancedLastTime = 0; - } - else { - _balancedLastTime = _moveChunks(candidateChunks, - writeConcern.get(), - waitForDelete); - } - - actionLog.setDetails( - _buildDetails(false, - balanceRoundTimer.millis(), - static_cast<int>(candidateChunks.size()), - _balancedLastTime, - "")); - actionLog.setTime(jsTime()); - - grid.catalogManager()->logAction(actionLog); - - LOG(1) << "*** end of balancing round"; + const bool waitForDelete = + (balancerConfig.isWaitForDeleteSet() ? balancerConfig.getWaitForDelete() + : false); + + std::unique_ptr<WriteConcernOptions> writeConcern; + if (balancerConfig.isKeySet()) { // if balancer doc exists. + writeConcern = std::move(balancerConfig.getWriteConcern()); } - // Ping again so scripts can determine if we're active without waiting - _ping(true); + LOG(1) << "*** start balancing round. " + << "waitForDelete: " << waitForDelete << ", secondaryThrottle: " + << (writeConcern.get() ? writeConcern->toBSON().toString() : "default"); - sleepsecs(_balancedLastTime ? sleepTime / 10 : sleepTime); - } - catch ( std::exception& e ) { - log() << "caught exception while doing balance: " << e.what(); - - // Just to match the opening statement if in log level 1 - LOG(1) << "*** End of balancing round"; - - // This round failed, tell the world! - actionLog.setDetails( - _buildDetails(true, - balanceRoundTimer.millis(), - 0, - 0, - e.what())); + vector<shared_ptr<MigrateInfo>> candidateChunks; + _doBalanceRound(&candidateChunks); + + if (candidateChunks.size() == 0) { + LOG(1) << "no need to move any chunk"; + _balancedLastTime = 0; + } else { + _balancedLastTime = + _moveChunks(candidateChunks, writeConcern.get(), waitForDelete); + } + + actionLog.setDetails(_buildDetails(false, + balanceRoundTimer.millis(), + static_cast<int>(candidateChunks.size()), + _balancedLastTime, + "")); actionLog.setTime(jsTime()); grid.catalogManager()->logAction(actionLog); - // Sleep a fair amount before retrying because of the error - sleepsecs(sleepTime); - - continue; + LOG(1) << "*** end of balancing round"; } - } + // Ping again so scripts can determine if we're active without waiting + _ping(true); + + sleepsecs(_balancedLastTime ? sleepTime / 10 : sleepTime); + } catch (std::exception& e) { + log() << "caught exception while doing balance: " << e.what(); + + // Just to match the opening statement if in log level 1 + LOG(1) << "*** End of balancing round"; + + // This round failed, tell the world! + actionLog.setDetails(_buildDetails(true, balanceRoundTimer.millis(), 0, 0, e.what())); + actionLog.setTime(jsTime()); + + grid.catalogManager()->logAction(actionLog); + + // Sleep a fair amount before retrying because of the error + sleepsecs(sleepTime); + + continue; + } } +} } // namespace mongo |