/** * Copyright (C) 2013 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" #include "mongo/db/cursor_manager.h" #include "mongo/base/data_cursor.h" #include "mongo/base/init.h" #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/background.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" #include "mongo/db/cursor_server_params.h" #include "mongo/db/db_raii.h" #include "mongo/db/kill_sessions_common.h" #include "mongo/db/logical_session_cache.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/startup_test.h" namespace mongo { using std::vector; constexpr int CursorManager::kNumPartitions; namespace { uint32_t idFromCursorId(CursorId id) { uint64_t x = static_cast(id); x = x >> 32; return static_cast(x); } CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) { // The leading two bits of a non-global CursorId should be 0. invariant((collectionIdentifier & (0b11 << 30)) == 0); CursorId x = static_cast(collectionIdentifier) << 32; x |= cursor; return x; } class GlobalCursorIdCache { public: GlobalCursorIdCache(); ~GlobalCursorIdCache(); /** * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a * new CursorManager. */ uint32_t registerCursorManager(const NamespaceString& nss); /** * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by * registerCursorManager(). */ void deregisterCursorManager(uint32_t id, const NamespaceString& nss); /** * works globally */ bool killCursor(OperationContext* opCtx, CursorId id, bool checkAuth); void appendStats(BSONObjBuilder& builder); std::size_t timeoutCursors(OperationContext* opCtx, Date_t now); template void visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor); int64_t nextSeed(); private: SimpleMutex _mutex; typedef stdx::unordered_map Map; Map _idToNss; unsigned _nextId; std::unique_ptr _secureRandom; }; // Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter // calls into the former during destruction. std::unique_ptr globalCursorIdCache; std::unique_ptr globalCursorManager; MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) { globalCursorIdCache.reset(new GlobalCursorIdCache()); return Status::OK(); } MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache")) (InitializerContext* context) { globalCursorManager.reset(new CursorManager({})); return Status::OK(); } GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {} GlobalCursorIdCache::~GlobalCursorIdCache() {} int64_t GlobalCursorIdCache::nextSeed() { stdx::lock_guard lk(_mutex); if (!_secureRandom) _secureRandom = SecureRandom::create(); return _secureRandom->nextInt64(); } uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) { static const uint32_t kMaxIds = 1000 * 1000 * 1000; static_assert((kMaxIds & (0b11 << 30)) == 0, "the first two bits of a collection identifier must always be zeroes"); stdx::lock_guard lk(_mutex); fassert(17359, _idToNss.size() < kMaxIds); for (uint32_t i = 0; i <= kMaxIds; i++) { uint32_t id = ++_nextId; if (id == 0) continue; if (_idToNss.count(id) > 0) continue; _idToNss[id] = nss; return id; } MONGO_UNREACHABLE; } void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) { stdx::lock_guard lk(_mutex); invariant(nss == _idToNss[id]); _idToNss.erase(id); } bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool checkAuth) { // Figure out what the namespace of this cursor is. NamespaceString nss; if (CursorManager::isGloballyManagedCursor(id)) { auto pin = globalCursorManager->pinCursor(opCtx, id, CursorManager::kNoCheckSession); if (!pin.isOK()) { invariant(pin == ErrorCodes::CursorNotFound || pin == ErrorCodes::Unauthorized); // No such cursor. TODO: Consider writing to audit log here (even though we don't // have a namespace). return false; } nss = pin.getValue().getCursor()->nss(); } else { stdx::lock_guard lk(_mutex); uint32_t nsid = idFromCursorId(id); Map::const_iterator it = _idToNss.find(nsid); if (it == _idToNss.end()) { // No namespace corresponding to this cursor id prefix. TODO: Consider writing to // audit log here (even though we don't have a namespace). return false; } nss = it->second; } invariant(nss.isValid()); // Check if we are authorized to kill this cursor. if (checkAuth) { auto status = CursorManager::withCursorManager( opCtx, id, nss, [nss, id, opCtx](CursorManager* manager) { auto ccPin = manager->pinCursor(opCtx, id, CursorManager::kNoCheckSession); if (!ccPin.isOK()) { return ccPin.getStatus(); } AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient()); auto cursorOwner = ccPin.getValue().getCursor()->getAuthenticatedUsers(); return as->checkAuthForKillCursors(nss, cursorOwner); }); if (!status.isOK()) { audit::logKillCursorsAuthzCheck(opCtx->getClient(), nss, id, status.code()); return false; } } boost::optional> txnToAbort; // If this cursor is owned by the global cursor manager, ask it to kill the cursor for us. if (CursorManager::isGloballyManagedCursor(id)) { auto statusWithTxnToAbort = globalCursorManager->killCursor(opCtx, id, checkAuth); massert(28697, statusWithTxnToAbort.getStatus().reason(), statusWithTxnToAbort.getStatus().code() == ErrorCodes::OK || statusWithTxnToAbort.getStatus().code() == ErrorCodes::CursorNotFound); if (!statusWithTxnToAbort.isOK()) { return false; } txnToAbort = statusWithTxnToAbort.getValue(); } else { // If not, then the cursor must be owned by a collection. Kill the cursor under the // collection lock (to prevent the collection from going away during the erase). AutoGetCollectionForReadCommand ctx(opCtx, nss); Collection* collection = ctx.getCollection(); if (!collection) { if (checkAuth) audit::logKillCursorsAuthzCheck( opCtx->getClient(), nss, id, ErrorCodes::CursorNotFound); return false; } auto statusWithTxnToAbort = collection->getCursorManager()->killCursor(opCtx, id, checkAuth); uassert(16089, statusWithTxnToAbort.getStatus().reason(), statusWithTxnToAbort.getStatus().code() == ErrorCodes::OK || statusWithTxnToAbort.getStatus().code() == ErrorCodes::CursorNotFound); if (!statusWithTxnToAbort.isOK()) { return false; } txnToAbort = statusWithTxnToAbort.getValue(); } // If the cursor has a corresponding transaction, abort that transaction if it is a snapshot // read. This must be done while we are not holding locks. invariant(!opCtx->lockState()->isLocked()); if (txnToAbort) { auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort->first); if (session) { (*session)->abortIfSnapshotRead(opCtx, txnToAbort->second); } } return true; } std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t now) { size_t totalTimedOut = 0; std::vector> txnsToAbort; // Time out the cursors from the global cursor manager. totalTimedOut += globalCursorManager->timeoutCursors(opCtx, now, &txnsToAbort); // Compute the set of collection names that we have to time out cursors for. vector todo; { stdx::lock_guard lk(_mutex); for (auto&& entry : _idToNss) { todo.push_back(entry.second); } } // For each collection, time out its cursors under the collection lock (to prevent the // collection from going away during the erase). for (const auto& nsTodo : todo) { // Note that we specify 'kViewsPermitted' here, even though we don't expect 'nsTodo' to be a // view. Because we are not holding the mutex anymore, it is possible that the collection we // are trying to access has since been destroyed and a view of the same name has been // created in its place. Without 'kViewsPermitted' here, that would result in a uassert that // would crash the cursor cleaner background thread. AutoGetCollectionForReadCommand ctx( opCtx, nsTodo, AutoGetCollection::ViewMode::kViewsPermitted); if (!ctx.getDb()) { continue; } Collection* const collection = ctx.getCollection(); if (!collection) { // The 'nsTodo' collection has been dropped since we held _mutex. We can safely skip it. continue; } totalTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, now, &txnsToAbort); } // If the cursors had corresponding transactions, abort the transactions if they are snapshot // reads. This must be done while we are not holding locks. invariant(!opCtx->lockState()->isLocked()); for (auto&& txnToAbort : txnsToAbort) { auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort.first); if (session) { (*session)->abortIfSnapshotRead(opCtx, txnToAbort.second); } } return totalTimedOut; } } // namespace template void GlobalCursorIdCache::visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor) { (*visitor)(*globalCursorManager); // Compute the set of collection names that we have to get sessions for vector namespaces; { stdx::lock_guard lk(_mutex); for (auto&& entry : _idToNss) { namespaces.push_back(entry.second); } } // For each collection, get its sessions under the collection lock (to prevent the // collection from going away during the erase). for (auto&& ns : namespaces) { AutoGetCollection ctx(opCtx, ns, MODE_IS); if (!ctx.getDb()) { continue; } Collection* collection = ctx.getCollection(); if (!collection) { continue; } (*visitor)(*(collection->getCursorManager())); } } // --- CursorManager* CursorManager::getGlobalCursorManager() { return globalCursorManager.get(); } void CursorManager::appendAllActiveSessions(OperationContext* opCtx, LogicalSessionIdSet* lsids) { auto visitor = [&](CursorManager& mgr) { mgr.appendActiveSessions(lsids); }; globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor); } std::vector CursorManager::getAllCursors(OperationContext* opCtx) { std::vector cursors; auto visitor = [&](CursorManager& mgr) { mgr.appendActiveCursors(&cursors); }; globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor); return cursors; } std::pair CursorManager::killCursorsWithMatchingSessions( OperationContext* opCtx, const SessionKiller::Matcher& matcher) { auto eraser = [&](CursorManager& mgr, CursorId id) { uassertStatusOK(mgr.killCursor(opCtx, id, true)); }; auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser)); globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor); return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled()); } std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) { return globalCursorIdCache->timeoutCursors(opCtx, now); } int CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) { ConstDataCursor ids(_ids); int numDeleted = 0; for (int i = 0; i < n; i++) { if (killCursorGlobalIfAuthorized(opCtx, ids.readAndAdvance>())) numDeleted++; if (globalInShutdownDeprecated()) break; } return numDeleted; } bool CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id) { return globalCursorIdCache->killCursor(opCtx, id, true); } bool CursorManager::killCursorGlobal(OperationContext* opCtx, CursorId id) { return globalCursorIdCache->killCursor(opCtx, id, false); } Status CursorManager::withCursorManager(OperationContext* opCtx, CursorId id, const NamespaceString& nss, stdx::function callback) { boost::optional readLock; CursorManager* cursorManager = nullptr; if (CursorManager::isGloballyManagedCursor(id)) { cursorManager = CursorManager::getGlobalCursorManager(); } else { readLock.emplace(opCtx, nss); Collection* collection = readLock->getCollection(); if (!collection) { return {ErrorCodes::CursorNotFound, str::stream() << "collection does not exist: " << nss.ns()}; } cursorManager = collection->getCursorManager(); } invariant(cursorManager); return callback(cursorManager); } // -------------------------- std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec, const std::size_t nPartitions) { auto token = exec->getRegistrationToken(); invariant(token); return (*token) % nPartitions; } CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)), _collectionCacheRuntimeId(_nss.isEmpty() ? 0 : globalCursorIdCache->registerCursorManager(_nss)), _random(stdx::make_unique(globalCursorIdCache->nextSeed())), _registeredPlanExecutors(), _cursorMap(stdx::make_unique>>()) {} CursorManager::~CursorManager() { // All cursors and PlanExecutors should have been deleted already. invariant(_registeredPlanExecutors.empty()); invariant(_cursorMap->empty()); if (!isGlobalManager()) { globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss); } } void CursorManager::invalidateAll(OperationContext* opCtx, bool collectionGoingAway, const std::string& reason) { invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors. dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); fassert(28819, !BackgroundOperation::inProgForNs(_nss)); auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); for (auto&& partition : allExecPartitions) { for (auto&& exec : partition) { // The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be // cleaned up later. exec->markAsKilled({ErrorCodes::QueryPlanKilled, reason}); } } allExecPartitions.clear(); // Mark all cursors as killed, but keep around those we can in order to provide a useful error // message to the user when they attempt to use it next time. auto allCurrentPartitions = _cursorMap->lockAllPartitions(); for (auto&& partition : allCurrentPartitions) { for (auto it = partition.begin(); it != partition.end();) { auto* cursor = it->second; cursor->markAsKilled({ErrorCodes::QueryPlanKilled, reason}); // If there's an operation actively using the cursor, then that operation is now // responsible for cleaning it up. Otherwise we can immediately dispose of it. if (cursor->_operationUsingCursor) { it = partition.erase(it); continue; } if (!collectionGoingAway) { // We keep around unpinned cursors so that future attempts to use the cursor will // result in a useful error message. ++it; } else { cursor->dispose(opCtx); delete cursor; it = partition.erase(it); } } } } void CursorManager::invalidateDocument(OperationContext* opCtx, const RecordId& dl, InvalidationType type) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations. if (supportsDocLocking()) { // If a storage engine supports doc locking, then we do not need to invalidate. // The transactional boundaries of the operation protect us. return; } auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); for (auto&& partition : allExecPartitions) { for (auto&& exec : partition) { exec->invalidate(opCtx, dl, type); } } auto allPartitions = _cursorMap->lockAllPartitions(); for (auto&& partition : allPartitions) { for (auto&& entry : partition) { auto exec = entry.second->getExecutor(); exec->invalidate(opCtx, dl, type); } } } bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now) { if (cursor->isNoTimeout() || cursor->_operationUsingCursor) { return false; } return (now - cursor->_lastUseDate) >= Milliseconds(getCursorTimeoutMillis()); } std::size_t CursorManager::timeoutCursors( OperationContext* opCtx, Date_t now, std::vector>* txnsToAbort) { std::vector> toDelete; for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) { auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId); for (auto it = lockedPartition->begin(); it != lockedPartition->end();) { auto* cursor = it->second; if (cursorShouldTimeout_inlock(cursor, now)) { // Dispose of the cursor and remove it from the partition. cursor->dispose(opCtx); toDelete.push_back(std::unique_ptr{cursor}); if (cursor->getTxnNumber()) { invariant(cursor->getSessionId()); txnsToAbort->emplace_back(*cursor->getSessionId(), *cursor->getTxnNumber()); } it = lockedPartition->erase(it); } else { ++it; } } } return toDelete.size(); } namespace { static AtomicUInt32 registeredPlanExecutorId; } // namespace Partitioned>::PartitionId CursorManager::registerExecutor( PlanExecutor* exec) { auto partitionId = registeredPlanExecutorId.fetchAndAdd(1); exec->setRegistrationToken(partitionId); _registeredPlanExecutors.insert(exec); return partitionId; } void CursorManager::deregisterExecutor(PlanExecutor* exec) { if (auto partitionId = exec->getRegistrationToken()) { _registeredPlanExecutors.erase(exec); } } StatusWith CursorManager::pinCursor(OperationContext* opCtx, CursorId id, AuthCheck checkSessionAuth) { auto lockedPartition = _cursorMap->lockOnePartition(id); auto it = lockedPartition->find(id); if (it == lockedPartition->end()) { return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"}; } ClientCursor* cursor = it->second; uassert(ErrorCodes::CursorInUse, str::stream() << "cursor id " << id << " is already in use", !cursor->_operationUsingCursor); if (cursor->getExecutor()->isMarkedAsKilled()) { // This cursor was killed while it was idle. Status error = cursor->getExecutor()->getKillStatus(); lockedPartition->erase(cursor->cursorid()); cursor->dispose(opCtx); delete cursor; return error; } if (checkSessionAuth == kCheckSession) { auto cursorPrivilegeStatus = checkCursorSessionPrivilege(opCtx, cursor->getSessionId()); if (!cursorPrivilegeStatus.isOK()) { return cursorPrivilegeStatus; } } cursor->_operationUsingCursor = opCtx; // We use pinning of a cursor as a proxy for active, user-initiated use of a cursor. Therefor, // we pass down to the logical session cache and vivify the record (updating last use). if (cursor->getSessionId()) { LogicalSessionCache::get(opCtx)->vivify(opCtx, cursor->getSessionId().get()); } return ClientCursorPin(opCtx, cursor); } void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) { // Avoid computing the current time within the critical section. auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); auto partition = _cursorMap->lockOnePartition(cursor->cursorid()); invariant(cursor->_operationUsingCursor); // We must verify that no interrupts have occurred since we finished building the current // batch. Otherwise, the cursor will be checked back in, the interrupted opCtx will be // destroyed, and subsequent getMores with a fresh opCtx will succeed. auto interruptStatus = cursor->_operationUsingCursor->checkForInterruptNoAssert(); cursor->_operationUsingCursor = nullptr; cursor->_lastUseDate = now; if (!interruptStatus.isOK()) { // If an interrupt occurred after the batch was completed, we remove the now-unpinned cursor // from the CursorManager, then dispose of and delete it. LOG(0) << "removing cursor " << cursor->cursorid() << " after completing batch: " << interruptStatus; partition->erase(cursor->cursorid()); cursor->dispose(opCtx); delete cursor; } } void CursorManager::getCursorIds(std::set* openCursors) const { auto allPartitions = _cursorMap->lockAllPartitions(); for (auto&& partition : allPartitions) { for (auto&& entry : partition) { openCursors->insert(entry.first); } } } void CursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) const { auto allPartitions = _cursorMap->lockAllPartitions(); for (auto&& partition : allPartitions) { for (auto&& entry : partition) { auto cursor = entry.second; if (auto id = cursor->getSessionId()) { lsids->insert(id.value()); } } } } void CursorManager::appendActiveCursors(std::vector* cursors) const { auto allPartitions = _cursorMap->lockAllPartitions(); for (auto&& partition : allPartitions) { for (auto&& entry : partition) { auto cursor = entry.second; cursors->emplace_back(); auto& gc = cursors->back(); gc.setId(cursor->_cursorid); gc.setNs(cursor->nss()); gc.setLsid(cursor->getSessionId()); } } } stdx::unordered_set CursorManager::getCursorsForSession(LogicalSessionId lsid) const { stdx::unordered_set cursors; auto allPartitions = _cursorMap->lockAllPartitions(); for (auto&& partition : allPartitions) { for (auto&& entry : partition) { auto cursor = entry.second; if (cursor->getSessionId() == lsid) { cursors.insert(cursor->cursorid()); } } } return cursors; } size_t CursorManager::numCursors() const { return _cursorMap->size(); } CursorId CursorManager::allocateCursorId_inlock() { for (int i = 0; i < 10000; i++) { // The leading two bits of a CursorId are used to determine if the cursor is registered on // the global cursor manager. CursorId id; if (isGlobalManager()) { // This is the global cursor manager, so generate a random number and make sure the // first two bits are 01. uint64_t mask = 0x3FFFFFFFFFFFFFFF; uint64_t bitToSet = 1ULL << 62; id = ((_random->nextInt64() & mask) | bitToSet); } else { // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32 // bits are random. uint32_t myPart = static_cast(_random->nextInt32()); id = cursorIdFromParts(_collectionCacheRuntimeId, myPart); } auto partition = _cursorMap->lockOnePartition(id); if (partition->count(id) == 0) return id; } fassertFailed(17360); } ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, ClientCursorParams&& cursorParams) { // Avoid computing the current time within the critical section. auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); // Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping // it. invariant(cursorParams.exec); deregisterExecutor(cursorParams.exec.get()); cursorParams.exec.get_deleter().dismissDisposal(); cursorParams.exec->unsetRegistered(); // Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure // we don't insert two cursors with the same cursor id. stdx::lock_guard lock(_registrationLock); CursorId cursorId = allocateCursorId_inlock(); std::unique_ptr clientCursor( new ClientCursor(std::move(cursorParams), this, cursorId, opCtx, now)); // Transfer ownership of the cursor to '_cursorMap'. auto partition = _cursorMap->lockOnePartition(cursorId); ClientCursor* unownedCursor = clientCursor.release(); partition->emplace(cursorId, unownedCursor); return ClientCursorPin(opCtx, unownedCursor); } void CursorManager::deregisterCursor(ClientCursor* cc) { _cursorMap->erase(cc->cursorid()); } StatusWith>> CursorManager::killCursor( OperationContext* opCtx, CursorId id, bool shouldAudit) { auto lockedPartition = _cursorMap->lockOnePartition(id); auto it = lockedPartition->find(id); if (it == lockedPartition->end()) { if (shouldAudit) { audit::logKillCursorsAuthzCheck( opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound); } return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id}; } auto cursor = it->second; if (cursor->_operationUsingCursor) { // Rather than removing the cursor directly, kill the operation that's currently using the // cursor. It will stop on its own (and remove the cursor) when it sees that it's been // interrupted. { stdx::unique_lock lk(*cursor->_operationUsingCursor->getClient()); cursor->_operationUsingCursor->getServiceContext()->killOperation( cursor->_operationUsingCursor, ErrorCodes::CursorKilled); } if (shouldAudit) { audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK); } return {boost::none}; } std::unique_ptr ownedCursor(cursor); boost::optional> toReturn; if (ownedCursor->getTxnNumber()) { invariant(ownedCursor->getSessionId()); toReturn = std::make_pair(*ownedCursor->getSessionId(), *ownedCursor->getTxnNumber()); } if (shouldAudit) { audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK); } lockedPartition->erase(ownedCursor->cursorid()); ownedCursor->dispose(opCtx); return toReturn; } Status CursorManager::checkAuthForKillCursors(OperationContext* opCtx, CursorId id) { auto lockedPartition = _cursorMap->lockOnePartition(id); auto it = lockedPartition->find(id); if (it == lockedPartition->end()) { return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"}; } ClientCursor* cursor = it->second; // Note that we're accessing the cursor without having pinned it! This is okay since we're only // accessing nss() and getAuthenticatedUsers() both of which return values that don't change // after the cursor's creation. We're guaranteed that the cursor won't get destroyed while we're // reading from it because we hold the partition's lock. AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient()); return as->checkAuthForKillCursors(cursor->nss(), cursor->getAuthenticatedUsers()); } } // namespace mongo