summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2017-12-20 11:37:25 -0500
committersamantharitter <samantha.ritter@10gen.com>2017-12-20 16:32:02 -0500
commit9e7df79d3907a743e29b7333de6ea08a01f33a05 (patch)
tree9fa45946255b5088d96426facd9ce45049f2d267
parent9d6383c2b9f0c6af06d7181e6cff9152d92ba858 (diff)
downloadmongo-9e7df79d3907a743e29b7333de6ea08a01f33a05.tar.gz
SERVER-28822 Add new dbclient connection pool options
-rw-r--r--src/mongo/client/SConscript11
-rw-r--r--src/mongo/client/connpool.cpp219
-rw-r--r--src/mongo/client/connpool.h102
-rw-r--r--src/mongo/client/connpool_integration_test.cpp173
-rw-r--r--src/mongo/db/conn_pool_options.cpp32
-rw-r--r--src/mongo/db/conn_pool_options.h26
-rw-r--r--src/mongo/db/db.cpp4
-rw-r--r--src/mongo/s/server.cpp3
8 files changed, 488 insertions, 82 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index ffe65b7cc89..d034b9698aa 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -193,6 +193,17 @@ clientDriverEnv.Library(
],
)
+env.CppIntegrationTest(
+ target='connpool_integration_test',
+ source=[
+ 'connpool_integration_test.cpp',
+ ],
+ LIBDEPS=[
+ 'clientdriver',
+ '$BUILD_DIR/mongo/util/version_impl',
+ ],
+)
+
env.Library(
target='connection_pool',
source=[
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index 6c5a3d9cdfa..9f36cfccb1c 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -36,18 +36,25 @@
#include "mongo/client/connpool.h"
+#include <limits>
#include <string>
#include "mongo/client/connection_string.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/executor/connection_pool_stats.h"
+#include "mongo/stdx/chrono.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_exception.h"
namespace mongo {
+namespace {
+const int kDefaultIdleTimeout = std::numeric_limits<int>::max();
+const int kDefaultMaxInUse = std::numeric_limits<int>::max();
+} // namespace
+
using std::endl;
using std::list;
using std::map;
@@ -57,6 +64,17 @@ using std::vector;
// ------ PoolForHost ------
+PoolForHost::PoolForHost()
+ : _created(0),
+ _minValidCreationTimeMicroSec(0),
+ _type(ConnectionString::INVALID),
+ _maxPoolSize(kPoolSizeUnlimited),
+ _maxInUse(kDefaultMaxInUse),
+ _checkedOut(0),
+ _badConns(0),
+ _parentDestroyed(false),
+ _inShutdown(false) {}
+
PoolForHost::~PoolForHost() {
clear();
}
@@ -64,7 +82,7 @@ PoolForHost::~PoolForHost() {
void PoolForHost::clear() {
if (!_parentDestroyed) {
logNoCache() << "Dropping all pooled connections to " << _hostName << "(with timeout of "
- << _socketTimeout << " seconds)";
+ << _socketTimeoutSecs << " seconds)";
}
_pool = decltype(_pool){};
@@ -86,14 +104,14 @@ void PoolForHost::done(DBConnectionPool* pool, DBClientBase* c_raw) {
if (isFailed || isBroken) {
_badConns++;
logNoCache() << "Ending connection to host " << _hostName << "(with timeout of "
- << _socketTimeout << " seconds)"
+ << _socketTimeoutSecs << " seconds)"
<< " due to bad connection status; " << openConnections()
<< " connections to that host remain open";
pool->onDestroy(c.get());
} else if (_maxPoolSize >= 0 && static_cast<int>(_pool.size()) >= _maxPoolSize) {
// We have a pool size that we need to enforce
logNoCache() << "Ending idle connection to host " << _hostName << "(with timeout of "
- << _socketTimeout << " seconds)"
+ << _socketTimeoutSecs << " seconds)"
<< " because the pool meets constraints; " << openConnections()
<< " connections to that host remain open";
pool->onDestroy(c.get());
@@ -143,13 +161,13 @@ void PoolForHost::flush() {
clear();
}
-void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) {
+void PoolForHost::getStaleConnections(Date_t idleThreshold, vector<DBClientBase*>& stale) {
vector<StoredConnection> all;
while (!_pool.empty()) {
StoredConnection c = std::move(_pool.top());
_pool.pop();
- if (c.ok()) {
+ if (c.ok() && !c.addedBefore(idleThreshold)) {
all.push_back(std::move(c));
} else {
_badConns++;
@@ -164,13 +182,17 @@ void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) {
PoolForHost::StoredConnection::StoredConnection(std::unique_ptr<DBClientBase> c)
- : conn(std::move(c)), when(time(nullptr)) {}
+ : conn(std::move(c)), added(Date_t::now()) {}
bool PoolForHost::StoredConnection::ok() {
// Poke the connection to see if we're still ok
return conn->isStillConnected();
}
+bool PoolForHost::StoredConnection::addedBefore(Date_t time) {
+ return added < time;
+}
+
void PoolForHost::createdOne(DBClientBase* base) {
if (_created == 0)
_type = base->type();
@@ -186,6 +208,80 @@ void PoolForHost::initializeHostName(const std::string& hostName) {
}
}
+void PoolForHost::waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk) {
+ auto condition = [&] { return (numInUse() < _maxInUse || _inShutdown.load()); };
+
+ if (timeout > 0) {
+ stdx::chrono::seconds timeoutSeconds{timeout};
+
+ // If we timed out waiting without getting a new connection, throw.
+ uassert(ErrorCodes::ExceededTimeLimit,
+ str::stream() << "too many connections to " << _hostName << ":" << timeout,
+ !_cv.wait_for(lk, timeoutSeconds, condition));
+ } else {
+ _cv.wait(lk, condition);
+ }
+}
+
+void PoolForHost::notifyWaiters() {
+ _cv.notify_one();
+}
+
+void PoolForHost::shutdown() {
+ _inShutdown.store(true);
+ _cv.notify_all();
+}
+
+// ------ DBConnectionPool::Detail ------
+
+class DBConnectionPool::Detail {
+public:
+ template <typename Connect>
+ static DBClientBase* get(DBConnectionPool* _this,
+ const std::string& host,
+ double timeout,
+ Connect connect) {
+ while (!(_this->_inShutdown.load())) {
+ // Get a connection from the pool, if there is one.
+ std::unique_ptr<DBClientBase> c(_this->_get(host, timeout));
+ if (c) {
+ // This call may throw.
+ _this->onHandedOut(c.get());
+ return c.release();
+ }
+
+ // If there are no pooled connections for this host, create a new connection. If
+ // there are too many connections in this pool to make a new one, block until a
+ // connection is released.
+ {
+ stdx::unique_lock<stdx::mutex> lk(_this->_mutex);
+ PoolForHost& p = _this->_pools[PoolKey(host, timeout)];
+
+ if (p.openConnections() >= _this->_maxInUse) {
+ log() << "Too many in-use connections; waiting until there are fewer than "
+ << _this->_maxInUse;
+ p.waitForFreeConnection(timeout, lk);
+ } else {
+ // Drop the lock here, so we can connect without holding it.
+ // _finishCreate will take the lock again.
+ lk.unlock();
+
+ // Create a new connection and return. All Connect functions
+ // should throw if they cannot create a connection.
+ auto c = connect();
+ invariant(c);
+ return _this->_finishCreate(host, timeout, c);
+ }
+ }
+ }
+
+ // If we get here, we are in shutdown, and it does not matter what we return.
+ invariant(_this->_inShutdown.load());
+ uassert(ErrorCodes::ShutdownInProgress, "connection pool is in shutdown", false);
+ MONGO_UNREACHABLE;
+ }
+};
+
// ------ DBConnectionPool ------
const int PoolForHost::kPoolSizeUnlimited(-1);
@@ -193,7 +289,22 @@ const int PoolForHost::kPoolSizeUnlimited(-1);
DBConnectionPool::DBConnectionPool()
: _name("dbconnectionpool"),
_maxPoolSize(PoolForHost::kPoolSizeUnlimited),
- _hooks(new list<DBConnectionHook*>()) {}
+ _maxInUse(kDefaultMaxInUse),
+ _idleTimeout(kDefaultIdleTimeout),
+ _inShutdown(false),
+ _hooks(new list<DBConnectionHook*>())
+
+{}
+
+void DBConnectionPool::shutdown() {
+ if (!_inShutdown.swap(true)) {
+ stdx::lock_guard<stdx::mutex> L(_mutex);
+ for (auto i = _pools.begin(); i != _pools.end(); i++) {
+ PoolForHost& p = i->second;
+ p.shutdown();
+ }
+ }
+}
DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) {
uassert(ErrorCodes::ShutdownInProgress,
@@ -240,65 +351,44 @@ DBClientBase* DBConnectionPool::_finishCreate(const string& ident,
}
DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) {
- // If a connection for this host is available from the underlying PoolForHost, use the
- // connection in the pool.
- DBClientBase* c = _get(url.toString(), socketTimeout);
- if (c) {
- try {
- onHandedOut(c);
- } catch (std::exception&) {
- delete c;
- throw;
- }
+ auto connect = [&]() {
+ string errmsg;
+ auto c = url.connect(StringData(), errmsg, socketTimeout).release();
+ uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c);
return c;
- }
+ };
- // If no connections for this host are available in the PoolForHost (that is, all the
- // connections have been checked out, or none have been created yet), create a new connection.
- string errmsg;
- c = url.connect(StringData(), errmsg, socketTimeout).release();
- uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c);
-
- return _finishCreate(url.toString(), socketTimeout, c);
+ return Detail::get(this, url.toString(), socketTimeout, connect);
}
DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) {
- DBClientBase* c = _get(host, socketTimeout);
- if (c) {
- try {
- onHandedOut(c);
- } catch (std::exception&) {
- delete c;
- throw;
+ auto connect = [&] {
+ const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host)));
+
+ string errmsg;
+ auto c = cs.connect(StringData(), errmsg, socketTimeout).release();
+ if (!c) {
+ throw SocketException(SocketException::CONNECT_ERROR,
+ host,
+ 11002,
+ str::stream() << _name << " error: " << errmsg);
}
- return c;
- }
-
- const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host)));
- string errmsg;
- c = cs.connect(StringData(), errmsg, socketTimeout).release();
- if (!c)
- throw SocketException(SocketException::CONNECT_ERROR,
- host,
- 11002,
- str::stream() << _name << " error: " << errmsg);
+ return c;
+ };
- return _finishCreate(host, socketTimeout, c);
+ return Detail::get(this, host, socketTimeout, connect);
}
DBClientBase* DBConnectionPool::get(const MongoURI& uri, double socketTimeout) {
- std::unique_ptr<DBClientBase> c(_get(uri.toString(), socketTimeout));
- if (c) {
- onHandedOut(c.get());
+ auto connect = [&] {
+ string errmsg;
+ std::unique_ptr<DBClientBase> c(uri.connect(StringData(), errmsg, socketTimeout));
+ uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c);
return c.release();
- }
-
- string errmsg;
- c = std::unique_ptr<DBClientBase>(uri.connect(StringData(), errmsg, socketTimeout));
- uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c);
+ };
- return _finishCreate(uri.toString(), socketTimeout, c.release());
+ return Detail::get(this, uri.toString(), socketTimeout, connect);
}
int DBConnectionPool::getNumAvailableConns(const string& host, double socketTimeout) const {
@@ -326,10 +416,13 @@ void DBConnectionPool::onRelease(DBClientBase* conn) {
void DBConnectionPool::release(const string& host, DBClientBase* c) {
onRelease(c);
- stdx::lock_guard<stdx::mutex> L(_mutex);
- _pools[PoolKey(host, c->getSoTimeout())].done(this, c);
-}
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ PoolForHost& p = _pools[PoolKey(host, c->getSoTimeout())];
+ p.done(this, c);
+ lk.unlock();
+ p.notifyWaiters();
+}
DBConnectionPool::~DBConnectionPool() {
// Do not log in destruction, because global connection pools get
@@ -482,13 +575,13 @@ bool DBConnectionPool::isConnectionGood(const string& hostName, DBClientBase* co
void DBConnectionPool::taskDoWork() {
vector<DBClientBase*> toDelete;
-
+ auto idleThreshold = Date_t::now() - _idleTimeout;
{
// we need to get the connections inside the lock
// but we can actually delete them outside
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i) {
- i->second.getStaleConnections(toDelete);
+ i->second.getStaleConnections(idleThreshold, toDelete);
}
}
@@ -505,21 +598,23 @@ void DBConnectionPool::taskDoWork() {
// ------ ScopedDbConnection ------
ScopedDbConnection::ScopedDbConnection(const std::string& host, double socketTimeout)
- : _host(host), _conn(globalConnPool.get(host, socketTimeout)), _socketTimeout(socketTimeout) {
+ : _host(host),
+ _conn(globalConnPool.get(host, socketTimeout)),
+ _socketTimeoutSecs(socketTimeout) {
_setSocketTimeout();
}
ScopedDbConnection::ScopedDbConnection(const ConnectionString& host, double socketTimeout)
: _host(host.toString()),
_conn(globalConnPool.get(host, socketTimeout)),
- _socketTimeout(socketTimeout) {
+ _socketTimeoutSecs(socketTimeout) {
_setSocketTimeout();
}
ScopedDbConnection::ScopedDbConnection(const MongoURI& uri, double socketTimeout)
: _host(uri.toString()),
_conn(globalConnPool.get(uri, socketTimeout)),
- _socketTimeout(socketTimeout) {
+ _socketTimeoutSecs(socketTimeout) {
_setSocketTimeout();
}
@@ -537,7 +632,7 @@ void ScopedDbConnection::_setSocketTimeout() {
return;
if (_conn->type() == ConnectionString::MASTER)
- static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout);
+ static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeoutSecs);
}
ScopedDbConnection::~ScopedDbConnection() {
diff --git a/src/mongo/client/connpool.h b/src/mongo/client/connpool.h
index 68af31a5225..9d428dcab36 100644
--- a/src/mongo/client/connpool.h
+++ b/src/mongo/client/connpool.h
@@ -37,6 +37,7 @@
#include "mongo/platform/atomic_word.h"
#include "mongo/util/background.h"
#include "mongo/util/concurrency/mutex.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -52,7 +53,9 @@ struct ConnectionPoolStats;
* host. It is not responsible for creating new connections; instead, when DBConnectionPool is asked
* for a connection to a particular host, DBConnectionPool will check if any connections are
* available in the PoolForHost for that host. If so, DBConnectionPool will check out a connection
- * from PoolForHost, and if not, DBConnectionPool will create a new connection itself.
+ * from PoolForHost, and if not, DBConnectionPool will create a new connection itself, if we are
+ * below the maximum allowed number of connections. If we have already created _maxPoolSize
+ * connections, the calling thread will block until a new connection can be made for it.
*
* Once the connection is released back to DBConnectionPool, DBConnectionPool will attempt to
* release the connection to PoolForHost. This is how connections enter PoolForHost for the first
@@ -74,15 +77,7 @@ public:
friend class DBConnectionPool;
- PoolForHost()
- : _created(0),
- _minValidCreationTimeMicroSec(0),
- _type(ConnectionString::INVALID),
- _maxPoolSize(kPoolSizeUnlimited),
- _checkedOut(0),
- _badConns(0),
- _parentDestroyed(false) {}
-
+ PoolForHost();
~PoolForHost();
/**
@@ -107,10 +102,17 @@ public:
}
/**
- * Sets the socket timeout on this host, for reporting purposes only.
+ * Sets the maximum number of in-use connections for this pool.
+ */
+ void setMaxInUse(int maxInUse) {
+ _maxInUse = maxInUse;
+ }
+
+ /**
+ * Sets the socket timeout on this host, in seconds, for reporting purposes only.
*/
void setSocketTimeout(double socketTimeout) {
- _socketTimeout = socketTimeout;
+ _socketTimeoutSecs = socketTimeout;
}
int numAvailable() const {
@@ -150,7 +152,7 @@ public:
void flush();
- void getStaleConnections(std::vector<DBClientBase*>& stale);
+ void getStaleConnections(Date_t idleThreshold, std::vector<DBClientBase*>& stale);
/**
* Sets the lower bound for creation times that can be considered as
@@ -169,18 +171,45 @@ public:
*/
void initializeHostName(const std::string& hostName);
+ /**
+ * If this pool has more than _maxPoolSize connections in use, blocks
+ * the calling thread until a connection is returned to the pool or
+ * is destroyed. If a non-zero timeout is given, this method will
+ * throw if a free connection cannot be acquired within that amount of
+ * time. Timeout is in seconds.
+ */
+ void waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk);
+
+ /**
+ * Notifies any waiters that there are new connections available.
+ */
+ void notifyWaiters();
+
+ /**
+ * Shuts down this pool, notifying all waiters.
+ */
+ void shutdown();
+
private:
struct StoredConnection {
StoredConnection(std::unique_ptr<DBClientBase> c);
bool ok();
+ /**
+ * Returns true if this connection was added before the given time.
+ */
+ bool addedBefore(Date_t time);
+
std::unique_ptr<DBClientBase> conn;
- time_t when;
+
+ // The time when this connection was added to the pool. Will
+ // be reset if the connection is checked out and re-added.
+ Date_t added;
};
std::string _hostName;
- double _socketTimeout;
+ double _socketTimeoutSecs;
std::stack<StoredConnection> _pool;
int64_t _created;
@@ -190,6 +219,9 @@ private:
// The maximum number of connections we'll save in the pool
int _maxPoolSize;
+ // The maximum number of connections allowed to be in-use in this pool
+ int _maxInUse;
+
// The number of currently active connections from this pool
int _checkedOut;
@@ -198,6 +230,10 @@ private:
// Whether our parent DBConnectionPool object is in destruction
bool _parentDestroyed;
+
+ stdx::condition_variable _cv;
+
+ AtomicWord<bool> _inShutdown;
};
class DBConnectionHook {
@@ -259,6 +295,21 @@ public:
_maxPoolSize = maxPoolSize;
}
+ /**
+ * Sets the maximum number of in-use connections per host.
+ */
+ void setMaxInUse(int maxInUse) {
+ _maxInUse = maxInUse;
+ }
+
+ /**
+ * Sets the timeout value for idle connections, after which we will remove them
+ * from the pool. This value is in minutes.
+ */
+ void setIdleTimeout(int timeout) {
+ _idleTimeout = Minutes(timeout);
+ }
+
void onCreate(DBClientBase* conn);
void onHandedOut(DBClientBase* conn);
void onDestroy(DBClientBase* conn);
@@ -266,6 +317,9 @@ public:
void flush();
+ /**
+ * Gets a connection to the given host with the given timeout, in seconds.
+ */
DBClientBase* get(const std::string& host, double socketTimeout = 0);
DBClientBase* get(const ConnectionString& host, double socketTimeout = 0);
DBClientBase* get(const MongoURI& uri, double socketTimeout = 0);
@@ -310,7 +364,14 @@ public:
}
virtual void taskDoWork();
+ /**
+ * Shuts down the connection pool, unblocking any waiters on connections.
+ */
+ void shutdown();
+
private:
+ class Detail;
+
DBConnectionPool(DBConnectionPool& p);
DBClientBase* _get(const std::string& ident, double socketTimeout);
@@ -337,8 +398,13 @@ private:
// 0 effectively disables the pool
int _maxPoolSize;
+ int _maxInUse;
+ Minutes _idleTimeout;
+
PoolMap _pools;
+ AtomicWord<bool> _inShutdown;
+
// pointers owned by me, right now they leak on shutdown
// _hooks itself also leaks because it creates a shutdown race condition
std::list<DBConnectionHook*>* _hooks;
@@ -388,11 +454,11 @@ public:
explicit ScopedDbConnection(const ConnectionString& host, double socketTimeout = 0);
explicit ScopedDbConnection(const MongoURI& host, double socketTimeout = 0);
- ScopedDbConnection() : _host(""), _conn(0), _socketTimeout(0) {}
+ ScopedDbConnection() : _host(""), _conn(0), _socketTimeoutSecs(0) {}
/* @param conn - bind to an existing connection */
ScopedDbConnection(const std::string& host, DBClientBase* conn, double socketTimeout = 0)
- : _host(host), _conn(conn), _socketTimeout(socketTimeout) {
+ : _host(host), _conn(conn), _socketTimeoutSecs(socketTimeout) {
_setSocketTimeout();
}
@@ -447,7 +513,7 @@ private:
const std::string _host;
DBClientBase* _conn;
- const double _socketTimeout;
+ const double _socketTimeoutSecs;
};
} // namespace mongo
diff --git a/src/mongo/client/connpool_integration_test.cpp b/src/mongo/client/connpool_integration_test.cpp
new file mode 100644
index 00000000000..758b3e99665
--- /dev/null
+++ b/src/mongo/client/connpool_integration_test.cpp
@@ -0,0 +1,173 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/connpool.h"
+#include "mongo/client/global_conn_pool.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+TEST(ConnectionPoolTest, ConnectionPoolMaxInUseConnectionsTest) {
+ DBConnectionPool pool;
+
+ auto fixture = unittest::getFixtureConnectionString();
+ auto host = fixture.getServers()[0].toString();
+
+ stdx::condition_variable cv;
+ stdx::mutex mutex;
+ int counter = 0;
+
+ pool.setMaxInUse(2);
+
+ // Check out maxInUse connections.
+ auto conn1 = pool.get(host);
+ ASSERT(conn1);
+ auto conn2 = pool.get(host);
+ ASSERT(conn2);
+
+ // Try creating a new one, should block until we release one.
+ stdx::thread t([&] {
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ counter++;
+ }
+
+ cv.notify_one();
+
+ auto conn3 = pool.get(host);
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ counter++;
+ }
+
+ cv.notify_one();
+ pool.release(host, conn3);
+ });
+
+ // First thread should be blocked.
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&] { return counter == 1; });
+ }
+
+ // Return one to the pool, thread should be un-blocked.
+ pool.release(host, conn2);
+
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&] { return counter == 2; });
+ }
+
+ t.join();
+}
+
+TEST(ConnectionPoolTest, ConnectionPoolMaxInUseTimeoutTest) {
+ DBConnectionPool pool;
+
+ auto fixture = unittest::getFixtureConnectionString();
+ auto host = fixture.getServers()[0].toString();
+
+ pool.setMaxInUse(2);
+
+ // Check out maxInUse connections.
+ auto conn1 = pool.get(host, 1);
+ ASSERT(conn1);
+ auto conn2 = pool.get(host, 1);
+ ASSERT(conn2);
+
+ // Try creating a new connection with a 1-second timeout, should block,
+ // then should time out.
+ ASSERT_THROWS(pool.get(host, 1), AssertionException);
+}
+
+TEST(ConnectionPoolTest, ConnectionPoolShutdownLogicTest) {
+ DBConnectionPool pool;
+
+ auto fixture = unittest::getFixtureConnectionString();
+ auto host = fixture.getServers()[0].toString();
+
+ stdx::condition_variable cv;
+ stdx::mutex mutex;
+ int counter = 0;
+
+ pool.setMaxInUse(2);
+
+ // Check out maxInUse connections.
+ auto conn1 = pool.get(host);
+ ASSERT(conn1);
+ auto conn2 = pool.get(host);
+ ASSERT(conn2);
+
+ // Attempt to open a new connection, should block.
+ stdx::thread t([&] {
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ counter++;
+ }
+
+ cv.notify_one();
+
+ ASSERT_THROWS(pool.get(host), AssertionException);
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ counter++;
+ }
+
+ cv.notify_one();
+ });
+
+ // Wait for new thread to block.
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&] { return counter == 1; });
+ }
+
+ // Shut down the pool, this should unblock our waiting connection.
+ pool.shutdown();
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&] { return counter == 2; });
+ }
+
+ // Attempt to open a new connection, should fail.
+ ASSERT_THROWS(pool.get(host), AssertionException);
+
+ t.join();
+}
+
+} // namespace
+
+} // namespace mongo
diff --git a/src/mongo/db/conn_pool_options.cpp b/src/mongo/db/conn_pool_options.cpp
index 988d31efb5d..851687fc7ff 100644
--- a/src/mongo/db/conn_pool_options.cpp
+++ b/src/mongo/db/conn_pool_options.cpp
@@ -30,6 +30,8 @@
#include "mongo/db/conn_pool_options.h"
+#include <limits>
+
#include "mongo/base/init.h"
#include "mongo/client/connpool.h"
#include "mongo/client/global_conn_pool.h"
@@ -41,6 +43,12 @@ namespace mongo {
int ConnPoolOptions::maxConnsPerHost(200);
int ConnPoolOptions::maxShardedConnsPerHost(200);
+int ConnPoolOptions::maxInUseConnsPerHost(std::numeric_limits<int>::max());
+int ConnPoolOptions::maxShardedInUseConnsPerHost(std::numeric_limits<int>::max());
+
+int ConnPoolOptions::globalConnPoolIdleTimeout(std::numeric_limits<int>::max());
+int ConnPoolOptions::shardedConnPoolIdleTimeout(std::numeric_limits<int>::max());
+
namespace {
ExportedServerParameter<int, ServerParameterType::kStartupOnly> //
@@ -53,6 +61,26 @@ ExportedServerParameter<int, ServerParameterType::kStartupOnly> //
"connPoolMaxShardedConnsPerHost",
&ConnPoolOptions::maxShardedConnsPerHost);
+ExportedServerParameter<int, ServerParameterType::kStartupOnly> //
+ maxInUseConnsPerHostParameter(ServerParameterSet::getGlobal(),
+ "connPoolMaxInUseConnsPerHost",
+ &ConnPoolOptions::maxInUseConnsPerHost);
+
+ExportedServerParameter<int, ServerParameterType::kStartupOnly> //
+ maxShardedInUseConnsPerHostParameter(ServerParameterSet::getGlobal(),
+ "connPoolMaxShardedInUseConnsPerHost",
+ &ConnPoolOptions::maxShardedInUseConnsPerHost);
+
+ExportedServerParameter<int, ServerParameterType::kStartupOnly> //
+ globalConnPoolIdleTimeoutParameter(ServerParameterSet::getGlobal(),
+ "globalConnPoolIdleTimeoutMinutes",
+ &ConnPoolOptions::globalConnPoolIdleTimeout);
+
+ExportedServerParameter<int, ServerParameterType::kStartupOnly> //
+ shardedConnPoolIdleTimeoutParameter(ServerParameterSet::getGlobal(),
+ "shardedConnPoolIdleTimeoutMinutes",
+ &ConnPoolOptions::shardedConnPoolIdleTimeout);
+
MONGO_INITIALIZER(InitializeConnectionPools)(InitializerContext* context) {
// Initialize the sharded and unsharded outgoing connection pools
// NOTES:
@@ -62,9 +90,13 @@ MONGO_INITIALIZER(InitializeConnectionPools)(InitializerContext* context) {
globalConnPool.setName("connection pool");
globalConnPool.setMaxPoolSize(ConnPoolOptions::maxConnsPerHost);
+ globalConnPool.setMaxInUse(ConnPoolOptions::maxInUseConnsPerHost);
+ globalConnPool.setIdleTimeout(ConnPoolOptions::globalConnPoolIdleTimeout);
shardConnectionPool.setName("sharded connection pool");
shardConnectionPool.setMaxPoolSize(ConnPoolOptions::maxShardedConnsPerHost);
+ shardConnectionPool.setMaxInUse(ConnPoolOptions::maxShardedInUseConnsPerHost);
+ shardConnectionPool.setIdleTimeout(ConnPoolOptions::shardedConnPoolIdleTimeout);
return Status::OK();
}
diff --git a/src/mongo/db/conn_pool_options.h b/src/mongo/db/conn_pool_options.h
index faf10c0842c..9394a156d6a 100644
--- a/src/mongo/db/conn_pool_options.h
+++ b/src/mongo/db/conn_pool_options.h
@@ -28,6 +28,8 @@
#pragma once
+#include "mongo/util/duration.h"
+
namespace mongo {
// NOTE:
@@ -40,13 +42,33 @@ namespace mongo {
*/
struct ConnPoolOptions {
/**
- * Maximum connections per host the connection pool should use
+ * Maximum connections per host the connection pool should store.
*/
static int maxConnsPerHost;
/**
- * Maximum connections per host the sharded conn pool should use
+ * Maximum connections per host the sharded conn pool should store.
*/
static int maxShardedConnsPerHost;
+
+ /**
+ * Maximum in-use connections per host in the global connection pool.
+ */
+ static int maxInUseConnsPerHost;
+
+ /**
+ * Maximum in-use connections per host in the sharded connection pool.
+ */
+ static int maxShardedInUseConnsPerHost;
+
+ /**
+ * Amount of time, in minutes, to keep idle connections in the global connection pool.
+ */
+ static int globalConnPoolIdleTimeout;
+
+ /**
+ * Amount of time, in minutes, to keep idle connections in the sharded connection pool.
+ */
+ static int shardedConnPoolIdleTimeout;
};
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index eeb41cd5666..4fefe009b16 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -44,6 +44,7 @@
#include "mongo/base/init.h"
#include "mongo/base/initializer.h"
#include "mongo/base/status.h"
+#include "mongo/client/global_conn_pool.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/config.h"
#include "mongo/db/audit.h"
@@ -1230,6 +1231,9 @@ void shutdownTask() {
tl->shutdown();
}
+ // Shut down the global dbclient pool so callers stop waiting for connections.
+ globalConnPool.shutdown();
+
if (serviceContext->getGlobalStorageEngine()) {
ServiceContext::UniqueOperationContext uniqueOpCtx;
OperationContext* opCtx = client->getOperationContext();
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index e0a629c36e2..82e3ceb438b 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -238,6 +238,9 @@ static void cleanupTask() {
tl->shutdown();
}
+ // Shut down the global dbclient pool so callers stop waiting for connections.
+ shardConnectionPool.shutdown();
+
// Shutdown the Service Entry Point and its sessions and give it a grace period to complete.
if (auto sep = serviceContext->getServiceEntryPoint()) {
if (!sep->shutdown(Seconds(10))) {