// wiredtiger_session_cache.cpp

/**
 *    Copyright (C) 2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage

#include "mongo/platform/basic.h"

#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"

#include "mongo/base/error_codes.h"
#include "mongo/db/storage/journal_listener.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"

namespace mongo {

WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch, uint64_t cursorEpoch)
    : _epoch(epoch),
      _cursorEpoch(cursorEpoch),
      _session(NULL),
      _cursorGen(0),
      _cursorsCached(0),
      _cursorsOut(0) {
    invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
}

WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn,
                                     WiredTigerSessionCache* cache,
                                     uint64_t epoch,
                                     uint64_t cursorEpoch)
    : _epoch(epoch),
      _cursorEpoch(cursorEpoch),
      _cache(cache),
      _session(NULL),
      _cursorGen(0),
      _cursorsCached(0),
      _cursorsOut(0) {
    invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
}

WiredTigerSession::~WiredTigerSession() {
    if (_session) {
        invariantWTOK(_session->close(_session, NULL));
    }
}

WT_CURSOR* WiredTigerSession::getCursor(const std::string& uri, uint64_t id, bool forRecordStore) {
    // Find the most recently used cursor
    for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
        if (i->_id == id) {
            WT_CURSOR* c = i->_cursor;
            _cursors.erase(i);
            _cursorsOut++;
            _cursorsCached--;
            return c;
        }
    }

    WT_CURSOR* c = NULL;
    int ret = _session->open_cursor(
        _session, uri.c_str(), NULL, forRecordStore ? "" : "overwrite=false", &c);
    if (ret != ENOENT)
        invariantWTOK(ret);
    if (c)
        _cursorsOut++;
    return c;
}

void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) {
    invariant(_session);
    invariant(cursor);
    _cursorsOut--;

    invariantWTOK(cursor->reset(cursor));

    // Cursors are pushed to the front of the list and removed from the back
    _cursors.push_front(WiredTigerCachedCursor(id, _cursorGen++, cursor));
    _cursorsCached++;

    // "Old" is defined as not used in the last N**2 operations, if we have N cursors cached.
    // The reasoning here is to imagine a workload with N tables performing operations randomly
    // across all of them (i.e., each cursor has a 1/N chance of being used on each operation).
    // We would like to cache N cursors in that case, so any given cursor could go N**2
    // operations in between uses.
    while (_cursorGen - _cursors.back()._gen > 10000) {
        cursor = _cursors.back()._cursor;
        _cursors.pop_back();
        _cursorsCached--;
        invariantWTOK(cursor->close(cursor));
    }
}

void WiredTigerSession::closeAllCursors() {
    invariant(_session);
    for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
        WT_CURSOR* cursor = i->_cursor;
        if (cursor) {
            invariantWTOK(cursor->close(cursor));
        }
    }
    _cursors.clear();
    _cursorEpoch = _cache->getCursorEpoch();
}

namespace {
AtomicUInt64 nextTableId(1);
}

// static
uint64_t WiredTigerSession::genTableId() {
    return nextTableId.fetchAndAdd(1);
}

// -----------------------

WiredTigerSessionCache::WiredTigerSessionCache(WiredTigerKVEngine* engine)
    : _engine(engine), _conn(engine->getConnection()), _snapshotManager(_conn), _shuttingDown(0) {}

WiredTigerSessionCache::WiredTigerSessionCache(WT_CONNECTION* conn)
    : _engine(NULL), _conn(conn), _snapshotManager(_conn), _shuttingDown(0) {}

WiredTigerSessionCache::~WiredTigerSessionCache() {
    shuttingDown();
}

void WiredTigerSessionCache::shuttingDown() {
    uint32_t actual = _shuttingDown.load();
    uint32_t expected;

    // Try to atomically set the _shuttingDown flag, but just return if another thread was first.
    do {
        expected = actual;
        actual = _shuttingDown.compareAndSwap(expected, expected | kShuttingDownMask);
        if (actual & kShuttingDownMask)
            return;
    } while (actual != expected);

    // Spin as long as there are threads in releaseSession
    while (_shuttingDown.load() != kShuttingDownMask) {
        sleepmillis(1);
    }

    closeAll();
    _snapshotManager.shutdown();
}

void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) {
    // For inMemory storage engines, the data is "as durable as it's going to get".
    // That is, a restart is equivalent to a complete node failure.
    if (isEphemeral()) {
        return;
    }

    const int shuttingDown = _shuttingDown.fetchAndAdd(1);
    ON_BLOCK_EXIT([this] { _shuttingDown.fetchAndSubtract(1); });

    uassert(ErrorCodes::ShutdownInProgress,
            "Cannot wait for durability because a shutdown is in progress",
            !(shuttingDown & kShuttingDownMask));

    // When forcing a checkpoint with journaling enabled, don't synchronize with other
    // waiters, as a log flush is much cheaper than a full checkpoint.
    if (forceCheckpoint && _engine->isDurable()) {
        UniqueWiredTigerSession session = getSession();
        WT_SESSION* s = session->getSession();
        {
            stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
            JournalListener::Token token = _journalListener->getToken();
            invariantWTOK(s->checkpoint(s, NULL));
            _journalListener->onDurable(token);
        }
        LOG(4) << "created checkpoint (forced)";
        return;
    }

    uint32_t start = _lastSyncTime.load();

    // Do the remainder in a critical section that ensures only a single thread at a time
    // will attempt to synchronize.
    stdx::unique_lock<stdx::mutex> lk(_lastSyncMutex);
    uint32_t current = _lastSyncTime.loadRelaxed();  // synchronized with writes through mutex
    if (current != start) {
        // Someone else synced already since we read lastSyncTime, so we're done!
        return;
    }
    _lastSyncTime.store(current + 1);

    // Nobody has synced yet, so we have to sync ourselves.
    auto session = getSession();
    WT_SESSION* s = session->getSession();

    // This gets the token (OpTime) from the last write, before flushing (either the journal, or a
    // checkpoint), and then reports that token (OpTime) as a durable write.
    stdx::unique_lock<stdx::mutex> jlk(_journalListenerMutex);
    JournalListener::Token token = _journalListener->getToken();

    // Use the journal when available, or a checkpoint otherwise.
    if (_engine && _engine->isDurable()) {
        invariantWTOK(s->log_flush(s, "sync=on"));
        LOG(4) << "flushed journal";
    } else {
        invariantWTOK(s->checkpoint(s, NULL));
        LOG(4) << "created checkpoint";
    }
    _journalListener->onDurable(token);
}

void WiredTigerSessionCache::closeAllCursors() {
    // Increment the cursor epoch so that all cursors from this epoch are closed.
    _cursorEpoch.fetchAndAdd(1);

    stdx::lock_guard<stdx::mutex> lock(_cacheLock);
    for (SessionCache::iterator i = _sessions.begin(); i != _sessions.end(); i++) {
        (*i)->closeAllCursors();
    }
}

void WiredTigerSessionCache::closeAll() {
    // Increment the epoch as we are now closing all sessions with this epoch.
    SessionCache swap;

    {
        stdx::lock_guard<stdx::mutex> lock(_cacheLock);
        _epoch.fetchAndAdd(1);
        _sessions.swap(swap);
    }

    for (SessionCache::iterator i = swap.begin(); i != swap.end(); i++) {
        delete (*i);
    }
}

bool WiredTigerSessionCache::isEphemeral() {
    return _engine && _engine->isEphemeral();
}

UniqueWiredTigerSession WiredTigerSessionCache::getSession() {
    // We should never be able to get here after _shuttingDown is set, because no new
    // operations should be allowed to start.
    invariant(!(_shuttingDown.loadRelaxed() & kShuttingDownMask));

    {
        stdx::lock_guard<stdx::mutex> lock(_cacheLock);
        if (!_sessions.empty()) {
            // Get the most recently used session so that if we discard sessions, we're
            // discarding older ones
            WiredTigerSession* cachedSession = _sessions.back();
            _sessions.pop_back();
            return UniqueWiredTigerSession(cachedSession);
        }
    }

    // Outside of the cache partition lock, but on release will be put back on the cache
    return UniqueWiredTigerSession(
        new WiredTigerSession(_conn, this, _epoch.load(), _cursorEpoch.load()));
}
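
// Illustrative lifecycle sketch (comment only, not authoritative): assuming the
// UniqueWiredTigerSession typedef in the header pairs the session pointer with
// WiredTigerSessionCache::WiredTigerSessionDeleter (defined at the bottom of this file), a
// session obtained from getSession() is handed back to releaseSession() automatically when the
// handle goes out of scope. Here `cache` stands for some WiredTigerSessionCache instance.
//
//     {
//         UniqueWiredTigerSession session = cache->getSession();
//         WT_SESSION* s = session->getSession();
//         // ... use the raw WT_SESSION ...
//     }  // deleter runs here -> releaseSession() caches the session or deletes it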

void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
    invariant(session);
    invariant(session->cursorsOut() == 0);

    const int shuttingDown = _shuttingDown.fetchAndAdd(1);
    ON_BLOCK_EXIT([this] { _shuttingDown.fetchAndSubtract(1); });

    if (shuttingDown & kShuttingDownMask) {
        // There is a race condition with clean shutdown, where the storage engine is ripped from
        // underneath OperationContexts, which are not "active" (i.e., do not have any locks), but
        // are just about to delete the recovery unit. See SERVER-16031 for more information. Since
        // shutting down the WT_CONNECTION will close all WT_SESSIONS, we shouldn't also try to
        // directly close this session.
        session->_session = nullptr;  // Prevents calling _session->close() in destructor.
        delete session;
        return;
    }

    {
        WT_SESSION* ss = session->getSession();
        uint64_t range;
        // This checks that we are only caching idle sessions and not something which might hold
        // locks or otherwise prevent truncation.
        invariantWTOK(ss->transaction_pinned_range(ss, &range));
        invariant(range == 0);

        // Release resources in the session we're about to cache.
        invariantWTOK(ss->reset(ss));
    }

    // If the cursor epoch has moved on, close all cursors in the session.
    uint64_t cursorEpoch = _cursorEpoch.load();
    if (session->_getCursorEpoch() != cursorEpoch)
        session->closeAllCursors();

    bool returnedToCache = false;
    uint64_t currentEpoch = _epoch.load();

    if (session->_getEpoch() == currentEpoch) {  // check outside of lock to reduce contention
        stdx::lock_guard<stdx::mutex> lock(_cacheLock);
        if (session->_getEpoch() == _epoch.load()) {  // recheck inside the lock for correctness
            returnedToCache = true;
            _sessions.push_back(session);
        }
    } else
        invariant(session->_getEpoch() < currentEpoch);

    if (!returnedToCache)
        delete session;

    if (_engine && _engine->haveDropsQueued())
        _engine->dropSomeQueuedIdents();
}

void WiredTigerSessionCache::setJournalListener(JournalListener* jl) {
    stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
    _journalListener = jl;
}

void WiredTigerSessionCache::WiredTigerSessionDeleter::operator()(
    WiredTigerSession* session) const {
    session->_cache->releaseSession(session);
}

}  // namespace mongo
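
// Usage note (illustrative sketch only, not compiled as part of this translation unit): cursors
// are checked out of and returned to a WiredTigerSession in matched getCursor()/releaseCursor()
// pairs; `uri` and `tableId` below are placeholder names for a table URI and its genTableId()
// value.
//
//     WT_CURSOR* c = session->getCursor(uri, tableId, /*forRecordStore=*/true);
//     if (c) {
//         // ... position the cursor and read/write through it ...
//         session->releaseCursor(tableId, c);  // resets the cursor and caches it for reuse
//     }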