summaryrefslogtreecommitdiff
path: root/src/mongo/client/connection_pool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/connection_pool.cpp')
-rw-r--r--src/mongo/client/connection_pool.cpp322
1 files changed, 153 insertions, 169 deletions
diff --git a/src/mongo/client/connection_pool.cpp b/src/mongo/client/connection_pool.cpp
index 17628610135..de2cd347ec8 100644
--- a/src/mongo/client/connection_pool.cpp
+++ b/src/mongo/client/connection_pool.cpp
@@ -38,209 +38,193 @@
namespace mongo {
namespace {
- const Date_t kNeverTooStale = Date_t::max();
+const Date_t kNeverTooStale = Date_t::max();
- const Minutes kCleanUpInterval(5); // Note: Must be larger than kMaxConnectionAge below)
- const Seconds kMaxConnectionAge(30);
+const Minutes kCleanUpInterval(5); // Note: Must be larger than kMaxConnectionAge below)
+const Seconds kMaxConnectionAge(30);
-} // namespace
+} // namespace
- ConnectionPool::ConnectionPool(int messagingPortTags) : _messagingPortTags(messagingPortTags) {}
+ConnectionPool::ConnectionPool(int messagingPortTags) : _messagingPortTags(messagingPortTags) {}
- ConnectionPool::~ConnectionPool() {
- cleanUpOlderThan(Date_t::max());
+ConnectionPool::~ConnectionPool() {
+ cleanUpOlderThan(Date_t::max());
- invariant(_connections.empty());
- invariant(_inUseConnections.empty());
- }
+ invariant(_connections.empty());
+ invariant(_inUseConnections.empty());
+}
- void ConnectionPool::cleanUpOlderThan(Date_t now) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _cleanUpOlderThan_inlock(now);
- }
+void ConnectionPool::cleanUpOlderThan(Date_t now) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _cleanUpOlderThan_inlock(now);
+}
- void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now) {
- HostConnectionMap::iterator hostConns = _connections.begin();
- while (hostConns != _connections.end()) {
- _cleanUpOlderThan_inlock(now, &hostConns->second);
- if (hostConns->second.empty()) {
- _connections.erase(hostConns++);
- }
- else {
- ++hostConns;
- }
+void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now) {
+ HostConnectionMap::iterator hostConns = _connections.begin();
+ while (hostConns != _connections.end()) {
+ _cleanUpOlderThan_inlock(now, &hostConns->second);
+ if (hostConns->second.empty()) {
+ _connections.erase(hostConns++);
+ } else {
+ ++hostConns;
}
}
-
- void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostConns) {
- ConnectionList::iterator iter = hostConns->begin();
- while (iter != hostConns->end()) {
- if (_shouldKeepConnection(now, *iter)) {
- ++iter;
- }
- else {
- _destroyConnection_inlock(hostConns, iter++);
- }
+}
+
+void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostConns) {
+ ConnectionList::iterator iter = hostConns->begin();
+ while (iter != hostConns->end()) {
+ if (_shouldKeepConnection(now, *iter)) {
+ ++iter;
+ } else {
+ _destroyConnection_inlock(hostConns, iter++);
}
}
+}
- bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const {
- const Date_t expirationDate = connInfo.creationDate + kMaxConnectionAge;
- if (expirationDate <= now) {
- return false;
- }
-
- return true;
+bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const {
+ const Date_t expirationDate = connInfo.creationDate + kMaxConnectionAge;
+ if (expirationDate <= now) {
+ return false;
}
- void ConnectionPool::closeAllInUseConnections() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- for (ConnectionList::iterator iter = _inUseConnections.begin();
- iter != _inUseConnections.end();
- ++iter) {
+ return true;
+}
- iter->conn->port().shutdown();
- }
+void ConnectionPool::closeAllInUseConnections() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ for (ConnectionList::iterator iter = _inUseConnections.begin(); iter != _inUseConnections.end();
+ ++iter) {
+ iter->conn->port().shutdown();
}
-
- void ConnectionPool::_cleanUpStaleHosts_inlock(Date_t now) {
- if (now > _lastCleanUpTime + kCleanUpInterval) {
- for (HostLastUsedMap::iterator itr = _lastUsedHosts.begin();
- itr != _lastUsedHosts.end();
- itr++) {
-
- if (itr->second <= _lastCleanUpTime) {
- ConnectionList connList = _connections.find(itr->first)->second;
- _cleanUpOlderThan_inlock(now, &connList);
- invariant(connList.empty());
- itr->second = kNeverTooStale;
- }
+}
+
+void ConnectionPool::_cleanUpStaleHosts_inlock(Date_t now) {
+ if (now > _lastCleanUpTime + kCleanUpInterval) {
+ for (HostLastUsedMap::iterator itr = _lastUsedHosts.begin(); itr != _lastUsedHosts.end();
+ itr++) {
+ if (itr->second <= _lastCleanUpTime) {
+ ConnectionList connList = _connections.find(itr->first)->second;
+ _cleanUpOlderThan_inlock(now, &connList);
+ invariant(connList.empty());
+ itr->second = kNeverTooStale;
}
-
- _lastCleanUpTime = now;
}
- }
-
- ConnectionPool::ConnectionList::iterator ConnectionPool::acquireConnection(
- const HostAndPort& target,
- Date_t now,
- Milliseconds timeout) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- // Clean up connections on stale/unused hosts
- _cleanUpStaleHosts_inlock(now);
-
- for (HostConnectionMap::iterator hostConns;
- ((hostConns = _connections.find(target)) != _connections.end());) {
+ _lastCleanUpTime = now;
+ }
+}
+
+ConnectionPool::ConnectionList::iterator ConnectionPool::acquireConnection(
+ const HostAndPort& target, Date_t now, Milliseconds timeout) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // Clean up connections on stale/unused hosts
+ _cleanUpStaleHosts_inlock(now);
+
+ for (HostConnectionMap::iterator hostConns;
+ ((hostConns = _connections.find(target)) != _connections.end());) {
+ // Clean up the requested host to remove stale/unused connections
+ _cleanUpOlderThan_inlock(now, &hostConns->second);
+ if (hostConns->second.empty()) {
+ // prevent host from causing unnecessary cleanups
+ _lastUsedHosts[hostConns->first] = kNeverTooStale;
+ break;
+ }
- // Clean up the requested host to remove stale/unused connections
- _cleanUpOlderThan_inlock(now, &hostConns->second);
- if (hostConns->second.empty()) {
- // prevent host from causing unnecessary cleanups
- _lastUsedHosts[hostConns->first] = kNeverTooStale;
- break;
- }
+ _inUseConnections.splice(
+ _inUseConnections.begin(), hostConns->second, hostConns->second.begin());
- _inUseConnections.splice(_inUseConnections.begin(),
- hostConns->second,
- hostConns->second.begin());
-
- const ConnectionList::iterator candidate = _inUseConnections.begin();
- lk.unlock();
- try {
- if (candidate->conn->isStillConnected()) {
- // setSoTimeout takes a double representing the number of seconds for send and
- // receive timeouts. Thus, we must take count() and divide by
- // 1000.0 to get the number of seconds with a fractional part.
- candidate->conn->setSoTimeout(timeout.count() / 1000.0);
- return candidate;
- }
- }
- catch (...) {
- lk.lock();
- _destroyConnection_inlock(&_inUseConnections, candidate);
- throw;
+ const ConnectionList::iterator candidate = _inUseConnections.begin();
+ lk.unlock();
+ try {
+ if (candidate->conn->isStillConnected()) {
+ // setSoTimeout takes a double representing the number of seconds for send and
+ // receive timeouts. Thus, we must take count() and divide by
+ // 1000.0 to get the number of seconds with a fractional part.
+ candidate->conn->setSoTimeout(timeout.count() / 1000.0);
+ return candidate;
}
-
+ } catch (...) {
lk.lock();
_destroyConnection_inlock(&_inUseConnections, candidate);
- }
-
- // No idle connection in the pool; make a new one.
- lk.unlock();
- std::unique_ptr<DBClientConnection> conn(new DBClientConnection);
-
- // setSoTimeout takes a double representing the number of seconds for send and receive
- // timeouts. Thus, we must take count() and divide by 1000.0 to get the number
- // of seconds with a fractional part.
- conn->setSoTimeout(timeout.count() / 1000.0);
- std::string errmsg;
- uassert(28640,
- str::stream() << "Failed attempt to connect to "
- << target.toString() << "; " << errmsg,
- conn->connect(target, errmsg));
-
- conn->port().tag |= _messagingPortTags;
-
- if (getGlobalAuthorizationManager()->isAuthEnabled()) {
- uassert(ErrorCodes::AuthenticationFailed,
- "Missing credentials for authenticating as internal user",
- isInternalAuthSet());
- conn->auth(getInternalUserAuthParamsWithFallback());
+ throw;
}
lk.lock();
- return _inUseConnections.insert(_inUseConnections.begin(),
- ConnectionInfo(conn.release(), now));
- }
-
- void ConnectionPool::releaseConnection(ConnectionList::iterator iter, const Date_t now) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (!_shouldKeepConnection(now, *iter)) {
- _destroyConnection_inlock(&_inUseConnections, iter);
- return;
- }
-
- ConnectionList& hostConns = _connections[iter->conn->getServerHostAndPort()];
- _cleanUpOlderThan_inlock(now, &hostConns);
- hostConns.splice(hostConns.begin(), _inUseConnections, iter);
- _lastUsedHosts[iter->conn->getServerHostAndPort()] = now;
+ _destroyConnection_inlock(&_inUseConnections, candidate);
}
- void ConnectionPool::destroyConnection(ConnectionList::iterator iter) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _destroyConnection_inlock(&_inUseConnections, iter);
+ // No idle connection in the pool; make a new one.
+ lk.unlock();
+ std::unique_ptr<DBClientConnection> conn(new DBClientConnection);
+
+ // setSoTimeout takes a double representing the number of seconds for send and receive
+ // timeouts. Thus, we must take count() and divide by 1000.0 to get the number
+ // of seconds with a fractional part.
+ conn->setSoTimeout(timeout.count() / 1000.0);
+ std::string errmsg;
+ uassert(28640,
+ str::stream() << "Failed attempt to connect to " << target.toString() << "; " << errmsg,
+ conn->connect(target, errmsg));
+
+ conn->port().tag |= _messagingPortTags;
+
+ if (getGlobalAuthorizationManager()->isAuthEnabled()) {
+ uassert(ErrorCodes::AuthenticationFailed,
+ "Missing credentials for authenticating as internal user",
+ isInternalAuthSet());
+ conn->auth(getInternalUserAuthParamsWithFallback());
}
- void ConnectionPool::_destroyConnection_inlock(ConnectionList* connList,
- ConnectionList::iterator iter) {
- delete iter->conn;
- connList->erase(iter);
- }
-
-
- //
- // ConnectionPool::ConnectionPtr
- //
-
- ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPool* pool,
- const HostAndPort& target,
- Date_t now,
- Milliseconds timeout)
- : _pool(pool),
- _connInfo(pool->acquireConnection(target, now, timeout)) {
+ lk.lock();
+ return _inUseConnections.insert(_inUseConnections.begin(), ConnectionInfo(conn.release(), now));
+}
+void ConnectionPool::releaseConnection(ConnectionList::iterator iter, const Date_t now) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!_shouldKeepConnection(now, *iter)) {
+ _destroyConnection_inlock(&_inUseConnections, iter);
+ return;
}
- ConnectionPool::ConnectionPtr::~ConnectionPtr() {
- if (_pool) {
- _pool->destroyConnection(_connInfo);
- }
+ ConnectionList& hostConns = _connections[iter->conn->getServerHostAndPort()];
+ _cleanUpOlderThan_inlock(now, &hostConns);
+ hostConns.splice(hostConns.begin(), _inUseConnections, iter);
+ _lastUsedHosts[iter->conn->getServerHostAndPort()] = now;
+}
+
+void ConnectionPool::destroyConnection(ConnectionList::iterator iter) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _destroyConnection_inlock(&_inUseConnections, iter);
+}
+
+void ConnectionPool::_destroyConnection_inlock(ConnectionList* connList,
+ ConnectionList::iterator iter) {
+ delete iter->conn;
+ connList->erase(iter);
+}
+
+
+//
+// ConnectionPool::ConnectionPtr
+//
+
+ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPool* pool,
+ const HostAndPort& target,
+ Date_t now,
+ Milliseconds timeout)
+ : _pool(pool), _connInfo(pool->acquireConnection(target, now, timeout)) {}
+
+ConnectionPool::ConnectionPtr::~ConnectionPtr() {
+ if (_pool) {
+ _pool->destroyConnection(_connInfo);
}
+}
- void ConnectionPool::ConnectionPtr::done(Date_t now) {
- _pool->releaseConnection(_connInfo, now);
- _pool = NULL;
- }
+void ConnectionPool::ConnectionPtr::done(Date_t now) {
+ _pool->releaseConnection(_connInfo, now);
+ _pool = NULL;
+}
-} // namespace mongo
+} // namespace mongo