diff options
9 files changed, 288 insertions, 13 deletions
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript index ee16fbc7ab0..cd313193493 100644 --- a/src/mongo/db/storage/wiredtiger/SConscript +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -266,3 +266,12 @@ if wiredtiger: 'storage_wiredtiger_mock', ], ) + + wtEnv.CppUnitTest( + target='storage_wiredtiger_session_cache_test', + source=['wiredtiger_session_cache_test.cpp', + ], + LIBDEPS=[ + 'storage_wiredtiger_mock', + ], + ) diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index ef00dd7a21b..7f4df8bc885 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -99,6 +99,19 @@ namespace mongo { +// Close idle wiredtiger sessions in the session cache after this many seconds. +// The default is 5 mins. Have a shorter default in the debug build to aid testing. +MONGO_EXPORT_SERVER_PARAMETER(wiredTigerSessionCloseIdleTimeSecs, + std::int32_t, + kDebugBuild ? 5 : 300) + ->withValidator([](const auto& potentialNewValue) { + if (potentialNewValue < 0) { + return Status(ErrorCodes::BadValue, + "wiredTigerSessionCloseIdleTimeSecs must be greater than or equal to 0s"); + } + return Status::OK(); + }); + bool WiredTigerFileVersion::shouldDowngrade(bool readOnly, bool repairMode, bool hasRecoveryTimestamp) { @@ -169,6 +182,54 @@ namespace dps = ::mongo::dotted_path_support; const int WiredTigerKVEngine::kDefaultJournalDelayMillis = 100; +class WiredTigerKVEngine::WiredTigerSessionSweeper : public BackgroundJob { +public: + explicit WiredTigerSessionSweeper(WiredTigerSessionCache* sessionCache) + : BackgroundJob(false /* deleteSelf */), _sessionCache(sessionCache) {} + + virtual string name() const { + return "WTIdleSessionSweeper"; + } + + virtual void run() { + ThreadClient tc(name(), getGlobalServiceContext()); + LOG(1) << "starting " << name() << " thread"; + + while (!_shuttingDown.load()) { + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + MONGO_IDLE_THREAD_BLOCK; + // Check every 10 seconds or sooner in the debug builds + _condvar.wait_for(lock, stdx::chrono::seconds(kDebugBuild ? 1 : 10)); + } + + _sessionCache->closeExpiredIdleSessions(wiredTigerSessionCloseIdleTimeSecs.load() * + 1000); + } + LOG(1) << "stopping " << name() << " thread"; + } + + void shutdown() { + _shuttingDown.store(true); + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + // Wake up the session sweeper thread early, we do not want the shutdown + // to wait for us too long. + _condvar.notify_one(); + } + wait(); + } + +private: + WiredTigerSessionCache* _sessionCache; + AtomicWord<bool> _shuttingDown{false}; + + stdx::mutex _mutex; // protects _condvar + // The session sweeper thread idles on this condition variable for a particular time duration + // between cleaning up expired sessions. It can be triggered early to expediate shutdown. + stdx::condition_variable _condvar; +}; + class WiredTigerKVEngine::WiredTigerJournalFlusher : public BackgroundJob { public: explicit WiredTigerJournalFlusher(WiredTigerSessionCache* sessionCache) @@ -369,7 +430,7 @@ private: WiredTigerSessionCache* _sessionCache; stdx::mutex _mutex; // protects _condvar - // The checkpoint thead idles on this condition variable for a particular time duration between + // The checkpoint thread idles on this condition variable for a particular time duration between // taking checkpoints. It can be triggered early to expediate immediate checkpointing. stdx::condition_variable _condvar; @@ -585,6 +646,9 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName, _sessionCache.reset(new WiredTigerSessionCache(this)); + _sessionSweeper = stdx::make_unique<WiredTigerSessionSweeper>(_sessionCache.get()); + _sessionSweeper->go(); + if (_durable && !_ephemeral) { _journalFlusher = stdx::make_unique<WiredTigerJournalFlusher>(_sessionCache.get()); _journalFlusher->go(); @@ -716,6 +780,11 @@ void WiredTigerKVEngine::cleanShutdown() { } // these must be the last things we do before _conn->close(); + if (_sessionSweeper) { + log() << "Shutting down session sweeper thread"; + _sessionSweeper->shutdown(); + log() << "Finished shutting down session sweeper thread"; + } if (_journalFlusher) { log() << "Shutting down journal flusher thread"; _journalFlusher->shutdown(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 30f1ec78d91..83d5c0191d0 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -347,7 +347,12 @@ public: */ Timestamp getPinnedOplog() const; + ClockSource* getClockSource() const { + return _clockSource; + } + private: + class WiredTigerSessionSweeper; class WiredTigerJournalFlusher; class WiredTigerCheckpointThread; @@ -432,6 +437,7 @@ private: // timestamp. const bool _keepDataHistory = true; + std::unique_ptr<WiredTigerSessionSweeper> _sessionSweeper; std::unique_ptr<WiredTigerJournalFlusher> _journalFlusher; // Depends on _sizeStorer std::unique_ptr<WiredTigerCheckpointThread> _checkpointThread; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp index d6e2af60630..512cadb6748 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp @@ -48,6 +48,7 @@ #include "mongo/stdx/memory.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/system_clock_source.h" namespace mongo { namespace { @@ -61,7 +62,8 @@ public: int ret = wiredtiger_open(_dbpath.path().c_str(), NULL, config, &_conn); invariantWTOK(ret); - _sessionCache = new WiredTigerSessionCache(_conn); + _fastClockSource = stdx::make_unique<SystemClockSource>(); + _sessionCache = new WiredTigerSessionCache(_conn, _fastClockSource.get()); } ~MyHarnessHelper() final { @@ -110,6 +112,7 @@ public: private: unittest::TempDir _dbpath; + std::unique_ptr<ClockSource> _fastClockSource; WT_CONNECTION* _conn; WiredTigerSessionCache* _sessionCache; WiredTigerOplogManager _oplogManager; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index 9c0fe8078ba..3d4038aab1d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -88,7 +88,12 @@ ExportedServerParameter<std::int32_t, ServerParameterType::kStartupAndRuntime> &kWiredTigerCursorCacheSize); WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch, uint64_t cursorEpoch) - : _epoch(epoch), _cursorEpoch(cursorEpoch), _session(NULL), _cursorGen(0), _cursorsOut(0) { + : _epoch(epoch), + _cursorEpoch(cursorEpoch), + _session(NULL), + _cursorGen(0), + _cursorsOut(0), + _idleExpireTime(Date_t::min()) { invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); } @@ -101,7 +106,8 @@ WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, _cache(cache), _session(NULL), _cursorGen(0), - _cursorsOut(0) { + _cursorsOut(0), + _idleExpireTime(Date_t::min()) { invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); } @@ -226,10 +232,13 @@ uint64_t WiredTigerSession::genTableId() { // ----------------------- WiredTigerSessionCache::WiredTigerSessionCache(WiredTigerKVEngine* engine) - : _engine(engine), _conn(engine->getConnection()), _shuttingDown(0) {} + : _engine(engine), + _conn(engine->getConnection()), + _clockSource(_engine->getClockSource()), + _shuttingDown(0) {} -WiredTigerSessionCache::WiredTigerSessionCache(WT_CONNECTION* conn) - : _engine(NULL), _conn(conn), _shuttingDown(0) {} +WiredTigerSessionCache::WiredTigerSessionCache(WT_CONNECTION* conn, ClockSource* cs) + : _engine(NULL), _conn(conn), _clockSource(cs), _shuttingDown(0) {} WiredTigerSessionCache::~WiredTigerSessionCache() { shuttingDown(); @@ -365,6 +374,34 @@ void WiredTigerSessionCache::closeCursorsForQueuedDrops() { } } +size_t WiredTigerSessionCache::getIdleSessionsCount() { + stdx::lock_guard<stdx::mutex> lock(_cacheLock); + return _sessions.size(); +} + +void WiredTigerSessionCache::closeExpiredIdleSessions(int64_t idleTimeMillis) { + // Do nothing if session close idle time is set to 0 or less + if (idleTimeMillis <= 0) { + return; + } + + auto cutoffTime = _clockSource->now() - Milliseconds(idleTimeMillis); + { + stdx::lock_guard<stdx::mutex> lock(_cacheLock); + // Discard all sessions that became idle before the cutoff time + for (auto it = _sessions.begin(); it != _sessions.end();) { + auto session = *it; + invariant(session->getIdleExpireTime() != Date_t::min()); + if (session->getIdleExpireTime() < cutoffTime) { + it = _sessions.erase(it); + delete (session); + } else { + ++it; + } + } + } +} + void WiredTigerSessionCache::closeAll() { // Increment the epoch as we are now closing all sessions with this epoch. SessionCache swap; @@ -396,6 +433,8 @@ UniqueWiredTigerSession WiredTigerSessionCache::getSession() { // discarding older ones WiredTigerSession* cachedSession = _sessions.back(); _sessions.pop_back(); + // Reset the idle time + cachedSession->setIdleExpireTime(Date_t::min()); return UniqueWiredTigerSession(cachedSession); } } @@ -450,8 +489,9 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { bool dropQueuedIdentsAtSessionEnd = session->isDropQueuedIdentsAtSessionEndAllowed(); // Reset this session's flag for dropping queued idents to default, before returning it to - // session cache. + // session cache. Also set the time this session got idle at. session->dropQueuedIdentsAtSessionEndAllowed(true); + session->setIdleExpireTime(_clockSource->now()); if (session->_getEpoch() == currentEpoch) { // check outside of lock to reduce contention stdx::lock_guard<stdx::mutex> lock(_cacheLock); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h index 7a87cef036e..affbf28fe55 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -156,6 +156,14 @@ public: */ static const uint64_t kMetadataTableId = 0; + void setIdleExpireTime(Date_t idleExpireTime) { + _idleExpireTime = idleExpireTime; + } + + Date_t getIdleExpireTime() const { + return _idleExpireTime; + } + private: friend class WiredTigerSessionCache; @@ -180,6 +188,7 @@ private: uint64_t _cursorGen; int _cursorsOut; bool _dropQueuedIdentsAtSessionEnd = true; + Date_t _idleExpireTime; }; /** @@ -189,7 +198,7 @@ private: class WiredTigerSessionCache { public: WiredTigerSessionCache(WiredTigerKVEngine* engine); - WiredTigerSessionCache(WT_CONNECTION* conn); + WiredTigerSessionCache(WT_CONNECTION* conn, ClockSource* cs); ~WiredTigerSessionCache(); /** @@ -213,6 +222,16 @@ public: std::unique_ptr<WiredTigerSession, WiredTigerSessionDeleter> getSession(); /** + * Get a count of idle sessions in the session cache. + */ + size_t getIdleSessionsCount(); + + /** + * Closes all cached sessions whose idle expiration time has been reached. + */ + void closeExpiredIdleSessions(int64_t idleTimeMillis); + + /** * Free all cached sessions and ensures that previously acquired sessions will be freed on * release. */ @@ -286,8 +305,9 @@ public: } private: - WiredTigerKVEngine* _engine; // not owned, might be NULL - WT_CONNECTION* _conn; // not owned + WiredTigerKVEngine* _engine; // not owned, might be NULL + WT_CONNECTION* _conn; // not owned + ClockSource* const _clockSource; // not owned WiredTigerSnapshotManager _snapshotManager; // Used as follows: diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache_test.cpp new file mode 100644 index 00000000000..82209292da6 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache_test.cpp @@ -0,0 +1,119 @@ +// wiredtiger_session_cache_test.cpp + + +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <sstream> +#include <string> + +#include "mongo/base/string_data.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/system_clock_source.h" + +namespace mongo { + +using std::string; +using std::stringstream; + +class WiredTigerConnection { +public: + WiredTigerConnection(StringData dbpath, StringData extraStrings) : _conn(NULL) { + std::stringstream ss; + ss << "create,"; + ss << extraStrings; + string config = ss.str(); + _fastClockSource = stdx::make_unique<SystemClockSource>(); + int ret = wiredtiger_open(dbpath.toString().c_str(), NULL, config.c_str(), &_conn); + ASSERT_OK(wtRCToStatus(ret)); + ASSERT(_conn); + } + ~WiredTigerConnection() { + _conn->close(_conn, NULL); + } + WT_CONNECTION* getConnection() const { + return _conn; + } + ClockSource* getClockSource() { + return _fastClockSource.get(); + } + +private: + WT_CONNECTION* _conn; + std::unique_ptr<ClockSource> _fastClockSource; +}; + +class WiredTigerSessionCacheHarnessHelper { +public: + WiredTigerSessionCacheHarnessHelper(StringData extraStrings) + : _dbpath("wt_test"), + _connection(_dbpath.path(), extraStrings), + _sessionCache(_connection.getConnection(), _connection.getClockSource()) {} + + + WiredTigerSessionCache* getSessionCache() { + return &_sessionCache; + } + +private: + unittest::TempDir _dbpath; + WiredTigerConnection _connection; + WiredTigerSessionCache _sessionCache; +}; + +TEST(WiredTigerSessionCacheTest, CheckSessionCacheCleanup) { + WiredTigerSessionCacheHarnessHelper harnessHelper(""); + WiredTigerSessionCache* sessionCache = harnessHelper.getSessionCache(); + ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 0U); + { + UniqueWiredTigerSession _session = sessionCache->getSession(); + ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 0U); + } + // Destroying of a session puts it in the session cache + ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 1U); + + // An idle timeout of 0 means never expire idle sessions + sessionCache->closeExpiredIdleSessions(0); + ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 1U); + sleepmillis(10); + + // Expire sessions that have been idle for 10 secs + sessionCache->closeExpiredIdleSessions(10000); + ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 1U); + // Expire sessions that have been idle for 2 millisecs + sessionCache->closeExpiredIdleSessions(2); + ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 0U); +} + +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp index 0078021ff21..7806d375c91 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp @@ -48,6 +48,7 @@ #include "mongo/stdx/memory.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/system_clock_source.h" namespace mongo { namespace { @@ -61,7 +62,8 @@ public: int ret = wiredtiger_open(_dbpath.path().c_str(), NULL, config, &_conn); invariantWTOK(ret); - _sessionCache = new WiredTigerSessionCache(_conn); + _fastClockSource = stdx::make_unique<SystemClockSource>(); + _sessionCache = new WiredTigerSessionCache(_conn, _fastClockSource.get()); } ~MyHarnessHelper() final { @@ -110,6 +112,7 @@ public: private: unittest::TempDir _dbpath; + std::unique_ptr<ClockSource> _fastClockSource; WT_CONNECTION* _conn; WiredTigerSessionCache* _sessionCache; WiredTigerOplogManager _oplogManager; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp index a3431d77cd3..f86ed718881 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/system_clock_source.h" namespace mongo { @@ -57,6 +58,7 @@ public: ss << "create,"; ss << extraStrings; string config = ss.str(); + _fastClockSource = stdx::make_unique<SystemClockSource>(); int ret = wiredtiger_open(dbpath.toString().c_str(), NULL, config.c_str(), &_conn); ASSERT_OK(wtRCToStatus(ret)); ASSERT(_conn); @@ -67,9 +69,13 @@ public: WT_CONNECTION* getConnection() const { return _conn; } + ClockSource* getClockSource() { + return _fastClockSource.get(); + } private: WT_CONNECTION* _conn; + std::unique_ptr<ClockSource> _fastClockSource; }; class WiredTigerUtilHarnessHelper { @@ -77,7 +83,7 @@ public: WiredTigerUtilHarnessHelper(StringData extraStrings) : _dbpath("wt_test"), _connection(_dbpath.path(), extraStrings), - _sessionCache(_connection.getConnection()) {} + _sessionCache(_connection.getConnection(), _connection.getClockSource()) {} WiredTigerSessionCache* getSessionCache() { |