diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-05-01 13:39:24 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-06-01 13:00:36 -0400 |
commit | 771c301826e071c49484b8100223f2fcc923b04f (patch) | |
tree | dd45a67a6e0e3f7c6cd886ff86b6f3c1998dd1c5 | |
parent | d130db3272bb991c0b4ad1500a766ac72b4c4e0c (diff) | |
download | mongo-771c301826e071c49484b8100223f2fcc923b04f.tar.gz |
SERVER-34773 Do not require `finalize()` to be have been called on the TransactionReaper handlers
(cherry picked from commit bfac93c92c75d6d6c85b2b4653e3604b4384632b)
-rw-r--r-- | src/mongo/db/transaction_reaper.cpp | 97 |
1 files changed, 45 insertions, 52 deletions
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index 34106678017..b1b05946eb2 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -31,8 +31,8 @@ #include "mongo/db/transaction_reaper.h" #include "mongo/bson/bsonmisc.h" +#include "mongo/db/catalog/catalog_raii.h" #include "mongo/db/client.h" -#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" @@ -47,7 +47,6 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" -#include "mongo/util/destructor_guard.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -107,30 +106,29 @@ public: : _collection(std::move(collection)) {} int reap(OperationContext* opCtx) override { - Handler handler(opCtx, *_collection); + auto const coord = mongo::repl::ReplicationCoordinator::get(opCtx); - Lock::DBLock lk(opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IS); - Lock::CollectionLock lock( - opCtx->lockState(), NamespaceString::kSessionTransactionsTableNamespace.ns(), MODE_IS); + AutoGetCollection autoColl( + opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS); - auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); - if (coord->canAcceptWritesForDatabase( + // Only start reaping if the shard or config server node is currently the primary + if (!coord->canAcceptWritesForDatabase( opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) { - DBDirectClient client(opCtx); + return 0; + } + + Handler handler(opCtx, *_collection); + DBDirectClient client(opCtx); - auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), - query, - 0, - 0, - &kIdProjection); + auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); + auto cursor = client.query( + NamespaceString::kSessionTransactionsTableNamespace.ns(), query, 0, 0, &kIdProjection); - while (cursor->more()) { - auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( - "TransactionSession"_sd, cursor->next()); + while (cursor->more()) { + auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( + "TransactionSession"_sd, cursor->next()); - handler.handleLsid(transactionSession.get_id()); - } + handler.handleLsid(transactionSession.get_id()); } // Before the handler goes out of scope, flush its last batch to disk and collect stats. @@ -141,10 +139,14 @@ private: std::shared_ptr<SessionsCollection> _collection; }; -int handleBatchHelper(OperationContext* opCtx, - SessionsCollection& sessionsCollection, - const LogicalSessionIdSet& batch) { - if (batch.empty()) { +/** + * Removes the specified set of session ids from the persistent sessions collection and returns the + * number of sessions actually removed. + */ +int removeSessionsRecords(OperationContext* opCtx, + SessionsCollection& sessionsCollection, + const LogicalSessionIdSet& sessionIdsToRemove) { + if (sessionIdsToRemove.empty()) { return 0; } @@ -162,7 +164,8 @@ int handleBatchHelper(OperationContext* opCtx, // Track the number of yields in CurOp. CurOp::get(opCtx)->yielded(); - auto removed = uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, batch)); + auto removed = + uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove)); uassertStatusOK(sessionsCollection.removeTransactionRecords(opCtx, removed)); return removed.size(); @@ -174,27 +177,22 @@ int handleBatchHelper(OperationContext* opCtx, class ReplHandler { public: ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) - : _opCtx(opCtx), - _sessionsCollection(sessionsCollection), - _numReaped(0), - _finalized(false) {} - - ~ReplHandler() { - invariant(_finalized.load()); - } + : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} void handleLsid(const LogicalSessionId& lsid) { _batch.insert(lsid); if (_batch.size() > write_ops::kMaxWriteBatchSize) { - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, _batch); + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch); _batch.clear(); } } int finalize() { - invariant(!_finalized.swap(true)); - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, _batch); + invariant(!_finalized); + _finalized = true; + + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch); return _numReaped; } @@ -204,9 +202,9 @@ private: LogicalSessionIdSet _batch; - int _numReaped; + int _numReaped{0}; - AtomicBool _finalized; + bool _finalized{false}; }; /** @@ -216,14 +214,7 @@ private: class ShardedHandler { public: ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) - : _opCtx(opCtx), - _sessionsCollection(sessionsCollection), - _numReaped(0), - _finalized(false) {} - - ~ShardedHandler() { - invariant(_finalized.load()); - } + : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} void handleLsid(const LogicalSessionId& lsid) { // There are some lifetime issues with when the reaper starts up versus when the grid is @@ -250,16 +241,19 @@ public: auto& lsids = _shards[shardId]; lsids.insert(lsid); + if (lsids.size() > write_ops::kMaxWriteBatchSize) { - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, lsids); + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, lsids); _shards.erase(shardId); } } int finalize() { - invariant(!_finalized.swap(true)); + invariant(!_finalized); + _finalized = true; + for (const auto& pair : _shards) { - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, pair.second); + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, pair.second); } return _numReaped; @@ -272,11 +266,10 @@ private: std::shared_ptr<ChunkManager> _cm; std::shared_ptr<Shard> _primary; - int _numReaped; - stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards; + int _numReaped{0}; - AtomicBool _finalized; + bool _finalized{false}; }; } // namespace |