diff options
4 files changed, 61 insertions, 15 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index ae460ab7c36..2cc3e530b3b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -563,7 +563,7 @@ bool WiredTigerKVEngine::_drop(StringData ident) { stdx::lock_guard<stdx::mutex> lk(_identToDropMutex); _identToDrop.push(uri); } - _sessionCache->closeAll(); + _sessionCache->closeAllCursors(); return false; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index 4040630d8b2..3a5428ca94f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -46,13 +46,22 @@ namespace mongo { -WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int epoch) - : _epoch(epoch), _session(NULL), _cursorGen(0), _cursorsCached(0), _cursorsOut(0) { +WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch, uint64_t cursorEpoch) + : _epoch(epoch), + _cursorEpoch(cursorEpoch), + _session(NULL), + _cursorGen(0), + _cursorsCached(0), + _cursorsOut(0) { invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); } -WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, WiredTigerSessionCache* cache, int epoch) +WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, + WiredTigerSessionCache* cache, + uint64_t epoch, + uint64_t cursorEpoch) : _epoch(epoch), + _cursorEpoch(cursorEpoch), _cache(cache), _session(NULL), _cursorGen(0), @@ -122,6 +131,7 @@ void WiredTigerSession::closeAllCursors() { } } _cursors.clear(); + _cursorEpoch = _cache->getCursorEpoch(); } namespace { @@ -219,8 +229,18 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { _journalListener->onDurable(token); } +void WiredTigerSessionCache::closeAllCursors() { + // Increment the cursor epoch so that all cursors from this epoch are closed. + _cursorEpoch.fetchAndAdd(1); + + stdx::lock_guard<stdx::mutex> lock(_cacheLock); + for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) { + (*i)->closeAllCursors(); + } +} + void WiredTigerSessionCache::closeAll() { - // Increment the epoch as we are now closing all sessions with this epoch + // Increment the epoch as we are now closing all sessions with this epoch. SessionCache swap; { @@ -255,7 +275,8 @@ UniqueWiredTigerSession WiredTigerSessionCache::getSession() { } // Outside of the cache partition lock, but on release will be put back on the cache - return UniqueWiredTigerSession(new WiredTigerSession(_conn, this, _epoch.load())); + return UniqueWiredTigerSession( + new WiredTigerSession(_conn, this, _epoch.load(), _cursorEpoch.load())); } void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { @@ -282,6 +303,11 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { invariant(range == 0); } + // If the cursor epoch has moved on, close all cursors in the session. + uint64_t cursorEpoch = _cursorEpoch.load(); + if (session->_getCursorEpoch() != cursorEpoch) + session->closeAllCursors(); + bool returnedToCache = false; uint64_t currentEpoch = _epoch.load(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h index 9ec468d9ffc..eb3a116eb30 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -69,22 +69,23 @@ public: * Creates a new WT session on the specified connection. * * @param conn WT connection - * @param epoch In which session cache cleanup epoch was this session instantiated. Value - * of -1 means that this value is not necessary since the session will not be - * cached. + * @param epoch In which session cache cleanup epoch was this session instantiated. + * @param cursorEpoch In which cursor cache cleanup epoch was this session instantiated. */ - WiredTigerSession(WT_CONNECTION* conn, int epoch = -1); + WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch = 0, uint64_t cursorEpoch = 0); /** * Creates a new WT session on the specified connection. * * @param conn WT connection * @param cache The WiredTigerSessionCache that owns this session. - * @param epoch In which session cache cleanup epoch was this session instantiated. Value - * of -1 means that this value is not necessary since the session will not be - * cached. + * @param epoch In which session cache cleanup epoch was this session instantiated. + * @param cursorEpoch In which cursor cache cleanup epoch was this session instantiated. */ - WiredTigerSession(WT_CONNECTION* conn, WiredTigerSessionCache* cache, int epoch = -1); + WiredTigerSession(WT_CONNECTION* conn, + WiredTigerSessionCache* cache, + uint64_t epoch = 0, + uint64_t cursorEpoch = 0); ~WiredTigerSession(); @@ -120,7 +121,13 @@ private: return _epoch; } + // Used internally by WiredTigerSessionCache + uint64_t _getCursorEpoch() const { + return _cursorEpoch; + } + const uint64_t _epoch; + uint64_t _cursorEpoch; WiredTigerSessionCache* _cache; // not owned WT_SESSION* _session; // owned CursorCache _cursors; // owned @@ -160,6 +167,12 @@ public: void closeAll(); /** + * Closes all cached cursors and ensures that previously opened cursors will be closed on + * release. + */ + void closeAllCursors(); + + /** * Transitions the cache to shutting down mode. Any already released sessions are freed and * any sessions released subsequently are leaked. Must be called while holding the global * lock in exclusive mode to avoid races with getSession. @@ -187,6 +200,10 @@ public: void setJournalListener(JournalListener* jl); + uint64_t getCursorEpoch() const { + return _cursorEpoch.load(); + } + private: WiredTigerKVEngine* _engine; // not owned, might be NULL WT_CONNECTION* _conn; // not owned @@ -205,6 +222,9 @@ private: // Bumped when all open sessions need to be closed AtomicUInt64 _epoch; // atomic so we can check it outside of the lock + // Bumped when all open cursors need to be closed + AtomicUInt64 _cursorEpoch; // atomic so we can check it outside of the lock + // Counter and critical section mutex for waitUntilDurable AtomicUInt32 _lastSyncTime; stdx::mutex _lastSyncMutex; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp index 9f8b57b1abe..ed3c2aecf8d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp @@ -422,7 +422,7 @@ int WiredTigerUtil::verifyTable(OperationContext* txn, // Try to close as much as possible to avoid EBUSY errors. WiredTigerRecoveryUnit::get(txn)->getSession(txn)->closeAllCursors(); WiredTigerSessionCache* sessionCache = WiredTigerRecoveryUnit::get(txn)->getSessionCache(); - sessionCache->closeAll(); + sessionCache->closeAllCursors(); // Open a new session with custom error handlers. WT_CONNECTION* conn = WiredTigerRecoveryUnit::get(txn)->getSessionCache()->conn(); |