diff options
author | Don Anderson <dda@mongodb.com> | 2017-06-16 07:04:17 -0400 |
---|---|---|
committer | Don Anderson <dda@mongodb.com> | 2017-06-21 11:46:00 -0400 |
commit | 5e528ece0ead6ca020ec0be14aa0f080a5265c92 (patch) | |
tree | 4572d529f7ff9f229c844115cc03818c6b114ae1 | |
parent | 8a5147e6f0d31702f97903797a4c8e5afcb9e68b (diff) | |
download | mongo-5e528ece0ead6ca020ec0be14aa0f080a5265c92.tar.gz |
SERVER-27347 For table drop, only close relevant URIs.
6 files changed, 80 insertions, 23 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index 50729e013af..d819776ae38 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -487,7 +487,7 @@ protected: // Open cursors can cause bulk open_cursor to fail with EBUSY. // TODO any other cases that could cause EBUSY? WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_txn)->getSession(_txn); - outerSession->closeAllCursors(); + outerSession->closeAllCursors(idx->uri()); // Not using cursor cache since we need to set "bulk". WT_CURSOR* cursor; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index c50a72d4ea4..f48fff37b0b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -367,11 +367,12 @@ int64_t WiredTigerKVEngine::getIdentSize(OperationContext* opCtx, StringData ide Status WiredTigerKVEngine::repairIdent(OperationContext* opCtx, StringData ident) { WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx); - session->closeAllCursors(); + string uri = _uri(ident); + session->closeAllCursors(uri); + _sessionCache->closeAllCursors(uri); if (isEphemeral()) { return Status::OK(); } - string uri = _uri(ident); return _salvageIfNeeded(uri.c_str()); } @@ -557,6 +558,8 @@ Status WiredTigerKVEngine::dropIdent(OperationContext* opCtx, StringData ident) bool WiredTigerKVEngine::_drop(StringData ident) { string uri = _uri(ident); + _sessionCache->closeAllCursors(uri); + WiredTigerSession session(_conn); int ret = session.getSession()->drop( @@ -572,9 +575,9 @@ bool WiredTigerKVEngine::_drop(StringData ident) { // this is expected, queue it up { stdx::lock_guard<stdx::mutex> lk(_identToDropMutex); - _identToDrop.push(uri); + _identToDrop.push_front(uri); } - _sessionCache->closeAllCursors(); + _sessionCache->closeCursorsForQueuedDrops(); return false; } @@ -582,6 +585,28 @@ bool WiredTigerKVEngine::_drop(StringData ident) { return false; } +std::list<WiredTigerCachedCursor> WiredTigerKVEngine::filterCursorsWithQueuedDrops( + std::list<WiredTigerCachedCursor>* cache) { + std::list<WiredTigerCachedCursor> toDrop; + + stdx::lock_guard<stdx::mutex> lk(_identToDropMutex); + if (_identToDrop.empty()) + return toDrop; + + for (auto i = cache->begin(); i != cache->end();) { + if (!i->_cursor || + std::find(_identToDrop.begin(), _identToDrop.end(), std::string(i->_cursor->uri)) == + _identToDrop.end()) { + ++i; + continue; + } + toDrop.push_back(*i); + i = cache->erase(i); + } + + return toDrop; +} + bool WiredTigerKVEngine::haveDropsQueued() const { Date_t now = Date_t::now(); Milliseconds delta = now - _previousCheckedDropsQueued; @@ -592,13 +617,14 @@ bool WiredTigerKVEngine::haveDropsQueued() const { } // We only want to check the queue max once per second or we'll thrash - // This is done in haveDropsQueued, not dropSomeQueuedIdents so we skip the mutex if (delta < Milliseconds(1000)) return false; _previousCheckedDropsQueued = now; - stdx::lock_guard<stdx::mutex> lk(_identToDropMutex); - return !_identToDrop.empty(); + + // Don't wait for the mutex: if we can't get it, report that no drops are queued. + stdx::unique_lock<stdx::mutex> lk(_identToDropMutex, stdx::defer_lock); + return lk.try_lock() && !_identToDrop.empty(); } void WiredTigerKVEngine::dropSomeQueuedIdents() { @@ -624,7 +650,7 @@ void WiredTigerKVEngine::dropSomeQueuedIdents() { if (_identToDrop.empty()) break; uri = _identToDrop.front(); - _identToDrop.pop(); + _identToDrop.pop_front(); } int ret = session.getSession()->drop( session.getSession(), uri.c_str(), "force,checkpoint_wait=false"); @@ -632,7 +658,7 @@ void WiredTigerKVEngine::dropSomeQueuedIdents() { if (ret == EBUSY) { stdx::lock_guard<stdx::mutex> lk(_identToDropMutex); - _identToDrop.push(uri); + _identToDrop.push_back(uri); } else { invariantWTOK(ret); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 4975f5f4ebe..e58b248b994 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -31,8 +31,8 @@ #pragma once +#include <list> #include <memory> -#include <queue> #include <string> #include <wiredtiger.h> @@ -138,6 +138,8 @@ public: return _conn; } void dropSomeQueuedIdents(); + std::list<WiredTigerCachedCursor> filterCursorsWithQueuedDrops( + std::list<WiredTigerCachedCursor>* cache); bool haveDropsQueued() const; void syncSizeInfo(bool sync) const; @@ -183,7 +185,7 @@ private: mutable stdx::mutex _dropAllQueuesMutex; mutable stdx::mutex _identToDropMutex; - std::queue<std::string> _identToDrop; + std::list<std::string> _identToDrop; mutable Date_t _previousCheckedDropsQueued; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index 97244d4fd18..4fccd06ed6d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -122,16 +122,31 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) { } } -void WiredTigerSession::closeAllCursors() { +void WiredTigerSession::closeAllCursors(const std::string& uri) { invariant(_session); - for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + + for (auto i = _cursors.begin(); i != _cursors.end();) { + WT_CURSOR* cursor = i->_cursor; + if (cursor && uri == cursor->uri) { + invariantWTOK(cursor->close(cursor)); + i = _cursors.erase(i); + } else + ++i; + } +} + +void WiredTigerSession::closeCursorsForQueuedDrops(WiredTigerKVEngine* engine) { + invariant(_session); + + _cursorEpoch = _cache->getCursorEpoch(); + auto toDrop = engine->filterCursorsWithQueuedDrops(&_cursors); + + for (auto i = toDrop.begin(); i != toDrop.end(); i++) { WT_CURSOR* cursor = i->_cursor; if (cursor) { invariantWTOK(cursor->close(cursor)); } } - _cursors.clear(); - _cursorEpoch = _cache->getCursorEpoch(); } namespace { @@ -235,13 +250,20 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { _journalListener->onDurable(token); } -void WiredTigerSessionCache::closeAllCursors() { +void WiredTigerSessionCache::closeAllCursors(const std::string& uri) { + stdx::lock_guard<stdx::mutex> lock(_cacheLock); + for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) { + (*i)->closeAllCursors(uri); + } +} + +void WiredTigerSessionCache::closeCursorsForQueuedDrops() { // Increment the cursor epoch so that all cursors from this epoch are closed. _cursorEpoch.fetchAndAdd(1); stdx::lock_guard<stdx::mutex> lock(_cacheLock); for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) { - (*i)->closeAllCursors(); + (*i)->closeCursorsForQueuedDrops(_engine); } } @@ -318,7 +340,7 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { // If the cursor epoch has moved on, close all cursors in the session. uint64_t cursorEpoch = _cursorEpoch.load(); if (session->_getCursorEpoch() != cursorEpoch) - session->closeAllCursors(); + session->closeCursorsForQueuedDrops(_engine); bool returnedToCache = false; uint64_t currentEpoch = _epoch.load(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h index eb3a116eb30..e8fa9811796 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -97,7 +97,9 @@ public: void releaseCursor(uint64_t id, WT_CURSOR* cursor); - void closeAllCursors(); + void closeCursorsForQueuedDrops(WiredTigerKVEngine* engine); + + void closeAllCursors(const std::string& uri); int cursorsOut() const { return _cursorsOut; @@ -167,10 +169,15 @@ public: void closeAll(); /** + * Closes cached cursors for tables that are queued to be dropped. + */ + void closeCursorsForQueuedDrops(); + + /** * Closes all cached cursors and ensures that previously opened cursors will be closed on * release. */ - void closeAllCursors(); + void closeAllCursors(const std::string& uri); /** * Transitions the cache to shutting down mode. Any already released sessions are freed and diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp index 0f784c367ab..f00b4a33804 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp @@ -425,9 +425,9 @@ int WiredTigerUtil::verifyTable(OperationContext* txn, ErrorAccumulator eventHandler(errors); // Try to close as much as possible to avoid EBUSY errors. - WiredTigerRecoveryUnit::get(txn)->getSession(txn)->closeAllCursors(); + WiredTigerRecoveryUnit::get(txn)->getSession(txn)->closeAllCursors(uri); WiredTigerSessionCache* sessionCache = WiredTigerRecoveryUnit::get(txn)->getSessionCache(); - sessionCache->closeAllCursors(); + sessionCache->closeAllCursors(uri); // Open a new session with custom error handlers. WT_CONNECTION* conn = WiredTigerRecoveryUnit::get(txn)->getSessionCache()->conn(); |