diff options
author | James Wahlin <james@mongodb.com> | 2018-03-27 11:20:13 -0400 |
---|---|---|
committer | James Wahlin <james@mongodb.com> | 2018-04-16 14:33:12 -0400 |
commit | 9652d252a2932fc0096704fccb1152b6b290fe6f (patch) | |
tree | b5a6b0f0f23ef1c1e9c9924c37e8d1d5eedc39e1 /src/mongo | |
parent | c02574298a711b6de8a3d89cedcfe98040a6f55b (diff) | |
download | mongo-9652d252a2932fc0096704fccb1152b6b290fe6f.tar.gz |
SERVER-33690 Transaction abort and commit should kill any associated client cursors
Diffstat (limited to 'src/mongo')
25 files changed, 660 insertions, 127 deletions
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 875638e46c2..f00f42cc9fd 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -399,8 +399,6 @@ public: } pinnedCursor.getCursor()->setPos(numResults); - opCtx->setStashedCursor(); - // Fill out curop based on the results. endQueryOp(opCtx, collection, *cursorExec, numResults, cursorId); } else { diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index e62e3667afd..83188a92793 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -217,10 +217,6 @@ public: const GetMoreRequest& request, const BSONObj& cmdObj, BSONObjBuilder& result) { - // If we return early without freeing the cursor, indicate we have a stashed cursor, so that - // transaction state is stashed. - ScopeGuard stashedCursorIndicator = MakeGuard(&OperationContext::setStashedCursor, opCtx); - auto curOp = CurOp::get(opCtx); curOp->debug().cursorid = request.cursorid; @@ -355,10 +351,8 @@ public: return CommandHelpers::appendCommandStatus(result, status); } - // On early return, get rid of the cursor. We should no longer indicate we have a stashed - // cursor on early return. + // On early return, get rid of the cursor. ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &ccPin.getValue()); - stashedCursorIndicator.Dismiss(); const auto replicationMode = repl::ReplicationCoordinator::get(opCtx)->getReplicationMode(); opCtx->recoveryUnit()->setReadConcernLevelAndReplicationMode(cursor->getReadConcernLevel(), @@ -472,8 +466,6 @@ public: cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); cursor->incPos(numResults); - - opCtx->setStashedCursor(); } else { curOp->debug().cursorExhausted = true; } diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index ae3818ab9ea..ea18c824716 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -174,7 +174,6 @@ public: bucketsBuilder.append(threadResult.obj()); } result.appendArray("cursors", bucketsBuilder.obj()); - opCtx->setStashedCursor(); return true; } diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 3529cce56e9..2f068165ed9 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -529,7 +529,6 @@ Status runAggregate(OperationContext* opCtx, const bool keepCursor = handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result); if (keepCursor) { - opCtx->setStashedCursor(); cursorFreer.Dismiss(); } } diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index 39c82910bcb..d6c00d996d2 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -50,6 +50,7 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/session_catalog.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" @@ -57,6 +58,21 @@ #include "mongo/util/startup_test.h" namespace mongo { + +MONGO_INITIALIZER(RegisterCursorKillFunction) +(InitializerContext* const) { + Session::registerCursorKillFunction( + [](OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { + return CursorManager::killAllCursorsForTransaction(opCtx, lsid, txnNumber); + }); + + Session::registerCursorExistsFunction([](LogicalSessionId lsid, TxnNumber txnNumber) { + return CursorManager::hasTransactionCursorReference(lsid, txnNumber); + }); + + return Status::OK(); +} + using std::vector; constexpr int CursorManager::kNumPartitions; @@ -107,11 +123,32 @@ public: int64_t nextSeed(); + void addTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + NamespaceString nss, + CursorId cursorId); + + void removeTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + CursorId cursorId); + + size_t numOpenCursorsForTransaction(LogicalSessionId lsid, TxnNumber txnNumber); + + size_t killAllCursorsForTransaction(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber); + private: + // '_mutex' must not be held when acquiring a CursorManager mutex to avoid deadlock. SimpleMutex _mutex; - typedef stdx::unordered_map<unsigned, NamespaceString> Map; - Map _idToNss; + using CursorIdToNssMap = stdx::unordered_map<CursorId, NamespaceString>; + using TxnNumberToCursorMap = stdx::unordered_map<TxnNumber, CursorIdToNssMap>; + using LsidToTxnCursorMap = LogicalSessionIdMap<TxnNumberToCursorMap>; + using IdToNssMap = stdx::unordered_map<unsigned, NamespaceString>; + + LsidToTxnCursorMap _lsidToTxnCursorMap; + IdToNssMap _idToNss; unsigned _nextId; std::unique_ptr<SecureRandom> _secureRandom; @@ -144,6 +181,72 @@ int64_t GlobalCursorIdCache::nextSeed() { return _secureRandom->nextInt64(); } +void GlobalCursorIdCache::addTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + NamespaceString nss, + CursorId cursorId) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + invariant(_lsidToTxnCursorMap[lsid][txnNumber].insert({cursorId, nss}).second == true, + "Expected insert to succeed"); +} + +void GlobalCursorIdCache::removeTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + CursorId cursorId) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + invariant(_lsidToTxnCursorMap[lsid][txnNumber].erase(cursorId) == 1); // cursorId was erased. + + if (_lsidToTxnCursorMap[lsid][txnNumber].size() == 0) { + invariant(_lsidToTxnCursorMap[lsid].erase(txnNumber) == 1); + if (_lsidToTxnCursorMap[lsid].size() == 0) { + invariant(_lsidToTxnCursorMap.erase(lsid) == 1); + } + } +} + +size_t GlobalCursorIdCache::numOpenCursorsForTransaction(LogicalSessionId lsid, + TxnNumber txnNumber) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + if (_lsidToTxnCursorMap.count(lsid) == 0 || _lsidToTxnCursorMap[lsid].count(txnNumber) == 0) { + return 0; + } + return _lsidToTxnCursorMap[lsid][txnNumber].size(); +} + +size_t GlobalCursorIdCache::killAllCursorsForTransaction(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + size_t killCount = 0; + CursorIdToNssMap cursorToNssMap; + { + // Copy the cursorId/NamespaceString map to a local structure to avoid obtaining the + // CursorManager mutex while holding the GlobalCursorIdCache mutex. + stdx::lock_guard<SimpleMutex> lk(_mutex); + for (auto&& cursorIdAndNss : _lsidToTxnCursorMap[lsid][txnNumber]) { + cursorToNssMap.insert(cursorIdAndNss); + } + } + + for (auto&& cursorIdAndNss : cursorToNssMap) { + const auto cursorId = cursorIdAndNss.first; + const auto nss = cursorIdAndNss.second; + + auto status = CursorManager::withCursorManager( + opCtx, cursorId, nss, [opCtx, cursorId, lsid, txnNumber](CursorManager* manager) { + const auto shouldAudit = false; + return manager->killCursor(opCtx, cursorId, shouldAudit, lsid, txnNumber); + }); + + if (status.isOK()) { + ++killCount; + } + + invariant(status.isOK() || status.code() == ErrorCodes::CursorNotFound); + } + + return killCount; +} + uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) { static const uint32_t kMaxIds = 1000 * 1000 * 1000; static_assert((kMaxIds & (0b11 << 30)) == 0, @@ -187,7 +290,7 @@ bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool } else { stdx::lock_guard<SimpleMutex> lk(_mutex); uint32_t nsid = idFromCursorId(id); - Map::const_iterator it = _idToNss.find(nsid); + IdToNssMap::const_iterator it = _idToNss.find(nsid); if (it == _idToNss.end()) { // No namespace corresponding to this cursor id prefix. TODO: Consider writing to // audit log here (even though we don't have a namespace). @@ -355,10 +458,35 @@ std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions( return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled()); } +void CursorManager::addTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + NamespaceString nss, + CursorId cursorId) { + globalCursorIdCache->addTransactionCursorReference(lsid, txnNumber, nss, cursorId); +} + +void CursorManager::removeTransactionCursorReference(const ClientCursor* cursor) { + // Remove cursor transaction registration if needed. + if (cursor->_lsid && cursor->_txnNumber) { + globalCursorIdCache->removeTransactionCursorReference( + *cursor->_lsid, *cursor->_txnNumber, cursor->_cursorid); + } +} + std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) { return globalCursorIdCache->timeoutCursors(opCtx, now); } +size_t CursorManager::killAllCursorsForTransaction(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + return globalCursorIdCache->killAllCursorsForTransaction(opCtx, lsid, txnNumber); +} + +bool CursorManager::hasTransactionCursorReference(LogicalSessionId lsid, TxnNumber txnNumber) { + return globalCursorIdCache->numOpenCursorsForTransaction(lsid, txnNumber) > 0; +} + int CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) { ConstDataCursor ids(_ids); int numDeleted = 0; @@ -455,6 +583,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx, // responsible for cleaning it up. Otherwise we can immediately dispose of it. if (cursor->_operationUsingCursor) { it = partition.erase(it); + removeTransactionCursorReference(cursor); continue; } @@ -463,6 +592,7 @@ void CursorManager::invalidateAll(OperationContext* opCtx, // result in a useful error message. ++it; } else { + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); delete cursor; it = partition.erase(it); @@ -514,6 +644,7 @@ std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) { auto* cursor = it->second; if (cursorShouldTimeout_inlock(cursor, now)) { // Dispose of the cursor and remove it from the partition. + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); toDelete.push_back(std::unique_ptr<ClientCursor, ClientCursor::Deleter>{cursor}); it = lockedPartition->erase(it); @@ -561,6 +692,7 @@ StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, // This cursor was killed while it was idle. Status error = cursor->getExecutor()->getKillStatus(); lockedPartition->erase(cursor->cursorid()); + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); delete cursor; return error; @@ -606,6 +738,7 @@ void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) { LOG(0) << "removing cursor " << cursor->cursorid() << " after completing batch: " << interruptStatus; partition->erase(cursor->cursorid()); + removeTransactionCursorReference(cursor); cursor->dispose(opCtx); delete cursor; } else if (!interruptStatus.isOK()) { @@ -711,6 +844,13 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor( new ClientCursor(std::move(cursorParams), this, cursorId, opCtx, now)); + // Register this cursor for lookup by transaction. + if (opCtx->getLogicalSessionId() && opCtx->getTxnNumber()) { + invariant(opCtx->getLogicalSessionId()); + addTransactionCursorReference( + *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), cursorParams.nss, cursorId); + } + // Transfer ownership of the cursor to '_cursorMap'. auto partition = _cursorMap->lockOnePartition(cursorId); ClientCursor* unownedCursor = clientCursor.release(); @@ -718,11 +858,16 @@ ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, return ClientCursorPin(opCtx, unownedCursor); } -void CursorManager::deregisterCursor(ClientCursor* cc) { - _cursorMap->erase(cc->cursorid()); +void CursorManager::deregisterCursor(ClientCursor* cursor) { + _cursorMap->erase(cursor->cursorid()); + removeTransactionCursorReference(cursor); } -Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) { +Status CursorManager::killCursor(OperationContext* opCtx, + CursorId id, + bool shouldAudit, + boost::optional<LogicalSessionId> lsid, + boost::optional<TxnNumber> txnNumber) { auto lockedPartition = _cursorMap->lockOnePartition(id); auto it = lockedPartition->find(id); if (it == lockedPartition->end()) { @@ -734,6 +879,17 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou } auto cursor = it->second; + if (lsid && lsid != cursor->getSessionId()) { + return { + ErrorCodes::CursorNotFound, + str::stream() << "killCursor LogicalSessionId must match that of cursor when provided"}; + } + + if (txnNumber && txnNumber != cursor->getTxnNumber()) { + return {ErrorCodes::CursorNotFound, + str::stream() << "killCursor TxnNumber must match that of cursor when provided"}; + } + if (cursor->_operationUsingCursor) { // Rather than removing the cursor directly, kill the operation that's currently using the // cursor. It will stop on its own (and remove the cursor) when it sees that it's been @@ -756,9 +912,9 @@ Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shou } lockedPartition->erase(ownedCursor->cursorid()); + cursor->_cursorManager->removeTransactionCursorReference(cursor); ownedCursor->dispose(opCtx); return Status::OK(); - ; } Status CursorManager::checkAuthForKillCursors(OperationContext* opCtx, CursorId id) { diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h index 90895407f53..2104a558bae 100644 --- a/src/mongo/db/cursor_manager.h +++ b/src/mongo/db/cursor_manager.h @@ -100,6 +100,20 @@ public: static std::pair<Status, int> killCursorsWithMatchingSessions( OperationContext* opCtx, const SessionKiller::Matcher& matcher); + /** + * Kills all cursors with matching logical session and transaction number. Returns the number of + * cursors successfully killed. + */ + static size_t killAllCursorsForTransaction(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber); + + /** + * Returns true if the CursorManager has cursor references for the given session ID and + * transaction number. + */ + static bool hasTransactionCursorReference(LogicalSessionId lsid, TxnNumber txnNumber); + CursorManager(NamespaceString nss); /** @@ -110,7 +124,8 @@ public: /** * Kills all managed query executors and ClientCursors. Callers must have exclusive access to - * the collection (i.e. must have the collection, databse, or global resource locked in MODE_X). + * the collection (i.e. must have the collection, database, or global resource locked in + * MODE_X). * * 'collectionGoingAway' indicates whether the Collection instance is being deleted. This could * be because the db is being closed, or the collection/db is being dropped. @@ -182,9 +197,14 @@ public: * Returns ErrorCodes::CursorNotFound if the cursor id is not owned by this manager. Returns * ErrorCodes::OperationFailed if attempting to erase a pinned cursor. * - * If 'shouldAudit' is true, will perform audit logging. + * If 'shouldAudit' is true, will perform audit logging. If 'lsid' or 'txnNumber' are provided + * we will confirm that the cursor is owned by the given session or transaction. */ - Status killCursor(OperationContext* opCtx, CursorId id, bool shouldAudit); + Status killCursor(OperationContext* opCtx, + CursorId id, + bool shouldAudit, + boost::optional<LogicalSessionId> lsid = boost::none, + boost::optional<TxnNumber> txnNumber = boost::none); /** * Returns an OK status if we're authorized to erase the cursor. Otherwise, returns @@ -256,12 +276,22 @@ private: struct PlanExecutorPartitioner { std::size_t operator()(const PlanExecutor* exec, std::size_t nPartitions); }; + + // Adds a CursorId to structure that allows for lookup by LogicalSessionId and TxnNumber. + void addTransactionCursorReference(LogicalSessionId lsid, + TxnNumber txnNumber, + NamespaceString nss, + CursorId cursorId); + + // Removes a CursorId from the LogicalSessionId / TxnNumber lookup structure. + void removeTransactionCursorReference(const ClientCursor* cursor); + CursorId allocateCursorId_inlock(); ClientCursorPin _registerCursor( OperationContext* opCtx, std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor); - void deregisterCursor(ClientCursor* cc); + void deregisterCursor(ClientCursor* cursor); void unpin(OperationContext* opCtx, ClientCursor* cursor); diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 03c3ee52ec5..73032902a49 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -366,8 +366,7 @@ LockMode getLockModeForQuery(OperationContext* opCtx) { invariant(opCtx); // Use IX locks for autocommit:false multi-statement transactions; otherwise, use IS locks. - auto session = OperationContextSession::get(opCtx); - if (session && session->inMultiDocumentTransaction()) { + if (Session::TransactionState::get(opCtx).requiresIXReadUpgrade) { return MODE_IX; } return MODE_IS; diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp index e75f19aae33..265f7be65e1 100644 --- a/src/mongo/db/exec/subplan.cpp +++ b/src/mongo/db/exec/subplan.cpp @@ -39,7 +39,6 @@ #include "mongo/db/exec/multi_plan.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/matcher/extensions_callback_real.h" -#include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/planner_access.h" #include "mongo/db/query/planner_analysis.h" diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index 1eec2904850..1ab5238485f 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -51,10 +51,19 @@ void killSessionsLocalKillCursors(OperationContext* opCtx, const SessionKiller:: } // namespace void killSessionsLocalKillTransactions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) { + const SessionKiller::Matcher& matcher, + bool shouldKillClientCursors) { + SessionCatalog::get(opCtx)->scanSessions( + opCtx, matcher, [shouldKillClientCursors](OperationContext* opCtx, Session* session) { + session->abortArbitraryTransaction(opCtx, shouldKillClientCursors); + }); +} + +void killSessionsLocalKillTransactionCursors(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) { SessionCatalog::get(opCtx)->scanSessions( opCtx, matcher, [](OperationContext* opCtx, Session* session) { - session->abortArbitraryTransaction(); + session->killTransactionCursors(opCtx); }); } @@ -72,7 +81,7 @@ void killAllExpiredTransactions(OperationContext* opCtx) { KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); SessionCatalog::get(opCtx)->scanSessions( opCtx, matcherAllSessions, [](OperationContext* opCtx, Session* session) { - session->abortArbitraryTransactionIfExpired(); + session->abortArbitraryTransactionIfExpired(opCtx); }); } diff --git a/src/mongo/db/kill_sessions_local.h b/src/mongo/db/kill_sessions_local.h index a21bcec4ae5..2022a0fda05 100644 --- a/src/mongo/db/kill_sessions_local.h +++ b/src/mongo/db/kill_sessions_local.h @@ -46,7 +46,14 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx, * Kills all transactions on mongod for sessions matching 'matcher'. */ void killSessionsLocalKillTransactions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher); + const SessionKiller::Matcher& matcher, + bool shouldKillClientCursors = true); + +/** + * Kills all transactions cursors on mongod for sessions matching 'matcher'. + */ +void killSessionsLocalKillTransactionCursors(OperationContext* opCtx, + const SessionKiller::Matcher& matcher); /** * Aborts any expired transactions. diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index dc30d6dfd17..07980165e33 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -411,20 +411,6 @@ public: */ Microseconds getRemainingMaxTimeMicros() const; - /** - * Indicate that the current network operation will leave an open client cursor on completion. - */ - void setStashedCursor() { - _hasStashedCursor = true; - } - - /** - * Returns whether the current network operation will leave an open client cursor on completion. - */ - bool hasStashedCursor() { - return _hasStashedCursor; - } - private: /** * Returns true if this operation has a deadline and it has passed according to the fast clock @@ -509,10 +495,6 @@ private: Timer _elapsedTime; bool _writesAreReplicated = true; - - // When true, the cursor used by this operation will be stashed for use by a subsequent network - // operation. - bool _hasStashedCursor = false; }; namespace repl { diff --git a/src/mongo/db/query/query_test_service_context.cpp b/src/mongo/db/query/query_test_service_context.cpp index d752db15664..407a0341792 100644 --- a/src/mongo/db/query/query_test_service_context.cpp +++ b/src/mongo/db/query/query_test_service_context.cpp @@ -46,9 +46,12 @@ ServiceContext::UniqueOperationContext QueryTestServiceContext::makeOperationCon } ServiceContext::UniqueOperationContext QueryTestServiceContext::makeOperationContext( - LogicalSessionId lsid) { + LogicalSessionId lsid, boost::optional<TxnNumber> txnNumber) { auto opCtx = makeOperationContext(); opCtx->setLogicalSessionId(lsid); + if (txnNumber) { + opCtx->setTxnNumber(*txnNumber); + } return opCtx; } diff --git a/src/mongo/db/query/query_test_service_context.h b/src/mongo/db/query/query_test_service_context.h index c52944cf2f5..bbf265a06ac 100644 --- a/src/mongo/db/query/query_test_service_context.h +++ b/src/mongo/db/query/query_test_service_context.h @@ -45,7 +45,8 @@ public: ServiceContext::UniqueOperationContext makeOperationContext(); - ServiceContext::UniqueOperationContext makeOperationContext(LogicalSessionId lsid); + ServiceContext::UniqueOperationContext makeOperationContext( + LogicalSessionId lsid, boost::optional<TxnNumber> txnNumber); Client* getClient() const; diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index b9cd7dee465..aed9b92127a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -210,11 +210,16 @@ public: /** * Kills all operations that have a Client that is associated with an incoming user - * connection. Used during stepdown. + * connection. Also kills stashed transaction resources. Used during stepdown. */ virtual void killAllUserOperations(OperationContext* opCtx) = 0; /** + * Kills all transaction owned client cursors. Used during stepdown. + */ + virtual void killAllTransactionCursors(OperationContext* opCtx) = 0; + + /** * Resets any active sharding metadata on this server and stops any sharding-related threads * (such as the balancer). It is called after stepDown to ensure that if the node becomes * primary again in the future it will recover its state from a clean slate. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 5ec7ca490b7..c14bda1f3ec 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -643,7 +643,14 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon // Destroy all stashed transaction resources, in order to release locks. SessionKiller::Matcher matcherAllSessions( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - killSessionsLocalKillTransactions(opCtx, matcherAllSessions); + bool killCursors = false; + killSessionsLocalKillTransactions(opCtx, matcherAllSessions, killCursors); +} + +void ReplicationCoordinatorExternalStateImpl::killAllTransactionCursors(OperationContext* opCtx) { + SessionKiller::Matcher matcherAllSessions( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); + killSessionsLocalKillTransactionCursors(opCtx, matcherAllSessions); } void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 242071f9cb7..f5cd7bbf82b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -92,6 +92,7 @@ public: virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* opCtx); + virtual void killAllTransactionCursors(OperationContext* opCtx); virtual void shardingOnStepDownHook(); virtual void signalApplierToChooseNewSyncSource(); virtual void stopProducer(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index e0bbea95a66..f100f191f46 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -201,6 +201,8 @@ void ReplicationCoordinatorExternalStateMock::closeConnections() { void ReplicationCoordinatorExternalStateMock::killAllUserOperations(OperationContext* opCtx) {} +void ReplicationCoordinatorExternalStateMock::killAllTransactionCursors(OperationContext* opCtx) {} + void ReplicationCoordinatorExternalStateMock::shardingOnStepDownHook() {} void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {} diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 4beafc38d5b..139402949d1 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -81,6 +81,7 @@ public: virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* opCtx); + virtual void killAllTransactionCursors(OperationContext* opCtx); virtual void shardingOnStepDownHook(); virtual void signalApplierToChooseNewSyncSource(); virtual void stopProducer(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 58a8b513ef9..1437bfa6991 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1583,6 +1583,10 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, "specified that we should step down for"}; } + // TODO SERVER-34395: Remove this method and kill cursors as part of killAllUserOperations call + // when the CursorManager no longer requires collection locks to kill cursors. + _externalState->killAllTransactionCursors(opCtx); + stdx::unique_lock<stdx::mutex> lk(_mutex); auto status = opCtx->checkForInterruptNoAssert(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index a73bad469fd..dd731fdcacb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -400,6 +400,10 @@ void ReplicationCoordinatorImpl::_stepDownFinish( globalExclusiveLock.waitForLockUntil(Date_t::max()); invariant(globalExclusiveLock.isLocked()); + // TODO SERVER-34395: Remove this method and kill cursors as part of killAllUserOperations call + // when the CursorManager no longer requires collection locks to kill cursors. + _externalState->killAllTransactionCursors(opCtx.get()); + stdx::unique_lock<stdx::mutex> lk(_mutex); _topCoord->finishUnconditionalStepDown(); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index a7747ccf996..1b1674be4df 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -64,6 +64,12 @@ namespace mongo { server_parameter_storage_type<int, ServerParameterType::kStartupAndRuntime>::value_type transactionLifetimeLimitSeconds(60); +const OperationContext::Decoration<Session::TransactionState> Session::TransactionState::get = + OperationContext::declareDecoration<Session::TransactionState>(); + +Session::CursorKillFunction Session::_cursorKillFunction; +Session::CursorExistsFunction Session::_cursorExistsFunction; + /** * Implements a validation function for server parameter 'transactionLifetimeLimitSeconds' * instantiated above. 'transactionLifetimeLimitSeconds' can only be set to >= 1. @@ -329,16 +335,33 @@ void Session::beginOrContinueTxn(OperationContext* opCtx, invariant(!opCtx->lockState()->isLocked()); - stdx::lock_guard<stdx::mutex> lg(_mutex); - _beginOrContinueTxn(lg, txnNumber, autocommit, startTransaction); + TxnNumber txnNumberAtStart; + bool canKillCursors = false; + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + txnNumberAtStart = _activeTxnNumber; + _beginOrContinueTxn(lg, opCtx, txnNumber, autocommit, startTransaction, &canKillCursors); + } + + if (canKillCursors) { + _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart); + } } void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber) { invariant(!opCtx->getClient()->isInDirectClient()); invariant(!opCtx->lockState()->isLocked()); - stdx::lock_guard<stdx::mutex> lg(_mutex); - _beginOrContinueTxnOnMigration(lg, txnNumber); + TxnNumber txnNumberAtStart; + bool canKillCursors = false; + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + txnNumberAtStart = _activeTxnNumber; + _beginOrContinueTxnOnMigration(lg, opCtx, txnNumber, &canKillCursors); + } + if (canKillCursors) { + _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart); + } } @@ -470,9 +493,11 @@ bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtI } void Session::_beginOrContinueTxn(WithLock wl, + OperationContext* opCtx, TxnNumber txnNumber, boost::optional<bool> autocommit, - boost::optional<bool> startTransaction) { + boost::optional<bool> startTransaction, + bool* canKillCursors) { // Check whether the session information needs to be refreshed from disk. _checkValid(wl); @@ -481,6 +506,11 @@ void Session::_beginOrContinueTxn(WithLock wl, // be >= the active transaction number. _checkTxnValid(wl, txnNumber); + ON_BLOCK_EXIT([this, opCtx] { + Session::TransactionState::get(opCtx).requiresIXReadUpgrade = + _txnState == MultiDocumentTransactionState::kInProgress; + }); + // // Continue an active transaction. // @@ -515,7 +545,7 @@ void Session::_beginOrContinueTxn(WithLock wl, // implicitly abort the transaction. It is not safe to continue the transaction, in // particular because we have not saved the readConcern from the first statement of // the transaction. - _abortTransaction(wl); + _abortTransaction(wl, opCtx, canKillCursors); uasserted(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << txnNumber << " has been aborted."); } @@ -548,7 +578,7 @@ void Session::_beginOrContinueTxn(WithLock wl, (serverGlobalParams.featureCompatibility.getVersion() == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40)); - _setActiveTxn(wl, txnNumber); + _setActiveTxn(wl, opCtx, txnNumber, canKillCursors); _autocommit = false; _txnState = MultiDocumentTransactionState::kInProgress; _transactionExpireDate = @@ -556,7 +586,7 @@ void Session::_beginOrContinueTxn(WithLock wl, } else { // Execute a retryable write or snapshot read. invariant(startTransaction == boost::none); - _setActiveTxn(wl, txnNumber); + _setActiveTxn(wl, opCtx, txnNumber, canKillCursors); _autocommit = true; _txnState = MultiDocumentTransactionState::kNone; } @@ -637,9 +667,9 @@ void Session::stashTransactionResources(OperationContext* opCtx) { stdx::lock_guard<Client> lk(*opCtx->getClient()); stdx::unique_lock<stdx::mutex> lg(_mutex); - // Always check '_activeTxnNumber', since it can be modified by migration, which does not check - // out the session. We intentionally do not error if _txnState=kAborted, since we expect this - // function to be called at the end of the 'abortTransaction' command. + // Always check '_activeTxnNumber', since it can be modified by migration, which does not + // check out the session. We intentionally do not error if _txnState=kAborted, since we + // expect this function to be called at the end of the 'abortTransaction' command. _checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false); if (_txnState != MultiDocumentTransactionState::kInProgress && @@ -648,15 +678,15 @@ void Session::stashTransactionResources(OperationContext* opCtx) { return; } - if (_txnState == MultiDocumentTransactionState::kInSnapshotRead && !opCtx->hasStashedCursor()) { + if (_txnState == MultiDocumentTransactionState::kInSnapshotRead && + !_cursorExistsFunction(_sessionId, _activeTxnNumber)) { // The snapshot read is complete. invariant(opCtx->getWriteUnitOfWork()); _commitTransaction(std::move(lg), opCtx); - return; + } else { + invariant(!_txnResourceStash); + _txnResourceStash = TxnResources(opCtx); } - - invariant(!_txnResourceStash); - _txnResourceStash = TxnResources(opCtx); } void Session::unstashTransactionResources(OperationContext* opCtx, const std::string& cmdName) { @@ -737,45 +767,85 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st } } -void Session::abortArbitraryTransaction() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _abortArbitraryTransaction(lock); +void Session::abortArbitraryTransaction(OperationContext* opCtx, bool shouldKillClientCursors) { + TxnNumber txnNumberAtStart; + bool canKillCursors = false; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + txnNumberAtStart = _activeTxnNumber; + _abortArbitraryTransaction(lock, opCtx, &canKillCursors); + } + + if (shouldKillClientCursors && canKillCursors) { + _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart); + } } -void Session::abortArbitraryTransactionIfExpired() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (!_transactionExpireDate || _transactionExpireDate >= Date_t::now()) { - return; +void Session::abortArbitraryTransactionIfExpired(OperationContext* opCtx) { + TxnNumber txnNumberAtStart; + bool canKillCursors = false; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (!_transactionExpireDate || _transactionExpireDate >= Date_t::now()) { + return; + } + txnNumberAtStart = _activeTxnNumber; + _abortArbitraryTransaction(lock, opCtx, &canKillCursors); + } + + if (canKillCursors) { + _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart); } - _abortArbitraryTransaction(lock); } -void Session::_abortArbitraryTransaction(WithLock lock) { +void Session::_abortArbitraryTransaction(WithLock lock, + OperationContext* opCtx, + bool* canKillCursors) { if (_txnState != MultiDocumentTransactionState::kInProgress && _txnState != MultiDocumentTransactionState::kInSnapshotRead) { return; } - _abortTransaction(lock); + + _abortTransaction(lock, opCtx, canKillCursors); } void Session::abortActiveTransaction(OperationContext* opCtx) { - stdx::lock_guard<Client> clientLock(*opCtx->getClient()); - stdx::lock_guard<stdx::mutex> lock(_mutex); + TxnNumber txnNumberAtStart; + bool canKillCursors = false; + { + stdx::unique_lock<Client> clientLock(*opCtx->getClient()); + stdx::lock_guard<stdx::mutex> lock(_mutex); + txnNumberAtStart = _activeTxnNumber; - if (_txnState != MultiDocumentTransactionState::kInProgress && - _txnState != MultiDocumentTransactionState::kInSnapshotRead) { - return; - } + if (_txnState != MultiDocumentTransactionState::kInProgress && + _txnState != MultiDocumentTransactionState::kInSnapshotRead) { + return; + } - _abortTransaction(lock); + _abortTransaction(lock, opCtx, &canKillCursors); - // Abort the WUOW. We should be able to abort empty transactions that don't have WUOW. - if (opCtx->getWriteUnitOfWork()) { - opCtx->setWriteUnitOfWork(nullptr); + // Abort the WUOW. We should be able to abort empty transactions that don't have WUOW. + if (opCtx->getWriteUnitOfWork()) { + opCtx->setWriteUnitOfWork(nullptr); + } + } + if (canKillCursors) { + _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart); } } -void Session::_abortTransaction(WithLock wl) { +void Session::killTransactionCursors(OperationContext* opCtx) { + TxnNumber txnNumberAtStart; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + txnNumberAtStart = _activeTxnNumber; + } + + _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart); +} + +void Session::_abortTransaction(WithLock wl, OperationContext* opCtx, bool* canKillCursors) { + invariant(canKillCursors); // TODO SERVER-33432 Disallow aborting committed transaction after we implement implicit abort. // A transaction in kCommitting state will either commit or abort for storage-layer reasons; it // is too late to abort externally. @@ -787,9 +857,13 @@ void Session::_abortTransaction(WithLock wl) { _transactionOperationBytes = 0; _transactionOperations.clear(); _txnState = MultiDocumentTransactionState::kAborted; + *canKillCursors = true; } -void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) { +void Session::_beginOrContinueTxnOnMigration(WithLock wl, + OperationContext* opCtx, + TxnNumber txnNumber, + bool* canKillCursors) { _checkValid(wl); _checkTxnValid(wl, txnNumber); @@ -797,14 +871,17 @@ void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) { if (txnNumber == _activeTxnNumber) return; - _setActiveTxn(wl, txnNumber); + _setActiveTxn(wl, opCtx, txnNumber, canKillCursors); } -void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) { +void Session::_setActiveTxn(WithLock wl, + OperationContext* opCtx, + TxnNumber txnNumber, + bool* canKillCursors) { // Abort the existing transaction if it's not committed or aborted. if (_txnState == MultiDocumentTransactionState::kInProgress || _txnState == MultiDocumentTransactionState::kInSnapshotRead) { - _abortTransaction(wl); + _abortTransaction(wl, opCtx, canKillCursors); } _activeTxnNumber = txnNumber; _activeTxnCommittedStatements.clear(); @@ -851,14 +928,19 @@ std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations( } void Session::commitTransaction(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + TxnNumber txnNumberAtStart; + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + txnNumberAtStart = _activeTxnNumber; - // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill - // and migration, which do not check out the session. - _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); + // Always check '_activeTxnNumber' and '_txnState', since they can be modified by + // session kill and migration, which do not check out the session. + _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); - invariant(_txnState != MultiDocumentTransactionState::kCommitted); - _commitTransaction(std::move(lk), opCtx); + invariant(_txnState != MultiDocumentTransactionState::kCommitted); + _commitTransaction(std::move(lk), opCtx); + } + _killTransactionCursors(opCtx, _sessionId, txnNumberAtStart); } void Session::_commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx) { @@ -930,6 +1012,19 @@ void Session::reportStashedState(BSONObjBuilder* builder) const { } } +// TODO SERVER-34395: Remove opCtx from this interface once no longer required. +void Session::_killTransactionCursors(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + invariant(_cursorKillFunction); + + if (!opCtx) { + return; + } + + _cursorKillFunction(opCtx, lsid, txnNumber); +} + void Session::_checkValid(WithLock) const { uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Session " << getSessionId() @@ -1035,7 +1130,10 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, // entry gets invalidated and immediately refreshed while there were no writes for // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber // and we will fail to update the cache even though the write was successful. - _beginOrContinueTxn(lg, newTxnNumber, boost::none, boost::none); + OperationContext* opCtx = nullptr; + bool ignoredCanKillCursors = false; + _beginOrContinueTxn( + lg, opCtx, newTxnNumber, boost::none, boost::none, &ignoredCanKillCursors); } if (newTxnNumber == _activeTxnNumber) { diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index a8b10f9da66..9240e8fe0af 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -62,6 +62,12 @@ class Session { MONGO_DISALLOW_COPYING(Session); public: + struct TransactionState { + static const OperationContext::Decoration<TransactionState> get; + + bool requiresIXReadUpgrade = false; + }; + /** * Holds state for a snapshot read or multi-statement transaction in between network operations. */ @@ -100,6 +106,9 @@ public: }; using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>; + using CursorKillFunction = + std::function<size_t(OperationContext*, LogicalSessionId, TxnNumber)>; + using CursorExistsFunction = std::function<bool(LogicalSessionId, TxnNumber)>; static const BSONObj kDeadEndSentinel; @@ -239,6 +248,20 @@ public: void unstashTransactionResources(OperationContext* opCtx, const std::string& cmdName); /** + * Registers a function that will be used to kill client cursors on transaction commit or abort. + * TODO SERVER-34395: Move cursor kill function into Session instead of registering. + */ + static void registerCursorKillFunction(CursorKillFunction cursorKillFunc) { + _cursorKillFunction = cursorKillFunc; + } + + // TODO SERVER-34113: Remove the "cursor exists" mechanism from both Session and CursorManager + // once snapshot reads outside of multi-statement transcactions are no longer supported. + static void registerCursorExistsFunction(CursorExistsFunction cursorExistsFunc) { + _cursorExistsFunction = cursorExistsFunc; + } + + /** * Commits the transaction, including committing the write unit of work and updating * transaction state. */ @@ -247,13 +270,13 @@ public: /** * Aborts the transaction outside the transaction, releasing transaction resources. */ - void abortArbitraryTransaction(); + void abortArbitraryTransaction(OperationContext* opCtx, bool shouldKillClientCursors); /** * Same as abortArbitraryTransaction, except only executes if _transactionExpireDate indicates * that the transaction has expired. */ - void abortArbitraryTransactionIfExpired(); + void abortArbitraryTransactionIfExpired(OperationContext* opCtx); /* * Aborts the transaction inside the transaction, releasing transaction resources. @@ -262,6 +285,11 @@ public: */ void abortActiveTransaction(OperationContext* opCtx); + /** + * Kills any open client cursors associated with the current transaction. + */ + void killTransactionCursors(OperationContext* opCtx); + bool getAutocommit() const { return _autocommit; } @@ -347,12 +375,23 @@ public: const repl::OplogEntry& entry); private: + // Holds function to be used to kill client cursors. + static CursorKillFunction _cursorKillFunction; + // Holds function which determines whether the CursorManager has client cursor references for a + // given transaction. + static CursorExistsFunction _cursorExistsFunction; + void _beginOrContinueTxn(WithLock, + OperationContext* opCtx, TxnNumber txnNumber, boost::optional<bool> autocommit, - boost::optional<bool> startTransaction); + boost::optional<bool> startTransaction, + bool* canKillCursors); - void _beginOrContinueTxnOnMigration(WithLock, TxnNumber txnNumber); + void _beginOrContinueTxnOnMigration(WithLock, + OperationContext* opCtx, + TxnNumber txnNumber, + bool* canKillCursors); // Checks if there is a conflicting operation on the current Session void _checkValid(WithLock) const; @@ -361,7 +400,10 @@ private: // we don't start a txn that is too old. void _checkTxnValid(WithLock, TxnNumber txnNumber) const; - void _setActiveTxn(WithLock, TxnNumber txnNumber); + void _setActiveTxn(WithLock, + OperationContext* opCtx, + TxnNumber txnNumber, + bool* canKillCursors); void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber, bool checkAbort) const; @@ -379,10 +421,12 @@ private: std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteTs); - void _abortArbitraryTransaction(WithLock); + void _abortArbitraryTransaction(WithLock, OperationContext* opCtx, bool* canKillCursors); // Releases stashed transaction resources to abort the transaction. - void _abortTransaction(WithLock); + // 'canKillCursors' is an output parameter, which when set to true indicates that transaction + // client cursors may be killed. + void _abortTransaction(WithLock, OperationContext* opCtx, bool* canKillCursors); // Committing a transaction first changes its state to "Committing" and writes to the oplog, // then it changes the state to "Committed". @@ -398,6 +442,10 @@ private: // 3) Migration. Should be able to skip committing transactions. void _commitTransaction(stdx::unique_lock<stdx::mutex> lk, OperationContext* opCtx); + void _killTransactionCursors(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber); + const LogicalSessionId _sessionId; // Protects the member variables below. diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index 68151010483..922e4143130 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -200,9 +200,9 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) { DirectClientSetter inDirectClient(opCtx()); OperationContextSession innerScopedSession(opCtx(), true, boost::none, boost::none); - // Indicate that there is a stashed cursor. If we were not in a nested session, this - // would ensure that stashing is not a noop. - opCtx()->setStashedCursor(); + // Report to Session that there is a stashed cursor. If we were not in a nested session, + // this would ensure that stashing is not a noop. + Session::registerCursorExistsFunction([](LogicalSessionId, TxnNumber) { return true; }); OperationContextSession::get(opCtx())->stashTransactionResources(opCtx()); diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index fd4a5674139..9c48a73e8e0 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/stats/fill_locker_info.h" #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/net/sock.h" @@ -49,6 +50,7 @@ namespace { const NamespaceString kNss("TestDB", "TestColl"); const OptionalCollectionUUID kUUID; +const bool kKillCursors = true; /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. @@ -160,6 +162,14 @@ protected: } }; +size_t noopKillCursorFunction(OperationContext*, LogicalSessionId, TxnNumber) { + return 0; +}; + +bool noopCursorExistsFunction(LogicalSessionId, TxnNumber) { + return false; +}; + TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); @@ -569,6 +579,21 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); } +DEATH_TEST_F(SessionTest, CommitWithoutCursorKillFunctionInvariants, "_cursorKillFunction") { + Session::registerCursorKillFunction(nullptr); + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false, true); + session.unstashTransactionResources(opCtx(), "find"); + + session.commitTransaction(opCtx()); +} + TEST_F(SessionTest, StashAndUnstashResources) { const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 20; @@ -580,10 +605,12 @@ TEST_F(SessionTest, StashAndUnstashResources) { ASSERT(originalLocker); ASSERT(originalRecoveryUnit); + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); - session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); + session.beginOrContinueTxn(opCtx(), txnNum, false, true); repl::ReadConcernArgs readConcernArgs; ASSERT_OK(readConcernArgs.initialize(BSON("find" @@ -604,7 +631,6 @@ TEST_F(SessionTest, StashAndUnstashResources) { ASSERT(lk.isLocked()); // Stash resources. The original Locker and RecoveryUnit now belong to the stash. - opCtx()->setStashedCursor(); session.stashTransactionResources(opCtx()); ASSERT_NOT_EQUALS(originalLocker, opCtx()->lockState()); ASSERT_NOT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit()); @@ -625,6 +651,8 @@ TEST_F(SessionTest, StashAndUnstashResources) { } TEST_F(SessionTest, ReportStashedResources) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 20; opCtx()->setLogicalSessionId(sessionId); @@ -636,7 +664,7 @@ TEST_F(SessionTest, ReportStashedResources) { Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); - session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); + session.beginOrContinueTxn(opCtx(), txnNum, false, true); repl::ReadConcernArgs readConcernArgs; ASSERT_OK(readConcernArgs.initialize(BSON("find" @@ -673,7 +701,6 @@ TEST_F(SessionTest, ReportStashedResources) { fillLockerInfo(*lockerInfo, reportBuilder); // Stash resources. The original Locker and RecoveryUnit now belong to the stash. - opCtx()->setStashedCursor(); session.stashTransactionResources(opCtx()); ASSERT(!opCtx()->getWriteUnitOfWork()); @@ -696,6 +723,8 @@ TEST_F(SessionTest, ReportStashedResources) { } TEST_F(SessionTest, CannotSpecifyStartTransactionOnInProgressTxn) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -818,6 +847,8 @@ TEST_F(SessionTest, TransactionsOnlyPermitAllowedReadPreferences) { } TEST_F(SessionTest, SameTransactionPreservesStoredStatements) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -845,6 +876,8 @@ TEST_F(SessionTest, SameTransactionPreservesStoredStatements) { } TEST_F(SessionTest, AbortClearsStoredStatements) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -860,7 +893,7 @@ TEST_F(SessionTest, AbortClearsStoredStatements) { // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } session.stashTransactionResources(opCtx()); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx(), kKillCursors); ASSERT_TRUE(session.transactionOperationsForTest().empty()); ASSERT_TRUE(session.transactionIsAborted()); } @@ -868,6 +901,8 @@ TEST_F(SessionTest, AbortClearsStoredStatements) { // This test makes sure the commit machinery works even when no operations are done on the // transaction. TEST_F(SessionTest, EmptyTransactionCommit) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -887,6 +922,8 @@ TEST_F(SessionTest, EmptyTransactionCommit) { // This test makes sure the abort machinery works even when no operations are done on the // transaction. TEST_F(SessionTest, EmptyTransactionAbort) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -899,11 +936,13 @@ TEST_F(SessionTest, EmptyTransactionAbort) { // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } session.stashTransactionResources(opCtx()); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx(), kKillCursors); ASSERT_TRUE(session.transactionIsAborted()); } TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -914,7 +953,7 @@ TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) { session.beginOrContinueTxn(opCtx(), txnNum, false, true); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx(), kKillCursors); // An unstash after an abort should uassert. ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx(), "find"), @@ -923,6 +962,8 @@ TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) { } TEST_F(SessionTest, ConcurrencyOfUnstashAndMigration) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -950,6 +991,8 @@ TEST_F(SessionTest, ConcurrencyOfUnstashAndMigration) { } TEST_F(SessionTest, ConcurrencyOfStashAndAbort) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -962,13 +1005,15 @@ TEST_F(SessionTest, ConcurrencyOfStashAndAbort) { session.unstashTransactionResources(opCtx(), "find"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx(), kKillCursors); // A stash after an abort should be a noop. session.stashTransactionResources(opCtx()); } TEST_F(SessionTest, ConcurrencyOfStashAndMigration) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -993,6 +1038,8 @@ TEST_F(SessionTest, ConcurrencyOfStashAndMigration) { } TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -1005,7 +1052,7 @@ TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) { session.unstashTransactionResources(opCtx(), "insert"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx(), kKillCursors); // An addTransactionOperation() after an abort should uassert. auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); @@ -1015,6 +1062,8 @@ TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) { } TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndMigration) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -1040,6 +1089,8 @@ TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndMigration) { } TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -1052,7 +1103,7 @@ TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) { session.unstashTransactionResources(opCtx(), "insert"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx(), kKillCursors); // An endTransactionAndRetrieveOperations() after an abort should uassert. ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()), @@ -1061,6 +1112,8 @@ TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) { } TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -1086,6 +1139,8 @@ TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration } TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); @@ -1098,7 +1153,7 @@ TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) { session.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx(), kKillCursors); // An commitTransaction() after an abort should uassert. ASSERT_THROWS_CODE( @@ -1106,6 +1161,8 @@ TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) { } TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndMigration) { + Session::registerCursorKillFunction(noopKillCursorFunction); + Session::registerCursorExistsFunction(noopCursorExistsFunction); const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp index ee3933dfa35..48271d24758 100644 --- a/src/mongo/dbtests/cursor_manager_test.cpp +++ b/src/mongo/dbtests/cursor_manager_test.cpp @@ -541,7 +541,7 @@ TEST_F(CursorManagerTestCustomOpCtx, LogicalSessionIdOnOperationCtxTest) { // Cursors created on an op ctx with a session id have a session id. { auto lsid = makeLogicalSessionIdForTest(); - auto opCtx2 = _queryServiceContext->makeOperationContext(lsid); + auto opCtx2 = _queryServiceContext->makeOperationContext(lsid, boost::none); auto pinned2 = makeCursor(opCtx2.get()); ASSERT_EQUALS(pinned2.getCursor()->getSessionId(), lsid); @@ -569,7 +569,7 @@ TEST_F(CursorManagerTestCustomOpCtx, CursorsWithoutSessions) { TEST_F(CursorManagerTestCustomOpCtx, OneCursorWithASession) { // Add a cursor with a session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); - auto opCtx = _queryServiceContext->makeOperationContext(lsid); + auto opCtx = _queryServiceContext->makeOperationContext(lsid, boost::none); auto pinned = makeCursor(opCtx.get()); // Retrieve all sessions active in manager - set should contain just lsid. @@ -595,13 +595,145 @@ TEST_F(CursorManagerTestCustomOpCtx, OneCursorWithASession) { ASSERT(useCursorManager()->getCursorsForSession(lsid).empty()); } +TEST_F(CursorManagerTestCustomOpCtx, KillCursorRespectsSessionId) { + // Add a cursor with a session to the cursor manager. + auto lsid = makeLogicalSessionIdForTest(); + auto opCtx = _queryServiceContext->makeOperationContext(lsid, boost::none); + auto pinned = makeCursor(opCtx.get()); + auto cursorId = pinned.getCursor()->cursorid(); + + // Killing the cursor with incorrect LogicalSessionId fails. + pinned.release(); + auto wrongLsid = makeLogicalSessionIdForTest(); + auto status = + useCursorManager()->killCursor(opCtx.get(), cursorId, false, wrongLsid, boost::none); + ASSERT_NOT_OK(status); + ASSERT_EQ(status.code(), ErrorCodes::CursorNotFound); + + // Killing the cursor with the correct LogicalSessionId works. + ASSERT_OK(useCursorManager()->killCursor(opCtx.get(), cursorId, false, lsid, boost::none)); +} + +TEST_F(CursorManagerTestCustomOpCtx, + KillCursorWithSessionDoesNotKillCursorCreatedOutsideOfSession) { + // Add a cursor with a session to the cursor manager. + auto opCtx = _queryServiceContext->makeOperationContext(); + auto pinned = makeCursor(opCtx.get()); + auto cursorId = pinned.getCursor()->cursorid(); + + // Killing the cursor with the correct cursorId but with an unrelated LogicalSessionId fails. + auto lsid = makeLogicalSessionIdForTest(); + auto status = useCursorManager()->killCursor(opCtx.get(), cursorId, false, lsid, boost::none); + ASSERT_NOT_OK(status); + ASSERT_EQ(status.code(), ErrorCodes::CursorNotFound); +} + +TEST_F(CursorManagerTestCustomOpCtx, KillCursorRespectsTxnNumber) { + // Add a cursor with a session to the cursor manager. + auto lsid = makeLogicalSessionIdForTest(); + const TxnNumber txnNumber = 0; + auto opCtx = _queryServiceContext->makeOperationContext(lsid, txnNumber); + auto pinned = makeCursor(opCtx.get()); + auto cursorId = pinned.getCursor()->cursorid(); + + // Killing the cursor with incorrect TxnNumber fails. + pinned.release(); + const TxnNumber wrongTxnNumber = 1; + auto status = + useCursorManager()->killCursor(opCtx.get(), cursorId, false, lsid, wrongTxnNumber); + ASSERT_NOT_OK(status); + ASSERT_EQ(status.code(), ErrorCodes::CursorNotFound); + + // Kill the cursor with the correct TxnNumber works. + ASSERT_OK(useCursorManager()->killCursor(opCtx.get(), cursorId, false, lsid, txnNumber)); +} + +TEST_F(CursorManagerTestCustomOpCtx, + KillAllCursorsForTransactionRemovesCorrectEntryFromTransactionMap) { + CursorManager* cursorManager = CursorManager::getGlobalCursorManager(); + + // Create 3 sets of cursors, each with a unique LogicalSessionId/TxnNumber pair, but each + // sharing either LogicalSessionId or TxnNumber with another set. + auto lsid1 = makeLogicalSessionIdForTest(); + TxnNumber txnNumber1 = 0; + { + auto opCtx = _queryServiceContext->makeOperationContext(lsid1, txnNumber1); + auto pinned = cursorManager->registerCursor(opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); + pinned.release(); + } + + auto lsid2 = lsid1; + TxnNumber txnNumber2 = 1; + { + auto opCtx = _queryServiceContext->makeOperationContext(lsid2, txnNumber2); + auto pinned = cursorManager->registerCursor(opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); + pinned.release(); + } + + auto lsid3 = makeLogicalSessionIdForTest(); + TxnNumber txnNumber3 = txnNumber1; + { + auto opCtx = _queryServiceContext->makeOperationContext(lsid3, txnNumber3); + // Create 2 cursors for the third set to confirm multiple cursor deregistration. + auto pinned = cursorManager->registerCursor(opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); + pinned.release(); + pinned = cursorManager->registerCursor(opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + repl::ReadConcernLevel::kLocalReadConcern, + BSONObj()}); + pinned.release(); + } + + auto opCtx = _queryServiceContext->makeOperationContext(); + + // Transaction reference exists for all 3 sets. + ASSERT_TRUE(cursorManager->hasTransactionCursorReference(lsid1, txnNumber1)); + ASSERT_TRUE(cursorManager->hasTransactionCursorReference(lsid2, txnNumber2)); + ASSERT_TRUE(cursorManager->hasTransactionCursorReference(lsid3, txnNumber3)); + + // Transaction reference does not exist for LogicalSessionId/TxnNumber that has no cursors. + ASSERT_FALSE(cursorManager->hasTransactionCursorReference(makeLogicalSessionIdForTest(), 99)); + + // Kill cursors for set 1. + ASSERT_EQ(1ul, cursorManager->killAllCursorsForTransaction(opCtx.get(), lsid1, txnNumber1)); + ASSERT_FALSE(cursorManager->hasTransactionCursorReference(lsid1, txnNumber1)); + ASSERT_TRUE(cursorManager->hasTransactionCursorReference(lsid2, txnNumber2)); + ASSERT_TRUE(cursorManager->hasTransactionCursorReference(lsid3, txnNumber3)); + + // Kill cursors for set 2. + ASSERT_EQ(1ul, cursorManager->killAllCursorsForTransaction(opCtx.get(), lsid2, txnNumber2)); + ASSERT_FALSE(cursorManager->hasTransactionCursorReference(lsid2, txnNumber2)); + ASSERT_TRUE(cursorManager->hasTransactionCursorReference(lsid3, txnNumber3)); + + // Kill cursors for set 3. + ASSERT_EQ(2ul, cursorManager->killAllCursorsForTransaction(opCtx.get(), lsid3, txnNumber3)); + ASSERT_FALSE(cursorManager->hasTransactionCursorReference(lsid3, txnNumber3)); +} + /** * Test a manager with multiple cursors running inside of the same session. */ TEST_F(CursorManagerTestCustomOpCtx, MultipleCursorsWithSameSession) { // Add two cursors on the same session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); - auto opCtx = _queryServiceContext->makeOperationContext(lsid); + auto opCtx = _queryServiceContext->makeOperationContext(lsid, boost::none); auto pinned = makeCursor(opCtx.get()); auto pinned2 = makeCursor(opCtx.get()); @@ -648,13 +780,13 @@ TEST_F(CursorManagerTestCustomOpCtx, MultipleCursorsMultipleSessions) { // Cursor with session 1. { - auto opCtx1 = _queryServiceContext->makeOperationContext(lsid1); + auto opCtx1 = _queryServiceContext->makeOperationContext(lsid1, boost::none); cursor1 = makeCursor(opCtx1.get()).getCursor()->cursorid(); } // Cursor with session 2. { - auto opCtx2 = _queryServiceContext->makeOperationContext(lsid2); + auto opCtx2 = _queryServiceContext->makeOperationContext(lsid2, boost::none); cursor2 = makeCursor(opCtx2.get()).getCursor()->cursorid(); } |