summaryrefslogtreecommitdiff
path: root/s/d_logic.cpp
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2010-06-14 18:44:51 -0400
committerEliot Horowitz <eliot@10gen.com>2010-06-14 18:44:51 -0400
commit440163fccc7554cf4d059187c5dda22b4817f7d1 (patch)
tree778f70a01ca69f674278cc123104ad3754a719bb /s/d_logic.cpp
parentde1ce815b666a6c9c724a462240b3791e49f9f2f (diff)
downloadmongo-440163fccc7554cf4d059187c5dda22b4817f7d1.tar.gz
more sharding mongod side cleaning
Diffstat (limited to 's/d_logic.cpp')
-rw-r--r--s/d_logic.cpp166
1 files changed, 108 insertions, 58 deletions
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index 2d49ad59317..9dceb11b62f 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -41,13 +41,80 @@
using namespace std;
namespace mongo {
+
+ // -----ShardingState START ----
+
+ ShardingState::ShardingState(){
+ _enabled = false;
+ }
+
+ void ShardingState::enable( const string& server ){
+ _enabled = true;
+ assert( server.size() );
+ if ( _configServer.size() == 0 )
+ _configServer = server;
+ else {
+ assert( server == _configServer );
+ }
+ }
+
+
+ bool ShardingState::hasVersion( const string& ns ) const {
+ NSVersionMap::const_iterator i = _versions.find(ns);
+ return i != _versions.end();
+ }
+
+ bool ShardingState::hasVersion( const string& ns , ConfigVersion& version ) const {
+ NSVersionMap::const_iterator i = _versions.find(ns);
+ if ( i == _versions.end() )
+ return false;
+ version = i->second;
+ return true;
+ }
+
+ ConfigVersion& ShardingState::getVersion( const string& ns ){
+ return _versions[ns];
+ }
- NSVersionMap globalVersions;
- boost::thread_specific_ptr<NSVersionMap> clientShardVersions;
+ void ShardingState::setVersion( const string& ns , const ConfigVersion& version ){
+ _versions[ns] = version;
+ }
- string shardConfigServer;
+ ShardingState shardingState;
+
+ // -----ShardingState END ----
+
+ // -----ShardedConnectionInfo START ----
+
+ boost::thread_specific_ptr<ShardedConnectionInfo> ShardedConnectionInfo::_tl;
+
+ ShardedConnectionInfo::ShardedConnectionInfo(){
+ _id.clear();
+ }
+
+ ShardedConnectionInfo* ShardedConnectionInfo::get( bool create ){
+ ShardedConnectionInfo* info = _tl.get();
+ if ( ! info && create ){
+ log(1) << "entering shard mode for connection" << endl;
+ info = new ShardedConnectionInfo();
+ _tl.reset( info );
+ }
+ return info;
+ }
- boost::thread_specific_ptr<OID> clientServerIds;
+ ConfigVersion& ShardedConnectionInfo::getVersion( const string& ns ){
+ return _versions[ns];
+ }
+
+ void ShardedConnectionInfo::setVersion( const string& ns , const ConfigVersion& version ){
+ _versions[ns] = version;
+ }
+
+ void ShardedConnectionInfo::setID( const OID& id ){
+ _id = id;
+ }
+
+ // -----ShardedConnectionInfo END ----
unsigned long long extractVersion( BSONElement e , string& errmsg ){
if ( e.eoo() ){
@@ -93,7 +160,8 @@ namespace mongo {
virtual LockType locktype() const { return WRITE; } // TODO: figure out how to make this not need to lock
bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get( true );
+
bool authoritative = cmdObj.getBoolField( "authoritative" );
string configdb = cmdObj["configdb"].valuestrsafe();
@@ -103,17 +171,19 @@ namespace mongo {
return false;
}
- if ( shardConfigServer.size() == 0 ){
+ if ( shardingState.enabled() ){
+ if ( configdb != shardingState.getConfigServer() ){
+ errmsg = "specified a different configdb!";
+ return false;
+ }
+ }
+ else {
if ( ! authoritative ){
result.appendBool( "need_authoritative" , true );
errmsg = "first setShardVersion";
return false;
}
- shardConfigServer = configdb;
- }
- else if ( shardConfigServer != configdb ){
- errmsg = "specified a different configdb!";
- return false;
+ shardingState.enable( configdb );
}
}
@@ -125,14 +195,10 @@ namespace mongo {
}
else {
OID clientId = cmdObj["serverID"].__oid();
- if ( ! clientServerIds.get() ){
- string s = clientId.str();
-
- OID * nid = new OID();
- nid->init( s );
- clientServerIds.reset( nid );
+ if ( ! info->hasID() ){
+ info->setID( clientId );
}
- else if ( clientId != *clientServerIds.get() ){
+ else if ( clientId != info->getID() ){
errmsg = "server id has changed!";
return 0;
}
@@ -143,14 +209,6 @@ namespace mongo {
if ( errmsg.size() ){
return false;
}
-
- NSVersionMap * versions = clientShardVersions.get();
-
- if ( ! versions ){
- log(1) << "entering shard mode for connection" << endl;
- versions = new NSVersionMap();
- clientShardVersions.reset( versions );
- }
string ns = cmdObj["setShardVersion"].valuestrsafe();
if ( ns.size() == 0 ){
@@ -158,8 +216,8 @@ namespace mongo {
return false;
}
- unsigned long long& oldVersion = (*versions)[ns];
- unsigned long long& globalVersion = globalVersions[ns];
+ ConfigVersion& oldVersion = info->getVersion(ns);
+ unsigned long long& globalVersion = shardingState.getVersion(ns);
if ( version == 0 && globalVersion == 0 ){
// this connection is cleaning itself
@@ -231,11 +289,13 @@ namespace mongo {
return false;
}
- result.append( "configServer" , shardConfigServer.c_str() );
+ result.append( "configServer" , shardingState.getConfigServer() );
- result.appendTimestamp( "global" , globalVersions[ns] );
- if ( clientShardVersions.get() )
- result.appendTimestamp( "mine" , (*clientShardVersions.get())[ns] );
+ result.appendTimestamp( "global" , shardingState.getVersion(ns) );
+
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get( false );
+ if ( info )
+ result.appendTimestamp( "mine" , info->getVersion(ns) );
else
result.appendTimestamp( "mine" , 0 );
@@ -247,19 +307,13 @@ namespace mongo {
bool haveLocalShardingInfo( const string& ns ){
- if ( shardConfigServer.empty() )
+ if ( ! shardingState.enabled() )
return false;
-
- unsigned long long version = globalVersions[ns];
- if ( version == 0 )
- return false;
-
- NSVersionMap * versions = clientShardVersions.get();
- if ( ! versions )
+ if ( ! shardingState.hasVersion( ns ) )
return false;
-
- return true;
+
+ return ShardedConnectionInfo::get(false) > 0;
}
/**
@@ -267,24 +321,22 @@ namespace mongo {
or if version for this client is ok
*/
bool shardVersionOk( const string& ns , string& errmsg ){
- if ( shardConfigServer.empty() ){
- return true;
- }
-
- NSVersionMap::iterator i = globalVersions.find( ns );
- if ( i == globalVersions.end() )
+ if ( ! shardingState.enabled() )
return true;
- NSVersionMap * versions = clientShardVersions.get();
- if ( ! versions ){
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get( false );
+ if ( ! info ){
// this means the client has nothing sharded
// so this allows direct connections to do whatever they want
// which i think is the correct behavior
return true;
}
- unsigned long long clientVersion = (*versions)[ns];
- unsigned long long version = i->second;
+ unsigned long long version;
+ if ( ! shardingState.hasVersion( ns , version ) )
+ return true;
+
+ unsigned long long clientVersion = info->getVersion(ns);
if ( version == 0 && clientVersion > 0 ){
stringstream ss;
@@ -308,10 +360,8 @@ namespace mongo {
bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){
-
- if ( shardConfigServer.empty() ){
+ if ( ! shardingState.enabled() )
return false;
- }
int op = m.operation();
if ( op < 2000
@@ -354,8 +404,8 @@ namespace mongo {
return true;
}
- OID * clientID = clientServerIds.get();
- massert( 10422 , "write with bad shard config and no server id!" , clientID );
+ const OID& clientID = ShardedConnectionInfo::get(false)->getID();
+ massert( 10422 , "write with bad shard config and no server id!" , clientID.isSet() );
log() << "got write with an old config - writing back" << endl;
@@ -364,7 +414,7 @@ namespace mongo {
b.append( "ns" , ns );
b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
log() << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl;
- queueWriteBack( clientID->str() , b.obj() );
+ queueWriteBack( clientID.str() , b.obj() );
return true;
}