diff options
author | samantharitter <samantha.ritter@10gen.com> | 2017-12-20 11:37:25 -0500 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-02-08 17:51:22 -0500 |
commit | 3d46e7e05bed6c6f19a64007cc28c344deef9513 (patch) | |
tree | d6f6d04a6b4af900f475ea22d31e183729cce7cc | |
parent | 632d0058fcc667f955c77d9bcfcc299592c8c555 (diff) | |
download | mongo-3d46e7e05bed6c6f19a64007cc28c344deef9513.tar.gz |
SERVER-28822 Add new dbclient connection pool options
(cherry picked from commit 48a34a495386b7cbe18419313768929d12028125)
-rw-r--r-- | src/mongo/client/SConscript | 11 | ||||
-rw-r--r-- | src/mongo/client/connpool.cpp | 219 | ||||
-rw-r--r-- | src/mongo/client/connpool.h | 102 | ||||
-rw-r--r-- | src/mongo/client/connpool_integration_test.cpp | 173 | ||||
-rw-r--r-- | src/mongo/db/conn_pool_options.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/conn_pool_options.h | 26 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 3 |
8 files changed, 488 insertions, 82 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index ffe65b7cc89..d034b9698aa 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -193,6 +193,17 @@ clientDriverEnv.Library( ], ) +env.CppIntegrationTest( + target='connpool_integration_test', + source=[ + 'connpool_integration_test.cpp', + ], + LIBDEPS=[ + 'clientdriver', + '$BUILD_DIR/mongo/util/version_impl', + ], +) + env.Library( target='connection_pool', source=[ diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index 6c5a3d9cdfa..9f36cfccb1c 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -36,18 +36,25 @@ #include "mongo/client/connpool.h" +#include <limits> #include <string> #include "mongo/client/connection_string.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/executor/connection_pool_stats.h" +#include "mongo/stdx/chrono.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" namespace mongo { +namespace { +const int kDefaultIdleTimeout = std::numeric_limits<int>::max(); +const int kDefaultMaxInUse = std::numeric_limits<int>::max(); +} // namespace + using std::endl; using std::list; using std::map; @@ -57,6 +64,17 @@ using std::vector; // ------ PoolForHost ------ +PoolForHost::PoolForHost() + : _created(0), + _minValidCreationTimeMicroSec(0), + _type(ConnectionString::INVALID), + _maxPoolSize(kPoolSizeUnlimited), + _maxInUse(kDefaultMaxInUse), + _checkedOut(0), + _badConns(0), + _parentDestroyed(false), + _inShutdown(false) {} + PoolForHost::~PoolForHost() { clear(); } @@ -64,7 +82,7 @@ PoolForHost::~PoolForHost() { void PoolForHost::clear() { if (!_parentDestroyed) { logNoCache() << "Dropping all pooled connections to " << _hostName << "(with timeout of " - << _socketTimeout << " seconds)"; + << _socketTimeoutSecs << " seconds)"; } _pool = decltype(_pool){}; @@ -86,14 +104,14 @@ void PoolForHost::done(DBConnectionPool* pool, DBClientBase* c_raw) { if (isFailed || isBroken) { _badConns++; logNoCache() << "Ending connection to host " << _hostName << "(with timeout of " - << _socketTimeout << " seconds)" + << _socketTimeoutSecs << " seconds)" << " due to bad connection status; " << openConnections() << " connections to that host remain open"; pool->onDestroy(c.get()); } else if (_maxPoolSize >= 0 && static_cast<int>(_pool.size()) >= _maxPoolSize) { // We have a pool size that we need to enforce logNoCache() << "Ending idle connection to host " << _hostName << "(with timeout of " - << _socketTimeout << " seconds)" + << _socketTimeoutSecs << " seconds)" << " because the pool meets constraints; " << openConnections() << " connections to that host remain open"; pool->onDestroy(c.get()); @@ -143,13 +161,13 @@ void PoolForHost::flush() { clear(); } -void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) { +void PoolForHost::getStaleConnections(Date_t idleThreshold, vector<DBClientBase*>& stale) { vector<StoredConnection> all; while (!_pool.empty()) { StoredConnection c = std::move(_pool.top()); _pool.pop(); - if (c.ok()) { + if (c.ok() && !c.addedBefore(idleThreshold)) { all.push_back(std::move(c)); } else { _badConns++; @@ -164,13 +182,17 @@ void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) { PoolForHost::StoredConnection::StoredConnection(std::unique_ptr<DBClientBase> c) - : conn(std::move(c)), when(time(nullptr)) {} + : conn(std::move(c)), added(Date_t::now()) {} bool PoolForHost::StoredConnection::ok() { // Poke the connection to see if we're still ok return conn->isStillConnected(); } +bool PoolForHost::StoredConnection::addedBefore(Date_t time) { + return added < time; +} + void PoolForHost::createdOne(DBClientBase* base) { if (_created == 0) _type = base->type(); @@ -186,6 +208,80 @@ void PoolForHost::initializeHostName(const std::string& hostName) { } } +void PoolForHost::waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk) { + auto condition = [&] { return (numInUse() < _maxInUse || _inShutdown.load()); }; + + if (timeout > 0) { + stdx::chrono::seconds timeoutSeconds{timeout}; + + // If we timed out waiting without getting a new connection, throw. + uassert(ErrorCodes::ExceededTimeLimit, + str::stream() << "too many connections to " << _hostName << ":" << timeout, + !_cv.wait_for(lk, timeoutSeconds, condition)); + } else { + _cv.wait(lk, condition); + } +} + +void PoolForHost::notifyWaiters() { + _cv.notify_one(); +} + +void PoolForHost::shutdown() { + _inShutdown.store(true); + _cv.notify_all(); +} + +// ------ DBConnectionPool::Detail ------ + +class DBConnectionPool::Detail { +public: + template <typename Connect> + static DBClientBase* get(DBConnectionPool* _this, + const std::string& host, + double timeout, + Connect connect) { + while (!(_this->_inShutdown.load())) { + // Get a connection from the pool, if there is one. + std::unique_ptr<DBClientBase> c(_this->_get(host, timeout)); + if (c) { + // This call may throw. + _this->onHandedOut(c.get()); + return c.release(); + } + + // If there are no pooled connections for this host, create a new connection. If + // there are too many connections in this pool to make a new one, block until a + // connection is released. + { + stdx::unique_lock<stdx::mutex> lk(_this->_mutex); + PoolForHost& p = _this->_pools[PoolKey(host, timeout)]; + + if (p.openConnections() >= _this->_maxInUse) { + log() << "Too many in-use connections; waiting until there are fewer than " + << _this->_maxInUse; + p.waitForFreeConnection(timeout, lk); + } else { + // Drop the lock here, so we can connect without holding it. + // _finishCreate will take the lock again. + lk.unlock(); + + // Create a new connection and return. All Connect functions + // should throw if they cannot create a connection. + auto c = connect(); + invariant(c); + return _this->_finishCreate(host, timeout, c); + } + } + } + + // If we get here, we are in shutdown, and it does not matter what we return. + invariant(_this->_inShutdown.load()); + uassert(ErrorCodes::ShutdownInProgress, "connection pool is in shutdown", false); + MONGO_UNREACHABLE; + } +}; + // ------ DBConnectionPool ------ const int PoolForHost::kPoolSizeUnlimited(-1); @@ -193,7 +289,22 @@ const int PoolForHost::kPoolSizeUnlimited(-1); DBConnectionPool::DBConnectionPool() : _name("dbconnectionpool"), _maxPoolSize(PoolForHost::kPoolSizeUnlimited), - _hooks(new list<DBConnectionHook*>()) {} + _maxInUse(kDefaultMaxInUse), + _idleTimeout(kDefaultIdleTimeout), + _inShutdown(false), + _hooks(new list<DBConnectionHook*>()) + +{} + +void DBConnectionPool::shutdown() { + if (!_inShutdown.swap(true)) { + stdx::lock_guard<stdx::mutex> L(_mutex); + for (auto i = _pools.begin(); i != _pools.end(); i++) { + PoolForHost& p = i->second; + p.shutdown(); + } + } +} DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) { uassert(ErrorCodes::ShutdownInProgress, @@ -240,65 +351,44 @@ DBClientBase* DBConnectionPool::_finishCreate(const string& ident, } DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { - // If a connection for this host is available from the underlying PoolForHost, use the - // connection in the pool. - DBClientBase* c = _get(url.toString(), socketTimeout); - if (c) { - try { - onHandedOut(c); - } catch (std::exception&) { - delete c; - throw; - } + auto connect = [&]() { + string errmsg; + auto c = url.connect(StringData(), errmsg, socketTimeout).release(); + uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c); return c; - } + }; - // If no connections for this host are available in the PoolForHost (that is, all the - // connections have been checked out, or none have been created yet), create a new connection. - string errmsg; - c = url.connect(StringData(), errmsg, socketTimeout).release(); - uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c); - - return _finishCreate(url.toString(), socketTimeout, c); + return Detail::get(this, url.toString(), socketTimeout, connect); } DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { - DBClientBase* c = _get(host, socketTimeout); - if (c) { - try { - onHandedOut(c); - } catch (std::exception&) { - delete c; - throw; + auto connect = [&] { + const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host))); + + string errmsg; + auto c = cs.connect(StringData(), errmsg, socketTimeout).release(); + if (!c) { + throw SocketException(SocketException::CONNECT_ERROR, + host, + 11002, + str::stream() << _name << " error: " << errmsg); } - return c; - } - - const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host))); - string errmsg; - c = cs.connect(StringData(), errmsg, socketTimeout).release(); - if (!c) - throw SocketException(SocketException::CONNECT_ERROR, - host, - 11002, - str::stream() << _name << " error: " << errmsg); + return c; + }; - return _finishCreate(host, socketTimeout, c); + return Detail::get(this, host, socketTimeout, connect); } DBClientBase* DBConnectionPool::get(const MongoURI& uri, double socketTimeout) { - std::unique_ptr<DBClientBase> c(_get(uri.toString(), socketTimeout)); - if (c) { - onHandedOut(c.get()); + auto connect = [&] { + string errmsg; + std::unique_ptr<DBClientBase> c(uri.connect(StringData(), errmsg, socketTimeout)); + uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c); return c.release(); - } - - string errmsg; - c = std::unique_ptr<DBClientBase>(uri.connect(StringData(), errmsg, socketTimeout)); - uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c); + }; - return _finishCreate(uri.toString(), socketTimeout, c.release()); + return Detail::get(this, uri.toString(), socketTimeout, connect); } int DBConnectionPool::getNumAvailableConns(const string& host, double socketTimeout) const { @@ -326,10 +416,13 @@ void DBConnectionPool::onRelease(DBClientBase* conn) { void DBConnectionPool::release(const string& host, DBClientBase* c) { onRelease(c); - stdx::lock_guard<stdx::mutex> L(_mutex); - _pools[PoolKey(host, c->getSoTimeout())].done(this, c); -} + stdx::unique_lock<stdx::mutex> lk(_mutex); + PoolForHost& p = _pools[PoolKey(host, c->getSoTimeout())]; + p.done(this, c); + lk.unlock(); + p.notifyWaiters(); +} DBConnectionPool::~DBConnectionPool() { // Do not log in destruction, because global connection pools get @@ -482,13 +575,13 @@ bool DBConnectionPool::isConnectionGood(const string& hostName, DBClientBase* co void DBConnectionPool::taskDoWork() { vector<DBClientBase*> toDelete; - + auto idleThreshold = Date_t::now() - _idleTimeout; { // we need to get the connections inside the lock // but we can actually delete them outside stdx::lock_guard<stdx::mutex> lk(_mutex); for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i) { - i->second.getStaleConnections(toDelete); + i->second.getStaleConnections(idleThreshold, toDelete); } } @@ -505,21 +598,23 @@ void DBConnectionPool::taskDoWork() { // ------ ScopedDbConnection ------ ScopedDbConnection::ScopedDbConnection(const std::string& host, double socketTimeout) - : _host(host), _conn(globalConnPool.get(host, socketTimeout)), _socketTimeout(socketTimeout) { + : _host(host), + _conn(globalConnPool.get(host, socketTimeout)), + _socketTimeoutSecs(socketTimeout) { _setSocketTimeout(); } ScopedDbConnection::ScopedDbConnection(const ConnectionString& host, double socketTimeout) : _host(host.toString()), _conn(globalConnPool.get(host, socketTimeout)), - _socketTimeout(socketTimeout) { + _socketTimeoutSecs(socketTimeout) { _setSocketTimeout(); } ScopedDbConnection::ScopedDbConnection(const MongoURI& uri, double socketTimeout) : _host(uri.toString()), _conn(globalConnPool.get(uri, socketTimeout)), - _socketTimeout(socketTimeout) { + _socketTimeoutSecs(socketTimeout) { _setSocketTimeout(); } @@ -537,7 +632,7 @@ void ScopedDbConnection::_setSocketTimeout() { return; if (_conn->type() == ConnectionString::MASTER) - static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout); + static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeoutSecs); } ScopedDbConnection::~ScopedDbConnection() { diff --git a/src/mongo/client/connpool.h b/src/mongo/client/connpool.h index 68af31a5225..9d428dcab36 100644 --- a/src/mongo/client/connpool.h +++ b/src/mongo/client/connpool.h @@ -37,6 +37,7 @@ #include "mongo/platform/atomic_word.h" #include "mongo/util/background.h" #include "mongo/util/concurrency/mutex.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -52,7 +53,9 @@ struct ConnectionPoolStats; * host. It is not responsible for creating new connections; instead, when DBConnectionPool is asked * for a connection to a particular host, DBConnectionPool will check if any connections are * available in the PoolForHost for that host. If so, DBConnectionPool will check out a connection - * from PoolForHost, and if not, DBConnectionPool will create a new connection itself. + * from PoolForHost, and if not, DBConnectionPool will create a new connection itself, if we are + * below the maximum allowed number of connections. If we have already created _maxPoolSize + * connections, the calling thread will block until a new connection can be made for it. * * Once the connection is released back to DBConnectionPool, DBConnectionPool will attempt to * release the connection to PoolForHost. This is how connections enter PoolForHost for the first @@ -74,15 +77,7 @@ public: friend class DBConnectionPool; - PoolForHost() - : _created(0), - _minValidCreationTimeMicroSec(0), - _type(ConnectionString::INVALID), - _maxPoolSize(kPoolSizeUnlimited), - _checkedOut(0), - _badConns(0), - _parentDestroyed(false) {} - + PoolForHost(); ~PoolForHost(); /** @@ -107,10 +102,17 @@ public: } /** - * Sets the socket timeout on this host, for reporting purposes only. + * Sets the maximum number of in-use connections for this pool. + */ + void setMaxInUse(int maxInUse) { + _maxInUse = maxInUse; + } + + /** + * Sets the socket timeout on this host, in seconds, for reporting purposes only. */ void setSocketTimeout(double socketTimeout) { - _socketTimeout = socketTimeout; + _socketTimeoutSecs = socketTimeout; } int numAvailable() const { @@ -150,7 +152,7 @@ public: void flush(); - void getStaleConnections(std::vector<DBClientBase*>& stale); + void getStaleConnections(Date_t idleThreshold, std::vector<DBClientBase*>& stale); /** * Sets the lower bound for creation times that can be considered as @@ -169,18 +171,45 @@ public: */ void initializeHostName(const std::string& hostName); + /** + * If this pool has more than _maxPoolSize connections in use, blocks + * the calling thread until a connection is returned to the pool or + * is destroyed. If a non-zero timeout is given, this method will + * throw if a free connection cannot be acquired within that amount of + * time. Timeout is in seconds. + */ + void waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk); + + /** + * Notifies any waiters that there are new connections available. + */ + void notifyWaiters(); + + /** + * Shuts down this pool, notifying all waiters. + */ + void shutdown(); + private: struct StoredConnection { StoredConnection(std::unique_ptr<DBClientBase> c); bool ok(); + /** + * Returns true if this connection was added before the given time. + */ + bool addedBefore(Date_t time); + std::unique_ptr<DBClientBase> conn; - time_t when; + + // The time when this connection was added to the pool. Will + // be reset if the connection is checked out and re-added. + Date_t added; }; std::string _hostName; - double _socketTimeout; + double _socketTimeoutSecs; std::stack<StoredConnection> _pool; int64_t _created; @@ -190,6 +219,9 @@ private: // The maximum number of connections we'll save in the pool int _maxPoolSize; + // The maximum number of connections allowed to be in-use in this pool + int _maxInUse; + // The number of currently active connections from this pool int _checkedOut; @@ -198,6 +230,10 @@ private: // Whether our parent DBConnectionPool object is in destruction bool _parentDestroyed; + + stdx::condition_variable _cv; + + AtomicWord<bool> _inShutdown; }; class DBConnectionHook { @@ -259,6 +295,21 @@ public: _maxPoolSize = maxPoolSize; } + /** + * Sets the maximum number of in-use connections per host. + */ + void setMaxInUse(int maxInUse) { + _maxInUse = maxInUse; + } + + /** + * Sets the timeout value for idle connections, after which we will remove them + * from the pool. This value is in minutes. + */ + void setIdleTimeout(int timeout) { + _idleTimeout = Minutes(timeout); + } + void onCreate(DBClientBase* conn); void onHandedOut(DBClientBase* conn); void onDestroy(DBClientBase* conn); @@ -266,6 +317,9 @@ public: void flush(); + /** + * Gets a connection to the given host with the given timeout, in seconds. + */ DBClientBase* get(const std::string& host, double socketTimeout = 0); DBClientBase* get(const ConnectionString& host, double socketTimeout = 0); DBClientBase* get(const MongoURI& uri, double socketTimeout = 0); @@ -310,7 +364,14 @@ public: } virtual void taskDoWork(); + /** + * Shuts down the connection pool, unblocking any waiters on connections. + */ + void shutdown(); + private: + class Detail; + DBConnectionPool(DBConnectionPool& p); DBClientBase* _get(const std::string& ident, double socketTimeout); @@ -337,8 +398,13 @@ private: // 0 effectively disables the pool int _maxPoolSize; + int _maxInUse; + Minutes _idleTimeout; + PoolMap _pools; + AtomicWord<bool> _inShutdown; + // pointers owned by me, right now they leak on shutdown // _hooks itself also leaks because it creates a shutdown race condition std::list<DBConnectionHook*>* _hooks; @@ -388,11 +454,11 @@ public: explicit ScopedDbConnection(const ConnectionString& host, double socketTimeout = 0); explicit ScopedDbConnection(const MongoURI& host, double socketTimeout = 0); - ScopedDbConnection() : _host(""), _conn(0), _socketTimeout(0) {} + ScopedDbConnection() : _host(""), _conn(0), _socketTimeoutSecs(0) {} /* @param conn - bind to an existing connection */ ScopedDbConnection(const std::string& host, DBClientBase* conn, double socketTimeout = 0) - : _host(host), _conn(conn), _socketTimeout(socketTimeout) { + : _host(host), _conn(conn), _socketTimeoutSecs(socketTimeout) { _setSocketTimeout(); } @@ -447,7 +513,7 @@ private: const std::string _host; DBClientBase* _conn; - const double _socketTimeout; + const double _socketTimeoutSecs; }; } // namespace mongo diff --git a/src/mongo/client/connpool_integration_test.cpp b/src/mongo/client/connpool_integration_test.cpp new file mode 100644 index 00000000000..758b3e99665 --- /dev/null +++ b/src/mongo/client/connpool_integration_test.cpp @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/connpool.h" +#include "mongo/client/global_conn_pool.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/unittest/integration_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +TEST(ConnectionPoolTest, ConnectionPoolMaxInUseConnectionsTest) { + DBConnectionPool pool; + + auto fixture = unittest::getFixtureConnectionString(); + auto host = fixture.getServers()[0].toString(); + + stdx::condition_variable cv; + stdx::mutex mutex; + int counter = 0; + + pool.setMaxInUse(2); + + // Check out maxInUse connections. + auto conn1 = pool.get(host); + ASSERT(conn1); + auto conn2 = pool.get(host); + ASSERT(conn2); + + // Try creating a new one, should block until we release one. + stdx::thread t([&] { + { + stdx::lock_guard<stdx::mutex> lk(mutex); + counter++; + } + + cv.notify_one(); + + auto conn3 = pool.get(host); + + { + stdx::lock_guard<stdx::mutex> lk(mutex); + counter++; + } + + cv.notify_one(); + pool.release(host, conn3); + }); + + // First thread should be blocked. + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&] { return counter == 1; }); + } + + // Return one to the pool, thread should be un-blocked. + pool.release(host, conn2); + + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&] { return counter == 2; }); + } + + t.join(); +} + +TEST(ConnectionPoolTest, ConnectionPoolMaxInUseTimeoutTest) { + DBConnectionPool pool; + + auto fixture = unittest::getFixtureConnectionString(); + auto host = fixture.getServers()[0].toString(); + + pool.setMaxInUse(2); + + // Check out maxInUse connections. + auto conn1 = pool.get(host, 1); + ASSERT(conn1); + auto conn2 = pool.get(host, 1); + ASSERT(conn2); + + // Try creating a new connection with a 1-second timeout, should block, + // then should time out. + ASSERT_THROWS(pool.get(host, 1), AssertionException); +} + +TEST(ConnectionPoolTest, ConnectionPoolShutdownLogicTest) { + DBConnectionPool pool; + + auto fixture = unittest::getFixtureConnectionString(); + auto host = fixture.getServers()[0].toString(); + + stdx::condition_variable cv; + stdx::mutex mutex; + int counter = 0; + + pool.setMaxInUse(2); + + // Check out maxInUse connections. + auto conn1 = pool.get(host); + ASSERT(conn1); + auto conn2 = pool.get(host); + ASSERT(conn2); + + // Attempt to open a new connection, should block. + stdx::thread t([&] { + { + stdx::lock_guard<stdx::mutex> lk(mutex); + counter++; + } + + cv.notify_one(); + + ASSERT_THROWS(pool.get(host), AssertionException); + + { + stdx::lock_guard<stdx::mutex> lk(mutex); + counter++; + } + + cv.notify_one(); + }); + + // Wait for new thread to block. + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&] { return counter == 1; }); + } + + // Shut down the pool, this should unblock our waiting connection. + pool.shutdown(); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&] { return counter == 2; }); + } + + // Attempt to open a new connection, should fail. + ASSERT_THROWS(pool.get(host), AssertionException); + + t.join(); +} + +} // namespace + +} // namespace mongo diff --git a/src/mongo/db/conn_pool_options.cpp b/src/mongo/db/conn_pool_options.cpp index 988d31efb5d..851687fc7ff 100644 --- a/src/mongo/db/conn_pool_options.cpp +++ b/src/mongo/db/conn_pool_options.cpp @@ -30,6 +30,8 @@ #include "mongo/db/conn_pool_options.h" +#include <limits> + #include "mongo/base/init.h" #include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" @@ -41,6 +43,12 @@ namespace mongo { int ConnPoolOptions::maxConnsPerHost(200); int ConnPoolOptions::maxShardedConnsPerHost(200); +int ConnPoolOptions::maxInUseConnsPerHost(std::numeric_limits<int>::max()); +int ConnPoolOptions::maxShardedInUseConnsPerHost(std::numeric_limits<int>::max()); + +int ConnPoolOptions::globalConnPoolIdleTimeout(std::numeric_limits<int>::max()); +int ConnPoolOptions::shardedConnPoolIdleTimeout(std::numeric_limits<int>::max()); + namespace { ExportedServerParameter<int, ServerParameterType::kStartupOnly> // @@ -53,6 +61,26 @@ ExportedServerParameter<int, ServerParameterType::kStartupOnly> // "connPoolMaxShardedConnsPerHost", &ConnPoolOptions::maxShardedConnsPerHost); +ExportedServerParameter<int, ServerParameterType::kStartupOnly> // + maxInUseConnsPerHostParameter(ServerParameterSet::getGlobal(), + "connPoolMaxInUseConnsPerHost", + &ConnPoolOptions::maxInUseConnsPerHost); + +ExportedServerParameter<int, ServerParameterType::kStartupOnly> // + maxShardedInUseConnsPerHostParameter(ServerParameterSet::getGlobal(), + "connPoolMaxShardedInUseConnsPerHost", + &ConnPoolOptions::maxShardedInUseConnsPerHost); + +ExportedServerParameter<int, ServerParameterType::kStartupOnly> // + globalConnPoolIdleTimeoutParameter(ServerParameterSet::getGlobal(), + "globalConnPoolIdleTimeoutMinutes", + &ConnPoolOptions::globalConnPoolIdleTimeout); + +ExportedServerParameter<int, ServerParameterType::kStartupOnly> // + shardedConnPoolIdleTimeoutParameter(ServerParameterSet::getGlobal(), + "shardedConnPoolIdleTimeoutMinutes", + &ConnPoolOptions::shardedConnPoolIdleTimeout); + MONGO_INITIALIZER(InitializeConnectionPools)(InitializerContext* context) { // Initialize the sharded and unsharded outgoing connection pools // NOTES: @@ -62,9 +90,13 @@ MONGO_INITIALIZER(InitializeConnectionPools)(InitializerContext* context) { globalConnPool.setName("connection pool"); globalConnPool.setMaxPoolSize(ConnPoolOptions::maxConnsPerHost); + globalConnPool.setMaxInUse(ConnPoolOptions::maxInUseConnsPerHost); + globalConnPool.setIdleTimeout(ConnPoolOptions::globalConnPoolIdleTimeout); shardConnectionPool.setName("sharded connection pool"); shardConnectionPool.setMaxPoolSize(ConnPoolOptions::maxShardedConnsPerHost); + shardConnectionPool.setMaxInUse(ConnPoolOptions::maxShardedInUseConnsPerHost); + shardConnectionPool.setIdleTimeout(ConnPoolOptions::shardedConnPoolIdleTimeout); return Status::OK(); } diff --git a/src/mongo/db/conn_pool_options.h b/src/mongo/db/conn_pool_options.h index faf10c0842c..9394a156d6a 100644 --- a/src/mongo/db/conn_pool_options.h +++ b/src/mongo/db/conn_pool_options.h @@ -28,6 +28,8 @@ #pragma once +#include "mongo/util/duration.h" + namespace mongo { // NOTE: @@ -40,13 +42,33 @@ namespace mongo { */ struct ConnPoolOptions { /** - * Maximum connections per host the connection pool should use + * Maximum connections per host the connection pool should store. */ static int maxConnsPerHost; /** - * Maximum connections per host the sharded conn pool should use + * Maximum connections per host the sharded conn pool should store. */ static int maxShardedConnsPerHost; + + /** + * Maximum in-use connections per host in the global connection pool. + */ + static int maxInUseConnsPerHost; + + /** + * Maximum in-use connections per host in the sharded connection pool. + */ + static int maxShardedInUseConnsPerHost; + + /** + * Amount of time, in minutes, to keep idle connections in the global connection pool. + */ + static int globalConnPoolIdleTimeout; + + /** + * Amount of time, in minutes, to keep idle connections in the sharded connection pool. + */ + static int shardedConnPoolIdleTimeout; }; } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 434448c82d8..d46545f50be 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -44,6 +44,7 @@ #include "mongo/base/init.h" #include "mongo/base/initializer.h" #include "mongo/base/status.h" +#include "mongo/client/global_conn_pool.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/config.h" #include "mongo/db/audit.h" @@ -1253,6 +1254,9 @@ void shutdownTask() { tl->shutdown(); } + // Shut down the global dbclient pool so callers stop waiting for connections. + globalConnPool.shutdown(); + if (serviceContext->getGlobalStorageEngine()) { ServiceContext::UniqueOperationContext uniqueOpCtx; OperationContext* opCtx = client->getOperationContext(); diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index e13d794abed..9933f2dc47c 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -240,6 +240,9 @@ static void cleanupTask() { tl->shutdown(); } + // Shut down the global dbclient pool so callers stop waiting for connections. + shardConnectionPool.shutdown(); + // Shutdown the Service Entry Point and its sessions and give it a grace period to complete. if (auto sep = serviceContext->getServiceEntryPoint()) { if (!sep->shutdown(Seconds(10))) { |