diff options
author | Eliot Horowitz <eliot@10gen.com> | 2011-03-30 17:03:52 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2011-03-31 03:12:43 -0400 |
commit | 134e8eb0e0913747ac648875514332fe9949d4cd (patch) | |
tree | 9d74bf3c35cf7920fd39fe18b890cc1d4774a63f | |
parent | 7c5b035e1bad0a4f1d84adde04df49b57dae7828 (diff) | |
download | mongo-134e8eb0e0913747ac648875514332fe9949d4cd.tar.gz |
remove all clientId related code and systems, and just use tls SERVER-2872
-rw-r--r-- | db/lasterror.cpp | 134 | ||||
-rw-r--r-- | db/lasterror.h | 19 | ||||
-rw-r--r-- | s/client.cpp | 107 | ||||
-rw-r--r-- | s/client.h | 18 | ||||
-rw-r--r-- | s/request.cpp | 5 | ||||
-rw-r--r-- | s/request.h | 4 | ||||
-rw-r--r-- | s/server.cpp | 17 | ||||
-rw-r--r-- | s/writeback_listener.cpp | 2 | ||||
-rw-r--r-- | util/message.cpp | 29 | ||||
-rw-r--r-- | util/message.h | 5 | ||||
-rw-r--r-- | util/message_server.h | 4 | ||||
-rw-r--r-- | util/message_server_port.cpp | 10 |
12 files changed, 89 insertions, 265 deletions
diff --git a/db/lasterror.cpp b/db/lasterror.cpp index ba52111c883..4643aa9bfea 100644 --- a/db/lasterror.cpp +++ b/db/lasterror.cpp @@ -28,7 +28,6 @@ namespace mongo { LastError LastError::noError; LastErrorHolder lastError; - mongo::mutex LastErrorHolder::_idsmutex("LastErrorHolder"); bool isShell = false; void raiseError(int code , const char *msg) { @@ -79,22 +78,9 @@ namespace mongo { } LastErrorHolder::~LastErrorHolder() { - for ( IDMap::iterator i = _ids.begin(); i != _ids.end(); ++i ) { - delete i->second.lerr; - i->second.lerr = 0; - } - _ids.clear(); } - void LastErrorHolder::setID( int id ) { - _id.set( id ); - } - - int LastErrorHolder::getID() { - return _id.get(); - } - LastError * LastErrorHolder::disableForCommand() { LastError *le = _get(); assert( le ); @@ -111,77 +97,31 @@ namespace mongo { } LastError * LastErrorHolder::_get( bool create ) { - int id = _id.get(); - if ( id == 0 ) { - LastError * le = _tl.get(); - if ( ! le && create ) { - le = new LastError(); - _tl.reset( le ); - } - return le; - } - - scoped_lock lock(_idsmutex); - map<int,Status>::iterator i = _ids.find( id ); - if ( i == _ids.end() ) { - if ( ! create ) - return 0; - - LastError * le = new LastError(); - Status s; - s.time = time(0); - s.lerr = le; - _ids[id] = s; - return le; + LastError * le = _tl.get(); + if ( ! le && create ) { + le = new LastError(); + _tl.reset( le ); } - - Status &status = i->second; - status.time = time(0); - return status.lerr; - } - - void LastErrorHolder::remove( int id ) { - scoped_lock lock(_idsmutex); - map<int,Status>::iterator i = _ids.find( id ); - if ( i == _ids.end() ) - return; - - delete i->second.lerr; - _ids.erase( i ); + return le; } void LastErrorHolder::release() { - int id = _id.get(); - if ( id == 0 ) { - _tl.release(); - return; - } - - remove( id ); + _tl.release(); } /** ok to call more than once. */ void LastErrorHolder::initThread() { - if( _tl.get() ) return; - assert( _id.get() == 0 ); - _tl.reset( new LastError() ); + if( ! _tl.get() ) + _tl.reset( new LastError() ); } void LastErrorHolder::reset( LastError * le ) { - int id = _id.get(); - if ( id == 0 ) { - _tl.reset( le ); - return; - } - - scoped_lock lock(_idsmutex); - Status & status = _ids[id]; - status.time = time(0); - status.lerr = le; + _tl.reset( le ); } void prepareErrForNewRequest( Message &m, LastError * err ) { // a killCursors message shouldn't affect last error + assert( err ); if ( m.operation() == dbKillCursors ) { err->disabled = true; } @@ -191,60 +131,10 @@ namespace mongo { } } - LastError * LastErrorHolder::startRequest( Message& m , int clientId ) { - assert( clientId ); - setID( clientId ); - - LastError * le = _get( true ); + LastError * LastErrorHolder::startRequest( Message& m , LastError * le ) { + assert( le ); prepareErrForNewRequest( m, le ); return le; } - void LastErrorHolder::startRequest( Message& m , LastError * connectionOwned ) { - prepareErrForNewRequest( m, connectionOwned ); - } - - void LastErrorHolder::disconnect( int clientId ) { - if ( clientId ) - remove(clientId); - } - - struct LastErrorHolderTest : public UnitTest { - public: - - void test( int i ) { - _tl.set( i ); - assert( _tl.get() == i ); - } - - void tlmaptest() { - test( 1 ); - test( 12123123 ); - test( -123123 ); - test( numeric_limits<int>::min() ); - test( numeric_limits<int>::max() ); - } - - void run() { - tlmaptest(); - - LastError * a = new LastError(); - LastError * b = new LastError(); - - LastErrorHolder holder; - holder.reset( a ); - assert( a == holder.get() ); - holder.setID( 1 ); - assert( 0 == holder.get() ); - holder.reset( b ); - assert( b == holder.get() ); - holder.setID( 0 ); - assert( a == holder.get() ); - - holder.remove( 1 ); - } - - ThreadLocalValue<int> _tl; - } lastErrorHolderTest; - } // namespace mongo diff --git a/db/lasterror.h b/db/lasterror.h index c77ec740f03..86250e496a8 100644 --- a/db/lasterror.h +++ b/db/lasterror.h @@ -100,14 +100,14 @@ namespace mongo { extern class LastErrorHolder { public: - LastErrorHolder() : _id( 0 ) {} + LastErrorHolder(){} ~LastErrorHolder(); LastError * get( bool create = false ); LastError * getSafe() { LastError * le = get(false); if ( ! le ) { - log( LL_ERROR ) << " no LastError! id: " << getID() << endl; + error() << " no LastError!" << endl; assert( le ); } return le; @@ -120,18 +120,12 @@ namespace mongo { /** ok to call more than once. */ void initThread(); - /** - * id of 0 means should use thread local management - */ - void setID( int id ); int getID(); - - void remove( int id ); + void release(); /** when db receives a message/request, call this */ - void startRequest( Message& m , LastError * connectionOwned ); - LastError * startRequest( Message& m , int clientId ); + LastError * startRequest( Message& m , LastError * connectionOwned ); void disconnect( int clientId ); @@ -139,17 +133,12 @@ namespace mongo { // disable causes get() to return 0. LastError *disableForCommand(); // only call once per command invocation! private: - ThreadLocalValue<int> _id; boost::thread_specific_ptr<LastError> _tl; struct Status { time_t time; LastError *lerr; }; - typedef map<int,Status> IDMap; - - static mongo::mutex _idsmutex; - IDMap _ids; } lastError; void raiseError(int code , const char *msg); diff --git a/s/client.cpp b/s/client.cpp index 1289b79b46f..e1cb0229914 100644 --- a/s/client.cpp +++ b/s/client.cpp @@ -36,7 +36,7 @@ namespace mongo { - ClientInfo::ClientInfo( int clientId ) : _id( clientId ) { + ClientInfo::ClientInfo() { _cur = &_a; _prev = &_b; _autoSplitOk = true; @@ -44,13 +44,6 @@ namespace mongo { } ClientInfo::~ClientInfo() { - if ( _lastAccess ) { - scoped_lock lk( _clientsLock ); - Cache::iterator i = _clients.find( _id ); - if ( i != _clients.end() ) { - _clients.erase( i ); - } - } } void ClientInfo::addShard( const string& shard ) { @@ -79,49 +72,19 @@ namespace mongo { _cur->clear(); } - void ClientInfo::disconnect() { - _lastAccess = 0; - } - - ClientInfo * ClientInfo::get( int clientId , bool create ) { - - if ( ! clientId ) - clientId = getClientId(); - - if ( ! clientId ) { - ClientInfo * info = _tlInfo.get(); - if ( ! info ) { - info = new ClientInfo( 0 ); - _tlInfo.reset( info ); - } + ClientInfo * ClientInfo::get() { + ClientInfo * info = _tlInfo.get(); + if ( ! info ) { + info = new ClientInfo(); + _tlInfo.reset( info ); info->newRequest(); - return info; } - - scoped_lock lk( _clientsLock ); - Cache::iterator i = _clients.find( clientId ); - if ( i != _clients.end() ) - return i->second; - if ( ! create ) - return 0; - ClientInfo * info = new ClientInfo( clientId ); - _clients[clientId] = info; return info; } - void ClientInfo::disconnect( int clientId ) { - if ( ! clientId ) - return; - - scoped_lock lk( _clientsLock ); - Cache::iterator i = _clients.find( clientId ); - if ( i == _clients.end() ) - return; - - ClientInfo* ci = i->second; - ci->disconnect(); - delete ci; - _clients.erase( i ); + void ClientInfo::disconnect() { + // should be handled by TL cleanup + _lastAccess = 0; } void ClientInfo::_addWriteBack( vector<WBInfo>& all , const BSONObj& gle ) { @@ -142,14 +105,14 @@ namespace mongo { vector<BSONObj> ClientInfo::_handleWriteBacks( vector<WBInfo>& all , bool fromWriteBackListener ) { vector<BSONObj> res; + + if ( all.size() == 0 ) + return res; if ( fromWriteBackListener ) { LOG(1) << "not doing recusrive writeback" << endl; return res; } - - if ( all.size() == 0 ) - return res; for ( unsigned i=0; i<all.size(); i++ ) { res.push_back( WriteBackListener::waitFor( all[i].connectionId , all[i].id ) ); @@ -182,16 +145,16 @@ namespace mongo { ok = conn->runCommand( "admin" , options , res ); } catch( std::exception &e ){ - - warning() << "Could not get last error." << m_error_message( e.what() ) << endl; - - // Catch everything that happens here, since we need to ensure we return our connection when we're + + warning() << "Could not get last error." << m_error_message( e.what() ) << endl; + + // Catch everything that happens here, since we need to ensure we return our connection when we're // finished. conn.done(); - + return false; } - + res = res.getOwned(); conn.done(); @@ -219,6 +182,7 @@ namespace mongo { assert( v.size() == 1 ); result.appendElements( v[0] ); result.appendElementsUnique( res ); + result.append( "writebackGLE" , v[0] ); result.append( "initialGLEHost" , theShard ); } } @@ -231,8 +195,11 @@ namespace mongo { } BSONArrayBuilder bbb( result.subarrayStart( "shards" ) ); + BSONObjBuilder shardRawGLE; long long n = 0; + + int updatedExistingStat = 0; // 0 is none, -1 has but false, 1 has true // hit each shard vector<string> errors; @@ -243,20 +210,21 @@ namespace mongo { ShardConnection conn( theShard , "" ); BSONObj res; bool ok = false; - try{ - ok = conn->runCommand( "admin" , options , res ); + try { + ok = conn->runCommand( "admin" , options , res ); + shardRawGLE.append( theShard , res ); } catch( std::exception &e ){ // Safe to return here, since we haven't started any extra processing yet, just collecting // responses. - + warning() << "Could not get last error." << m_error_message( e.what() ) << endl; - conn.done(); - - return false; - } - + conn.done(); + + return false; + } + _addWriteBack( writebacks, res ); string temp = DBClientWithCommands::getLastErrorString( res ); @@ -264,13 +232,24 @@ namespace mongo { errors.push_back( temp ); errorObjects.push_back( res ); } + n += res["n"].numberLong(); + if ( res["updatedExisting"].type() ) { + if ( res["updatedExisting"].trueValue() ) + updatedExistingStat = 1; + else if ( updatedExistingStat == 0 ) + updatedExistingStat = -1; + } + conn.done(); } bbb.done(); + result.append( "shardRawGLE" , shardRawGLE.obj() ); result.appendNumber( "n" , n ); + if ( updatedExistingStat ) + result.appendBool( "updatedExisting" , updatedExistingStat > 0 ); // hit other machines just to block for ( set<string>::const_iterator i=sinceLastGetError().begin(); i!=sinceLastGetError().end(); ++i ) { @@ -313,8 +292,6 @@ namespace mongo { return true; } - ClientInfo::Cache& ClientInfo::_clients = *(new ClientInfo::Cache()); - mongo::mutex ClientInfo::_clientsLock("_clientsLock"); boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo; } // namespace mongo diff --git a/s/client.h b/s/client.h index bd4295fe22b..2e9fefea6e3 100644 --- a/s/client.h +++ b/s/client.h @@ -26,11 +26,8 @@ namespace mongo { * currently implemented with a thread local */ class ClientInfo { - - typedef map<int,ClientInfo*> Cache; - public: - ClientInfo( int clientId ); + ClientInfo(); ~ClientInfo(); /** new request from client, adjusts internal state */ @@ -54,7 +51,7 @@ namespace mongo { * gets shards used on the previous request */ set<string> * getPrev() const { return _prev; }; - + /** * gets all shards we've accessed since the last time we called clearSinceLastGetError */ @@ -65,6 +62,12 @@ namespace mongo { */ void clearSinceLastGetError() { _sinceLastGetError.clear(); } + + /** + * resets the list of shards using to process the current request + */ + void clearCurrentShards(){ _cur->clear(); } + /** * calls getLastError * resets shards since get last error @@ -77,8 +80,7 @@ namespace mongo { void noAutoSplit() { _autoSplitOk = false; } - static ClientInfo * get( int clientId = 0 , bool create = true ); - static void disconnect( int clientId ); + static ClientInfo * get(); private: @@ -111,8 +113,6 @@ namespace mongo { int _lastAccess; bool _autoSplitOk; - static mongo::mutex _clientsLock; - static Cache& _clients; static boost::thread_specific_ptr<ClientInfo> _tlInfo; }; diff --git a/s/request.cpp b/s/request.cpp index 52f2e547bf9..32c17cc282c 100644 --- a/s/request.cpp +++ b/s/request.cpp @@ -41,8 +41,7 @@ namespace mongo { assert( _d.getns() ); _id = _m.header()->id; - _clientId = p ? p->getClientId() : 0; - _clientInfo = ClientInfo::get( _clientId ); + _clientInfo = ClientInfo::get(); _clientInfo->newRequest( p ); } @@ -74,7 +73,7 @@ namespace mongo { } _m.header()->id = _id; - + _clientInfo->clearCurrentShards(); } Shard Request::primaryShard() const { diff --git a/s/request.h b/s/request.h index 5b4c228588b..7c51e5c9d9b 100644 --- a/s/request.h +++ b/s/request.h @@ -66,9 +66,6 @@ namespace mongo { return _chunkManager; } - int getClientId() const { - return _clientId; - } ClientInfo * getClientInfo() const { return _clientInfo; } @@ -103,7 +100,6 @@ namespace mongo { DBConfigPtr _config; ChunkManagerPtr _chunkManager; - int _clientId; ClientInfo * _clientInfo; OpCounters* _counter; diff --git a/s/server.cpp b/s/server.cpp index 9bdeedeec6d..51f30f16a2a 100644 --- a/s/server.cpp +++ b/s/server.cpp @@ -77,19 +77,19 @@ namespace mongo { public: virtual ~ShardedMessageHandler() {} - virtual void process( Message& m , AbstractMessagingPort* p ) { + virtual void connected( AbstractMessagingPort* p ) { + assert( ClientInfo::get() ); + } + + virtual void process( Message& m , AbstractMessagingPort* p , LastError * le) { assert( p ); Request r( m , p ); - LastError * le = lastError.startRequest( m , r.getClientId() ); - assert( le ); + assert( le ); + lastError.startRequest( m , le ); - if ( logLevel > 5 ) { - log(5) << "client id: " << hex << r.getClientId() << "\t" << r.getns() << "\t" << dec << r.op() << endl; - } try { r.init(); - setClientId( r.getClientId() ); r.process(); } catch ( AssertionException & e ) { @@ -119,8 +119,7 @@ namespace mongo { } virtual void disconnected( AbstractMessagingPort* p ) { - ClientInfo::disconnect( p->getClientId() ); - lastError.disconnect( p->getClientId() ); + // all things are thread local } }; diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp index 21d59d0bae6..3051013d747 100644 --- a/s/writeback_listener.cpp +++ b/s/writeback_listener.cpp @@ -159,7 +159,7 @@ namespace mongo { DBConfigPtr db = grid.getDBConfig( ns ); ShardChunkVersion needVersion( data["version"] ); - log(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString() + LOG(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString() << " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3) if ( logLevel ) log(1) << debugString( m ) << endl; diff --git a/util/message.cpp b/util/message.cpp index 37099dcbbc5..916aa342ce9 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -703,7 +703,6 @@ again: MSGID NextMsgId; - ThreadLocalValue<int> clientId; struct MsgStart { MsgStart() { @@ -721,14 +720,6 @@ again: return op == dbQuery || op == dbGetMore; } - void setClientId( int id ) { - clientId.set( id ); - } - - int getClientId() { - return clientId.get(); - } - const int DEFAULT_MAX_CONN = 20000; const int MAX_MAX_CONN = 20000; @@ -773,24 +764,4 @@ again: TicketHolder connTicketHolder(DEFAULT_MAX_CONN); - int AbstractMessagingPort::getClientId() { - if ( _clientId == 0 ) { - /** - * highest 2 bytes is port - * this is unique except when there are multiple ips - * or can be issue with high connection churn - * - * lowest 2 bytes is part of the address - * space there is 128 * 2^16 - * so if there is 8gb of heap area for sockets, then this 100% unique - */ - DEV assert( sizeof( MessagingPort ) > 128 ); - int x = remotePort(); - x = x << 16; - x |= ( ( (long long)this >> 7 ) & 0xFFFF ); // lowest 7 bits isn't helpful - _clientId = x; - } - return _clientId; - } - } // namespace mongo diff --git a/util/message.h b/util/message.h index 04cdc968d15..f1144454a43 100644 --- a/util/message.h +++ b/util/message.h @@ -85,8 +85,6 @@ namespace mongo { virtual HostAndPort remote() const = 0; virtual unsigned remotePort() const = 0; - virtual int getClientId(); - private: int _clientId; }; @@ -469,9 +467,6 @@ namespace mongo { MSGID nextMessageId(); - void setClientId( int id ); - int getClientId(); - extern TicketHolder connTicketHolder; class ElapsedTracker { diff --git a/util/message_server.h b/util/message_server.h index 39375c8b2ba..defae0b59ed 100644 --- a/util/message_server.h +++ b/util/message_server.h @@ -29,7 +29,9 @@ namespace mongo { class MessageHandler { public: virtual ~MessageHandler() {} - virtual void process( Message& m , AbstractMessagingPort* p ) = 0; + + virtual void connected( AbstractMessagingPort* p ) = 0; + virtual void process( Message& m , AbstractMessagingPort* p , LastError * err ) = 0; virtual void disconnected( AbstractMessagingPort* p ) = 0; }; diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp index 6d00628c8ad..76bd78d516c 100644 --- a/util/message_server_port.cpp +++ b/util/message_server_port.cpp @@ -23,6 +23,7 @@ #include "message_server.h" #include "../db/cmdline.h" +#include "../db/lasterror.h" #include "../db/stats/counters.h" namespace mongo { @@ -38,14 +39,19 @@ namespace mongo { setThreadName( "conn" ); - auto_ptr<MessagingPort> p( inPort ); + scoped_ptr<MessagingPort> p( inPort ); string otherSide; Message m; try { + LastError * le = new LastError(); + lastError.reset( le ); // lastError now has ownership + otherSide = p->farEnd.toString(); + handler->connected( p.get() ); + while ( 1 ) { m.reset(); p->clearCounters(); @@ -57,7 +63,7 @@ namespace mongo { break; } - handler->process( m , p.get() ); + handler->process( m , p.get() , le ); networkCounter.hit( p->getBytesIn() , p->getBytesOut() ); } } |