#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
#include "mongo/platform/basic.h"
#include "mongo/executor/connection_pool.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
// One interesting implementation note herein concerns how setup() and
// refresh() are invoked outside of the global lock, but setTimeout is not.
// This implementation detail simplifies mocks, allowing them to return
// synchronously sometimes, whereas having timeouts fire instantly adds little
// value. In practice, dumping the locks is always safe (because we restrict
// ourselves to operations over the connection).
namespace mongo {
namespace executor {
* A pool for a specific HostAndPort.
* Pools come into existence the first time a connection is requested and
* go out of existence after hostTimeout passes without any of their
* connections being used.
// Per-host pool implementation used by ConnectionPool. Connections move between
// the ready, processing, dropped-processing and checked-out ownership pools
// declared below, while pending requests wait in _requests.
class ConnectionPool::SpecificPool {
// Creates the pool for `hostAndPort` on behalf of `parent`.
SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
* Gets a connection from the specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
void getConnection(const HostAndPort& hostAndPort,
Milliseconds timeout,
stdx::unique_lock lk,
GetConnectionCallback cb);
* Cascades a failure across existing connections and requests. Invoking
* this function drops all current connections and fails all current
* requests with the passed status.
void processFailure(const Status& status, stdx::unique_lock lk);
* Returns a connection to a specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
void returnConnection(ConnectionInterface* connection, stdx::unique_lock lk);
* Returns the number of connections currently checked out of the pool.
size_t inUseConnections(const stdx::unique_lock& lk);
* Returns the number of available connections in the pool.
size_t availableConnections(const stdx::unique_lock& lk);
* Returns the total number of connections ever created in this pool.
size_t createdConnections(const stdx::unique_lock& lk);
// Owning handle for a pooled connection, and the map type used by each of the
// pools below (indexed by the raw connection pointer, as in `_readyPool[connPtr]`).
using OwnedConnection = std::unique_ptr;
using OwnershipPool = std::unordered_map;
// A pending request: `first` is its expiration and `second` its callback
// (see the make_pair(expiration, cb) in getConnection()).
using Request = std::pair;
// Orders requests by `first` using operator>, so that the priority queue's
// top() is the request with the smallest (soonest) expiration.
// NOTE(review): operator() is not const-qualified; consider making it const.
struct RequestComparator {
bool operator()(const Request& a, const Request& b) {
return a.first > b.first;
// Adds a live connection to the ready pool.
void addToReady(stdx::unique_lock& lk, OwnedConnection conn);
// Fulfills as many outstanding requests as possible from the ready pool.
void fulfillRequests(stdx::unique_lock& lk);
// Spawns enough connections to satisfy open requests and the minimum pool
// size, while honoring the maximum pool size.
void spawnConnections(stdx::unique_lock& lk, const HostAndPort& hostAndPort);
// Timer callback that moves the pool towards shutdown; it takes the parent's
// mutex itself.
void shutdown();
// Moves the owning handle for `connection` out of `pool`; the connection must
// be present (enforced with invariant).
OwnedConnection takeFromPool(OwnershipPool& pool, ConnectionInterface* connection);
// Same as takeFromPool(), looking in _processingPool first and falling back to
// _droppedProcessingPool.
OwnedConnection takeFromProcessingPool(ConnectionInterface* connection);
// Updates _state and manages the request timer.
void updateStateInLock();
// The ConnectionPool this pool belongs to; its _mutex, _factory and _options
// are used throughout the implementation.
ConnectionPool* const _parent;
const HostAndPort _hostAndPort;
// Connections available to be handed out.
OwnershipPool _readyPool;
// Connections with a setup or refresh in flight.
OwnershipPool _processingPool;
// Processing connections migrated here by processFailure().
OwnershipPool _droppedProcessingPool;
// Connections currently handed out to users.
OwnershipPool _checkedOutPool;
// Pending requests; top() is the one with the earliest expiration.
std::priority_queue, RequestComparator> _requests;
// Timer used both for request expiration and for the host-timeout / shutdown
// countdown (see updateStateInLock() and shutdown()).
std::unique_ptr _requestTimer;
// Expiration that the timer state was last computed for.
Date_t _requestTimerExpiration;
// Compared against each connection's getGeneration() to recognise connections
// that predate a processFailure().
size_t _generation;
// True while fulfillRequests() is running, so that nested calls bail out.
bool _inFulfillRequests;
// Value reported by createdConnections().
size_t _created;
* The current state of the pool
* The pool begins in a running state. Moves to idle when no requests
* are pending and no connections are checked out. It finally enters
* shutdown after hostTimeout has passed (and waits there for current
* refreshes to process out).
* At any point a new request sets the state back to running and
* restarts all timers.
// NOTE(review): the enumerators themselves are not visible in this excerpt; the
// rest of the file refers to State::kRunning, State::kIdle and
// State::kInShutdown, which match the three descriptions below.
enum class State {
// The pool is active
// No current activity, waiting for hostTimeout to pass
// hostTimeout is passed, we're waiting for any processing
// connections to finish before shutting down
State _state;
// Default refresh timeout: 20 seconds.
const Milliseconds ConnectionPool::kDefaultRefreshTimeout(Seconds(20));

// Default refresh requirement: 60 seconds. Presumably the default for the
// `refreshRequirement` option that returnConnection() adds to getLastUsed().
const Milliseconds ConnectionPool::kDefaultRefreshRequirement(Seconds(60));

// Default host timeout: 5 minutes. Presumably the default for the `hostTimeout`
// option that updateStateInLock() uses to arm the shutdown timer.
const Milliseconds ConnectionPool::kDefaultHostTimeout(Minutes(5));

// Sentinel status: returnConnection() asserts (via invariant) that a connection
// handed back to the pool does not still carry this status.
const Status ConnectionPool::kConnectionStateUnknown(ErrorCodes::InternalError,
                                                     "Connection is in an unknown state");
// Constructs the pool, taking ownership of the factory implementation `impl`
// (stored in _factory) and of the supplied `options` (stored in _options).
ConnectionPool::ConnectionPool(std::unique_ptr impl, Options options)
: _options(std::move(options)), _factory(std::move(impl)) {}
// Defaulted destructor, defined in this file after the SpecificPool class
// definition -- presumably so the pools held in _pools are destroyed where
// SpecificPool is a complete type.
ConnectionPool::~ConnectionPool() = default;
// Drops the pooled connections for `hostAndPort`.
// Takes _mutex and looks the host up in _pools; the lock is then moved into a
// call that is given a PooledConnectionsDropped status.
// NOTE(review): the statement guarded by the `if` and the callee of the final
// call are not visible in this excerpt -- presumably an early return and
// SpecificPool::processFailure(); confirm against the full file.
void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
stdx::unique_lock lk(_mutex);
auto iter = _pools.find(hostAndPort);
if (iter == _pools.end())
Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk));
// Requests a connection to `hostAndPort`.
// Under _mutex, finds the SpecificPool for the host in _pools, creating and
// registering one if none exists, then hands the lock, `timeout` and `cb` to
// SpecificPool::getConnection().
void ConnectionPool::get(const HostAndPort& hostAndPort,
Milliseconds timeout,
GetConnectionCallback cb) {
SpecificPool* pool;
stdx::unique_lock lk(_mutex);
auto iter = _pools.find(hostAndPort);
if (iter == _pools.end()) {
// No pool for this host yet: create one, keeping a raw pointer for the call
// below before ownership moves into _pools.
auto handle = stdx::make_unique(this, hostAndPort);
pool = handle.get();
_pools[hostAndPort] = std::move(handle);
} else {
pool = iter->second.get();
pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
// Reports per-host statistics into `stats`.
// Holds _mutex while walking _pools; for each host it builds a
// ConnectionStatsPerHost (starting with the pool's in-use count) and passes it
// to stats->updateStatsForHost().
// NOTE(review): the remaining initializers of `hostStats` are not visible in
// this excerpt. `host` is a copy of the map key; a const reference would do.
void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
stdx::unique_lock lk(_mutex);
for (const auto& kv : _pools) {
HostAndPort host = kv.first;
auto& pool = kv.second;
ConnectionStatsPerHost hostStats{pool->inUseConnections(lk),
stats->updateStatsForHost(host, hostStats);
// Hands `conn` back to the pool it came from.
// Takes _mutex, looks up the SpecificPool by conn->getHostAndPort() (which must
// exist, enforced with invariant) and forwards the connection together with the
// lock to SpecificPool::returnConnection().
void ConnectionPool::returnConnection(ConnectionInterface* conn) {
stdx::unique_lock lk(_mutex);
auto iter = _pools.find(conn->getHostAndPort());
invariant(iter != _pools.end());
iter->second.get()->returnConnection(conn, std::move(lk));
// Builds a pool for `hostAndPort` that belongs to `parent`; the pool starts in
// State::kRunning.
// NOTE(review): only the _parent and _state initializers are visible in this
// excerpt; the initializers for the other members are presumably elided.
ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
: _parent(parent),
_state(State::kRunning) {}
// Destructor.
// NOTE(review): the body is not visible in this excerpt.
ConnectionPool::SpecificPool::~SpecificPool() {
// Returns the number of connections currently checked out, i.e. the size of
// _checkedOutPool. `lk` is not used in the body; presumably it is taken to show
// that the caller holds the parent's mutex.
size_t ConnectionPool::SpecificPool::inUseConnections(const stdx::unique_lock& lk) {
return _checkedOutPool.size();
// Returns the number of connections ready to be handed out, i.e. the size of
// _readyPool. `lk` is not used in the body; presumably it is taken to show that
// the caller holds the parent's mutex.
size_t ConnectionPool::SpecificPool::availableConnections(
const stdx::unique_lock& lk) {
return _readyPool.size();
// Returns the _created counter. `lk` is not used in the body; presumably it is
// taken to show that the caller holds the parent's mutex.
size_t ConnectionPool::SpecificPool::createdConnections(const stdx::unique_lock& lk) {
return _created;
// Queues a request for a connection to `hostAndPort`.
// Computes the request's expiration from `timeout`, pushes the
// (expiration, callback) pair onto _requests and asks spawnConnections() to
// create connections as needed. `lk` is the parent's lock, sunk by value.
void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort,
Milliseconds timeout,
stdx::unique_lock lk,
GetConnectionCallback cb) {
// We need some logic here to handle kNoTimeout, which is defined as -1 Milliseconds. If we just
// added the timeout, we would get a time 1MS in the past, which would immediately timeout - the
// exact opposite of what we want.
auto expiration = (timeout == RemoteCommandRequest::kNoTimeout)
? RemoteCommandRequest::kNoExpirationDate
: _parent->_factory->now() + timeout;
_requests.push(make_pair(expiration, std::move(cb)));
spawnConnections(lk, hostAndPort);
// Takes back a connection that was checked out of this pool.
// The connection is removed from _checkedOutPool and then, per the checks
// below: treated as stale when its generation differs from _generation, as
// failed when its status is not OK, sent through a refresh (parked in
// _processingPool) when getLastUsed() + refreshRequirement has passed, or put
// straight back into the ready pool otherwise.
// NOTE(review): several statements (e.g. the early returns and the call that
// receives the refresh callback) are not visible in this excerpt.
void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
stdx::unique_lock lk) {
// Time point from which this connection counts as needing a refresh.
auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement;
auto conn = takeFromPool(_checkedOutPool, connPtr);
// Users are required to call indicateSuccess() or indicateFailure() before allowing
// a connection to be returned. Otherwise, we have entered an unknown state.
invariant(conn->getStatus() != kConnectionStateUnknown);
if (conn->getGeneration() != _generation) {
// If the connection is from an older generation, just return.
if (!conn->getStatus().isOK()) {
// TODO: alert via some callback if the host is bad
auto now = _parent->_factory->now();
if (needsRefreshTP <= now) {
// If we need to refresh this connection
if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
_parent->_options.minConnections) {
// If we already have minConnections, just let the connection lapse
_processingPool[connPtr] = std::move(conn);
// Unlock in case refresh can occur immediately
// Refresh completion callback: re-acquires the parent's mutex and reclaims
// the connection from the processing pools before deciding what to do.
[this](ConnectionInterface* connPtr, Status status) {
stdx::unique_lock lk(_parent->_mutex);
auto conn = takeFromProcessingPool(connPtr);
// If the host and port were dropped, let this lapse
if (conn->getGeneration() != _generation)
// If we're in shutdown, we don't need refreshed connections
if (_state == State::kInShutdown)
// If the connection refreshed successfully, throw it back in the ready
// pool
if (status.isOK()) {
addToReady(lk, std::move(conn));
// Otherwise pass the failure on through
processFailure(status, std::move(lk));
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
// Adds a live connection to the ready pool.
// The connection is stored in _readyPool under its raw pointer. The callback
// below later pulls it back out (unless it has been checked out in the
// meantime) and routes it through returnConnection() to trigger a refresh.
// NOTE(review): the call that registers the callback is not visible in this
// excerpt.
void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock& lk,
OwnedConnection conn) {
auto connPtr = conn.get();
_readyPool[connPtr] = std::move(conn);
// Our strategy for refreshing connections is to check them out and
// immediately check them back in (which kicks off the refresh logic in
// returnConnection).
[this, connPtr]() {
OwnedConnection conn;
stdx::unique_lock lk(_parent->_mutex);
if (!_readyPool.count(connPtr)) {
// We've already been checked out. We don't need to refresh
// ourselves.
conn = takeFromPool(_readyPool, connPtr);
// If we're in shutdown, we don't need to refresh connections
if (_state == State::kInShutdown)
// Mark the connection as checked out so that returnConnection() can take it
// from _checkedOutPool.
_checkedOutPool[connPtr] = std::move(conn);
returnConnection(connPtr, std::move(lk));
// Drop connections and fail all requests.
// `status` is the failure to report; `lk` is the parent's lock, sunk by value.
// Connections still in _processingPool are moved to _droppedProcessingPool so
// that their pending callbacks can still find them via
// takeFromProcessingPool().
// NOTE(review): the statements that bump the generation, clear the pools and
// invoke the failed requests' callbacks are not visible in this excerpt.
void ConnectionPool::SpecificPool::processFailure(const Status& status,
stdx::unique_lock lk) {
// Bump the generation so we don't reuse any pending or checked out
// connections
// Drop ready connections
// Migrate processing connections to the dropped pool
for (auto&& x : _processingPool) {
_droppedProcessingPool[x.first] = std::move(x.second);
// Move the requests out so they aren't visible
// in other threads
decltype(_requests) requestsToFail;
using std::swap;
swap(requestsToFail, _requests);
// Update state to reflect the lack of requests
// Drop the lock and process all of the requests
// with the same failed status
while (requestsToFail.size()) {
// Fulfills as many outstanding requests as possible.
// While requests are pending, takes a connection from _readyPool, skips it if
// it is not healthy, and otherwise moves it to _checkedOutPool and passes a
// ConnectionHandle for it to the request's callback.
// NOTE(review): the loop exits, pool/queue removals and any unlock around the
// callback are not visible in this excerpt.
void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock& lk) {
// If some other thread (possibly this thread) is fulfilling requests,
// don't keep padding the callstack.
if (_inFulfillRequests)
_inFulfillRequests = true;
// Reset the flag on every exit path from this function.
auto guard = MakeGuard([&] { _inFulfillRequests = false; });
while (_requests.size()) {
auto iter = _readyPool.begin();
if (iter == _readyPool.end())
// Grab the connection and cancel its timeout
auto conn = std::move(iter->second);
if (!conn->isHealthy()) {
log() << "dropping unhealthy pooled connection to " << conn->getHostAndPort();
if (_readyPool.empty()) {
log() << "after drop, pool was empty, going to spawn some connections";
// Spawn some more connections to the bad host if we're all out.
spawnConnections(lk, conn->getHostAndPort());
// Drop the bad connection.
// Retry.
// Grab the request and callback
auto cb = std::move(_requests.top().second);
auto connPtr = conn.get();
// check out the connection
_checkedOutPool[connPtr] = std::move(conn);
// pass it to the user
cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
// Spawn enough connections to satisfy open requests and minpool, while
// honoring maxpool.
// Each new connection is created through the parent's factory with the current
// _generation and parked in _processingPool until its setup callback runs.
// NOTE(review): the first argument of std::max and the call that receives the
// setup callback are not visible in this excerpt.
void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock& lk,
const HostAndPort& hostAndPort) {
// We want minConnections <= outstanding requests <= maxConnections
auto target = [&] {
return std::max(
std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections));
// While all of our inflight connections are less than our target
while (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) {
// make a new connection and put it in processing
auto handle = _parent->_factory->makeConnection(hostAndPort, _generation);
auto connPtr = handle.get();
_processingPool[connPtr] = std::move(handle);
// Run the setup callback
// The callback re-acquires the parent's mutex and reclaims the connection from
// the processing pools before acting on the setup result.
[this](ConnectionInterface* connPtr, Status status) {
stdx::unique_lock lk(_parent->_mutex);
auto conn = takeFromProcessingPool(connPtr);
if (conn->getGeneration() != _generation) {
// If the host and port was dropped, let the
// connection lapse
} else if (status.isOK()) {
addToReady(lk, std::move(conn));
} else {
// If the setup failed, cascade the failure edge
processFailure(status, std::move(lk));
// Note that this assumes that the refreshTimeout is sound for the
// setupTimeout
// Called every second after hostTimeout until all processing connections reap.
// Takes the parent's mutex itself, so it must be entered without that lock
// held (it is installed as a timer callback).
// NOTE(review): the bail-out statement and closing brace of the kRunning check,
// and whatever follows the re-arming of the timer, are not visible in this
// excerpt.
void ConnectionPool::SpecificPool::shutdown() {
stdx::unique_lock lk(_parent->_mutex);
// We're racing:
// Thread A (this thread)
// * Fired the shutdown timer
// * Came into shutdown() and blocked
// Thread B (some new consumer)
// * Requested a new connection
// * Beat thread A to the mutex
// * Cancelled timer (but thread A already made it in)
// * Set state to running
// * released the mutex
// So we end up in shutdown, but with kRunning. If we're here we raced and
// we should just bail.
if (_state == State::kRunning) {
_state = State::kInShutdown;
// If we have processing connections, wait for them to finish or timeout
// before shutdown
if (_processingPool.size() || _droppedProcessingPool.size()) {
// Check again in one second.
_requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });
// Moves the owning handle for `connPtr` out of `pool` and returns it.
// The connection must be present in `pool`; this is enforced with invariant.
// NOTE(review): removal of the now-empty map entry is not visible in this
// excerpt.
ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromPool(
OwnershipPool& pool, ConnectionInterface* connPtr) {
auto iter = pool.find(connPtr);
invariant(iter != pool.end());
auto conn = std::move(iter->second);
return conn;
// Takes `connPtr` out of whichever processing pool holds it: _processingPool if
// it is there, otherwise _droppedProcessingPool (where processFailure() moves
// in-flight connections).
ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool(
ConnectionInterface* connPtr) {
if (_processingPool.count(connPtr))
return takeFromPool(_processingPool, connPtr);
return takeFromPool(_droppedProcessingPool, connPtr);
// Updates our state and manages the request timer.
// Three cases, in order: pending requests (state kRunning, timer tracks the
// top request's expiration), no requests but connections checked out (state
// kRunning, expiration set to the maximum), and neither (state kIdle, timer set
// to hostTimeout and wired to shutdown()).
// NOTE(review): the early returns, the call that installs the request-timeout
// callback and parts of that callback are not visible in this excerpt.
void ConnectionPool::SpecificPool::updateStateInLock() {
if (_requests.size()) {
// We have some outstanding requests, we're live
// If we were already running and the timer is the same as it was
// before, nothing to do
if (_state == State::kRunning && _requestTimerExpiration == _requests.top().first)
_state = State::kRunning;
_requestTimerExpiration = _requests.top().first;
auto timeout = _requests.top().first - _parent->_factory->now();
// We set a timer for the most recent request, then invoke each timed
// out request we couldn't service
[this]() {
stdx::unique_lock lk(_parent->_mutex);
auto now = _parent->_factory->now();
while (_requests.size()) {
auto& x = _requests.top();
// Requests are ordered by expiration, so examine them from the top until one
// has not yet expired.
if (x.first <= now) {
auto cb = std::move(x.second);
"Couldn't get a connection within the time limit"));
} else {
} else if (_checkedOutPool.size()) {
// If we have no requests, but someone's using a connection, we just
// hang around until the next request or a return
_state = State::kRunning;
_requestTimerExpiration = _requestTimerExpiration.max();
} else {
// If we don't have any live requests and no one has checked out connections
// If we used to be idle, just bail
if (_state == State::kIdle)
_state = State::kIdle;
_requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout;
auto timeout = _parent->_options.hostTimeout;
// Set the shutdown timer
_requestTimer->setTimeout(timeout, [this]() { shutdown(); });
} // namespace executor
} // namespace mongo