Diffstat (limited to 'src/mongo/s/balance.cpp')
-rw-r--r--  src/mongo/s/balance.cpp | 1023
1 file changed, 500 insertions(+), 523 deletions(-)
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index 9203f913656..fd78d6195fe 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -61,652 +61,629 @@
namespace mongo {
- using boost::scoped_ptr;
- using std::auto_ptr;
- using std::endl;
- using std::map;
- using std::set;
- using std::string;
- using std::vector;
-
- MONGO_FP_DECLARE(skipBalanceRound);
-
- Balancer balancer;
-
- Balancer::Balancer() : _balancedLastTime(0), _policy( new BalancerPolicy() ) {}
-
- Balancer::~Balancer() {
- }
-
- int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks,
- const WriteConcernOptions* writeConcern,
- bool waitForDelete)
- {
- int movedCount = 0;
-
- for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) {
+using boost::scoped_ptr;
+using std::auto_ptr;
+using std::endl;
+using std::map;
+using std::set;
+using std::string;
+using std::vector;
+
+MONGO_FP_DECLARE(skipBalanceRound);
+
+Balancer balancer;
+
+Balancer::Balancer() : _balancedLastTime(0), _policy(new BalancerPolicy()) {}
+
+Balancer::~Balancer() {}
+
+int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks,
+ const WriteConcernOptions* writeConcern,
+ bool waitForDelete) {
+ int movedCount = 0;
+
+ for (vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin();
+ it != candidateChunks->end();
+ ++it) {
+ // If the balancer was disabled since we started this round, don't start new
+        // chunk moves.
+ SettingsType balancerConfig;
+ std::string errMsg;
+
+ if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) {
+ warning() << errMsg;
+ // No point in continuing the round if the config servers are unreachable.
+ return movedCount;
+ }
- // If the balancer was disabled since we started this round, don't start new
- // chunks moves.
- SettingsType balancerConfig;
- std::string errMsg;
+ if ((balancerConfig.isKeySet() && // balancer config doc exists
+ !grid.shouldBalance(balancerConfig)) ||
+ MONGO_FAIL_POINT(skipBalanceRound)) {
+ LOG(1) << "Stopping balancing round early as balancing was disabled";
+ return movedCount;
+ }
- if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) {
- warning() << errMsg;
- // No point in continuing the round if the config servers are unreachable.
- return movedCount;
+ // Changes to metadata, borked metadata, and connectivity problems between shards should
+ // cause us to abort this chunk move, but shouldn't cause us to abort the entire round
+ // of chunks.
+ // TODO(spencer): We probably *should* abort the whole round on issues communicating
+        // with the config servers, but it's impossible to distinguish those types of failures
+ // at the moment.
+ // TODO: Handle all these things more cleanly, since they're expected problems
+ const CandidateChunk& chunkInfo = *it->get();
+ try {
+ DBConfigPtr cfg = grid.getDBConfig(chunkInfo.ns);
+ verify(cfg);
+
+ // NOTE: We purposely do not reload metadata here, since _doBalanceRound already
+ // tried to do so once.
+ ChunkManagerPtr cm = cfg->getChunkManager(chunkInfo.ns);
+ verify(cm);
+
+ ChunkPtr c = cm->findIntersectingChunk(chunkInfo.chunk.min);
+ if (c->getMin().woCompare(chunkInfo.chunk.min) ||
+ c->getMax().woCompare(chunkInfo.chunk.max)) {
+ // likely a split happened somewhere
+ cm = cfg->getChunkManager(chunkInfo.ns, true /* reload */);
+ verify(cm);
+
+ c = cm->findIntersectingChunk(chunkInfo.chunk.min);
+ if (c->getMin().woCompare(chunkInfo.chunk.min) ||
+ c->getMax().woCompare(chunkInfo.chunk.max)) {
+                    log() << "chunk mismatch after reload, ignoring; will retry. chunk: "
+ << chunkInfo.chunk.toString() << endl;
+ continue;
+ }
}
- if ((balancerConfig.isKeySet() && // balancer config doc exists
- !grid.shouldBalance(balancerConfig)) ||
- MONGO_FAIL_POINT(skipBalanceRound)) {
- LOG(1) << "Stopping balancing round early as balancing was disabled";
- return movedCount;
+ BSONObj res;
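+            // on success the chunk now belongs to chunkInfo.to; on failure res
+            // describes why (e.g. the chunkTooBig case handled below)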
+ if (c->moveAndCommit(Shard::make(chunkInfo.to),
+ Chunk::MaxChunkSize,
+ writeConcern,
+ waitForDelete,
+ 0, /* maxTimeMS */
+ res)) {
+ movedCount++;
+ continue;
}
- // Changes to metadata, borked metadata, and connectivity problems between shards should
- // cause us to abort this chunk move, but shouldn't cause us to abort the entire round
- // of chunks.
- // TODO(spencer): We probably *should* abort the whole round on issues communicating
- // with the config servers, but its impossible to distinguish those types of failures
- // at the moment.
- // TODO: Handle all these things more cleanly, since they're expected problems
- const CandidateChunk& chunkInfo = *it->get();
- try {
-
- DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns );
- verify( cfg );
+ // the move requires acquiring the collection metadata's lock, which can fail
+ log() << "balancer move failed: " << res << " from: " << chunkInfo.from
+ << " to: " << chunkInfo.to << " chunk: " << chunkInfo.chunk << endl;
- // NOTE: We purposely do not reload metadata here, since _doBalanceRound already
- // tried to do so once.
- ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns );
- verify( cm );
+ if (res["chunkTooBig"].trueValue()) {
+ // reload just to be safe
+ cm = cfg->getChunkManager(chunkInfo.ns);
+ verify(cm);
+ c = cm->findIntersectingChunk(chunkInfo.chunk.min);
- ChunkPtr c = cm->findIntersectingChunk( chunkInfo.chunk.min );
- if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) {
- // likely a split happened somewhere
- cm = cfg->getChunkManager( chunkInfo.ns , true /* reload */);
- verify( cm );
+ log() << "performing a split because migrate failed for size reasons";
- c = cm->findIntersectingChunk( chunkInfo.chunk.min );
- if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) {
- log() << "chunk mismatch after reload, ignoring will retry issue " << chunkInfo.chunk.toString() << endl;
- continue;
- }
- }
+ Status status = c->split(Chunk::normal, NULL, NULL);
+ log() << "split results: " << status << endl;
- BSONObj res;
- if (c->moveAndCommit(Shard::make(chunkInfo.to),
- Chunk::MaxChunkSize,
- writeConcern,
- waitForDelete,
- 0, /* maxTimeMS */
- res)) {
+ if (!status.isOK()) {
+ log() << "marking chunk as jumbo: " << c->toString() << endl;
+ c->markAsJumbo();
+                // we increment movedCount so another round starts right away
movedCount++;
- continue;
- }
-
- // the move requires acquiring the collection metadata's lock, which can fail
- log() << "balancer move failed: " << res << " from: " << chunkInfo.from << " to: " << chunkInfo.to
- << " chunk: " << chunkInfo.chunk << endl;
-
- if ( res["chunkTooBig"].trueValue() ) {
- // reload just to be safe
- cm = cfg->getChunkManager( chunkInfo.ns );
- verify( cm );
- c = cm->findIntersectingChunk( chunkInfo.chunk.min );
-
- log() << "performing a split because migrate failed for size reasons";
-
- Status status = c->split(Chunk::normal, NULL, NULL);
- log() << "split results: " << status << endl;
-
- if ( !status.isOK() ) {
- log() << "marking chunk as jumbo: " << c->toString() << endl;
- c->markAsJumbo();
- // we increment moveCount so we do another round right away
- movedCount++;
- }
-
}
}
- catch( const DBException& ex ) {
- warning() << "could not move chunk " << chunkInfo.chunk.toString()
- << ", continuing balancing round" << causedBy( ex ) << endl;
- }
+ } catch (const DBException& ex) {
+ warning() << "could not move chunk " << chunkInfo.chunk.toString()
+ << ", continuing balancing round" << causedBy(ex) << endl;
}
-
- return movedCount;
}
- void Balancer::_ping( bool waiting ) {
- clusterUpdate( MongosType::ConfigNS,
- BSON( MongosType::name( _myid )),
- BSON( "$set" << BSON( MongosType::ping(jsTime()) <<
- MongosType::up(static_cast<int>(time(0)-_started)) <<
- MongosType::waiting(waiting) <<
- MongosType::mongoVersion(versionString) )),
- true, // upsert
- false, // multi
- WriteConcernOptions::Unacknowledged,
- NULL );
- }
+ return movedCount;
+}
+
+void Balancer::_ping(bool waiting) {
+ clusterUpdate(MongosType::ConfigNS,
+ BSON(MongosType::name(_myid)),
+ BSON("$set" << BSON(MongosType::ping(jsTime())
+ << MongosType::up(static_cast<int>(time(0) - _started))
+ << MongosType::waiting(waiting)
+ << MongosType::mongoVersion(versionString))),
+ true, // upsert
+ false, // multi
+ WriteConcernOptions::Unacknowledged,
+ NULL);
+}
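+// Each _ping() upsert leaves a heartbeat document in config.mongos shaped
+// roughly like the following (illustrative values):
+//   { "_id" : "host:27017", "ping" : <now>, "up" : 3600,
+//     "waiting" : false, "mongoVersion" : "2.8.0" }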
+
+/*
+* Builds the details object for the actionlog.
+* Current formats for detail are:
+* Success: {
+* "candidateChunks" : ,
+* "chunksMoved" : ,
+* "executionTimeMillis" : ,
+* "errorOccured" : false
+* }
+* Failure: {
+* "executionTimeMillis" : ,
+* "errmsg" : ,
+* "errorOccured" : true
+* }
+* @param didError, did this round end in an error?
+* @param executionTime, the time this round took to run
+* @param candidateChunks, the number of chunks identified to be moved
+* @param chunksMoved, the number of chunks moved
+* @param errmsg, the error message for this round
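+*
+* Example success detail (illustrative values):
+* { "candidateChunks" : 4, "chunksMoved" : 3,
+*   "executionTimeMillis" : 82, "errorOccured" : false }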
+*/
- /*
- * Builds the details object for the actionlog.
- * Current formats for detail are:
- * Success: {
- * "candidateChunks" : ,
- * "chunksMoved" : ,
- * "executionTimeMillis" : ,
- * "errorOccured" : false
- * }
- * Failure: {
- * "executionTimeMillis" : ,
- * "errmsg" : ,
- * "errorOccured" : true
- * }
- * @param didError, did this round end in an error?
- * @param executionTime, the time this round took to run
- * @param candidateChunks, the number of chunks identified to be moved
- * @param chunksMoved, the number of chunks moved
- * @param errmsg, the error message for this round
- */
-
- static BSONObj _buildDetails( bool didError, int executionTime,
- int candidateChunks, int chunksMoved, const std::string& errmsg ) {
-
- BSONObjBuilder builder;
- builder.append("executionTimeMillis", executionTime);
- builder.append("errorOccured", didError);
-
- if ( didError ) {
- builder.append("errmsg", errmsg);
- } else {
- builder.append("candidateChunks", candidateChunks);
- builder.append("chunksMoved", chunksMoved);
- }
- return builder.obj();
+static BSONObj _buildDetails(bool didError,
+ int executionTime,
+ int candidateChunks,
+ int chunksMoved,
+ const std::string& errmsg) {
+ BSONObjBuilder builder;
+ builder.append("executionTimeMillis", executionTime);
+ builder.append("errorOccured", didError);
+
+ if (didError) {
+ builder.append("errmsg", errmsg);
+ } else {
+ builder.append("candidateChunks", candidateChunks);
+ builder.append("chunksMoved", chunksMoved);
}
+ return builder.obj();
+}
- /**
- * Reports the result of the balancer round into config.actionlog
- *
- * @param actionLog, which contains the balancer round information to be logged
- *
- */
-
- static void _reportRound( ActionLogType& actionLog) {
- try {
- ScopedDbConnection conn( configServer.getConnectionString(), 30 );
-
- // send a copy of the message to the log in case it doesn't reach config.actionlog
- actionLog.setTime(jsTime());
-
- LOG(1) << "about to log balancer result: " << actionLog;
-
- // The following method is not thread safe. However, there is only one balancer
- // thread per mongos process. The create collection is a a no-op when the collection
- // already exists
- static bool createActionlog = false;
- if ( ! createActionlog ) {
- try {
- static const int actionLogSizeBytes = 1024 * 1024 * 2;
- conn->createCollection( ActionLogType::ConfigNS , actionLogSizeBytes , true );
- }
- catch ( const DBException& ex ) {
- LOG(1) << "config.actionlog could not be created, another mongos process "
- << "may have done so" << causedBy(ex);
-
- }
- createActionlog = true;
- }
-
- Status result = clusterInsert( ActionLogType::ConfigNS,
- actionLog.toBSON(),
- WriteConcernOptions::AllConfigs,
- NULL );
-
- if ( !result.isOK() ) {
- log() << "Error encountered while logging action from balancer "
- << result.reason();
+/**
+ * Reports the result of the balancer round into config.actionlog
+ *
+ * @param actionLog, which contains the balancer round information to be logged
+ *
+ */
+
+static void _reportRound(ActionLogType& actionLog) {
+ try {
+ ScopedDbConnection conn(configServer.getConnectionString(), 30);
+
+ // send a copy of the message to the log in case it doesn't reach config.actionlog
+ actionLog.setTime(jsTime());
+
+ LOG(1) << "about to log balancer result: " << actionLog;
+
+ // The following method is not thread safe. However, there is only one balancer
+        // thread per mongos process. The create-collection call is a no-op when the collection
+ // already exists
+ static bool createActionlog = false;
+ if (!createActionlog) {
+ try {
+ static const int actionLogSizeBytes = 1024 * 1024 * 2;
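+                // the trailing 'true' makes config.actionlog a capped collection,
+                // so old balancer round entries age out automatically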
+ conn->createCollection(ActionLogType::ConfigNS, actionLogSizeBytes, true);
+ } catch (const DBException& ex) {
+ LOG(1) << "config.actionlog could not be created, another mongos process "
+ << "may have done so" << causedBy(ex);
}
-
- conn.done();
- }
- catch ( const DBException& ex ) {
- // if we got here, it means the config change is only in the log;
- // the change didn't make it to config.actionlog
- warning() << "could not log balancer result" << causedBy(ex);
+ createActionlog = true;
}
- }
- bool Balancer::_checkOIDs() {
- vector<Shard> all;
- Shard::getAllShards( all );
+ Status result = clusterInsert(
+ ActionLogType::ConfigNS, actionLog.toBSON(), WriteConcernOptions::AllConfigs, NULL);
- map<int,Shard> oids;
-
- for ( vector<Shard>::iterator i=all.begin(); i!=all.end(); ++i ) {
- Shard s = *i;
- BSONObj f = s.runCommand( "admin" , "features" );
- if ( f["oidMachine"].isNumber() ) {
- int x = f["oidMachine"].numberInt();
- if ( oids.count(x) == 0 ) {
- oids[x] = s;
- }
- else {
- log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString() << " and " << oids[x].toString() << endl;
- s.runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) );
- oids[x].runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) );
- return false;
- }
- }
- else {
- log() << "warning: oidMachine not set on: " << s.toString() << endl;
- }
+ if (!result.isOK()) {
+ log() << "Error encountered while logging action from balancer " << result.reason();
}
- return true;
+
+ conn.done();
+ } catch (const DBException& ex) {
+ // if we got here, it means the config change is only in the log;
+ // the change didn't make it to config.actionlog
+ warning() << "could not log balancer result" << causedBy(ex);
}
-
- /**
- * Occasionally prints a log message with shard versions if the versions are not the same
- * in the cluster.
- */
- void warnOnMultiVersion( const ShardInfoMap& shardInfo ) {
-
- bool isMultiVersion = false;
- for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) {
- if ( !isSameMajorVersion( i->second.getMongoVersion().c_str() ) ) {
- isMultiVersion = true;
- break;
+}
+
+bool Balancer::_checkOIDs() {
+ vector<Shard> all;
+ Shard::getAllShards(all);
+
+ map<int, Shard> oids;
+
+ for (vector<Shard>::iterator i = all.begin(); i != all.end(); ++i) {
+ Shard s = *i;
+ BSONObj f = s.runCommand("admin", "features");
+ if (f["oidMachine"].isNumber()) {
+ int x = f["oidMachine"].numberInt();
+ if (oids.count(x) == 0) {
+ oids[x] = s;
+ } else {
+ log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString()
+ << " and " << oids[x].toString() << endl;
+ s.runCommand("admin", BSON("features" << 1 << "oidReset" << 1));
+ oids[x].runCommand("admin", BSON("features" << 1 << "oidReset" << 1));
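+                // both shards were asked to regenerate their OID machine ids;
+                // report failure so the check is rerun before balancing continues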
+ return false;
}
+ } else {
+ log() << "warning: oidMachine not set on: " << s.toString() << endl;
}
+ }
+ return true;
+}
- // If we're all the same version, don't message
- if ( !isMultiVersion ) return;
-
- warning() << "multiVersion cluster detected, my version is " << versionString << endl;
- for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) {
- log() << i->first << " is at version " << i->second.getMongoVersion() << endl;
- }
+/**
+ * Occasionally prints a log message with shard versions if the versions are not the same
+ * in the cluster.
+ */
+void warnOnMultiVersion(const ShardInfoMap& shardInfo) {
+ bool isMultiVersion = false;
+ for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
+ if (!isSameMajorVersion(i->second.getMongoVersion().c_str())) {
+ isMultiVersion = true;
+ break;
+ }
}
- void Balancer::_doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks ) {
- verify( candidateChunks );
+ // If we're all the same version, don't message
+ if (!isMultiVersion)
+ return;
- //
- // 1. Check whether there is any sharded collection to be balanced by querying
- // the ShardsNS::collections collection
- //
+ warning() << "multiVersion cluster detected, my version is " << versionString << endl;
+ for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
+ log() << i->first << " is at version " << i->second.getMongoVersion() << endl;
+ }
+}
- auto_ptr<DBClientCursor> cursor = conn.query(CollectionType::ConfigNS, BSONObj());
+void Balancer::_doBalanceRound(DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks) {
+ verify(candidateChunks);
- if ( NULL == cursor.get() ) {
- warning() << "could not query " << CollectionType::ConfigNS
- << " while trying to balance" << endl;
- return;
- }
+ //
+ // 1. Check whether there is any sharded collection to be balanced by querying
+    // the config.collections collection
+ //
- vector< string > collections;
- while ( cursor->more() ) {
- BSONObj col = cursor->nextSafe();
+ auto_ptr<DBClientCursor> cursor = conn.query(CollectionType::ConfigNS, BSONObj());
- // sharded collections will have a shard "key".
- if ( ! col[CollectionType::keyPattern()].eoo() &&
- ! col[CollectionType::noBalance()].trueValue() ){
- collections.push_back( col[CollectionType::ns()].String() );
- }
- else if( col[CollectionType::noBalance()].trueValue() ){
- LOG(1) << "not balancing collection " << col[CollectionType::ns()].String()
- << ", explicitly disabled" << endl;
- }
-
- }
- cursor.reset();
+ if (NULL == cursor.get()) {
+ warning() << "could not query " << CollectionType::ConfigNS << " while trying to balance"
+ << endl;
+ return;
+ }
- if ( collections.empty() ) {
- LOG(1) << "no collections to balance" << endl;
- return;
+ vector<string> collections;
+ while (cursor->more()) {
+ BSONObj col = cursor->nextSafe();
+
+ // sharded collections will have a shard "key".
+ if (!col[CollectionType::keyPattern()].eoo() &&
+ !col[CollectionType::noBalance()].trueValue()) {
+ collections.push_back(col[CollectionType::ns()].String());
+ } else if (col[CollectionType::noBalance()].trueValue()) {
+ LOG(1) << "not balancing collection " << col[CollectionType::ns()].String()
+ << ", explicitly disabled" << endl;
}
+ }
+ cursor.reset();
- //
- // 2. Get a list of all the shards that are participating in this balance round
- // along with any maximum allowed quotas and current utilization. We get the
- // latter by issuing db.serverStatus() (mem.mapped) to all shards.
- //
- // TODO: skip unresponsive shards and mark information as stale.
- //
-
- ShardInfoMap shardInfo;
- Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo);
+ if (collections.empty()) {
+ LOG(1) << "no collections to balance" << endl;
+ return;
+ }
- if (!loadStatus.isOK()) {
- warning() << "failed to load shard metadata" << causedBy(loadStatus);
- return;
- }
+ //
+ // 2. Get a list of all the shards that are participating in this balance round
+ // along with any maximum allowed quotas and current utilization. We get the
+ // latter by issuing db.serverStatus() (mem.mapped) to all shards.
+ //
+ // TODO: skip unresponsive shards and mark information as stale.
+ //
- if (shardInfo.size() < 2) {
- LOG(1) << "can't balance without more active shards";
- return;
- }
-
- OCCASIONALLY warnOnMultiVersion( shardInfo );
+ ShardInfoMap shardInfo;
+ Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo);
- //
- // 3. For each collection, check if the balancing policy recommends moving anything around.
- //
+ if (!loadStatus.isOK()) {
+ warning() << "failed to load shard metadata" << causedBy(loadStatus);
+ return;
+ }
- for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it ) {
- const string& ns = *it;
+ if (shardInfo.size() < 2) {
+ LOG(1) << "can't balance without more active shards";
+ return;
+ }
- OwnedPointerMap<string, OwnedPointerVector<ChunkType> > shardToChunksMap;
- cursor = conn.query(ChunkType::ConfigNS,
- QUERY(ChunkType::ns(ns)).sort(ChunkType::min()));
+ OCCASIONALLY warnOnMultiVersion(shardInfo);
- set<BSONObj> allChunkMinimums;
+ //
+ // 3. For each collection, check if the balancing policy recommends moving anything around.
+ //
- while ( cursor->more() ) {
- BSONObj chunkDoc = cursor->nextSafe().getOwned();
+ for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it) {
+ const string& ns = *it;
- auto_ptr<ChunkType> chunk(new ChunkType());
- string errmsg;
- if (!chunk->parseBSON(chunkDoc, &errmsg)) {
- error() << "bad chunk format for " << chunkDoc
- << ": " << errmsg << endl;
- return;
- }
+ OwnedPointerMap<string, OwnedPointerVector<ChunkType>> shardToChunksMap;
+ cursor = conn.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(ns)).sort(ChunkType::min()));
- allChunkMinimums.insert(chunk->getMin().getOwned());
- OwnedPointerVector<ChunkType>*& chunkList =
- shardToChunksMap.mutableMap()[chunk->getShard()];
+ set<BSONObj> allChunkMinimums;
- if (chunkList == NULL) {
- chunkList = new OwnedPointerVector<ChunkType>();
- }
+ while (cursor->more()) {
+ BSONObj chunkDoc = cursor->nextSafe().getOwned();
- chunkList->mutableVector().push_back(chunk.release());
+ auto_ptr<ChunkType> chunk(new ChunkType());
+ string errmsg;
+ if (!chunk->parseBSON(chunkDoc, &errmsg)) {
+ error() << "bad chunk format for " << chunkDoc << ": " << errmsg << endl;
+ return;
}
- cursor.reset();
- if (shardToChunksMap.map().empty()) {
- LOG(1) << "skipping empty collection (" << ns << ")";
- continue;
+ allChunkMinimums.insert(chunk->getMin().getOwned());
+ OwnedPointerVector<ChunkType>*& chunkList =
+ shardToChunksMap.mutableMap()[chunk->getShard()];
+
+ if (chunkList == NULL) {
+ chunkList = new OwnedPointerVector<ChunkType>();
}
- for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
- // this just makes sure there is an entry in shardToChunksMap for every shard
- OwnedPointerVector<ChunkType>*& chunkList =
- shardToChunksMap.mutableMap()[i->first];
+ chunkList->mutableVector().push_back(chunk.release());
+ }
+ cursor.reset();
- if (chunkList == NULL) {
- chunkList = new OwnedPointerVector<ChunkType>();
- }
- }
+ if (shardToChunksMap.map().empty()) {
+ LOG(1) << "skipping empty collection (" << ns << ")";
+ continue;
+ }
- DistributionStatus status(shardInfo, shardToChunksMap.map());
+ for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
+ // this just makes sure there is an entry in shardToChunksMap for every shard
+ OwnedPointerVector<ChunkType>*& chunkList = shardToChunksMap.mutableMap()[i->first];
- // load tags
- cursor = conn.query(TagsType::ConfigNS,
- QUERY(TagsType::ns(ns)).sort(TagsType::min()));
+ if (chunkList == NULL) {
+ chunkList = new OwnedPointerVector<ChunkType>();
+ }
+ }
- vector<TagRange> ranges;
+ DistributionStatus status(shardInfo, shardToChunksMap.map());
- while ( cursor->more() ) {
- BSONObj tag = cursor->nextSafe();
- TagRange tr(tag[TagsType::min()].Obj().getOwned(),
- tag[TagsType::max()].Obj().getOwned(),
- tag[TagsType::tag()].String());
- ranges.push_back(tr);
- uassert(16356,
- str::stream() << "tag ranges not valid for: " << ns,
- status.addTagRange(tr) );
+ // load tags
+ cursor = conn.query(TagsType::ConfigNS, QUERY(TagsType::ns(ns)).sort(TagsType::min()));
- }
- cursor.reset();
+ vector<TagRange> ranges;
- DBConfigPtr cfg = grid.getDBConfig( ns );
- if ( !cfg ) {
- warning() << "could not load db config to balance " << ns << " collection" << endl;
- continue;
- }
+ while (cursor->more()) {
+ BSONObj tag = cursor->nextSafe();
+ TagRange tr(tag[TagsType::min()].Obj().getOwned(),
+ tag[TagsType::max()].Obj().getOwned(),
+ tag[TagsType::tag()].String());
+ ranges.push_back(tr);
+ uassert(
+ 16356, str::stream() << "tag ranges not valid for: " << ns, status.addTagRange(tr));
+ }
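+        // each config.tags document looks roughly like (illustrative):
+        //   { "ns" : "test.foo", "min" : { "x" : 0 }, "max" : { "x" : 100 },
+        //     "tag" : "NYC" }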
+ cursor.reset();
- // This line reloads the chunk manager once if this process doesn't know the collection
- // is sharded yet.
- ChunkManagerPtr cm = cfg->getChunkManagerIfExists( ns, true );
- if ( !cm ) {
- warning() << "could not load chunks to balance " << ns << " collection" << endl;
- continue;
- }
+ DBConfigPtr cfg = grid.getDBConfig(ns);
+ if (!cfg) {
+ warning() << "could not load db config to balance " << ns << " collection" << endl;
+ continue;
+ }
- // loop through tags to make sure no chunk spans tags; splits on tag min. for all chunks
- bool didAnySplits = false;
- for ( unsigned i = 0; i < ranges.size(); i++ ) {
- BSONObj min = ranges[i].min;
+ // This line reloads the chunk manager once if this process doesn't know the collection
+ // is sharded yet.
+ ChunkManagerPtr cm = cfg->getChunkManagerIfExists(ns, true);
+ if (!cm) {
+ warning() << "could not load chunks to balance " << ns << " collection" << endl;
+ continue;
+ }
- min = cm->getShardKeyPattern().getKeyPattern().extendRangeBound( min, false );
+        // loop through tags to make sure no chunk spans tag boundaries; split on a
+        // tag's min when no chunk already starts there
+ bool didAnySplits = false;
+ for (unsigned i = 0; i < ranges.size(); i++) {
+ BSONObj min = ranges[i].min;
- if ( allChunkMinimums.count( min ) > 0 )
- continue;
+ min = cm->getShardKeyPattern().getKeyPattern().extendRangeBound(min, false);
- didAnySplits = true;
+ if (allChunkMinimums.count(min) > 0)
+ continue;
- log() << "ns: " << ns << " need to split on "
- << min << " because there is a range there" << endl;
+ didAnySplits = true;
- ChunkPtr c = cm->findIntersectingChunk( min );
+ log() << "ns: " << ns << " need to split on " << min
+ << " because there is a range there" << endl;
- vector<BSONObj> splitPoints;
- splitPoints.push_back( min );
+ ChunkPtr c = cm->findIntersectingChunk(min);
- Status status = c->multiSplit(splitPoints, NULL);
- if ( !status.isOK() ) {
- error() << "split failed: " << status << endl;
- }
- else {
- LOG(1) << "split worked" << endl;
- }
- break;
- }
+ vector<BSONObj> splitPoints;
+ splitPoints.push_back(min);
- if ( didAnySplits ) {
- // state change, just wait till next round
- continue;
+ Status status = c->multiSplit(splitPoints, NULL);
+ if (!status.isOK()) {
+ error() << "split failed: " << status << endl;
+ } else {
+ LOG(1) << "split worked" << endl;
}
+ break;
+ }
- CandidateChunk* p = _policy->balance( ns, status, _balancedLastTime );
- if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) );
+ if (didAnySplits) {
+ // state change, just wait till next round
+ continue;
}
- }
- bool Balancer::_init() {
- try {
+ CandidateChunk* p = _policy->balance(ns, status, _balancedLastTime);
+ if (p)
+ candidateChunks->push_back(CandidateChunkPtr(p));
+ }
+}
- log() << "about to contact config servers and shards" << endl;
+bool Balancer::_init() {
+ try {
+ log() << "about to contact config servers and shards" << endl;
- // contact the config server and refresh shard information
- // checks that each shard is indeed a different process (no hostname mixup)
- // these checks are redundant in that they're redone at every new round but we want to do them initially here
- // so to catch any problem soon
- Shard::reloadShardInfo();
- _checkOIDs();
+ // contact the config server and refresh shard information
+ // checks that each shard is indeed a different process (no hostname mixup)
+        // these checks are redundant in that they're redone at every new round, but we
+        // want to do them initially here so as to catch any problems early
+ Shard::reloadShardInfo();
+ _checkOIDs();
- log() << "config servers and shards contacted successfully" << endl;
+ log() << "config servers and shards contacted successfully" << endl;
- StringBuilder buf;
- buf << getHostNameCached() << ":" << serverGlobalParams.port;
- _myid = buf.str();
- _started = time(0);
+ StringBuilder buf;
+ buf << getHostNameCached() << ":" << serverGlobalParams.port;
+ _myid = buf.str();
+ _started = time(0);
- log() << "balancer id: " << _myid << " started at " << time_t_to_String_short(_started) << endl;
+ log() << "balancer id: " << _myid << " started at " << time_t_to_String_short(_started)
+ << endl;
- return true;
+ return true;
+ } catch (std::exception& e) {
+ warning() << "could not initialize balancer, please check that all shards and config "
+ "servers are up: " << e.what() << endl;
+ return false;
+ }
+}
+
+void Balancer::run() {
+    // this is the body of a BackgroundJob, so throwing here would end the
+    // balancer thread prematurely
+ while (!inShutdown()) {
+ if (!_init()) {
+ log() << "will retry to initialize balancer in one minute" << endl;
+ sleepsecs(60);
+ continue;
}
- catch ( std::exception& e ) {
- warning() << "could not initialize balancer, please check that all shards and config servers are up: " << e.what() << endl;
- return false;
- }
+ break;
}
- void Balancer::run() {
+ int sleepTime = 10;
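+    // seconds to sleep between rounds; cut to a tenth after a round that
+    // actually moved chunks (see the sleepsecs call at the bottom of the loop)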
- // this is the body of a BackgroundJob so if we throw here we're basically ending the balancer thread prematurely
- while ( ! inShutdown() ) {
+    // getConnectionString and the dist lock constructor do not throw, which is what
+    // we expect while on the balancer thread
+ ConnectionString config = configServer.getConnectionString();
+ DistributedLock balanceLock(config, "balancer");
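+    // only one mongos may hold the "balancer" lock at a time; the dist_lock_try
+    // below skips the round when another balancer instance already holds it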
- if ( ! _init() ) {
- log() << "will retry to initialize balancer in one minute" << endl;
- sleepsecs( 60 );
- continue;
- }
-
- break;
- }
-
- int sleepTime = 10;
+ while (!inShutdown()) {
+ Timer balanceRoundTimer;
+ ActionLogType actionLog;
- // getConnectioString and dist lock constructor does not throw, which is what we expect on while
- // on the balancer thread
- ConnectionString config = configServer.getConnectionString();
- DistributedLock balanceLock( config , "balancer" );
+ actionLog.setServer(getHostNameCached());
+ actionLog.setWhat("balancer.round");
- while ( ! inShutdown() ) {
+ try {
+ ScopedDbConnection conn(config.toString(), 30);
- Timer balanceRoundTimer;
- ActionLogType actionLog;
+ // ping has to be first so we keep things in the config server in sync
+ _ping();
- actionLog.setServer(getHostNameCached());
- actionLog.setWhat("balancer.round");
+ BSONObj balancerResult;
- try {
+ // use fresh shard state
+ Shard::reloadShardInfo();
- ScopedDbConnection conn(config.toString(), 30);
+ // refresh chunk size (even though another balancer might be active)
+ Chunk::refreshChunkSize();
- // ping has to be first so we keep things in the config server in sync
- _ping();
+ SettingsType balancerConfig;
+ string errMsg;
- BSONObj balancerResult;
+ if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) {
+ warning() << errMsg;
+ return;
+ }
- // use fresh shard state
- Shard::reloadShardInfo();
+ // now make sure we should even be running
+ if ((balancerConfig.isKeySet() && // balancer config doc exists
+ !grid.shouldBalance(balancerConfig)) ||
+ MONGO_FAIL_POINT(skipBalanceRound)) {
+ LOG(1) << "skipping balancing round because balancing is disabled" << endl;
- // refresh chunk size (even though another balancer might be active)
- Chunk::refreshChunkSize();
+ // Ping again so scripts can determine if we're active without waiting
+ _ping(true);
- SettingsType balancerConfig;
- string errMsg;
+ conn.done();
- if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) {
- warning() << errMsg;
- return ;
- }
+ sleepsecs(sleepTime);
+ continue;
+ }
- // now make sure we should even be running
- if ((balancerConfig.isKeySet() && // balancer config doc exists
- !grid.shouldBalance(balancerConfig)) ||
- MONGO_FAIL_POINT(skipBalanceRound)) {
+ uassert(13258, "oids broken after resetting!", _checkOIDs());
- LOG(1) << "skipping balancing round because balancing is disabled" << endl;
+ {
+ dist_lock_try lk(&balanceLock, "doing balance round");
+ if (!lk.got()) {
+ LOG(1) << "skipping balancing round because another balancer is active" << endl;
// Ping again so scripts can determine if we're active without waiting
- _ping( true );
+ _ping(true);
conn.done();
- sleepsecs( sleepTime );
+ sleepsecs(sleepTime); // no need to wake up soon
continue;
}
- uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
-
- {
- dist_lock_try lk( &balanceLock , "doing balance round" );
- if ( ! lk.got() ) {
- LOG(1) << "skipping balancing round because another balancer is active" << endl;
-
- // Ping again so scripts can determine if we're active without waiting
- _ping( true );
-
- conn.done();
-
- sleepsecs( sleepTime ); // no need to wake up soon
- continue;
- }
+ if (!isConfigServerConsistent()) {
+ conn.done();
+ warning() << "Skipping balancing round because data inconsistency"
+ << " was detected amongst the config servers." << endl;
+ sleepsecs(sleepTime);
+ continue;
+ }
- if ( !isConfigServerConsistent() ) {
- conn.done();
- warning() << "Skipping balancing round because data inconsistency"
- << " was detected amongst the config servers." << endl;
- sleepsecs( sleepTime );
- continue;
+ const bool waitForDelete =
+ (balancerConfig.isWaitForDeleteSet() ? balancerConfig.getWaitForDelete()
+ : false);
+
+ scoped_ptr<WriteConcernOptions> writeConcern;
+ if (balancerConfig.isKeySet()) { // if balancer doc exists.
+ StatusWith<WriteConcernOptions*> extractStatus =
+ balancerConfig.extractWriteConcern();
+ if (extractStatus.isOK()) {
+ writeConcern.reset(extractStatus.getValue());
+ } else {
+ warning() << extractStatus.toString();
}
+ }
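+                // writeConcern stays NULL when no settings doc exists; the log
+                // line below reports that case as "default"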
- const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ?
- balancerConfig.getWaitForDelete() : false);
-
- scoped_ptr<WriteConcernOptions> writeConcern;
- if (balancerConfig.isKeySet()) { // if balancer doc exists.
- StatusWith<WriteConcernOptions*> extractStatus =
- balancerConfig.extractWriteConcern();
- if (extractStatus.isOK()) {
- writeConcern.reset(extractStatus.getValue());
- }
- else {
- warning() << extractStatus.toString();
- }
- }
+ LOG(1) << "*** start balancing round. "
+ << "waitForDelete: " << waitForDelete << ", secondaryThrottle: "
+ << (writeConcern.get() ? writeConcern->toBSON().toString() : "default")
+ << endl;
+
+ vector<CandidateChunkPtr> candidateChunks;
+ _doBalanceRound(conn.conn(), &candidateChunks);
+ if (candidateChunks.size() == 0) {
+ LOG(1) << "no need to move any chunk" << endl;
+ _balancedLastTime = 0;
+ } else {
+ _balancedLastTime =
+ _moveChunks(&candidateChunks, writeConcern.get(), waitForDelete);
+ }
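+                // _balancedLastTime holds the number of chunks moved this round;
+                // zero means the cluster already looked balanced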
- LOG(1) << "*** start balancing round. "
- << "waitForDelete: " << waitForDelete
- << ", secondaryThrottle: "
- << (writeConcern.get() ? writeConcern->toBSON().toString() : "default")
- << endl;
-
- vector<CandidateChunkPtr> candidateChunks;
- _doBalanceRound( conn.conn() , &candidateChunks );
- if ( candidateChunks.size() == 0 ) {
- LOG(1) << "no need to move any chunk" << endl;
- _balancedLastTime = 0;
- }
- else {
- _balancedLastTime = _moveChunks(&candidateChunks,
- writeConcern.get(),
- waitForDelete );
- }
+ actionLog.setDetails(_buildDetails(false,
+ balanceRoundTimer.millis(),
+ static_cast<int>(candidateChunks.size()),
+ _balancedLastTime,
+ ""));
- actionLog.setDetails( _buildDetails( false, balanceRoundTimer.millis(),
- static_cast<int>(candidateChunks.size()), _balancedLastTime, "") );
+ _reportRound(actionLog);
- _reportRound( actionLog );
+ LOG(1) << "*** end of balancing round" << endl;
+ }
- LOG(1) << "*** end of balancing round" << endl;
- }
+ // Ping again so scripts can determine if we're active without waiting
+ _ping(true);
- // Ping again so scripts can determine if we're active without waiting
- _ping( true );
-
- conn.done();
+ conn.done();
- sleepsecs( _balancedLastTime ? sleepTime / 10 : sleepTime );
- }
- catch ( std::exception& e ) {
- log() << "caught exception while doing balance: " << e.what() << endl;
+ sleepsecs(_balancedLastTime ? sleepTime / 10 : sleepTime);
+ } catch (std::exception& e) {
+ log() << "caught exception while doing balance: " << e.what() << endl;
- // Just to match the opening statement if in log level 1
- LOG(1) << "*** End of balancing round" << endl;
+ // Just to match the opening statement if in log level 1
+ LOG(1) << "*** End of balancing round" << endl;
- // This round failed, tell the world!
- actionLog.setDetails( _buildDetails( true, balanceRoundTimer.millis(),
- 0, 0, e.what()) );
+ // This round failed, tell the world!
+ actionLog.setDetails(_buildDetails(true, balanceRoundTimer.millis(), 0, 0, e.what()));
- _reportRound( actionLog );
+ _reportRound(actionLog);
- sleepsecs( sleepTime ); // sleep a fair amount b/c of error
- continue;
- }
+ sleepsecs(sleepTime); // sleep a fair amount b/c of error
+ continue;
}
-
}
+}
} // namespace mongo