summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2009-09-14 11:33:42 -0400
committerEliot Horowitz <eliot@10gen.com>2009-09-14 11:33:42 -0400
commit3b5b0caee9acb94f1a5266587213163945cc44a5 (patch)
tree9fbc39227ee15471f57dcf34123eec339023f975
parent4df046f5b07673be1071f9041f08788111a0ccaa (diff)
downloadmongo-3b5b0caee9acb94f1a5266587213163945cc44a5.tar.gz
getLastError sharded SHARDING-16
-rw-r--r--jstests/sharding/error1.js32
-rw-r--r--s/commands_admin.cpp53
-rw-r--r--s/request.cpp70
-rw-r--r--s/request.h40
-rw-r--r--s/server.cpp5
5 files changed, 191 insertions, 9 deletions
diff --git a/jstests/sharding/error1.js b/jstests/sharding/error1.js
index f995a2bc26c..4b5019bb9f0 100644
--- a/jstests/sharding/error1.js
+++ b/jstests/sharding/error1.js
@@ -2,6 +2,9 @@
s = new ShardingTest( "error1" , 2 , 1 , 1 );
s.adminCommand( { enablesharding : "test" } );
+a = s._connections[0].getDB( "test" );
+b = s._connections[1].getDB( "test" );
+
// ---- simple getLastError ----
db = s.getDB( "test" );
@@ -9,7 +12,36 @@ db.foo.insert( { _id : 1 } );
assert.isnull( db.getLastError() , "gle 1" );
db.foo.insert( { _id : 1 } );
assert( db.getLastError() , "gle21" );
+assert( db.getLastError() , "gle22" );
+
+// --- sharded getlasterror
+
+s.adminCommand( { shardcollection : "test.foo2" , key : { num : 1 } } );
+
+db.foo2.save( { _id : 1 , num : 5 } );
+db.foo2.save( { _id : 2 , num : 10 } );
+db.foo2.save( { _id : 3 , num : 15 } );
+db.foo2.save( { _id : 4 , num : 20 } );
+
+s.adminCommand( { split : "test.foo2" , middle : { num : 10 } } );
+s.adminCommand( { movechunk : "test.foo2" , find : { num : 20 } , to : s.getOther( s.getServer( "test" ) ).name } );
+
+assert( a.foo2.count() > 0 && a.foo2.count() < 4 , "se1" );
+assert( b.foo2.count() > 0 && b.foo2.count() < 4 , "se2" );
+assert.eq( 4 , db.foo2.count() , "se3" );
+
+db.foo2.save( { _id : 5 , num : 25 } );
+assert( ! db.getLastError() , "se3.5" );
+s.sync();
+assert.eq( 5 , db.foo2.count() , "se4" );
+
+
+
+db.foo2.insert( { _id : 5 , num : 30 } );
+assert( db.getLastError() , "se5" );
+assert( db.getLastError() , "se6" );
+assert.eq( 5 , db.foo2.count() , "se5" );
// ----
s.stop();
diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp
index 7aa5b511b53..981a7f5b834 100644
--- a/s/commands_admin.cpp
+++ b/s/commands_admin.cpp
@@ -588,15 +588,54 @@ namespace mongo {
DBConfig * conf = grid.getDBConfig( dbName , false );
- ScopedDbConnection conn( conf->getPrimary() );
- BSONObj res;
- bool ok = conn->runCommand( conf->getName() , cmdObj , res );
- result.appendElements( res );
- conn.done();
- return ok;
+ ClientInfo * client = ClientInfo::get();
+ set<string> * shards = client->getPrev();
+
+ if ( shards->size() == 0 ){
+ result.appendNull( "err" );
+ result.append( "ok" , 1 );
+ return true;
+ }
+
+ if ( shards->size() == 1 ){
+ string theShard = *(shards->begin() );
+ result.append( "theshard" , theShard.c_str() );
+ ScopedDbConnection conn( theShard );
+ BSONObj res;
+ bool ok = conn->runCommand( conf->getName() , cmdObj , res );
+ result.appendElements( res );
+ conn.done();
+ return ok;
+ }
+
+ vector<string> errors;
+ for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ){
+ string theShard = *i;
+ ScopedDbConnection conn( theShard );
+ string temp = conn->getLastError();
+ if ( temp.size() )
+ errors.push_back( temp );
+ conn.done();
+ }
+
+ if ( errors.size() == 0 ){
+ result.appendNull( "err" );
+ result.append( "ok" , 1 );
+ return true;
+ }
+
+ result.append( "err" , errors[0].c_str() );
+
+ BSONObjBuilder all;
+ for ( unsigned i=0; i<errors.size(); i++ ){
+ all.append( all.numStr( i ).c_str() , errors[i].c_str() );
+ }
+ result.appendArray( "errs" , all.obj() );
+ result.append( "ok" , 1 );
+ return true;
}
} cmdGetLastError;
-
+
}
} // namespace mongo
diff --git a/s/request.cpp b/s/request.cpp
index 0b75b9aa3bb..abf741bdb7c 100644
--- a/s/request.cpp
+++ b/s/request.cpp
@@ -31,10 +31,16 @@
namespace mongo {
- Request::Request( Message& m, AbstractMessagingPort* p ) : _m(m) , _d( m ) , _p(p){
+ Request::Request( Message& m, AbstractMessagingPort* p ) :
+ _m(m) , _d( m ) , _p(p){
+
assert( _d.getns() );
_id = _m.data->id;
+ _clientId = p->remotePort() << 16;
+ _clientInfo = ClientInfo::get( _clientId );
+ _clientInfo->newRequest();
+
reset();
}
@@ -102,4 +108,66 @@ namespace mongo {
}
}
+
+ ClientInfo::ClientInfo( int clientId ) : _id( clientId ){
+ _cur = &_a;
+ _prev = &_b;
+ newRequest();
+ }
+
+ ClientInfo::~ClientInfo(){
+ boostlock lk( _clientsLock );
+ ClientCache::iterator i = _clients.find( _id );
+ if ( i != _clients.end() ){
+ _clients.erase( i );
+ }
+ }
+
+ void ClientInfo::addShard( const string& shard ){
+ _cur->insert( shard );
+ }
+
+ void ClientInfo::newRequest(){
+ _lastAccess = time(0);
+
+ set<string> * temp = _cur;
+ _cur = _prev;
+ _prev = temp;
+ _cur->clear();
+ }
+
+ void ClientInfo::disconnect(){
+ _lastAccess = 0;
+ }
+
+ ClientInfo * ClientInfo::get( int clientId , bool create ){
+
+ if ( ! clientId )
+ clientId = getClientId();
+
+ if ( ! clientId ){
+ ClientInfo * info = _tlInfo.get();
+ if ( ! info ){
+ info = new ClientInfo( 0 );
+ _tlInfo.reset( info );
+ }
+ info->newRequest();
+ return info;
+ }
+
+ boostlock lk( _clientsLock );
+ ClientCache::iterator i = _clients.find( clientId );
+ if ( i != _clients.end() )
+ return i->second;
+ if ( ! create )
+ return 0;
+ ClientInfo * info = new ClientInfo( clientId );
+ _clients[clientId] = info;
+ return info;
+ }
+
+ map<int,ClientInfo*> ClientInfo::_clients;
+ mutex ClientInfo::_clientsLock;
+ thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo;
+
} // namespace mongo
diff --git a/s/request.h b/s/request.h
index 6be88afee3c..a198cbe8e4b 100644
--- a/s/request.h
+++ b/s/request.h
@@ -9,6 +9,8 @@
namespace mongo {
+ class ClientInfo;
+
class Request : boost::noncopyable {
public:
Request( Message& m, AbstractMessagingPort* p );
@@ -40,6 +42,13 @@ namespace mongo {
ChunkManager * getChunkManager(){
return _chunkManager;
}
+
+ int getClientId(){
+ return _clientId;
+ }
+ ClientInfo * getClientInfo(){
+ return _clientInfo;
+ }
// ---- remote location info -----
@@ -73,6 +82,9 @@ namespace mongo {
MSGID _id;
DBConfig * _config;
ChunkManager * _chunkManager;
+
+ int _clientId;
+ ClientInfo * _clientInfo;
};
class StaleConfigException : public std::exception {
@@ -91,6 +103,34 @@ namespace mongo {
private:
string _msg;
};
+
+ typedef map<int,ClientInfo*> ClientCache;
+
+ class ClientInfo {
+ public:
+ ClientInfo( int clientId );
+ ~ClientInfo();
+
+ void addShard( const string& shard );
+ set<string> * getPrev() const { return _prev; };
+
+ void newRequest();
+ void disconnect();
+
+ static ClientInfo * get( int clientId = 0 , bool create = true );
+
+ private:
+ int _id;
+ set<string> _a;
+ set<string> _b;
+ set<string> * _cur;
+ set<string> * _prev;
+ int _lastAccess;
+
+ static mutex _clientsLock;
+ static ClientCache _clients;
+ static thread_specific_ptr<ClientInfo> _tlInfo;
+ };
}
#include "strategy.h"
diff --git a/s/server.cpp b/s/server.cpp
index 0dffe9ccb46..a663100e4fc 100644
--- a/s/server.cpp
+++ b/s/server.cpp
@@ -64,6 +64,9 @@ namespace mongo {
virtual void onCreate( DBClientBase * conn ){
conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" );
}
+ virtual void onHandedOut( DBClientBase * conn ){
+ ClientInfo::get()->addShard( conn->getServerAddress() );
+ }
} shardingConnectionHook;
class ShardedMessageHandler : public MessageHandler {
@@ -72,7 +75,7 @@ namespace mongo {
virtual void process( Message& m , AbstractMessagingPort* p ){
Request r( m , p );
try {
- setClientId( p->remotePort() << 16 );
+ setClientId( r.getClientId() );
r.process();
}
catch ( DBException& e ){