// shardconnection.cpp /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #define MONGO_PCH_WHITELISTED #include "mongo/platform/basic.h" #include "mongo/pch.h" #undef MONGO_PCH_WHITELISTED #include #include #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" #include "mongo/db/server_parameters.h" #include "mongo/s/config.h" #include "mongo/s/request.h" #include "mongo/s/shard.h" #include "mongo/s/stale_exception.h" #include "mongo/s/version_manager.h" #include "mongo/server.h" #include "mongo/util/concurrency/spin_lock.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/stacktrace.h" namespace mongo { using std::auto_ptr; using std::endl; using std::map; using std::set; using std::string; using std::stringstream; using std::vector; DBConnectionPool shardConnectionPool; class ClientConnections; /** * Class which tracks ClientConnections (the client connection pool) for each incoming * connection, allowing stats access. */ class ActiveClientConnections { public: ActiveClientConnections() : _mutex( "ActiveClientConnections" ) { } void add( const ClientConnections* cc ) { scoped_lock lock( _mutex ); _clientConnections.insert( cc ); } void remove( const ClientConnections* cc ) { scoped_lock lock( _mutex ); _clientConnections.erase( cc ); } // Implemented after ClientConnections void appendInfo( BSONObjBuilder& b ); private: mongo::mutex _mutex; set _clientConnections; } activeClientConnections; /** * Command to allow access to the sharded conn pool information in mongos. * TODO: Refactor with other connection pooling changes */ class ShardedPoolStats : public Command { public: ShardedPoolStats() : Command( "shardConnPoolStats" ) {} virtual void help( stringstream &help ) const { help << "stats about the shard connection pool"; } virtual bool isWriteCommandForConfigServer() const { return false; } virtual bool slaveOk() const { return true; } // Same privs as connPoolStats virtual void addRequiredPrivileges( const std::string& dbname, const BSONObj& cmdObj, std::vector* out ) { ActionSet actions; actions.addAction( ActionType::connPoolStats ); out->push_back( Privilege( ResourcePattern::forClusterResource(), actions ) ); } virtual bool run( OperationContext* txn, const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool ) { // Base pool info shardConnectionPool.appendInfo( result ); // Thread connection info activeClientConnections.appendInfo( result ); return true; } } shardedPoolStatsCmd; /** * holds all the actual db connections for a client to various servers * 1 per thread, so doesn't have to be thread safe */ class ClientConnections : boost::noncopyable { public: struct Status : boost::noncopyable { Status() : created(0), avail(0) {} // May be read concurrently, but only written from // this thread. long long created; DBClientBase* avail; }; // Gets or creates the status object for the host Status* _getStatus( const string& addr ) { scoped_spinlock lock( _lock ); Status* &temp = _hosts[addr]; if ( ! temp ) temp = new Status(); return temp; } ClientConnections() { // Start tracking client connections activeClientConnections.add( this ); } ~ClientConnections() { // Stop tracking these client connections activeClientConnections.remove( this ); releaseAll( true ); } void releaseAll( bool fromDestructor = false ) { // Don't need spinlock protection because if not in the destructor, we don't // modify _hosts, and if in the destructor we are not accessible to external // threads. for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) { string addr = i->first; Status* ss = i->second; verify( ss ); if ( ss->avail ) { /* if we're shutting down, don't want to initiate release mechanism as it is slow, and isn't needed since all connections will be closed anyway */ if ( inShutdown() ) { if( versionManager.isVersionableCB( ss->avail ) ) versionManager.resetShardVersionCB( ss->avail ); delete ss->avail; } else release( addr , ss->avail ); ss->avail = 0; } if ( fromDestructor ) delete ss; } if ( fromDestructor ) _hosts.clear(); } DBClientBase * get( const string& addr , const string& ns ) { { // We want to report ns stats scoped_spinlock lock(_lock); if (ns.size() > 0) _seenNS.insert(ns); } Status* s = _getStatus( addr ); auto_ptr c; // Handles cleanup if there's an exception thrown if ( s->avail ) { c.reset( s->avail ); s->avail = 0; shardConnectionPool.onHandedOut( c.get() ); // May throw an exception } else { c.reset( shardConnectionPool.get( addr ) ); s->created++; // After, so failed creation doesn't get counted } return c.release(); } void done( const string& addr , DBClientBase* conn ) { Status* s = _hosts[addr]; verify( s ); const bool isConnGood = shardConnectionPool.isConnectionGood(addr, conn); if (s->avail != NULL) { warning() << "Detected additional sharded connection in the " "thread local pool for " << addr << endl; if (DBException::traceExceptions) { // There shouldn't be more than one connection checked out to the same // host on the same thread. printStackTrace(); } if (!isConnGood) { delete s->avail; s->avail = NULL; } // Let the internal pool handle the bad connection, this can also // update the lower bounds for the known good socket creation time // for this host. release(addr, conn); return; } if (!isConnGood) { // Let the internal pool handle the bad connection. release(addr, conn); return; } // Note: Although we try our best to clear bad connections as much as possible, // some of them can still slip through because of how ClientConnections are being // used - as thread local variables. This means that threads won't be able to // see the s->avail connection of other threads. s->avail = conn; } void sync() { for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) { string addr = i->first; Status* ss = i->second; if ( ss->avail ) ss->avail->getLastError(); } } void checkVersions( const string& ns ) { vector all; Shard::getAllShards( all ); // Don't report exceptions here as errors in GetLastError LastError::Disabled ignoreForGLE(lastError.get(false)); // Now only check top-level shard connections for ( unsigned i=0; iavail ) { s->avail = shardConnectionPool.get( sconnString ); s->created++; // After, so failed creation doesn't get counted } versionManager.checkShardVersionCB( s->avail, ns, false, 1 ); } catch ( const DBException& ex ) { warning() << "problem while initially checking shard versions on" << " " << shard.getName() << causedBy( ex ) << endl; // NOTE: This is only a heuristic, to avoid multiple stale version retries // across multiple shards, and does not affect correctness. } } } void release( const string& addr , DBClientBase * conn ) { shardConnectionPool.release( addr , conn ); } /** * Appends info about the client connection pool to a BOBuilder * Safe to call with activeClientConnections lock */ void appendInfo( BSONObjBuilder& b ) const { scoped_spinlock lock( _lock ); BSONArrayBuilder hostsArrB( b.subarrayStart( "hosts" ) ); for ( HostMap::const_iterator i = _hosts.begin(); i != _hosts.end(); ++i ) { BSONObjBuilder bb( hostsArrB.subobjStart() ); bb.append( "host", i->first ); bb.append( "created", i->second->created ); bb.appendBool( "avail", static_cast( i->second->avail ) ); bb.done(); } hostsArrB.done(); BSONArrayBuilder nsArrB( b.subarrayStart( "seenNS" ) ); for ( set::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i ) { nsArrB.append(*i); } nsArrB.done(); } // Protects only the creation of new entries in the _hosts and _seenNS map // from external threads. Reading _hosts / _seenNS in this thread doesn't // need protection. mutable SpinLock _lock; typedef map HostMap; HostMap _hosts; set _seenNS; /** * Clears the connections kept by this pool (ie, not including the global pool) */ void clearPool() { for(HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) { if (iter->second->avail != NULL) { delete iter->second->avail; } delete iter->second; } _hosts.clear(); } void forgetNS( const string& ns ) { scoped_spinlock lock( _lock ); _seenNS.erase( ns ); } // ----- static thread_specific_ptr _perThread; static ClientConnections* threadInstance() { ClientConnections* cc = _perThread.get(); if ( ! cc ) { cc = new ClientConnections(); _perThread.reset( cc ); } return cc; } }; thread_specific_ptr ClientConnections::_perThread; /** * Appends info about all active client shard connections to a BOBuilder */ void ActiveClientConnections::appendInfo( BSONObjBuilder& b ) { BSONArrayBuilder arr( 64 * 1024 ); // There may be quite a few threads { scoped_lock lock( _mutex ); for ( set::const_iterator i = _clientConnections.begin(); i != _clientConnections.end(); ++i ) { BSONObjBuilder bb( arr.subobjStart() ); (*i)->appendInfo( bb ); bb.done(); } } b.appendArray( "threads", arr.obj() ); } ShardConnection::ShardConnection( const Shard * s , const string& ns, ChunkManagerPtr manager ) : _addr( s->getConnString() ) , _ns( ns ), _manager( manager ) { _init(); } ShardConnection::ShardConnection( const Shard& s , const string& ns, ChunkManagerPtr manager ) : _addr( s.getConnString() ) , _ns( ns ), _manager( manager ) { _init(); } ShardConnection::ShardConnection( const string& addr , const string& ns, ChunkManagerPtr manager ) : _addr( addr ) , _ns( ns ), _manager( manager ) { _init(); } void usingAShardConnection( const string& addr ); void ShardConnection::_init() { verify( _addr.size() ); _conn = ClientConnections::threadInstance()->get( _addr , _ns ); _finishedInit = false; usingAShardConnection( _addr ); } void ShardConnection::_finishInit() { if ( _finishedInit ) return; _finishedInit = true; if (versionManager.isVersionableCB(_conn)) { // Make sure we specified a manager for the correct namespace if (_ns.size() && _manager) verify(_manager->getns() == _ns); _setVersion = versionManager.checkShardVersionCB( this , false , 1 ); } else { // Make sure we didn't specify a manager for a non-versionable connection (i.e. config) verify(!_manager); _setVersion = false; } } void ShardConnection::done() { if ( _conn ) { ClientConnections::threadInstance()->done( _addr , _conn ); _conn = 0; _finishedInit = true; } } void ShardConnection::kill() { if ( _conn ) { if( versionManager.isVersionableCB( _conn ) ) versionManager.resetShardVersionCB( _conn ); if (_conn->isFailed()) { // Let the pool know about the bad connection and also delegate disposal to it. ClientConnections::threadInstance()->done(_addr, _conn); } else { delete _conn; } _conn = 0; _finishedInit = true; } } void ShardConnection::sync() { ClientConnections::threadInstance()->sync(); } void ShardConnection::checkMyConnectionVersions( const string & ns ) { ClientConnections::threadInstance()->checkVersions( ns ); } ShardConnection::~ShardConnection() { if ( _conn ) { if (_conn->isFailed()) { if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) { kill(); } else { // The pool takes care of deleting the failed connection - this // will also trigger disposal of older connections in the pool done(); } } else { /* see done() comments above for why we log this line */ log() << "sharded connection to " << _conn->getServerAddress() << " not being returned to the pool" << endl; kill(); } } } void ShardConnection::releaseMyConnections() { ClientConnections::threadInstance()->releaseAll(); } void ShardConnection::clearPool() { shardConnectionPool.clear(); ClientConnections::threadInstance()->clearPool(); } void ShardConnection::forgetNS( const string& ns ) { ClientConnections::threadInstance()->forgetNS( ns ); } }