summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2018-05-15 14:36:53 -0400
committerJack Mulrow <jack.mulrow@mongodb.com>2018-05-16 14:40:26 -0400
commita2774d00b637d178ed593abd212b0f8e7ee38669 (patch)
tree64dea021068912fe045340c2c6c3928fc7185530
parent417673fd65eac197f4bc21aab5ae5791b272dbf2 (diff)
downloadmongo-a2774d00b637d178ed593abd212b0f8e7ee38669.tar.gz
SERVER-34833 Sessions transaction reaper should refresh routing info before locking config.transactions
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp15
-rw-r--r--src/mongo/db/transaction_reaper.cpp46
2 files changed, 34 insertions, 27 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index f0f234d97af..fb91dfc6286 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -68,9 +68,11 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
_service->scheduleJob({"LogicalSessionCacheRefresh",
[this](Client* client) { _periodicRefresh(client); },
_refreshInterval});
- _service->scheduleJob({"LogicalSessionCacheReap",
- [this](Client* client) { _periodicReap(client); },
- _refreshInterval});
+ if (_transactionReaper) {
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ _refreshInterval});
+ }
}
_stats.setLastSessionsCollectionJobTimestamp(now());
_stats.setLastTransactionReaperJobTimestamp(now());
@@ -207,6 +209,13 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return uniqueCtx->get();
}();
+ auto res = _sessionsColl->setupSessionsCollection(opCtx);
+ if (!res.isOK()) {
+ log() << "Sessions collection is not set up; "
+ << "waiting until next sessions reap interval: " << res.reason();
+ return Status::OK();
+ }
+
stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
numReaped = _transactionReaper->reap(opCtx);
} catch (...) {
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index 9a8e1cbd074..02e003adae7 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -94,7 +94,7 @@ Query makeQuery(Date_t now) {
/**
* Our impl is templatized on a type which handles the lsids we see. It provides the top level
* scaffolding for figuring out if we're the primary node responsible for the transaction table and
- * invoking the hanlder.
+ * invoking the handler.
*
* The handler here will see all of the possibly expired txn ids in the transaction table and will
* have a lifetime associated with a single call to reap.
@@ -108,6 +108,11 @@ public:
int reap(OperationContext* opCtx) override {
auto const coord = mongo::repl::ReplicationCoordinator::get(opCtx);
+ Handler handler(opCtx, *_collection);
+ if (!handler.initialize()) {
+ return 0;
+ }
+
AutoGetCollection autoColl(
opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS);
@@ -117,7 +122,6 @@ public:
return 0;
}
- Handler handler(opCtx, *_collection);
DBDirectClient client(opCtx);
auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
@@ -182,6 +186,10 @@ public:
ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
: _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
+ bool initialize() {
+ return true;
+ }
+
void handleLsid(const LogicalSessionId& lsid) {
_batch.insert(lsid);
@@ -219,28 +227,19 @@ public:
ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
: _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
- void handleLsid(const LogicalSessionId& lsid) {
- // There are some lifetime issues with when the reaper starts up versus when the grid is
- // available. Moving routing info fetching until after we have a transaction moves us past
- // the problem.
- //
- // Also, we should only need the chunk case, but that'll wait until the sessions table is
- // actually sharded.
- if (!(_cm || _primary)) {
- auto routingInfo =
- uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo(
- _opCtx, SessionsCollection::kSessionsNamespaceString));
- _cm = routingInfo.cm();
- _primary = routingInfo.db().primary();
- }
+ // Returns false if the sessions collection is not set up.
+ bool initialize() {
+ auto routingInfo =
+ uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo(
+ _opCtx, SessionsCollection::kSessionsNamespaceString));
+ _cm = routingInfo.cm();
+ return !!_cm;
+ }
- ShardId shardId;
- if (_cm) {
- const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
- shardId = chunk.getShardId();
- } else {
- shardId = _primary->getId();
- }
+ void handleLsid(const LogicalSessionId& lsid) {
+ invariant(_cm);
+ const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
+ const auto shardId = chunk.getShardId();
auto& lsids = _shards[shardId];
lsids.insert(lsid);
@@ -267,7 +266,6 @@ private:
SessionsCollection& _sessionsCollection;
std::shared_ptr<ChunkManager> _cm;
- std::shared_ptr<Shard> _primary;
stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
int _numReaped{0};