summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp')
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp31
1 files changed, 26 insertions, 5 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 6619ce01865..836d42b9e42 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -44,8 +44,13 @@
namespace mongo {
-WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int epoch)
- : _epoch(epoch), _session(NULL), _cursorGen(0), _cursorsCached(0), _cursorsOut(0) {
+WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch, uint64_t cursorEpoch)
+ : _epoch(epoch),
+ _cursorEpoch(cursorEpoch),
+ _session(NULL),
+ _cursorGen(0),
+ _cursorsCached(0),
+ _cursorsOut(0) {
invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
}
@@ -101,7 +106,7 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) {
}
}
-void WiredTigerSession::closeAllCursors() {
+void WiredTigerSession::closeAllCursors(uint64_t cursorEpoch) {
invariant(_session);
for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
WT_CURSOR* cursor = i->_cursor;
@@ -110,6 +115,7 @@ void WiredTigerSession::closeAllCursors() {
}
}
_cursors.clear();
+ _cursorEpoch = cursorEpoch;
}
namespace {
@@ -209,8 +215,18 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) {
_journalListener->onDurable(token);
}
+void WiredTigerSessionCache::closeAllCursors() {
+ // Increment the cursor epoch so that all cursors from this epoch are closed.
+ uint64_t cursorEpoch = _cursorEpoch.addAndFetch(1);
+
+ stdx::lock_guard<stdx::mutex> lock(_cacheLock);
+ for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) {
+ (*i)->closeAllCursors(cursorEpoch);
+ }
+}
+
void WiredTigerSessionCache::closeAll() {
- // Increment the epoch as we are now closing all sessions with this epoch
+ // Increment the epoch as we are now closing all sessions with this epoch.
SessionCache swap;
{
@@ -245,7 +261,7 @@ WiredTigerSession* WiredTigerSessionCache::getSession() {
}
// Outside of the cache partition lock, but on release will be put back on the cache
- return new WiredTigerSession(_conn, _epoch.load());
+ return new WiredTigerSession(_conn, _epoch.load(), _cursorEpoch.load());
}
void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
@@ -272,6 +288,11 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
invariant(range == 0);
}
+ // If the cursor epoch has moved on, close all cursors in the session.
+ uint64_t cursorEpoch = _cursorEpoch.load();
+ if (session->_getCursorEpoch() != cursorEpoch)
+ session->closeAllCursors(cursorEpoch);
+
bool returnedToCache = false;
uint64_t currentEpoch = _epoch.load();