diff options
author | James Wahlin <james@mongodb.com> | 2018-03-27 11:20:13 -0400 |
---|---|---|
committer | James Wahlin <james@mongodb.com> | 2018-04-16 14:33:12 -0400 |
commit | 9652d252a2932fc0096704fccb1152b6b290fe6f (patch) | |
tree | b5a6b0f0f23ef1c1e9c9924c37e8d1d5eedc39e1 /src/mongo/db/cursor_manager.cpp | |
parent | c02574298a711b6de8a3d89cedcfe98040a6f55b (diff) | |
download | mongo-9652d252a2932fc0096704fccb1152b6b290fe6f.tar.gz |
SERVER-33690 Transaction abort and commit should kill any associated client cursors
Diffstat (limited to 'src/mongo/db/cursor_manager.cpp')
-rw-r--r-- | src/mongo/db/cursor_manager.cpp | 170 |
1 files changed, 163 insertions, 7 deletions
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index 39c82910bcb..d6c00d996d2 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -50,6 +50,7 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/session_catalog.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" @@ -57,6 +58,21 @@ #include "mongo/util/startup_test.h" namespace mongo { + +MONGO_INITIALIZER(RegisterCursorKillFunction) +(InitializerContext* const) { + Session::registerCursorKillFunction( + [](OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { + return CursorManager::killAllCursorsForTransaction(opCtx, lsid, txnNumber); + }); + + Session::registerCursorExistsFunction([](LogicalSessionId lsid, TxnNumber txnNumber) { + return CursorManager::hasTransactionCursorReference(lsid, txnNumber); + }); + + return Status::OK(); +} + using std::vector; constexpr int CursorManager::kNumPartitions; @@ -107,11 +123,32 @@ public: int64_t nextSeed(); + void addTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + NamespaceString nss, + CursorId cursorId); + + void removeTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + CursorId cursorId); + + size_t numOpenCursorsForTransaction(LogicalSessionId lsid, TxnNumber txnNumber); + + size_t killAllCursorsForTransaction(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber); + private: + // '_mutex' must not be held when acquiring a CursorManager mutex to avoid deadlock. SimpleMutex _mutex; - typedef stdx::unordered_map<unsigned, NamespaceString> Map; - Map _idToNss; + using CursorIdToNssMap = stdx::unordered_map<CursorId, NamespaceString>; + using TxnNumberToCursorMap = stdx::unordered_map<TxnNumber, CursorIdToNssMap>; + using LsidToTxnCursorMap = LogicalSessionIdMap<TxnNumberToCursorMap>; + using IdToNssMap = stdx::unordered_map<unsigned, NamespaceString>; + + LsidToTxnCursorMap _lsidToTxnCursorMap; + IdToNssMap _idToNss; unsigned _nextId; std::unique_ptr<SecureRandom> _secureRandom; @@ -144,6 +181,72 @@ int64_t GlobalCursorIdCache::nextSeed() { return _secureRandom->nextInt64(); } +void GlobalCursorIdCache::addTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + NamespaceString nss, + CursorId cursorId) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + invariant(_lsidToTxnCursorMap[lsid][txnNumber].insert({cursorId, nss}).second == true, + "Expected insert to succeed"); +} + +void GlobalCursorIdCache::removeTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + CursorId cursorId) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + invariant(_lsidToTxnCursorMap[lsid][txnNumber].erase(cursorId) == 1); // cursorId was erased. + + if (_lsidToTxnCursorMap[lsid][txnNumber].size() == 0) { + invariant(_lsidToTxnCursorMap[lsid].erase(txnNumber) == 1); + if (_lsidToTxnCursorMap[lsid].size() == 0) { + invariant(_lsidToTxnCursorMap.erase(lsid) == 1); + } + } +} + +size_t GlobalCursorIdCache::numOpenCursorsForTransaction(LogicalSessionId lsid, + TxnNumber txnNumber) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + if (_lsidToTxnCursorMap.count(lsid) == 0 || _lsidToTxnCursorMap[lsid].count(txnNumber) == 0) { + return 0; + } + return _lsidToTxnCursorMap[lsid][txnNumber].size(); +} + +size_t GlobalCursorIdCache::killAllCursorsForTransaction(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + size_t killCount = 0; + CursorIdToNssMap cursorToNssMap; + { + // Copy the cursorId/NamespaceString map to a local structure to avoid obtaining the + // CursorManager mutex while holding the GlobalCursorIdCache mutex. + stdx::lock_guard<SimpleMutex> lk(_mutex); + for (auto&& cursorIdAndNss : _lsidToTxnCursorMap[lsid][txnNumber]) { + cursorToNssMap.insert(cursorIdAndNss); + } + } + + for (auto&& cursorIdAndNss : cursorToNssMap) { + const auto cursorId = cursorIdAndNss.first; + const auto nss = cursorIdAndNss.second; + + auto status = CursorManager::withCursorManager( + opCtx, cursorId, nss, [opCtx, cursorId, lsid, txnNumber](CursorManager* manager) { + const auto shouldAudit = false; + return manager->killCursor(opCtx, cursorId, shouldAudit, lsid, txnNumber); + }); + + if (status.isOK()) { + ++killCount; + } + + invariant(status.isOK() || status.code() == ErrorCodes::CursorNotFound); + } + + return killCount; +} + uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) { static const uint32_t kMaxIds = 1000 * 1000 * 1000; static_assert((kMaxIds & (0b11 << 30)) == 0, @@ -187,7 +290,7 @@ bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool } else { stdx::lock_guard<SimpleMutex> lk(_mutex); uint32_t nsid = idFromCursorId(id); - Map::const_iterator it = _idToNss.find(nsid); + IdToNssMap::const_iterator it = _idToNss.find(nsid); if (it == _idToNss.end()) { // No namespace corresponding to this cursor id prefix. TODO: Consider writing to // audit log here (even though we don't have a namespace). @@ -355,10 +458,35 @@ std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions( return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled()); } +void CursorManager::addTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + NamespaceString nss, + CursorId cursorId) { + globalCursorIdCache->addTransactionCursorReference(lsid, txnNumber, nss, cursorId); +} + +void CursorManager::removeTransactionCursorReference(const ClientCursor* cursor) { + // Remove cursor transaction registration if needed. + if (cursor->_lsid && cursor->_txnNumber) { + globalCursorIdCache->removeTransactionCursorReference( + *cursor->_lsid, *cursor->_txnNumber, cursor->_cursorid); + } +} + std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) { return globalCursorIdCache->timeoutCursors(opCtx, now); } +size_t CursorManager::killAllCursorsForTransaction(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + return globalCursorIdCache->killAllCursorsForTransaction(opCtx, lsid, txnNumber); +} + +bool CursorManager::hasTransactionCursorReference(LogicalSessionId lsid, TxnNumber txnNumber) { + return globalCursorIdCache->numOpenCursorsForTransaction(lsid, txnNumber) > 0; +} + int CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) { ConstDataCursor ids(_ids); int numDeleted = 0; @@ -455,6 +583,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx, // responsible for cleaning it up. Otherwise we can immediately dispose of it. if (cursor->_operationUsingCursor) { it = partition.erase(it); + removeTransactionCursorReference(cursor); continue; } @@ -463,6 +592,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx, // result in a useful error message. ++it; } else { + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); delete cursor; it = partition.erase(it); @@ -514,6 +644,7 @@ std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) { auto* cursor = it->second; if (cursorShouldTimeout_inlock(cursor, now)) { // Dispose of the cursor and remove it from the partition. + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); toDelete.push_back(std::unique_ptr<ClientCursor, ClientCursor::Deleter>{cursor}); it = lockedPartition->erase(it); @@ -561,6 +692,7 @@ StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, // This cursor was killed while it was idle. Status error = cursor->getExecutor()->getKillStatus(); lockedPartition->erase(cursor->cursorid()); + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); delete cursor; return error; @@ -606,6 +738,7 @@ void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) { LOG(0) << "removing cursor " << cursor->cursorid() << " after completing batch: " << interruptStatus; partition->erase(cursor->cursorid()); + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); delete cursor; } else if (!interruptStatus.isOK()) { @@ -711,6 +844,13 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor( new ClientCursor(std::move(cursorParams), this, cursorId, opCtx, now)); + // Register this cursor for lookup by transaction. + if (opCtx->getLogicalSessionId() && opCtx->getTxnNumber()) { + invariant(opCtx->getLogicalSessionId()); + addTransactionCursorReference( + *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), cursorParams.nss, cursorId); + } + // Transfer ownership of the cursor to '_cursorMap'. auto partition = _cursorMap->lockOnePartition(cursorId); ClientCursor* unownedCursor = clientCursor.release(); @@ -718,11 +858,16 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, return ClientCursorPin(opCtx, unownedCursor); } -void CursorManager::deregisterCursor(ClientCursor* cc) { - _cursorMap->erase(cc->cursorid()); +void CursorManager::deregisterCursor(ClientCursor* cursor) { + _cursorMap->erase(cursor->cursorid()); + removeTransactionCursorReference(cursor); } -Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) { +Status CursorManager::killCursor(OperationContext* opCtx, + CursorId id, + bool shouldAudit, + boost::optional<LogicalSessionId> lsid, + boost::optional<TxnNumber> txnNumber) { auto lockedPartition = _cursorMap->lockOnePartition(id); auto it = lockedPartition->find(id); if (it == lockedPartition->end()) { @@ -734,6 +879,17 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou } auto cursor = it->second; + if (lsid && lsid != cursor->getSessionId()) { + return { + ErrorCodes::CursorNotFound, + str::stream() << "killCursor LogicalSessionId must match that of cursor when provided"}; + } + + if (txnNumber && txnNumber != cursor->getTxnNumber()) { + return {ErrorCodes::CursorNotFound, + str::stream() << "killCursor TxnNumber must match that of cursor when provided"}; + } + if (cursor->_operationUsingCursor) { // Rather than removing the cursor directly, kill the operation that's currently using the // cursor. It will stop on its own (and remove the cursor) when it sees that it's been @@ -756,9 +912,9 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou } lockedPartition->erase(ownedCursor->cursorid()); + cursor->_cursorManager->removeTransactionCursorReference(cursor); ownedCursor->dispose(opCtx); return Status::OK(); - ; } Status CursorManager::checkAuthForKillCursors(OperationContext* opCtx, CursorId id) { |