diff options
author | Andrew Morrow <acm@mongodb.com> | 2017-04-08 14:02:46 -0400 |
---|---|---|
committer | Andrew Morrow <acm@mongodb.com> | 2017-04-13 18:13:31 -0400 |
commit | 3edc5558fa1728761549c775350a2e17fb68f8ab (patch) | |
tree | 0054521588c112db14355a889c71c8e665dcbf45 /src/mongo | |
parent | cc954e9e1d88b30d1ab89ee3bbbd9db0bb15263d (diff) | |
download | mongo-3edc5558fa1728761549c775350a2e17fb68f8ab.tar.gz |
SERVER-28664 Use pool connections in MRU order
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 19 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 174 | ||||
-rw-r--r-- | src/mongo/util/lru_cache.h | 36 | ||||
-rw-r--r-- | src/mongo/util/lru_cache_test.cpp | 46 |
4 files changed, 246 insertions, 29 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 7de24928f8f..78665f7f11e 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -39,6 +39,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" +#include "mongo/util/lru_cache.h" #include "mongo/util/scopeguard.h" // One interesting implementation note herein concerns how setup() and @@ -115,6 +116,7 @@ public: private: using OwnedConnection = std::unique_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; + using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>; using Request = std::pair<Date_t, GetConnectionCallback>; struct RequestComparator { bool operator()(const Request& a, const Request& b) { @@ -130,7 +132,10 @@ private: void shutdown(); - OwnedConnection takeFromPool(OwnershipPool& pool, ConnectionInterface* connection); + template <typename OwnershipPoolType> + typename OwnershipPoolType::mapped_type takeFromPool( + OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr); + OwnedConnection takeFromProcessingPool(ConnectionInterface* connection); void updateStateInLock(); @@ -140,7 +145,7 @@ private: const HostAndPort _hostAndPort; - OwnershipPool _readyPool; + LRUOwnershipPool _readyPool; OwnershipPool _processingPool; OwnershipPool _droppedProcessingPool; OwnershipPool _checkedOutPool; @@ -269,6 +274,7 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) { ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort) : _parent(parent), _hostAndPort(hostAndPort), + _readyPool(std::numeric_limits<size_t>::max()), _requestTimer(parent->_factory->makeTimer()), _generation(0), _inFulfillRequests(false), @@ -413,7 +419,8 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk OwnedConnection conn) { auto connPtr = conn.get(); - _readyPool[connPtr] = std::move(conn); + // This makes the connection the new most-recently-used connection. + _readyPool.add(connPtr, std::move(conn)); // Our strategy for refreshing connections is to check them out and // immediately check them back in (which kicks off the refresh logic in @@ -497,6 +504,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex auto guard = MakeGuard([&] { _inFulfillRequests = false; }); while (_requests.size()) { + // _readyPool is an LRUCache, so its begin() object is the MRU item. auto iter = _readyPool.begin(); if (iter == _readyPool.end()) @@ -646,8 +654,9 @@ void ConnectionPool::SpecificPool::shutdown() { _parent->_pools.erase(_hostAndPort); } -ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromPool( - OwnershipPool& pool, ConnectionInterface* connPtr) { +template <typename OwnershipPoolType> +typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPool( + OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr) { auto iter = pool.find(connPtr); invariant(iter != pool.end()); diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 6a857602ca6..a3b5cca24a8 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -27,12 +27,17 @@ #include "mongo/platform/basic.h" +#include <algorithm> +#include <random> +#include <stack> + #include "mongo/executor/connection_pool_test_fixture.h" #include "mongo/executor/connection_pool.h" #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace executor { @@ -95,6 +100,175 @@ TEST_F(ConnectionPoolTest, SameConn) { } /** + * Verify that connections are obtained in MRU order. + */ +TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { + ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + + // Obtain a set of connections + constexpr size_t kSize = 100; + std::vector<ConnectionPool::ConnectionHandle> connections; + + // Ensure that no matter how we leave the test, we mark any + // checked out connections as OK before implicity returning them + // to the pool by destroying the 'connections' vector. Otherwise, + // this test would cause an invariant failure instead of a normal + // test failure if it fails, which would be confusing. + const auto guard = MakeGuard([&] { + while (!connections.empty()) { + try { + ConnectionPool::ConnectionHandle conn = std::move(connections.back()); + connections.pop_back(); + conn->indicateSuccess(); + } catch (...) { + } + } + }); + + for (size_t i = 0; i != kSize; ++i) { + ConnectionImpl::pushSetup(Status::OK()); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + connections.push_back(std::move(swConn.getValue())); + }); + } + + // Shuffle them into a random order + std::random_device rd; + std::mt19937 rng(rd()); + std::shuffle(connections.begin(), connections.end(), rng); + + // Return them to the pool in that random order, recording IDs in a stack + std::stack<size_t> ids; + while (!connections.empty()) { + ConnectionPool::ConnectionHandle conn = std::move(connections.back()); + connections.pop_back(); + ids.push(static_cast<ConnectionImpl*>(conn.get())->id()); + conn->indicateSuccess(); + } + + // Re-obtain the connections. They should come back in the same order + // as the IDs in the stack, since the pool returns them in MRU order. + for (size_t i = 0; i != kSize; ++i) { + ConnectionImpl::pushSetup(Status::OK()); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + const auto id = CONN2ID(swConn); + connections.push_back(std::move(swConn.getValue())); + ASSERT(id == ids.top()); + ids.pop(); + }); + } +} + +/** + * Verify that recently used connections are not purged. + */ +TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { + ConnectionPool::Options options; + options.minConnections = 0; + options.refreshRequirement = Milliseconds(1000); + options.refreshTimeout = Milliseconds(5000); + options.hostTimeout = Minutes(1); + ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + + ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), 0U); + + // Obtain a set of connections + constexpr size_t kSize = 100; + std::vector<ConnectionPool::ConnectionHandle> connections; + + // Ensure that no matter how we leave the test, we mark any + // checked out connections as OK before implicity returning them + // to the pool by destroying the 'connections' vector. Otherwise, + // this test would cause an invariant failure instead of a normal + // test failure if it fails, which would be confusing. + const auto guard = MakeGuard([&] { + while (!connections.empty()) { + try { + ConnectionPool::ConnectionHandle conn = std::move(connections.back()); + connections.pop_back(); + conn->indicateSuccess(); + } catch (...) { + } + } + }); + + auto now = Date_t::now(); + PoolImpl::setNow(now); + + // Check out kSize connections from the pool, and record their IDs in a set. + std::set<size_t> original_ids; + for (size_t i = 0; i != kSize; ++i) { + ConnectionImpl::pushSetup(Status::OK()); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + original_ids.insert(CONN2ID(swConn)); + connections.push_back(std::move(swConn.getValue())); + }); + } + + ASSERT_EQ(original_ids.size(), kSize); + ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + + // Shuffle them into a random order + std::random_device rd; + std::mt19937 rng(rd()); + std::shuffle(connections.begin(), connections.end(), rng); + + // Return them to the pool in that random order. + while (!connections.empty()) { + ConnectionPool::ConnectionHandle conn = std::move(connections.back()); + connections.pop_back(); + conn->indicateSuccess(); + } + + // Advance the time, but not enough to age out connections. We should still have them all. + PoolImpl::setNow(now + Milliseconds(500)); + ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + + // Re-obtain a quarter of the connections, and record their IDs in a set. + std::set<size_t> reacquired_ids; + for (size_t i = 0; i < kSize / 4; ++i) { + ConnectionImpl::pushSetup(Status::OK()); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + reacquired_ids.insert(CONN2ID(swConn)); + connections.push_back(std::move(swConn.getValue())); + }); + } + + ASSERT_EQ(reacquired_ids.size(), kSize / 4); + ASSERT(std::includes( + original_ids.begin(), original_ids.end(), reacquired_ids.begin(), reacquired_ids.end())); + ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + + // Put them right back in. + while (!connections.empty()) { + ConnectionPool::ConnectionHandle conn = std::move(connections.back()); + connections.pop_back(); + conn->indicateSuccess(); + } + + // We should still have all of them in the pool + ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + + // Advance across the host timeout for the 75 connections we + // didn't use. Afterwards, the pool should contain only those + // kSize/4 connections we used above. + PoolImpl::setNow(now + Milliseconds(1000)); + ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize / 4); +} + +/** * Verify that a failed connection isn't returned to the pool */ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { diff --git a/src/mongo/util/lru_cache.h b/src/mongo/util/lru_cache.h index 3f99fd04442..5feea1a0bc3 100644 --- a/src/mongo/util/lru_cache.h +++ b/src/mongo/util/lru_cache.h @@ -72,6 +72,9 @@ public: using Map = stdx::unordered_map<K, iterator, Hash, KeyEqual>; + using key_type = K; + using mapped_type = V; + /** * Inserts a new entry into the cache. If the given key already exists in the cache, * then we will drop the old entry and replace it with the given new entry. The cache @@ -103,7 +106,7 @@ public: this->_list.pop_back(); invariant(this->size() <= this->_maxSize); - return result; + return std::move(result); } invariant(this->size() <= this->_maxSize); @@ -167,28 +170,29 @@ public: } /** - * Removes the element in the cache stored for this key, if one exists. + * Removes the element in the cache stored for this key, if one + * exists. Returns the count of elements erased. */ - void remove(const K& key) { + typename Map::size_type erase(const K& key) { auto it = this->_map.find(key); if (it == this->_map.end()) { - return; + return 0; } this->_list.erase(it->second); this->_map.erase(it); + return 1; } /** - * Removes the element pointed to by the given iterator from this cache. + * Removes the element pointed to by the given iterator from this + * cache, and returns an iterator to the next least recently used + * element, or the end iterator, if no such element exists. */ - void remove(iterator it) { - if (it == this->_list.end()) { - return; - } - - this->_map.erase(it->first); - this->_list.erase(it); + iterator erase(iterator it) { + invariant(it != this->_list.end()); + invariant(this->_map.erase(it->first) == 1); + return this->_list.erase(it); } /** @@ -214,6 +218,10 @@ public: return this->_list.size(); } + bool empty() const { + return this->_list.empty(); + } + /** * Returns an iterator pointing to the most recently used element in the cache. */ @@ -256,6 +264,10 @@ public: return this->_list.cend(); } + typename Map::size_type count(const K& key) const { + return this->_map.count(key); + } + private: // The maximum allowable number of entries in the cache. const std::size_t _maxSize; diff --git a/src/mongo/util/lru_cache_test.cpp b/src/mongo/util/lru_cache_test.cpp index ea53d392aed..48fd7efefd6 100644 --- a/src/mongo/util/lru_cache_test.cpp +++ b/src/mongo/util/lru_cache_test.cpp @@ -200,8 +200,8 @@ TEST(LRUCacheTest, SizeZeroCache) { assertEquals(cache.size(), size_t(0)); assertNotInCache(cache, 3); - // Remove should be a no-op - cache.remove(4); + // Erase should be a no-op + assertEquals(cache.erase(4), size_t(0)); assertEquals(cache.size(), size_t(0)); assertNotInCache(cache, 4); @@ -231,11 +231,12 @@ TEST(LRUCacheTest, StressTest) { assertEquals(found->second, s); assertEquals(found, cache.begin()); - cache.remove(found); + const auto nextAfterFound = std::next(found); + assertEquals(cache.erase(found), nextAfterFound); assertEquals(cache.size(), size_t(maxSize - 1)); cache.add(s, s); assertEquals(cache.size(), size_t(maxSize)); - cache.remove(s); + assertEquals(cache.erase(s), size_t(1)); assertEquals(cache.size(), size_t(maxSize - 1)); cache.add(s, s); } @@ -366,11 +367,11 @@ TEST(LRUCacheTest, ReplaceKeyTest) { // Test that calling add() with a key that already exists in the cache deletes // the existing entry and gets promoted properly -TEST(LRUCacheTest, RemoveByKey) { +TEST(LRUCacheTest, EraseByKey) { runWithDifferentSizes([](int maxSize) { // Test replacement for any position in the original cache - // i <= maxSize so we remove a non-existent element + // i <= maxSize so we erase a non-existent element for (int i = 0; i <= maxSize; i++) { LRUCache<int, int> cache(maxSize); @@ -382,11 +383,12 @@ TEST(LRUCacheTest, RemoveByKey) { assertEquals(cache.size(), size_t(maxSize)); - // Remove an element - cache.remove(i); + // Erase an element if (i != maxSize) { + assertEquals(cache.erase(i), size_t(1)); assertEquals(cache.size(), size_t(maxSize - 1)); } else { + assertEquals(cache.erase(i), size_t(0)); assertEquals(cache.size(), size_t(maxSize)); } @@ -403,12 +405,11 @@ TEST(LRUCacheTest, RemoveByKey) { } // Test removal of elements by iterator from the cache -TEST(LRUCacheTest, RemoveByIterator) { +TEST(LRUCacheTest, EraseByIterator) { runWithDifferentSizes([](int maxSize) { // Test replacement for any position in the original cache - // i <= maxSize so we remove a non-existent element - for (int i = 0; i <= maxSize; i++) { + for (int i = 0; i < maxSize; i++) { LRUCache<int, int> cache(maxSize); // Fill up the cache @@ -426,7 +427,8 @@ TEST(LRUCacheTest, RemoveByIterator) { elem--; } - cache.remove(it); + auto nextElement = std::next(it); + assertEquals(cache.erase(it), nextElement); if (i == maxSize) { assertEquals(cache.size(), size_t(maxSize)); @@ -576,4 +578,24 @@ TEST(LRUCacheTest, CustomHashAndEqualityTypeTest) { assertEquals(found->second, sortaEqual._b); } +TEST(LRUCacheTest, EmptyTest) { + const int maxSize = 4; + LRUCache<int, int> cache(maxSize); + assertEquals(cache.empty(), true); + cache.add(1, 2); + assertEquals(cache.empty(), false); + cache.erase(1); + assertEquals(cache.empty(), true); +} + +TEST(LRUCacheTest, CountTest) { + const int maxSize = 4; + LRUCache<int, int> cache(maxSize); + assertEquals(cache.count(1), size_t(0)); + cache.add(1, 2); + assertEquals(cache.count(1), size_t(1)); + cache.erase(1); + assertEquals(cache.count(1), size_t(0)); +} + } // namespace |