/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/client/connection_pool.h" #include "mongo/client/connpool.h" #include "mongo/client/mongo_uri.h" #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/internal_user_auth.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/rpc/reply_interface.h" #include "mongo/rpc/unique_message.h" namespace mongo { namespace { const Date_t kNeverTooStale = Date_t::max(); // TODO: Workaround for SERVER-19092. To be lowered back to 5 min / 30 sec once the bug is fixed const Hours kCleanUpInterval(1); // Note: Must be larger than kMaxConnectionAge below) const Minutes kMaxConnectionAge(30); } // namespace ConnectionPool::ConnectionPool(int messagingPortTags, std::unique_ptr hook) : _messagingPortTags(messagingPortTags), _hook(std::move(hook)) {} ConnectionPool::ConnectionPool(int messagingPortTags) : ConnectionPool(messagingPortTags, nullptr) {} ConnectionPool::~ConnectionPool() { cleanUpOlderThan(Date_t::max()); invariant(_connections.empty()); invariant(_inUseConnections.empty()); } void ConnectionPool::cleanUpOlderThan(Date_t now) { stdx::lock_guard lk(_mutex); HostConnectionMap::iterator hostConns = _connections.begin(); while (hostConns != _connections.end()) { _cleanUpOlderThan_inlock(now, &hostConns->second); if (hostConns->second.empty()) { _connections.erase(hostConns++); } else { ++hostConns; } } } void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostConns) { ConnectionList::iterator iter = hostConns->begin(); while (iter != hostConns->end()) { if (_shouldKeepConnection(now, *iter)) { ++iter; } else { _destroyConnection_inlock(hostConns, iter++); } } } bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const { const Date_t expirationDate = connInfo.creationDate + kMaxConnectionAge; if (expirationDate <= now) { return false; } return true; } void ConnectionPool::closeAllInUseConnections() { stdx::lock_guard lk(_mutex); for (ConnectionList::iterator iter = _inUseConnections.begin(); iter != _inUseConnections.end(); ++iter) { iter->conn->port().shutdown(); } } void ConnectionPool::_cleanUpStaleHosts_inlock(Date_t now) { if (now > _lastCleanUpTime + kCleanUpInterval) { for (HostLastUsedMap::iterator itr = _lastUsedHosts.begin(); itr != _lastUsedHosts.end(); itr++) { if (itr->second <= _lastCleanUpTime) { ConnectionList connList = _connections.find(itr->first)->second; _cleanUpOlderThan_inlock(now, &connList); invariant(connList.empty()); itr->second = kNeverTooStale; } } _lastCleanUpTime = now; } } ConnectionPool::ConnectionList::iterator ConnectionPool::acquireConnection( const HostAndPort& target, Date_t now, Milliseconds timeout) { stdx::unique_lock lk(_mutex); // Clean up connections on stale/unused hosts _cleanUpStaleHosts_inlock(now); for (HostConnectionMap::iterator hostConns; (hostConns = _connections.find(target)) != _connections.end();) { // Clean up the requested host to remove stale/unused connections _cleanUpOlderThan_inlock(now, &hostConns->second); if (hostConns->second.empty()) { // prevent host from causing unnecessary cleanups _lastUsedHosts[hostConns->first] = kNeverTooStale; break; } _inUseConnections.splice( _inUseConnections.begin(), hostConns->second, hostConns->second.begin()); const ConnectionList::iterator candidate = _inUseConnections.begin(); lk.unlock(); try { if (candidate->conn->isStillConnected()) { // setSoTimeout takes a double representing the number of seconds for send and // receive timeouts. Thus, we must express 'timeout' in milliseconds and divide by // 1000.0 to get the number of seconds with a fractional part. candidate->conn->setSoTimeout(durationCount(timeout) / 1000.0); return candidate; } } catch (...) { lk.lock(); _destroyConnection_inlock(&_inUseConnections, candidate); throw; } lk.lock(); _destroyConnection_inlock(&_inUseConnections, candidate); } // No idle connection in the pool; make a new one. lk.unlock(); std::unique_ptr conn; if (_hook) { conn.reset(new DBClientConnection( false, // auto reconnect 0, // socket timeout {}, // MongoURI [this, target](const executor::RemoteCommandResponse& isMasterReply) { return _hook->validateHost(target, isMasterReply); })); } else { conn.reset(new DBClientConnection()); } // setSoTimeout takes a double representing the number of seconds for send and receive // timeouts. Thus, we must express 'timeout' in milliseconds and divide by 1000.0 to get // the number of seconds with a fractional part. conn->setSoTimeout(durationCount(timeout) / 1000.0); uassertStatusOK(conn->connect(target, StringData())); conn->port().setTag(conn->port().getTag() | _messagingPortTags); if (isInternalAuthSet()) { conn->auth(getInternalUserAuthParams()); } if (_hook) { auto postConnectRequest = uassertStatusOK(_hook->makeRequest(target)); // We might not have a postConnectRequest if (postConnectRequest != boost::none) { auto start = Date_t::now(); auto reply = conn->runCommandWithMetadata(postConnectRequest->dbname, postConnectRequest->cmdObj.firstElementFieldName(), postConnectRequest->metadata, postConnectRequest->cmdObj); auto rcr = executor::RemoteCommandResponse(reply->getCommandReply().getOwned(), reply->getMetadata().getOwned(), Date_t::now() - start); uassertStatusOK(_hook->handleReply(target, std::move(rcr))); } } lk.lock(); return _inUseConnections.insert(_inUseConnections.begin(), ConnectionInfo(conn.release(), now)); } void ConnectionPool::releaseConnection(ConnectionList::iterator iter, const Date_t now) { stdx::lock_guard lk(_mutex); if (!_shouldKeepConnection(now, *iter)) { _destroyConnection_inlock(&_inUseConnections, iter); return; } ConnectionList& hostConns = _connections[iter->conn->getServerHostAndPort()]; _cleanUpOlderThan_inlock(now, &hostConns); hostConns.splice(hostConns.begin(), _inUseConnections, iter); _lastUsedHosts[iter->conn->getServerHostAndPort()] = now; } void ConnectionPool::destroyConnection(ConnectionList::iterator iter) { stdx::lock_guard lk(_mutex); _destroyConnection_inlock(&_inUseConnections, iter); } void ConnectionPool::_destroyConnection_inlock(ConnectionList* connList, ConnectionList::iterator iter) { delete iter->conn; connList->erase(iter); } // // ConnectionPool::ConnectionPtr // ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPool* pool, const HostAndPort& target, Date_t now, Milliseconds timeout) : _pool(pool), _connInfo(pool->acquireConnection(target, now, timeout)) {} ConnectionPool::ConnectionPtr::~ConnectionPtr() { if (_pool) { _pool->destroyConnection(_connInfo); } } ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPtr&& other) : _pool(std::move(other._pool)), _connInfo(std::move(other._connInfo)) { other._pool = nullptr; } ConnectionPool::ConnectionPtr& ConnectionPool::ConnectionPtr::operator=(ConnectionPtr&& other) { _pool = std::move(other._pool); _connInfo = std::move(other._connInfo); other._pool = nullptr; return *this; } void ConnectionPool::ConnectionPtr::done(Date_t now) { _pool->releaseConnection(_connInfo, now); _pool = NULL; } } // namespace mongo