author    | Benety Goh <benety@mongodb.com> | 2019-04-03 00:50:18 -0400
committer | Benety Goh <benety@mongodb.com> | 2019-04-03 00:50:18 -0400
commit    | ddae7b803ed19bf4bc1af1dcf0f8d4e44575736c (patch)
tree      | aca26a863f7807c64d978f0c0ab86499499183a3 /src/mongo/executor/connection_pool.cpp
parent    | edee95798fd655a8da71da29c6081d09e62a3b89 (diff)
download  | mongo-ddae7b803ed19bf4bc1af1dcf0f8d4e44575736c.tar.gz
Revert "SERVER-39814 Add OutOfLineExecutor to ConnectionPool factory interface"
This reverts commit f4b07839c904a73d28e3994af5bc902004fd4f9d.
Diffstat (limited to 'src/mongo/executor/connection_pool.cpp')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 202
1 file changed, 112 insertions, 90 deletions
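For orientation before the diff: this revert reinstates a synchronous fast path, ConnectionPool::tryGet(), next to the Future-returning ConnectionPool::get(). The following is a minimal caller-side sketch assuming only the signatures visible in the diff below; runCommandOnConnection() is a hypothetical helper and the timeout value is illustrative:

    // Sketch: fast path via the restored tryGet(), falling back to get().
    // Assumes the ConnectionPool API shown in the diff; runCommandOnConnection()
    // is a hypothetical caller-side helper, not part of this patch.
    void runOnPooledConnection(ConnectionPool& pool, const HostAndPort& target) {
        // Fast path: synchronously reuse a ready connection, if one is
        // available and no other requests are already queued for this host.
        if (auto conn = pool.tryGet(target, transport::kGlobalSSLMode)) {
            runCommandOnConnection(std::move(*conn));
            return;
        }

        // Slow path: the asynchronous interface, which may spawn a new
        // connection and queue this request until one becomes ready.
        pool.get(target, transport::kGlobalSSLMode, Milliseconds(5000))
            .getAsync([](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
                if (swConn.isOK()) {
                    runCommandOnConnection(std::move(swConn.getValue()));
                }
            });
    }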
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 4cbc42dc2ec..41e4e602c65 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -140,6 +140,12 @@ public:
     Future<ConnectionHandle> getConnection(Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk);
 
     /**
+     * Gets a connection from the specific pool if a connection is available and there are no
+     * outstanding requests.
+     */
+    boost::optional<ConnectionHandle> tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+
+    /**
      * Triggers the shutdown procedure. This function marks the state as kInShutdown
      * and calls processFailure below with the status provided. This may not immediately
      * delist or destruct this pool. However, both will happen eventually as ConnectionHandles
@@ -222,21 +228,15 @@ private:
         }
     };
 
-    ConnectionHandle makeHandle(ConnectionInterface* connection);
-
-    void finishRefresh(stdx::unique_lock<stdx::mutex> lk,
-                       ConnectionInterface* connPtr,
-                       Status status);
-
     void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
 
     void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
 
     void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
 
-    // This internal helper is used both by get and by fulfillRequests and differs in that it
+    // This internal helper is used both by tryGet and by fulfillRequests and differs in that it
     // skips some bookkeeping that the other callers do on their own
-    ConnectionHandle tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+    boost::optional<ConnectionHandle> tryGetInternal(const stdx::unique_lock<stdx::mutex>& lk);
 
     template <typename OwnershipPoolType>
     typename OwnershipPoolType::mapped_type takeFromPool(
@@ -398,6 +398,23 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
     return get(hostAndPort, transport::kGlobalSSLMode, timeout).getAsync(std::move(cb));
 }
 
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::tryGet(
+    const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode) {
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+    auto iter = _pools.find(hostAndPort);
+
+    if (iter == _pools.end()) {
+        return boost::none;
+    }
+
+    const auto& pool = iter->second;
+    invariant(pool);
+    pool->fassertSSLModeIs(sslMode);
+
+    return pool->tryGetConnection(lk);
+}
+
 Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
                                                              transport::ConnectSSLMode sslMode,
                                                              Milliseconds timeout) {
@@ -506,14 +523,6 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
     Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
     invariant(_state != State::kInShutdown);
 
-    auto conn = tryGetConnection(lk);
-
-    updateStateInLock();
-
-    if (conn) {
-        return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
-    }
-
     if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
         timeout = _parent->_options.refreshTimeout;
     }
@@ -526,25 +535,28 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
 
     updateStateInLock();
 
-    lk.unlock();
-    _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
-        fassert(20000, schedStatus);
-
-        spawnConnections(lk);
-    }));
+    spawnConnections(lk);
+    fulfillRequests(lk);
 
     return std::move(pf.future);
 }
 
-auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
-    auto fun = guardCallback(
-        [this](auto lk, auto connection) { returnConnection(connection, std::move(lk)); });
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetConnection(
+    const stdx::unique_lock<stdx::mutex>& lk) {
+    invariant(_state != State::kInShutdown);
+
+    if (_requests.size()) {
+        return boost::none;
+    }
 
-    auto handle = ConnectionHandle(connection, fun);
-    return handle;
+    auto conn = tryGetInternal(lk);
+
+    updateStateInLock();
+
+    return conn;
 }
 
-ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetInternal(
     const stdx::unique_lock<stdx::mutex>&) {
 
     while (_readyPool.size()) {
@@ -570,55 +582,14 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
         // pass it to the user
         connPtr->resetToUnknown();
 
-        auto handle = makeHandle(connPtr);
-        return handle;
+        return ConnectionHandle(connPtr,
+                                guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
+                                                     ConnectionPool::ConnectionInterface* conn) {
+                                    returnConnection(conn, std::move(localLk));
+                                }));
     }
 
-    return {};
-}
-
-void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex> lk,
-                                                 ConnectionInterface* connPtr,
-                                                 Status status) {
-    auto conn = takeFromProcessingPool(connPtr);
-
-    // If we're in shutdown, we don't need refreshed connections
-    if (_state == State::kInShutdown)
-        return;
-
-    // If we've exceeded the time limit, start a new connect,
-    // rather than failing all operations. We do this because the
-    // various callers have their own time limit which is unrelated
-    // to our internal one.
-    if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-        LOG(0) << "Pending connection to host " << _hostAndPort
-               << " did not complete within the connection timeout,"
-               << " retrying with a new connection;" << openConnections(lk)
-               << " connections to that host remain open";
-        spawnConnections(lk);
-        return;
-    }
-
-    // Pass a failure on through
-    if (!status.isOK()) {
-        processFailure(status, std::move(lk));
-        return;
-    }
-
-    // If the host and port were dropped, let this lapse and spawn new connections
-    if (conn->getGeneration() != _generation) {
-        spawnConnections(lk);
-        return;
-    }
-
-    // If the connection refreshed successfully, throw it back in the ready pool
-    addToReady(lk, std::move(conn));
-
-    lk.unlock();
-    _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
-        fassert(20003, schedStatus);
-        fulfillRequests(lk);
-    }));
+    return boost::none;
 }
 
 void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
@@ -660,19 +631,49 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
         // Unlock in case refresh can occur immediately
         lk.unlock();
         connPtr->refresh(_parent->_options.refreshTimeout,
-                         guardCallback([this](auto lk, auto conn, auto status) {
-                             finishRefresh(std::move(lk), conn, status);
+                         guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
+                                              ConnectionInterface* connPtr,
+                                              Status status) {
+                             auto conn = takeFromProcessingPool(connPtr);
+
+                             // If we're in shutdown, we don't need refreshed connections
+                             if (_state == State::kInShutdown)
+                                 return;
+
+                             // If the connection refreshed successfully, throw it back in
+                             // the ready pool
+                             if (status.isOK()) {
+                                 // If the host and port were dropped, let this lapse
+                                 if (conn->getGeneration() == _generation) {
+                                     addToReady(lk, std::move(conn));
+                                     fulfillRequests(lk);
+                                 }
+
+                                 return;
+                             }
+
+                             // If we've exceeded the time limit, start a new connect,
+                             // rather than failing all operations. We do this because the
+                             // various callers have their own time limit which is unrelated
+                             // to our internal one.
+                             if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                                 log() << "Pending connection to host " << _hostAndPort
+                                       << " did not complete within the connection timeout,"
+                                       << " retrying with a new connection;" << openConnections(lk)
+                                       << " connections to that host remain open";
+                                 spawnConnections(lk);
+                                 return;
+                             }
+
+                             // Otherwise pass the failure on through
+                             processFailure(status, std::move(lk));
                          }));
         lk.lock();
     } else {
         // If it's fine as it is, just put it in the ready queue
         addToReady(lk, std::move(conn));
-
-        lk.unlock();
-        _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
-            fassert(20004, schedStatus);
-            fulfillRequests(lk);
-        }));
+        // TODO This should be scheduled on an executor once we have executor-aware pooling
+        fulfillRequests(lk);
     }
 
     updateStateInLock();
@@ -783,7 +784,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
         // deadlock).
         //
         // None of the heap manipulation code throws, but it's something to keep in mind.
-        auto conn = tryGetConnection(lk);
+        auto conn = tryGetInternal(lk);
 
         if (!conn) {
             break;
         }
@@ -795,7 +796,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
         _requests.pop_back();
 
         lk.unlock();
-        promise.emplaceValue(std::move(conn));
+        promise.emplaceValue(std::move(*conn));
         lk.lock();
 
         updateStateInLock();
@@ -846,11 +847,32 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
         // Run the setup callback
         lk.unlock();
-        handle->setup(_parent->_options.refreshTimeout,
-                      guardCallback([this](auto lk, auto conn, auto status) {
-                          finishRefresh(std::move(lk), conn, status);
-                      }));
-
+        handle->setup(
+            _parent->_options.refreshTimeout,
+            guardCallback([this](
+                stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
+                auto conn = takeFromProcessingPool(connPtr);
+
+                // If we're in shutdown, we don't need this conn
+                if (_state == State::kInShutdown)
+                    return;
+
+                if (status.isOK()) {
+                    // If the host and port was dropped, let the connection lapse
+                    if (conn->getGeneration() == _generation) {
+                        addToReady(lk, std::move(conn));
+                        fulfillRequests(lk);
+                    }
+                } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                    // If we've exceeded the time limit, restart the connect, rather than
+                    // failing all operations. We do this because the various callers
+                    // have their own time limit which is unrelated to our internal one.
+                    spawnConnections(lk);
+                } else {
+                    // If the setup failed, cascade the failure edge
+                    processFailure(status, std::move(lk));
+                }
+            }));
         // Note that this assumes that the refreshTimeout is sound for the
         // setupTimeout
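A note on guardCallback(), which wraps every asynchronous callback in the hunks above: its definition is not part of this diff. As a rough sketch only, not MongoDB's actual implementation (the real helper also keeps the pool alive and runs post-callback bookkeeping such as updateStateInLock()), the idea is a wrapper that reacquires the pool mutex and hands the lock to the callback:

    // Hypothetical sketch of the guardCallback() idea. The real helper in
    // connection_pool.cpp differs; this only illustrates the locking shape.
    template <typename Callback>
    auto makeGuardedCallback(stdx::mutex& mutex, Callback cb) {
        return [&mutex, cb = std::move(cb)](auto&&... args) mutable {
            // Reacquire the pool lock before touching pool state, then pass
            // ownership of the lock to the callback, matching lambdas like
            // [this](stdx::unique_lock<stdx::mutex> lk, ...) in the diff.
            stdx::unique_lock<stdx::mutex> lk(mutex);
            cb(std::move(lk), std::forward<decltype(args)>(args)...);
        };
    }

The lock-as-first-argument convention makes it explicit which code runs under the pool mutex, and lets a callback body such as returnConnection(conn, std::move(lk)) release or transfer the lock itself.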