#include "mongo/platform/basic.h"
#include "mongo/db/transaction_reaper.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/sessions_collection.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
// Compile-time constant of 30 minutes naming the minimum lifetime of a transaction record.
// NOTE(review): makeQuery() in this file reads `TransactionRecordMinimumLifetimeMinutes`, whose
// declaration is not visible in this copy; presumably it is a server parameter defaulted from
// this constant -- confirm against upstream.
constexpr Minutes kTransactionRecordMinimumLifetime(30);
/**
 * The minimum lifetime for a transaction record is how long it has to have lived on the server
 * before we'll consider it for cleanup. This is effectively the window for how long it is
 * permissible for a mongos to hang before we're willing to accept a failure of the retryable write
 * subsystem.
 *
 * Specifically, we imagine that a client connects to one mongos on a session and performs a
 * retryable write. That mongos hangs. Then the client connects to a new mongos on the same
 * session and successfully executes its write. After a day passes, the session will time out,
 * cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and
 * executes the write (because all records of the session + transaction have been deleted).
 *
 * So the write is performed twice, which is unavoidable without losing session vivification and/or
 * requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker
 * guarantee after the minimum transaction lifetime.
 */
// BSON document { <session id field>: 1 }. By its name this is meant as a projection that keeps
// only the session id. NOTE(review): no use of it is visible in this copy of the file -- confirm.
const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
// BSON document { <session id field>: 1 }, identical in content to kIdProjection. By its name
// this is meant as a sort specification on the session id. NOTE(review): no use of it is visible
// in this copy of the file -- confirm.
const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
// Local alias for the last-write-date field name of a transaction record; used by makeQuery().
const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
/**
 * Makes the query we'll use to scan the transactions table.
 *
 * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt
 * to pull records likely to be on the same chunks (because they sort near each other).
 */
// Builds the query used to scan the transactions table.
//
// `now` is the reference time. The returned Query matches records whose last-write date is
// strictly earlier (LT) than `now` minus TransactionRecordMinimumLifetimeMinutes minutes.
Query makeQuery(Date_t now) {
// Cutoff instant: a record last written before this is old enough to be considered.
const Date_t possiblyExpired(now - Minutes(TransactionRecordMinimumLifetimeMinutes));
Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired));
// NOTE(review): no sort (e.g. by kSortById) is applied in the code visible here, and the
// function's closing brace is missing -- this copy looks truncated; confirm against upstream.
return query;
/**
 * Our impl is templatized on a type which handles the lsids we see. It provides the top level
 * scaffolding for figuring out if we're the primary node responsible for the transaction table and
 * invoking the handler.
 *
 * The handler here will see all of the possibly expired txn ids in the transaction table and will
 * have a lifetime associated with a single call to reap.
 */
