summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDon Anderson <dda@mongodb.com>2017-06-16 07:04:17 -0400
committerDon Anderson <dda@mongodb.com>2017-06-21 11:46:00 -0400
commit5e528ece0ead6ca020ec0be14aa0f080a5265c92 (patch)
tree4572d529f7ff9f229c844115cc03818c6b114ae1
parent8a5147e6f0d31702f97903797a4c8e5afcb9e68b (diff)
downloadmongo-5e528ece0ead6ca020ec0be14aa0f080a5265c92.tar.gz
SERVER-27347 For table drop, only close relevant URIs.
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp44
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h6
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp36
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h11
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp4
6 files changed, 80 insertions, 23 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
index 50729e013af..d819776ae38 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
@@ -487,7 +487,7 @@ protected:
// Open cursors can cause bulk open_cursor to fail with EBUSY.
// TODO any other cases that could cause EBUSY?
WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_txn)->getSession(_txn);
- outerSession->closeAllCursors();
+ outerSession->closeAllCursors(idx->uri());
// Not using cursor cache since we need to set "bulk".
WT_CURSOR* cursor;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index c50a72d4ea4..f48fff37b0b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -367,11 +367,12 @@ int64_t WiredTigerKVEngine::getIdentSize(OperationContext* opCtx, StringData ide
Status WiredTigerKVEngine::repairIdent(OperationContext* opCtx, StringData ident) {
WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
- session->closeAllCursors();
+ string uri = _uri(ident);
+ session->closeAllCursors(uri);
+ _sessionCache->closeAllCursors(uri);
if (isEphemeral()) {
return Status::OK();
}
- string uri = _uri(ident);
return _salvageIfNeeded(uri.c_str());
}
@@ -557,6 +558,8 @@ Status WiredTigerKVEngine::dropIdent(OperationContext* opCtx, StringData ident)
bool WiredTigerKVEngine::_drop(StringData ident) {
string uri = _uri(ident);
+ _sessionCache->closeAllCursors(uri);
+
WiredTigerSession session(_conn);
int ret = session.getSession()->drop(
@@ -572,9 +575,9 @@ bool WiredTigerKVEngine::_drop(StringData ident) {
// this is expected, queue it up
{
stdx::lock_guard<stdx::mutex> lk(_identToDropMutex);
- _identToDrop.push(uri);
+ _identToDrop.push_front(uri);
}
- _sessionCache->closeAllCursors();
+ _sessionCache->closeCursorsForQueuedDrops();
return false;
}
@@ -582,6 +585,28 @@ bool WiredTigerKVEngine::_drop(StringData ident) {
return false;
}
+std::list<WiredTigerCachedCursor> WiredTigerKVEngine::filterCursorsWithQueuedDrops(
+ std::list<WiredTigerCachedCursor>* cache) {
+ std::list<WiredTigerCachedCursor> toDrop;
+
+ stdx::lock_guard<stdx::mutex> lk(_identToDropMutex);
+ if (_identToDrop.empty())
+ return toDrop;
+
+ for (auto i = cache->begin(); i != cache->end();) {
+ if (!i->_cursor ||
+ std::find(_identToDrop.begin(), _identToDrop.end(), std::string(i->_cursor->uri)) ==
+ _identToDrop.end()) {
+ ++i;
+ continue;
+ }
+ toDrop.push_back(*i);
+ i = cache->erase(i);
+ }
+
+ return toDrop;
+}
+
bool WiredTigerKVEngine::haveDropsQueued() const {
Date_t now = Date_t::now();
Milliseconds delta = now - _previousCheckedDropsQueued;
@@ -592,13 +617,14 @@ bool WiredTigerKVEngine::haveDropsQueued() const {
}
// We only want to check the queue max once per second or we'll thrash
- // This is done in haveDropsQueued, not dropSomeQueuedIdents so we skip the mutex
if (delta < Milliseconds(1000))
return false;
_previousCheckedDropsQueued = now;
- stdx::lock_guard<stdx::mutex> lk(_identToDropMutex);
- return !_identToDrop.empty();
+
+ // Don't wait for the mutex: if we can't get it, report that no drops are queued.
+ stdx::unique_lock<stdx::mutex> lk(_identToDropMutex, stdx::defer_lock);
+ return lk.try_lock() && !_identToDrop.empty();
}
void WiredTigerKVEngine::dropSomeQueuedIdents() {
@@ -624,7 +650,7 @@ void WiredTigerKVEngine::dropSomeQueuedIdents() {
if (_identToDrop.empty())
break;
uri = _identToDrop.front();
- _identToDrop.pop();
+ _identToDrop.pop_front();
}
int ret = session.getSession()->drop(
session.getSession(), uri.c_str(), "force,checkpoint_wait=false");
@@ -632,7 +658,7 @@ void WiredTigerKVEngine::dropSomeQueuedIdents() {
if (ret == EBUSY) {
stdx::lock_guard<stdx::mutex> lk(_identToDropMutex);
- _identToDrop.push(uri);
+ _identToDrop.push_back(uri);
} else {
invariantWTOK(ret);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 4975f5f4ebe..e58b248b994 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -31,8 +31,8 @@
#pragma once
+#include <list>
#include <memory>
-#include <queue>
#include <string>
#include <wiredtiger.h>
@@ -138,6 +138,8 @@ public:
return _conn;
}
void dropSomeQueuedIdents();
+ std::list<WiredTigerCachedCursor> filterCursorsWithQueuedDrops(
+ std::list<WiredTigerCachedCursor>* cache);
bool haveDropsQueued() const;
void syncSizeInfo(bool sync) const;
@@ -183,7 +185,7 @@ private:
mutable stdx::mutex _dropAllQueuesMutex;
mutable stdx::mutex _identToDropMutex;
- std::queue<std::string> _identToDrop;
+ std::list<std::string> _identToDrop;
mutable Date_t _previousCheckedDropsQueued;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 97244d4fd18..4fccd06ed6d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -122,16 +122,31 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) {
}
}
-void WiredTigerSession::closeAllCursors() {
+void WiredTigerSession::closeAllCursors(const std::string& uri) {
invariant(_session);
- for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
+
+ for (auto i = _cursors.begin(); i != _cursors.end();) {
+ WT_CURSOR* cursor = i->_cursor;
+ if (cursor && uri == cursor->uri) {
+ invariantWTOK(cursor->close(cursor));
+ i = _cursors.erase(i);
+ } else
+ ++i;
+ }
+}
+
+void WiredTigerSession::closeCursorsForQueuedDrops(WiredTigerKVEngine* engine) {
+ invariant(_session);
+
+ _cursorEpoch = _cache->getCursorEpoch();
+ auto toDrop = engine->filterCursorsWithQueuedDrops(&_cursors);
+
+ for (auto i = toDrop.begin(); i != toDrop.end(); i++) {
WT_CURSOR* cursor = i->_cursor;
if (cursor) {
invariantWTOK(cursor->close(cursor));
}
}
- _cursors.clear();
- _cursorEpoch = _cache->getCursorEpoch();
}
namespace {
@@ -235,13 +250,20 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) {
_journalListener->onDurable(token);
}
-void WiredTigerSessionCache::closeAllCursors() {
+void WiredTigerSessionCache::closeAllCursors(const std::string& uri) {
+ stdx::lock_guard<stdx::mutex> lock(_cacheLock);
+ for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) {
+ (*i)->closeAllCursors(uri);
+ }
+}
+
+void WiredTigerSessionCache::closeCursorsForQueuedDrops() {
// Increment the cursor epoch so that all cursors from this epoch are closed.
_cursorEpoch.fetchAndAdd(1);
stdx::lock_guard<stdx::mutex> lock(_cacheLock);
for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) {
- (*i)->closeAllCursors();
+ (*i)->closeCursorsForQueuedDrops(_engine);
}
}
@@ -318,7 +340,7 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
// If the cursor epoch has moved on, close all cursors in the session.
uint64_t cursorEpoch = _cursorEpoch.load();
if (session->_getCursorEpoch() != cursorEpoch)
- session->closeAllCursors();
+ session->closeCursorsForQueuedDrops(_engine);
bool returnedToCache = false;
uint64_t currentEpoch = _epoch.load();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index eb3a116eb30..e8fa9811796 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -97,7 +97,9 @@ public:
void releaseCursor(uint64_t id, WT_CURSOR* cursor);
- void closeAllCursors();
+ void closeCursorsForQueuedDrops(WiredTigerKVEngine* engine);
+
+ void closeAllCursors(const std::string& uri);
int cursorsOut() const {
return _cursorsOut;
@@ -167,10 +169,15 @@ public:
void closeAll();
/**
+ * Closes cached cursors for tables that are queued to be dropped.
+ */
+ void closeCursorsForQueuedDrops();
+
+ /**
* Closes all cached cursors and ensures that previously opened cursors will be closed on
* release.
*/
- void closeAllCursors();
+ void closeAllCursors(const std::string& uri);
/**
* Transitions the cache to shutting down mode. Any already released sessions are freed and
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp
index 0f784c367ab..f00b4a33804 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp
@@ -425,9 +425,9 @@ int WiredTigerUtil::verifyTable(OperationContext* txn,
ErrorAccumulator eventHandler(errors);
// Try to close as much as possible to avoid EBUSY errors.
- WiredTigerRecoveryUnit::get(txn)->getSession(txn)->closeAllCursors();
+ WiredTigerRecoveryUnit::get(txn)->getSession(txn)->closeAllCursors(uri);
WiredTigerSessionCache* sessionCache = WiredTigerRecoveryUnit::get(txn)->getSessionCache();
- sessionCache->closeAllCursors();
+ sessionCache->closeAllCursors(uri);
// Open a new session with custom error handlers.
WT_CONNECTION* conn = WiredTigerRecoveryUnit::get(txn)->getSessionCache()->conn();