summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript21
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/reap_logical_session_cache_now.cpp96
-rw-r--r--src/mongo/db/db.cpp2
-rw-r--r--src/mongo/db/logical_session_cache.h5
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongod.cpp33
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongod.h5
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongos.cpp2
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp53
-rw-r--r--src/mongo/db/logical_session_cache_impl.h14
-rw-r--r--src/mongo/db/logical_session_cache_noop.h4
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp4
-rw-r--r--src/mongo/db/logical_session_id.idl2
-rw-r--r--src/mongo/db/logical_session_id_test.cpp2
-rw-r--r--src/mongo/db/sessions_collection.cpp50
-rw-r--r--src/mongo/db/sessions_collection.h31
-rw-r--r--src/mongo/db/sessions_collection_mock.h5
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp90
-rw-r--r--src/mongo/db/sessions_collection_rs.h6
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp12
-rw-r--r--src/mongo/db/sessions_collection_sharded.h3
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp18
-rw-r--r--src/mongo/db/sessions_collection_standalone.h3
-rw-r--r--src/mongo/db/transaction_reaper.cpp249
-rw-r--r--src/mongo/db/transaction_reaper.h62
25 files changed, 699 insertions, 74 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 4128794d1c1..e63c3b20742 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1050,6 +1050,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/s/query/cluster_query',
+ 'sessions_collection_rs',
],
)
@@ -1082,6 +1083,23 @@ env.Library(
],
)
+env.Library(
+ target='transaction_reaper',
+ source=[
+ 'transaction_reaper.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/commands/server_status',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/s/client/shard_interface',
+ '$BUILD_DIR/mongo/s/coreshard',
+ 'dbdirectclient',
+ 'logical_session_id',
+ 'sessions_collection',
+ 'write_ops',
+ ],
+)
+
envWithAsio.CppUnitTest(
target='logical_session_cache_test',
source=[
@@ -1115,6 +1133,9 @@ envWithAsio.Library(
'sessions_collection_sharded',
'sessions_collection_standalone',
],
+ LIBDEPS_PRIVATE=[
+ 'transaction_reaper',
+ ],
)
envWithAsio.Library(
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 3efcaa19f17..99cb82a1298 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -68,6 +68,7 @@ env.Library(
"kill_sessions_command.cpp",
"mr_common.cpp",
"parameters.cpp",
+ "reap_logical_session_cache_now.cpp",
"refresh_logical_session_cache_now.cpp",
"refresh_sessions_command.cpp",
"refresh_sessions_command_internal.cpp",
diff --git a/src/mongo/db/commands/reap_logical_session_cache_now.cpp b/src/mongo/db/commands/reap_logical_session_cache_now.cpp
new file mode 100644
index 00000000000..a1817d4f24e
--- /dev/null
+++ b/src/mongo/db/commands/reap_logical_session_cache_now.cpp
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/init.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/logical_session_cache.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+namespace {
+
+class ReapLogicalSessionCacheNowCommand final : public BasicCommand {
+ MONGO_DISALLOW_COPYING(ReapLogicalSessionCacheNowCommand);
+
+public:
+ ReapLogicalSessionCacheNowCommand() : BasicCommand("reapLogicalSessionCacheNow") {}
+
+ bool slaveOk() const override {
+ return true;
+ }
+
+ bool adminOnly() const override {
+ return false;
+ }
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return false;
+ }
+
+ void help(std::stringstream& help) const override {
+ help << "force the logical session cache to reap. Test command only.";
+ }
+
+ // No auth needed because it only works when enabled via command line.
+ Status checkAuthForOperation(OperationContext* opCtx,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
+ return Status::OK();
+ }
+
+ virtual bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
+ auto cache = LogicalSessionCache::get(opCtx);
+ auto client = opCtx->getClient();
+
+ auto res = cache->reapNow(client);
+ if (!res.isOK()) {
+ return appendCommandStatus(result, res);
+ }
+
+ return true;
+ }
+};
+
+MONGO_INITIALIZER(RegisterReapLogicalSessionCacheNowCommand)(InitializerContext* context) {
+ if (Command::testCommandsEnabled) {
+ // Leaked intentionally: a Command registers itself when constructed.
+ new ReapLogicalSessionCacheNowCommand();
+ }
+ return Status::OK();
+}
+
+} // namespace
+
+} // namespace mongo
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 85ee094c037..7bef30a8593 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -776,7 +776,7 @@ ExitCode _initAndListen(int listenPort) {
kind = LogicalSessionCacheServer::kReplicaSet;
}
- auto sessionCache = makeLogicalSessionCacheD(kind);
+ auto sessionCache = makeLogicalSessionCacheD(globalServiceContext, kind);
LogicalSessionCache::set(globalServiceContext, std::move(sessionCache));
// MessageServer::run will return when exit code closes its socket and we don't need the
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index 8e7ba166874..3a1ce97ef39 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -97,6 +97,11 @@ public:
virtual Status refreshNow(Client* client) = 0;
/**
+ * Reaps transaction records synchronously.
+ */
+ virtual Status reapNow(Client* client) = 0;
+
+ /**
* Returns the current time.
*/
virtual Date_t now() = 0;
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp
index 7f958e547a7..0fdc1135d56 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp
@@ -26,6 +26,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include <memory>
@@ -37,20 +39,22 @@
#include "mongo/db/sessions_collection_rs.h"
#include "mongo/db/sessions_collection_sharded.h"
#include "mongo/db/sessions_collection_standalone.h"
+#include "mongo/db/transaction_reaper.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace {
-std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheServer state) {
+std::shared_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheServer state) {
switch (state) {
case LogicalSessionCacheServer::kSharded:
- return stdx::make_unique<SessionsCollectionSharded>();
+ return std::make_shared<SessionsCollectionSharded>();
case LogicalSessionCacheServer::kReplicaSet:
- return stdx::make_unique<SessionsCollectionRS>();
+ return std::make_shared<SessionsCollectionRS>();
case LogicalSessionCacheServer::kStandalone:
- return stdx::make_unique<SessionsCollectionStandalone>();
+ return std::make_shared<SessionsCollectionStandalone>();
default:
MONGO_UNREACHABLE;
}
@@ -58,13 +62,28 @@ std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheSe
} // namespace
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) {
+std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(ServiceContext* svc,
+ LogicalSessionCacheServer state) {
auto liason = stdx::make_unique<ServiceLiasonMongod>();
// Set up the logical session cache
auto sessionsColl = makeSessionsCollection(state);
- return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liason), std::move(sessionsColl), LogicalSessionCacheImpl::Options{});
+
+ auto reaper = [&]() -> std::shared_ptr<TransactionReaper> {
+ switch (state) {
+ case LogicalSessionCacheServer::kSharded:
+ return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl);
+ case LogicalSessionCacheServer::kReplicaSet:
+ return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl);
+ default:
+ return nullptr;
+ }
+ }();
+
+ return stdx::make_unique<LogicalSessionCacheImpl>(std::move(liason),
+ std::move(sessionsColl),
+ std::move(reaper),
+ LogicalSessionCacheImpl::Options{});
}
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.h b/src/mongo/db/logical_session_cache_factory_mongod.h
index e69c459d469..b6ac0430fd0 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.h
+++ b/src/mongo/db/logical_session_cache_factory_mongod.h
@@ -37,6 +37,9 @@ namespace mongo {
enum class LogicalSessionCacheServer { kSharded, kReplicaSet, kStandalone };
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state);
+class ServiceContext;
+
+std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(ServiceContext* svc,
+ LogicalSessionCacheServer state);
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp
index 6ba42da89de..9822ec00b93 100644
--- a/src/mongo/db/logical_session_cache_factory_mongos.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongos.cpp
@@ -45,7 +45,7 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() {
auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>();
return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liason), std::move(sessionsColl), LogicalSessionCacheImpl::Options{});
+ std::move(liason), std::move(sessionsColl), nullptr, LogicalSessionCacheImpl::Options{});
}
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index e84d431083b..ffafcccbd4f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -58,18 +58,22 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, f
constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity;
constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh;
-LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiason> service,
- std::unique_ptr<SessionsCollection> collection,
- Options options)
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(
+ std::unique_ptr<ServiceLiason> service,
+ std::shared_ptr<SessionsCollection> collection,
+ std::shared_ptr<TransactionReaper> transactionReaper,
+ Options options)
: _refreshInterval(options.refreshInterval),
_sessionTimeout(options.sessionTimeout),
_service(std::move(service)),
_sessionsColl(std::move(collection)),
+ _transactionReaper(std::move(transactionReaper)),
_cache(options.capacity) {
if (!disableLogicalSessionCacheRefresh) {
- PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); },
- duration_cast<Milliseconds>(_refreshInterval)};
- _service->scheduleJob(std::move(job));
+ _service->scheduleJob(
+ {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval});
+ _service->scheduleJob(
+ {[this](Client* client) { _periodicReap(client); }, _refreshInterval});
}
}
@@ -145,6 +149,10 @@ Status LogicalSessionCacheImpl::refreshNow(Client* client) {
return _refresh(client);
}
+Status LogicalSessionCacheImpl::reapNow(Client* client) {
+ return _reap(client);
+}
+
Date_t LogicalSessionCacheImpl::now() {
return _service->now();
}
@@ -163,6 +171,39 @@ void LogicalSessionCacheImpl::_periodicRefresh(Client* client) {
return;
}
+void LogicalSessionCacheImpl::_periodicReap(Client* client) {
+ auto res = _reap(client);
+ if (!res.isOK()) {
+ log() << "Failed to reap transaction table: " << res;
+ }
+
+ return;
+}
+
+Status LogicalSessionCacheImpl::_reap(Client* client) {
+ if (!_transactionReaper) {
+ return Status::OK();
+ }
+
+ try {
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&client, &uniqueCtx] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
+
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
+ stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
+ _transactionReaper->reap(opCtx);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+
+ return Status::OK();
+}
+
Status LogicalSessionCacheImpl::_refresh(Client* client) {
LogicalSessionRecordSet activeSessions;
LogicalSessionRecordSet deadSessions;
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index 47a2f897f3b..688e04a8a37 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -34,6 +34,7 @@
#include "mongo/db/service_liason.h"
#include "mongo/db/sessions_collection.h"
#include "mongo/db/time_proof_service.h"
+#include "mongo/db/transaction_reaper.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/lru_cache.h"
@@ -96,7 +97,8 @@ public:
* Construct a new session cache.
*/
explicit LogicalSessionCacheImpl(std::unique_ptr<ServiceLiason> service,
- std::unique_ptr<SessionsCollection> collection,
+ std::shared_ptr<SessionsCollection> collection,
+ std::shared_ptr<TransactionReaper> transactionReaper,
Options options = Options{});
LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete;
@@ -119,6 +121,8 @@ public:
Status refreshNow(Client* client) override;
+ Status reapNow(Client* client) override;
+
Date_t now() override;
size_t size() override;
@@ -138,6 +142,9 @@ private:
void _periodicRefresh(Client* client);
Status _refresh(Client* client);
+ void _periodicReap(Client* client);
+ Status _reap(Client* client);
+
/**
* Returns true if a record has passed its given expiration.
*/
@@ -152,7 +159,10 @@ private:
const Minutes _sessionTimeout;
std::unique_ptr<ServiceLiason> _service;
- std::unique_ptr<SessionsCollection> _sessionsColl;
+ std::shared_ptr<SessionsCollection> _sessionsColl;
+
+ mutable stdx::mutex _reaperMutex;
+ std::shared_ptr<TransactionReaper> _transactionReaper;
mutable stdx::mutex _cacheMutex;
LRUCache<LogicalSessionId, LogicalSessionRecord, LogicalSessionIdHash> _cache;
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index 7c5e0be1784..077efa741b9 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -66,6 +66,10 @@ public:
return Status::OK();
}
+ Status reapNow(Client* client) override {
+ return Status::OK();
+ }
+
Date_t now() override {
return Date_t::now();
}
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index 6198b61d233..cb8a226dea7 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -69,8 +69,8 @@ public:
auto mockService = stdx::make_unique<MockServiceLiason>(_service);
auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions);
- _cache = stdx::make_unique<LogicalSessionCacheImpl>(std::move(mockService),
- std::move(mockSessions));
+ _cache = stdx::make_unique<LogicalSessionCacheImpl>(
+ std::move(mockService), std::move(mockSessions), nullptr);
}
void tearDown() override {
diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl
index e7bac0f5165..365f06edc0d 100644
--- a/src/mongo/db/logical_session_id.idl
+++ b/src/mongo/db/logical_session_id.idl
@@ -155,7 +155,7 @@ structs:
description: "Parser for forming the fetch request for SessionsCollection::fetch"
strict: true
fields:
- find: namespacestring
+ find: string
filter: SessionsCollectionFetchRequestFilter
projection: SessionsCollectionFetchRequestProjection
batchSize: int
diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp
index 7f763c8f625..6d796fb7710 100644
--- a/src/mongo/db/logical_session_id_test.cpp
+++ b/src/mongo/db/logical_session_id_test.cpp
@@ -100,7 +100,7 @@ public:
std::make_shared<MockSessionsCollectionImpl>());
auto localLogicalSessionCache = stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(localServiceLiason), std::move(localSessionsCollection));
+ std::move(localServiceLiason), std::move(localSessionsCollection), nullptr);
LogicalSessionCache::set(&serviceContext, std::move(localLogicalSessionCache));
}
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index d1618901350..b6341eb0423 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -144,14 +144,17 @@ Status runBulkCmd(StringData label,
constexpr StringData SessionsCollection::kSessionsDb;
constexpr StringData SessionsCollection::kSessionsCollection;
constexpr StringData SessionsCollection::kSessionsFullNS;
+const NamespaceString SessionsCollection::kSessionsNamespaceString =
+ NamespaceString{SessionsCollection::kSessionsFullNS};
SessionsCollection::~SessionsCollection() = default;
-SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBClientBase* client) {
- auto send = [client](BSONObj batch) -> Status {
+SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(
+ const NamespaceString& ns, DBClientBase* client) {
+ auto send = [client, ns](BSONObj batch) -> Status {
BSONObj res;
- if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res)) {
+ if (!client->runCommand(ns.db().toString(), batch, res)) {
return getStatusFromCommandResult(res);
}
@@ -167,10 +170,11 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBCl
return send;
}
-SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClientBase* client) {
- auto send = [client](BSONObj cmd) -> Status {
+SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(const NamespaceString& ns,
+ DBClientBase* client) {
+ auto send = [client, ns](BSONObj cmd) -> Status {
BSONObj res;
- if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) {
+ if (!client->runCommand(ns.db().toString(), cmd, res)) {
return getStatusFromCommandResult(res);
}
@@ -180,10 +184,11 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClien
return send;
}
-SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(DBClientBase* client) {
- auto send = [client](BSONObj cmd) -> StatusWith<BSONObj> {
+SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const NamespaceString& ns,
+ DBClientBase* client) {
+ auto send = [client, ns](BSONObj cmd) -> StatusWith<BSONObj> {
BSONObj res;
- if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) {
+ if (!client->runCommand(ns.db().toString(), cmd, res)) {
return getStatusFromCommandResult(res);
}
@@ -193,11 +198,12 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(DBClien
return send;
}
-Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
+Status SessionsCollection::doRefresh(const NamespaceString& ns,
+ const LogicalSessionRecordSet& sessions,
Date_t refreshTime,
SendBatchFn send) {
- auto init = [](BSONObjBuilder* batch) {
- batch->append("update", kSessionsCollection);
+ auto init = [ns](BSONObjBuilder* batch) {
+ batch->append("update", ns.coll());
batch->append("ordered", false);
};
@@ -210,7 +216,8 @@ Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
return runBulkCmd("updates", init, add, send, sessions);
}
-Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sessions,
+Status SessionsCollection::doRefreshExternal(const NamespaceString& ns,
+ const LogicalSessionRecordSet& sessions,
Date_t refreshTime,
SendBatchFn send) {
auto makeT = [] { return std::vector<LogicalSessionRecord>{}; };
@@ -227,9 +234,11 @@ Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sess
return runBulkGeneric(makeT, add, sendLocal, sessions);
}
-Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) {
- auto init = [](BSONObjBuilder* batch) {
- batch->append("delete", kSessionsCollection);
+Status SessionsCollection::doRemove(const NamespaceString& ns,
+ const LogicalSessionIdSet& sessions,
+ SendBatchFn send) {
+ auto init = [ns](BSONObjBuilder* batch) {
+ batch->append("delete", ns.coll());
batch->append("ordered", false);
};
@@ -240,12 +249,15 @@ Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBat
return runBulkCmd("deletes", init, add, send, sessions);
}
-Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send) {
+Status SessionsCollection::doRemoveExternal(const NamespaceString& ns,
+ const LogicalSessionIdSet& sessions,
+ SendBatchFn send) {
// TODO SERVER-28335 Implement endSessions, with internal counterpart.
return Status::OK();
}
-StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const LogicalSessionIdSet& sessions,
+StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const NamespaceString& ns,
+ const LogicalSessionIdSet& sessions,
FindBatchFn send) {
auto makeT = [] { return std::vector<LogicalSessionId>{}; };
@@ -275,7 +287,7 @@ StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const LogicalSession
auto sendLocal = [&](std::vector<LogicalSessionId>& batch) {
SessionsCollectionFetchRequest request;
- request.setFind(NamespaceString{SessionsCollection::kSessionsCollection});
+ request.setFind(ns.coll());
request.setFilter({});
request.getFilter().set_id({});
request.getFilter().get_id().setIn(batch);
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index 3c9a5d65fb1..e86e29cd67b 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -53,6 +53,8 @@ public:
static constexpr StringData kSessionsCollection = "system.sessions"_sd;
static constexpr StringData kSessionsFullNS = "admin.system.sessions"_sd;
+ static const NamespaceString kSessionsNamespaceString;
+
/**
* Updates the last-use times on the given sessions to be greater than
* or equal to the given time. Returns an error if a networking issue occurred.
@@ -71,6 +73,9 @@ public:
*/
virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0;
+ virtual Status removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) = 0;
+
/**
* Checks a set of lsids and returns the set that no longer exists
*
@@ -84,29 +89,39 @@ protected:
* Makes a send function for the given client.
*/
using SendBatchFn = stdx::function<Status(BSONObj batch)>;
- SendBatchFn makeSendFnForCommand(DBClientBase* client);
- SendBatchFn makeSendFnForBatchWrite(DBClientBase* client);
+ SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client);
+ SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client);
using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>;
- FindBatchFn makeFindFnForCommand(DBClientBase* client);
+ FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client);
/**
* Formats and sends batches of refreshes for the given set of sessions.
*/
- Status doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send);
- Status doRefreshExternal(const LogicalSessionRecordSet& sessions,
+ Status doRefresh(const NamespaceString& ns,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime,
+ SendBatchFn send);
+ Status doRefreshExternal(const NamespaceString& ns,
+ const LogicalSessionRecordSet& sessions,
Date_t refreshTime,
SendBatchFn send);
/**
* Formats and sends batches of deletes for the given set of sessions.
*/
- Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send);
- Status doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send);
+ Status doRemove(const NamespaceString& ns,
+ const LogicalSessionIdSet& sessions,
+ SendBatchFn send);
+ Status doRemoveExternal(const NamespaceString& ns,
+ const LogicalSessionIdSet& sessions,
+ SendBatchFn send);
/**
* Formats and sends batches of fetches for the given set of sessions.
*/
- StatusWith<LogicalSessionIdSet> doFetch(const LogicalSessionIdSet& sessions, FindBatchFn send);
+ StatusWith<LogicalSessionIdSet> doFetch(const NamespaceString& ns,
+ const LogicalSessionIdSet& sessions,
+ FindBatchFn send);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h
index f88d877b17b..6a88a3af816 100644
--- a/src/mongo/db/sessions_collection_mock.h
+++ b/src/mongo/db/sessions_collection_mock.h
@@ -116,6 +116,11 @@ public:
return LogicalSessionIdSet{};
}
+ Status removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) override {
+ return Status::OK();
+ }
+
private:
std::shared_ptr<MockSessionsCollectionImpl> _impl;
};
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index 89387af9531..f4d5f44b453 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -84,13 +84,16 @@ Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbCo
}
template <typename Callback>
-auto runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback)
+auto runIfStandaloneOrPrimary(const NamespaceString& ns,
+ LockMode mode,
+ OperationContext* opCtx,
+ Callback callback)
-> boost::optional<decltype(std::declval<Callback>()())> {
- Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX);
- Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX);
+ Lock::DBLock lk(opCtx, ns.db(), mode);
+ Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, mode);
auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
- if (coord->canAcceptWritesForDatabase(opCtx, SessionsCollection::kSessionsDb)) {
+ if (coord->canAcceptWritesForDatabase(opCtx, ns.db())) {
return callback();
}
@@ -110,10 +113,14 @@ auto sendToPrimary(OperationContext* opCtx, Callback callback)
}
template <typename LocalCallback, typename RemoteCallback>
-auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallback remoteCallback)
+auto dispatch(const NamespaceString& ns,
+ LockMode mode,
+ OperationContext* opCtx,
+ LocalCallback localCallback,
+ RemoteCallback remoteCallback)
-> decltype(std::declval<RemoteCallback>()(static_cast<DBClientBase*>(nullptr))) {
// If we are the primary, write directly to ourself.
- auto result = runIfStandaloneOrPrimary(opCtx, [&] { return localCallback(); });
+ auto result = runIfStandaloneOrPrimary(ns, mode, opCtx, [&] { return localCallback(); });
if (result) {
return *result;
@@ -127,38 +134,83 @@ auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallba
Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions,
Date_t refreshTime) {
- return dispatch(opCtx,
+ return dispatch(
+ kSessionsNamespaceString,
+ MODE_IX,
+ opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doRefresh(kSessionsNamespaceString,
+ sessions,
+ refreshTime,
+ makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
+ },
+ [&](DBClientBase* client) {
+ return doRefreshExternal(kSessionsNamespaceString,
+ sessions,
+ refreshTime,
+ makeSendFnForCommand(kSessionsNamespaceString, client));
+ });
+}
+
+Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ return dispatch(kSessionsNamespaceString,
+ MODE_IX,
+ opCtx,
[&] {
DBDirectClient client(opCtx);
- return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client));
+ return doRemove(kSessionsNamespaceString,
+ sessions,
+ makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
},
[&](DBClientBase* client) {
- return doRefreshExternal(
- sessions, refreshTime, makeSendFnForCommand(client));
+ return doRemoveExternal(
+ kSessionsNamespaceString,
+ sessions,
+ makeSendFnForCommand(kSessionsNamespaceString, client));
});
}
-Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) {
- return dispatch(opCtx,
+StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ return dispatch(kSessionsNamespaceString,
+ MODE_IS,
+ opCtx,
[&] {
DBDirectClient client(opCtx);
- return doRemove(sessions, makeSendFnForBatchWrite(&client));
+ return doFetch(kSessionsNamespaceString,
+ sessions,
+ makeFindFnForCommand(kSessionsNamespaceString, &client));
},
[&](DBClientBase* client) {
- return doRemoveExternal(sessions, makeSendFnForCommand(client));
+ return doFetch(kSessionsNamespaceString,
+ sessions,
+ makeFindFnForCommand(kSessionsNamespaceString, client));
});
}
-StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
- OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+Status SessionsCollectionRS::removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
return dispatch(
+ kSessionsNamespaceString,
+ MODE_IX,
opCtx,
[&] {
DBDirectClient client(opCtx);
- return doFetch(sessions, makeFindFnForCommand(&client));
+ return doRemove(NamespaceString::kSessionTransactionsTableNamespace,
+ sessions,
+ makeSendFnForBatchWrite(
+ NamespaceString::kSessionTransactionsTableNamespace, &client));
},
- [&](DBClientBase* client) { return doFetch(sessions, makeFindFnForCommand(client)); });
+ [](DBClientBase*) {
+ return Status(ErrorCodes::NotMaster, "Not eligible to remove transaction records");
+ });
+}
+
+Status SessionsCollectionRS::removeTransactionRecordsHelper(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ return SessionsCollectionRS{}.removeTransactionRecords(opCtx, sessions);
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h
index a273dd88955..cc49ecf511b 100644
--- a/src/mongo/db/sessions_collection_rs.h
+++ b/src/mongo/db/sessions_collection_rs.h
@@ -68,6 +68,12 @@ public:
StatusWith<LogicalSessionIdSet> findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+
+ Status removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) override;
+
+ static Status removeTransactionRecordsHelper(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index 6c35d317b26..544915c8494 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/query_request.h"
+#include "mongo/db/sessions_collection_rs.h"
#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
@@ -71,7 +72,7 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
return Status(error, response.getErrMessage());
};
- return doRefresh(sessions, refreshTime, send);
+ return doRefresh(kSessionsNamespaceString, sessions, refreshTime, send);
}
Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
@@ -94,7 +95,7 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
return Status(error, response.getErrMessage());
};
- return doRemove(sessions, send);
+ return doRemove(kSessionsNamespaceString, sessions, send);
}
StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
@@ -140,7 +141,12 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return result.obj();
};
- return doFetch(sessions, send);
+ return doFetch(kSessionsNamespaceString, sessions, send);
+}
+
+Status SessionsCollectionSharded::removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ return SessionsCollectionRS::removeTransactionRecordsHelper(opCtx, sessions);
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h
index cb0a2e6fd9e..f9a8165a1c7 100644
--- a/src/mongo/db/sessions_collection_sharded.h
+++ b/src/mongo/db/sessions_collection_sharded.h
@@ -58,6 +58,9 @@ public:
StatusWith<LogicalSessionIdSet> findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+
+ Status removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) override;
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index 8187b19ad06..ada3583bb26 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -48,19 +48,31 @@ Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions,
Date_t refreshTime) {
DBDirectClient client(opCtx);
- return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client));
+ return doRefresh(kSessionsNamespaceString,
+ sessions,
+ refreshTime,
+ makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
}
Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
DBDirectClient client(opCtx);
- return doRemove(sessions, makeSendFnForBatchWrite(&client));
+ return doRemove(kSessionsNamespaceString,
+ sessions,
+ makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
}
StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
DBDirectClient client(opCtx);
- return doFetch(sessions, makeFindFnForCommand(&client));
+ return doFetch(kSessionsNamespaceString,
+ sessions,
+ makeFindFnForCommand(kSessionsNamespaceString, &client));
+}
+
+Status SessionsCollectionStandalone::removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ MONGO_UNREACHABLE;
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h
index 2c1bfe32096..2ecd49afa79 100644
--- a/src/mongo/db/sessions_collection_standalone.h
+++ b/src/mongo/db/sessions_collection_standalone.h
@@ -57,6 +57,9 @@ public:
StatusWith<LogicalSessionIdSet> findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+
+ Status removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) override;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
new file mode 100644
index 00000000000..7257a56f29a
--- /dev/null
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -0,0 +1,249 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/transaction_reaper.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/server_parameters.h"
+#include "mongo/db/session_txn_record.h"
+#include "mongo/db/sessions_collection.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/destructor_guard.h"
+
+namespace mongo {
+
+namespace {
+
+constexpr Minutes kTransactionRecordMinimumLifetime(30);
+
+/**
+ * The minimum lifetime for a transaction record is how long it has to have lived on the server
+ * before we'll consider it for cleanup. This is effectively the window for how long it is
+ * permissible for a mongos to hang before we're willing to accept a failure of the retryable write
+ * subsystem.
+ *
+ * Specifically, we imagine that a client connects to one mongos on a session and performs a
+ * retryable write. That mongos hangs. Then the client connects to a new mongos on the same
+ * session and successfully executes its write. After a day passes, the session will time out,
+ * cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and
+ * executes the write (because all records of the session + transaction have been deleted).
+ *
+ * So the write is performed twice, which is unavoidable without losing session vivification and/or
+ * requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker
+ * guarantee after the minimum transaction lifetime.
+ */
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes,
+ int,
+ kTransactionRecordMinimumLifetime.count());
+
+const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+
+/**
+ * Makes the query we'll use to scan the transactions table.
+ *
+ * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt
+ * to pull records likely to be on the same chunks (because they sort near each other).
+ */
+Query makeQuery(Date_t now) {
+ Timestamp possiblyExpired(
+ duration_cast<Seconds>(
+ (now - Minutes(TransactionRecordMinimumLifetimeMinutes)).toDurationSinceEpoch()),
+ 0);
+ BSONObjBuilder bob;
+ {
+ BSONObjBuilder subbob(bob.subobjStart(SessionTxnRecord::kLastWriteOpTimeTsFieldName));
+ subbob.append("$lt", possiblyExpired);
+ }
+ Query query(bob.obj());
+ query.sort(kSortById);
+ return query;
+}
+
+/**
+ * Our impl is templatized on a type which handles the lsids we see. It provides the top level
+ * scaffolding for figuring out if we're the primary node responsible for the transaction table and
+ * invoking the hanlder.
+ *
+ * The handler here will see all of the possibly expired txn ids in the transaction table and will
+ * have a lifetime associated with a single call to reap.
+ */
+template <typename Handler>
+class TransactionReaperImpl final : public TransactionReaper {
+public:
+ TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection)
+ : _collection(std::move(collection)) {}
+
+ void reap(OperationContext* opCtx) override {
+ Handler handler(opCtx, _collection.get());
+
+ Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IS);
+ Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IS);
+
+ auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
+ if (coord->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
+ DBDirectClient client(opCtx);
+
+ auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ query,
+ 0,
+ 0,
+ &kIdProjection);
+
+ while (cursor->more()) {
+ auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
+ "TransactionSession"_sd, cursor->next());
+
+ handler.handleLsid(transactionSession.get_id());
+ }
+ }
+ }
+
+private:
+ std::shared_ptr<SessionsCollection> _collection;
+};
+
+void handleBatchHelper(SessionsCollection* sessionsCollection,
+ OperationContext* opCtx,
+ const LogicalSessionIdSet& batch) {
+ auto removed = uassertStatusOK(sessionsCollection->findRemovedSessions(opCtx, batch));
+ uassertStatusOK(sessionsCollection->removeTransactionRecords(opCtx, removed));
+}
+
+/**
+ * The repl impl is simple, just pass along to the sessions collection for checking ids locally
+ */
+class ReplHandler {
+public:
+ ReplHandler(OperationContext* opCtx, SessionsCollection* collection)
+ : _opCtx(opCtx), _sessionsCollection(collection) {}
+
+ ~ReplHandler() {
+ DESTRUCTOR_GUARD([&] { handleBatchHelper(_sessionsCollection, _opCtx, _batch); }());
+ }
+
+ void handleLsid(const LogicalSessionId& lsid) {
+ _batch.insert(lsid);
+ if (_batch.size() > write_ops::kMaxWriteBatchSize) {
+ handleBatchHelper(_sessionsCollection, _opCtx, _batch);
+ _batch.clear();
+ }
+ }
+
+private:
+ OperationContext* _opCtx;
+ SessionsCollection* _sessionsCollection;
+
+ LogicalSessionIdSet _batch;
+};
+
+/**
+ * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small
+ * scans.
+ */
+class ShardedHandler {
+public:
+ ShardedHandler(OperationContext* opCtx, SessionsCollection* collection)
+ : _opCtx(opCtx), _sessionsCollection(collection) {}
+
+ ~ShardedHandler() {
+ DESTRUCTOR_GUARD([&] {
+ for (const auto& pair : _shards) {
+ handleBatchHelper(_sessionsCollection, _opCtx, pair.second);
+ }
+ }());
+ }
+
+ void handleLsid(const LogicalSessionId& lsid) {
+ // There are some lifetime issues with when the reaper starts up versus when the grid is
+ // available. Moving routing info fetching until after we have a transaction moves us past
+ // the problem.
+ //
+ // Also, we should only need the chunk case, but that'll wait until the sessions table is
+ // actually sharded.
+ if (!(_cm || _primary)) {
+ auto routingInfo =
+ uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo(
+ _opCtx, SessionsCollection::kSessionsFullNS));
+ _cm = routingInfo.cm();
+ _primary = routingInfo.primary();
+ }
+ ShardId shardId;
+ if (_cm) {
+ auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
+ shardId = chunk->getShardId();
+ } else {
+ shardId = _primary->getId();
+ }
+ auto& lsids = _shards[shardId];
+ lsids.insert(lsid);
+ if (lsids.size() > write_ops::kMaxWriteBatchSize) {
+ handleBatchHelper(_sessionsCollection, _opCtx, lsids);
+ _shards.erase(shardId);
+ }
+ }
+
+private:
+ OperationContext* _opCtx;
+ SessionsCollection* _sessionsCollection;
+ std::shared_ptr<ChunkManager> _cm;
+ std::shared_ptr<Shard> _primary;
+
+ stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
+};
+
+} // namespace
+
+std::unique_ptr<TransactionReaper> TransactionReaper::make(
+ Type type, std::shared_ptr<SessionsCollection> collection) {
+ switch (type) {
+ case Type::kReplicaSet:
+ return stdx::make_unique<TransactionReaperImpl<ReplHandler>>(std::move(collection));
+ case Type::kSharded:
+ return stdx::make_unique<TransactionReaperImpl<ShardedHandler>>(std::move(collection));
+ }
+ MONGO_UNREACHABLE;
+}
+
+TransactionReaper::~TransactionReaper() = default;
+
+} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h
new file mode 100644
index 00000000000..fdec1083916
--- /dev/null
+++ b/src/mongo/db/transaction_reaper.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+namespace mongo {
+
+class ServiceContext;
+class SessionsCollection;
+class OperationContext;
+
+/**
+ * TransactionReaper is responsible for scanning the transaction table, checking if sessions are
+ * still alive and deleting the transaction records if their sessions have expired.
+ */
+class TransactionReaper {
+public:
+ enum class Type {
+ kReplicaSet,
+ kSharded,
+ };
+
+ virtual ~TransactionReaper() = 0;
+
+ virtual void reap(OperationContext* OperationContext) = 0;
+
+ /**
+ * The implementation of the sessions collections is different in replica sets versus sharded
+ * clusters, so we have a factory to pick the right impl.
+ */
+ static std::unique_ptr<TransactionReaper> make(Type type,
+ std::shared_ptr<SessionsCollection> collection);
+};
+
+} // namespace mongo