diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-05-01 13:39:24 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-05-02 10:56:09 -0400 |
commit | bfac93c92c75d6d6c85b2b4653e3604b4384632b (patch) | |
tree | 2837ae7bcb75582150e7994c59085cd3e79a2c01 | |
parent | 8200acfe03477cb6311768b3d9609cfb339a7eee (diff) | |
download | mongo-bfac93c92c75d6d6c85b2b4653e3604b4384632b.tar.gz |
SERVER-34773 Do not require `finalize()` to be have been called on the TransactionReaper handlers
-rw-r--r-- | src/mongo/db/transaction_reaper.cpp | 97 |
1 files changed, 45 insertions, 52 deletions
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index 6a218c39131..9a8e1cbd074 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -31,8 +31,8 @@ #include "mongo/db/transaction_reaper.h" #include "mongo/bson/bsonmisc.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" -#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" @@ -47,7 +47,6 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" -#include "mongo/util/destructor_guard.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -107,30 +106,29 @@ public: : _collection(std::move(collection)) {} int reap(OperationContext* opCtx) override { - Handler handler(opCtx, *_collection); + auto const coord = mongo::repl::ReplicationCoordinator::get(opCtx); - Lock::DBLock lk(opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IS); - Lock::CollectionLock lock( - opCtx->lockState(), NamespaceString::kSessionTransactionsTableNamespace.ns(), MODE_IS); + AutoGetCollection autoColl( + opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS); - auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); - if (coord->canAcceptWritesForDatabase( + // Only start reaping if the shard or config server node is currently the primary + if (!coord->canAcceptWritesForDatabase( opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) { - DBDirectClient client(opCtx); + return 0; + } + + Handler handler(opCtx, *_collection); + DBDirectClient client(opCtx); - auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), - query, - 0, - 0, - &kIdProjection); + auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); + auto cursor = client.query( + NamespaceString::kSessionTransactionsTableNamespace.ns(), query, 0, 0, &kIdProjection); - while (cursor->more()) { - auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( - "TransactionSession"_sd, cursor->next()); + while (cursor->more()) { + auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( + "TransactionSession"_sd, cursor->next()); - handler.handleLsid(transactionSession.get_id()); - } + handler.handleLsid(transactionSession.get_id()); } // Before the handler goes out of scope, flush its last batch to disk and collect stats. @@ -141,10 +139,14 @@ private: std::shared_ptr<SessionsCollection> _collection; }; -int handleBatchHelper(OperationContext* opCtx, - SessionsCollection& sessionsCollection, - const LogicalSessionIdSet& batch) { - if (batch.empty()) { +/** + * Removes the specified set of session ids from the persistent sessions collection and returns the + * number of sessions actually removed. + */ +int removeSessionsRecords(OperationContext* opCtx, + SessionsCollection& sessionsCollection, + const LogicalSessionIdSet& sessionIdsToRemove) { + if (sessionIdsToRemove.empty()) { return 0; } @@ -165,7 +167,8 @@ int handleBatchHelper(OperationContext* opCtx, // Track the number of yields in CurOp. CurOp::get(opCtx)->yielded(); - auto removed = uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, batch)); + auto removed = + uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove)); uassertStatusOK(sessionsCollection.removeTransactionRecords(opCtx, removed)); return removed.size(); @@ -177,27 +180,22 @@ int handleBatchHelper(OperationContext* opCtx, class ReplHandler { public: ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) - : _opCtx(opCtx), - _sessionsCollection(sessionsCollection), - _numReaped(0), - _finalized(false) {} - - ~ReplHandler() { - invariant(_finalized.load()); - } + : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} void handleLsid(const LogicalSessionId& lsid) { _batch.insert(lsid); if (_batch.size() > write_ops::kMaxWriteBatchSize) { - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, _batch); + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch); _batch.clear(); } } int finalize() { - invariant(!_finalized.swap(true)); - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, _batch); + invariant(!_finalized); + _finalized = true; + + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch); return _numReaped; } @@ -207,9 +205,9 @@ private: LogicalSessionIdSet _batch; - int _numReaped; + int _numReaped{0}; - AtomicBool _finalized; + bool _finalized{false}; }; /** @@ -219,14 +217,7 @@ private: class ShardedHandler { public: ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) - : _opCtx(opCtx), - _sessionsCollection(sessionsCollection), - _numReaped(0), - _finalized(false) {} - - ~ShardedHandler() { - invariant(_finalized.load()); - } + : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} void handleLsid(const LogicalSessionId& lsid) { // There are some lifetime issues with when the reaper starts up versus when the grid is @@ -253,16 +244,19 @@ public: auto& lsids = _shards[shardId]; lsids.insert(lsid); + if (lsids.size() > write_ops::kMaxWriteBatchSize) { - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, lsids); + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, lsids); _shards.erase(shardId); } } int finalize() { - invariant(!_finalized.swap(true)); + invariant(!_finalized); + _finalized = true; + for (const auto& pair : _shards) { - _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, pair.second); + _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, pair.second); } return _numReaped; @@ -275,11 +269,10 @@ private: std::shared_ptr<ChunkManager> _cm; std::shared_ptr<Shard> _primary; - int _numReaped; - stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards; + int _numReaped{0}; - AtomicBool _finalized; + bool _finalized{false}; }; } // namespace |