summaryrefslogtreecommitdiff
path: root/src/mongo/s/config.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/config.cpp')
-rw-r--r--src/mongo/s/config.cpp879
1 files changed, 879 insertions, 0 deletions
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
new file mode 100644
index 00000000000..b4923b56a1f
--- /dev/null
+++ b/src/mongo/s/config.cpp
@@ -0,0 +1,879 @@
+// config.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../util/net/message.h"
+#include "../util/stringutils.h"
+#include "../util/unittest.h"
+#include "../client/connpool.h"
+#include "../client/model.h"
+#include "../db/pdfile.h"
+#include "../db/cmdline.h"
+
+#include "chunk.h"
+#include "config.h"
+#include "grid.h"
+#include "server.h"
+
+namespace mongo {
+
+ int ConfigServer::VERSION = 3;
+ Shard Shard::EMPTY;
+
+ string ShardNS::shard = "config.shards";
+ string ShardNS::database = "config.databases";
+ string ShardNS::collection = "config.collections";
+ string ShardNS::chunk = "config.chunks";
+
+ string ShardNS::mongos = "config.mongos";
+ string ShardNS::settings = "config.settings";
+
+ BSONField<bool> ShardFields::draining("draining");
+ BSONField<long long> ShardFields::maxSize ("maxSize");
+
+ OID serverID;
+
+ /* --- DBConfig --- */
+
+ DBConfig::CollectionInfo::CollectionInfo( const BSONObj& in ) {
+ _dirty = false;
+ _dropped = in["dropped"].trueValue();
+ if ( in["key"].isABSONObj() ) {
+ _key = in["key"].Obj().getOwned();
+ _unqiue = in["unique"].trueValue();
+ shard( in["_id"].String() , _key , _unqiue );
+ }
+ _dirty = false;
+ }
+
+
+ void DBConfig::CollectionInfo::shard( const string& ns , const ShardKeyPattern& key , bool unique ) {
+ _cm.reset( new ChunkManager( ns , key , unique ) );
+ _key = key.key().getOwned();
+ _unqiue = unique;
+ _dirty = true;
+ _dropped = false;
+ }
+
+ void DBConfig::CollectionInfo::unshard() {
+ _cm.reset();
+ _dropped = true;
+ _dirty = true;
+ _key = BSONObj();
+ }
+
+ void DBConfig::CollectionInfo::save( const string& ns , DBClientBase* conn ) {
+ BSONObj key = BSON( "_id" << ns );
+
+ BSONObjBuilder val;
+ val.append( "_id" , ns );
+ val.appendDate( "lastmod" , time(0) );
+ val.appendBool( "dropped" , _dropped );
+ if ( _cm )
+ _cm->getInfo( val );
+
+ conn->update( ShardNS::collection , key , val.obj() , true );
+ string err = conn->getLastError();
+ uassert( 13473 , (string)"failed to save collection (" + ns + "): " + err , err.size() == 0 );
+
+ _dirty = false;
+ }
+
+ bool DBConfig::isSharded( const string& ns ) {
+ if ( ! _shardingEnabled )
+ return false;
+ scoped_lock lk( _lock );
+ return _isSharded( ns );
+ }
+
+ bool DBConfig::_isSharded( const string& ns ) {
+ if ( ! _shardingEnabled )
+ return false;
+ Collections::iterator i = _collections.find( ns );
+ if ( i == _collections.end() )
+ return false;
+ return i->second.isSharded();
+ }
+
+ ShardPtr DBConfig::getShardIfExists( const string& ns ){
+ try{
+ // TODO: this function assumes the _primary will not change under-the-covers, but so does
+ // getShard() in general
+ return ShardPtr( new Shard( getShard( ns ) ) );
+ }
+ catch( AssertionException& e ){
+ warning() << "primary not found for " << ns << causedBy( e ) << endl;
+ return ShardPtr();
+ }
+ }
+
+ const Shard& DBConfig::getShard( const string& ns ) {
+ if ( isSharded( ns ) )
+ return Shard::EMPTY;
+
+ uassert( 10178 , "no primary!" , _primary.ok() );
+ return _primary;
+ }
+
+ void DBConfig::enableSharding() {
+ if ( _shardingEnabled )
+ return;
+
+ assert( _name != "config" );
+
+ scoped_lock lk( _lock );
+ _shardingEnabled = true;
+ _save();
+ }
+
+ /**
+ *
+ */
+ ChunkManagerPtr DBConfig::shardCollection( const string& ns , ShardKeyPattern fieldsAndOrder , bool unique , vector<BSONObj>* initPoints, vector<Shard>* initShards ) {
+ uassert( 8042 , "db doesn't have sharding enabled" , _shardingEnabled );
+ uassert( 13648 , str::stream() << "can't shard collection because not all config servers are up" , configServer.allUp() );
+
+
+ {
+ scoped_lock lk( _lock );
+
+ CollectionInfo& ci = _collections[ns];
+ uassert( 8043 , "collection already sharded" , ! ci.isSharded() );
+
+ log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl;
+
+ ci.shard( ns , fieldsAndOrder , unique );
+ ChunkManagerPtr cm = ci.getCM();
+ uassert( 13449 , "collections already sharded" , (cm->numChunks() == 0) );
+
+ cm->createFirstChunks( getPrimary() , initPoints, initShards );
+ _save();
+ }
+
+ ChunkManagerPtr manager = getChunkManager(ns,true,true);
+
+ // Tell the primary mongod to refresh it's data
+ // TODO: Think the real fix here is for mongos to just assume all collections sharded, when we get there
+ for( int i = 0; i < 4; i++ ){
+ if( i == 3 ){
+ warning() << "too many tries updating initial version of " << ns << " on shard primary " << getPrimary() <<
+ ", other mongoses may not see the collection as sharded immediately" << endl;
+ break;
+ }
+ try {
+ ShardConnection conn( getPrimary(), ns );
+ conn.setVersion();
+ conn.done();
+ break;
+ }
+ catch( DBException& e ){
+ warning() << "could not update initial version of " << ns << " on shard primary " << getPrimary() <<
+ causedBy( e ) << endl;
+ }
+ sleepsecs( i );
+ }
+
+ return manager;
+ }
+
+ bool DBConfig::removeSharding( const string& ns ) {
+ if ( ! _shardingEnabled ) {
+ return false;
+ }
+
+ scoped_lock lk( _lock );
+
+ Collections::iterator i = _collections.find( ns );
+
+ if ( i == _collections.end() )
+ return false;
+
+ CollectionInfo& ci = _collections[ns];
+ if ( ! ci.isSharded() )
+ return false;
+
+ ci.unshard();
+ _save( false, true );
+ return true;
+ }
+
+ ChunkManagerPtr DBConfig::getChunkManagerIfExists( const string& ns, bool shouldReload, bool forceReload ){
+ try{
+ return getChunkManager( ns, shouldReload, forceReload );
+ }
+ catch( AssertionException& e ){
+ warning() << "chunk manager not found for " << ns << causedBy( e ) << endl;
+ return ChunkManagerPtr();
+ }
+ }
+
+ ChunkManagerPtr DBConfig::getChunkManager( const string& ns , bool shouldReload, bool forceReload ) {
+ BSONObj key;
+ bool unique;
+ ShardChunkVersion oldVersion;
+
+ {
+ scoped_lock lk( _lock );
+
+ CollectionInfo& ci = _collections[ns];
+
+ bool earlyReload = ! ci.isSharded() && ( shouldReload || forceReload );
+ if ( earlyReload ) {
+ // this is to catch cases where there this is a new sharded collection
+ _reload();
+ ci = _collections[ns];
+ }
+ massert( 10181 , (string)"not sharded:" + ns , ci.isSharded() );
+ assert( ! ci.key().isEmpty() );
+
+ if ( ! ( shouldReload || forceReload ) || earlyReload )
+ return ci.getCM();
+
+ key = ci.key().copy();
+ unique = ci.unique();
+ if ( ci.getCM() )
+ oldVersion = ci.getCM()->getVersion();
+ }
+
+ assert( ! key.isEmpty() );
+
+ BSONObj newest;
+ if ( oldVersion > 0 && ! forceReload ) {
+ ScopedDbConnection conn( configServer.modelServer() , 30.0 );
+ newest = conn->findOne( ShardNS::chunk ,
+ Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) );
+ conn.done();
+
+ if ( ! newest.isEmpty() ) {
+ ShardChunkVersion v = newest["lastmod"];
+ if ( v == oldVersion ) {
+ scoped_lock lk( _lock );
+ CollectionInfo& ci = _collections[ns];
+ massert( 15885 , str::stream() << "not sharded after reloading from chunks : " << ns , ci.isSharded() );
+ return ci.getCM();
+ }
+ }
+
+ }
+ else if( oldVersion == 0 ){
+ warning() << "version 0 found when " << ( forceReload ? "reloading" : "checking" ) << " chunk manager"
+ << ", collection '" << ns << "' initially detected as sharded" << endl;
+ }
+
+ // we are not locked now, and want to load a new ChunkManager
+
+ auto_ptr<ChunkManager> temp;
+
+ {
+ scoped_lock lll ( _hitConfigServerLock );
+
+ if ( ! newest.isEmpty() && ! forceReload ) {
+ // if we have a target we're going for
+ // see if we've hit already
+
+ scoped_lock lk( _lock );
+ CollectionInfo& ci = _collections[ns];
+ if ( ci.isSharded() && ci.getCM() ) {
+ ShardChunkVersion currentVersion = newest["lastmod"];
+ if ( currentVersion == ci.getCM()->getVersion() ) {
+ return ci.getCM();
+ }
+ }
+
+ }
+
+ temp.reset( new ChunkManager( ns , key , unique ) );
+ if ( temp->numChunks() == 0 ) {
+ // maybe we're not sharded any more
+ reload(); // this is a full reload
+ return getChunkManager( ns , false );
+ }
+ }
+
+ scoped_lock lk( _lock );
+
+ CollectionInfo& ci = _collections[ns];
+ massert( 14822 , (string)"state changed in the middle: " + ns , ci.isSharded() );
+
+ bool forced = false;
+ if ( temp->getVersion() > ci.getCM()->getVersion() ||
+ (forced = (temp->getVersion() == ci.getCM()->getVersion() && forceReload ) ) ) {
+
+ if( forced ){
+ warning() << "chunk manager reload forced for collection '" << ns << "', config version is " << temp->getVersion() << endl;
+ }
+
+ // we only want to reset if we're newer or equal and forced
+ // otherwise we go into a bad cycle
+ ci.resetCM( temp.release() );
+ }
+
+ massert( 15883 , str::stream() << "not sharded after chunk manager reset : " << ns , ci.isSharded() );
+ return ci.getCM();
+ }
+
+ void DBConfig::setPrimary( string s ) {
+ scoped_lock lk( _lock );
+ _primary.reset( s );
+ _save();
+ }
+
+ void DBConfig::serialize(BSONObjBuilder& to) {
+ to.append("_id", _name);
+ to.appendBool("partitioned", _shardingEnabled );
+ to.append("primary", _primary.getName() );
+ }
+
+ void DBConfig::unserialize(const BSONObj& from) {
+ LOG(1) << "DBConfig unserialize: " << _name << " " << from << endl;
+ assert( _name == from["_id"].String() );
+
+ _shardingEnabled = from.getBoolField("partitioned");
+ _primary.reset( from.getStringField("primary") );
+
+ // In the 1.5.x series, we used to have collection metadata nested in the database entry. The 1.6.x series
+ // had migration code that ported that info to where it belongs now: the 'collections' collection. We now
+ // just assert that we're not migrating from a 1.5.x directly into a 1.7.x without first converting.
+ BSONObj sharded = from.getObjectField( "sharded" );
+ if ( ! sharded.isEmpty() )
+ uasserted( 13509 , "can't migrate from 1.5.x release to the current one; need to upgrade to 1.6.x first");
+ }
+
+ bool DBConfig::load() {
+ scoped_lock lk( _lock );
+ return _load();
+ }
+
+ bool DBConfig::_load() {
+ ScopedDbConnection conn( configServer.modelServer(), 30.0 );
+
+ BSONObj o = conn->findOne( ShardNS::database , BSON( "_id" << _name ) );
+
+ if ( o.isEmpty() ) {
+ conn.done();
+ return false;
+ }
+
+ unserialize( o );
+
+ BSONObjBuilder b;
+ b.appendRegex( "_id" , (string)"^" + _name + "\\." );
+
+ auto_ptr<DBClientCursor> cursor = conn->query( ShardNS::collection ,b.obj() );
+ assert( cursor.get() );
+ while ( cursor->more() ) {
+ BSONObj o = cursor->next();
+ if( o["dropped"].trueValue() ) _collections.erase( o["_id"].String() );
+ else _collections[o["_id"].String()] = CollectionInfo( o );
+ }
+
+ conn.done();
+
+ return true;
+ }
+
+ void DBConfig::_save( bool db, bool coll ) {
+ ScopedDbConnection conn( configServer.modelServer(), 30.0 );
+
+ if( db ){
+
+ BSONObj n;
+ {
+ BSONObjBuilder b;
+ serialize(b);
+ n = b.obj();
+ }
+
+ conn->update( ShardNS::database , BSON( "_id" << _name ) , n , true );
+ string err = conn->getLastError();
+ uassert( 13396 , (string)"DBConfig save failed: " + err , err.size() == 0 );
+
+ }
+
+ if( coll ){
+
+ for ( Collections::iterator i=_collections.begin(); i!=_collections.end(); ++i ) {
+ if ( ! i->second.isDirty() )
+ continue;
+ i->second.save( i->first , conn.get() );
+ }
+
+ }
+
+ conn.done();
+ }
+
+ bool DBConfig::reload() {
+ scoped_lock lk( _lock );
+ return _reload();
+ }
+
+ bool DBConfig::_reload() {
+ // TODO: i don't think is 100% correct
+ return _load();
+ }
+
+ bool DBConfig::dropDatabase( string& errmsg ) {
+ /**
+ * 1) make sure everything is up
+ * 2) update config server
+ * 3) drop and reset sharded collections
+ * 4) drop and reset primary
+ * 5) drop everywhere to clean up loose ends
+ */
+
+ log() << "DBConfig::dropDatabase: " << _name << endl;
+ configServer.logChange( "dropDatabase.start" , _name , BSONObj() );
+
+ // 1
+ if ( ! configServer.allUp( errmsg ) ) {
+ LOG(1) << "\t DBConfig::dropDatabase not all up" << endl;
+ return 0;
+ }
+
+ // 2
+ grid.removeDB( _name );
+ {
+ ScopedDbConnection conn( configServer.modelServer(), 30.0 );
+ conn->remove( ShardNS::database , BSON( "_id" << _name ) );
+ errmsg = conn->getLastError();
+ if ( ! errmsg.empty() ) {
+ log() << "could not drop '" << _name << "': " << errmsg << endl;
+ conn.done();
+ return false;
+ }
+
+ conn.done();
+ }
+
+ if ( ! configServer.allUp( errmsg ) ) {
+ log() << "error removing from config server even after checking!" << endl;
+ return 0;
+ }
+ LOG(1) << "\t removed entry from config server for: " << _name << endl;
+
+ set<Shard> allServers;
+
+ // 3
+ while ( true ) {
+ int num = 0;
+ if ( ! _dropShardedCollections( num , allServers , errmsg ) )
+ return 0;
+ log() << " DBConfig::dropDatabase: " << _name << " dropped sharded collections: " << num << endl;
+ if ( num == 0 )
+ break;
+ }
+
+ // 4
+ {
+ ScopedDbConnection conn( _primary, 30.0 );
+ BSONObj res;
+ if ( ! conn->dropDatabase( _name , &res ) ) {
+ errmsg = res.toString();
+ return 0;
+ }
+ conn.done();
+ }
+
+ // 5
+ for ( set<Shard>::iterator i=allServers.begin(); i!=allServers.end(); i++ ) {
+ ScopedDbConnection conn( *i, 30.0 );
+ BSONObj res;
+ if ( ! conn->dropDatabase( _name , &res ) ) {
+ errmsg = res.toString();
+ return 0;
+ }
+ conn.done();
+ }
+
+ LOG(1) << "\t dropped primary db for: " << _name << endl;
+
+ configServer.logChange( "dropDatabase" , _name , BSONObj() );
+ return true;
+ }
+
+ bool DBConfig::_dropShardedCollections( int& num, set<Shard>& allServers , string& errmsg ) {
+ num = 0;
+ set<string> seen;
+ while ( true ) {
+ Collections::iterator i = _collections.begin();
+ for ( ; i != _collections.end(); ++i ) {
+ // log() << "coll : " << i->first << " and " << i->second.isSharded() << endl;
+ if ( i->second.isSharded() )
+ break;
+ }
+
+ if ( i == _collections.end() )
+ break;
+
+ if ( seen.count( i->first ) ) {
+ errmsg = "seen a collection twice!";
+ return false;
+ }
+
+ seen.insert( i->first );
+ LOG(1) << "\t dropping sharded collection: " << i->first << endl;
+
+ i->second.getCM()->getAllShards( allServers );
+ i->second.getCM()->drop( i->second.getCM() );
+ uassert( 10176 , str::stream() << "shard state missing for " << i->first , removeSharding( i->first ) );
+
+ num++;
+ uassert( 10184 , "_dropShardedCollections too many collections - bailing" , num < 100000 );
+ LOG(2) << "\t\t dropped " << num << " so far" << endl;
+ }
+
+ return true;
+ }
+
+ void DBConfig::getAllShards(set<Shard>& shards) const {
+ scoped_lock lk( _lock );
+ shards.insert(getPrimary());
+ for (Collections::const_iterator it(_collections.begin()), end(_collections.end()); it != end; ++it) {
+ if (it->second.isSharded()) {
+ it->second.getCM()->getAllShards(shards);
+ } // TODO: handle collections on non-primary shard
+ }
+ }
+
+ /* --- ConfigServer ---- */
+
+ ConfigServer::ConfigServer() : DBConfig( "config" ) {
+ _shardingEnabled = false;
+ }
+
+ ConfigServer::~ConfigServer() {
+ }
+
+ bool ConfigServer::init( string s ) {
+ vector<string> configdbs;
+ splitStringDelim( s, &configdbs, ',' );
+ return init( configdbs );
+ }
+
+ bool ConfigServer::init( vector<string> configHosts ) {
+
+ uassert( 10187 , "need configdbs" , configHosts.size() );
+
+ string hn = getHostName();
+ if ( hn.empty() ) {
+ sleepsecs(5);
+ dbexit( EXIT_BADOPTIONS );
+ }
+
+ set<string> hosts;
+ for ( size_t i=0; i<configHosts.size(); i++ ) {
+ string host = configHosts[i];
+ hosts.insert( getHost( host , false ) );
+ configHosts[i] = getHost( host , true );
+ }
+
+ for ( set<string>::iterator i=hosts.begin(); i!=hosts.end(); i++ ) {
+ string host = *i;
+ bool ok = false;
+ for ( int x=10; x>0; x-- ) {
+ if ( ! hostbyname( host.c_str() ).empty() ) {
+ ok = true;
+ break;
+ }
+ log() << "can't resolve DNS for [" << host << "] sleeping and trying " << x << " more times" << endl;
+ sleepsecs( 10 );
+ }
+ if ( ! ok )
+ return false;
+ }
+
+ _config = configHosts;
+
+ string fullString;
+ joinStringDelim( configHosts, &fullString, ',' );
+ _primary.setAddress( ConnectionString( fullString , ConnectionString::SYNC ) );
+ LOG(1) << " config string : " << fullString << endl;
+
+ return true;
+ }
+
+ bool ConfigServer::checkConfigServersConsistent( string& errmsg , int tries ) const {
+ if ( tries <= 0 )
+ return false;
+
+ unsigned firstGood = 0;
+ int up = 0;
+ vector<BSONObj> res;
+ for ( unsigned i=0; i<_config.size(); i++ ) {
+ BSONObj x;
+ try {
+ ScopedDbConnection conn( _config[i], 30.0 );
+
+ // check auth
+ conn->update("config.foo.bar", BSONObj(), BSON("x" << 1));
+ conn->simpleCommand( "admin", &x, "getlasterror");
+ if (x["err"].type() == String && x["err"].String() == "unauthorized") {
+ errmsg = "not authorized, did you start with --keyFile?";
+ return false;
+ }
+
+ if ( ! conn->simpleCommand( "config" , &x , "dbhash" ) )
+ x = BSONObj();
+ else {
+ x = x.getOwned();
+ if ( up == 0 )
+ firstGood = i;
+ up++;
+ }
+ conn.done();
+ }
+ catch ( SocketException& e ) {
+ warning() << " couldn't check on config server:" << _config[i] << " ok for now : " << e.toString() << endl;
+ }
+ res.push_back(x);
+ }
+
+ if ( _config.size() == 1 )
+ return true;
+
+ if ( up == 0 ) {
+ errmsg = "no config servers reachable";
+ return false;
+ }
+
+ if ( up == 1 ) {
+ log( LL_WARNING ) << "only 1 config server reachable, continuing" << endl;
+ return true;
+ }
+
+ BSONObj base = res[firstGood];
+ for ( unsigned i=firstGood+1; i<res.size(); i++ ) {
+ if ( res[i].isEmpty() )
+ continue;
+
+ string c1 = base.getFieldDotted( "collections.chunks" );
+ string c2 = res[i].getFieldDotted( "collections.chunks" );
+
+ string d1 = base.getFieldDotted( "collections.databases" );
+ string d2 = res[i].getFieldDotted( "collections.databases" );
+
+ if ( c1 == c2 && d1 == d2 )
+ continue;
+
+ stringstream ss;
+ ss << "config servers " << _config[firstGood] << " and " << _config[i] << " differ";
+ log( LL_WARNING ) << ss.str();
+ if ( tries <= 1 ) {
+ ss << "\n" << c1 << "\t" << c2 << "\n" << d1 << "\t" << d2;
+ errmsg = ss.str();
+ return false;
+ }
+
+ return checkConfigServersConsistent( errmsg , tries - 1 );
+ }
+
+ return true;
+ }
+
+ bool ConfigServer::ok( bool checkConsistency ) {
+ if ( ! _primary.ok() )
+ return false;
+
+ if ( checkConsistency ) {
+ string errmsg;
+ if ( ! checkConfigServersConsistent( errmsg ) ) {
+ log( LL_ERROR ) << "config servers not in sync! " << errmsg << warnings;
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ bool ConfigServer::allUp() {
+ string errmsg;
+ return allUp( errmsg );
+ }
+
+ bool ConfigServer::allUp( string& errmsg ) {
+ try {
+ ScopedDbConnection conn( _primary, 30.0 );
+ conn->getLastError();
+ conn.done();
+ return true;
+ }
+ catch ( DBException& ) {
+ log() << "ConfigServer::allUp : " << _primary.toString() << " seems down!" << endl;
+ errmsg = _primary.toString() + " seems down";
+ return false;
+ }
+
+ }
+
+ int ConfigServer::dbConfigVersion() {
+ ScopedDbConnection conn( _primary, 30.0 );
+ int version = dbConfigVersion( conn.conn() );
+ conn.done();
+ return version;
+ }
+
+ int ConfigServer::dbConfigVersion( DBClientBase& conn ) {
+ auto_ptr<DBClientCursor> c = conn.query( "config.version" , BSONObj() );
+ int version = 0;
+ if ( c->more() ) {
+ BSONObj o = c->next();
+ version = o["version"].numberInt();
+ uassert( 10189 , "should only have 1 thing in config.version" , ! c->more() );
+ }
+ else {
+ if ( conn.count( ShardNS::shard ) || conn.count( ShardNS::database ) ) {
+ version = 1;
+ }
+ }
+
+ return version;
+ }
+
+ void ConfigServer::reloadSettings() {
+ set<string> got;
+
+ ScopedDbConnection conn( _primary, 30.0 );
+ auto_ptr<DBClientCursor> c = conn->query( ShardNS::settings , BSONObj() );
+ assert( c.get() );
+ while ( c->more() ) {
+ BSONObj o = c->next();
+ string name = o["_id"].valuestrsafe();
+ got.insert( name );
+ if ( name == "chunksize" ) {
+ int csize = o["value"].numberInt();
+
+ // validate chunksize before proceeding
+ if ( csize == 0 ) {
+ // setting was not modified; mark as such
+ got.erase(name);
+ log() << "warning: invalid chunksize (" << csize << ") ignored" << endl;
+ } else {
+ LOG(1) << "MaxChunkSize: " << csize << endl;
+ Chunk::MaxChunkSize = csize * 1024 * 1024;
+ }
+ }
+ else if ( name == "balancer" ) {
+ // ones we ignore here
+ }
+ else {
+ log() << "warning: unknown setting [" << name << "]" << endl;
+ }
+ }
+
+ if ( ! got.count( "chunksize" ) ) {
+ conn->insert( ShardNS::settings , BSON( "_id" << "chunksize" <<
+ "value" << (Chunk::MaxChunkSize / ( 1024 * 1024 ) ) ) );
+ }
+
+
+ // indexes
+ try {
+ conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "min" << 1 ) , true );
+ conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "shard" << 1 << "min" << 1 ) , true );
+ conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "lastmod" << 1 ) , true );
+ conn->ensureIndex( ShardNS::shard , BSON( "host" << 1 ) , true );
+ }
+ catch ( std::exception& e ) {
+ log( LL_WARNING ) << "couldn't create indexes on config db: " << e.what() << endl;
+ }
+
+ conn.done();
+ }
+
+ string ConfigServer::getHost( string name , bool withPort ) {
+ if ( name.find( ":" ) != string::npos ) {
+ if ( withPort )
+ return name;
+ return name.substr( 0 , name.find( ":" ) );
+ }
+
+ if ( withPort ) {
+ stringstream ss;
+ ss << name << ":" << CmdLine::ConfigServerPort;
+ return ss.str();
+ }
+
+ return name;
+ }
+
+ /* must never throw */
+ void ConfigServer::logChange( const string& what , const string& ns , const BSONObj& detail ) {
+ string changeID;
+
+ try {
+ // get this entry's ID so we can use on the exception code path too
+ stringstream id;
+ static AtomicUInt num;
+ id << getHostNameCached() << "-" << terseCurrentTime() << "-" << num++;
+ changeID = id.str();
+
+ // send a copy of the message to the log in case it doesn't manage to reach config.changelog
+ Client* c = currentClient.get();
+ BSONObj msg = BSON( "_id" << changeID << "server" << getHostNameCached() << "clientAddr" << (c ? c->clientAddress(true) : "N/A")
+ << "time" << DATENOW << "what" << what << "ns" << ns << "details" << detail );
+ log() << "about to log metadata event: " << msg << endl;
+
+ assert( _primary.ok() );
+
+ ScopedDbConnection conn( _primary, 30.0 );
+
+ static bool createdCapped = false;
+ if ( ! createdCapped ) {
+ try {
+ conn->createCollection( "config.changelog" , 1024 * 1024 * 10 , true );
+ }
+ catch ( UserException& e ) {
+ LOG(1) << "couldn't create changelog (like race condition): " << e << endl;
+ // don't care
+ }
+ createdCapped = true;
+ }
+
+ conn->insert( "config.changelog" , msg );
+
+ conn.done();
+
+ }
+
+ catch ( std::exception& e ) {
+ // if we got here, it means the config change is only in the log; it didn't make it to config.changelog
+ log() << "not logging config change: " << changeID << " " << e.what() << endl;
+ }
+ }
+
+ void ConfigServer::replicaSetChange( const ReplicaSetMonitor * monitor ) {
+ try {
+ Shard s = Shard::lookupRSName(monitor->getName());
+ if (s == Shard::EMPTY) {
+ log(1) << "replicaSetChange: shard not found for set: " << monitor->getServerAddress() << endl;
+ return;
+ }
+ ScopedDbConnection conn( configServer.getConnectionString(), 30.0 );
+ conn->update( ShardNS::shard , BSON( "_id" << s.getName() ) , BSON( "$set" << BSON( "host" << monitor->getServerAddress() ) ) );
+ conn.done();
+ }
+ catch ( DBException & ) {
+ error() << "RSChangeWatcher: could not update config db for set: " << monitor->getName() << " to: " << monitor->getServerAddress() << endl;
+ }
+ }
+
+ DBConfigPtr configServerPtr (new ConfigServer());
+ ConfigServer& configServer = dynamic_cast<ConfigServer&>(*configServerPtr);
+
+}