summaryrefslogtreecommitdiff
path: root/src/mongo/client/connpool.cpp
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2017-12-20 11:37:25 -0500
committerJason Carey <jcarey@argv.me>2018-02-01 11:36:21 -0500
commit48a34a495386b7cbe18419313768929d12028125 (patch)
tree3f6bbbf5055b4380bc356243d92887565231b3ab /src/mongo/client/connpool.cpp
parent255b7bfbd5d8a566b52c70f9209f82067afdedca (diff)
downloadmongo-48a34a495386b7cbe18419313768929d12028125.tar.gz
SERVER-28822 Add new dbclient connection pool options
Diffstat (limited to 'src/mongo/client/connpool.cpp')
-rw-r--r--src/mongo/client/connpool.cpp216
1 files changed, 156 insertions, 60 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index 7dc8eaed3fe..e4938ebcc0a 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -36,18 +36,25 @@
#include "mongo/client/connpool.h"
+#include <limits>
#include <string>
#include "mongo/client/connection_string.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/executor/connection_pool_stats.h"
+#include "mongo/stdx/chrono.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_exception.h"
namespace mongo {
+namespace {
+const int kDefaultIdleTimeout = std::numeric_limits<int>::max();
+const int kDefaultMaxInUse = std::numeric_limits<int>::max();
+} // namespace
+
using std::endl;
using std::list;
using std::map;
@@ -57,6 +64,17 @@ using std::vector;
// ------ PoolForHost ------
+PoolForHost::PoolForHost()
+ : _created(0),
+ _minValidCreationTimeMicroSec(0),
+ _type(ConnectionString::INVALID),
+ _maxPoolSize(kPoolSizeUnlimited),
+ _maxInUse(kDefaultMaxInUse),
+ _checkedOut(0),
+ _badConns(0),
+ _parentDestroyed(false),
+ _inShutdown(false) {}
+
PoolForHost::~PoolForHost() {
clear();
}
@@ -64,7 +82,7 @@ PoolForHost::~PoolForHost() {
void PoolForHost::clear() {
if (!_parentDestroyed) {
logNoCache() << "Dropping all pooled connections to " << _hostName << "(with timeout of "
- << _socketTimeout << " seconds)";
+ << _socketTimeoutSecs << " seconds)";
}
_pool = decltype(_pool){};
@@ -86,14 +104,14 @@ void PoolForHost::done(DBConnectionPool* pool, DBClientBase* c_raw) {
if (isFailed || isBroken) {
_badConns++;
logNoCache() << "Ending connection to host " << _hostName << "(with timeout of "
- << _socketTimeout << " seconds)"
+ << _socketTimeoutSecs << " seconds)"
<< " due to bad connection status; " << openConnections()
<< " connections to that host remain open";
pool->onDestroy(c.get());
} else if (_maxPoolSize >= 0 && static_cast<int>(_pool.size()) >= _maxPoolSize) {
// We have a pool size that we need to enforce
logNoCache() << "Ending idle connection to host " << _hostName << "(with timeout of "
- << _socketTimeout << " seconds)"
+ << _socketTimeoutSecs << " seconds)"
<< " because the pool meets constraints; " << openConnections()
<< " connections to that host remain open";
pool->onDestroy(c.get());
@@ -143,13 +161,13 @@ void PoolForHost::flush() {
clear();
}
-void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) {
+void PoolForHost::getStaleConnections(Date_t idleThreshold, vector<DBClientBase*>& stale) {
vector<StoredConnection> all;
while (!_pool.empty()) {
StoredConnection c = std::move(_pool.top());
_pool.pop();
- if (c.ok()) {
+ if (c.ok() && !c.addedBefore(idleThreshold)) {
all.push_back(std::move(c));
} else {
_badConns++;
@@ -164,13 +182,17 @@ void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) {
PoolForHost::StoredConnection::StoredConnection(std::unique_ptr<DBClientBase> c)
- : conn(std::move(c)), when(time(nullptr)) {}
+ : conn(std::move(c)), added(Date_t::now()) {}
bool PoolForHost::StoredConnection::ok() {
// Poke the connection to see if we're still ok
return conn->isStillConnected();
}
+bool PoolForHost::StoredConnection::addedBefore(Date_t time) {
+ return added < time;
+}
+
void PoolForHost::createdOne(DBClientBase* base) {
if (_created == 0)
_type = base->type();
@@ -186,6 +208,80 @@ void PoolForHost::initializeHostName(const std::string& hostName) {
}
}
+void PoolForHost::waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk) {
+ auto condition = [&] { return (numInUse() < _maxInUse || _inShutdown.load()); };
+
+ if (timeout > 0) {
+ stdx::chrono::seconds timeoutSeconds{timeout};
+
+ // If we timed out waiting without getting a new connection, throw.
+ uassert(ErrorCodes::ExceededTimeLimit,
+ str::stream() << "too many connections to " << _hostName << ":" << timeout,
+ !_cv.wait_for(lk, timeoutSeconds, condition));
+ } else {
+ _cv.wait(lk, condition);
+ }
+}
+
+void PoolForHost::notifyWaiters() {
+ _cv.notify_one();
+}
+
+void PoolForHost::shutdown() {
+ _inShutdown.store(true);
+ _cv.notify_all();
+}
+
+// ------ DBConnectionPool::Detail ------
+
+class DBConnectionPool::Detail {
+public:
+ template <typename Connect>
+ static DBClientBase* get(DBConnectionPool* _this,
+ const std::string& host,
+ double timeout,
+ Connect connect) {
+ while (!(_this->_inShutdown.load())) {
+ // Get a connection from the pool, if there is one.
+ std::unique_ptr<DBClientBase> c(_this->_get(host, timeout));
+ if (c) {
+ // This call may throw.
+ _this->onHandedOut(c.get());
+ return c.release();
+ }
+
+ // If there are no pooled connections for this host, create a new connection. If
+ // there are too many connections in this pool to make a new one, block until a
+ // connection is released.
+ {
+ stdx::unique_lock<stdx::mutex> lk(_this->_mutex);
+ PoolForHost& p = _this->_pools[PoolKey(host, timeout)];
+
+ if (p.openConnections() >= _this->_maxInUse) {
+ log() << "Too many in-use connections; waiting until there are fewer than "
+ << _this->_maxInUse;
+ p.waitForFreeConnection(timeout, lk);
+ } else {
+ // Drop the lock here, so we can connect without holding it.
+ // _finishCreate will take the lock again.
+ lk.unlock();
+
+ // Create a new connection and return. All Connect functions
+ // should throw if they cannot create a connection.
+ auto c = connect();
+ invariant(c);
+ return _this->_finishCreate(host, timeout, c);
+ }
+ }
+ }
+
+ // If we get here, we are in shutdown, and it does not matter what we return.
+ invariant(_this->_inShutdown.load());
+ uassert(ErrorCodes::ShutdownInProgress, "connection pool is in shutdown", false);
+ MONGO_UNREACHABLE;
+ }
+};
+
// ------ DBConnectionPool ------
const int PoolForHost::kPoolSizeUnlimited(-1);
@@ -193,7 +289,22 @@ const int PoolForHost::kPoolSizeUnlimited(-1);
DBConnectionPool::DBConnectionPool()
: _name("dbconnectionpool"),
_maxPoolSize(PoolForHost::kPoolSizeUnlimited),
- _hooks(new list<DBConnectionHook*>()) {}
+ _maxInUse(kDefaultMaxInUse),
+ _idleTimeout(kDefaultIdleTimeout),
+ _inShutdown(false),
+ _hooks(new list<DBConnectionHook*>())
+
+{}
+
+void DBConnectionPool::shutdown() {
+ if (!_inShutdown.swap(true)) {
+ stdx::lock_guard<stdx::mutex> L(_mutex);
+ for (auto i = _pools.begin(); i != _pools.end(); i++) {
+ PoolForHost& p = i->second;
+ p.shutdown();
+ }
+ }
+}
DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) {
uassert(ErrorCodes::ShutdownInProgress,
@@ -240,63 +351,43 @@ DBClientBase* DBConnectionPool::_finishCreate(const string& ident,
}
DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) {
- // If a connection for this host is available from the underlying PoolForHost, use the
- // connection in the pool.
- DBClientBase* c = _get(url.toString(), socketTimeout);
- if (c) {
- try {
- onHandedOut(c);
- } catch (std::exception&) {
- delete c;
- throw;
- }
+ auto connect = [&]() {
+ string errmsg;
+ auto c = url.connect(StringData(), errmsg, socketTimeout).release();
+ uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c);
return c;
- }
+ };
- // If no connections for this host are available in the PoolForHost (that is, all the
- // connections have been checked out, or none have been created yet), create a new connection.
- string errmsg;
- c = url.connect(StringData(), errmsg, socketTimeout).release();
- uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c);
-
- return _finishCreate(url.toString(), socketTimeout, c);
+ return Detail::get(this, url.toString(), socketTimeout, connect);
}
DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) {
- DBClientBase* c = _get(host, socketTimeout);
- if (c) {
- try {
- onHandedOut(c);
- } catch (std::exception&) {
- delete c;
- throw;
+ auto connect = [&] {
+ const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host)));
+
+ string errmsg;
+ auto c = cs.connect(StringData(), errmsg, socketTimeout).release();
+ if (!c) {
+ throwSocketError(SocketErrorKind::CONNECT_ERROR,
+ host,
+ str::stream() << _name << " error: " << errmsg);
}
- return c;
- }
-
- const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host)));
- string errmsg;
- c = cs.connect(StringData(), errmsg, socketTimeout).release();
- if (!c)
- throwSocketError(
- SocketErrorKind::CONNECT_ERROR, host, str::stream() << _name << " error: " << errmsg);
+ return c;
+ };
- return _finishCreate(host, socketTimeout, c);
+ return Detail::get(this, host, socketTimeout, connect);
}
DBClientBase* DBConnectionPool::get(const MongoURI& uri, double socketTimeout) {
- std::unique_ptr<DBClientBase> c(_get(uri.toString(), socketTimeout));
- if (c) {
- onHandedOut(c.get());
+ auto connect = [&] {
+ string errmsg;
+ std::unique_ptr<DBClientBase> c(uri.connect(StringData(), errmsg, socketTimeout));
+ uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c);
return c.release();
- }
-
- string errmsg;
- c = std::unique_ptr<DBClientBase>(uri.connect(StringData(), errmsg, socketTimeout));
- uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c);
+ };
- return _finishCreate(uri.toString(), socketTimeout, c.release());
+ return Detail::get(this, uri.toString(), socketTimeout, connect);
}
int DBConnectionPool::getNumAvailableConns(const string& host, double socketTimeout) const {
@@ -324,10 +415,13 @@ void DBConnectionPool::onRelease(DBClientBase* conn) {
void DBConnectionPool::release(const string& host, DBClientBase* c) {
onRelease(c);
- stdx::lock_guard<stdx::mutex> L(_mutex);
- _pools[PoolKey(host, c->getSoTimeout())].done(this, c);
-}
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ PoolForHost& p = _pools[PoolKey(host, c->getSoTimeout())];
+ p.done(this, c);
+ lk.unlock();
+ p.notifyWaiters();
+}
DBConnectionPool::~DBConnectionPool() {
// Do not log in destruction, because global connection pools get
@@ -480,13 +574,13 @@ bool DBConnectionPool::isConnectionGood(const string& hostName, DBClientBase* co
void DBConnectionPool::taskDoWork() {
vector<DBClientBase*> toDelete;
-
+ auto idleThreshold = Date_t::now() - _idleTimeout;
{
// we need to get the connections inside the lock
// but we can actually delete them outside
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i) {
- i->second.getStaleConnections(toDelete);
+ i->second.getStaleConnections(idleThreshold, toDelete);
}
}
@@ -503,21 +597,23 @@ void DBConnectionPool::taskDoWork() {
// ------ ScopedDbConnection ------
ScopedDbConnection::ScopedDbConnection(const std::string& host, double socketTimeout)
- : _host(host), _conn(globalConnPool.get(host, socketTimeout)), _socketTimeout(socketTimeout) {
+ : _host(host),
+ _conn(globalConnPool.get(host, socketTimeout)),
+ _socketTimeoutSecs(socketTimeout) {
_setSocketTimeout();
}
ScopedDbConnection::ScopedDbConnection(const ConnectionString& host, double socketTimeout)
: _host(host.toString()),
_conn(globalConnPool.get(host, socketTimeout)),
- _socketTimeout(socketTimeout) {
+ _socketTimeoutSecs(socketTimeout) {
_setSocketTimeout();
}
ScopedDbConnection::ScopedDbConnection(const MongoURI& uri, double socketTimeout)
: _host(uri.toString()),
_conn(globalConnPool.get(uri, socketTimeout)),
- _socketTimeout(socketTimeout) {
+ _socketTimeoutSecs(socketTimeout) {
_setSocketTimeout();
}
@@ -535,7 +631,7 @@ void ScopedDbConnection::_setSocketTimeout() {
return;
if (_conn->type() == ConnectionString::MASTER)
- static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout);
+ static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeoutSecs);
}
ScopedDbConnection::~ScopedDbConnection() {