diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-06-14 18:44:51 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-06-14 18:44:51 -0400 |
commit | 440163fccc7554cf4d059187c5dda22b4817f7d1 (patch) | |
tree | 778f70a01ca69f674278cc123104ad3754a719bb /s/d_logic.cpp | |
parent | de1ce815b666a6c9c724a462240b3791e49f9f2f (diff) | |
download | mongo-440163fccc7554cf4d059187c5dda22b4817f7d1.tar.gz |
more sharding mongod side cleaning
Diffstat (limited to 's/d_logic.cpp')
-rw-r--r-- | s/d_logic.cpp | 166 |
1 files changed, 108 insertions, 58 deletions
diff --git a/s/d_logic.cpp b/s/d_logic.cpp index 2d49ad59317..9dceb11b62f 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -41,13 +41,80 @@ using namespace std; namespace mongo { + + // -----ShardingState START ---- + + ShardingState::ShardingState(){ + _enabled = false; + } + + void ShardingState::enable( const string& server ){ + _enabled = true; + assert( server.size() ); + if ( _configServer.size() == 0 ) + _configServer = server; + else { + assert( server == _configServer ); + } + } + + + bool ShardingState::hasVersion( const string& ns ) const { + NSVersionMap::const_iterator i = _versions.find(ns); + return i != _versions.end(); + } + + bool ShardingState::hasVersion( const string& ns , ConfigVersion& version ) const { + NSVersionMap::const_iterator i = _versions.find(ns); + if ( i == _versions.end() ) + return false; + version = i->second; + return true; + } + + ConfigVersion& ShardingState::getVersion( const string& ns ){ + return _versions[ns]; + } - NSVersionMap globalVersions; - boost::thread_specific_ptr<NSVersionMap> clientShardVersions; + void ShardingState::setVersion( const string& ns , const ConfigVersion& version ){ + _versions[ns] = version; + } - string shardConfigServer; + ShardingState shardingState; + + // -----ShardingState END ---- + + // -----ShardedConnectionInfo START ---- + + boost::thread_specific_ptr<ShardedConnectionInfo> ShardedConnectionInfo::_tl; + + ShardedConnectionInfo::ShardedConnectionInfo(){ + _id.clear(); + } + + ShardedConnectionInfo* ShardedConnectionInfo::get( bool create ){ + ShardedConnectionInfo* info = _tl.get(); + if ( ! info && create ){ + log(1) << "entering shard mode for connection" << endl; + info = new ShardedConnectionInfo(); + _tl.reset( info ); + } + return info; + } - boost::thread_specific_ptr<OID> clientServerIds; + ConfigVersion& ShardedConnectionInfo::getVersion( const string& ns ){ + return _versions[ns]; + } + + void ShardedConnectionInfo::setVersion( const string& ns , const ConfigVersion& version ){ + _versions[ns] = version; + } + + void ShardedConnectionInfo::setID( const OID& id ){ + _id = id; + } + + // -----ShardedConnectionInfo END ---- unsigned long long extractVersion( BSONElement e , string& errmsg ){ if ( e.eoo() ){ @@ -93,7 +160,8 @@ namespace mongo { virtual LockType locktype() const { return WRITE; } // TODO: figure out how to make this not need to lock bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - + ShardedConnectionInfo* info = ShardedConnectionInfo::get( true ); + bool authoritative = cmdObj.getBoolField( "authoritative" ); string configdb = cmdObj["configdb"].valuestrsafe(); @@ -103,17 +171,19 @@ namespace mongo { return false; } - if ( shardConfigServer.size() == 0 ){ + if ( shardingState.enabled() ){ + if ( configdb != shardingState.getConfigServer() ){ + errmsg = "specified a different configdb!"; + return false; + } + } + else { if ( ! authoritative ){ result.appendBool( "need_authoritative" , true ); errmsg = "first setShardVersion"; return false; } - shardConfigServer = configdb; - } - else if ( shardConfigServer != configdb ){ - errmsg = "specified a different configdb!"; - return false; + shardingState.enable( configdb ); } } @@ -125,14 +195,10 @@ namespace mongo { } else { OID clientId = cmdObj["serverID"].__oid(); - if ( ! clientServerIds.get() ){ - string s = clientId.str(); - - OID * nid = new OID(); - nid->init( s ); - clientServerIds.reset( nid ); + if ( ! info->hasID() ){ + info->setID( clientId ); } - else if ( clientId != *clientServerIds.get() ){ + else if ( clientId != info->getID() ){ errmsg = "server id has changed!"; return 0; } @@ -143,14 +209,6 @@ namespace mongo { if ( errmsg.size() ){ return false; } - - NSVersionMap * versions = clientShardVersions.get(); - - if ( ! versions ){ - log(1) << "entering shard mode for connection" << endl; - versions = new NSVersionMap(); - clientShardVersions.reset( versions ); - } string ns = cmdObj["setShardVersion"].valuestrsafe(); if ( ns.size() == 0 ){ @@ -158,8 +216,8 @@ namespace mongo { return false; } - unsigned long long& oldVersion = (*versions)[ns]; - unsigned long long& globalVersion = globalVersions[ns]; + ConfigVersion& oldVersion = info->getVersion(ns); + unsigned long long& globalVersion = shardingState.getVersion(ns); if ( version == 0 && globalVersion == 0 ){ // this connection is cleaning itself @@ -231,11 +289,13 @@ namespace mongo { return false; } - result.append( "configServer" , shardConfigServer.c_str() ); + result.append( "configServer" , shardingState.getConfigServer() ); - result.appendTimestamp( "global" , globalVersions[ns] ); - if ( clientShardVersions.get() ) - result.appendTimestamp( "mine" , (*clientShardVersions.get())[ns] ); + result.appendTimestamp( "global" , shardingState.getVersion(ns) ); + + ShardedConnectionInfo* info = ShardedConnectionInfo::get( false ); + if ( info ) + result.appendTimestamp( "mine" , info->getVersion(ns) ); else result.appendTimestamp( "mine" , 0 ); @@ -247,19 +307,13 @@ namespace mongo { bool haveLocalShardingInfo( const string& ns ){ - if ( shardConfigServer.empty() ) + if ( ! shardingState.enabled() ) return false; - - unsigned long long version = globalVersions[ns]; - if ( version == 0 ) - return false; - - NSVersionMap * versions = clientShardVersions.get(); - if ( ! versions ) + if ( ! shardingState.hasVersion( ns ) ) return false; - - return true; + + return ShardedConnectionInfo::get(false) > 0; } /** @@ -267,24 +321,22 @@ namespace mongo { or if version for this client is ok */ bool shardVersionOk( const string& ns , string& errmsg ){ - if ( shardConfigServer.empty() ){ - return true; - } - - NSVersionMap::iterator i = globalVersions.find( ns ); - if ( i == globalVersions.end() ) + if ( ! shardingState.enabled() ) return true; - NSVersionMap * versions = clientShardVersions.get(); - if ( ! versions ){ + ShardedConnectionInfo* info = ShardedConnectionInfo::get( false ); + if ( ! info ){ // this means the client has nothing sharded // so this allows direct connections to do whatever they want // which i think is the correct behavior return true; } - unsigned long long clientVersion = (*versions)[ns]; - unsigned long long version = i->second; + unsigned long long version; + if ( ! shardingState.hasVersion( ns , version ) ) + return true; + + unsigned long long clientVersion = info->getVersion(ns); if ( version == 0 && clientVersion > 0 ){ stringstream ss; @@ -308,10 +360,8 @@ namespace mongo { bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){ - - if ( shardConfigServer.empty() ){ + if ( ! shardingState.enabled() ) return false; - } int op = m.operation(); if ( op < 2000 @@ -354,8 +404,8 @@ namespace mongo { return true; } - OID * clientID = clientServerIds.get(); - massert( 10422 , "write with bad shard config and no server id!" , clientID ); + const OID& clientID = ShardedConnectionInfo::get(false)->getID(); + massert( 10422 , "write with bad shard config and no server id!" , clientID.isSet() ); log() << "got write with an old config - writing back" << endl; @@ -364,7 +414,7 @@ namespace mongo { b.append( "ns" , ns ); b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); log() << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl; - queueWriteBack( clientID->str() , b.obj() ); + queueWriteBack( clientID.str() , b.obj() ); return true; } |