summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-05-01 13:39:24 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-05-02 10:56:09 -0400
commitbfac93c92c75d6d6c85b2b4653e3604b4384632b (patch)
tree2837ae7bcb75582150e7994c59085cd3e79a2c01
parent8200acfe03477cb6311768b3d9609cfb339a7eee (diff)
downloadmongo-bfac93c92c75d6d6c85b2b4653e3604b4384632b.tar.gz
SERVER-34773 Do not require `finalize()` to be have been called on the TransactionReaper handlers
-rw-r--r--src/mongo/db/transaction_reaper.cpp97
1 files changed, 45 insertions, 52 deletions
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index 6a218c39131..9a8e1cbd074 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -31,8 +31,8 @@
#include "mongo/db/transaction_reaper.h"
#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
-#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
@@ -47,7 +47,6 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
-#include "mongo/util/destructor_guard.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -107,30 +106,29 @@ public:
: _collection(std::move(collection)) {}
int reap(OperationContext* opCtx) override {
- Handler handler(opCtx, *_collection);
+ auto const coord = mongo::repl::ReplicationCoordinator::get(opCtx);
- Lock::DBLock lk(opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IS);
- Lock::CollectionLock lock(
- opCtx->lockState(), NamespaceString::kSessionTransactionsTableNamespace.ns(), MODE_IS);
+ AutoGetCollection autoColl(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS);
- auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
- if (coord->canAcceptWritesForDatabase(
+ // Only start reaping if the shard or config server node is currently the primary
+ if (!coord->canAcceptWritesForDatabase(
opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
- DBDirectClient client(opCtx);
+ return 0;
+ }
+
+ Handler handler(opCtx, *_collection);
+ DBDirectClient client(opCtx);
- auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- query,
- 0,
- 0,
- &kIdProjection);
+ auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
+ auto cursor = client.query(
+ NamespaceString::kSessionTransactionsTableNamespace.ns(), query, 0, 0, &kIdProjection);
- while (cursor->more()) {
- auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
- "TransactionSession"_sd, cursor->next());
+ while (cursor->more()) {
+ auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
+ "TransactionSession"_sd, cursor->next());
- handler.handleLsid(transactionSession.get_id());
- }
+ handler.handleLsid(transactionSession.get_id());
}
// Before the handler goes out of scope, flush its last batch to disk and collect stats.
@@ -141,10 +139,14 @@ private:
std::shared_ptr<SessionsCollection> _collection;
};
-int handleBatchHelper(OperationContext* opCtx,
- SessionsCollection& sessionsCollection,
- const LogicalSessionIdSet& batch) {
- if (batch.empty()) {
+/**
+ * Removes the specified set of session ids from the persistent sessions collection and returns the
+ * number of sessions actually removed.
+ */
+int removeSessionsRecords(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ const LogicalSessionIdSet& sessionIdsToRemove) {
+ if (sessionIdsToRemove.empty()) {
return 0;
}
@@ -165,7 +167,8 @@ int handleBatchHelper(OperationContext* opCtx,
// Track the number of yields in CurOp.
CurOp::get(opCtx)->yielded();
- auto removed = uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, batch));
+ auto removed =
+ uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
uassertStatusOK(sessionsCollection.removeTransactionRecords(opCtx, removed));
return removed.size();
@@ -177,27 +180,22 @@ int handleBatchHelper(OperationContext* opCtx,
class ReplHandler {
public:
ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx),
- _sessionsCollection(sessionsCollection),
- _numReaped(0),
- _finalized(false) {}
-
- ~ReplHandler() {
- invariant(_finalized.load());
- }
+ : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
void handleLsid(const LogicalSessionId& lsid) {
_batch.insert(lsid);
if (_batch.size() > write_ops::kMaxWriteBatchSize) {
- _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, _batch);
+ _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch);
_batch.clear();
}
}
int finalize() {
- invariant(!_finalized.swap(true));
- _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, _batch);
+ invariant(!_finalized);
+ _finalized = true;
+
+ _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch);
return _numReaped;
}
@@ -207,9 +205,9 @@ private:
LogicalSessionIdSet _batch;
- int _numReaped;
+ int _numReaped{0};
- AtomicBool _finalized;
+ bool _finalized{false};
};
/**
@@ -219,14 +217,7 @@ private:
class ShardedHandler {
public:
ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx),
- _sessionsCollection(sessionsCollection),
- _numReaped(0),
- _finalized(false) {}
-
- ~ShardedHandler() {
- invariant(_finalized.load());
- }
+ : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
void handleLsid(const LogicalSessionId& lsid) {
// There are some lifetime issues with when the reaper starts up versus when the grid is
@@ -253,16 +244,19 @@ public:
auto& lsids = _shards[shardId];
lsids.insert(lsid);
+
if (lsids.size() > write_ops::kMaxWriteBatchSize) {
- _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, lsids);
+ _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, lsids);
_shards.erase(shardId);
}
}
int finalize() {
- invariant(!_finalized.swap(true));
+ invariant(!_finalized);
+ _finalized = true;
+
for (const auto& pair : _shards) {
- _numReaped += handleBatchHelper(_opCtx, _sessionsCollection, pair.second);
+ _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, pair.second);
}
return _numReaped;
@@ -275,11 +269,10 @@ private:
std::shared_ptr<ChunkManager> _cm;
std::shared_ptr<Shard> _primary;
- int _numReaped;
-
stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
+ int _numReaped{0};
- AtomicBool _finalized;
+ bool _finalized{false};
};
} // namespace