diff options
author | Eliot Horowitz <eliot@10gen.com> | 2011-07-12 13:48:02 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2011-07-12 13:48:20 -0400 |
commit | 68e0ade1059ad93a62e47eb0cff6602f0e854614 (patch) | |
tree | f57b69deed253b37c6772e0001f2a91f488351ae /client | |
parent | db033b9c7e836ab5f814718c2dc646976a3c2326 (diff) | |
download | mongo-68e0ade1059ad93a62e47eb0cff6602f0e854614.tar.gz |
fix connection pooling with socket timeouts
Diffstat (limited to 'client')
-rw-r--r-- | client/connpool.cpp | 56 | ||||
-rw-r--r-- | client/connpool.h | 19 | ||||
-rw-r--r-- | client/dbclient.h | 3 | ||||
-rw-r--r-- | client/dbclient_rs.h | 6 | ||||
-rw-r--r-- | client/syncclusterconnection.h | 2 |
5 files changed, 65 insertions, 21 deletions
diff --git a/client/connpool.cpp b/client/connpool.cpp index ab899b0f292..e94a78d1c45 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -46,17 +46,24 @@ namespace mongo { } } - DBClientBase * PoolForHost::get( DBConnectionPool * pool ) { + DBClientBase * PoolForHost::get( DBConnectionPool * pool , double socketTimeout ) { time_t now = time(0); - + while ( ! _pool.empty() ) { StoredConnection sc = _pool.top(); _pool.pop(); - if ( sc.ok( now ) ) - return sc.conn; - pool->onDestory( sc.conn ); - delete sc.conn; + + if ( ! sc.ok( now ) ) { + pool->onDestory( sc.conn ); + delete sc.conn; + continue; + } + + assert( sc.conn->getSoTimeout() == socketTimeout ); + + return sc.conn; + } return NULL; @@ -125,17 +132,17 @@ namespace mongo { _hooks( new list<DBConnectionHook*>() ) { } - DBClientBase* DBConnectionPool::_get(const string& ident) { + DBClientBase* DBConnectionPool::_get(const string& ident , double socketTimeout ) { assert( ! inShutdown() ); scoped_lock L(_mutex); - PoolForHost& p = _pools[ident]; - return p.get( this ); + PoolForHost& p = _pools[PoolKey(ident,socketTimeout)]; + return p.get( this , socketTimeout ); } - DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ) { + DBClientBase* DBConnectionPool::_finishCreate( const string& host , double socketTimeout , DBClientBase* conn ) { { scoped_lock L(_mutex); - PoolForHost& p = _pools[host]; + PoolForHost& p = _pools[PoolKey(host,socketTimeout)]; p.createdOne( conn ); } @@ -146,7 +153,7 @@ namespace mongo { } DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { - DBClientBase * c = _get( url.toString() ); + DBClientBase * c = _get( url.toString() , socketTimeout ); if ( c ) { onHandedOut( c ); return c; @@ -156,11 +163,11 @@ namespace mongo { c = url.connect( errmsg, socketTimeout ); uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c ); - return _finishCreate( url.toString() , c ); + return _finishCreate( url.toString() , socketTimeout , c ); } DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { - DBClientBase * c = _get( host ); + DBClientBase * c = _get( host , socketTimeout ); if ( c ) { onHandedOut( c ); return c; @@ -173,7 +180,7 @@ namespace mongo { c = cs.connect( errmsg, socketTimeout ); if ( ! c ) throw SocketException( SocketException::CONNECT_ERROR , host , 11002 , str::stream() << _name << " error: " << errmsg ); - return _finishCreate( host , c ); + return _finishCreate( host , socketTimeout , c ); } void DBConnectionPool::release(const string& host, DBClientBase *c) { @@ -183,7 +190,7 @@ namespace mongo { return; } scoped_lock L(_mutex); - _pools[host].done(this,c); + _pools[PoolKey(host,c->getSoTimeout())].done(this,c); } @@ -244,7 +251,8 @@ namespace mongo { if ( i->second.numCreated() == 0 ) continue; - string s = i->first; + string s = str::stream() << i->first.ident << "::" << i->first.timeout; + BSONObjBuilder temp( bb.subobjStart( s ) ); temp.append( "available" , i->second.numAvailable() ); temp.appendNumber( "created" , i->second.numCreated() ); @@ -277,6 +285,20 @@ namespace mongo { return ap < bp; } + + bool DBConnectionPool::poolKeyCompare::operator()( const PoolKey& a , const PoolKey& b ) const { + string ap = str::before( a.ident , "/" ); + string bp = str::before( b.ident , "/" ); + + if ( ap < bp ) + return true; + + if ( ap > bp ) + return false; + + return a.timeout < b.timeout; + } + void DBConnectionPool::taskDoWork() { vector<DBClientBase*> toDelete; diff --git a/client/connpool.h b/client/connpool.h index 3a9229f39ab..a37dad7ff51 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -55,7 +55,7 @@ namespace mongo { /** * gets a connection or return NULL */ - DBClientBase * get( DBConnectionPool * pool); + DBClientBase * get( DBConnectionPool * pool , double socketTimeout ); void done( DBConnectionPool * pool , DBClientBase * c ); @@ -77,6 +77,7 @@ namespace mongo { }; std::stack<StoredConnection> _pool; + long long _created; ConnectionString::ConnectionType _type; @@ -141,11 +142,21 @@ namespace mongo { private: DBConnectionPool( DBConnectionPool& p ); - DBClientBase* _get( const string& ident ); + DBClientBase* _get( const string& ident , double socketTimeout ); - DBClientBase* _finishCreate( const string& ident , DBClientBase* conn ); + DBClientBase* _finishCreate( const string& ident , double socketTimeout, DBClientBase* conn ); - typedef map<string,PoolForHost,serverNameCompare> PoolMap; // servername -> pool + struct PoolKey { + PoolKey( string i , double t ) : ident( i ) , timeout( t ) {} + string ident; + double timeout; + }; + + struct poolKeyCompare { + bool operator()( const PoolKey& a , const PoolKey& b ) const; + }; + + typedef map<PoolKey,PoolForHost,poolKeyCompare> PoolMap; // servername -> pool mongo::mutex _mutex; string _name; diff --git a/client/dbclient.h b/client/dbclient.h index 729424edf89..f48f279e9f5 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -799,6 +799,8 @@ namespace mongo { // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed virtual ConnectionString::ConnectionType type() const = 0; + + virtual double getSoTimeout() const = 0; }; // DBClientBase @@ -917,6 +919,7 @@ namespace mongo { virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 ); virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } void setSoTimeout(double to) { _so_timeout = to; } + double getSoTimeout() const { return _so_timeout; } static int getNumConnections() { return _numConnections; diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h index d59bab0bb5c..4a0a832d9ca 100644 --- a/client/dbclient_rs.h +++ b/client/dbclient_rs.h @@ -228,6 +228,11 @@ namespace mongo { // ----- informational ---- + /** + * timeout not supported in DBClientReplicaSet yet + */ + double getSoTimeout() const { return 0; } + string toString() { return getServerAddress(); } string getServerAddress() const { return _monitor->getServerAddress(); } @@ -239,6 +244,7 @@ namespace mongo { virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ); virtual bool callRead( Message& toSend , Message& response ) { return checkMaster()->callRead( toSend , response ); } + protected: virtual void sayPiggyBack( Message &toSend ) { checkMaster()->say( toSend ); } diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h index 6889492bb38..edd458fe683 100644 --- a/client/syncclusterconnection.h +++ b/client/syncclusterconnection.h @@ -92,6 +92,8 @@ namespace mongo { virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; } void setAllSoTimeouts( double socketTimeout ); + double getSoTimeout() const { return _socketTimeout; } + virtual bool auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword); private: |