summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-04-18 18:11:49 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-09-05 10:37:59 -0400
commit273ed84bc7eb6cc0e38eca16088fc1a454f1cb28 (patch)
tree6ee8e5dcdedffc4b1ce0a48bf871248fc3984816
parentd2c2e6c73c424d5a28d5bd2a9031e4796a5e4371 (diff)
downloadmongo-273ed84bc7eb6cc0e38eca16088fc1a454f1cb28.tar.gz
SERVER-37837 Move `config.transactions` manipulation out of SessionsCollection
(cherry picked from commit dabbf059e6f96edb4898b42d532d460efd2510f2) (cherry picked from commit 2b40ec0f649def6e120b78510e8b008a43852a09)
-rw-r--r--src/mongo/db/SConscript14
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongod.cpp46
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongos.cpp2
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp16
-rw-r--r--src/mongo/db/logical_session_cache_impl.h51
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/sessions_collection.h3
-rw-r--r--src/mongo/db/sessions_collection_mock.h5
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp22
-rw-r--r--src/mongo/db/sessions_collection_rs.h18
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp5
-rw-r--r--src/mongo/db/sessions_collection_sharded.h3
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp5
-rw-r--r--src/mongo/db/sessions_collection_standalone.h3
-rw-r--r--src/mongo/db/transaction_reaper.cpp106
-rw-r--r--src/mongo/db/transaction_reaper_test.cpp104
-rw-r--r--src/mongo/embedded/logical_session_cache_factory_embedded.cpp2
18 files changed, 222 insertions, 189 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 76c666b0f0d..b7ba27da87c 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1397,20 +1397,24 @@ envWithAsio.CppUnitTest(
target='logical_session_cache_test',
source=[
'logical_session_cache_test.cpp',
+ 'transaction_reaper_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/executor/async_timer_mock',
+ '$BUILD_DIR/mongo/util/clock_source_mock',
'auth/authmocks',
- 'keys_collection_manager',
+ 'dbdirectclient',
'keys_collection_document',
+ 'keys_collection_manager',
'logical_clock',
- 'logical_session_id',
- 'logical_session_id_helpers',
- 'logical_session_cache',
'logical_session_cache_impl',
+ 'logical_session_id_helpers',
+ 'logical_session_id',
+ 'repl/replmocks',
+ 'service_context_d_test_fixture',
'service_liaison_mock',
- 'service_context_test_fixture',
'sessions_collection_mock',
+ 'transaction_reaper',
],
)
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp
index 8eafdc8bf57..0b7544adc18 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp
@@ -48,30 +48,23 @@
namespace mongo {
-namespace {
-
-std::shared_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheServer state) {
- switch (state) {
- case LogicalSessionCacheServer::kSharded:
- return std::make_shared<SessionsCollectionSharded>();
- case LogicalSessionCacheServer::kConfigServer:
- return std::make_shared<SessionsCollectionConfigServer>();
- case LogicalSessionCacheServer::kReplicaSet:
- return std::make_shared<SessionsCollectionRS>();
- case LogicalSessionCacheServer::kStandalone:
- return std::make_shared<SessionsCollectionStandalone>();
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-} // namespace
-
std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) {
auto liaison = stdx::make_unique<ServiceLiaisonMongod>();
- // Set up the logical session cache
- auto sessionsColl = makeSessionsCollection(state);
+ auto sessionsColl = [&]() -> std::shared_ptr<SessionsCollection> {
+ switch (state) {
+ case LogicalSessionCacheServer::kSharded:
+ return std::make_shared<SessionsCollectionSharded>();
+ case LogicalSessionCacheServer::kConfigServer:
+ return std::make_shared<SessionsCollectionConfigServer>();
+ case LogicalSessionCacheServer::kReplicaSet:
+ return std::make_shared<SessionsCollectionRS>();
+ case LogicalSessionCacheServer::kStandalone:
+ return std::make_shared<SessionsCollectionStandalone>();
+ }
+
+ MONGO_UNREACHABLE;
+ }();
auto reaper = [&]() -> std::shared_ptr<TransactionReaper> {
switch (state) {
@@ -79,15 +72,16 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCach
return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl);
case LogicalSessionCacheServer::kReplicaSet:
return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl);
- default:
+ case LogicalSessionCacheServer::kConfigServer:
+ case LogicalSessionCacheServer::kStandalone:
return nullptr;
}
+
+ MONGO_UNREACHABLE;
}();
- return stdx::make_unique<LogicalSessionCacheImpl>(std::move(liaison),
- std::move(sessionsColl),
- std::move(reaper),
- LogicalSessionCacheImpl::Options{});
+ return stdx::make_unique<LogicalSessionCacheImpl>(
+ std::move(liaison), std::move(sessionsColl), std::move(reaper));
}
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp
index 078858d4569..2f3c297b202 100644
--- a/src/mongo/db/logical_session_cache_factory_mongos.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongos.cpp
@@ -47,7 +47,7 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() {
auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>();
return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr, LogicalSessionCacheImpl::Options{});
+ std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
}
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 9d9b1498776..c2b282baf3f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -72,11 +72,8 @@ constexpr Milliseconds LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh;
LogicalSessionCacheImpl::LogicalSessionCacheImpl(
std::unique_ptr<ServiceLiaison> service,
std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper,
- Options options)
- : _refreshInterval(options.refreshInterval),
- _sessionTimeout(options.sessionTimeout),
- _service(std::move(service)),
+ std::shared_ptr<TransactionReaper> transactionReaper)
+ : _service(std::move(service)),
_sessionsColl(std::move(collection)),
_transactionReaper(std::move(transactionReaper)) {
_stats.setLastSessionsCollectionJobTimestamp(now());
@@ -85,11 +82,12 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
if (!disableLogicalSessionCacheRefresh) {
_service->scheduleJob({"LogicalSessionCacheRefresh",
[this](Client* client) { _periodicRefresh(client); },
- _refreshInterval});
+ Milliseconds(logicalSessionRefreshMillis)});
+
if (_transactionReaper) {
_service->scheduleJob({"LogicalSessionCacheReap",
[this](Client* client) { _periodicReap(client); },
- _refreshInterval});
+ Milliseconds(logicalSessionRefreshMillis)});
}
}
}
@@ -98,8 +96,8 @@ LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
try {
_service->join();
} catch (...) {
- // If we failed to join we might still be running a background thread,
- // log but swallow the error since there is no good way to recover.
+ // If we failed to join we might still be running a background thread, log but swallow the
+ // error since there is no good way to recover
severe() << "Failed to join background service thread";
}
}
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index 4d2299a157f..9158da1397a 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -47,52 +47,30 @@ class Client;
class OperationContext;
class ServiceContext;
-extern int logicalSessionRefreshMillis;
-
/**
* A thread-safe cache structure for logical session records.
*
- * The cache takes ownership of the passed-in ServiceLiaison and
- * SessionsCollection helper types.
+ * The cache takes ownership of the passed-in ServiceLiaison and SessionsCollection helper types.
+ *
+ * Uses the following service-wide parameters:
+ * - A timeout value to use for sessions in the cache, in minutes. Defaults to 30 minutes.
+ * --setParameter localLogicalSessionTimeoutMinutes=X
+ *
+ * - The interval over which the cache will refresh session records. By default, this is set to
+ * every 5 minutes (300,000). If the caller is setting the sessionTimeout by hand, it is
+ * suggested that they consider also setting the refresh interval accordingly.
+ * --setParameter logicalSessionRefreshMillis=X.
*/
class LogicalSessionCacheImpl final : public LogicalSessionCache {
public:
static constexpr Milliseconds kLogicalSessionDefaultRefresh = Milliseconds(5 * 60 * 1000);
/**
- * An Options type to support the LogicalSessionCacheImpl.
- */
- struct Options {
- Options(){};
-
- /**
- * A timeout value to use for sessions in the cache, in minutes.
- *
- * By default, this is set to 30 minutes.
- *
- * May be set with --setParameter localLogicalSessionTimeoutMinutes=X.
- */
- Minutes sessionTimeout = Minutes(localLogicalSessionTimeoutMinutes);
-
- /**
- * The interval over which the cache will refresh session records.
- *
- * By default, this is set to every 5 minutes (300,000). If the caller
- * is setting the sessionTimeout by hand, it is suggested that they
- * consider also setting the refresh interval accordingly.
- *
- * May be set with --setParameter logicalSessionRefreshMillis=X.
- */
- Milliseconds refreshInterval = Milliseconds(logicalSessionRefreshMillis);
- };
-
- /**
* Construct a new session cache.
*/
- explicit LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
- std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper,
- Options options = Options{});
+ LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
+ std::shared_ptr<SessionsCollection> collection,
+ std::shared_ptr<TransactionReaper> transactionReaper);
LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete;
LogicalSessionCacheImpl& operator=(const LogicalSessionCacheImpl&) = delete;
@@ -150,9 +128,6 @@ private:
*/
Status _addToCache(LogicalSessionRecord record);
- const Milliseconds _refreshInterval;
- const Minutes _sessionTimeout;
-
// This value is only modified under the lock, and is modified
// automatically by the background jobs.
LogicalSessionCacheStats _stats;
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index fe654e6adc3..8bbf986d638 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -55,7 +55,7 @@ namespace mongo {
namespace {
const Milliseconds kSessionTimeout = duration_cast<Milliseconds>(kLogicalSessionDefaultTimeout);
-const Milliseconds kForceRefresh = LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh;
+const Milliseconds kForceRefresh{LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh};
using SessionList = std::list<LogicalSessionId>;
using unittest::EnsureFCV;
@@ -80,7 +80,7 @@ public:
auto mockService = stdx::make_unique<MockServiceLiaison>(_service);
auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions);
_cache = stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(mockService), std::move(mockSessions), nullptr);
+ std::move(mockService), std::move(mockSessions), nullptr /* reaper */);
}
void waitUntilRefreshScheduled() {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 94005d84f5d..7decacbdc96 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -281,8 +281,6 @@ public:
*/
void alwaysAllowWrites(bool allowWrites);
- void setMaster(bool isMaster);
-
virtual ServiceContext* getServiceContext() override {
return _service;
}
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index 5cb0c595419..5daabee3d82 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -81,9 +81,6 @@ public:
*/
virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0;
- virtual Status removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) = 0;
-
/**
* Checks a set of lsids and returns the set that no longer exists
*
diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h
index c7cb1ff5338..6bee96cf6af 100644
--- a/src/mongo/db/sessions_collection_mock.h
+++ b/src/mongo/db/sessions_collection_mock.h
@@ -128,11 +128,6 @@ public:
return _impl->findRemovedSessions(opCtx, sessions);
}
- Status removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) override {
- return Status::OK();
- }
-
private:
std::shared_ptr<MockSessionsCollectionImpl> _impl;
};
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index e916fda9d6c..c0c33539772 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -259,26 +259,4 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
});
}
-Status SessionsCollectionRS::removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) {
- return dispatch(
- NamespaceString::kSessionTransactionsTableNamespace,
- opCtx,
- [&] {
- DBDirectClient client(opCtx);
- return doRemove(NamespaceString::kSessionTransactionsTableNamespace,
- sessions,
- makeSendFnForBatchWrite(
- NamespaceString::kSessionTransactionsTableNamespace, &client));
- },
- [](DBClientBase*) {
- return Status(ErrorCodes::NotMaster, "Not eligible to remove transaction records");
- });
-}
-
-Status SessionsCollectionRS::removeTransactionRecordsHelper(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) {
- return SessionsCollectionRS{}.removeTransactionRecords(opCtx, sessions);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h
index 0dfe92fe301..f79c704b04f 100644
--- a/src/mongo/db/sessions_collection_rs.h
+++ b/src/mongo/db/sessions_collection_rs.h
@@ -90,24 +90,6 @@ public:
*/
StatusWith<LogicalSessionIdSet> findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
-
- /**
- * Removes the transaction records for the specified sessions from the
- * transaction table.
- *
- * If a step-down happens on this node as this method is running, it may fail.
- */
- Status removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) override;
-
- /**
- * Helper for a shard server to run its transaction operations as a replica set
- * member.
- *
- * If a step-down happens on this node as this method is running, it may fail.
- */
- static Status removeTransactionRecordsHelper(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index a4f5735954b..bae7f3ef8c6 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -164,9 +164,4 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return doFetch(kSessionsNamespaceString, sessions, send);
}
-Status SessionsCollectionSharded::removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) {
- return SessionsCollectionRS::removeTransactionRecordsHelper(opCtx, sessions);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h
index 39e1ff29087..112c56aae26 100644
--- a/src/mongo/db/sessions_collection_sharded.h
+++ b/src/mongo/db/sessions_collection_sharded.h
@@ -72,9 +72,6 @@ public:
StatusWith<LogicalSessionIdSet> findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
- Status removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) override;
-
protected:
Status _checkCacheForSessionsCollection(OperationContext* opCtx);
};
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index 0cc7a105353..67d61e0c8a2 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -122,9 +122,4 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSession
makeFindFnForCommand(kSessionsNamespaceString, &client));
}
-Status SessionsCollectionStandalone::removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) {
- MONGO_UNREACHABLE;
-}
-
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h
index ac45fe33336..81c32c9c817 100644
--- a/src/mongo/db/sessions_collection_standalone.h
+++ b/src/mongo/db/sessions_collection_standalone.h
@@ -68,9 +68,6 @@ public:
StatusWith<LogicalSessionIdSet> findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
-
- Status removeTransactionRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) override;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index 2460284f21e..b50f7eecc77 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -33,9 +33,6 @@
#include "mongo/db/transaction_reaper.h"
#include "mongo/bson/bsonmisc.h"
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/client.h"
-#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops.h"
@@ -48,7 +45,7 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
-#include "mongo/stdx/memory.h"
+#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -108,24 +105,21 @@ public:
: _collection(std::move(collection)) {}
int reap(OperationContext* opCtx) override {
- auto const coord = mongo::repl::ReplicationCoordinator::get(opCtx);
-
Handler handler(opCtx, *_collection);
if (!handler.initialize()) {
return 0;
}
- AutoGetCollection autoColl(
- opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS);
-
- // Only start reaping if the shard or config server node is currently the primary
- if (!coord->canAcceptWritesForDatabase(
+ // Make a best-effort attempt to only reap when the node is running as a primary
+ const auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
+ if (!coord->canAcceptWritesForDatabase_UNSAFE(
opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
return 0;
}
DBDirectClient client(opCtx);
+ // Fill all stale config.transactions entries
auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
auto cursor = client.query(
NamespaceString::kSessionTransactionsTableNamespace.ns(), query, 0, 0, &kIdProjection);
@@ -149,35 +143,60 @@ private:
* Removes the specified set of session ids from the persistent sessions collection and returns the
* number of sessions actually removed.
*/
-int removeSessionsRecords(OperationContext* opCtx,
- SessionsCollection& sessionsCollection,
- const LogicalSessionIdSet& sessionIdsToRemove) {
+int removeSessionsTransactionRecords(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ const LogicalSessionIdSet& sessionIdsToRemove) {
if (sessionIdsToRemove.empty()) {
return 0;
}
- Locker* locker = opCtx->lockState();
-
- Locker::LockSnapshot snapshot;
- invariant(locker->saveLockStateAndUnlock(&snapshot));
-
- const auto guard = MakeGuard([&] {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- locker->restoreLockState(opCtx, snapshot);
- });
-
- // Top-level locks are freed, release any potential low-level (storage engine-specific
- // locks). If we are yielding, we are at a safe place to do so.
- opCtx->recoveryUnit()->abandonSnapshot();
-
- // Track the number of yields in CurOp.
- CurOp::get(opCtx)->yielded();
-
- auto removed =
+ // From the passed-in sessions, find the ones which are actually expired/removed
+ auto expiredSessionIds =
uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
- uassertStatusOK(sessionsCollection.removeTransactionRecords(opCtx, removed));
- return removed.size();
+ DBDirectClient client(opCtx);
+ int numDeleted = 0;
+
+ for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end();) {
+ write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
+ deleteOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ deleteOp.setDeletes([&] {
+ // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object
+ // size limit
+ const int kMaxBatchSize = 10'000;
+ std::vector<write_ops::DeleteOpEntry> entries;
+ for (; it != expiredSessionIds.end() && entries.size() < kMaxBatchSize; ++it) {
+ entries.emplace_back([&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()));
+ entry.setMulti(false);
+ return entry;
+ }());
+ }
+
+ return entries;
+ }());
+
+ BSONObj result;
+ client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
+ deleteOp.toBSON({}),
+ result);
+
+ BatchedCommandResponse response;
+ std::string errmsg;
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse response " << result,
+ response.parseBSON(result, &errmsg));
+ uassertStatusOK(response.getTopLevelStatus());
+
+ numDeleted += response.getN();
+ }
+
+ return numDeleted;
}
/**
@@ -195,8 +214,8 @@ public:
void handleLsid(const LogicalSessionId& lsid) {
_batch.insert(lsid);
- if (_batch.size() > write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch);
+ if (_batch.size() >= write_ops::kMaxWriteBatchSize) {
+ _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
_batch.clear();
}
}
@@ -205,7 +224,7 @@ public:
invariant(!_finalized);
_finalized = true;
- _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch);
+ _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
return _numReaped;
}
@@ -240,14 +259,18 @@ public:
void handleLsid(const LogicalSessionId& lsid) {
invariant(_cm);
+
+ // This code attempts to group requests to 'removeSessionsTransactionRecords' to contain
+ // batches of lsids, which only fall on the same shard, so that the query to check whether
+ // they are alive doesn't need to do cross-shard scatter/gather queries
const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
- const auto shardId = chunk.getShardId();
+ const auto& shardId = chunk.getShardId();
auto& lsids = _shards[shardId];
lsids.insert(lsid);
- if (lsids.size() > write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, lsids);
+ if (lsids.size() >= write_ops::kMaxWriteBatchSize) {
+ _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, lsids);
_shards.erase(shardId);
}
}
@@ -257,7 +280,8 @@ public:
_finalized = true;
for (const auto& pair : _shards) {
- _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, pair.second);
+ _numReaped +=
+ removeSessionsTransactionRecords(_opCtx, _sessionsCollection, pair.second);
}
return _numReaped;
diff --git a/src/mongo/db/transaction_reaper_test.cpp b/src/mongo/db/transaction_reaper_test.cpp
new file mode 100644
index 00000000000..fe78891ef13
--- /dev/null
+++ b/src/mongo/db/transaction_reaper_test.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/sessions_collection_mock.h"
+#include "mongo/db/transaction_reaper.h"
+#include "mongo/util/clock_source_mock.h"
+
+namespace mongo {
+namespace {
+
+class TransactionReaperTest : public ServiceContextMongoDTest {
+protected:
+ void setUp() override {
+ const auto service = getServiceContext();
+ auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service);
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+
+ repl::ReplicationCoordinator::set(service, std::move(replCoord));
+ service->setFastClockSource(std::make_unique<ClockSourceMock>());
+ }
+
+ ClockSourceMock* clock() {
+ return dynamic_cast<ClockSourceMock*>(getServiceContext()->getFastClockSource());
+ }
+
+ ServiceContext::UniqueOperationContext _uniqueOpCtx{makeOperationContext()};
+ OperationContext* _opCtx{_uniqueOpCtx.get()};
+
+ std::shared_ptr<MockSessionsCollectionImpl> _collectionMock{
+ std::make_shared<MockSessionsCollectionImpl>()};
+
+ std::unique_ptr<TransactionReaper> _reaper{
+ TransactionReaper::make(TransactionReaper::Type::kReplicaSet,
+ std::make_shared<MockSessionsCollection>(_collectionMock))};
+};
+
+TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) {
+ _collectionMock->add([&] {
+ LogicalSessionRecord rec;
+ rec.setId(makeLogicalSessionIdForTest());
+ rec.setLastUse(clock()->now());
+ return rec;
+ }());
+
+ _collectionMock->add([&] {
+ LogicalSessionRecord rec;
+ rec.setId(makeLogicalSessionIdForTest());
+ rec.setLastUse(clock()->now());
+ return rec;
+ }());
+
+ DBDirectClient client(_opCtx);
+ SessionTxnRecord txn1;
+ txn1.setSessionId(makeLogicalSessionIdForTest());
+ txn1.setTxnNum(100);
+ txn1.setLastWriteOpTime(repl::OpTime(Timestamp(100), 1));
+ txn1.setLastWriteDate(clock()->now());
+ SessionTxnRecord txn2;
+ txn2.setSessionId(makeLogicalSessionIdForTest());
+ txn2.setTxnNum(200);
+ txn2.setLastWriteOpTime(repl::OpTime(Timestamp(200), 1));
+ txn2.setLastWriteDate(clock()->now());
+
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ std::vector<BSONObj>{txn1.toBSON(), txn2.toBSON()});
+
+ clock()->advance(Minutes{31});
+ ASSERT_EQ(2, _reaper->reap(_opCtx));
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
index 1a22a713c45..7f68667c466 100644
--- a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
+++ b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
@@ -51,7 +51,7 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheEmbedded() {
auto sessionsColl = std::make_shared<SessionsCollectionStandalone>();
return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr, LogicalSessionCacheImpl::Options{});
+ std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
}
} // namespace mongo