summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSulabh Mahajan <sulabh.mahajan@mongodb.com>2019-01-16 10:50:34 +1100
committerSulabh Mahajan <sulabh.mahajan@mongodb.com>2019-01-16 10:50:34 +1100
commit97de2142f89ab280a4d0b2ddf168248c79f741d0 (patch)
treeae193ed6b5dbeb00eacb00eba3b844e6ea424057
parent77080261840ce98ea3808f570a7ef72ebeb08b78 (diff)
downloadmongo-97de2142f89ab280a4d0b2ddf168248c79f741d0.tar.gz
SERVER-38779 Have a session sweep job to close old idle WT sessionsr4.1.7
-rw-r--r--src/mongo/db/storage/wiredtiger/SConscript9
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp71
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h6
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp52
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h26
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache_test.cpp119
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp8
9 files changed, 288 insertions, 13 deletions
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index ee16fbc7ab0..cd313193493 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -266,3 +266,12 @@ if wiredtiger:
'storage_wiredtiger_mock',
],
)
+
+ wtEnv.CppUnitTest(
+ target='storage_wiredtiger_session_cache_test',
+ source=['wiredtiger_session_cache_test.cpp',
+ ],
+ LIBDEPS=[
+ 'storage_wiredtiger_mock',
+ ],
+ )
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index ef00dd7a21b..7f4df8bc885 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -99,6 +99,19 @@
namespace mongo {
+// Close idle wiredtiger sessions in the session cache after this many seconds.
+// The default is 5 mins. Have a shorter default in the debug build to aid testing.
+MONGO_EXPORT_SERVER_PARAMETER(wiredTigerSessionCloseIdleTimeSecs,
+ std::int32_t,
+ kDebugBuild ? 5 : 300)
+ ->withValidator([](const auto& potentialNewValue) {
+ if (potentialNewValue < 0) {
+ return Status(ErrorCodes::BadValue,
+ "wiredTigerSessionCloseIdleTimeSecs must be greater than or equal to 0s");
+ }
+ return Status::OK();
+ });
+
bool WiredTigerFileVersion::shouldDowngrade(bool readOnly,
bool repairMode,
bool hasRecoveryTimestamp) {
@@ -169,6 +182,54 @@ namespace dps = ::mongo::dotted_path_support;
const int WiredTigerKVEngine::kDefaultJournalDelayMillis = 100;
+class WiredTigerKVEngine::WiredTigerSessionSweeper : public BackgroundJob {
+public:
+ explicit WiredTigerSessionSweeper(WiredTigerSessionCache* sessionCache)
+ : BackgroundJob(false /* deleteSelf */), _sessionCache(sessionCache) {}
+
+ virtual string name() const {
+ return "WTIdleSessionSweeper";
+ }
+
+ virtual void run() {
+ ThreadClient tc(name(), getGlobalServiceContext());
+ LOG(1) << "starting " << name() << " thread";
+
+ while (!_shuttingDown.load()) {
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ MONGO_IDLE_THREAD_BLOCK;
+ // Check every 10 seconds or sooner in the debug builds
+ _condvar.wait_for(lock, stdx::chrono::seconds(kDebugBuild ? 1 : 10));
+ }
+
+ _sessionCache->closeExpiredIdleSessions(wiredTigerSessionCloseIdleTimeSecs.load() *
+ 1000);
+ }
+ LOG(1) << "stopping " << name() << " thread";
+ }
+
+ void shutdown() {
+ _shuttingDown.store(true);
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ // Wake up the session sweeper thread early, we do not want the shutdown
+ // to wait for us too long.
+ _condvar.notify_one();
+ }
+ wait();
+ }
+
+private:
+ WiredTigerSessionCache* _sessionCache;
+ AtomicWord<bool> _shuttingDown{false};
+
+ stdx::mutex _mutex; // protects _condvar
+ // The session sweeper thread idles on this condition variable for a particular time duration
+ // between cleaning up expired sessions. It can be triggered early to expediate shutdown.
+ stdx::condition_variable _condvar;
+};
+
class WiredTigerKVEngine::WiredTigerJournalFlusher : public BackgroundJob {
public:
explicit WiredTigerJournalFlusher(WiredTigerSessionCache* sessionCache)
@@ -369,7 +430,7 @@ private:
WiredTigerSessionCache* _sessionCache;
stdx::mutex _mutex; // protects _condvar
- // The checkpoint thead idles on this condition variable for a particular time duration between
+ // The checkpoint thread idles on this condition variable for a particular time duration between
// taking checkpoints. It can be triggered early to expediate immediate checkpointing.
stdx::condition_variable _condvar;
@@ -585,6 +646,9 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
_sessionCache.reset(new WiredTigerSessionCache(this));
+ _sessionSweeper = stdx::make_unique<WiredTigerSessionSweeper>(_sessionCache.get());
+ _sessionSweeper->go();
+
if (_durable && !_ephemeral) {
_journalFlusher = stdx::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
_journalFlusher->go();
@@ -716,6 +780,11 @@ void WiredTigerKVEngine::cleanShutdown() {
}
// these must be the last things we do before _conn->close();
+ if (_sessionSweeper) {
+ log() << "Shutting down session sweeper thread";
+ _sessionSweeper->shutdown();
+ log() << "Finished shutting down session sweeper thread";
+ }
if (_journalFlusher) {
log() << "Shutting down journal flusher thread";
_journalFlusher->shutdown();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 30f1ec78d91..83d5c0191d0 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -347,7 +347,12 @@ public:
*/
Timestamp getPinnedOplog() const;
+ ClockSource* getClockSource() const {
+ return _clockSource;
+ }
+
private:
+ class WiredTigerSessionSweeper;
class WiredTigerJournalFlusher;
class WiredTigerCheckpointThread;
@@ -432,6 +437,7 @@ private:
// timestamp.
const bool _keepDataHistory = true;
+ std::unique_ptr<WiredTigerSessionSweeper> _sessionSweeper;
std::unique_ptr<WiredTigerJournalFlusher> _journalFlusher; // Depends on _sizeStorer
std::unique_ptr<WiredTigerCheckpointThread> _checkpointThread;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp
index d6e2af60630..512cadb6748 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_index_test.cpp
@@ -48,6 +48,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/system_clock_source.h"
namespace mongo {
namespace {
@@ -61,7 +62,8 @@ public:
int ret = wiredtiger_open(_dbpath.path().c_str(), NULL, config, &_conn);
invariantWTOK(ret);
- _sessionCache = new WiredTigerSessionCache(_conn);
+ _fastClockSource = stdx::make_unique<SystemClockSource>();
+ _sessionCache = new WiredTigerSessionCache(_conn, _fastClockSource.get());
}
~MyHarnessHelper() final {
@@ -110,6 +112,7 @@ public:
private:
unittest::TempDir _dbpath;
+ std::unique_ptr<ClockSource> _fastClockSource;
WT_CONNECTION* _conn;
WiredTigerSessionCache* _sessionCache;
WiredTigerOplogManager _oplogManager;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 9c0fe8078ba..3d4038aab1d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -88,7 +88,12 @@ ExportedServerParameter<std::int32_t, ServerParameterType::kStartupAndRuntime>
&kWiredTigerCursorCacheSize);
WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch, uint64_t cursorEpoch)
- : _epoch(epoch), _cursorEpoch(cursorEpoch), _session(NULL), _cursorGen(0), _cursorsOut(0) {
+ : _epoch(epoch),
+ _cursorEpoch(cursorEpoch),
+ _session(NULL),
+ _cursorGen(0),
+ _cursorsOut(0),
+ _idleExpireTime(Date_t::min()) {
invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
}
@@ -101,7 +106,8 @@ WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn,
_cache(cache),
_session(NULL),
_cursorGen(0),
- _cursorsOut(0) {
+ _cursorsOut(0),
+ _idleExpireTime(Date_t::min()) {
invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
}
@@ -226,10 +232,13 @@ uint64_t WiredTigerSession::genTableId() {
// -----------------------
WiredTigerSessionCache::WiredTigerSessionCache(WiredTigerKVEngine* engine)
- : _engine(engine), _conn(engine->getConnection()), _shuttingDown(0) {}
+ : _engine(engine),
+ _conn(engine->getConnection()),
+ _clockSource(_engine->getClockSource()),
+ _shuttingDown(0) {}
-WiredTigerSessionCache::WiredTigerSessionCache(WT_CONNECTION* conn)
- : _engine(NULL), _conn(conn), _shuttingDown(0) {}
+WiredTigerSessionCache::WiredTigerSessionCache(WT_CONNECTION* conn, ClockSource* cs)
+ : _engine(NULL), _conn(conn), _clockSource(cs), _shuttingDown(0) {}
WiredTigerSessionCache::~WiredTigerSessionCache() {
shuttingDown();
@@ -365,6 +374,34 @@ void WiredTigerSessionCache::closeCursorsForQueuedDrops() {
}
}
+size_t WiredTigerSessionCache::getIdleSessionsCount() {
+ stdx::lock_guard<stdx::mutex> lock(_cacheLock);
+ return _sessions.size();
+}
+
+void WiredTigerSessionCache::closeExpiredIdleSessions(int64_t idleTimeMillis) {
+ // Do nothing if session close idle time is set to 0 or less
+ if (idleTimeMillis <= 0) {
+ return;
+ }
+
+ auto cutoffTime = _clockSource->now() - Milliseconds(idleTimeMillis);
+ {
+ stdx::lock_guard<stdx::mutex> lock(_cacheLock);
+ // Discard all sessions that became idle before the cutoff time
+ for (auto it = _sessions.begin(); it != _sessions.end();) {
+ auto session = *it;
+ invariant(session->getIdleExpireTime() != Date_t::min());
+ if (session->getIdleExpireTime() < cutoffTime) {
+ it = _sessions.erase(it);
+ delete (session);
+ } else {
+ ++it;
+ }
+ }
+ }
+}
+
void WiredTigerSessionCache::closeAll() {
// Increment the epoch as we are now closing all sessions with this epoch.
SessionCache swap;
@@ -396,6 +433,8 @@ UniqueWiredTigerSession WiredTigerSessionCache::getSession() {
// discarding older ones
WiredTigerSession* cachedSession = _sessions.back();
_sessions.pop_back();
+ // Reset the idle time
+ cachedSession->setIdleExpireTime(Date_t::min());
return UniqueWiredTigerSession(cachedSession);
}
}
@@ -450,8 +489,9 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
bool dropQueuedIdentsAtSessionEnd = session->isDropQueuedIdentsAtSessionEndAllowed();
// Reset this session's flag for dropping queued idents to default, before returning it to
- // session cache.
+ // session cache. Also set the time this session got idle at.
session->dropQueuedIdentsAtSessionEndAllowed(true);
+ session->setIdleExpireTime(_clockSource->now());
if (session->_getEpoch() == currentEpoch) { // check outside of lock to reduce contention
stdx::lock_guard<stdx::mutex> lock(_cacheLock);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index 7a87cef036e..affbf28fe55 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -156,6 +156,14 @@ public:
*/
static const uint64_t kMetadataTableId = 0;
+ void setIdleExpireTime(Date_t idleExpireTime) {
+ _idleExpireTime = idleExpireTime;
+ }
+
+ Date_t getIdleExpireTime() const {
+ return _idleExpireTime;
+ }
+
private:
friend class WiredTigerSessionCache;
@@ -180,6 +188,7 @@ private:
uint64_t _cursorGen;
int _cursorsOut;
bool _dropQueuedIdentsAtSessionEnd = true;
+ Date_t _idleExpireTime;
};
/**
@@ -189,7 +198,7 @@ private:
class WiredTigerSessionCache {
public:
WiredTigerSessionCache(WiredTigerKVEngine* engine);
- WiredTigerSessionCache(WT_CONNECTION* conn);
+ WiredTigerSessionCache(WT_CONNECTION* conn, ClockSource* cs);
~WiredTigerSessionCache();
/**
@@ -213,6 +222,16 @@ public:
std::unique_ptr<WiredTigerSession, WiredTigerSessionDeleter> getSession();
/**
+ * Get a count of idle sessions in the session cache.
+ */
+ size_t getIdleSessionsCount();
+
+ /**
+ * Closes all cached sessions whose idle expiration time has been reached.
+ */
+ void closeExpiredIdleSessions(int64_t idleTimeMillis);
+
+ /**
* Free all cached sessions and ensures that previously acquired sessions will be freed on
* release.
*/
@@ -286,8 +305,9 @@ public:
}
private:
- WiredTigerKVEngine* _engine; // not owned, might be NULL
- WT_CONNECTION* _conn; // not owned
+ WiredTigerKVEngine* _engine; // not owned, might be NULL
+ WT_CONNECTION* _conn; // not owned
+ ClockSource* const _clockSource; // not owned
WiredTigerSnapshotManager _snapshotManager;
// Used as follows:
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache_test.cpp
new file mode 100644
index 00000000000..82209292da6
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache_test.cpp
@@ -0,0 +1,119 @@
+// wiredtiger_session_cache_test.cpp
+
+
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <sstream>
+#include <string>
+
+#include "mongo/base/string_data.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/system_clock_source.h"
+
+namespace mongo {
+
+using std::string;
+using std::stringstream;
+
+class WiredTigerConnection {
+public:
+ WiredTigerConnection(StringData dbpath, StringData extraStrings) : _conn(NULL) {
+ std::stringstream ss;
+ ss << "create,";
+ ss << extraStrings;
+ string config = ss.str();
+ _fastClockSource = stdx::make_unique<SystemClockSource>();
+ int ret = wiredtiger_open(dbpath.toString().c_str(), NULL, config.c_str(), &_conn);
+ ASSERT_OK(wtRCToStatus(ret));
+ ASSERT(_conn);
+ }
+ ~WiredTigerConnection() {
+ _conn->close(_conn, NULL);
+ }
+ WT_CONNECTION* getConnection() const {
+ return _conn;
+ }
+ ClockSource* getClockSource() {
+ return _fastClockSource.get();
+ }
+
+private:
+ WT_CONNECTION* _conn;
+ std::unique_ptr<ClockSource> _fastClockSource;
+};
+
+class WiredTigerSessionCacheHarnessHelper {
+public:
+ WiredTigerSessionCacheHarnessHelper(StringData extraStrings)
+ : _dbpath("wt_test"),
+ _connection(_dbpath.path(), extraStrings),
+ _sessionCache(_connection.getConnection(), _connection.getClockSource()) {}
+
+
+ WiredTigerSessionCache* getSessionCache() {
+ return &_sessionCache;
+ }
+
+private:
+ unittest::TempDir _dbpath;
+ WiredTigerConnection _connection;
+ WiredTigerSessionCache _sessionCache;
+};
+
+TEST(WiredTigerSessionCacheTest, CheckSessionCacheCleanup) {
+ WiredTigerSessionCacheHarnessHelper harnessHelper("");
+ WiredTigerSessionCache* sessionCache = harnessHelper.getSessionCache();
+ ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 0U);
+ {
+ UniqueWiredTigerSession _session = sessionCache->getSession();
+ ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 0U);
+ }
+ // Destroying of a session puts it in the session cache
+ ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 1U);
+
+ // An idle timeout of 0 means never expire idle sessions
+ sessionCache->closeExpiredIdleSessions(0);
+ ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 1U);
+ sleepmillis(10);
+
+ // Expire sessions that have been idle for 10 secs
+ sessionCache->closeExpiredIdleSessions(10000);
+ ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 1U);
+ // Expire sessions that have been idle for 2 millisecs
+ sessionCache->closeExpiredIdleSessions(2);
+ ASSERT_EQUALS(sessionCache->getIdleSessionsCount(), 0U);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp
index 0078021ff21..7806d375c91 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_index_test.cpp
@@ -48,6 +48,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/system_clock_source.h"
namespace mongo {
namespace {
@@ -61,7 +62,8 @@ public:
int ret = wiredtiger_open(_dbpath.path().c_str(), NULL, config, &_conn);
invariantWTOK(ret);
- _sessionCache = new WiredTigerSessionCache(_conn);
+ _fastClockSource = stdx::make_unique<SystemClockSource>();
+ _sessionCache = new WiredTigerSessionCache(_conn, _fastClockSource.get());
}
~MyHarnessHelper() final {
@@ -110,6 +112,7 @@ public:
private:
unittest::TempDir _dbpath;
+ std::unique_ptr<ClockSource> _fastClockSource;
WT_CONNECTION* _conn;
WiredTigerSessionCache* _sessionCache;
WiredTigerOplogManager _oplogManager;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp
index a3431d77cd3..f86ed718881 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/system_clock_source.h"
namespace mongo {
@@ -57,6 +58,7 @@ public:
ss << "create,";
ss << extraStrings;
string config = ss.str();
+ _fastClockSource = stdx::make_unique<SystemClockSource>();
int ret = wiredtiger_open(dbpath.toString().c_str(), NULL, config.c_str(), &_conn);
ASSERT_OK(wtRCToStatus(ret));
ASSERT(_conn);
@@ -67,9 +69,13 @@ public:
WT_CONNECTION* getConnection() const {
return _conn;
}
+ ClockSource* getClockSource() {
+ return _fastClockSource.get();
+ }
private:
WT_CONNECTION* _conn;
+ std::unique_ptr<ClockSource> _fastClockSource;
};
class WiredTigerUtilHarnessHelper {
@@ -77,7 +83,7 @@ public:
WiredTigerUtilHarnessHelper(StringData extraStrings)
: _dbpath("wt_test"),
_connection(_dbpath.path(), extraStrings),
- _sessionCache(_connection.getConnection()) {}
+ _sessionCache(_connection.getConnection(), _connection.getClockSource()) {}
WiredTigerSessionCache* getSessionCache() {