summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_reaper.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_reaper.cpp')
-rw-r--r--src/mongo/db/transaction_reaper.cpp52
1 files changed, 14 insertions, 38 deletions
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index a3b804a839d..f9187cb9e82 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -40,7 +40,6 @@
#include "mongo/db/server_parameters.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/sessions_collection.h"
-#include "mongo/platform/atomic_word.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
@@ -105,7 +104,7 @@ public:
TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection)
: _collection(std::move(collection)) {}
- int reap(OperationContext* opCtx) override {
+ void reap(OperationContext* opCtx) override {
Handler handler(opCtx, _collection.get());
Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IS);
@@ -130,21 +129,17 @@ public:
handler.handleLsid(transactionSession.get_id());
}
}
-
- // Before the handler goes out of scope, flush its last batch to disk and collect stats.
- return handler.finalize();
}
private:
std::shared_ptr<SessionsCollection> _collection;
};
-int handleBatchHelper(SessionsCollection* sessionsCollection,
- OperationContext* opCtx,
- const LogicalSessionIdSet& batch) {
+void handleBatchHelper(SessionsCollection* sessionsCollection,
+ OperationContext* opCtx,
+ const LogicalSessionIdSet& batch) {
auto removed = uassertStatusOK(sessionsCollection->findRemovedSessions(opCtx, batch));
uassertStatusOK(sessionsCollection->removeTransactionRecords(opCtx, removed));
- return removed.size();
}
/**
@@ -153,35 +148,25 @@ int handleBatchHelper(SessionsCollection* sessionsCollection,
class ReplHandler {
public:
ReplHandler(OperationContext* opCtx, SessionsCollection* collection)
- : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {}
+ : _opCtx(opCtx), _sessionsCollection(collection) {}
~ReplHandler() {
- invariant(_finalized.load());
+ DESTRUCTOR_GUARD([&] { handleBatchHelper(_sessionsCollection, _opCtx, _batch); }());
}
void handleLsid(const LogicalSessionId& lsid) {
_batch.insert(lsid);
if (_batch.size() > write_ops::kMaxWriteBatchSize) {
- _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch);
+ handleBatchHelper(_sessionsCollection, _opCtx, _batch);
_batch.clear();
}
}
- int finalize() {
- invariant(!_finalized.swap(true));
- _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch);
- return _numReaped;
- }
-
private:
OperationContext* _opCtx;
SessionsCollection* _sessionsCollection;
LogicalSessionIdSet _batch;
-
- int _numReaped;
-
- AtomicBool _finalized;
};
/**
@@ -191,10 +176,14 @@ private:
class ShardedHandler {
public:
ShardedHandler(OperationContext* opCtx, SessionsCollection* collection)
- : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {}
+ : _opCtx(opCtx), _sessionsCollection(collection) {}
~ShardedHandler() {
- invariant(_finalized.load());
+ DESTRUCTOR_GUARD([&] {
+ for (const auto& pair : _shards) {
+ handleBatchHelper(_sessionsCollection, _opCtx, pair.second);
+ }
+ }());
}
void handleLsid(const LogicalSessionId& lsid) {
@@ -221,31 +210,18 @@ public:
auto& lsids = _shards[shardId];
lsids.insert(lsid);
if (lsids.size() > write_ops::kMaxWriteBatchSize) {
- _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, lsids);
+ handleBatchHelper(_sessionsCollection, _opCtx, lsids);
_shards.erase(shardId);
}
}
- int finalize() {
- invariant(!_finalized.swap(true));
- for (const auto& pair : _shards) {
- _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, pair.second);
- }
-
- return _numReaped;
- }
-
private:
OperationContext* _opCtx;
SessionsCollection* _sessionsCollection;
std::shared_ptr<ChunkManager> _cm;
std::shared_ptr<Shard> _primary;
- int _numReaped;
-
stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
-
- AtomicBool _finalized;
};
} // namespace