// TransactionReaper implementation that delegates per-id work to a `Handler` type, which is
// constructed from (opCtx, collection pointer) and exposes finalize().
// NOTE(review): `Handler` is not declared in this copy; presumably it is a template parameter.
// The template header, access specifiers, closing braces and the template argument of
// std::shared_ptr are also missing here -- confirm against upstream.
class TransactionReaperImpl final : public TransactionReaper {
// Stores the sessions collection pointer by moving it into _collection.
TransactionReaperImpl(std::shared_ptr collection)
: _collection(std::move(collection)) {}
// Runs one reaping pass over the session transactions table and returns the value produced by
// handler.finalize().
int reap(OperationContext* opCtx) override {
// A fresh handler per call; it is given a non-owning pointer to the sessions collection.
Handler handler(opCtx, _collection.get());
// Take MODE_IS locks on the database and then the collection of the session transactions table.
Lock::DBLock lk(opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IS);
Lock::CollectionLock lock(
opCtx->lockState(), NamespaceString::kSessionTransactionsTableNamespace.ns(), MODE_IS);
// Only scan when this node can currently accept writes for the transactions table's database.
auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
if (coord->canAcceptWritesForDatabase(
opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
DBDirectClient client(opCtx);
// The query cutoff is computed from the service context's fast clock source.
auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
// NOTE(review): the argument list of this query() call is cut off in this copy (the line ends in
// a comma and the next line starts the loop) -- restore from upstream.
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
while (cursor->more()) {
// Each returned document is parsed as a SessionsCollectionFetchResultIndividualResult.
// NOTE(review): nothing visible here passes the parsed result on to `handler`; the rest of the
// loop body appears to be missing -- confirm.
auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
"TransactionSession"_sd, cursor->next());
// Before the handler goes out of scope, flush its last batch to disk and collect stats.
return handler.finalize();
// Sessions collection handed to each Handler created by reap().
std::shared_ptr _collection;
// Processes one batch of logical session ids on behalf of a handler.
//
// Returns 0 immediately when `batch` is empty. Otherwise calls
// sessionsCollection->findRemovedSessions(opCtx, batch), passes that result to
// sessionsCollection->removeTransactionRecords(), and returns the size of the result.
// A non-OK status from either call is raised through uassertStatusOK.
int handleBatchHelper(SessionsCollection* sessionsCollection,
OperationContext* opCtx,
const LogicalSessionIdSet& batch) {
if (batch.empty()) {
return 0;
Locker* locker = opCtx->lockState();
Locker::LockSnapshot snapshot;
// On scope exit, restore the locker to the state held in `snapshot`.
// NOTE(review): no statement that fills `snapshot` (saving and releasing the current locks) is
// visible before this guard in this copy -- confirm against upstream.
const auto guard = MakeGuard([&] { locker->restoreLockState(snapshot); });
// Top-level locks are freed, release any potential low-level (storage engine-specific
// locks). If we are yielding, we are at a safe place to do so.
// Track the number of yields in CurOp.
// NOTE(review): the statements the two comments above describe are not present in this copy.
auto removed = uassertStatusOK(sessionsCollection->findRemovedSessions(opCtx, batch));
uassertStatusOK(sessionsCollection->removeTransactionRecords(opCtx, removed));
return removed.size();
/**
 * The repl impl is simple: just pass the ids along to the sessions collection, which checks them
 * locally.
 */
// Handler that accumulates ids in a single batch (_batch) and hands the batch to
// handleBatchHelper(), adding each returned count to _numReaped.
// NOTE(review): access specifiers and closing braces are missing in this copy of the file.
class ReplHandler {
// Stores the operation context and sessions collection (both non-owning raw pointers) and starts
// with a zero reaped count and _finalized == false.
ReplHandler(OperationContext* opCtx, SessionsCollection* collection)
: _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {}
// NOTE(review): the destructor body is not visible in this copy.
~ReplHandler() {
// Called for an id seen by the scan.
// NOTE(review): no statement adding `lsid` to _batch, or clearing _batch after a flush, is
// visible in this copy -- confirm against upstream.
void handleLsid(const LogicalSessionId& lsid) {
// Flush only once the batch size exceeds (not merely reaches) the maximum write batch size.
if (_batch.size() > write_ops::kMaxWriteBatchSize) {
_numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch);
// Processes whatever is left in _batch and returns the accumulated count.
int finalize() {
_numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch);
return _numReaped;
OperationContext* _opCtx;
SessionsCollection* _sessionsCollection;
// Ids collected for the next call to handleBatchHelper().
LogicalSessionIdSet _batch;
// Sum of the counts returned by handleBatchHelper() so far.
int _numReaped;
// Initialised to false by the constructor. NOTE(review): no read or write of this flag is visible
// in this copy.
AtomicBool _finalized;
/**
 * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small
 * scans.
 */
// Handler that keeps a separate batch of ids per shard id (_shards) and hands each batch to
// handleBatchHelper(), adding each returned count to _numReaped.
// NOTE(review): access specifiers, closing braces and the template arguments of
// std::shared_ptr / stdx::unordered_map are missing in this copy of the file.
class ShardedHandler {
// Stores the operation context and sessions collection (both non-owning raw pointers) and starts
// with a zero reaped count and _finalized == false; _cm and _primary start out empty.
ShardedHandler(OperationContext* opCtx, SessionsCollection* collection)
: _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {}
// NOTE(review): the destructor body is not visible in this copy.
~ShardedHandler() {
// Called for an id seen by the scan: resolves the shard for `lsid` and flushes that shard's
// batch once it grows past the maximum write batch size.
void handleLsid(const LogicalSessionId& lsid) {
// There are some lifetime issues with when the reaper starts up versus when the grid is
// available. Moving routing info fetching until after we have a transaction moves us past
// the problem.
// Also, we should only need the chunk case, but that'll wait until the sessions table is
// actually sharded.
// Routing info is fetched only while neither _cm nor _primary has been set.
if (!(_cm || _primary)) {
// NOTE(review): the call that produces `routingInfo` is cut off in this copy; only its trailing
// arguments (_opCtx, SessionsCollection::kSessionsFullNS) remain -- restore from upstream.
auto routingInfo =
_opCtx, SessionsCollection::kSessionsFullNS));
_cm = routingInfo.cm();
_primary = routingInfo.primary();
ShardId shardId;
if (_cm) {
// With _cm set: use the shard of the chunk that lsid.toBSON() intersects.
auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
shardId = chunk->getShardId();
} else {
// Without _cm: use the id of _primary.
shardId = _primary->getId();
// operator[] creates the entry for this shard the first time the shard is seen.
// NOTE(review): no statement adding `lsid` to `lsids`, or clearing it after a flush, is visible
// in this copy -- confirm against upstream.
auto& lsids = _shards[shardId];
if (lsids.size() > write_ops::kMaxWriteBatchSize) {
_numReaped += handleBatchHelper(_sessionsCollection, _opCtx, lsids);
// Processes the remaining batch of every shard and returns the accumulated count.
int finalize() {
for (const auto& pair : _shards) {
_numReaped += handleBatchHelper(_sessionsCollection, _opCtx, pair.second);
return _numReaped;
OperationContext* _opCtx;
SessionsCollection* _sessionsCollection;
// Set from routingInfo.cm() on first use; may remain empty, in which case _primary is used.
std::shared_ptr _cm;
// Set from routingInfo.primary() on first use.
std::shared_ptr _primary;
// Sum of the counts returned by handleBatchHelper() so far.
int _numReaped;
// Per-shard batches, indexed by ShardId.
stdx::unordered_map _shards;
// Initialised to false by the constructor. NOTE(review): no read or write of this flag is visible
// in this copy.
AtomicBool _finalized;
} // namespace
// Factory: builds the reaper for the given `type`, moving `collection` into it.
// NOTE(review): the template arguments of std::unique_ptr, std::shared_ptr and
// stdx::make_unique, and the closing braces of the switch and function, are missing in this
// copy. Presumably kReplicaSet pairs with ReplHandler and kSharded with ShardedHandler -- confirm
// against upstream. No return or unreachable marker after the switch is visible here either.
std::unique_ptr TransactionReaper::make(
Type type, std::shared_ptr collection) {
switch (type) {
case Type::kReplicaSet:
return stdx::make_unique>(std::move(collection));
case Type::kSharded:
return stdx::make_unique>(std::move(collection));
// Out-of-line definition of the TransactionReaper destructor, using the compiler-generated
// default.
TransactionReaper::~TransactionReaper() = default;
} // namespace mongo