/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO #include "mongo/platform/basic.h" #include "mongo/executor/connection_pool.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/remote_command_request.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" #include "mongo/util/lru_cache.h" #include "mongo/util/scopeguard.h" // One interesting implementation note herein concerns how setup() and // refresh() are invoked outside of the global lock, but setTimeout is not. // This implementation detail simplifies mocks, allowing them to return // synchronously sometimes, whereas having timeouts fire instantly adds little // value. In practice, dumping the locks is always safe (because we restrict // ourselves to operations over the connection). namespace mongo { namespace executor { /** * A pool for a specific HostAndPort * * Pools come into existance the first time a connection is requested and * go out of existence after hostTimeout passes without any of their * connections being used. */ class ConnectionPool::SpecificPool final : public std::enable_shared_from_this { public: /** * These active client methods must be used whenever entering a specific pool outside of the * shutdown background task. The presence of an active client will bump a counter on the * specific pool which will prevent the shutdown thread from deleting it. * * The complexity comes from the need to hold a lock when writing to the * _activeClients param on the specific pool. Because the code beneath the client needs to lock * and unlock the parent mutex (and can leave unlocked), we want to start the client with the * lock acquired, move it into the client, then re-acquire to decrement the counter on the way * out. * * It's used like: * * pool.runWithActiveClient([](stdx::unique_lock lk){ codeToBeProtected(); }); */ template auto runWithActiveClient(Callback&& cb) { return runWithActiveClient(stdx::unique_lock(_parent->_mutex), std::forward(cb)); } template auto runWithActiveClient(stdx::unique_lock lk, Callback&& cb) { invariant(lk.owns_lock()); _activeClients++; const auto guard = MakeGuard([&] { invariant(!lk.owns_lock()); stdx::lock_guard lk(_parent->_mutex); _activeClients--; }); { decltype(lk) localLk(std::move(lk)); return cb(std::move(localLk)); } } SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort); ~SpecificPool(); /** * Gets a connection from the specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ Future getConnection(const HostAndPort& hostAndPort, Milliseconds timeout, stdx::unique_lock lk); /** * Cascades a failure across existing connections and requests. Invoking * this function drops all current connections and fails all current * requests with the passed status. */ void processFailure(const Status& status, stdx::unique_lock lk); /** * Returns a connection to a specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ void returnConnection(ConnectionInterface* connection, stdx::unique_lock lk); /** * Returns the number of connections currently checked out of the pool. */ size_t inUseConnections(const stdx::unique_lock& lk); /** * Returns the number of available connections in the pool. */ size_t availableConnections(const stdx::unique_lock& lk); /** * Returns the number of in progress connections in the pool. */ size_t refreshingConnections(const stdx::unique_lock& lk); /** * Returns the total number of connections ever created in this pool. */ size_t createdConnections(const stdx::unique_lock& lk); /** * Returns the total number of connections currently open that belong to * this pool. This is the sum of refreshingConnections, availableConnections, * and inUseConnections. */ size_t openConnections(const stdx::unique_lock& lk); /** * Return true if the tags on the specific pool match the passed in tags */ bool matchesTags(const stdx::unique_lock& lk, transport::Session::TagMask tags) const { return !!(_tags & tags); } /** * Atomically manipulate the tags in the pool */ void mutateTags(const stdx::unique_lock& lk, const stdx::function& mutateFunc) { _tags = mutateFunc(_tags); } /** * See runWithActiveClient for what this controls, and be very very careful to manage the * refcount correctly. */ void incActiveClients(const stdx::unique_lock& lk) { _activeClients++; } void decActiveClients(const stdx::unique_lock& lk) { _activeClients--; } private: using OwnedConnection = std::shared_ptr; using OwnershipPool = stdx::unordered_map; using LRUOwnershipPool = LRUCache; using Request = std::pair>; struct RequestComparator { bool operator()(const Request& a, const Request& b) { return a.first > b.first; } }; void addToReady(stdx::unique_lock& lk, OwnedConnection conn); void fulfillRequests(stdx::unique_lock& lk); void spawnConnections(stdx::unique_lock& lk); void shutdown(); template typename OwnershipPoolType::mapped_type takeFromPool( OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr); OwnedConnection takeFromProcessingPool(ConnectionInterface* connection); void updateStateInLock(); private: ConnectionPool* const _parent; const HostAndPort _hostAndPort; LRUOwnershipPool _readyPool; OwnershipPool _processingPool; OwnershipPool _droppedProcessingPool; OwnershipPool _checkedOutPool; std::vector _requests; std::shared_ptr _requestTimer; Date_t _requestTimerExpiration; size_t _activeClients; size_t _generation; bool _inFulfillRequests; bool _inSpawnConnections; size_t _created; transport::Session::TagMask _tags = transport::Session::kPending; /** * The current state of the pool * * The pool begins in a running state. Moves to idle when no requests * are pending and no connections are checked out. It finally enters * shutdown after hostTimeout has passed (and waits there for current * refreshes to process out). * * At any point a new request sets the state back to running and * restarts all timers. */ enum class State { // The pool is active kRunning, // No current activity, waiting for hostTimeout to pass kIdle, // hostTimeout is passed, we're waiting for any processing // connections to finish before shutting down kInShutdown, }; State _state; }; constexpr Milliseconds ConnectionPool::kDefaultHostTimeout; size_t const ConnectionPool::kDefaultMaxConns = std::numeric_limits::max(); size_t const ConnectionPool::kDefaultMinConns = 1; size_t const ConnectionPool::kDefaultMaxConnecting = std::numeric_limits::max(); constexpr Milliseconds ConnectionPool::kDefaultRefreshRequirement; constexpr Milliseconds ConnectionPool::kDefaultRefreshTimeout; const Status ConnectionPool::kConnectionStateUnknown = Status(ErrorCodes::InternalError, "Connection is in an unknown state"); ConnectionPool::ConnectionPool(std::shared_ptr impl, std::string name, Options options) : _name(std::move(name)), _options(std::move(options)), _factory(std::move(impl)), _manager(options.egressTagCloserManager) { if (_manager) { _manager->add(this); } } ConnectionPool::~ConnectionPool() { // If we're currently destroying the service context the _manager is already deleted and this // pointer dangles. No need for cleanup in that case. if (hasGlobalServiceContext() && _manager) { _manager->remove(this); } shutdown(); } void ConnectionPool::shutdown() { _factory->shutdown(); std::vector pools; // Ensure we decrement active clients for all pools that we inc on (because we intend to process // failures) const auto guard = MakeGuard([&] { stdx::unique_lock lk(_mutex); for (const auto& pool : pools) { pool->decActiveClients(lk); } }); // Grab all current pools (under the lock) { stdx::unique_lock lk(_mutex); for (auto& pair : _pools) { pools.push_back(pair.second.get()); pair.second->incActiveClients(lk); } } // Reacquire the lock per pool and process failures. We'll dec active clients when we're all // through in the guard for (const auto& pool : pools) { stdx::unique_lock lk(_mutex); pool->processFailure( Status(ErrorCodes::ShutdownInProgress, "Shutting down the connection pool"), std::move(lk)); } } void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) { stdx::unique_lock lk(_mutex); auto iter = _pools.find(hostAndPort); if (iter == _pools.end()) return; iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) { iter->second->processFailure( Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk)); }); } void ConnectionPool::dropConnections(transport::Session::TagMask tags) { std::vector pools; // Ensure we decrement active clients for all pools that we inc on (because we intend to process // failures) const auto guard = MakeGuard([&] { stdx::unique_lock lk(_mutex); for (const auto& pool : pools) { pool->decActiveClients(lk); } }); // Grab all current pools that don't match tags (under the lock) { stdx::unique_lock lk(_mutex); for (auto& pair : _pools) { if (!pair.second->matchesTags(lk, tags)) { pools.push_back(pair.second.get()); pair.second->incActiveClients(lk); } } } // Reacquire the lock per pool and process failures. We'll dec active clients when we're all // through in the guard for (const auto& pool : pools) { stdx::unique_lock lk(_mutex); pool->processFailure( Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk)); } } void ConnectionPool::mutateTags( const HostAndPort& hostAndPort, const stdx::function& mutateFunc) { stdx::unique_lock lk(_mutex); auto iter = _pools.find(hostAndPort); if (iter == _pools.end()) return; iter->second->mutateTags(lk, mutateFunc); } void ConnectionPool::get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb) { return get(hostAndPort, timeout).getAsync(std::move(cb)); } Future ConnectionPool::get(const HostAndPort& hostAndPort, Milliseconds timeout) { SpecificPool* pool; stdx::unique_lock lk(_mutex); auto iter = _pools.find(hostAndPort); if (iter == _pools.end()) { auto handle = stdx::make_unique(this, hostAndPort); pool = handle.get(); _pools[hostAndPort] = std::move(handle); } else { pool = iter->second.get(); } invariant(pool); return pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) { return pool->getConnection(hostAndPort, timeout, std::move(lk)); }); } void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const { stdx::unique_lock lk(_mutex); for (const auto& kv : _pools) { HostAndPort host = kv.first; auto& pool = kv.second; ConnectionStatsPer hostStats{pool->inUseConnections(lk), pool->availableConnections(lk), pool->createdConnections(lk), pool->refreshingConnections(lk)}; stats->updateStatsForHost(_name, host, hostStats); } } size_t ConnectionPool::getNumConnectionsPerHost(const HostAndPort& hostAndPort) const { stdx::unique_lock lk(_mutex); auto iter = _pools.find(hostAndPort); if (iter != _pools.end()) { return iter->second->openConnections(lk); } return 0; } void ConnectionPool::returnConnection(ConnectionInterface* conn) { stdx::unique_lock lk(_mutex); auto iter = _pools.find(conn->getHostAndPort()); invariant(iter != _pools.end(), str::stream() << "Tried to return connection but no pool found for " << conn->getHostAndPort()); iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) { iter->second->returnConnection(conn, std::move(lk)); }); } ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort) : _parent(parent), _hostAndPort(hostAndPort), _readyPool(std::numeric_limits::max()), _requestTimer(parent->_factory->makeTimer()), _activeClients(0), _generation(0), _inFulfillRequests(false), _inSpawnConnections(false), _created(0), _state(State::kRunning) {} ConnectionPool::SpecificPool::~SpecificPool() { DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();) } size_t ConnectionPool::SpecificPool::inUseConnections(const stdx::unique_lock& lk) { return _checkedOutPool.size(); } size_t ConnectionPool::SpecificPool::availableConnections( const stdx::unique_lock& lk) { return _readyPool.size(); } size_t ConnectionPool::SpecificPool::refreshingConnections( const stdx::unique_lock& lk) { return _processingPool.size(); } size_t ConnectionPool::SpecificPool::createdConnections(const stdx::unique_lock& lk) { return _created; } size_t ConnectionPool::SpecificPool::openConnections(const stdx::unique_lock& lk) { return _checkedOutPool.size() + _readyPool.size() + _processingPool.size(); } Future ConnectionPool::SpecificPool::getConnection( const HostAndPort& hostAndPort, Milliseconds timeout, stdx::unique_lock lk) { if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) { timeout = _parent->_options.refreshTimeout; } const auto expiration = _parent->_factory->now() + timeout; auto pf = makePromiseFuture(); _requests.push_back(make_pair(expiration, pf.promise.share())); std::push_heap(begin(_requests), end(_requests), RequestComparator{}); updateStateInLock(); spawnConnections(lk); fulfillRequests(lk); return std::move(pf.future); } void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, stdx::unique_lock lk) { auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement; auto conn = takeFromPool(_checkedOutPool, connPtr); invariant(conn); updateStateInLock(); // Users are required to call indicateSuccess() or indicateFailure() before allowing // a connection to be returned. Otherwise, we have entered an unknown state. invariant(conn->getStatus() != kConnectionStateUnknown); if (conn->getGeneration() != _generation) { // If the connection is from an older generation, just return. return; } if (!conn->getStatus().isOK()) { // TODO: alert via some callback if the host is bad log() << "Ending connection to host " << _hostAndPort << " due to bad connection status; " << openConnections(lk) << " connections to that host remain open"; return; } auto now = _parent->_factory->now(); if (needsRefreshTP <= now) { // If we need to refresh this connection if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >= _parent->_options.minConnections) { // If we already have minConnections, just let the connection lapse log() << "Ending idle connection to host " << _hostAndPort << " because the pool meets constraints; " << openConnections(lk) << " connections to that host remain open"; return; } _processingPool[connPtr] = std::move(conn); // Unlock in case refresh can occur immediately lk.unlock(); connPtr->refresh( _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) { runWithActiveClient([&](stdx::unique_lock lk) { auto conn = takeFromProcessingPool(connPtr); // If the host and port were dropped, let this lapse if (conn->getGeneration() != _generation) { spawnConnections(lk); return; } // If we're in shutdown, we don't need refreshed connections if (_state == State::kInShutdown) return; // If the connection refreshed successfully, throw it back in // the ready pool if (status.isOK()) { addToReady(lk, std::move(conn)); spawnConnections(lk); return; } // If we've exceeded the time limit, start a new connect, // rather than failing all operations. We do this because the // various callers have their own time limit which is unrelated // to our internal one. if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { log() << "Pending connection to host " << _hostAndPort << " did not complete within the connection timeout," << " retrying with a new connection;" << openConnections(lk) << " connections to that host remain open"; spawnConnections(lk); return; } // Otherwise pass the failure on through processFailure(status, std::move(lk)); }); }); lk.lock(); } else { // If it's fine as it is, just put it in the ready queue addToReady(lk, std::move(conn)); } updateStateInLock(); } // Adds a live connection to the ready pool void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock& lk, OwnedConnection conn) { auto connPtr = conn.get(); // This makes the connection the new most-recently-used connection. _readyPool.add(connPtr, std::move(conn)); // Our strategy for refreshing connections is to check them out and // immediately check them back in (which kicks off the refresh logic in // returnConnection connPtr->setTimeout(_parent->_options.refreshRequirement, [ this, connPtr, anchor = shared_from_this() ]() { runWithActiveClient([&](stdx::unique_lock lk) { auto conn = takeFromPool(_readyPool, connPtr); // We've already been checked out. We don't need to refresh // ourselves. if (!conn) return; // If we're in shutdown, we don't need to refresh connections if (_state == State::kInShutdown) return; _checkedOutPool[connPtr] = std::move(conn); connPtr->indicateSuccess(); returnConnection(connPtr, std::move(lk)); }); }); fulfillRequests(lk); } // Drop connections and fail all requests void ConnectionPool::SpecificPool::processFailure(const Status& status, stdx::unique_lock lk) { // Bump the generation so we don't reuse any pending or checked out // connections _generation++; // Drop ready connections _readyPool.clear(); // Log something helpful log() << "Dropping all pooled connections to " << _hostAndPort << " due to " << status; // Migrate processing connections to the dropped pool for (auto&& x : _processingPool) { _droppedProcessingPool[x.first] = std::move(x.second); } _processingPool.clear(); // Move the requests out so they aren't visible // in other threads decltype(_requests) requestsToFail; { using std::swap; swap(requestsToFail, _requests); } // Update state to reflect the lack of requests updateStateInLock(); // Drop the lock and process all of the requests // with the same failed status lk.unlock(); for (auto& request : requestsToFail) { request.second.setError(status); } } // fulfills as many outstanding requests as possible void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock& lk) { // If some other thread (possibly this thread) is fulfilling requests, // don't keep padding the callstack. if (_inFulfillRequests) return; _inFulfillRequests = true; auto guard = MakeGuard([&] { _inFulfillRequests = false; }); while (_requests.size()) { // _readyPool is an LRUCache, so its begin() object is the MRU item. auto iter = _readyPool.begin(); if (iter == _readyPool.end()) break; // Grab the connection and cancel its timeout auto conn = std::move(iter->second); _readyPool.erase(iter); conn->cancelTimeout(); if (!conn->isHealthy()) { log() << "dropping unhealthy pooled connection to " << conn->getHostAndPort(); if (_readyPool.empty()) { log() << "after drop, pool was empty, going to spawn some connections"; // Spawn some more connections to the bad host if we're all out. spawnConnections(lk); } // Drop the bad connection. conn.reset(); // Retry. continue; } // Grab the request and callback auto promise = std::move(_requests.front().second); std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); _requests.pop_back(); auto connPtr = conn.get(); // check out the connection _checkedOutPool[connPtr] = std::move(conn); updateStateInLock(); // pass it to the user connPtr->resetToUnknown(); lk.unlock(); promise.emplaceValue(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent))); lk.lock(); } } // spawn enough connections to satisfy open requests and minpool, while // honoring maxpool void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock& lk) { // If some other thread (possibly this thread) is spawning connections, // don't keep padding the callstack. if (_inSpawnConnections) return; _inSpawnConnections = true; auto guard = MakeGuard([&] { _inSpawnConnections = false; }); // We want minConnections <= outstanding requests <= maxConnections auto target = [&] { return std::max( _parent->_options.minConnections, std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections)); }; // While all of our inflight connections are less than our target while ((_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) && (_processingPool.size() < _parent->_options.maxConnecting)) { OwnedConnection handle; try { // make a new connection and put it in processing handle = _parent->_factory->makeConnection(_hostAndPort, _generation); } catch (std::system_error& e) { severe() << "Failed to construct a new connection object: " << e.what(); fassertFailed(40336); } _processingPool[handle.get()] = handle; ++_created; // Run the setup callback lk.unlock(); handle->setup( _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) { runWithActiveClient([&](stdx::unique_lock lk) { auto conn = takeFromProcessingPool(connPtr); if (conn->getGeneration() != _generation) { // If the host and port was dropped, let the // connection lapse spawnConnections(lk); } else if (status.isOK()) { addToReady(lk, std::move(conn)); spawnConnections(lk); } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { // If we've exceeded the time limit, restart the connect, rather than // failing all operations. We do this because the various callers // have their own time limit which is unrelated to our internal one. spawnConnections(lk); } else { // If the setup failed, cascade the failure edge processFailure(status, std::move(lk)); } }); }); // Note that this assumes that the refreshTimeout is sound for the // setupTimeout lk.lock(); } } // Called every second after hostTimeout until all processing connections reap void ConnectionPool::SpecificPool::shutdown() { stdx::unique_lock lk(_parent->_mutex); // We're racing: // // Thread A (this thread) // * Fired the shutdown timer // * Came into shutdown() and blocked // // Thread B (some new consumer) // * Requested a new connection // * Beat thread A to the mutex // * Cancelled timer (but thread A already made it in) // * Set state to running // * released the mutex // // So we end up in shutdown, but with kRunning. If we're here we raced and // we should just bail. if (_state == State::kRunning) { return; } _state = State::kInShutdown; // If we have processing connections, wait for them to finish or timeout // before shutdown if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) { _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); }); return; } invariant(_requests.empty()); invariant(_checkedOutPool.empty()); _parent->_pools.erase(_hostAndPort); } template typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPool( OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr) { auto iter = pool.find(connPtr); if (iter == pool.end()) return typename OwnershipPoolType::mapped_type(); auto conn = std::move(iter->second); pool.erase(iter); return conn; } ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool( ConnectionInterface* connPtr) { auto conn = takeFromPool(_processingPool, connPtr); if (conn) return conn; return takeFromPool(_droppedProcessingPool, connPtr); } // Updates our state and manages the request timer void ConnectionPool::SpecificPool::updateStateInLock() { if (_requests.size()) { // We have some outstanding requests, we're live // If we were already running and the timer is the same as it was // before, nothing to do if (_state == State::kRunning && _requestTimerExpiration == _requests.front().first) return; _state = State::kRunning; _requestTimer->cancelTimeout(); _requestTimerExpiration = _requests.front().first; auto timeout = _requests.front().first - _parent->_factory->now(); // We set a timer for the most recent request, then invoke each timed // out request we couldn't service _requestTimer->setTimeout(timeout, [this]() { runWithActiveClient([&](stdx::unique_lock lk) { auto now = _parent->_factory->now(); while (_requests.size()) { auto& x = _requests.front(); if (x.first <= now) { auto promise = std::move(x.second); std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); _requests.pop_back(); lk.unlock(); promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "Couldn't get a connection within the time limit")); lk.lock(); } else { break; } } updateStateInLock(); }); }); } else if (_checkedOutPool.size()) { // If we have no requests, but someone's using a connection, we just // hang around until the next request or a return _requestTimer->cancelTimeout(); _state = State::kRunning; _requestTimerExpiration = _requestTimerExpiration.max(); } else { // If we don't have any live requests and no one has checked out connections // If we used to be idle, just bail if (_state == State::kIdle) return; _state = State::kIdle; _requestTimer->cancelTimeout(); _requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout; auto timeout = _parent->_options.hostTimeout; // Set the shutdown timer _requestTimer->setTimeout(timeout, [this]() { shutdown(); }); } } } // namespace executor } // namespace mongo