summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands/find_cmd.cpp2
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp10
-rw-r--r--src/mongo/db/commands/parallel_collection_scan.cpp1
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp1
-rw-r--r--src/mongo/db/cursor_manager.cpp170
-rw-r--r--src/mongo/db/cursor_manager.h38
-rw-r--r--src/mongo/db/db_raii.cpp3
-rw-r--r--src/mongo/db/exec/subplan.cpp1
-rw-r--r--src/mongo/db/kill_sessions_local.cpp15
-rw-r--r--src/mongo/db/kill_sessions_local.h9
-rw-r--r--src/mongo/db/operation_context.h18
-rw-r--r--src/mongo/db/query/query_test_service_context.cpp5
-rw-r--r--src/mongo/db/query/query_test_service_context.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp9
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp4
-rw-r--r--src/mongo/db/session.cpp194
-rw-r--r--src/mongo/db/session.h62
-rw-r--r--src/mongo/db/session_catalog_test.cpp6
-rw-r--r--src/mongo/db/session_test.cpp79
24 files changed, 523 insertions, 122 deletions
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 875638e46c2..f00f42cc9fd 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -399,8 +399,6 @@ public:
}
pinnedCursor.getCursor()->setPos(numResults);
- opCtx->setStashedCursor();
-
// Fill out curop based on the results.
endQueryOp(opCtx, collection, *cursorExec, numResults, cursorId);
} else {
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e62e3667afd..83188a92793 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -217,10 +217,6 @@ public:
const GetMoreRequest& request,
const BSONObj& cmdObj,
BSONObjBuilder& result) {
- // If we return early without freeing the cursor, indicate we have a stashed cursor, so that
- // transaction state is stashed.
- ScopeGuard stashedCursorIndicator = MakeGuard(&OperationContext::setStashedCursor, opCtx);
-
auto curOp = CurOp::get(opCtx);
curOp->debug().cursorid = request.cursorid;
@@ -355,10 +351,8 @@ public:
return CommandHelpers::appendCommandStatus(result, status);
}
- // On early return, get rid of the cursor. We should no longer indicate we have a stashed
- // cursor on early return.
+ // On early return, get rid of the cursor.
ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue());
- stashedCursorIndicator.Dismiss();
const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode();
opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(cursor->getReadConcernLevel(),
@@ -472,8 +466,6 @@ public:
cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
cursor->incPos(numResults);
-
- opCtx->setStashedCursor();
} else {
curOp->debug().cursorExhausted = true;
}
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index ae3818ab9ea..ea18c824716 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -174,7 +174,6 @@ public:
bucketsBuilder.append(threadResult.obj());
}
result.appendArray("cursors", bucketsBuilder.obj());
- opCtx->setStashedCursor();
return true;
}
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 3529cce56e9..2f068165ed9 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -529,7 +529,6 @@ Status runAggregate(OperationContext* opCtx,
const bool keepCursor =
handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result);
if (keepCursor) {
- opCtx->setStashedCursor();
cursorFreer.Dismiss();
}
}
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp
index 39c82910bcb..d6c00d996d2 100644
--- a/src/mongo/db/cursor_manager.cpp
+++ b/src/mongo/db/cursor_manager.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
@@ -57,6 +58,21 @@
#include "mongo/util/startup_test.h"
namespace mongo {
+
+MONGO_INITIALIZER(RegisterCursorKillFunction)
+(InitializerContext* const) {
+ Session::registerCursorKillFunction(
+ [](OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) {
+ return CursorManager::killAllCursorsForTransaction(opCtx, lsid, txnNumber);
+ });
+
+ Session::registerCursorExistsFunction([](LogicalSessionId lsid, TxnNumber txnNumber) {
+ return CursorManager::hasTransactionCursorReference(lsid, txnNumber);
+ });
+
+ return Status::OK();
+}
+
using std::vector;
constexpr int CursorManager::kNumPartitions;
@@ -107,11 +123,32 @@ public:
int64_t nextSeed();
+ void addTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ NamespaceString nss,
+ CursorId cursorId);
+
+ void removeTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ CursorId cursorId);
+
+ size_t numOpenCursorsForTransaction(LogicalSessionId lsid, TxnNumber txnNumber);
+
+ size_t killAllCursorsForTransaction(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber);
+
private:
+ // '_mutex' must not be held when acquiring a CursorManager mutex to avoid deadlock.
SimpleMutex _mutex;
- typedef stdx::unordered_map<unsigned, NamespaceString> Map;
- Map _idToNss;
+ using CursorIdToNssMap = stdx::unordered_map<CursorId, NamespaceString>;
+ using TxnNumberToCursorMap = stdx::unordered_map<TxnNumber, CursorIdToNssMap>;
+ using LsidToTxnCursorMap = LogicalSessionIdMap<TxnNumberToCursorMap>;
+ using IdToNssMap = stdx::unordered_map<unsigned, NamespaceString>;
+
+ LsidToTxnCursorMap _lsidToTxnCursorMap;
+ IdToNssMap _idToNss;
unsigned _nextId;
std::unique_ptr<SecureRandom> _secureRandom;
@@ -144,6 +181,72 @@ int64_t GlobalCursorIdCache::nextSeed() {
return _secureRandom->nextInt64();
}
+void GlobalCursorIdCache::addTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ NamespaceString nss,
+ CursorId cursorId) {
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ invariant(_lsidToTxnCursorMap[lsid][txnNumber].insert({cursorId, nss}).second == true,
+ "Expected insert to succeed");
+}
+
+void GlobalCursorIdCache::removeTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ CursorId cursorId) {
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ invariant(_lsidToTxnCursorMap[lsid][txnNumber].erase(cursorId) == 1); // cursorId was erased.
+
+ if (_lsidToTxnCursorMap[lsid][txnNumber].size() == 0) {
+ invariant(_lsidToTxnCursorMap[lsid].erase(txnNumber) == 1);
+ if (_lsidToTxnCursorMap[lsid].size() == 0) {
+ invariant(_lsidToTxnCursorMap.erase(lsid) == 1);
+ }
+ }
+}
+
+size_t GlobalCursorIdCache::numOpenCursorsForTransaction(LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ if (_lsidToTxnCursorMap.count(lsid) == 0 || _lsidToTxnCursorMap[lsid].count(txnNumber) == 0) {
+ return 0;
+ }
+ return _lsidToTxnCursorMap[lsid][txnNumber].size();
+}
+
+size_t GlobalCursorIdCache::killAllCursorsForTransaction(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ size_t killCount = 0;
+ CursorIdToNssMap cursorToNssMap;
+ {
+ // Copy the cursorId/NamespaceString map to a local structure to avoid obtaining the
+ // CursorManager mutex while holding the GlobalCursorIdCache mutex.
+ stdx::lock_guard<SimpleMutex> lk(_mutex);
+ for (auto&& cursorIdAndNss : _lsidToTxnCursorMap[lsid][txnNumber]) {
+ cursorToNssMap.insert(cursorIdAndNss);
+ }
+ }
+
+ for (auto&& cursorIdAndNss : cursorToNssMap) {
+ const auto cursorId = cursorIdAndNss.first;
+ const auto nss = cursorIdAndNss.second;
+
+ auto status = CursorManager::withCursorManager(
+ opCtx, cursorId, nss, [opCtx, cursorId, lsid, txnNumber](CursorManager* manager) {
+ const auto shouldAudit = false;
+ return manager->killCursor(opCtx, cursorId, shouldAudit, lsid, txnNumber);
+ });
+
+ if (status.isOK()) {
+ ++killCount;
+ }
+
+ invariant(status.isOK() || status.code() == ErrorCodes::CursorNotFound);
+ }
+
+ return killCount;
+}
+
uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
static const uint32_t kMaxIds = 1000 * 1000 * 1000;
static_assert((kMaxIds & (0b11 << 30)) == 0,
@@ -187,7 +290,7 @@ bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool
} else {
stdx::lock_guard<SimpleMutex> lk(_mutex);
uint32_t nsid = idFromCursorId(id);
- Map::const_iterator it = _idToNss.find(nsid);
+ IdToNssMap::const_iterator it = _idToNss.find(nsid);
if (it == _idToNss.end()) {
// No namespace corresponding to this cursor id prefix. TODO: Consider writing to
// audit log here (even though we don't have a namespace).
@@ -355,10 +458,35 @@ std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions(
return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled());
}
+void CursorManager::addTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ NamespaceString nss,
+ CursorId cursorId) {
+ globalCursorIdCache->addTransactionCursorReference(lsid, txnNumber, nss, cursorId);
+}
+
+void CursorManager::removeTransactionCursorReference(const ClientCursor* cursor) {
+ // Remove cursor transaction registration if needed.
+ if (cursor->_lsid && cursor->_txnNumber) {
+ globalCursorIdCache->removeTransactionCursorReference(
+ *cursor->_lsid, *cursor->_txnNumber, cursor->_cursorid);
+ }
+}
+
std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) {
return globalCursorIdCache->timeoutCursors(opCtx, now);
}
+size_t CursorManager::killAllCursorsForTransaction(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ return globalCursorIdCache->killAllCursorsForTransaction(opCtx, lsid, txnNumber);
+}
+
+bool CursorManager::hasTransactionCursorReference(LogicalSessionId lsid, TxnNumber txnNumber) {
+ return globalCursorIdCache->numOpenCursorsForTransaction(lsid, txnNumber) > 0;
+}
+
int CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) {
ConstDataCursor ids(_ids);
int numDeleted = 0;
@@ -455,6 +583,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx,
// responsible for cleaning it up. Otherwise we can immediately dispose of it.
if (cursor->_operationUsingCursor) {
it = partition.erase(it);
+ removeTransactionCursorReference(cursor);
continue;
}
@@ -463,6 +592,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx,
// result in a useful error message.
++it;
} else {
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
delete cursor;
it = partition.erase(it);
@@ -514,6 +644,7 @@ std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) {
auto* cursor = it->second;
if (cursorShouldTimeout_inlock(cursor, now)) {
// Dispose of the cursor and remove it from the partition.
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
toDelete.push_back(std::unique_ptr<ClientCursor, ClientCursor::Deleter>{cursor});
it = lockedPartition->erase(it);
@@ -561,6 +692,7 @@ StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx,
// This cursor was killed while it was idle.
Status error = cursor->getExecutor()->getKillStatus();
lockedPartition->erase(cursor->cursorid());
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
delete cursor;
return error;
@@ -606,6 +738,7 @@ void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) {
LOG(0) << "removing cursor " << cursor->cursorid()
<< " after completing batch: " << interruptStatus;
partition->erase(cursor->cursorid());
+ removeTransactionCursorReference(cursor);
cursor->dispose(opCtx);
delete cursor;
} else if (!interruptStatus.isOK()) {
@@ -711,6 +844,13 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor(
new ClientCursor(std::move(cursorParams), this, cursorId, opCtx, now));
+ // Register this cursor for lookup by transaction.
+ if (opCtx->getLogicalSessionId() && opCtx->getTxnNumber()) {
+ invariant(opCtx->getLogicalSessionId());
+ addTransactionCursorReference(
+ *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), cursorParams.nss, cursorId);
+ }
+
// Transfer ownership of the cursor to '_cursorMap'.
auto partition = _cursorMap->lockOnePartition(cursorId);
ClientCursor* unownedCursor = clientCursor.release();
@@ -718,11 +858,16 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
return ClientCursorPin(opCtx, unownedCursor);
}
-void CursorManager::deregisterCursor(ClientCursor* cc) {
- _cursorMap->erase(cc->cursorid());
+void CursorManager::deregisterCursor(ClientCursor* cursor) {
+ _cursorMap->erase(cursor->cursorid());
+ removeTransactionCursorReference(cursor);
}
-Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) {
+Status CursorManager::killCursor(OperationContext* opCtx,
+ CursorId id,
+ bool shouldAudit,
+ boost::optional<LogicalSessionId> lsid,
+ boost::optional<TxnNumber> txnNumber) {
auto lockedPartition = _cursorMap->lockOnePartition(id);
auto it = lockedPartition->find(id);
if (it == lockedPartition->end()) {
@@ -734,6 +879,17 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou
}
auto cursor = it->second;
+ if (lsid && lsid != cursor->getSessionId()) {
+ return {
+ ErrorCodes::CursorNotFound,
+ str::stream() << "killCursor LogicalSessionId must match that of cursor when provided"};
+ }
+
+ if (txnNumber && txnNumber != cursor->getTxnNumber()) {
+ return {ErrorCodes::CursorNotFound,
+ str::stream() << "killCursor TxnNumber must match that of cursor when provided"};
+ }
+
if (cursor->_operationUsingCursor) {
// Rather than removing the cursor directly, kill the operation that's currently using the
// cursor. It will stop on its own (and remove the cursor) when it sees that it's been
@@ -756,9 +912,9 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou
}
lockedPartition->erase(ownedCursor->cursorid());
+ cursor->_cursorManager->removeTransactionCursorReference(cursor);
ownedCursor->dispose(opCtx);
return Status::OK();
- ;
}
Status CursorManager::checkAuthForKillCursors(OperationContext* opCtx, CursorId id) {
diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h
index 90895407f53..2104a558bae 100644
--- a/src/mongo/db/cursor_manager.h
+++ b/src/mongo/db/cursor_manager.h
@@ -100,6 +100,20 @@ public:
static std::pair<Status, int> killCursorsWithMatchingSessions(
OperationContext* opCtx, const SessionKiller::Matcher& matcher);
+ /**
+ * Kills all cursors with matching logical session and transaction number. Returns the number of
+ * cursors successfully killed.
+ */
+ static size_t killAllCursorsForTransaction(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber);
+
+ /**
+ * Returns true if the CursorManager has cursor references for the given session ID and
+ * transaction number.
+ */
+ static bool hasTransactionCursorReference(LogicalSessionId lsid, TxnNumber txnNumber);
+
CursorManager(NamespaceString nss);
/**
@@ -110,7 +124,8 @@ public:
/**
* Kills all managed query executors and ClientCursors. Callers must have exclusive access to
- * the collection (i.e. must have the collection, databse, or global resource locked in MODE_X).
+ * the collection (i.e. must have the collection, database, or global resource locked in
+ * MODE_X).
*
* 'collectionGoingAway' indicates whether the Collection instance is being deleted. This could
* be because the db is being closed, or the collection/db is being dropped.
@@ -182,9 +197,14 @@ public:
* Returns ErrorCodes::CursorNotFound if the cursor id is not owned by this manager. Returns
* ErrorCodes::OperationFailed if attempting to erase a pinned cursor.
*
- * If 'shouldAudit' is true, will perform audit logging.
+ * If 'shouldAudit' is true, will perform audit logging. If 'lsid' or 'txnNumber' are provided
+ * we will confirm that the cursor is owned by the given session or transaction.
*/
- Status killCursor(OperationContext* opCtx, CursorId id, bool shouldAudit);
+ Status killCursor(OperationContext* opCtx,
+ CursorId id,
+ bool shouldAudit,
+ boost::optional<LogicalSessionId> lsid = boost::none,
+ boost::optional<TxnNumber> txnNumber = boost::none);
/**
* Returns an OK status if we're authorized to erase the cursor. Otherwise, returns
@@ -256,12 +276,22 @@ private:
struct PlanExecutorPartitioner {
std::size_t operator()(const PlanExecutor* exec, std::size_t nPartitions);
};
+
+ // Adds a CursorId to structure that allows for lookup by LogicalSessionId and TxnNumber.
+ void addTransactionCursorReference(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ NamespaceString nss,
+ CursorId cursorId);
+
+ // Removes a CursorId from the LogicalSessionId / TxnNumber lookup structure.
+ void removeTransactionCursorReference(const ClientCursor* cursor);
+
CursorId allocateCursorId_inlock();
ClientCursorPin _registerCursor(
OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor);
- void deregisterCursor(ClientCursor* cc);
+ void deregisterCursor(ClientCursor* cursor);
void unpin(OperationContext* opCtx, ClientCursor* cursor);
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 03c3ee52ec5..73032902a49 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -366,8 +366,7 @@ LockMode getLockModeForQuery(OperationContext* opCtx) {
invariant(opCtx);
// Use IX locks for autocommit:false multi-statement transactions; otherwise, use IS locks.
- auto session = OperationContextSession::get(opCtx);
- if (session && session->inMultiDocumentTransaction()) {
+ if (Session::TransactionState::get(opCtx).requiresIXReadUpgrade) {
return MODE_IX;
}
return MODE_IS;
diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp
index e75f19aae33..265f7be65e1 100644
--- a/src/mongo/db/exec/subplan.cpp
+++ b/src/mongo/db/exec/subplan.cpp
@@ -39,7 +39,6 @@
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/matcher/extensions_callback_real.h"
-#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/planner_access.h"
#include "mongo/db/query/planner_analysis.h"
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp
index 1eec2904850..1ab5238485f 100644
--- a/src/mongo/db/kill_sessions_local.cpp
+++ b/src/mongo/db/kill_sessions_local.cpp
@@ -51,10 +51,19 @@ void killSessionsLocalKillCursors(OperationContext* opCtx, const SessionKiller::
} // namespace
void killSessionsLocalKillTransactions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) {
+ const SessionKiller::Matcher& matcher,
+ bool shouldKillClientCursors) {
+ SessionCatalog::get(opCtx)->scanSessions(
+ opCtx, matcher, [shouldKillClientCursors](OperationContext* opCtx, Session* session) {
+ session->abortArbitraryTransaction(opCtx, shouldKillClientCursors);
+ });
+}
+
+void killSessionsLocalKillTransactionCursors(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) {
SessionCatalog::get(opCtx)->scanSessions(
opCtx, matcher, [](OperationContext* opCtx, Session* session) {
- session->abortArbitraryTransaction();
+ session->killTransactionCursors(opCtx);
});
}
@@ -72,7 +81,7 @@ void killAllExpiredTransactions(OperationContext* opCtx) {
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
SessionCatalog::get(opCtx)->scanSessions(
opCtx, matcherAllSessions, [](OperationContext* opCtx, Session* session) {
- session->abortArbitraryTransactionIfExpired();
+ session->abortArbitraryTransactionIfExpired(opCtx);
});
}
diff --git a/src/mongo/db/kill_sessions_local.h b/src/mongo/db/kill_sessions_local.h
index a21bcec4ae5..2022a0fda05 100644
--- a/src/mongo/db/kill_sessions_local.h
+++ b/src/mongo/db/kill_sessions_local.h
@@ -46,7 +46,14 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
* Kills all transactions on mongod for sessions matching 'matcher'.
*/
void killSessionsLocalKillTransactions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher);
+ const SessionKiller::Matcher& matcher,
+ bool shouldKillClientCursors = true);
+
+/**
+ * Kills all transactions cursors on mongod for sessions matching 'matcher'.
+ */
+void killSessionsLocalKillTransactionCursors(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher);
/**
* Aborts any expired transactions.
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index dc30d6dfd17..07980165e33 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -411,20 +411,6 @@ public:
*/
Microseconds getRemainingMaxTimeMicros() const;
- /**
- * Indicate that the current network operation will leave an open client cursor on completion.
- */
- void setStashedCursor() {
- _hasStashedCursor = true;
- }
-
- /**
- * Returns whether the current network operation will leave an open client cursor on completion.
- */
- bool hasStashedCursor() {
- return _hasStashedCursor;
- }
-
private:
/**
* Returns true if this operation has a deadline and it has passed according to the fast clock
@@ -509,10 +495,6 @@ private:
Timer _elapsedTime;
bool _writesAreReplicated = true;
-
- // When true, the cursor used by this operation will be stashed for use by a subsequent network
- // operation.
- bool _hasStashedCursor = false;
};
namespace repl {
diff --git a/src/mongo/db/query/query_test_service_context.cpp b/src/mongo/db/query/query_test_service_context.cpp
index d752db15664..407a0341792 100644
--- a/src/mongo/db/query/query_test_service_context.cpp
+++ b/src/mongo/db/query/query_test_service_context.cpp
@@ -46,9 +46,12 @@ ServiceContext::UniqueOperationContext QueryTestServiceContext::makeOperationCon
}
ServiceContext::UniqueOperationContext QueryTestServiceContext::makeOperationContext(
- LogicalSessionId lsid) {
+ LogicalSessionId lsid, boost::optional<TxnNumber> txnNumber) {
auto opCtx = makeOperationContext();
opCtx->setLogicalSessionId(lsid);
+ if (txnNumber) {
+ opCtx->setTxnNumber(*txnNumber);
+ }
return opCtx;
}
diff --git a/src/mongo/db/query/query_test_service_context.h b/src/mongo/db/query/query_test_service_context.h
index c52944cf2f5..bbf265a06ac 100644
--- a/src/mongo/db/query/query_test_service_context.h
+++ b/src/mongo/db/query/query_test_service_context.h
@@ -45,7 +45,8 @@ public:
ServiceContext::UniqueOperationContext makeOperationContext();
- ServiceContext::UniqueOperationContext makeOperationContext(LogicalSessionId lsid);
+ ServiceContext::UniqueOperationContext makeOperationContext(
+ LogicalSessionId lsid, boost::optional<TxnNumber> txnNumber);
Client* getClient() const;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index b9cd7dee465..aed9b92127a 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -210,11 +210,16 @@ public:
/**
* Kills all operations that have a Client that is associated with an incoming user
- * connection. Used during stepdown.
+ * connection. Also kills stashed transaction resources. Used during stepdown.
*/
virtual void killAllUserOperations(OperationContext* opCtx) = 0;
/**
+ * Kills all transaction owned client cursors. Used during stepdown.
+ */
+ virtual void killAllTransactionCursors(OperationContext* opCtx) = 0;
+
+ /**
* Resets any active sharding metadata on this server and stops any sharding-related threads
* (such as the balancer). It is called after stepDown to ensure that if the node becomes
* primary again in the future it will recover its state from a clean slate.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 5ec7ca490b7..c14bda1f3ec 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -643,7 +643,14 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon
// Destroy all stashed transaction resources, in order to release locks.
SessionKiller::Matcher matcherAllSessions(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- killSessionsLocalKillTransactions(opCtx, matcherAllSessions);
+ bool killCursors = false;
+ killSessionsLocalKillTransactions(opCtx, matcherAllSessions, killCursors);
+}
+
+void ReplicationCoordinatorExternalStateImpl::killAllTransactionCursors(OperationContext* opCtx) {
+ SessionKiller::Matcher matcherAllSessions(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
+ killSessionsLocalKillTransactionCursors(opCtx, matcherAllSessions);
}
void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 242071f9cb7..f5cd7bbf82b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -92,6 +92,7 @@ public:
virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* opCtx);
+ virtual void killAllTransactionCursors(OperationContext* opCtx);
virtual void shardingOnStepDownHook();
virtual void signalApplierToChooseNewSyncSource();
virtual void stopProducer();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index e0bbea95a66..f100f191f46 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -201,6 +201,8 @@ void ReplicationCoordinatorExternalStateMock::closeConnections() {
void ReplicationCoordinatorExternalStateMock::killAllUserOperations(OperationContext* opCtx) {}
+void ReplicationCoordinatorExternalStateMock::killAllTransactionCursors(OperationContext* opCtx) {}
+
void ReplicationCoordinatorExternalStateMock::shardingOnStepDownHook() {}
void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 4beafc38d5b..139402949d1 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -81,6 +81,7 @@ public:
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* opCtx);
+ virtual void killAllTransactionCursors(OperationContext* opCtx);
virtual void shardingOnStepDownHook();
virtual void signalApplierToChooseNewSyncSource();
virtual void stopProducer();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 58a8b513ef9..1437bfa6991 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1583,6 +1583,10 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
"specified that we should step down for"};
}
+ // TODO SERVER-34395: Remove this method and kill cursors as part of killAllUserOperations call
+ // when the CursorManager no longer requires collection locks to kill cursors.
+ _externalState->killAllTransactionCursors(opCtx);
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto status = opCtx->checkForInterruptNoAssert();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index a73bad469fd..dd731fdcacb 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -400,6 +400,10 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
globalExclusiveLock.waitForLockUntil(Date_t::max());
invariant(globalExclusiveLock.isLocked());
+ // TODO SERVER-34395: Remove this method and kill cursors as part of killAllUserOperations call
+ // when the CursorManager no longer requires collection locks to kill cursors.
+ _externalState->killAllTransactionCursors(opCtx.get());
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
_topCoord->finishUnconditionalStepDown();
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index a7747ccf996..1b1674be4df 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -64,6 +64,12 @@ namespace mongo {
server_parameter_storage_type<int, ServerParameterType::kStartupAndRuntime>::value_type
transactionLifetimeLimitSeconds(60);
+const OperationContext::Decoration<Session::TransactionState> Session::TransactionState::get =
+ OperationContext::declareDecoration<Session::TransactionState>();
+
+Session::CursorKillFunction Session::_cursorKillFunction;
+Session::CursorExistsFunction Session::_cursorExistsFunction;
+
/**
* Implements a validation function for server parameter 'transactionLifetimeLimitSeconds'
* instantiated above. 'transactionLifetimeLimitSeconds' can only be set to >= 1.
@@ -329,16 +335,33 @@ void Session::beginOrContinueTxn(OperationContext* opCtx,
invariant(!opCtx->lockState()->isLocked());
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _beginOrContinueTxn(lg, txnNumber, autocommit, startTransaction);
+ TxnNumber txnNumberAtStart;
+ bool canKillCursors = false;
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ txnNumberAtStart = _activeTxnNumber;
+ _beginOrContinueTxn(lg, opCtx, txnNumber, autocommit, startTransaction, &canKillCursors);
+ }
+
+ if (canKillCursors) {
+ _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart);
+ }
}
void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber) {
invariant(!opCtx->getClient()->isInDirectClient());
invariant(!opCtx->lockState()->isLocked());
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _beginOrContinueTxnOnMigration(lg, txnNumber);
+ TxnNumber txnNumberAtStart;
+ bool canKillCursors = false;
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ txnNumberAtStart = _activeTxnNumber;
+ _beginOrContinueTxnOnMigration(lg, opCtx, txnNumber, &canKillCursors);
+ }
+ if (canKillCursors) {
+ _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart);
+ }
}
@@ -470,9 +493,11 @@ bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtI
}
void Session::_beginOrContinueTxn(WithLock wl,
+ OperationContext* opCtx,
TxnNumber txnNumber,
boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction) {
+ boost::optional<bool> startTransaction,
+ bool* canKillCursors) {
// Check whether the session information needs to be refreshed from disk.
_checkValid(wl);
@@ -481,6 +506,11 @@ void Session::_beginOrContinueTxn(WithLock wl,
// be >= the active transaction number.
_checkTxnValid(wl, txnNumber);
+ ON_BLOCK_EXIT([this, opCtx] {
+ Session::TransactionState::get(opCtx).requiresIXReadUpgrade =
+ _txnState == MultiDocumentTransactionState::kInProgress;
+ });
+
//
// Continue an active transaction.
//
@@ -515,7 +545,7 @@ void Session::_beginOrContinueTxn(WithLock wl,
// implicitly abort the transaction. It is not safe to continue the transaction, in
// particular because we have not saved the readConcern from the first statement of
// the transaction.
- _abortTransaction(wl);
+ _abortTransaction(wl, opCtx, canKillCursors);
uasserted(ErrorCodes::NoSuchTransaction,
str::stream() << "Transaction " << txnNumber << " has been aborted.");
}
@@ -548,7 +578,7 @@ void Session::_beginOrContinueTxn(WithLock wl,
(serverGlobalParams.featureCompatibility.getVersion() ==
ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40));
- _setActiveTxn(wl, txnNumber);
+ _setActiveTxn(wl, opCtx, txnNumber, canKillCursors);
_autocommit = false;
_txnState = MultiDocumentTransactionState::kInProgress;
_transactionExpireDate =
@@ -556,7 +586,7 @@ void Session::_beginOrContinueTxn(WithLock wl,
} else {
// Execute a retryable write or snapshot read.
invariant(startTransaction == boost::none);
- _setActiveTxn(wl, txnNumber);
+ _setActiveTxn(wl, opCtx, txnNumber, canKillCursors);
_autocommit = true;
_txnState = MultiDocumentTransactionState::kNone;
}
@@ -637,9 +667,9 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
stdx::lock_guard<Client> lk(*opCtx->getClient());
stdx::unique_lock<stdx::mutex> lg(_mutex);
- // Always check '_activeTxnNumber', since it can be modified by migration, which does not check
- // out the session. We intentionally do not error if _txnState=kAborted, since we expect this
- // function to be called at the end of the 'abortTransaction' command.
+ // Always check '_activeTxnNumber', since it can be modified by migration, which does not
+ // check out the session. We intentionally do not error if _txnState=kAborted, since we
+ // expect this function to be called at the end of the 'abortTransaction' command.
_checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false);
if (_txnState != MultiDocumentTransactionState::kInProgress &&
@@ -648,15 +678,15 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
return;
}
- if (_txnState == MultiDocumentTransactionState::kInSnapshotRead && !opCtx->hasStashedCursor()) {
+ if (_txnState == MultiDocumentTransactionState::kInSnapshotRead &&
+ !_cursorExistsFunction(_sessionId, _activeTxnNumber)) {
// The snapshot read is complete.
invariant(opCtx->getWriteUnitOfWork());
_commitTransaction(std::move(lg), opCtx);
- return;
+ } else {
+ invariant(!_txnResourceStash);
+ _txnResourceStash = TxnResources(opCtx);
}
-
- invariant(!_txnResourceStash);
- _txnResourceStash = TxnResources(opCtx);
}
void Session::unstashTransactionResources(OperationContext* opCtx, const std::string& cmdName) {
@@ -737,45 +767,85 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st
}
}
-void Session::abortArbitraryTransaction() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _abortArbitraryTransaction(lock);
+void Session::abortArbitraryTransaction(OperationContext* opCtx, bool shouldKillClientCursors) {
+ TxnNumber txnNumberAtStart;
+ bool canKillCursors = false;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ txnNumberAtStart = _activeTxnNumber;
+ _abortArbitraryTransaction(lock, opCtx, &canKillCursors);
+ }
+
+ if (shouldKillClientCursors && canKillCursors) {
+ _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart);
+ }
}
-void Session::abortArbitraryTransactionIfExpired() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (!_transactionExpireDate || _transactionExpireDate >= Date_t::now()) {
- return;
+void Session::abortArbitraryTransactionIfExpired(OperationContext* opCtx) {
+ TxnNumber txnNumberAtStart;
+ bool canKillCursors = false;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (!_transactionExpireDate || _transactionExpireDate >= Date_t::now()) {
+ return;
+ }
+ txnNumberAtStart = _activeTxnNumber;
+ _abortArbitraryTransaction(lock, opCtx, &canKillCursors);
+ }
+
+ if (canKillCursors) {
+ _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart);
}
- _abortArbitraryTransaction(lock);
}
-void Session::_abortArbitraryTransaction(WithLock lock) {
+void Session::_abortArbitraryTransaction(WithLock lock,
+ OperationContext* opCtx,
+ bool* canKillCursors) {
if (_txnState != MultiDocumentTransactionState::kInProgress &&
_txnState != MultiDocumentTransactionState::kInSnapshotRead) {
return;
}
- _abortTransaction(lock);
+
+ _abortTransaction(lock, opCtx, canKillCursors);
}
void Session::abortActiveTransaction(OperationContext* opCtx) {
- stdx::lock_guard<Client> clientLock(*opCtx->getClient());
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ TxnNumber txnNumberAtStart;
+ bool canKillCursors = false;
+ {
+ stdx::unique_lock<Client> clientLock(*opCtx->getClient());
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ txnNumberAtStart = _activeTxnNumber;
- if (_txnState != MultiDocumentTransactionState::kInProgress &&
- _txnState != MultiDocumentTransactionState::kInSnapshotRead) {
- return;
- }
+ if (_txnState != MultiDocumentTransactionState::kInProgress &&
+ _txnState != MultiDocumentTransactionState::kInSnapshotRead) {
+ return;
+ }
- _abortTransaction(lock);
+ _abortTransaction(lock, opCtx, &canKillCursors);
- // Abort the WUOW. We should be able to abort empty transactions that don't have WUOW.
- if (opCtx->getWriteUnitOfWork()) {
- opCtx->setWriteUnitOfWork(nullptr);
+ // Abort the WUOW. We should be able to abort empty transactions that don't have WUOW.
+ if (opCtx->getWriteUnitOfWork()) {
+ opCtx->setWriteUnitOfWork(nullptr);
+ }
+ }
+ if (canKillCursors) {
+ _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart);
}
}
-void Session::_abortTransaction(WithLock wl) {
+void Session::killTransactionCursors(OperationContext* opCtx) {
+ TxnNumber txnNumberAtStart;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ txnNumberAtStart = _activeTxnNumber;
+ }
+
+ _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart);
+}
+
+void Session::_abortTransaction(WithLock wl, OperationContext* opCtx, bool* canKillCursors) {
+ invariant(canKillCursors);
// TODO SERVER-33432 Disallow aborting committed transaction after we implement implicit abort.
// A transaction in kCommitting state will either commit or abort for storage-layer reasons; it
// is too late to abort externally.
@@ -787,9 +857,13 @@ void Session::_abortTransaction(WithLock wl) {
_transactionOperationBytes = 0;
_transactionOperations.clear();
_txnState = MultiDocumentTransactionState::kAborted;
+ *canKillCursors = true;
}
-void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) {
+void Session::_beginOrContinueTxnOnMigration(WithLock wl,
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ bool* canKillCursors) {
_checkValid(wl);
_checkTxnValid(wl, txnNumber);
@@ -797,14 +871,17 @@ void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) {
if (txnNumber == _activeTxnNumber)
return;
- _setActiveTxn(wl, txnNumber);
+ _setActiveTxn(wl, opCtx, txnNumber, canKillCursors);
}
-void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) {
+void Session::_setActiveTxn(WithLock wl,
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ bool* canKillCursors) {
// Abort the existing transaction if it's not committed or aborted.
if (_txnState == MultiDocumentTransactionState::kInProgress ||
_txnState == MultiDocumentTransactionState::kInSnapshotRead) {
- _abortTransaction(wl);
+ _abortTransaction(wl, opCtx, canKillCursors);
}
_activeTxnNumber = txnNumber;
_activeTxnCommittedStatements.clear();
@@ -851,14 +928,19 @@ std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations(
}
void Session::commitTransaction(OperationContext* opCtx) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ TxnNumber txnNumberAtStart;
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ txnNumberAtStart = _activeTxnNumber;
- // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill
- // and migration, which do not check out the session.
- _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
+ // Always check '_activeTxnNumber' and '_txnState', since they can be modified by
+ // session kill and migration, which do not check out the session.
+ _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
- invariant(_txnState != MultiDocumentTransactionState::kCommitted);
- _commitTransaction(std::move(lk), opCtx);
+ invariant(_txnState != MultiDocumentTransactionState::kCommitted);
+ _commitTransaction(std::move(lk), opCtx);
+ }
+ _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart);
}
void Session::_commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx) {
@@ -930,6 +1012,19 @@ void Session::reportStashedState(BSONObjBuilder* builder) const {
}
}
+// TODO SERVER-34395: Remove opCtx from this interface once no longer required.
+void Session::_killTransactionCursors(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ invariant(_cursorKillFunction);
+
+ if (!opCtx) {
+ return;
+ }
+
+ _cursorKillFunction(opCtx, lsid, txnNumber);
+}
+
void Session::_checkValid(WithLock) const {
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Session " << getSessionId()
@@ -1035,7 +1130,10 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
// entry gets invalidated and immediately refreshed while there were no writes for
// newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
// and we will fail to update the cache even though the write was successful.
- _beginOrContinueTxn(lg, newTxnNumber, boost::none, boost::none);
+ OperationContext* opCtx = nullptr;
+ bool ignoredCanKillCursors = false;
+ _beginOrContinueTxn(
+ lg, opCtx, newTxnNumber, boost::none, boost::none, &ignoredCanKillCursors);
}
if (newTxnNumber == _activeTxnNumber) {
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index a8b10f9da66..9240e8fe0af 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -62,6 +62,12 @@ class Session {
MONGO_DISALLOW_COPYING(Session);
public:
+ struct TransactionState {
+ static const OperationContext::Decoration<TransactionState> get;
+
+ bool requiresIXReadUpgrade = false;
+ };
+
/**
* Holds state for a snapshot read or multi-statement transaction in between network operations.
*/
@@ -100,6 +106,9 @@ public:
};
using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>;
+ using CursorKillFunction =
+ std::function<size_t(OperationContext*, LogicalSessionId, TxnNumber)>;
+ using CursorExistsFunction = std::function<bool(LogicalSessionId, TxnNumber)>;
static const BSONObj kDeadEndSentinel;
@@ -239,6 +248,20 @@ public:
void unstashTransactionResources(OperationContext* opCtx, const std::string& cmdName);
/**
+ * Registers a function that will be used to kill client cursors on transaction commit or abort.
+ * TODO SERVER-34395: Move cursor kill function into Session instead of registering.
+ */
+ static void registerCursorKillFunction(CursorKillFunction cursorKillFunc) {
+ _cursorKillFunction = cursorKillFunc;
+ }
+
+ // TODO SERVER-34113: Remove the "cursor exists" mechanism from both Session and CursorManager
+ // once snapshot reads outside of multi-statement transcactions are no longer supported.
+ static void registerCursorExistsFunction(CursorExistsFunction cursorExistsFunc) {
+ _cursorExistsFunction = cursorExistsFunc;
+ }
+
+ /**
* Commits the transaction, including committing the write unit of work and updating
* transaction state.
*/
@@ -247,13 +270,13 @@ public:
/**
* Aborts the transaction outside the transaction, releasing transaction resources.
*/
- void abortArbitraryTransaction();
+ void abortArbitraryTransaction(OperationContext* opCtx, bool shouldKillClientCursors);
/**
* Same as abortArbitraryTransaction, except only executes if _transactionExpireDate indicates
* that the transaction has expired.
*/
- void abortArbitraryTransactionIfExpired();
+ void abortArbitraryTransactionIfExpired(OperationContext* opCtx);
/*
* Aborts the transaction inside the transaction, releasing transaction resources.
@@ -262,6 +285,11 @@ public:
*/
void abortActiveTransaction(OperationContext* opCtx);
+ /**
+ * Kills any open client cursors associated with the current transaction.
+ */
+ void killTransactionCursors(OperationContext* opCtx);
+
bool getAutocommit() const {
return _autocommit;
}
@@ -347,12 +375,23 @@ public:
const repl::OplogEntry& entry);
private:
+ // Holds function to be used to kill client cursors.
+ static CursorKillFunction _cursorKillFunction;
+ // Holds function which determines whether the CursorManager has client cursor references for a
+ // given transaction.
+ static CursorExistsFunction _cursorExistsFunction;
+
void _beginOrContinueTxn(WithLock,
+ OperationContext* opCtx,
TxnNumber txnNumber,
boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction);
+ boost::optional<bool> startTransaction,
+ bool* canKillCursors);
- void _beginOrContinueTxnOnMigration(WithLock, TxnNumber txnNumber);
+ void _beginOrContinueTxnOnMigration(WithLock,
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ bool* canKillCursors);
// Checks if there is a conflicting operation on the current Session
void _checkValid(WithLock) const;
@@ -361,7 +400,10 @@ private:
// we don't start a txn that is too old.
void _checkTxnValid(WithLock, TxnNumber txnNumber) const;
- void _setActiveTxn(WithLock, TxnNumber txnNumber);
+ void _setActiveTxn(WithLock,
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ bool* canKillCursors);
void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber, bool checkAbort) const;
@@ -379,10 +421,12 @@ private:
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteTs);
- void _abortArbitraryTransaction(WithLock);
+ void _abortArbitraryTransaction(WithLock, OperationContext* opCtx, bool* canKillCursors);
// Releases stashed transaction resources to abort the transaction.
- void _abortTransaction(WithLock);
+ // 'canKillCursors' is an output parameter, which when set to true indicates that transaction
+ // client cursors may be killed.
+ void _abortTransaction(WithLock, OperationContext* opCtx, bool* canKillCursors);
// Committing a transaction first changes its state to "Committing" and writes to the oplog,
// then it changes the state to "Committed".
@@ -398,6 +442,10 @@ private:
// 3) Migration. Should be able to skip committing transactions.
void _commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx);
+ void _killTransactionCursors(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber);
+
const LogicalSessionId _sessionId;
// Protects the member variables below.
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 68151010483..922e4143130 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -200,9 +200,9 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) {
DirectClientSetter inDirectClient(opCtx());
OperationContextSession innerScopedSession(opCtx(), true, boost::none, boost::none);
- // Indicate that there is a stashed cursor. If we were not in a nested session, this
- // would ensure that stashing is not a noop.
- opCtx()->setStashedCursor();
+ // Report to Session that there is a stashed cursor. If we were not in a nested session,
+ // this would ensure that stashing is not a noop.
+ Session::registerCursorExistsFunction([](LogicalSessionId, TxnNumber) { return true; });
OperationContextSession::get(opCtx())->stashTransactionResources(opCtx());
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index fd4a5674139..9c48a73e8e0 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/net/sock.h"
@@ -49,6 +50,7 @@ namespace {
const NamespaceString kNss("TestDB", "TestColl");
const OptionalCollectionUUID kUUID;
+const bool kKillCursors = true;
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
@@ -160,6 +162,14 @@ protected:
}
};
+size_t noopKillCursorFunction(OperationContext*, LogicalSessionId, TxnNumber) {
+ return 0;
+};
+
+bool noopCursorExistsFunction(LogicalSessionId, TxnNumber) {
+ return false;
+};
+
TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
@@ -569,6 +579,21 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException);
}
+DEATH_TEST_F(SessionTest, CommitWithoutCursorKillFunctionInvariants, "_cursorKillFunction") {
+ Session::registerCursorKillFunction(nullptr);
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false, true);
+ session.unstashTransactionResources(opCtx(), "find");
+
+ session.commitTransaction(opCtx());
+}
+
TEST_F(SessionTest, StashAndUnstashResources) {
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 20;
@@ -580,10 +605,12 @@ TEST_F(SessionTest, StashAndUnstashResources) {
ASSERT(originalLocker);
ASSERT(originalRecoveryUnit);
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
- session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none);
+ session.beginOrContinueTxn(opCtx(), txnNum, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
@@ -604,7 +631,6 @@ TEST_F(SessionTest, StashAndUnstashResources) {
ASSERT(lk.isLocked());
// Stash resources. The original Locker and RecoveryUnit now belong to the stash.
- opCtx()->setStashedCursor();
session.stashTransactionResources(opCtx());
ASSERT_NOT_EQUALS(originalLocker, opCtx()->lockState());
ASSERT_NOT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
@@ -625,6 +651,8 @@ TEST_F(SessionTest, StashAndUnstashResources) {
}
TEST_F(SessionTest, ReportStashedResources) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 20;
opCtx()->setLogicalSessionId(sessionId);
@@ -636,7 +664,7 @@ TEST_F(SessionTest, ReportStashedResources) {
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
- session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none);
+ session.beginOrContinueTxn(opCtx(), txnNum, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
@@ -673,7 +701,6 @@ TEST_F(SessionTest, ReportStashedResources) {
fillLockerInfo(*lockerInfo, reportBuilder);
// Stash resources. The original Locker and RecoveryUnit now belong to the stash.
- opCtx()->setStashedCursor();
session.stashTransactionResources(opCtx());
ASSERT(!opCtx()->getWriteUnitOfWork());
@@ -696,6 +723,8 @@ TEST_F(SessionTest, ReportStashedResources) {
}
TEST_F(SessionTest, CannotSpecifyStartTransactionOnInProgressTxn) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -818,6 +847,8 @@ TEST_F(SessionTest, TransactionsOnlyPermitAllowedReadPreferences) {
}
TEST_F(SessionTest, SameTransactionPreservesStoredStatements) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -845,6 +876,8 @@ TEST_F(SessionTest, SameTransactionPreservesStoredStatements) {
}
TEST_F(SessionTest, AbortClearsStoredStatements) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -860,7 +893,7 @@ TEST_F(SessionTest, AbortClearsStoredStatements) {
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); }
session.stashTransactionResources(opCtx());
- session.abortArbitraryTransaction();
+ session.abortArbitraryTransaction(opCtx(), kKillCursors);
ASSERT_TRUE(session.transactionOperationsForTest().empty());
ASSERT_TRUE(session.transactionIsAborted());
}
@@ -868,6 +901,8 @@ TEST_F(SessionTest, AbortClearsStoredStatements) {
// This test makes sure the commit machinery works even when no operations are done on the
// transaction.
TEST_F(SessionTest, EmptyTransactionCommit) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -887,6 +922,8 @@ TEST_F(SessionTest, EmptyTransactionCommit) {
// This test makes sure the abort machinery works even when no operations are done on the
// transaction.
TEST_F(SessionTest, EmptyTransactionAbort) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -899,11 +936,13 @@ TEST_F(SessionTest, EmptyTransactionAbort) {
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); }
session.stashTransactionResources(opCtx());
- session.abortArbitraryTransaction();
+ session.abortArbitraryTransaction(opCtx(), kKillCursors);
ASSERT_TRUE(session.transactionIsAborted());
}
TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -914,7 +953,7 @@ TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) {
session.beginOrContinueTxn(opCtx(), txnNum, false, true);
// The transaction may be aborted without checking out the session.
- session.abortArbitraryTransaction();
+ session.abortArbitraryTransaction(opCtx(), kKillCursors);
// An unstash after an abort should uassert.
ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx(), "find"),
@@ -923,6 +962,8 @@ TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) {
}
TEST_F(SessionTest, ConcurrencyOfUnstashAndMigration) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -950,6 +991,8 @@ TEST_F(SessionTest, ConcurrencyOfUnstashAndMigration) {
}
TEST_F(SessionTest, ConcurrencyOfStashAndAbort) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -962,13 +1005,15 @@ TEST_F(SessionTest, ConcurrencyOfStashAndAbort) {
session.unstashTransactionResources(opCtx(), "find");
// The transaction may be aborted without checking out the session.
- session.abortArbitraryTransaction();
+ session.abortArbitraryTransaction(opCtx(), kKillCursors);
// A stash after an abort should be a noop.
session.stashTransactionResources(opCtx());
}
TEST_F(SessionTest, ConcurrencyOfStashAndMigration) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -993,6 +1038,8 @@ TEST_F(SessionTest, ConcurrencyOfStashAndMigration) {
}
TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -1005,7 +1052,7 @@ TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) {
session.unstashTransactionResources(opCtx(), "insert");
// The transaction may be aborted without checking out the session.
- session.abortArbitraryTransaction();
+ session.abortArbitraryTransaction(opCtx(), kKillCursors);
// An addTransactionOperation() after an abort should uassert.
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
@@ -1015,6 +1062,8 @@ TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) {
}
TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndMigration) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -1040,6 +1089,8 @@ TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndMigration) {
}
TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -1052,7 +1103,7 @@ TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
session.unstashTransactionResources(opCtx(), "insert");
// The transaction may be aborted without checking out the session.
- session.abortArbitraryTransaction();
+ session.abortArbitraryTransaction(opCtx(), kKillCursors);
// An endTransactionAndRetrieveOperations() after an abort should uassert.
ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()),
@@ -1061,6 +1112,8 @@ TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
}
TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -1086,6 +1139,8 @@ TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration
}
TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
@@ -1098,7 +1153,7 @@ TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) {
session.unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction may be aborted without checking out the session.
- session.abortArbitraryTransaction();
+ session.abortArbitraryTransaction(opCtx(), kKillCursors);
// An commitTransaction() after an abort should uassert.
ASSERT_THROWS_CODE(
@@ -1106,6 +1161,8 @@ TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) {
}
TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndMigration) {
+ Session::registerCursorKillFunction(noopKillCursorFunction);
+ Session::registerCursorExistsFunction(noopCursorExistsFunction);
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());