author | Michael Cahill <michael.cahill@mongodb.com> | 2015-06-29 11:51:02 +1000
---|---|---
committer | Michael Cahill <michael.cahill@mongodb.com> | 2015-06-29 11:51:02 +1000
commit | 91c9eaeae7dbe3ff5e5e5b369871562beda908a8 (patch) |
tree | be0e470f7ee4c9492e9b330835e03d83d7a3a1fd /src/mongo/db/storage |
parent | 38f937036b5033bd50a9fd740e897415bd9f21db (diff) |
download | mongo-91c9eaeae7dbe3ff5e5e5b369871562beda908a8.tar.gz |
SERVER-17386 WiredTiger session cache improvements
Diffstat (limited to 'src/mongo/db/storage')
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp | 147
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h | 65
2 files changed, 110 insertions, 102 deletions
```diff
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 9bc0f9687a2..e5be54f627e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -40,29 +40,29 @@
 namespace mongo {
 
-WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int cachePartition, int epoch)
-    : _cachePartition(cachePartition), _epoch(epoch), _session(NULL), _cursorsOut(0) {
-    int ret = conn->open_session(conn, NULL, "isolation=snapshot", &_session);
-    invariantWTOK(ret);
+WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int epoch)
+    : _epoch(epoch), _session(NULL), _cursorGen(0), _cursorsCached(0), _cursorsOut(0) {
+    invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
 }
 
 WiredTigerSession::~WiredTigerSession() {
     if (_session) {
-        int ret = _session->close(_session, NULL);
-        invariantWTOK(ret);
+        invariantWTOK(_session->close(_session, NULL));
     }
 }
 
 WT_CURSOR* WiredTigerSession::getCursor(const std::string& uri, uint64_t id, bool forRecordStore) {
-    {
-        Cursors& cursors = _curmap[id];
-        if (!cursors.empty()) {
-            WT_CURSOR* save = cursors.back();
-            cursors.pop_back();
+    // Find the most recently used cursor
+    for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
+        if (i->_id == id) {
+            WT_CURSOR* c = i->_cursor;
+            _cursors.erase(i);
             _cursorsOut++;
-            return save;
+            _cursorsCached--;
+            return c;
         }
     }
+
     WT_CURSOR* c = NULL;
     int ret = _session->open_cursor(
         _session, uri.c_str(), NULL, forRecordStore ? "" : "overwrite=false", &c);
@@ -78,33 +78,39 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) {
     invariant(cursor);
     _cursorsOut--;
 
-    Cursors& cursors = _curmap[id];
-    if (cursors.size() > 10u) {
+    invariantWTOK(cursor->reset(cursor));
+
+    // Cursors are pushed to the front of the list and removed from the back
+    _cursors.push_front(WiredTigerCachedCursor(id, _cursorGen++, cursor));
+    _cursorsCached++;
+
+    // "Old" is defined as not used in the last N**2 operations, if we have N cursors cached.
+    // The reasoning here is to imagine a workload with N tables performing operations randomly
+    // across all of them (i.e., each cursor has a 1/N chance of being used for each operation).
+    // We would like to cache N cursors in that case, so any given cursor could go N**2
+    // operations in between use.
+    uint64_t cutoff = std::max(100, _cursorsCached * _cursorsCached);
+    while (_cursorGen - _cursors.back()._gen > cutoff) {
+        cursor = _cursors.back()._cursor;
+        _cursors.pop_back();
         invariantWTOK(cursor->close(cursor));
-    } else {
-        invariantWTOK(cursor->reset(cursor));
-        cursors.push_back(cursor);
     }
 }
 
 void WiredTigerSession::closeAllCursors() {
     invariant(_session);
-    for (CursorMap::iterator i = _curmap.begin(); i != _curmap.end(); ++i) {
-        Cursors& cursors = i->second;
-        for (size_t j = 0; j < cursors.size(); j++) {
-            WT_CURSOR* cursor = cursors[j];
-            if (cursor) {
-                int ret = cursor->close(cursor);
-                invariantWTOK(ret);
-            }
+    for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
+        WT_CURSOR* cursor = i->_cursor;
+        if (cursor) {
+            invariantWTOK(cursor->close(cursor));
         }
     }
-    _curmap.clear();
+    _cursors.clear();
 }
 
 namespace {
 AtomicUInt64 nextCursorId(1);
-AtomicUInt64 cachePartitionGen(0);
+AtomicUInt64 sessionsInCache(0);
 }
 
 // static
 uint64_t WiredTigerSession::genCursorId() {
@@ -114,10 +120,14 @@ uint64_t WiredTigerSession::genCursorId() {
 // -----------------------
 
 WiredTigerSessionCache::WiredTigerSessionCache(WiredTigerKVEngine* engine)
-    : _engine(engine), _conn(engine->getConnection()), _shuttingDown(0) {}
+    : _engine(engine),
+      _conn(engine->getConnection()),
+      _shuttingDown(0),
+      _sessionsOut(0),
+      _highWaterMark(1) {}
 
 WiredTigerSessionCache::WiredTigerSessionCache(WT_CONNECTION* conn)
-    : _engine(NULL), _conn(conn), _shuttingDown(0) {}
+    : _engine(NULL), _conn(conn), _shuttingDown(0), _sessionsOut(0), _highWaterMark(1) {}
 
 WiredTigerSessionCache::~WiredTigerSessionCache() {
     shuttingDown();
@@ -139,21 +149,17 @@ void WiredTigerSessionCache::shuttingDown() {
 }
 
 void WiredTigerSessionCache::closeAll() {
-    for (int i = 0; i < NumSessionCachePartitions; i++) {
-        SessionPool swapPool;
-
-        {
-            stdx::unique_lock<SpinLock> scopedLock(_cache[i].lock);
-            _cache[i].pool.swap(swapPool);
-            _cache[i].epoch++;
-        }
+    // Increment the epoch as we are now closing all sessions with this epoch
+    SessionCache swap;
 
-        // New sessions will be created if need be outside of the lock
-        for (size_t i = 0; i < swapPool.size(); i++) {
-            delete swapPool[i];
-        }
+    {
+        stdx::lock_guard<SpinLock> lock(_cacheLock);
+        _epoch++;
+        _sessions.swap(swap);
+    }
 
-        swapPool.clear();
+    for (SessionCache::iterator i = swap.begin(); i != swap.end(); i++) {
+        delete (*i);
     }
 }
 
@@ -164,25 +170,25 @@ WiredTigerSession* WiredTigerSessionCache::getSession() {
     // operations should be allowed to start.
     invariant(!_shuttingDown.loadRelaxed());
 
-    // Spread sessions uniformly across the cache partitions
-    const int cachePartition = cachePartitionGen.addAndFetch(1) % NumSessionCachePartitions;
-
-    int epoch;
-
-    {
-        stdx::unique_lock<SpinLock> cachePartitionLock(_cache[cachePartition].lock);
-        epoch = _cache[cachePartition].epoch;
-
-        if (!_cache[cachePartition].pool.empty()) {
-            WiredTigerSession* cachedSession = _cache[cachePartition].pool.back();
-            _cache[cachePartition].pool.pop_back();
+    // Set the high water mark if we need to
+    if (_sessionsOut.fetchAndAdd(1) > _highWaterMark.load()) {
+        _highWaterMark.store(_sessionsOut.load());
+    }
+
+    if (!_sessions.empty()) {
+        stdx::lock_guard<SpinLock> lock(_cacheLock);
+        if (!_sessions.empty()) {
+            // Get the most recently used session so that if we discard sessions, we're
+            // discarding older ones
+            WiredTigerSession* cachedSession = _sessions.back();
+            _sessions.pop_back();
+            sessionsInCache.fetchAndSubtract(1);
             return cachedSession;
         }
     }
 
     // Outside of the cache partition lock, but on release will be put back on the cache
-    return new WiredTigerSession(_conn, cachePartition, epoch);
+    return new WiredTigerSession(_conn, _epoch);
 }
 
 void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
@@ -207,27 +213,32 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
         invariant(range == 0);
     }
 
-    const int cachePartition = session->_getCachePartition();
-    bool returnedToCache = false;
-
-    if (cachePartition >= 0) {
-        stdx::unique_lock<SpinLock> cachePartitionLock(_cache[cachePartition].lock);
-
-        invariant(session->_getEpoch() <= _cache[cachePartition].epoch);
+    _sessionsOut.fetchAndSubtract(1);
 
-        if (session->_getEpoch() == _cache[cachePartition].epoch) {
-            _cache[cachePartition].pool.push_back(session);
-            returnedToCache = true;
-        }
+    bool returnedToCache = false;
+    invariant(session->_getEpoch() <= _epoch);
+
+    // Only return sessions until we hit the maximum number of sessions we have ever seen demand
+    // for concurrently. We also want to immediately delete any session that is from a
+    // non-current epoch.
+    if (session->_getEpoch() == _epoch && sessionsInCache.load() < _highWaterMark.load()) {
+        returnedToCache = true;
+        stdx::lock_guard<SpinLock> lock(_cacheLock);
+        _sessions.push_back(session);
     }
 
-    // Do all cleanup outside of the cache partition spinlock.
-    if (!returnedToCache) {
+    if (returnedToCache) {
+        sessionsInCache.fetchAndAdd(1);
+    } else {
        delete session;
    }
 
     if (_engine && _engine->haveDropsQueued()) {
         _engine->dropAllQueued();
     }
+
+    if (_engine && _engine->haveDropsQueued()) {
+        _engine->dropAllQueued();
+    }
 }
 }
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index 9fd575232b9..cf3ce29bbe5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -31,9 +31,8 @@
 #pragma once
 
-#include <map>
+#include <list>
 #include <string>
-#include <vector>
 
 #include <boost/thread/shared_mutex.hpp>
 #include <wiredtiger.h>
@@ -46,6 +45,16 @@
 namespace mongo {
 
 class WiredTigerKVEngine;
 
+class WiredTigerCachedCursor {
+public:
+    WiredTigerCachedCursor(uint64_t id, uint64_t gen, WT_CURSOR* cursor)
+        : _id(id), _gen(gen), _cursor(cursor) {}
+
+    uint64_t _id;   // Source ID, assigned to each URI
+    uint64_t _gen;  // Generation, used to age out old cursors
+    WT_CURSOR* _cursor;
+};
+
 /**
  * This is a structure that caches 1 cursor for each uri.
  * The idea is that there is a pool of these somewhere.
@@ -64,7 +73,7 @@ public:
     * of -1 means that this value is not necessary since the session will not be
     * cached.
     */
-    WiredTigerSession(WT_CONNECTION* conn, int cachePartition = -1, int epoch = -1);
+    WiredTigerSession(WT_CONNECTION* conn, int epoch = -1);
     ~WiredTigerSession();
 
     WT_SESSION* getSession() const {
@@ -72,6 +81,7 @@ public:
     }
 
     WT_CURSOR* getCursor(const std::string& uri, uint64_t id, bool forRecordStore);
+
     void releaseCursor(uint64_t id, WT_CURSOR* cursor);
 
     void closeAllCursors();
@@ -90,24 +100,19 @@ public:
 private:
     friend class WiredTigerSessionCache;
 
-    typedef std::vector<WT_CURSOR*> Cursors;
-    typedef std::map<uint64_t, Cursors> CursorMap;
-
+    // The cursor cache is a list of pairs that contain an ID and cursor
+    typedef std::list<WiredTigerCachedCursor> CursorCache;
     // Used internally by WiredTigerSessionCache
     int _getEpoch() const {
        return _epoch;
    }
 
-    int _getCachePartition() const {
-        return _cachePartition;
-    }
-
-    const int _cachePartition;
     const int _epoch;
     WT_SESSION* _session;  // owned
-    CursorMap _curmap;     // owned
-    int _cursorsOut;
+    CursorCache _cursors;  // owned
+    uint64_t _cursorGen;
+    int _cursorsCached, _cursorsOut;
 };
 
 class WiredTigerSessionCache {
@@ -128,34 +133,26 @@ public:
     }
 
 private:
-    typedef std::vector<WiredTigerSession*> SessionPool;
-
-    enum { NumSessionCachePartitions = 64 };
-
-    struct SessionCachePartition {
-        SessionCachePartition() : epoch(0) {}
-        ~SessionCachePartition() {
-            invariant(pool.empty());
-        }
-
-        SpinLock lock;
-        int epoch;
-        SessionPool pool;
-    };
-
     WiredTigerKVEngine* _engine;  // not owned, might be NULL
     WT_CONNECTION* _conn;         // not owned
 
-    // Partitioned cache of WT sessions. The partition key is not important, but it is
-    // important that sessions be returned to the same partition they were taken from in order
-    // to have some form of balance between the partitions.
-    SessionCachePartition _cache[NumSessionCachePartitions];
-
     // Regular operations take it in shared mode. Shutdown sets the _shuttingDown flag and
     // then takes it in exclusive mode. This ensures that all threads, which would return
     // sessions to the cache would leak them.
     boost::shared_mutex _shutdownLock;
     AtomicUInt32 _shuttingDown;  // Used as boolean - 0 = false, 1 = true
+
+    SpinLock _cacheLock;
+    typedef std::list<WiredTigerSession*> SessionCache;
+    SessionCache _sessions;
+
+    // Bumped when all open sessions need to be closed
+    int _epoch;
+
+    // How many sessions are in use concurrently
+    AtomicUInt32 _sessionsOut;
+
+    // The most sessions we have ever had in use concurrently.
+    AtomicUInt32 _highWaterMark;
 };
 }
```
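For illustration, here is a minimal standalone sketch of the cursor-aging policy described in the new `releaseCursor` comment above. This is not the MongoDB code itself: `CachedCursor` and `CursorCache` are hypothetical stand-ins, and the actual `WT_CURSOR` open/close calls are omitted. A released cursor is pushed to the front of a list, and anything not released within the last `max(100, N*N)` check-ins is dropped, where N is the number of cursors currently cached:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <list>

// Stand-in for WiredTigerCachedCursor; the real entry also carries a WT_CURSOR*.
struct CachedCursor {
    uint64_t id;   // source id (one per table/URI)
    uint64_t gen;  // generation at which the cursor was last released
};

class CursorCache {
public:
    // Check a cursor out: reuse the most recently released one for this id, if any.
    void get(uint64_t id) {
        for (std::list<CachedCursor>::iterator it = _cursors.begin(); it != _cursors.end(); ++it) {
            if (it->id == id) {
                _cursors.erase(it);
                return;
            }
        }
        // Cache miss: the real code opens a fresh WT_CURSOR here.
    }

    // Check the cursor back in at the front of the list, then age out anything not
    // released within the last max(100, N*N) check-ins, where N is the cached count.
    void release(uint64_t id) {
        _cursors.push_front(CachedCursor{id, _gen++});
        uint64_t n = _cursors.size();
        uint64_t cutoff = std::max<uint64_t>(100, n * n);
        while (!_cursors.empty() && _gen - _cursors.back().gen > cutoff) {
            _cursors.pop_back();  // the real code closes the WT_CURSOR here
        }
    }

    std::size_t size() const {
        return _cursors.size();
    }

private:
    std::list<CachedCursor> _cursors;  // front = most recently released
    uint64_t _gen = 0;
};

int main() {
    CursorCache cache;

    // Operations spread round-robin across 20 tables: every cursor is reused well
    // within the 20*20 = 400 operation cutoff, so all 20 stay cached.
    for (int op = 0; op < 10000; ++op) {
        uint64_t id = op % 20;
        cache.get(id);
        cache.release(id);
    }
    std::cout << "cached after round-robin workload: " << cache.size() << "\n";  // 20

    // Now only table 0 is used: the other cursors fall more than the cutoff behind
    // the current generation and are aged out one by one.
    for (int op = 0; op < 10000; ++op) {
        cache.get(0);
        cache.release(0);
    }
    std::cout << "cached after single-table workload: " << cache.size() << "\n";  // 1
}
```

Running the sketch prints 20 cached cursors after the round-robin workload and 1 after the single-table workload, which matches the intent stated in the patch comment: a workload touching N tables keeps about N cursors warm, while cursors for tables that fall idle are eventually closed.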
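Similarly, a stripped-down sketch of the session pooling policy in `getSession`/`releaseSession`/`closeAll` above. Assumptions for the sketch: `std::mutex` stands in for MongoDB's `SpinLock`, plain `int` counters replace the `Atomic*` types (so it is single-threaded), and `Session` stands in for `WiredTigerSession`; the real code also double-checks emptiness before taking the lock. The pool never grows beyond the highest number of sessions observed checked out at once, and `closeAll` bumps an epoch so that sessions taken out before it are deleted rather than re-pooled:

```cpp
#include <algorithm>
#include <iostream>
#include <list>
#include <mutex>

struct Session {
    int epoch;  // epoch the session was created in
};

class SessionCache {
public:
    Session* getSession() {
        // Track the peak number of sessions in use concurrently.
        _sessionsOut++;
        _highWaterMark = std::max(_highWaterMark, _sessionsOut);

        std::lock_guard<std::mutex> lock(_lock);
        if (!_sessions.empty()) {
            Session* s = _sessions.back();  // most recently used
            _sessions.pop_back();
            return s;
        }
        return new Session{_epoch};
    }

    void releaseSession(Session* s) {
        _sessionsOut--;
        std::lock_guard<std::mutex> lock(_lock);
        // Only pool sessions from the current epoch, and never pool more than the
        // peak concurrent demand seen so far.
        if (s->epoch == _epoch && static_cast<int>(_sessions.size()) < _highWaterMark) {
            _sessions.push_back(s);
        } else {
            delete s;
        }
    }

    // Invalidate everything pooled: bump the epoch and swap the list out under the
    // lock, then delete the sessions outside of it.
    void closeAll() {
        std::list<Session*> swap;
        {
            std::lock_guard<std::mutex> lock(_lock);
            _epoch++;
            _sessions.swap(swap);
        }
        for (Session* s : swap) {
            delete s;
        }
    }

    std::size_t pooled() {
        std::lock_guard<std::mutex> lock(_lock);
        return _sessions.size();
    }

private:
    std::mutex _lock;
    std::list<Session*> _sessions;  // pooled (idle) sessions
    int _epoch = 0;
    int _sessionsOut = 0;
    int _highWaterMark = 1;
};

int main() {
    SessionCache cache;

    // Peak demand of 3 concurrent sessions raises the high water mark to 3...
    Session* a = cache.getSession();
    Session* b = cache.getSession();
    Session* c = cache.getSession();
    cache.releaseSession(a);
    cache.releaseSession(b);
    cache.releaseSession(c);
    std::cout << "pooled (high water mark 3): " << cache.pooled() << "\n";  // 3

    // ...so later releases reuse pooled sessions instead of growing the pool.
    Session* d = cache.getSession();
    cache.releaseSession(d);
    std::cout << "pooled after reuse: " << cache.pooled() << "\n";          // 3

    // closeAll() bumps the epoch; a session checked out before it is deleted on
    // release rather than returned to the pool.
    Session* e = cache.getSession();
    cache.closeAll();
    cache.releaseSession(e);
    std::cout << "pooled after closeAll: " << cache.pooled() << "\n";       // 0
}
```

Swapping the list out under the lock and deleting the sessions outside of it keeps the critical section short, which mirrors the pattern the patch uses in `WiredTigerSessionCache::closeAll`.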