diff options
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp')
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index 6619ce01865..836d42b9e42 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -44,8 +44,13 @@ namespace mongo { -WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int epoch) - : _epoch(epoch), _session(NULL), _cursorGen(0), _cursorsCached(0), _cursorsOut(0) { +WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch, uint64_t cursorEpoch) + : _epoch(epoch), + _cursorEpoch(cursorEpoch), + _session(NULL), + _cursorGen(0), + _cursorsCached(0), + _cursorsOut(0) { invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); } @@ -101,7 +106,7 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) { } } -void WiredTigerSession::closeAllCursors() { +void WiredTigerSession::closeAllCursors(uint64_t cursorEpoch) { invariant(_session); for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) { WT_CURSOR* cursor = i->_cursor; @@ -110,6 +115,7 @@ void WiredTigerSession::closeAllCursors() { } } _cursors.clear(); + _cursorEpoch = cursorEpoch; } namespace { @@ -209,8 +215,18 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { _journalListener->onDurable(token); } +void WiredTigerSessionCache::closeAllCursors() { + // Increment the cursor epoch so that all cursors from this epoch are closed. + uint64_t cursorEpoch = _cursorEpoch.addAndFetch(1); + + stdx::lock_guard<stdx::mutex> lock(_cacheLock); + for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) { + (*i)->closeAllCursors(cursorEpoch); + } +} + void WiredTigerSessionCache::closeAll() { - // Increment the epoch as we are now closing all sessions with this epoch + // Increment the epoch as we are now closing all sessions with this epoch. SessionCache swap; { @@ -245,7 +261,7 @@ WiredTigerSession* WiredTigerSessionCache::getSession() { } // Outside of the cache partition lock, but on release will be put back on the cache - return new WiredTigerSession(_conn, _epoch.load()); + return new WiredTigerSession(_conn, _epoch.load(), _cursorEpoch.load()); } void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { @@ -272,6 +288,11 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { invariant(range == 0); } + // If the cursor epoch has moved on, close all cursors in the session. + uint64_t cursorEpoch = _cursorEpoch.load(); + if (session->_getCursorEpoch() != cursorEpoch) + session->closeAllCursors(cursorEpoch); + bool returnedToCache = false; uint64_t currentEpoch = _epoch.load(); |