/* connpool.cpp */ /* Copyright 2009 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ // _ todo: reconnect? #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/client/connpool.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/client/syncclusterconnection.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" namespace mongo { using std::endl; using std::list; using std::map; using std::set; using std::string; using std::vector; // ------ PoolForHost ------ PoolForHost::~PoolForHost() { clear(); } void PoolForHost::clear() { while ( ! _pool.empty() ) { StoredConnection sc = _pool.top(); delete sc.conn; _pool.pop(); } } void PoolForHost::done(DBConnectionPool* pool, DBClientBase* c) { bool isFailed = c->isFailed(); // Remember that this host had a broken connection for later if (isFailed) reportBadConnectionAt(c->getSockCreationMicroSec()); if (isFailed || // Another (later) connection was reported as broken to this host (c->getSockCreationMicroSec() < _minValidCreationTimeMicroSec) || // We have a pool size that we need to enforce (_maxPoolSize >= 0 && static_cast(_pool.size()) >= _maxPoolSize)) { pool->onDestroy(c); delete c; } else { // The connection is probably fine, save for later _pool.push(c); } } void PoolForHost::reportBadConnectionAt(uint64_t microSec) { if (microSec != DBClientBase::INVALID_SOCK_CREATION_TIME && microSec > _minValidCreationTimeMicroSec) { _minValidCreationTimeMicroSec = microSec; log() << "Detected bad connection created at " << _minValidCreationTimeMicroSec << " microSec, clearing pool for " << _hostName << " of " << _pool.size() << " connections" << endl; clear(); } } bool PoolForHost::isBadSocketCreationTime(uint64_t microSec) { return microSec != DBClientBase::INVALID_SOCK_CREATION_TIME && microSec <= _minValidCreationTimeMicroSec; } DBClientBase * PoolForHost::get( DBConnectionPool * pool , double socketTimeout ) { time_t now = time(0); while ( ! _pool.empty() ) { StoredConnection sc = _pool.top(); _pool.pop(); if ( ! sc.ok( now ) ) { pool->onDestroy( sc.conn ); delete sc.conn; continue; } verify( sc.conn->getSoTimeout() == socketTimeout ); return sc.conn; } return NULL; } void PoolForHost::flush() { while (!_pool.empty()) { StoredConnection c = _pool.top(); _pool.pop(); delete c.conn; } } void PoolForHost::getStaleConnections( vector& stale ) { time_t now = time(0); vector all; while ( ! _pool.empty() ) { StoredConnection c = _pool.top(); _pool.pop(); if ( c.ok( now ) ) all.push_back( c ); else stale.push_back( c.conn ); } for ( size_t i=0; iisStillConnected(); } void PoolForHost::createdOne( DBClientBase * base) { if ( _created == 0 ) _type = base->type(); _created++; } void PoolForHost::initializeHostName(const std::string& hostName) { if (_hostName.empty()) { _hostName = hostName; } } // ------ DBConnectionPool ------ DBConnectionPool pool; const int PoolForHost::kPoolSizeUnlimited(-1); DBConnectionPool::DBConnectionPool() : _mutex("DBConnectionPool") , _name( "dbconnectionpool" ) , _maxPoolSize(PoolForHost::kPoolSizeUnlimited) , _hooks( new list() ) { } DBClientBase* DBConnectionPool::_get(const string& ident , double socketTimeout ) { uassert(17382, "Can't use connection pool during shutdown", !inShutdown()); scoped_lock L(_mutex); PoolForHost& p = _pools[PoolKey(ident,socketTimeout)]; p.setMaxPoolSize(_maxPoolSize); p.initializeHostName(ident); return p.get( this , socketTimeout ); } DBClientBase* DBConnectionPool::_finishCreate( const string& host , double socketTimeout , DBClientBase* conn ) { { scoped_lock L(_mutex); PoolForHost& p = _pools[PoolKey(host,socketTimeout)]; p.setMaxPoolSize(_maxPoolSize); p.initializeHostName(host); p.createdOne( conn ); } try { onCreate( conn ); onHandedOut( conn ); } catch ( std::exception & ) { delete conn; throw; } return conn; } DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { DBClientBase * c = _get( url.toString() , socketTimeout ); if ( c ) { try { onHandedOut( c ); } catch ( std::exception& ) { delete c; throw; } return c; } string errmsg; c = url.connect( errmsg, socketTimeout ); uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c ); return _finishCreate( url.toString() , socketTimeout , c ); } DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { DBClientBase * c = _get( host , socketTimeout ); if ( c ) { try { onHandedOut( c ); } catch ( std::exception& ) { delete c; throw; } return c; } string errmsg; ConnectionString cs = ConnectionString::parse( host , errmsg ); uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() ); c = cs.connect( errmsg, socketTimeout ); if ( ! c ) throw SocketException( SocketException::CONNECT_ERROR , host , 11002 , str::stream() << _name << " error: " << errmsg ); return _finishCreate( host , socketTimeout , c ); } void DBConnectionPool::onRelease(DBClientBase* conn) { if (_hooks->empty()) { return; } for (list::iterator i = _hooks->begin(); i != _hooks->end(); i++) { (*i)->onRelease( conn ); } } void DBConnectionPool::release(const string& host, DBClientBase *c) { onRelease(c); scoped_lock L(_mutex); _pools[PoolKey(host,c->getSoTimeout())].done(this,c); } DBConnectionPool::~DBConnectionPool() { // connection closing is handled by ~PoolForHost } void DBConnectionPool::flush() { scoped_lock L(_mutex); for ( PoolMap::iterator i = _pools.begin(); i != _pools.end(); i++ ) { PoolForHost& p = i->second; p.flush(); } } void DBConnectionPool::clear() { scoped_lock L(_mutex); LOG(2) << "Removing connections on all pools owned by " << _name << endl; for (PoolMap::iterator iter = _pools.begin(); iter != _pools.end(); ++iter) { iter->second.clear(); } } void DBConnectionPool::removeHost( const string& host ) { scoped_lock L(_mutex); LOG(2) << "Removing connections from all pools for host: " << host << endl; for ( PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i ) { const string& poolHost = i->first.ident; if ( !serverNameCompare()(host, poolHost) && !serverNameCompare()(poolHost, host) ) { // hosts are the same i->second.clear(); } } } void DBConnectionPool::addHook( DBConnectionHook * hook ) { _hooks->push_back( hook ); } void DBConnectionPool::onCreate( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { (*i)->onCreate( conn ); } } void DBConnectionPool::onHandedOut( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { (*i)->onHandedOut( conn ); } } void DBConnectionPool::onDestroy( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { (*i)->onDestroy( conn ); } } void DBConnectionPool::appendInfo( BSONObjBuilder& b ) { int avail = 0; long long created = 0; map createdByType; BSONObjBuilder bb( b.subobjStart( "hosts" ) ); { scoped_lock lk( _mutex ); for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { if ( i->second.numCreated() == 0 ) continue; string s = str::stream() << i->first.ident << "::" << i->first.timeout; BSONObjBuilder temp( bb.subobjStart( s ) ); temp.append( "available" , i->second.numAvailable() ); temp.appendNumber( "created" , i->second.numCreated() ); temp.done(); avail += i->second.numAvailable(); created += i->second.numCreated(); long long& x = createdByType[i->second.type()]; x += i->second.numCreated(); } } bb.done(); // Always report all replica sets being tracked set replicaSets = ReplicaSetMonitor::getAllTrackedSets(); BSONObjBuilder setBuilder( b.subobjStart( "replicaSets" ) ); for ( set::iterator i=replicaSets.begin(); i!=replicaSets.end(); ++i ) { string rs = *i; ReplicaSetMonitorPtr m = ReplicaSetMonitor::get( rs ); if ( ! m ) { warning() << "no monitor for set: " << rs << endl; continue; } BSONObjBuilder temp( setBuilder.subobjStart( rs ) ); m->appendInfo( temp ); temp.done(); } setBuilder.done(); { BSONObjBuilder temp( bb.subobjStart( "createdByType" ) ); for ( map::iterator i=createdByType.begin(); i!=createdByType.end(); ++i ) { temp.appendNumber( ConnectionString::typeToString( i->first ) , i->second ); } temp.done(); } b.append( "totalAvailable" , avail ); b.appendNumber( "totalCreated" , created ); } bool DBConnectionPool::serverNameCompare::operator()( const string& a , const string& b ) const{ const char* ap = a.c_str(); const char* bp = b.c_str(); while (true){ if (*ap == '\0' || *ap == '/'){ if (*bp == '\0' || *bp == '/') return false; // equal strings else return true; // a is shorter } if (*bp == '\0' || *bp == '/') return false; // b is shorter if ( *ap < *bp) return true; else if (*ap > *bp) return false; ++ap; ++bp; } verify(false); } bool DBConnectionPool::poolKeyCompare::operator()( const PoolKey& a , const PoolKey& b ) const { if (DBConnectionPool::serverNameCompare()( a.ident , b.ident )) return true; if (DBConnectionPool::serverNameCompare()( b.ident , a.ident )) return false; return a.timeout < b.timeout; } bool DBConnectionPool::isConnectionGood(const string& hostName, DBClientBase* conn) { if (conn == NULL) { return false; } if (conn->isFailed()) { return false; } { scoped_lock sl(_mutex); PoolForHost& pool = _pools[PoolKey(hostName, conn->getSoTimeout())]; if (pool.isBadSocketCreationTime(conn->getSockCreationMicroSec())) { return false; } } return true; } void DBConnectionPool::taskDoWork() { vector toDelete; { // we need to get the connections inside the lock // but we can actually delete them outside scoped_lock lk( _mutex ); for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { i->second.getStaleConnections( toDelete ); } } for ( size_t i=0; itype() == ConnectionString::MASTER ) (( DBClientConnection* ) _conn)->setSoTimeout( _socketTimeout ); else if( _conn->type() == ConnectionString::SYNC ) (( SyncClusterConnection* ) _conn)->setAllSoTimeouts( _socketTimeout ); } ScopedDbConnection::~ScopedDbConnection() { if ( _conn ) { if (_conn->isFailed()) { if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) { kill(); } else { // The pool takes care of deleting the failed connection - this // will also trigger disposal of older connections in the pool done(); } } else { /* see done() comments above for why we log this line */ log() << "scoped connection to " << _conn->getServerAddress() << " not being returned to the pool" << endl; kill(); } } } void ScopedDbConnection::clearPool() { pool.clear(); } AtomicInt32 AScopedConnection::_numConnections; } // namespace mongo