diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-05-15 14:36:53 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-05-16 14:40:26 -0400 |
commit | a2774d00b637d178ed593abd212b0f8e7ee38669 (patch) | |
tree | 64dea021068912fe045340c2c6c3928fc7185530 | |
parent | 417673fd65eac197f4bc21aab5ae5791b272dbf2 (diff) | |
download | mongo-a2774d00b637d178ed593abd212b0f8e7ee38669.tar.gz |
SERVER-34833 Sessions transaction reaper should refresh routing info before locking config.transactions
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/transaction_reaper.cpp | 46 |
2 files changed, 34 insertions, 27 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index f0f234d97af..fb91dfc6286 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -68,9 +68,11 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl( _service->scheduleJob({"LogicalSessionCacheRefresh", [this](Client* client) { _periodicRefresh(client); }, _refreshInterval}); - _service->scheduleJob({"LogicalSessionCacheReap", - [this](Client* client) { _periodicReap(client); }, - _refreshInterval}); + if (_transactionReaper) { + _service->scheduleJob({"LogicalSessionCacheReap", + [this](Client* client) { _periodicReap(client); }, + _refreshInterval}); + } } _stats.setLastSessionsCollectionJobTimestamp(now()); _stats.setLastTransactionReaperJobTimestamp(now()); @@ -207,6 +209,13 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { return uniqueCtx->get(); }(); + auto res = _sessionsColl->setupSessionsCollection(opCtx); + if (!res.isOK()) { + log() << "Sessions collection is not set up; " + << "waiting until next sessions reap interval: " << res.reason(); + return Status::OK(); + } + stdx::lock_guard<stdx::mutex> lk(_reaperMutex); numReaped = _transactionReaper->reap(opCtx); } catch (...) { diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index 9a8e1cbd074..02e003adae7 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -94,7 +94,7 @@ Query makeQuery(Date_t now) { /** * Our impl is templatized on a type which handles the lsids we see. It provides the top level * scaffolding for figuring out if we're the primary node responsible for the transaction table and - * invoking the hanlder. + * invoking the handler. * * The handler here will see all of the possibly expired txn ids in the transaction table and will * have a lifetime associated with a single call to reap. @@ -108,6 +108,11 @@ public: int reap(OperationContext* opCtx) override { auto const coord = mongo::repl::ReplicationCoordinator::get(opCtx); + Handler handler(opCtx, *_collection); + if (!handler.initialize()) { + return 0; + } + AutoGetCollection autoColl( opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS); @@ -117,7 +122,6 @@ public: return 0; } - Handler handler(opCtx, *_collection); DBDirectClient client(opCtx); auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); @@ -182,6 +186,10 @@ public: ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} + bool initialize() { + return true; + } + void handleLsid(const LogicalSessionId& lsid) { _batch.insert(lsid); @@ -219,28 +227,19 @@ public: ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} - void handleLsid(const LogicalSessionId& lsid) { - // There are some lifetime issues with when the reaper starts up versus when the grid is - // available. Moving routing info fetching until after we have a transaction moves us past - // the problem. - // - // Also, we should only need the chunk case, but that'll wait until the sessions table is - // actually sharded. - if (!(_cm || _primary)) { - auto routingInfo = - uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo( - _opCtx, SessionsCollection::kSessionsNamespaceString)); - _cm = routingInfo.cm(); - _primary = routingInfo.db().primary(); - } + // Returns false if the sessions collection is not set up. + bool initialize() { + auto routingInfo = + uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo( + _opCtx, SessionsCollection::kSessionsNamespaceString)); + _cm = routingInfo.cm(); + return !!_cm; + } - ShardId shardId; - if (_cm) { - const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON()); - shardId = chunk.getShardId(); - } else { - shardId = _primary->getId(); - } + void handleLsid(const LogicalSessionId& lsid) { + invariant(_cm); + const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON()); + const auto shardId = chunk.getShardId(); auto& lsids = _shards[shardId]; lsids.insert(lsid); @@ -267,7 +266,6 @@ private: SessionsCollection& _sessionsCollection; std::shared_ptr<ChunkManager> _cm; - std::shared_ptr<Shard> _primary; stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards; int _numReaped{0}; |