diff options
author | Eliot Horowitz <eliot@10gen.com> | 2009-09-14 11:33:42 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2009-09-14 11:33:42 -0400 |
commit | 3b5b0caee9acb94f1a5266587213163945cc44a5 (patch) | |
tree | 9fbc39227ee15471f57dcf34123eec339023f975 | |
parent | 4df046f5b07673be1071f9041f08788111a0ccaa (diff) | |
download | mongo-3b5b0caee9acb94f1a5266587213163945cc44a5.tar.gz |
getLastError sharded SHARDING-16
-rw-r--r-- | jstests/sharding/error1.js | 32 | ||||
-rw-r--r-- | s/commands_admin.cpp | 53 | ||||
-rw-r--r-- | s/request.cpp | 70 | ||||
-rw-r--r-- | s/request.h | 40 | ||||
-rw-r--r-- | s/server.cpp | 5 |
5 files changed, 191 insertions, 9 deletions
diff --git a/jstests/sharding/error1.js b/jstests/sharding/error1.js index f995a2bc26c..4b5019bb9f0 100644 --- a/jstests/sharding/error1.js +++ b/jstests/sharding/error1.js @@ -2,6 +2,9 @@ s = new ShardingTest( "error1" , 2 , 1 , 1 ); s.adminCommand( { enablesharding : "test" } ); +a = s._connections[0].getDB( "test" ); +b = s._connections[1].getDB( "test" ); + // ---- simple getLastError ---- db = s.getDB( "test" ); @@ -9,7 +12,36 @@ db.foo.insert( { _id : 1 } ); assert.isnull( db.getLastError() , "gle 1" ); db.foo.insert( { _id : 1 } ); assert( db.getLastError() , "gle21" ); +assert( db.getLastError() , "gle22" ); + +// --- sharded getlasterror + +s.adminCommand( { shardcollection : "test.foo2" , key : { num : 1 } } ); + +db.foo2.save( { _id : 1 , num : 5 } ); +db.foo2.save( { _id : 2 , num : 10 } ); +db.foo2.save( { _id : 3 , num : 15 } ); +db.foo2.save( { _id : 4 , num : 20 } ); + +s.adminCommand( { split : "test.foo2" , middle : { num : 10 } } ); +s.adminCommand( { movechunk : "test.foo2" , find : { num : 20 } , to : s.getOther( s.getServer( "test" ) ).name } ); + +assert( a.foo2.count() > 0 && a.foo2.count() < 4 , "se1" ); +assert( b.foo2.count() > 0 && b.foo2.count() < 4 , "se2" ); +assert.eq( 4 , db.foo2.count() , "se3" ); + +db.foo2.save( { _id : 5 , num : 25 } ); +assert( ! db.getLastError() , "se3.5" ); +s.sync(); +assert.eq( 5 , db.foo2.count() , "se4" ); + + + +db.foo2.insert( { _id : 5 , num : 30 } ); +assert( db.getLastError() , "se5" ); +assert( db.getLastError() , "se6" ); +assert.eq( 5 , db.foo2.count() , "se5" ); // ---- s.stop(); diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp index 7aa5b511b53..981a7f5b834 100644 --- a/s/commands_admin.cpp +++ b/s/commands_admin.cpp @@ -588,15 +588,54 @@ namespace mongo { DBConfig * conf = grid.getDBConfig( dbName , false ); - ScopedDbConnection conn( conf->getPrimary() ); - BSONObj res; - bool ok = conn->runCommand( conf->getName() , cmdObj , res ); - result.appendElements( res ); - conn.done(); - return ok; + ClientInfo * client = ClientInfo::get(); + set<string> * shards = client->getPrev(); + + if ( shards->size() == 0 ){ + result.appendNull( "err" ); + result.append( "ok" , 1 ); + return true; + } + + if ( shards->size() == 1 ){ + string theShard = *(shards->begin() ); + result.append( "theshard" , theShard.c_str() ); + ScopedDbConnection conn( theShard ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + result.appendElements( res ); + conn.done(); + return ok; + } + + vector<string> errors; + for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ){ + string theShard = *i; + ScopedDbConnection conn( theShard ); + string temp = conn->getLastError(); + if ( temp.size() ) + errors.push_back( temp ); + conn.done(); + } + + if ( errors.size() == 0 ){ + result.appendNull( "err" ); + result.append( "ok" , 1 ); + return true; + } + + result.append( "err" , errors[0].c_str() ); + + BSONObjBuilder all; + for ( unsigned i=0; i<errors.size(); i++ ){ + all.append( all.numStr( i ).c_str() , errors[i].c_str() ); + } + result.appendArray( "errs" , all.obj() ); + result.append( "ok" , 1 ); + return true; } } cmdGetLastError; - + } } // namespace mongo diff --git a/s/request.cpp b/s/request.cpp index 0b75b9aa3bb..abf741bdb7c 100644 --- a/s/request.cpp +++ b/s/request.cpp @@ -31,10 +31,16 @@ namespace mongo { - Request::Request( Message& m, AbstractMessagingPort* p ) : _m(m) , _d( m ) , _p(p){ + Request::Request( Message& m, AbstractMessagingPort* p ) : + _m(m) , _d( m ) , _p(p){ + assert( _d.getns() ); _id = _m.data->id; + _clientId = p->remotePort() << 16; + _clientInfo = ClientInfo::get( _clientId ); + _clientInfo->newRequest(); + reset(); } @@ -102,4 +108,66 @@ namespace mongo { } } + + ClientInfo::ClientInfo( int clientId ) : _id( clientId ){ + _cur = &_a; + _prev = &_b; + newRequest(); + } + + ClientInfo::~ClientInfo(){ + boostlock lk( _clientsLock ); + ClientCache::iterator i = _clients.find( _id ); + if ( i != _clients.end() ){ + _clients.erase( i ); + } + } + + void ClientInfo::addShard( const string& shard ){ + _cur->insert( shard ); + } + + void ClientInfo::newRequest(){ + _lastAccess = time(0); + + set<string> * temp = _cur; + _cur = _prev; + _prev = temp; + _cur->clear(); + } + + void ClientInfo::disconnect(){ + _lastAccess = 0; + } + + ClientInfo * ClientInfo::get( int clientId , bool create ){ + + if ( ! clientId ) + clientId = getClientId(); + + if ( ! clientId ){ + ClientInfo * info = _tlInfo.get(); + if ( ! info ){ + info = new ClientInfo( 0 ); + _tlInfo.reset( info ); + } + info->newRequest(); + return info; + } + + boostlock lk( _clientsLock ); + ClientCache::iterator i = _clients.find( clientId ); + if ( i != _clients.end() ) + return i->second; + if ( ! create ) + return 0; + ClientInfo * info = new ClientInfo( clientId ); + _clients[clientId] = info; + return info; + } + + map<int,ClientInfo*> ClientInfo::_clients; + mutex ClientInfo::_clientsLock; + thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo; + } // namespace mongo diff --git a/s/request.h b/s/request.h index 6be88afee3c..a198cbe8e4b 100644 --- a/s/request.h +++ b/s/request.h @@ -9,6 +9,8 @@ namespace mongo { + class ClientInfo; + class Request : boost::noncopyable { public: Request( Message& m, AbstractMessagingPort* p ); @@ -40,6 +42,13 @@ namespace mongo { ChunkManager * getChunkManager(){ return _chunkManager; } + + int getClientId(){ + return _clientId; + } + ClientInfo * getClientInfo(){ + return _clientInfo; + } // ---- remote location info ----- @@ -73,6 +82,9 @@ namespace mongo { MSGID _id; DBConfig * _config; ChunkManager * _chunkManager; + + int _clientId; + ClientInfo * _clientInfo; }; class StaleConfigException : public std::exception { @@ -91,6 +103,34 @@ namespace mongo { private: string _msg; }; + + typedef map<int,ClientInfo*> ClientCache; + + class ClientInfo { + public: + ClientInfo( int clientId ); + ~ClientInfo(); + + void addShard( const string& shard ); + set<string> * getPrev() const { return _prev; }; + + void newRequest(); + void disconnect(); + + static ClientInfo * get( int clientId = 0 , bool create = true ); + + private: + int _id; + set<string> _a; + set<string> _b; + set<string> * _cur; + set<string> * _prev; + int _lastAccess; + + static mutex _clientsLock; + static ClientCache _clients; + static thread_specific_ptr<ClientInfo> _tlInfo; + }; } #include "strategy.h" diff --git a/s/server.cpp b/s/server.cpp index 0dffe9ccb46..a663100e4fc 100644 --- a/s/server.cpp +++ b/s/server.cpp @@ -64,6 +64,9 @@ namespace mongo { virtual void onCreate( DBClientBase * conn ){ conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" ); } + virtual void onHandedOut( DBClientBase * conn ){ + ClientInfo::get()->addShard( conn->getServerAddress() ); + } } shardingConnectionHook; class ShardedMessageHandler : public MessageHandler { @@ -72,7 +75,7 @@ namespace mongo { virtual void process( Message& m , AbstractMessagingPort* p ){ Request r( m , p ); try { - setClientId( p->remotePort() << 16 ); + setClientId( r.getClientId() ); r.process(); } catch ( DBException& e ){ |