/**
 * Copyright (C) 2017 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects
 * for all of the code used other than as permitted herein. If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so. If you do not
 * wish to do so, delete this exception statement from your version. If you
 * delete this exception statement from all source files in the program,
 * then also delete it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/db/transaction_reaper.h" #include "mongo/bson/bsonmisc.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/server_parameters.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/sessions_collection.h" #include "mongo/platform/atomic_word.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace { constexpr Minutes kTransactionRecordMinimumLifetime(30); /** * The minimum lifetime for a transaction record is how long it has to have lived on the server * before we'll consider it for cleanup. This is effectively the window for how long it is * permissible for a mongos to hang before we're willing to accept a failure of the retryable write * subsystem. * * Specifically, we imagine that a client connects to one mongos on a session and performs a * retryable write. That mongos hangs. Then the client connects to a new mongos on the same * session and successfully executes its write. After a day passes, the session will time out, * cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and * executes the write (because all records of the session + transaction have been deleted). * * So the write is performed twice, which is unavoidable without losing session vivification and/or * requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker * guarantee after the minimum transaction lifetime. 
*/ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes, int, kTransactionRecordMinimumLifetime.count()); const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName; /** * Makes the query we'll use to scan the transactions table. * * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt * to pull records likely to be on the same chunks (because they sort near each other). */ Query makeQuery(Date_t now) { const Date_t possiblyExpired(now - Minutes(TransactionRecordMinimumLifetimeMinutes)); Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)); query.sort(kSortById); return query; } /** * Our impl is templatized on a type which handles the lsids we see. It provides the top level * scaffolding for figuring out if we're the primary node responsible for the transaction table and * invoking the hanlder. * * The handler here will see all of the possibly expired txn ids in the transaction table and will * have a lifetime associated with a single call to reap. 
*/ template class TransactionReaperImpl final : public TransactionReaper { public: TransactionReaperImpl(std::shared_ptr collection) : _collection(std::move(collection)) {} int reap(OperationContext* opCtx) override { Handler handler(opCtx, _collection.get()); Lock::DBLock lk(opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IS); Lock::CollectionLock lock( opCtx->lockState(), NamespaceString::kSessionTransactionsTableNamespace.ns(), MODE_IS); auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); if (coord->canAcceptWritesForDatabase( opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) { DBDirectClient client(opCtx); auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query, 0, 0, &kIdProjection); while (cursor->more()) { auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( "TransactionSession"_sd, cursor->next()); handler.handleLsid(transactionSession.get_id()); } } // Before the handler goes out of scope, flush its last batch to disk and collect stats. return handler.finalize(); } private: std::shared_ptr _collection; }; int handleBatchHelper(SessionsCollection* sessionsCollection, OperationContext* opCtx, const LogicalSessionIdSet& batch) { if (batch.empty()) { return 0; } Locker* locker = opCtx->lockState(); Locker::LockSnapshot snapshot; invariant(locker->saveLockStateAndUnlock(&snapshot)); const auto guard = MakeGuard([&] { locker->restoreLockState(snapshot); }); // Top-level locks are freed, release any potential low-level (storage engine-specific // locks). If we are yielding, we are at a safe place to do so. opCtx->recoveryUnit()->abandonSnapshot(); // Track the number of yields in CurOp. 
CurOp::get(opCtx)->yielded(); auto removed = uassertStatusOK(sessionsCollection->findRemovedSessions(opCtx, batch)); uassertStatusOK(sessionsCollection->removeTransactionRecords(opCtx, removed)); return removed.size(); } /** * The repl impl is simple, just pass along to the sessions collection for checking ids locally */ class ReplHandler { public: ReplHandler(OperationContext* opCtx, SessionsCollection* collection) : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {} ~ReplHandler() { invariant(_finalized.load()); } void handleLsid(const LogicalSessionId& lsid) { _batch.insert(lsid); if (_batch.size() > write_ops::kMaxWriteBatchSize) { _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch); _batch.clear(); } } int finalize() { invariant(!_finalized.swap(true)); _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch); return _numReaped; } private: OperationContext* _opCtx; SessionsCollection* _sessionsCollection; LogicalSessionIdSet _batch; int _numReaped; AtomicBool _finalized; }; /** * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small * scans. */ class ShardedHandler { public: ShardedHandler(OperationContext* opCtx, SessionsCollection* collection) : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {} ~ShardedHandler() { invariant(_finalized.load()); } void handleLsid(const LogicalSessionId& lsid) { // There are some lifetime issues with when the reaper starts up versus when the grid is // available. Moving routing info fetching until after we have a transaction moves us past // the problem. // // Also, we should only need the chunk case, but that'll wait until the sessions table is // actually sharded. 
if (!(_cm || _primary)) { auto routingInfo = uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo( _opCtx, NamespaceString(SessionsCollection::kSessionsFullNS))); _cm = routingInfo.cm(); _primary = routingInfo.primary(); } ShardId shardId; if (_cm) { const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON()); shardId = chunk->getShardId(); } else { shardId = _primary->getId(); } auto& lsids = _shards[shardId]; lsids.insert(lsid); if (lsids.size() > write_ops::kMaxWriteBatchSize) { _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, lsids); _shards.erase(shardId); } } int finalize() { invariant(!_finalized.swap(true)); for (const auto& pair : _shards) { _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, pair.second); } return _numReaped; } private: OperationContext* _opCtx; SessionsCollection* _sessionsCollection; std::shared_ptr _cm; std::shared_ptr _primary; int _numReaped; stdx::unordered_map _shards; AtomicBool _finalized; }; } // namespace std::unique_ptr TransactionReaper::make( Type type, std::shared_ptr collection) { switch (type) { case Type::kReplicaSet: return stdx::make_unique>(std::move(collection)); case Type::kSharded: return stdx::make_unique>(std::move(collection)); } MONGO_UNREACHABLE; } TransactionReaper::~TransactionReaper() = default; } // namespace mongo