/* connpool.cpp */ /* Copyright 2009 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // _ todo: reconnect? #include "pch.h" #include "connpool.h" #include "../db/commands.h" #include "syncclusterconnection.h" #include "../s/shard.h" namespace mongo { // ------ PoolForHost ------ PoolForHost::~PoolForHost() { while ( ! _pool.empty() ) { StoredConnection sc = _pool.top(); delete sc.conn; _pool.pop(); } } void PoolForHost::done( DBConnectionPool * pool, DBClientBase * c ) { if ( _pool.size() >= _maxPerHost ) { pool->onDestory( c ); delete c; } else { _pool.push(c); } } DBClientBase * PoolForHost::get( DBConnectionPool * pool , double socketTimeout ) { time_t now = time(0); while ( ! _pool.empty() ) { StoredConnection sc = _pool.top(); _pool.pop(); if ( ! sc.ok( now ) ) { pool->onDestory( sc.conn ); delete sc.conn; continue; } assert( sc.conn->getSoTimeout() == socketTimeout ); return sc.conn; } return NULL; } void PoolForHost::flush() { vector all; while ( ! _pool.empty() ) { StoredConnection c = _pool.top(); _pool.pop(); all.push_back( c ); bool res; c.conn->isMaster( res ); } for ( vector::iterator i=all.begin(); i != all.end(); ++i ) { _pool.push( *i ); } } void PoolForHost::getStaleConnections( vector& stale ) { time_t now = time(0); vector all; while ( ! _pool.empty() ) { StoredConnection c = _pool.top(); _pool.pop(); if ( c.ok( now ) ) all.push_back( c ); else stale.push_back( c.conn ); } for ( size_t i=0; itype(); _created++; } unsigned PoolForHost::_maxPerHost = 50; // ------ DBConnectionPool ------ DBConnectionPool pool; DBConnectionPool::DBConnectionPool() : _mutex("DBConnectionPool") , _name( "dbconnectionpool" ) , _hooks( new list() ) { } DBClientBase* DBConnectionPool::_get(const string& ident , double socketTimeout ) { assert( ! inShutdown() ); scoped_lock L(_mutex); PoolForHost& p = _pools[PoolKey(ident,socketTimeout)]; return p.get( this , socketTimeout ); } DBClientBase* DBConnectionPool::_finishCreate( const string& host , double socketTimeout , DBClientBase* conn ) { { scoped_lock L(_mutex); PoolForHost& p = _pools[PoolKey(host,socketTimeout)]; p.createdOne( conn ); } onCreate( conn ); onHandedOut( conn ); return conn; } DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { DBClientBase * c = _get( url.toString() , socketTimeout ); if ( c ) { onHandedOut( c ); return c; } string errmsg; c = url.connect( errmsg, socketTimeout ); uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c ); return _finishCreate( url.toString() , socketTimeout , c ); } DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { DBClientBase * c = _get( host , socketTimeout ); if ( c ) { onHandedOut( c ); return c; } string errmsg; ConnectionString cs = ConnectionString::parse( host , errmsg ); uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() ); c = cs.connect( errmsg, socketTimeout ); if ( ! c ) throw SocketException( SocketException::CONNECT_ERROR , host , 11002 , str::stream() << _name << " error: " << errmsg ); return _finishCreate( host , socketTimeout , c ); } void DBConnectionPool::release(const string& host, DBClientBase *c) { if ( c->isFailed() ) { onDestory( c ); delete c; return; } scoped_lock L(_mutex); _pools[PoolKey(host,c->getSoTimeout())].done(this,c); } DBConnectionPool::~DBConnectionPool() { // connection closing is handled by ~PoolForHost } void DBConnectionPool::flush() { scoped_lock L(_mutex); for ( PoolMap::iterator i = _pools.begin(); i != _pools.end(); i++ ) { PoolForHost& p = i->second; p.flush(); } } void DBConnectionPool::addHook( DBConnectionHook * hook ) { _hooks->push_back( hook ); } void DBConnectionPool::onCreate( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { (*i)->onCreate( conn ); } } void DBConnectionPool::onHandedOut( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { (*i)->onHandedOut( conn ); } } void DBConnectionPool::onDestory( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { (*i)->onDestory( conn ); } } void DBConnectionPool::appendInfo( BSONObjBuilder& b ) { int avail = 0; long long created = 0; map createdByType; set replicaSets; BSONObjBuilder bb( b.subobjStart( "hosts" ) ); { scoped_lock lk( _mutex ); for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { if ( i->second.numCreated() == 0 ) continue; string s = str::stream() << i->first.ident << "::" << i->first.timeout; BSONObjBuilder temp( bb.subobjStart( s ) ); temp.append( "available" , i->second.numAvailable() ); temp.appendNumber( "created" , i->second.numCreated() ); temp.done(); avail += i->second.numAvailable(); created += i->second.numCreated(); long long& x = createdByType[i->second.type()]; x += i->second.numCreated(); { string setName = i->first.ident; if ( setName.find( "/" ) != string::npos ) { setName = setName.substr( 0 , setName.find( "/" ) ); replicaSets.insert( setName ); } } } } bb.done(); BSONObjBuilder setBuilder( b.subobjStart( "replicaSets" ) ); for ( set::iterator i=replicaSets.begin(); i!=replicaSets.end(); ++i ) { string rs = *i; ReplicaSetMonitorPtr m = ReplicaSetMonitor::get( rs ); if ( ! m ) { warning() << "no monitor for set: " << rs << endl; continue; } BSONObjBuilder temp( setBuilder.subobjStart( rs ) ); m->appendInfo( temp ); temp.done(); } setBuilder.done(); { BSONObjBuilder temp( bb.subobjStart( "createdByType" ) ); for ( map::iterator i=createdByType.begin(); i!=createdByType.end(); ++i ) { temp.appendNumber( ConnectionString::typeToString( i->first ) , i->second ); } temp.done(); } b.append( "totalAvailable" , avail ); b.appendNumber( "totalCreated" , created ); } bool DBConnectionPool::serverNameCompare::operator()( const string& a , const string& b ) const{ string ap = str::before( a , "/" ); string bp = str::before( b , "/" ); return ap < bp; } bool DBConnectionPool::poolKeyCompare::operator()( const PoolKey& a , const PoolKey& b ) const { string ap = str::before( a.ident , "/" ); string bp = str::before( b.ident , "/" ); if ( ap < bp ) return true; if ( ap > bp ) return false; return a.timeout < b.timeout; } void DBConnectionPool::taskDoWork() { vector toDelete; { // we need to get the connections inside the lock // but we can actually delete them outside scoped_lock lk( _mutex ); for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { i->second.getStaleConnections( toDelete ); } } for ( size_t i=0; itype() == ConnectionString::MASTER ) (( DBClientConnection* ) _conn)->setSoTimeout( _socketTimeout ); else if( _conn->type() == ConnectionString::SYNC ) (( SyncClusterConnection* ) _conn)->setAllSoTimeouts( _socketTimeout ); } ScopedDbConnection::~ScopedDbConnection() { if ( _conn ) { if ( ! _conn->isFailed() ) { /* see done() comments above for why we log this line */ log() << "~ScopedDbConnection: _conn != null" << endl; } kill(); } } ScopedDbConnection::ScopedDbConnection(const Shard& shard, double socketTimeout ) : _host( shard.getConnString() ) , _conn( pool.get(_host, socketTimeout) ), _socketTimeout( socketTimeout ) { _setSocketTimeout(); } ScopedDbConnection::ScopedDbConnection(const Shard* shard, double socketTimeout ) : _host( shard->getConnString() ) , _conn( pool.get(_host, socketTimeout) ), _socketTimeout( socketTimeout ) { _setSocketTimeout(); } class PoolFlushCmd : public Command { public: PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ) {} virtual void help( stringstream &help ) const { help<<"internal"; } virtual LockType locktype() const { return NONE; } virtual bool run(const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool) { pool.flush(); return true; } virtual bool slaveOk() const { return true; } } poolFlushCmd; class PoolStats : public Command { public: PoolStats() : Command( "connPoolStats" ) {} virtual void help( stringstream &help ) const { help<<"stats about connection pool"; } virtual LockType locktype() const { return NONE; } virtual bool run(const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool) { pool.appendInfo( result ); result.append( "numDBClientConnection" , DBClientConnection::getNumConnections() ); result.append( "numAScopedConnection" , AScopedConnection::getNumConnections() ); return true; } virtual bool slaveOk() const { return true; } } poolStatsCmd; AtomicUInt AScopedConnection::_numConnections; } // namespace mongo