/* connpool.cpp */

/*    Copyright 2009 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
 */

// _ todo: reconnect?
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/client/connpool.h" #include #include #include "mongo/base/init.h" #include "mongo/client/connection_string.h" #include "mongo/client/dbclient_connection.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/stdx/chrono.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" #if !defined(__has_feature) #define __has_feature(x) 0 #endif #if __has_feature(address_sanitizer) #include #endif namespace mongo { namespace { const int kDefaultIdleTimeout = std::numeric_limits::max(); const int kDefaultMaxInUse = std::numeric_limits::max(); } // namespace using std::endl; using std::list; using std::map; using std::set; using std::string; using std::vector; // ------ PoolForHost ------ PoolForHost::PoolForHost() : _created(0), _minValidCreationTimeMicroSec(0), _type(ConnectionString::INVALID), _maxPoolSize(kPoolSizeUnlimited), _maxInUse(kDefaultMaxInUse), _checkedOut(0), _badConns(0), _parentDestroyed(false), _inShutdown(false) {} PoolForHost::~PoolForHost() { clear(); } void PoolForHost::clear() { if (!_parentDestroyed) { logNoCache() << "Dropping all pooled connections to " << _hostName << "(with timeout of " << _socketTimeoutSecs << " seconds)"; } _pool = decltype(_pool){}; } void PoolForHost::done(DBConnectionPool* pool, DBClientBase* c_raw) { std::unique_ptr c{c_raw}; const bool isFailed = c->isFailed(); --_checkedOut; // Remember that this host had a broken connection for later if (isFailed) { reportBadConnectionAt(c->getSockCreationMicroSec()); } // Another (later) connection was reported as broken to this host bool isBroken = c->getSockCreationMicroSec() < _minValidCreationTimeMicroSec; if (isFailed || isBroken) { _badConns++; logNoCache() << "Ending connection to host " << _hostName << "(with 
timeout of " << _socketTimeoutSecs << " seconds)" << " due to bad connection status; " << openConnections() << " connections to that host remain open"; pool->onDestroy(c.get()); } else if (_maxPoolSize >= 0 && static_cast(_pool.size()) >= _maxPoolSize) { // We have a pool size that we need to enforce logNoCache() << "Ending idle connection to host " << _hostName << "(with timeout of " << _socketTimeoutSecs << " seconds)" << " because the pool meets constraints; " << openConnections() << " connections to that host remain open"; pool->onDestroy(c.get()); } else { // The connection is probably fine, save for later _pool.push(std::move(c)); } } void PoolForHost::reportBadConnectionAt(uint64_t microSec) { if (microSec != DBClientBase::INVALID_SOCK_CREATION_TIME && microSec > _minValidCreationTimeMicroSec) { _minValidCreationTimeMicroSec = microSec; logNoCache() << "Detected bad connection created at " << _minValidCreationTimeMicroSec << " microSec, clearing pool for " << _hostName << " of " << openConnections() << " connections" << endl; clear(); } } bool PoolForHost::isBadSocketCreationTime(uint64_t microSec) { return microSec != DBClientBase::INVALID_SOCK_CREATION_TIME && microSec <= _minValidCreationTimeMicroSec; } DBClientBase* PoolForHost::get(DBConnectionPool* pool, double socketTimeout) { while (!_pool.empty()) { auto sc = std::move(_pool.top()); _pool.pop(); if (!sc.ok()) { _badConns++; pool->onDestroy(sc.conn.get()); continue; } verify(sc.conn->getSoTimeout() == socketTimeout); ++_checkedOut; return sc.conn.release(); } return nullptr; } void PoolForHost::flush() { clear(); } void PoolForHost::getStaleConnections(Date_t idleThreshold, vector& stale) { vector all; while (!_pool.empty()) { StoredConnection c = std::move(_pool.top()); _pool.pop(); if (c.ok() && !c.addedBefore(idleThreshold)) { all.push_back(std::move(c)); } else { _badConns++; stale.emplace_back(c.conn.release()); } } for (auto& conn : all) { _pool.push(std::move(conn)); } } 
PoolForHost::StoredConnection::StoredConnection(std::unique_ptr c) : conn(std::move(c)), added(Date_t::now()) {} bool PoolForHost::StoredConnection::ok() { // Poke the connection to see if we're still ok return conn->isStillConnected(); } bool PoolForHost::StoredConnection::addedBefore(Date_t time) { return added < time; } void PoolForHost::createdOne(DBClientBase* base) { if (_created == 0) _type = base->type(); ++_created; // _checkedOut is used to indicate the number of in-use connections so // though we didn't actually check this connection out, we bump it here. ++_checkedOut; } void PoolForHost::initializeHostName(const std::string& hostName) { if (_hostName.empty()) { _hostName = hostName; } } void PoolForHost::waitForFreeConnection(int timeout, stdx::unique_lock& lk) { auto condition = [&] { return (numInUse() < _maxInUse || _inShutdown.load()); }; if (timeout > 0) { stdx::chrono::seconds timeoutSeconds{timeout}; // If we timed out waiting without getting a new connection, throw. uassert(ErrorCodes::ExceededTimeLimit, str::stream() << "too many connections to " << _hostName << ":" << timeout, !_cv.wait_for(lk, timeoutSeconds, condition)); } else { _cv.wait(lk, condition); } } void PoolForHost::notifyWaiters() { _cv.notify_one(); } void PoolForHost::shutdown() { _inShutdown.store(true); _cv.notify_all(); } // ------ DBConnectionPool::Detail ------ class DBConnectionPool::Detail { public: template static DBClientBase* get(DBConnectionPool* _this, const std::string& host, double timeout, Connect connect) { while (!(_this->_inShutdown.load())) { // Get a connection from the pool, if there is one. std::unique_ptr c(_this->_get(host, timeout)); if (c) { // This call may throw. _this->onHandedOut(c.get()); return c.release(); } // If there are no pooled connections for this host, create a new connection. If // there are too many connections in this pool to make a new one, block until a // connection is released. 
{ stdx::unique_lock lk(_this->_mutex); PoolForHost& p = _this->_pools[PoolKey(host, timeout)]; if (p.openConnections() >= _this->_maxInUse) { log() << "Too many in-use connections; waiting until there are fewer than " << _this->_maxInUse; p.waitForFreeConnection(timeout, lk); } else { // Drop the lock here, so we can connect without holding it. // _finishCreate will take the lock again. lk.unlock(); // Create a new connection and return. All Connect functions // should throw if they cannot create a connection. auto c = connect(); invariant(c); return _this->_finishCreate(host, timeout, c); } } } // If we get here, we are in shutdown, and it does not matter what we return. invariant(_this->_inShutdown.load()); uassert(ErrorCodes::ShutdownInProgress, "connection pool is in shutdown", false); MONGO_UNREACHABLE; } }; // ------ DBConnectionPool ------ const int PoolForHost::kPoolSizeUnlimited(-1); DBConnectionPool::DBConnectionPool() : _name("dbconnectionpool"), _maxPoolSize(PoolForHost::kPoolSizeUnlimited), _maxInUse(kDefaultMaxInUse), _idleTimeout(kDefaultIdleTimeout), _inShutdown(false), _hooks(new list()) {} void DBConnectionPool::shutdown() { if (!_inShutdown.swap(true)) { stdx::lock_guard L(_mutex); for (auto i = _pools.begin(); i != _pools.end(); i++) { PoolForHost& p = i->second; p.shutdown(); } } } DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) { uassert(ErrorCodes::ShutdownInProgress, "Can't use connection pool during shutdown", !globalInShutdownDeprecated()); stdx::lock_guard L(_mutex); PoolForHost& p = _pools[PoolKey(ident, socketTimeout)]; p.setMaxPoolSize(_maxPoolSize); p.setSocketTimeout(socketTimeout); p.initializeHostName(ident); return p.get(this, socketTimeout); } int DBConnectionPool::openConnections(const string& ident, double socketTimeout) { stdx::lock_guard L(_mutex); PoolForHost& p = _pools[PoolKey(ident, socketTimeout)]; return p.openConnections(); } DBClientBase* DBConnectionPool::_finishCreate(const string& 
ident, double socketTimeout, DBClientBase* conn) { { stdx::lock_guard L(_mutex); PoolForHost& p = _pools[PoolKey(ident, socketTimeout)]; p.setMaxPoolSize(_maxPoolSize); p.initializeHostName(ident); p.createdOne(conn); } try { onCreate(conn); onHandedOut(conn); } catch (std::exception&) { delete conn; throw; } log() << "Successfully connected to " << ident << " (" << openConnections(ident, socketTimeout) << " connections now open to " << ident << " with a " << socketTimeout << " second timeout)"; return conn; } DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { auto connect = [&]() { string errmsg; auto c = url.connect(StringData(), errmsg, socketTimeout).release(); uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c); return c; }; return Detail::get(this, url.toString(), socketTimeout, connect); } DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { auto connect = [&] { const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host))); string errmsg; auto c = cs.connect(StringData(), errmsg, socketTimeout).release(); if (!c) { throwSocketError(SocketErrorKind::CONNECT_ERROR, host, str::stream() << _name << " error: " << errmsg); } return c; }; return Detail::get(this, host, socketTimeout, connect); } DBClientBase* DBConnectionPool::get(const MongoURI& uri, double socketTimeout) { auto connect = [&] { string errmsg; std::unique_ptr c(uri.connect(StringData(), errmsg, socketTimeout)); uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c); return c.release(); }; return Detail::get(this, uri.toString(), socketTimeout, connect); } int DBConnectionPool::getNumAvailableConns(const string& host, double socketTimeout) const { stdx::lock_guard L(_mutex); auto it = _pools.find(PoolKey(host, socketTimeout)); return (it == _pools.end()) ? 
0 : it->second.numAvailable(); } int DBConnectionPool::getNumBadConns(const string& host, double socketTimeout) const { stdx::lock_guard L(_mutex); auto it = _pools.find(PoolKey(host, socketTimeout)); return (it == _pools.end()) ? 0 : it->second.getNumBadConns(); } void DBConnectionPool::onRelease(DBClientBase* conn) { if (_hooks->empty()) { return; } for (list::iterator i = _hooks->begin(); i != _hooks->end(); i++) { (*i)->onRelease(conn); } } void DBConnectionPool::release(const string& host, DBClientBase* c) { onRelease(c); stdx::unique_lock lk(_mutex); PoolForHost& p = _pools[PoolKey(host, c->getSoTimeout())]; p.done(this, c); lk.unlock(); p.notifyWaiters(); } DBConnectionPool::~DBConnectionPool() { // Do not log in destruction, because global connection pools get // destroyed after the logging framework. stdx::lock_guard L(_mutex); for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); i++) { PoolForHost& p = i->second; p._parentDestroyed = true; } #if __has_feature(address_sanitizer) __lsan_ignore_object(_hooks); #endif } void DBConnectionPool::flush() { stdx::lock_guard L(_mutex); for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); i++) { PoolForHost& p = i->second; p.flush(); } } void DBConnectionPool::clear() { stdx::lock_guard L(_mutex); LOG(2) << "Removing connections on all pools owned by " << _name << endl; for (PoolMap::iterator iter = _pools.begin(); iter != _pools.end(); ++iter) { iter->second.clear(); } } void DBConnectionPool::removeHost(const string& host) { stdx::lock_guard L(_mutex); LOG(2) << "Removing connections from all pools for host: " << host << endl; for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i) { const string& poolHost = i->first.ident; if (!serverNameCompare()(host, poolHost) && !serverNameCompare()(poolHost, host)) { // hosts are the same i->second.clear(); } } } void DBConnectionPool::addHook(DBConnectionHook* hook) { _hooks->push_back(hook); } void DBConnectionPool::onCreate(DBClientBase* conn) 
{ if (_hooks->size() == 0) return; for (list::iterator i = _hooks->begin(); i != _hooks->end(); i++) { (*i)->onCreate(conn); } } void DBConnectionPool::onHandedOut(DBClientBase* conn) { if (_hooks->size() == 0) return; for (list::iterator i = _hooks->begin(); i != _hooks->end(); i++) { (*i)->onHandedOut(conn); } } void DBConnectionPool::onDestroy(DBClientBase* conn) { if (_hooks->size() == 0) return; for (list::iterator i = _hooks->begin(); i != _hooks->end(); i++) { (*i)->onDestroy(conn); } } void DBConnectionPool::appendConnectionStats(executor::ConnectionPoolStats* stats) const { { stdx::lock_guard lk(_mutex); for (PoolMap::const_iterator i = _pools.begin(); i != _pools.end(); ++i) { if (i->second.numCreated() == 0) continue; // Mongos may use either a replica set uri or a list of addresses as // the identifier here, so we always take the first server parsed out // as our label for connPoolStats. Note that these stats will collide // with any existing stats for the chosen host. auto uri = ConnectionString::parse(i->first.ident); invariant(uri.isOK()); HostAndPort host = uri.getValue().getServers().front(); executor::ConnectionStatsPer hostStats{static_cast(i->second.numInUse()), static_cast(i->second.numAvailable()), static_cast(i->second.numCreated()), 0}; stats->updateStatsForHost("global", host, hostStats); } } } bool DBConnectionPool::serverNameCompare::operator()(const string& a, const string& b) const { const char* ap = a.c_str(); const char* bp = b.c_str(); while (true) { if (*ap == '\0' || *ap == '/') { if (*bp == '\0' || *bp == '/') return false; // equal strings else return true; // a is shorter } if (*bp == '\0' || *bp == '/') return false; // b is shorter if (*ap < *bp) return true; else if (*ap > *bp) return false; ++ap; ++bp; } verify(false); } bool DBConnectionPool::poolKeyCompare::operator()(const PoolKey& a, const PoolKey& b) const { if (DBConnectionPool::serverNameCompare()(a.ident, b.ident)) return true; if 
(DBConnectionPool::serverNameCompare()(b.ident, a.ident)) return false; return a.timeout < b.timeout; } bool DBConnectionPool::isConnectionGood(const string& hostName, DBClientBase* conn) { if (conn == NULL) { return false; } if (conn->isFailed()) { return false; } { stdx::lock_guard sl(_mutex); PoolForHost& pool = _pools[PoolKey(hostName, conn->getSoTimeout())]; if (pool.isBadSocketCreationTime(conn->getSockCreationMicroSec())) { return false; } } return true; } void DBConnectionPool::taskDoWork() { vector toDelete; auto idleThreshold = Date_t::now() - _idleTimeout; { // we need to get the connections inside the lock // but we can actually delete them outside stdx::lock_guard lk(_mutex); for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i) { i->second.getStaleConnections(idleThreshold, toDelete); } } for (size_t i = 0; i < toDelete.size(); i++) { try { onDestroy(toDelete[i]); delete toDelete[i]; } catch (...) { // we don't care if there was a socket error } } } // ------ ScopedDbConnection ------ ScopedDbConnection::ScopedDbConnection(const std::string& host, double socketTimeout) : _host(host), _conn(globalConnPool.get(host, socketTimeout)), _socketTimeoutSecs(socketTimeout) { _setSocketTimeout(); } ScopedDbConnection::ScopedDbConnection(const ConnectionString& host, double socketTimeout) : _host(host.toString()), _conn(globalConnPool.get(host, socketTimeout)), _socketTimeoutSecs(socketTimeout) { _setSocketTimeout(); } ScopedDbConnection::ScopedDbConnection(const MongoURI& uri, double socketTimeout) : _host(uri.toString()), _conn(globalConnPool.get(uri, socketTimeout)), _socketTimeoutSecs(socketTimeout) { _setSocketTimeout(); } void ScopedDbConnection::done() { if (!_conn) { return; } globalConnPool.release(_host, _conn); _conn = NULL; } void ScopedDbConnection::_setSocketTimeout() { if (!_conn) return; if (_conn->type() == ConnectionString::MASTER) static_cast(_conn)->setSoTimeout(_socketTimeoutSecs); } ScopedDbConnection::~ScopedDbConnection() { 
if (_conn) { if (_conn->isFailed()) { if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) { kill(); } else { // The pool takes care of deleting the failed connection - this // will also trigger disposal of older connections in the pool done(); } } else { /* see done() comments above for why we log this line */ logNoCache() << "scoped connection to " << _conn->getServerAddress() << " not being returned to the pool" << endl; kill(); } } } void ScopedDbConnection::clearPool() { globalConnPool.clear(); } AtomicInt32 AScopedConnection::_numConnections; MONGO_INITIALIZER(SetupDBClientBaseWithConnection)(InitializerContext*) { DBClientBase::withConnection_do_not_use = [](std::string host, std::function cb) { ScopedDbConnection conn(host); cb(conn.get()); conn.done(); }; return Status::OK(); } } // namespace mongo