diff options
author | Jason Carey <jcarey@argv.me> | 2018-01-02 13:32:21 -0500 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-01-02 13:36:01 -0500 |
commit | b48f6b016aa09aed2dec3b964ef982af8ab11a4c (patch) | |
tree | 3017200f8bb30c1b6d75e3659613267bb6c13228 /src/mongo/client/connpool.cpp | |
parent | 071e4e8b7b3cee57bfa9b48c4320ca1abea7aa75 (diff) | |
download | mongo-b48f6b016aa09aed2dec3b964ef982af8ab11a4c.tar.gz |
Revert "SERVER-28822 Add new dbclient connection pool options"
This reverts commit 9e7df79d3907a743e29b7333de6ea08a01f33a05.
Diffstat (limited to 'src/mongo/client/connpool.cpp')
-rw-r--r-- | src/mongo/client/connpool.cpp | 219 |
1 files changed, 62 insertions, 157 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index 9f36cfccb1c..6c5a3d9cdfa 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -36,25 +36,18 @@ #include "mongo/client/connpool.h" -#include <limits> #include <string> #include "mongo/client/connection_string.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/executor/connection_pool_stats.h" -#include "mongo/stdx/chrono.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" namespace mongo { -namespace { -const int kDefaultIdleTimeout = std::numeric_limits<int>::max(); -const int kDefaultMaxInUse = std::numeric_limits<int>::max(); -} // namespace - using std::endl; using std::list; using std::map; @@ -64,17 +57,6 @@ using std::vector; // ------ PoolForHost ------ -PoolForHost::PoolForHost() - : _created(0), - _minValidCreationTimeMicroSec(0), - _type(ConnectionString::INVALID), - _maxPoolSize(kPoolSizeUnlimited), - _maxInUse(kDefaultMaxInUse), - _checkedOut(0), - _badConns(0), - _parentDestroyed(false), - _inShutdown(false) {} - PoolForHost::~PoolForHost() { clear(); } @@ -82,7 +64,7 @@ PoolForHost::~PoolForHost() { void PoolForHost::clear() { if (!_parentDestroyed) { logNoCache() << "Dropping all pooled connections to " << _hostName << "(with timeout of " - << _socketTimeoutSecs << " seconds)"; + << _socketTimeout << " seconds)"; } _pool = decltype(_pool){}; @@ -104,14 +86,14 @@ void PoolForHost::done(DBConnectionPool* pool, DBClientBase* c_raw) { if (isFailed || isBroken) { _badConns++; logNoCache() << "Ending connection to host " << _hostName << "(with timeout of " - << _socketTimeoutSecs << " seconds)" + << _socketTimeout << " seconds)" << " due to bad connection status; " << openConnections() << " connections to that host remain open"; pool->onDestroy(c.get()); } else if (_maxPoolSize >= 0 && static_cast<int>(_pool.size()) >= _maxPoolSize) { // We have a pool size that we need to enforce logNoCache() << "Ending idle connection to host " << _hostName << "(with timeout of " - << _socketTimeoutSecs << " seconds)" + << _socketTimeout << " seconds)" << " because the pool meets constraints; " << openConnections() << " connections to that host remain open"; pool->onDestroy(c.get()); @@ -161,13 +143,13 @@ void PoolForHost::flush() { clear(); } -void PoolForHost::getStaleConnections(Date_t idleThreshold, vector<DBClientBase*>& stale) { +void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) { vector<StoredConnection> all; while (!_pool.empty()) { StoredConnection c = std::move(_pool.top()); _pool.pop(); - if (c.ok() && !c.addedBefore(idleThreshold)) { + if (c.ok()) { all.push_back(std::move(c)); } else { _badConns++; @@ -182,17 +164,13 @@ void PoolForHost::getStaleConnections(Date_t idleThreshold, vector<DBClientBase* PoolForHost::StoredConnection::StoredConnection(std::unique_ptr<DBClientBase> c) - : conn(std::move(c)), added(Date_t::now()) {} + : conn(std::move(c)), when(time(nullptr)) {} bool PoolForHost::StoredConnection::ok() { // Poke the connection to see if we're still ok return conn->isStillConnected(); } -bool PoolForHost::StoredConnection::addedBefore(Date_t time) { - return added < time; -} - void PoolForHost::createdOne(DBClientBase* base) { if (_created == 0) _type = base->type(); @@ -208,80 +186,6 @@ void PoolForHost::initializeHostName(const std::string& hostName) { } } -void PoolForHost::waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk) { - auto condition = [&] { return (numInUse() < _maxInUse || _inShutdown.load()); }; - - if (timeout > 0) { - stdx::chrono::seconds timeoutSeconds{timeout}; - - // If we timed out waiting without getting a new connection, throw. - uassert(ErrorCodes::ExceededTimeLimit, - str::stream() << "too many connections to " << _hostName << ":" << timeout, - !_cv.wait_for(lk, timeoutSeconds, condition)); - } else { - _cv.wait(lk, condition); - } -} - -void PoolForHost::notifyWaiters() { - _cv.notify_one(); -} - -void PoolForHost::shutdown() { - _inShutdown.store(true); - _cv.notify_all(); -} - -// ------ DBConnectionPool::Detail ------ - -class DBConnectionPool::Detail { -public: - template <typename Connect> - static DBClientBase* get(DBConnectionPool* _this, - const std::string& host, - double timeout, - Connect connect) { - while (!(_this->_inShutdown.load())) { - // Get a connection from the pool, if there is one. - std::unique_ptr<DBClientBase> c(_this->_get(host, timeout)); - if (c) { - // This call may throw. - _this->onHandedOut(c.get()); - return c.release(); - } - - // If there are no pooled connections for this host, create a new connection. If - // there are too many connections in this pool to make a new one, block until a - // connection is released. - { - stdx::unique_lock<stdx::mutex> lk(_this->_mutex); - PoolForHost& p = _this->_pools[PoolKey(host, timeout)]; - - if (p.openConnections() >= _this->_maxInUse) { - log() << "Too many in-use connections; waiting until there are fewer than " - << _this->_maxInUse; - p.waitForFreeConnection(timeout, lk); - } else { - // Drop the lock here, so we can connect without holding it. - // _finishCreate will take the lock again. - lk.unlock(); - - // Create a new connection and return. All Connect functions - // should throw if they cannot create a connection. - auto c = connect(); - invariant(c); - return _this->_finishCreate(host, timeout, c); - } - } - } - - // If we get here, we are in shutdown, and it does not matter what we return. - invariant(_this->_inShutdown.load()); - uassert(ErrorCodes::ShutdownInProgress, "connection pool is in shutdown", false); - MONGO_UNREACHABLE; - } -}; - // ------ DBConnectionPool ------ const int PoolForHost::kPoolSizeUnlimited(-1); @@ -289,22 +193,7 @@ const int PoolForHost::kPoolSizeUnlimited(-1); DBConnectionPool::DBConnectionPool() : _name("dbconnectionpool"), _maxPoolSize(PoolForHost::kPoolSizeUnlimited), - _maxInUse(kDefaultMaxInUse), - _idleTimeout(kDefaultIdleTimeout), - _inShutdown(false), - _hooks(new list<DBConnectionHook*>()) - -{} - -void DBConnectionPool::shutdown() { - if (!_inShutdown.swap(true)) { - stdx::lock_guard<stdx::mutex> L(_mutex); - for (auto i = _pools.begin(); i != _pools.end(); i++) { - PoolForHost& p = i->second; - p.shutdown(); - } - } -} + _hooks(new list<DBConnectionHook*>()) {} DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) { uassert(ErrorCodes::ShutdownInProgress, @@ -351,44 +240,65 @@ DBClientBase* DBConnectionPool::_finishCreate(const string& ident, } DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { - auto connect = [&]() { - string errmsg; - auto c = url.connect(StringData(), errmsg, socketTimeout).release(); - uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c); + // If a connection for this host is available from the underlying PoolForHost, use the + // connection in the pool. + DBClientBase* c = _get(url.toString(), socketTimeout); + if (c) { + try { + onHandedOut(c); + } catch (std::exception&) { + delete c; + throw; + } return c; - }; + } - return Detail::get(this, url.toString(), socketTimeout, connect); + // If no connections for this host are available in the PoolForHost (that is, all the + // connections have been checked out, or none have been created yet), create a new connection. + string errmsg; + c = url.connect(StringData(), errmsg, socketTimeout).release(); + uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c); + + return _finishCreate(url.toString(), socketTimeout, c); } DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { - auto connect = [&] { - const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host))); - - string errmsg; - auto c = cs.connect(StringData(), errmsg, socketTimeout).release(); - if (!c) { - throw SocketException(SocketException::CONNECT_ERROR, - host, - 11002, - str::stream() << _name << " error: " << errmsg); + DBClientBase* c = _get(host, socketTimeout); + if (c) { + try { + onHandedOut(c); + } catch (std::exception&) { + delete c; + throw; } - return c; - }; + } + + const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host))); - return Detail::get(this, host, socketTimeout, connect); + string errmsg; + c = cs.connect(StringData(), errmsg, socketTimeout).release(); + if (!c) + throw SocketException(SocketException::CONNECT_ERROR, + host, + 11002, + str::stream() << _name << " error: " << errmsg); + + return _finishCreate(host, socketTimeout, c); } DBClientBase* DBConnectionPool::get(const MongoURI& uri, double socketTimeout) { - auto connect = [&] { - string errmsg; - std::unique_ptr<DBClientBase> c(uri.connect(StringData(), errmsg, socketTimeout)); - uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c); + std::unique_ptr<DBClientBase> c(_get(uri.toString(), socketTimeout)); + if (c) { + onHandedOut(c.get()); return c.release(); - }; + } + + string errmsg; + c = std::unique_ptr<DBClientBase>(uri.connect(StringData(), errmsg, socketTimeout)); + uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c); - return Detail::get(this, uri.toString(), socketTimeout, connect); + return _finishCreate(uri.toString(), socketTimeout, c.release()); } int DBConnectionPool::getNumAvailableConns(const string& host, double socketTimeout) const { @@ -416,14 +326,11 @@ void DBConnectionPool::onRelease(DBClientBase* conn) { void DBConnectionPool::release(const string& host, DBClientBase* c) { onRelease(c); - stdx::unique_lock<stdx::mutex> lk(_mutex); - PoolForHost& p = _pools[PoolKey(host, c->getSoTimeout())]; - p.done(this, c); - - lk.unlock(); - p.notifyWaiters(); + stdx::lock_guard<stdx::mutex> L(_mutex); + _pools[PoolKey(host, c->getSoTimeout())].done(this, c); } + DBConnectionPool::~DBConnectionPool() { // Do not log in destruction, because global connection pools get // destroyed after the logging framework. @@ -575,13 +482,13 @@ bool DBConnectionPool::isConnectionGood(const string& hostName, DBClientBase* co void DBConnectionPool::taskDoWork() { vector<DBClientBase*> toDelete; - auto idleThreshold = Date_t::now() - _idleTimeout; + { // we need to get the connections inside the lock // but we can actually delete them outside stdx::lock_guard<stdx::mutex> lk(_mutex); for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i) { - i->second.getStaleConnections(idleThreshold, toDelete); + i->second.getStaleConnections(toDelete); } } @@ -598,23 +505,21 @@ void DBConnectionPool::taskDoWork() { // ------ ScopedDbConnection ------ ScopedDbConnection::ScopedDbConnection(const std::string& host, double socketTimeout) - : _host(host), - _conn(globalConnPool.get(host, socketTimeout)), - _socketTimeoutSecs(socketTimeout) { + : _host(host), _conn(globalConnPool.get(host, socketTimeout)), _socketTimeout(socketTimeout) { _setSocketTimeout(); } ScopedDbConnection::ScopedDbConnection(const ConnectionString& host, double socketTimeout) : _host(host.toString()), _conn(globalConnPool.get(host, socketTimeout)), - _socketTimeoutSecs(socketTimeout) { + _socketTimeout(socketTimeout) { _setSocketTimeout(); } ScopedDbConnection::ScopedDbConnection(const MongoURI& uri, double socketTimeout) : _host(uri.toString()), _conn(globalConnPool.get(uri, socketTimeout)), - _socketTimeoutSecs(socketTimeout) { + _socketTimeout(socketTimeout) { _setSocketTimeout(); } @@ -632,7 +537,7 @@ void ScopedDbConnection::_setSocketTimeout() { return; if (_conn->type() == ConnectionString::MASTER) - static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeoutSecs); + static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout); } ScopedDbConnection::~ScopedDbConnection() { |