summary | refs | log | tree | commit | diff
path: root/src/mongo/db/storage
diff options
context:
space:
mode:
author    Michael Cahill <michael.cahill@mongodb.com> 2015-06-29 11:51:02 +1000
committer Michael Cahill <michael.cahill@mongodb.com> 2015-06-29 11:51:02 +1000
commit    91c9eaeae7dbe3ff5e5e5b369871562beda908a8 (patch)
tree      be0e470f7ee4c9492e9b330835e03d83d7a3a1fd /src/mongo/db/storage
parent    38f937036b5033bd50a9fd740e897415bd9f21db (diff)
download  mongo-91c9eaeae7dbe3ff5e5e5b369871562beda908a8.tar.gz
SERVER-17386 WiredTiger session cache improvements
Diffstat (limited to 'src/mongo/db/storage')
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp  147
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h     65
2 files changed, 110 insertions(+), 102 deletions(-)
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 9bc0f9687a2..e5be54f627e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -40,29 +40,29 @@
namespace mongo {
-WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int cachePartition, int epoch)
- : _cachePartition(cachePartition), _epoch(epoch), _session(NULL), _cursorsOut(0) {
- int ret = conn->open_session(conn, NULL, "isolation=snapshot", &_session);
- invariantWTOK(ret);
+WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int epoch)
+ : _epoch(epoch), _session(NULL), _cursorGen(0), _cursorsCached(0), _cursorsOut(0) {
+ invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
}
WiredTigerSession::~WiredTigerSession() {
if (_session) {
- int ret = _session->close(_session, NULL);
- invariantWTOK(ret);
+ invariantWTOK(_session->close(_session, NULL));
}
}
WT_CURSOR* WiredTigerSession::getCursor(const std::string& uri, uint64_t id, bool forRecordStore) {
- {
- Cursors& cursors = _curmap[id];
- if (!cursors.empty()) {
- WT_CURSOR* save = cursors.back();
- cursors.pop_back();
+ // Find the most recently used cursor
+ for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
+ if (i->_id == id) {
+ WT_CURSOR* c = i->_cursor;
+ _cursors.erase(i);
_cursorsOut++;
- return save;
+ _cursorsCached--;
+ return c;
}
}
+
WT_CURSOR* c = NULL;
int ret = _session->open_cursor(
_session, uri.c_str(), NULL, forRecordStore ? "" : "overwrite=false", &c);
@@ -78,33 +78,39 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) {
invariant(cursor);
_cursorsOut--;
- Cursors& cursors = _curmap[id];
- if (cursors.size() > 10u) {
+ invariantWTOK(cursor->reset(cursor));
+
+ // Cursors are pushed to the front of the list and removed from the back
+ _cursors.push_front(WiredTigerCachedCursor(id, _cursorGen++, cursor));
+ _cursorsCached++;
+
+ // "Old" is defined as not used in the last N**2 operations, if we have N cursors cached.
+ // The reasoning here is to imagine a workload with N tables performing operations randomly
+ // across all of them (i.e., each cursor has 1/N chance of used for each operation). We
+ // would like to cache N cursors in that case, so any given cursor could go N**2 operations
+ // in between use.
+ uint64_t cutoff = std::max(100, _cursorsCached * _cursorsCached);
+ while (_cursorGen - _cursors.back()._gen > cutoff) {
+ cursor = _cursors.back()._cursor;
+ _cursors.pop_back();
invariantWTOK(cursor->close(cursor));
- } else {
- invariantWTOK(cursor->reset(cursor));
- cursors.push_back(cursor);
}
}
void WiredTigerSession::closeAllCursors() {
invariant(_session);
- for (CursorMap::iterator i = _curmap.begin(); i != _curmap.end(); ++i) {
- Cursors& cursors = i->second;
- for (size_t j = 0; j < cursors.size(); j++) {
- WT_CURSOR* cursor = cursors[j];
- if (cursor) {
- int ret = cursor->close(cursor);
- invariantWTOK(ret);
- }
+ for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
+ WT_CURSOR* cursor = i->_cursor;
+ if (cursor) {
+ invariantWTOK(cursor->close(cursor));
}
}
- _curmap.clear();
+ _cursors.clear();
}
namespace {
AtomicUInt64 nextCursorId(1);
-AtomicUInt64 cachePartitionGen(0);
+AtomicUInt64 sessionsInCache(0);
}
// static
uint64_t WiredTigerSession::genCursorId() {
@@ -114,10 +120,14 @@ uint64_t WiredTigerSession::genCursorId() {
// -----------------------
WiredTigerSessionCache::WiredTigerSessionCache(WiredTigerKVEngine* engine)
- : _engine(engine), _conn(engine->getConnection()), _shuttingDown(0) {}
+ : _engine(engine),
+ _conn(engine->getConnection()),
+ _shuttingDown(0),
+ _sessionsOut(0),
+ _highWaterMark(1) {}
WiredTigerSessionCache::WiredTigerSessionCache(WT_CONNECTION* conn)
- : _engine(NULL), _conn(conn), _shuttingDown(0) {}
+ : _engine(NULL), _conn(conn), _shuttingDown(0), _sessionsOut(0), _highWaterMark(1) {}
WiredTigerSessionCache::~WiredTigerSessionCache() {
shuttingDown();
@@ -139,21 +149,17 @@ void WiredTigerSessionCache::shuttingDown() {
}
void WiredTigerSessionCache::closeAll() {
- for (int i = 0; i < NumSessionCachePartitions; i++) {
- SessionPool swapPool;
-
- {
- stdx::unique_lock<SpinLock> scopedLock(_cache[i].lock);
- _cache[i].pool.swap(swapPool);
- _cache[i].epoch++;
- }
+ // Increment the epoch as we are now closing all sessions with this epoch
+ SessionCache swap;
- // New sessions will be created if need be outside of the lock
- for (size_t i = 0; i < swapPool.size(); i++) {
- delete swapPool[i];
- }
+ {
+ stdx::lock_guard<SpinLock> lock(_cacheLock);
+ _epoch++;
+ _sessions.swap(swap);
+ }
- swapPool.clear();
+ for (SessionCache::iterator i = swap.begin(); i != swap.end(); i++) {
+ delete (*i);
}
}
@@ -164,25 +170,25 @@ WiredTigerSession* WiredTigerSessionCache::getSession() {
// operations should be allowed to start.
invariant(!_shuttingDown.loadRelaxed());
- // Spread sessions uniformly across the cache partitions
- const int cachePartition = cachePartitionGen.addAndFetch(1) % NumSessionCachePartitions;
-
- int epoch;
-
- {
- stdx::unique_lock<SpinLock> cachePartitionLock(_cache[cachePartition].lock);
- epoch = _cache[cachePartition].epoch;
-
- if (!_cache[cachePartition].pool.empty()) {
- WiredTigerSession* cachedSession = _cache[cachePartition].pool.back();
- _cache[cachePartition].pool.pop_back();
+ // Set the high water mark if we need to
+ if (_sessionsOut.fetchAndAdd(1) > _highWaterMark.load()) {
+ _highWaterMark.store(_sessionsOut.load());
+ }
+ if (!_sessions.empty()) {
+ stdx::lock_guard<SpinLock> lock(_cacheLock);
+ if (!_sessions.empty()) {
+ // Get the most recently used session so that if we discard sessions, we're
+ // discarding older ones
+ WiredTigerSession* cachedSession = _sessions.back();
+ _sessions.pop_back();
+ sessionsInCache.fetchAndSubtract(1);
return cachedSession;
}
}
// Outside of the cache partition lock, but on release will be put back on the cache
- return new WiredTigerSession(_conn, cachePartition, epoch);
+ return new WiredTigerSession(_conn, _epoch);
}
void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
@@ -207,27 +213,32 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
invariant(range == 0);
}
- const int cachePartition = session->_getCachePartition();
- bool returnedToCache = false;
-
- if (cachePartition >= 0) {
- stdx::unique_lock<SpinLock> cachePartitionLock(_cache[cachePartition].lock);
-
- invariant(session->_getEpoch() <= _cache[cachePartition].epoch);
+ _sessionsOut.fetchAndSubtract(1);
- if (session->_getEpoch() == _cache[cachePartition].epoch) {
- _cache[cachePartition].pool.push_back(session);
- returnedToCache = true;
- }
+ bool returnedToCache = false;
+ invariant(session->_getEpoch() <= _epoch);
+
+ // Only return sessions until we hit the maximum number of sessions we have ever seen demand
+ // for concurrently. We also want to immediately delete any session that is from a
+ // non-current epoch.
+ if (session->_getEpoch() == _epoch && sessionsInCache.load() < _highWaterMark.load()) {
+ returnedToCache = true;
+ stdx::lock_guard<SpinLock> lock(_cacheLock);
+ _sessions.push_back(session);
}
- // Do all cleanup outside of the cache partition spinlock.
- if (!returnedToCache) {
+ if (returnedToCache) {
+ sessionsInCache.fetchAndAdd(1);
+ } else {
delete session;
}
if (_engine && _engine->haveDropsQueued()) {
_engine->dropAllQueued();
}
+
+ if (_engine && _engine->haveDropsQueued()) {
+ _engine->dropAllQueued();
+ }
}
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index 9fd575232b9..cf3ce29bbe5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -31,9 +31,8 @@
#pragma once
-#include <map>
+#include <list>
#include <string>
-#include <vector>
#include <boost/thread/shared_mutex.hpp>
#include <wiredtiger.h>
@@ -46,6 +45,16 @@ namespace mongo {
class WiredTigerKVEngine;
+class WiredTigerCachedCursor {
+public:
+ WiredTigerCachedCursor(uint64_t id, uint64_t gen, WT_CURSOR* cursor)
+ : _id(id), _gen(gen), _cursor(cursor) {}
+
+ uint64_t _id; // Source ID, assigned to each URI
+ uint64_t _gen; // Generation, used to age out old cursors
+ WT_CURSOR* _cursor;
+};
+
/**
* This is a structure that caches 1 cursor for each uri.
* The idea is that there is a pool of these somewhere.
@@ -64,7 +73,7 @@ public:
* of -1 means that this value is not necessary since the session will not be
* cached.
*/
- WiredTigerSession(WT_CONNECTION* conn, int cachePartition = -1, int epoch = -1);
+ WiredTigerSession(WT_CONNECTION* conn, int epoch = -1);
~WiredTigerSession();
WT_SESSION* getSession() const {
@@ -72,6 +81,7 @@ public:
}
WT_CURSOR* getCursor(const std::string& uri, uint64_t id, bool forRecordStore);
+
void releaseCursor(uint64_t id, WT_CURSOR* cursor);
void closeAllCursors();
@@ -90,24 +100,19 @@ public:
private:
friend class WiredTigerSessionCache;
- typedef std::vector<WT_CURSOR*> Cursors;
- typedef std::map<uint64_t, Cursors> CursorMap;
-
+ // The cursor cache is a list of pairs that contain an ID and cursor
+ typedef std::list<WiredTigerCachedCursor> CursorCache;
// Used internally by WiredTigerSessionCache
int _getEpoch() const {
return _epoch;
}
- int _getCachePartition() const {
- return _cachePartition;
- }
-
- const int _cachePartition;
const int _epoch;
WT_SESSION* _session; // owned
- CursorMap _curmap; // owned
- int _cursorsOut;
+ CursorCache _cursors; // owned
+ uint64_t _cursorGen;
+ int _cursorsCached, _cursorsOut;
};
class WiredTigerSessionCache {
@@ -128,34 +133,26 @@ public:
}
private:
- typedef std::vector<WiredTigerSession*> SessionPool;
-
- enum { NumSessionCachePartitions = 64 };
-
- struct SessionCachePartition {
- SessionCachePartition() : epoch(0) {}
- ~SessionCachePartition() {
- invariant(pool.empty());
- }
-
- SpinLock lock;
- int epoch;
- SessionPool pool;
- };
-
-
WiredTigerKVEngine* _engine; // not owned, might be NULL
WT_CONNECTION* _conn; // not owned
- // Partitioned cache of WT sessions. The partition key is not important, but it is
- // important that sessions be returned to the same partition they were taken from in order
- // to have some form of balance between the partitions.
- SessionCachePartition _cache[NumSessionCachePartitions];
-
// Regular operations take it in shared mode. Shutdown sets the _shuttingDown flag and
// then takes it in exclusive mode. This ensures that all threads, which would return
// sessions to the cache would leak them.
boost::shared_mutex _shutdownLock;
AtomicUInt32 _shuttingDown; // Used as boolean - 0 = false, 1 = true
+
+ SpinLock _cacheLock;
+ typedef std::list<WiredTigerSession*> SessionCache;
+ SessionCache _sessions;
+
+ // Bumped when all open sessions need to be closed
+ int _epoch;
+
+ // How many sessions are in use concurrently
+ AtomicUInt32 _sessionsOut;
+
+ // The most sessions we have ever in use concurrently.
+ AtomicUInt32 _highWaterMark;
};
}