summaryrefslogtreecommitdiff
path: root/src/mongo/client/connpool.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-01-02 13:32:21 -0500
committerJason Carey <jcarey@argv.me>2018-01-02 13:36:01 -0500
commitb48f6b016aa09aed2dec3b964ef982af8ab11a4c (patch)
tree3017200f8bb30c1b6d75e3659613267bb6c13228 /src/mongo/client/connpool.cpp
parent071e4e8b7b3cee57bfa9b48c4320ca1abea7aa75 (diff)
downloadmongo-b48f6b016aa09aed2dec3b964ef982af8ab11a4c.tar.gz
Revert "SERVER-28822 Add new dbclient connection pool options"
This reverts commit 9e7df79d3907a743e29b7333de6ea08a01f33a05.
Diffstat (limited to 'src/mongo/client/connpool.cpp')
-rw-r--r--src/mongo/client/connpool.cpp219
1 files changed, 62 insertions, 157 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index 9f36cfccb1c..6c5a3d9cdfa 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -36,25 +36,18 @@
#include "mongo/client/connpool.h"
-#include <limits>
#include <string>
#include "mongo/client/connection_string.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/executor/connection_pool_stats.h"
-#include "mongo/stdx/chrono.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_exception.h"
namespace mongo {
-namespace {
-const int kDefaultIdleTimeout = std::numeric_limits<int>::max();
-const int kDefaultMaxInUse = std::numeric_limits<int>::max();
-} // namespace
-
using std::endl;
using std::list;
using std::map;
@@ -64,17 +57,6 @@ using std::vector;
// ------ PoolForHost ------
-PoolForHost::PoolForHost()
- : _created(0),
- _minValidCreationTimeMicroSec(0),
- _type(ConnectionString::INVALID),
- _maxPoolSize(kPoolSizeUnlimited),
- _maxInUse(kDefaultMaxInUse),
- _checkedOut(0),
- _badConns(0),
- _parentDestroyed(false),
- _inShutdown(false) {}
-
PoolForHost::~PoolForHost() {
clear();
}
@@ -82,7 +64,7 @@ PoolForHost::~PoolForHost() {
void PoolForHost::clear() {
if (!_parentDestroyed) {
logNoCache() << "Dropping all pooled connections to " << _hostName << "(with timeout of "
- << _socketTimeoutSecs << " seconds)";
+ << _socketTimeout << " seconds)";
}
_pool = decltype(_pool){};
@@ -104,14 +86,14 @@ void PoolForHost::done(DBConnectionPool* pool, DBClientBase* c_raw) {
if (isFailed || isBroken) {
_badConns++;
logNoCache() << "Ending connection to host " << _hostName << "(with timeout of "
- << _socketTimeoutSecs << " seconds)"
+ << _socketTimeout << " seconds)"
<< " due to bad connection status; " << openConnections()
<< " connections to that host remain open";
pool->onDestroy(c.get());
} else if (_maxPoolSize >= 0 && static_cast<int>(_pool.size()) >= _maxPoolSize) {
// We have a pool size that we need to enforce
logNoCache() << "Ending idle connection to host " << _hostName << "(with timeout of "
- << _socketTimeoutSecs << " seconds)"
+ << _socketTimeout << " seconds)"
<< " because the pool meets constraints; " << openConnections()
<< " connections to that host remain open";
pool->onDestroy(c.get());
@@ -161,13 +143,13 @@ void PoolForHost::flush() {
clear();
}
-void PoolForHost::getStaleConnections(Date_t idleThreshold, vector<DBClientBase*>& stale) {
+void PoolForHost::getStaleConnections(vector<DBClientBase*>& stale) {
vector<StoredConnection> all;
while (!_pool.empty()) {
StoredConnection c = std::move(_pool.top());
_pool.pop();
- if (c.ok() && !c.addedBefore(idleThreshold)) {
+ if (c.ok()) {
all.push_back(std::move(c));
} else {
_badConns++;
@@ -182,17 +164,13 @@ void PoolForHost::getStaleConnections(Date_t idleThreshold, vector<DBClientBase*
PoolForHost::StoredConnection::StoredConnection(std::unique_ptr<DBClientBase> c)
- : conn(std::move(c)), added(Date_t::now()) {}
+ : conn(std::move(c)), when(time(nullptr)) {}
bool PoolForHost::StoredConnection::ok() {
// Poke the connection to see if we're still ok
return conn->isStillConnected();
}
-bool PoolForHost::StoredConnection::addedBefore(Date_t time) {
- return added < time;
-}
-
void PoolForHost::createdOne(DBClientBase* base) {
if (_created == 0)
_type = base->type();
@@ -208,80 +186,6 @@ void PoolForHost::initializeHostName(const std::string& hostName) {
}
}
-void PoolForHost::waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk) {
- auto condition = [&] { return (numInUse() < _maxInUse || _inShutdown.load()); };
-
- if (timeout > 0) {
- stdx::chrono::seconds timeoutSeconds{timeout};
-
- // If we timed out waiting without getting a new connection, throw.
- uassert(ErrorCodes::ExceededTimeLimit,
- str::stream() << "too many connections to " << _hostName << ":" << timeout,
- !_cv.wait_for(lk, timeoutSeconds, condition));
- } else {
- _cv.wait(lk, condition);
- }
-}
-
-void PoolForHost::notifyWaiters() {
- _cv.notify_one();
-}
-
-void PoolForHost::shutdown() {
- _inShutdown.store(true);
- _cv.notify_all();
-}
-
-// ------ DBConnectionPool::Detail ------
-
-class DBConnectionPool::Detail {
-public:
- template <typename Connect>
- static DBClientBase* get(DBConnectionPool* _this,
- const std::string& host,
- double timeout,
- Connect connect) {
- while (!(_this->_inShutdown.load())) {
- // Get a connection from the pool, if there is one.
- std::unique_ptr<DBClientBase> c(_this->_get(host, timeout));
- if (c) {
- // This call may throw.
- _this->onHandedOut(c.get());
- return c.release();
- }
-
- // If there are no pooled connections for this host, create a new connection. If
- // there are too many connections in this pool to make a new one, block until a
- // connection is released.
- {
- stdx::unique_lock<stdx::mutex> lk(_this->_mutex);
- PoolForHost& p = _this->_pools[PoolKey(host, timeout)];
-
- if (p.openConnections() >= _this->_maxInUse) {
- log() << "Too many in-use connections; waiting until there are fewer than "
- << _this->_maxInUse;
- p.waitForFreeConnection(timeout, lk);
- } else {
- // Drop the lock here, so we can connect without holding it.
- // _finishCreate will take the lock again.
- lk.unlock();
-
- // Create a new connection and return. All Connect functions
- // should throw if they cannot create a connection.
- auto c = connect();
- invariant(c);
- return _this->_finishCreate(host, timeout, c);
- }
- }
- }
-
- // If we get here, we are in shutdown, and it does not matter what we return.
- invariant(_this->_inShutdown.load());
- uassert(ErrorCodes::ShutdownInProgress, "connection pool is in shutdown", false);
- MONGO_UNREACHABLE;
- }
-};
-
// ------ DBConnectionPool ------
const int PoolForHost::kPoolSizeUnlimited(-1);
@@ -289,22 +193,7 @@ const int PoolForHost::kPoolSizeUnlimited(-1);
DBConnectionPool::DBConnectionPool()
: _name("dbconnectionpool"),
_maxPoolSize(PoolForHost::kPoolSizeUnlimited),
- _maxInUse(kDefaultMaxInUse),
- _idleTimeout(kDefaultIdleTimeout),
- _inShutdown(false),
- _hooks(new list<DBConnectionHook*>())
-
-{}
-
-void DBConnectionPool::shutdown() {
- if (!_inShutdown.swap(true)) {
- stdx::lock_guard<stdx::mutex> L(_mutex);
- for (auto i = _pools.begin(); i != _pools.end(); i++) {
- PoolForHost& p = i->second;
- p.shutdown();
- }
- }
-}
+ _hooks(new list<DBConnectionHook*>()) {}
DBClientBase* DBConnectionPool::_get(const string& ident, double socketTimeout) {
uassert(ErrorCodes::ShutdownInProgress,
@@ -351,44 +240,65 @@ DBClientBase* DBConnectionPool::_finishCreate(const string& ident,
}
DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) {
- auto connect = [&]() {
- string errmsg;
- auto c = url.connect(StringData(), errmsg, socketTimeout).release();
- uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c);
+ // If a connection for this host is available from the underlying PoolForHost, use the
+ // connection in the pool.
+ DBClientBase* c = _get(url.toString(), socketTimeout);
+ if (c) {
+ try {
+ onHandedOut(c);
+ } catch (std::exception&) {
+ delete c;
+ throw;
+ }
return c;
- };
+ }
- return Detail::get(this, url.toString(), socketTimeout, connect);
+ // If no connections for this host are available in the PoolForHost (that is, all the
+ // connections have been checked out, or none have been created yet), create a new connection.
+ string errmsg;
+ c = url.connect(StringData(), errmsg, socketTimeout).release();
+ uassert(13328, _name + ": connect failed " + url.toString() + " : " + errmsg, c);
+
+ return _finishCreate(url.toString(), socketTimeout, c);
}
DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) {
- auto connect = [&] {
- const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host)));
-
- string errmsg;
- auto c = cs.connect(StringData(), errmsg, socketTimeout).release();
- if (!c) {
- throw SocketException(SocketException::CONNECT_ERROR,
- host,
- 11002,
- str::stream() << _name << " error: " << errmsg);
+ DBClientBase* c = _get(host, socketTimeout);
+ if (c) {
+ try {
+ onHandedOut(c);
+ } catch (std::exception&) {
+ delete c;
+ throw;
}
-
return c;
- };
+ }
+
+ const ConnectionString cs(uassertStatusOK(ConnectionString::parse(host)));
- return Detail::get(this, host, socketTimeout, connect);
+ string errmsg;
+ c = cs.connect(StringData(), errmsg, socketTimeout).release();
+ if (!c)
+ throw SocketException(SocketException::CONNECT_ERROR,
+ host,
+ 11002,
+ str::stream() << _name << " error: " << errmsg);
+
+ return _finishCreate(host, socketTimeout, c);
}
DBClientBase* DBConnectionPool::get(const MongoURI& uri, double socketTimeout) {
- auto connect = [&] {
- string errmsg;
- std::unique_ptr<DBClientBase> c(uri.connect(StringData(), errmsg, socketTimeout));
- uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c);
+ std::unique_ptr<DBClientBase> c(_get(uri.toString(), socketTimeout));
+ if (c) {
+ onHandedOut(c.get());
return c.release();
- };
+ }
+
+ string errmsg;
+ c = std::unique_ptr<DBClientBase>(uri.connect(StringData(), errmsg, socketTimeout));
+ uassert(40356, _name + ": connect failed " + uri.toString() + " : " + errmsg, c);
- return Detail::get(this, uri.toString(), socketTimeout, connect);
+ return _finishCreate(uri.toString(), socketTimeout, c.release());
}
int DBConnectionPool::getNumAvailableConns(const string& host, double socketTimeout) const {
@@ -416,14 +326,11 @@ void DBConnectionPool::onRelease(DBClientBase* conn) {
void DBConnectionPool::release(const string& host, DBClientBase* c) {
onRelease(c);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- PoolForHost& p = _pools[PoolKey(host, c->getSoTimeout())];
- p.done(this, c);
-
- lk.unlock();
- p.notifyWaiters();
+ stdx::lock_guard<stdx::mutex> L(_mutex);
+ _pools[PoolKey(host, c->getSoTimeout())].done(this, c);
}
+
DBConnectionPool::~DBConnectionPool() {
// Do not log in destruction, because global connection pools get
// destroyed after the logging framework.
@@ -575,13 +482,13 @@ bool DBConnectionPool::isConnectionGood(const string& hostName, DBClientBase* co
void DBConnectionPool::taskDoWork() {
vector<DBClientBase*> toDelete;
- auto idleThreshold = Date_t::now() - _idleTimeout;
+
{
// we need to get the connections inside the lock
// but we can actually delete them outside
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (PoolMap::iterator i = _pools.begin(); i != _pools.end(); ++i) {
- i->second.getStaleConnections(idleThreshold, toDelete);
+ i->second.getStaleConnections(toDelete);
}
}
@@ -598,23 +505,21 @@ void DBConnectionPool::taskDoWork() {
// ------ ScopedDbConnection ------
ScopedDbConnection::ScopedDbConnection(const std::string& host, double socketTimeout)
- : _host(host),
- _conn(globalConnPool.get(host, socketTimeout)),
- _socketTimeoutSecs(socketTimeout) {
+ : _host(host), _conn(globalConnPool.get(host, socketTimeout)), _socketTimeout(socketTimeout) {
_setSocketTimeout();
}
ScopedDbConnection::ScopedDbConnection(const ConnectionString& host, double socketTimeout)
: _host(host.toString()),
_conn(globalConnPool.get(host, socketTimeout)),
- _socketTimeoutSecs(socketTimeout) {
+ _socketTimeout(socketTimeout) {
_setSocketTimeout();
}
ScopedDbConnection::ScopedDbConnection(const MongoURI& uri, double socketTimeout)
: _host(uri.toString()),
_conn(globalConnPool.get(uri, socketTimeout)),
- _socketTimeoutSecs(socketTimeout) {
+ _socketTimeout(socketTimeout) {
_setSocketTimeout();
}
@@ -632,7 +537,7 @@ void ScopedDbConnection::_setSocketTimeout() {
return;
if (_conn->type() == ConnectionString::MASTER)
- static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeoutSecs);
+ static_cast<DBClientConnection*>(_conn)->setSoTimeout(_socketTimeout);
}
ScopedDbConnection::~ScopedDbConnection() {