summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAndrew Morrow <acm@mongodb.com>2017-04-08 14:02:46 -0400
committerAndrew Morrow <acm@mongodb.com>2017-04-13 18:13:31 -0400
commit3edc5558fa1728761549c775350a2e17fb68f8ab (patch)
tree0054521588c112db14355a889c71c8e665dcbf45 /src/mongo
parentcc954e9e1d88b30d1ab89ee3bbbd9db0bb15263d (diff)
downloadmongo-3edc5558fa1728761549c775350a2e17fb68f8ab.tar.gz
SERVER-28664 Use pool connections in MRU order
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/executor/connection_pool.cpp19
-rw-r--r--src/mongo/executor/connection_pool_test.cpp174
-rw-r--r--src/mongo/util/lru_cache.h36
-rw-r--r--src/mongo/util/lru_cache_test.cpp46
4 files changed, 246 insertions, 29 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 7de24928f8f..78665f7f11e 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -39,6 +39,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
+#include "mongo/util/lru_cache.h"
#include "mongo/util/scopeguard.h"
// One interesting implementation note herein concerns how setup() and
@@ -115,6 +116,7 @@ public:
private:
using OwnedConnection = std::unique_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
+ using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>;
using Request = std::pair<Date_t, GetConnectionCallback>;
struct RequestComparator {
bool operator()(const Request& a, const Request& b) {
@@ -130,7 +132,10 @@ private:
void shutdown();
- OwnedConnection takeFromPool(OwnershipPool& pool, ConnectionInterface* connection);
+ template <typename OwnershipPoolType>
+ typename OwnershipPoolType::mapped_type takeFromPool(
+ OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr);
+
OwnedConnection takeFromProcessingPool(ConnectionInterface* connection);
void updateStateInLock();
@@ -140,7 +145,7 @@ private:
const HostAndPort _hostAndPort;
- OwnershipPool _readyPool;
+ LRUOwnershipPool _readyPool;
OwnershipPool _processingPool;
OwnershipPool _droppedProcessingPool;
OwnershipPool _checkedOutPool;
@@ -269,6 +274,7 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) {
ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
: _parent(parent),
_hostAndPort(hostAndPort),
+ _readyPool(std::numeric_limits<size_t>::max()),
_requestTimer(parent->_factory->makeTimer()),
_generation(0),
_inFulfillRequests(false),
@@ -413,7 +419,8 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
OwnedConnection conn) {
auto connPtr = conn.get();
- _readyPool[connPtr] = std::move(conn);
+ // This makes the connection the new most-recently-used connection.
+ _readyPool.add(connPtr, std::move(conn));
// Our strategy for refreshing connections is to check them out and
// immediately check them back in (which kicks off the refresh logic in
@@ -497,6 +504,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
auto guard = MakeGuard([&] { _inFulfillRequests = false; });
while (_requests.size()) {
+ // _readyPool is an LRUCache, so its begin() object is the MRU item.
auto iter = _readyPool.begin();
if (iter == _readyPool.end())
@@ -646,8 +654,9 @@ void ConnectionPool::SpecificPool::shutdown() {
_parent->_pools.erase(_hostAndPort);
}
-ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromPool(
- OwnershipPool& pool, ConnectionInterface* connPtr) {
+template <typename OwnershipPoolType>
+typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPool(
+ OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr) {
auto iter = pool.find(connPtr);
invariant(iter != pool.end());
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 6a857602ca6..a3b5cca24a8 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -27,12 +27,17 @@
#include "mongo/platform/basic.h"
+#include <algorithm>
+#include <random>
+#include <stack>
+
#include "mongo/executor/connection_pool_test_fixture.h"
#include "mongo/executor/connection_pool.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace executor {
@@ -95,6 +100,175 @@ TEST_F(ConnectionPoolTest, SameConn) {
}
/**
+ * Verify that connections are obtained in MRU order.
+ */
+TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+
+ // Obtain a set of connections
+ constexpr size_t kSize = 100;
+ std::vector<ConnectionPool::ConnectionHandle> connections;
+
+ // Ensure that no matter how we leave the test, we mark any
+ // checked out connections as OK before implicity returning them
+ // to the pool by destroying the 'connections' vector. Otherwise,
+ // this test would cause an invariant failure instead of a normal
+ // test failure if it fails, which would be confusing.
+ const auto guard = MakeGuard([&] {
+ while (!connections.empty()) {
+ try {
+ ConnectionPool::ConnectionHandle conn = std::move(connections.back());
+ connections.pop_back();
+ conn->indicateSuccess();
+ } catch (...) {
+ }
+ }
+ });
+
+ for (size_t i = 0; i != kSize; ++i) {
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ connections.push_back(std::move(swConn.getValue()));
+ });
+ }
+
+ // Shuffle them into a random order
+ std::random_device rd;
+ std::mt19937 rng(rd());
+ std::shuffle(connections.begin(), connections.end(), rng);
+
+ // Return them to the pool in that random order, recording IDs in a stack
+ std::stack<size_t> ids;
+ while (!connections.empty()) {
+ ConnectionPool::ConnectionHandle conn = std::move(connections.back());
+ connections.pop_back();
+ ids.push(static_cast<ConnectionImpl*>(conn.get())->id());
+ conn->indicateSuccess();
+ }
+
+ // Re-obtain the connections. They should come back in the same order
+ // as the IDs in the stack, since the pool returns them in MRU order.
+ for (size_t i = 0; i != kSize; ++i) {
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ const auto id = CONN2ID(swConn);
+ connections.push_back(std::move(swConn.getValue()));
+ ASSERT(id == ids.top());
+ ids.pop();
+ });
+ }
+}
+
+/**
+ * Verify that recently used connections are not purged.
+ */
+TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
+ ConnectionPool::Options options;
+ options.minConnections = 0;
+ options.refreshRequirement = Milliseconds(1000);
+ options.refreshTimeout = Milliseconds(5000);
+ options.hostTimeout = Minutes(1);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+
+ ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), 0U);
+
+ // Obtain a set of connections
+ constexpr size_t kSize = 100;
+ std::vector<ConnectionPool::ConnectionHandle> connections;
+
+ // Ensure that no matter how we leave the test, we mark any
+ // checked out connections as OK before implicity returning them
+ // to the pool by destroying the 'connections' vector. Otherwise,
+ // this test would cause an invariant failure instead of a normal
+ // test failure if it fails, which would be confusing.
+ const auto guard = MakeGuard([&] {
+ while (!connections.empty()) {
+ try {
+ ConnectionPool::ConnectionHandle conn = std::move(connections.back());
+ connections.pop_back();
+ conn->indicateSuccess();
+ } catch (...) {
+ }
+ }
+ });
+
+ auto now = Date_t::now();
+ PoolImpl::setNow(now);
+
+ // Check out kSize connections from the pool, and record their IDs in a set.
+ std::set<size_t> original_ids;
+ for (size_t i = 0; i != kSize; ++i) {
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ original_ids.insert(CONN2ID(swConn));
+ connections.push_back(std::move(swConn.getValue()));
+ });
+ }
+
+ ASSERT_EQ(original_ids.size(), kSize);
+ ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+
+ // Shuffle them into a random order
+ std::random_device rd;
+ std::mt19937 rng(rd());
+ std::shuffle(connections.begin(), connections.end(), rng);
+
+ // Return them to the pool in that random order.
+ while (!connections.empty()) {
+ ConnectionPool::ConnectionHandle conn = std::move(connections.back());
+ connections.pop_back();
+ conn->indicateSuccess();
+ }
+
+ // Advance the time, but not enough to age out connections. We should still have them all.
+ PoolImpl::setNow(now + Milliseconds(500));
+ ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+
+ // Re-obtain a quarter of the connections, and record their IDs in a set.
+ std::set<size_t> reacquired_ids;
+ for (size_t i = 0; i < kSize / 4; ++i) {
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ reacquired_ids.insert(CONN2ID(swConn));
+ connections.push_back(std::move(swConn.getValue()));
+ });
+ }
+
+ ASSERT_EQ(reacquired_ids.size(), kSize / 4);
+ ASSERT(std::includes(
+ original_ids.begin(), original_ids.end(), reacquired_ids.begin(), reacquired_ids.end()));
+ ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+
+ // Put them right back in.
+ while (!connections.empty()) {
+ ConnectionPool::ConnectionHandle conn = std::move(connections.back());
+ connections.pop_back();
+ conn->indicateSuccess();
+ }
+
+ // We should still have all of them in the pool
+ ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+
+ // Advance across the host timeout for the 75 connections we
+ // didn't use. Afterwards, the pool should contain only those
+ // kSize/4 connections we used above.
+ PoolImpl::setNow(now + Milliseconds(1000));
+ ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize / 4);
+}
+
+/**
* Verify that a failed connection isn't returned to the pool
*/
TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
diff --git a/src/mongo/util/lru_cache.h b/src/mongo/util/lru_cache.h
index 3f99fd04442..5feea1a0bc3 100644
--- a/src/mongo/util/lru_cache.h
+++ b/src/mongo/util/lru_cache.h
@@ -72,6 +72,9 @@ public:
using Map = stdx::unordered_map<K, iterator, Hash, KeyEqual>;
+ using key_type = K;
+ using mapped_type = V;
+
/**
* Inserts a new entry into the cache. If the given key already exists in the cache,
* then we will drop the old entry and replace it with the given new entry. The cache
@@ -103,7 +106,7 @@ public:
this->_list.pop_back();
invariant(this->size() <= this->_maxSize);
- return result;
+ return std::move(result);
}
invariant(this->size() <= this->_maxSize);
@@ -167,28 +170,29 @@ public:
}
/**
- * Removes the element in the cache stored for this key, if one exists.
+ * Removes the element in the cache stored for this key, if one
+ * exists. Returns the count of elements erased.
*/
- void remove(const K& key) {
+ typename Map::size_type erase(const K& key) {
auto it = this->_map.find(key);
if (it == this->_map.end()) {
- return;
+ return 0;
}
this->_list.erase(it->second);
this->_map.erase(it);
+ return 1;
}
/**
- * Removes the element pointed to by the given iterator from this cache.
+ * Removes the element pointed to by the given iterator from this
+ * cache, and returns an iterator to the next least recently used
+ * element, or the end iterator, if no such element exists.
*/
- void remove(iterator it) {
- if (it == this->_list.end()) {
- return;
- }
-
- this->_map.erase(it->first);
- this->_list.erase(it);
+ iterator erase(iterator it) {
+ invariant(it != this->_list.end());
+ invariant(this->_map.erase(it->first) == 1);
+ return this->_list.erase(it);
}
/**
@@ -214,6 +218,10 @@ public:
return this->_list.size();
}
+ bool empty() const {
+ return this->_list.empty();
+ }
+
/**
* Returns an iterator pointing to the most recently used element in the cache.
*/
@@ -256,6 +264,10 @@ public:
return this->_list.cend();
}
+ typename Map::size_type count(const K& key) const {
+ return this->_map.count(key);
+ }
+
private:
// The maximum allowable number of entries in the cache.
const std::size_t _maxSize;
diff --git a/src/mongo/util/lru_cache_test.cpp b/src/mongo/util/lru_cache_test.cpp
index ea53d392aed..48fd7efefd6 100644
--- a/src/mongo/util/lru_cache_test.cpp
+++ b/src/mongo/util/lru_cache_test.cpp
@@ -200,8 +200,8 @@ TEST(LRUCacheTest, SizeZeroCache) {
assertEquals(cache.size(), size_t(0));
assertNotInCache(cache, 3);
- // Remove should be a no-op
- cache.remove(4);
+ // Erase should be a no-op
+ assertEquals(cache.erase(4), size_t(0));
assertEquals(cache.size(), size_t(0));
assertNotInCache(cache, 4);
@@ -231,11 +231,12 @@ TEST(LRUCacheTest, StressTest) {
assertEquals(found->second, s);
assertEquals(found, cache.begin());
- cache.remove(found);
+ const auto nextAfterFound = std::next(found);
+ assertEquals(cache.erase(found), nextAfterFound);
assertEquals(cache.size(), size_t(maxSize - 1));
cache.add(s, s);
assertEquals(cache.size(), size_t(maxSize));
- cache.remove(s);
+ assertEquals(cache.erase(s), size_t(1));
assertEquals(cache.size(), size_t(maxSize - 1));
cache.add(s, s);
}
@@ -366,11 +367,11 @@ TEST(LRUCacheTest, ReplaceKeyTest) {
// Test that calling add() with a key that already exists in the cache deletes
// the existing entry and gets promoted properly
-TEST(LRUCacheTest, RemoveByKey) {
+TEST(LRUCacheTest, EraseByKey) {
runWithDifferentSizes([](int maxSize) {
// Test replacement for any position in the original cache
- // i <= maxSize so we remove a non-existent element
+ // i <= maxSize so we erase a non-existent element
for (int i = 0; i <= maxSize; i++) {
LRUCache<int, int> cache(maxSize);
@@ -382,11 +383,12 @@ TEST(LRUCacheTest, RemoveByKey) {
assertEquals(cache.size(), size_t(maxSize));
- // Remove an element
- cache.remove(i);
+ // Erase an element
if (i != maxSize) {
+ assertEquals(cache.erase(i), size_t(1));
assertEquals(cache.size(), size_t(maxSize - 1));
} else {
+ assertEquals(cache.erase(i), size_t(0));
assertEquals(cache.size(), size_t(maxSize));
}
@@ -403,12 +405,11 @@ TEST(LRUCacheTest, RemoveByKey) {
}
// Test removal of elements by iterator from the cache
-TEST(LRUCacheTest, RemoveByIterator) {
+TEST(LRUCacheTest, EraseByIterator) {
runWithDifferentSizes([](int maxSize) {
// Test replacement for any position in the original cache
- // i <= maxSize so we remove a non-existent element
- for (int i = 0; i <= maxSize; i++) {
+ for (int i = 0; i < maxSize; i++) {
LRUCache<int, int> cache(maxSize);
// Fill up the cache
@@ -426,7 +427,8 @@ TEST(LRUCacheTest, RemoveByIterator) {
elem--;
}
- cache.remove(it);
+ auto nextElement = std::next(it);
+ assertEquals(cache.erase(it), nextElement);
if (i == maxSize) {
assertEquals(cache.size(), size_t(maxSize));
@@ -576,4 +578,24 @@ TEST(LRUCacheTest, CustomHashAndEqualityTypeTest) {
assertEquals(found->second, sortaEqual._b);
}
+TEST(LRUCacheTest, EmptyTest) {
+ const int maxSize = 4;
+ LRUCache<int, int> cache(maxSize);
+ assertEquals(cache.empty(), true);
+ cache.add(1, 2);
+ assertEquals(cache.empty(), false);
+ cache.erase(1);
+ assertEquals(cache.empty(), true);
+}
+
+TEST(LRUCacheTest, CountTest) {
+ const int maxSize = 4;
+ LRUCache<int, int> cache(maxSize);
+ assertEquals(cache.count(1), size_t(0));
+ cache.add(1, 2);
+ assertEquals(cache.count(1), size_t(1));
+ cache.erase(1);
+ assertEquals(cache.count(1), size_t(0));
+}
+
} // namespace