summaryrefslogtreecommitdiff
path: root/src/mongo/db/cursor_manager.cpp
diff options
context:
space:
mode:
authorJames Wahlin <james@mongodb.com>2018-03-27 11:20:13 -0400
committerJames Wahlin <james@mongodb.com>2018-04-16 14:33:12 -0400
commit9652d252a2932fc0096704fccb1152b6b290fe6f (patch)
treeb5a6b0f0f23ef1c1e9c9924c37e8d1d5eedc39e1 /src/mongo/db/cursor_manager.cpp
parentc02574298a711b6de8a3d89cedcfe98040a6f55b (diff)
downloadmongo-9652d252a2932fc0096704fccb1152b6b290fe6f.tar.gz
SERVER-33690 Transaction abort and commit should kill any associated client cursors
Diffstat (limited to 'src/mongo/db/cursor_manager.cpp')
-rw-r--r--src/mongo/db/cursor_manager.cpp170
1 files changed, 163 insertions, 7 deletions
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp
index 39c82910bcb..d6c00d996d2 100644
--- a/src/mongo/db/cursor_manager.cpp
+++ b/src/mongo/db/cursor_manager.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
@@ -57,6 +58,21 @@
#include "mongo/util/startup_test.h"
namespace mongo {
+
+MONGO_INITIALIZER(RegisterCursorKillFunction)
+(InitializerContext* const) {
+ Session::registerCursorKillFunction(
+ [](OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) {
+ return CursorManager::killAllCursorsForTransaction(opCtx, lsid, txnNumber);
+ });
+
+ Session::registerCursorExistsFunction([](LogicalSessionId lsid, TxnNumber txnNumber) {
+ return CursorManager::hasTransactionCursorReference(lsid, txnNumber);
+ });
+
+ return Status::OK();
+}
+
using std::vector;
constexpr int CursorManager::kNumPartitions;
@@ -107,11 +123,32 @@ public:
int64_t nextSeed();
+ void addTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ NamespaceString nss,
+ CursorId cursorId);
+
+ void removeTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ CursorId cursorId);
+
+ size_t numOpenCursorsForTransaction(LogicalSessionId lsid, TxnNumber txnNumber);
+
+ size_t killAllCursorsForTransaction(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber);
+
private:
+ // '_mutex' must not be held when acquiring a CursorManager mutex to avoid deadlock.
SimpleMutex _mutex;
- typedef stdx::unordered_map<unsigned, NamespaceString> Map;
- Map _idToNss;
+ using CursorIdToNssMap = stdx::unordered_map<CursorId, NamespaceString>;
+ using TxnNumberToCursorMap = stdx::unordered_map<TxnNumber, CursorIdToNssMap>;
+ using LsidToTxnCursorMap = LogicalSessionIdMap<TxnNumberToCursorMap>;
+ using IdToNssMap = stdx::unordered_map<unsigned, NamespaceString>;
+
+ LsidToTxnCursorMap _lsidToTxnCursorMap;
+ IdToNssMap _idToNss;
unsigned _nextId;
std::unique_ptr<SecureRandom> _secureRandom;
@@ -144,6 +181,72 @@ int64_t GlobalCursorIdCache::nextSeed() {
return _secureRandom->nextInt64();
}
+void GlobalCursorIdCache::addTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ NamespaceString nss,
+ CursorId cursorId) {
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ invariant(_lsidToTxnCursorMap[lsid][txnNumber].insert({cursorId, nss}).second == true,
+ "Expected insert to succeed");
+}
+
+void GlobalCursorIdCache::removeTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ CursorId cursorId) {
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ invariant(_lsidToTxnCursorMap[lsid][txnNumber].erase(cursorId) == 1); // cursorId was erased.
+
+ if (_lsidToTxnCursorMap[lsid][txnNumber].size() == 0) {
+ invariant(_lsidToTxnCursorMap[lsid].erase(txnNumber) == 1);
+ if (_lsidToTxnCursorMap[lsid].size() == 0) {
+ invariant(_lsidToTxnCursorMap.erase(lsid) == 1);
+ }
+ }
+}
+
+size_t GlobalCursorIdCache::numOpenCursorsForTransaction(LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ if (_lsidToTxnCursorMap.count(lsid) == 0 || _lsidToTxnCursorMap[lsid].count(txnNumber) == 0) {
+ return 0;
+ }
+ return _lsidToTxnCursorMap[lsid][txnNumber].size();
+}
+
+size_t GlobalCursorIdCache::killAllCursorsForTransaction(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ size_t killCount = 0;
+ CursorIdToNssMap cursorToNssMap;
+ {
+ // Copy the cursorId/NamespaceString map to a local structure to avoid obtaining the
+ // CursorManager mutex while holding the GlobalCursorIdCache mutex.
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ for (auto&& cursorIdAndNss : _lsidToTxnCursorMap[lsid][txnNumber]) {
+ cursorToNssMap.insert(cursorIdAndNss);
+ }
+ }
+
+ for (auto&& cursorIdAndNss : cursorToNssMap) {
+ const auto cursorId = cursorIdAndNss.first;
+ const auto nss = cursorIdAndNss.second;
+
+ auto status = CursorManager::withCursorManager(
+ opCtx, cursorId, nss, [opCtx, cursorId, lsid, txnNumber](CursorManager* manager) {
+ const auto shouldAudit = false;
+ return manager->killCursor(opCtx, cursorId, shouldAudit, lsid, txnNumber);
+ });
+
+ if (status.isOK()) {
+ ++killCount;
+ }
+
+ invariant(status.isOK() || status.code() == ErrorCodes::CursorNotFound);
+ }
+
+ return killCount;
+}
+
uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
static const uint32_t kMaxIds = 1000 * 1000 * 1000;
static_assert((kMaxIds & (0b11 << 30)) == 0,
@@ -187,7 +290,7 @@ bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool
} else {
stdx::lock_guard<SimpleMutex> lk(_mutex);
uint32_t nsid = idFromCursorId(id);
- Map::const_iterator it = _idToNss.find(nsid);
+ IdToNssMap::const_iterator it = _idToNss.find(nsid);
if (it == _idToNss.end()) {
// No namespace corresponding to this cursor id prefix. TODO: Consider writing to
// audit log here (even though we don't have a namespace).
@@ -355,10 +458,35 @@ std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions(
return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled());
}
+void CursorManager::addTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ NamespaceString nss,
+ CursorId cursorId) {
+ globalCursorIdCache->addTransactionCursorReference(lsid, txnNumber, nss, cursorId);
+}
+
+void CursorManager::removeTransactionCursorReference(const ClientCursor* cursor) {
+ // Remove cursor transaction registration if needed.
+ if (cursor->_lsid && cursor->_txnNumber) {
+ globalCursorIdCache->removeTransactionCursorReference(
+ *cursor->_lsid, *cursor->_txnNumber, cursor->_cursorid);
+ }
+}
+
std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) {
return globalCursorIdCache->timeoutCursors(opCtx, now);
}
+size_t CursorManager::killAllCursorsForTransaction(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ return globalCursorIdCache->killAllCursorsForTransaction(opCtx, lsid, txnNumber);
+}
+
+bool CursorManager::hasTransactionCursorReference(LogicalSessionId lsid, TxnNumber txnNumber) {
+ return globalCursorIdCache->numOpenCursorsForTransaction(lsid, txnNumber) > 0;
+}
+
int CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) {
ConstDataCursor ids(_ids);
int numDeleted = 0;
@@ -455,6 +583,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx,
// responsible for cleaning it up. Otherwise we can immediately dispose of it.
if (cursor->_operationUsingCursor) {
it = partition.erase(it);
+ removeTransactionCursorReference(cursor);
continue;
}
@@ -463,6 +592,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx,
// result in a useful error message.
++it;
} else {
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
delete cursor;
it = partition.erase(it);
@@ -514,6 +644,7 @@ std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) {
auto* cursor = it->second;
if (cursorShouldTimeout_inlock(cursor, now)) {
// Dispose of the cursor and remove it from the partition.
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
toDelete.push_back(std::unique_ptr<ClientCursor, ClientCursor::Deleter>{cursor});
it = lockedPartition->erase(it);
@@ -561,6 +692,7 @@ StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx,
// This cursor was killed while it was idle.
Status error = cursor->getExecutor()->getKillStatus();
lockedPartition->erase(cursor->cursorid());
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
delete cursor;
return error;
@@ -606,6 +738,7 @@ void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) {
LOG(0) << "removing cursor " << cursor->cursorid()
<< " after completing batch: " << interruptStatus;
partition->erase(cursor->cursorid());
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
delete cursor;
} else if (!interruptStatus.isOK()) {
@@ -711,6 +844,13 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor(
new ClientCursor(std::move(cursorParams), this, cursorId, opCtx, now));
+ // Register this cursor for lookup by transaction.
+ if (opCtx->getLogicalSessionId() && opCtx->getTxnNumber()) {
+ invariant(opCtx->getLogicalSessionId());
+ addTransactionCursorReference(
+ *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), cursorParams.nss, cursorId);
+ }
+
// Transfer ownership of the cursor to '_cursorMap'.
auto partition = _cursorMap->lockOnePartition(cursorId);
ClientCursor* unownedCursor = clientCursor.release();
@@ -718,11 +858,16 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
return ClientCursorPin(opCtx, unownedCursor);
}
-void CursorManager::deregisterCursor(ClientCursor* cc) {
- _cursorMap->erase(cc->cursorid());
+void CursorManager::deregisterCursor(ClientCursor* cursor) {
+ _cursorMap->erase(cursor->cursorid());
+ removeTransactionCursorReference(cursor);
}
-Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) {
+Status CursorManager::killCursor(OperationContext* opCtx,
+ CursorId id,
+ bool shouldAudit,
+ boost::optional<LogicalSessionId> lsid,
+ boost::optional<TxnNumber> txnNumber) {
auto lockedPartition = _cursorMap->lockOnePartition(id);
auto it = lockedPartition->find(id);
if (it == lockedPartition->end()) {
@@ -734,6 +879,17 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou
}
auto cursor = it->second;
+ if (lsid && lsid != cursor->getSessionId()) {
+ return {
+ ErrorCodes::CursorNotFound,
+ str::stream() << "killCursor LogicalSessionId must match that of cursor when provided"};
+ }
+
+ if (txnNumber && txnNumber != cursor->getTxnNumber()) {
+ return {ErrorCodes::CursorNotFound,
+ str::stream() << "killCursor TxnNumber must match that of cursor when provided"};
+ }
+
if (cursor->_operationUsingCursor) {
// Rather than removing the cursor directly, kill the operation that's currently using the
// cursor. It will stop on its own (and remove the cursor) when it sees that it's been
@@ -756,9 +912,9 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou
}
lockedPartition->erase(ownedCursor->cursorid());
+ cursor->_cursorManager->removeTransactionCursorReference(cursor);
ownedCursor->dispose(opCtx);
return Status::OK();
- ;
}
Status CursorManager::checkAuthForKillCursors(OperationContext* opCtx, CursorId id) {