// config.cpp /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "stdafx.h" #include "../util/message.h" #include "../util/unittest.h" #include "../client/connpool.h" #include "../client/model.h" #include "../db/pdfile.h" #include "../db/cmdline.h" #include "server.h" #include "config.h" #include "chunk.h" namespace mongo { int ConfigServer::VERSION = 2; /* --- DBConfig --- */ string DBConfig::modelServer() { return configServer.modelServer(); } bool DBConfig::isSharded( const string& ns ){ if ( ! _shardingEnabled ) return false; return _sharded.find( ns ) != _sharded.end(); } string DBConfig::getShard( const string& ns ){ if ( isSharded( ns ) ) return ""; uassert( 10178 , "no primary!" , _primary.size() ); return _primary; } void DBConfig::enableSharding(){ _shardingEnabled = true; } ChunkManager* DBConfig::shardCollection( const string& ns , ShardKeyPattern fieldsAndOrder , bool unique ){ if ( ! _shardingEnabled ) throw UserException( 8042 , "db doesn't have sharding enabled" ); ChunkManager * info = _shards[ns]; if ( info ) return info; if ( isSharded( ns ) ) throw UserException( 8043 , "already sharded" ); log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl; _sharded[ns] = CollectionInfo( fieldsAndOrder , unique ); info = new ChunkManager( this , ns , fieldsAndOrder , unique ); _shards[ns] = info; return info; } bool DBConfig::removeSharding( const string& ns ){ if ( ! _shardingEnabled ){ cout << "AAAA" << endl; return false; } ChunkManager * info = _shards[ns]; map::iterator i = _sharded.find( ns ); if ( info == 0 && i == _sharded.end() ){ cout << "BBBB" << endl; return false; } uassert( 10179 , "_sharded but no info" , info ); uassert( 10180 , "info but no sharded" , i != _sharded.end() ); _sharded.erase( i ); _shards.erase( ns ); // TODO: clean this up, maybe switch to shared_ptr return true; } ChunkManager* DBConfig::getChunkManager( const string& ns , bool reload ){ ChunkManager* m = _shards[ns]; if ( m && ! reload ) return m; uassert( 10181 , (string)"not sharded:" + ns , isSharded( ns ) ); if ( m && reload ) log() << "reloading shard info for: " << ns << endl; m = new ChunkManager( this , ns , _sharded[ ns ].key , _sharded[ns].unique ); _shards[ns] = m; return m; } void DBConfig::serialize(BSONObjBuilder& to){ to.append("name", _name); to.appendBool("partitioned", _shardingEnabled ); to.append("primary", _primary ); if ( _sharded.size() > 0 ){ BSONObjBuilder a; for ( map::reverse_iterator i=_sharded.rbegin(); i != _sharded.rend(); i++){ BSONObjBuilder temp; temp.append( "key" , i->second.key.key() ); temp.appendBool( "unique" , i->second.unique ); a.append( i->first.c_str() , temp.obj() ); } to.append( "sharded" , a.obj() ); } } void DBConfig::unserialize(const BSONObj& from){ _name = from.getStringField("name"); _shardingEnabled = from.getBoolField("partitioned"); _primary = from.getStringField("primary"); _sharded.clear(); BSONObj sharded = from.getObjectField( "sharded" ); if ( ! sharded.isEmpty() ){ BSONObjIterator i(sharded); while ( i.more() ){ BSONElement e = i.next(); uassert( 10182 , "sharded things have to be objects" , e.type() == Object ); BSONObj c = e.embeddedObject(); uassert( 10183 , "key has to be an object" , c["key"].type() == Object ); _sharded[e.fieldName()] = CollectionInfo( c["key"].embeddedObject() , c["unique"].trueValue() ); } } } void DBConfig::save( bool check ){ Model::save( check ); for ( map::iterator i=_shards.begin(); i != _shards.end(); i++) i->second->save(); } bool DBConfig::reload(){ // TODO: i don't think is 100% correct return doload(); } bool DBConfig::doload(){ BSONObjBuilder b; b.append("name", _name.c_str()); BSONObj q = b.done(); return load(q); } bool DBConfig::dropDatabase( string& errmsg ){ /** * 1) make sure everything is up * 2) update config server * 3) drop and reset sharded collections * 4) drop and reset primary * 5) drop everywhere to clean up loose ends */ log() << "DBConfig::dropDatabase: " << _name << endl; // 1 if ( ! configServer.allUp( errmsg ) ){ log(1) << "\t DBConfig::dropDatabase not all up" << endl; return 0; } // 2 grid.removeDB( _name ); remove( true ); if ( ! configServer.allUp( errmsg ) ){ log() << "error removing from config server even after checking!" << endl; return 0; } log(1) << "\t removed entry from config server for: " << _name << endl; set allServers; // 3 while ( true ){ int num; if ( ! _dropShardedCollections( num , allServers , errmsg ) ) return 0; log() << " DBConfig::dropDatabase: " << _name << " dropped sharded collections: " << num << endl; if ( num == 0 ) break; } // 4 { ScopedDbConnection conn( _primary ); BSONObj res; if ( ! conn->dropDatabase( _name , &res ) ){ errmsg = res.toString(); return 0; } conn.done(); } // 5 for ( set::iterator i=allServers.begin(); i!=allServers.end(); i++ ){ string s = *i; ScopedDbConnection conn( s ); BSONObj res; if ( ! conn->dropDatabase( _name , &res ) ){ errmsg = res.toString(); return 0; } conn.done(); } log(1) << "\t dropped primary db for: " << _name << endl; return true; } bool DBConfig::_dropShardedCollections( int& num, set& allServers , string& errmsg ){ num = 0; set seen; while ( true ){ map::iterator i = _shards.begin(); if ( i == _shards.end() ) break; if ( seen.count( i->first ) ){ errmsg = "seen a collection twice!"; return false; } seen.insert( i->first ); log(1) << "\t dropping sharded collection: " << i->first << endl; i->second->getAllServers( allServers ); i->second->drop(); num++; uassert( 10184 , "_dropShardedCollections too many collections - bailing" , num < 100000 ); log(2) << "\t\t dropped " << num << " so far" << endl; } return true; } /* --- Grid --- */ string Grid::pickShardForNewDB(){ ScopedDbConnection conn( configServer.getPrimary() ); // TODO: this is temporary vector all; auto_ptr c = conn->query( "config.shards" , Query() ); while ( c->more() ){ BSONObj s = c->next(); all.push_back( s["host"].valuestrsafe() ); // look at s["maxSize"] if exists } conn.done(); if ( all.size() == 0 ) return ""; return all[ rand() % all.size() ]; } bool Grid::knowAboutShard( string name ) const{ ScopedDbConnection conn( configServer.getPrimary() ); BSONObj shard = conn->findOne( "config.shards" , BSON( "host" << name ) ); conn.done(); return ! shard.isEmpty(); } DBConfig* Grid::getDBConfig( string database , bool create ){ { string::size_type i = database.find( "." ); if ( i != string::npos ) database = database.substr( 0 , i ); } if ( database == "config" ) return &configServer; scoped_lock l( _lock ); DBConfig*& cc = _databases[database]; if ( cc == 0 ){ cc = new DBConfig( database ); if ( ! cc->doload() ){ if ( create ){ // note here that cc->primary == 0. log() << "couldn't find database [" << database << "] in config db" << endl; if ( database == "admin" ) cc->_primary = configServer.getPrimary(); else cc->_primary = pickShardForNewDB(); if ( cc->_primary.size() ){ cc->save(); log() << "\t put [" << database << "] on: " << cc->_primary << endl; } else { log() << "\t can't find a shard to put new db on" << endl; uassert( 10185 , "can't find a shard to put new db on" , 0 ); } } else { cc = 0; } } } return cc; } void Grid::removeDB( string database ){ uassert( 10186 , "removeDB expects db name" , database.find( '.' ) == string::npos ); scoped_lock l( _lock ); _databases.erase( database ); } unsigned long long Grid::getNextOpTime() const { ScopedDbConnection conn( configServer.getPrimary() ); BSONObj result; massert( 10421 , "getoptime failed" , conn->simpleCommand( "admin" , &result , "getoptime" ) ); conn.done(); return result["optime"]._numberLong(); } /* --- ConfigServer ---- */ ConfigServer::ConfigServer() { _shardingEnabled = false; _primary = ""; _name = "grid"; } ConfigServer::~ConfigServer() { } bool ConfigServer::init( vector configHosts ){ uassert( 10187 , "need configdbs" , configHosts.size() ); string hn = getHostName(); if ( hn.empty() ) { sleepsecs(5); dbexit( EXIT_BADOPTIONS ); } ourHostname = hn; set hosts; for ( size_t i=0; i::iterator i=hosts.begin(); i!=hosts.end(); i++ ){ string host = *i; bool ok = false; for ( int x=0; x<10; x++ ){ if ( ! hostbyname( host.c_str() ).empty() ){ ok = true; break; } log() << "can't resolve DNS for [" << host << "] sleeping and trying " << (10-x) << " more times" << endl; sleepsecs( 10 ); } if ( ! ok ) return false; } uassert( 10188 , "can only hand 1 config db right now" , configHosts.size() == 1 ); _primary = configHosts[0]; return true; } bool ConfigServer::allUp(){ string errmsg; return allUp( errmsg ); } bool ConfigServer::allUp( string& errmsg ){ try { ScopedDbConnection conn( _primary ); conn->getLastError(); conn.done(); return true; } catch ( DBException& ){ log() << "ConfigServer::allUp : " << _primary << " seems down!" << endl; errmsg = _primary + " seems down"; return false; } } int ConfigServer::dbConfigVersion(){ ScopedDbConnection conn( _primary ); int version = dbConfigVersion( conn.conn() ); conn.done(); return version; } int ConfigServer::dbConfigVersion( DBClientBase& conn ){ auto_ptr c = conn.query( "config.version" , BSONObj() ); int version = 0; if ( c->more() ){ BSONObj o = c->next(); version = o["version"].numberInt(); uassert( 10189 , "should only have 1 thing in config.version" , ! c->more() ); } else { if ( conn.count( "config.shard" ) || conn.count( "config.databases" ) ){ version = 1; } } return version; } int ConfigServer::checkConfigVersion(){ int cur = dbConfigVersion(); if ( cur == VERSION ) return 0; if ( cur == 0 ){ ScopedDbConnection conn( _primary ); conn->insert( "config.version" , BSON( "_id" << 1 << "version" << VERSION ) ); pool.flush(); assert( VERSION == dbConfigVersion( conn.conn() ) ); conn.done(); return 0; } log() << "don't know how to upgrade " << cur << " to " << VERSION << endl; return -8; } void ConfigServer::reloadSettings(){ set got; ScopedDbConnection conn( _primary ); auto_ptr c = conn->query( "config.settings" , BSONObj() ); while ( c->more() ){ BSONObj o = c->next(); string name = o["_id"].valuestrsafe(); got.insert( name ); if ( name == "chunksize" ){ log(1) << "MaxChunkSize: " << o["value"] << endl; Chunk::MaxChunkSize = o["value"].numberInt() * 1024 * 1024; } else { log() << "warning: unknown setting [" << name << "]" << endl; } } if ( ! got.count( "chunksize" ) ){ conn->insert( "config.settings" , BSON( "_id" << "chunksize" << "value" << (Chunk::MaxChunkSize / ( 1024 * 1024 ) ) ) ); } conn.done(); } string ConfigServer::getHost( string name , bool withPort ){ if ( name.find( ":" ) ){ if ( withPort ) return name; return name.substr( 0 , name.find( ":" ) ); } if ( withPort ){ stringstream ss; ss << name << ":" << CmdLine::ConfigServerPort; return ss.str(); } return name; } ConfigServer configServer; Grid grid; class DBConfigUnitTest : public UnitTest { public: void testInOut( DBConfig& c , BSONObj o ){ c.unserialize( o ); BSONObjBuilder b; c.serialize( b ); BSONObj out = b.obj(); if ( o.toString() == out.toString() ) return; log() << "DBConfig serialization broken\n" << "in : " << o.toString() << "\n" << "out : " << out.toString() << endl; assert(0); } void a(){ BSONObjBuilder b; b << "name" << "abc"; b.appendBool( "partitioned" , true ); b << "primary" << "myserver"; DBConfig c; testInOut( c , b.obj() ); } void b(){ BSONObjBuilder b; b << "name" << "abc"; b.appendBool( "partitioned" , true ); b << "primary" << "myserver"; BSONObjBuilder a; a << "abc.foo" << fromjson( "{ 'key' : { 'a' : 1 } , 'unique' : false }" ); a << "abc.bar" << fromjson( "{ 'key' : { 'kb' : -1 } , 'unique' : true }" ); b.appendArray( "sharded" , a.obj() ); DBConfig c; testInOut( c , b.obj() ); assert( c.isSharded( "abc.foo" ) ); assert( ! c.isSharded( "abc.food" ) ); } void run(){ a(); b(); } } dbConfigUnitTest; }