/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/base/data_cursor.h"
#include "mongo/base/init.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
#include "mongo/db/cursor_server_params.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/kill_sessions_common.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/startup_test.h"
namespace mongo {
using std::vector;
constexpr int CursorManager::kNumPartitions;
namespace {
uint32_t idFromCursorId(CursorId id) {
uint64_t x = static_cast(id);
x = x >> 32;
return static_cast(x);
}
CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) {
// The leading two bits of a non-global CursorId should be 0.
invariant((collectionIdentifier & (0b11 << 30)) == 0);
CursorId x = static_cast(collectionIdentifier) << 32;
x |= cursor;
return x;
}
class GlobalCursorIdCache {
public:
GlobalCursorIdCache();
~GlobalCursorIdCache();
/**
* Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a
* new CursorManager.
*/
uint32_t registerCursorManager(const NamespaceString& nss);
/**
* Must be called when a CursorManager is deleted. 'id' must be the identifier returned by
* registerCursorManager().
*/
void deregisterCursorManager(uint32_t id, const NamespaceString& nss);
/**
* works globally
*/
bool killCursor(OperationContext* opCtx, CursorId id, bool checkAuth);
void appendStats(BSONObjBuilder& builder);
std::size_t timeoutCursors(OperationContext* opCtx, Date_t now);
template
void visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor);
int64_t nextSeed();
private:
// '_mutex' must not be held when acquiring a CursorManager mutex to avoid deadlock.
SimpleMutex _mutex;
using CursorIdToNssMap = stdx::unordered_map;
using IdToNssMap = stdx::unordered_map;
IdToNssMap _idToNss;
unsigned _nextId;
std::unique_ptr _secureRandom;
};
// Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter
// calls into the former during destruction.
std::unique_ptr globalCursorIdCache;
std::unique_ptr globalCursorManager;
MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) {
globalCursorIdCache.reset(new GlobalCursorIdCache());
return Status::OK();
}
MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache"))
(InitializerContext* context) {
globalCursorManager.reset(new CursorManager({}));
return Status::OK();
}
GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {}
GlobalCursorIdCache::~GlobalCursorIdCache() {}
int64_t GlobalCursorIdCache::nextSeed() {
stdx::lock_guard lk(_mutex);
if (!_secureRandom)
_secureRandom = SecureRandom::create();
return _secureRandom->nextInt64();
}
uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
static const uint32_t kMaxIds = 1000 * 1000 * 1000;
static_assert((kMaxIds & (0b11 << 30)) == 0,
"the first two bits of a collection identifier must always be zeroes");
stdx::lock_guard lk(_mutex);
fassert(17359, _idToNss.size() < kMaxIds);
for (uint32_t i = 0; i <= kMaxIds; i++) {
uint32_t id = ++_nextId;
if (id == 0)
continue;
if (_idToNss.count(id) > 0)
continue;
_idToNss[id] = nss;
return id;
}
MONGO_UNREACHABLE;
}
void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) {
stdx::lock_guard lk(_mutex);
invariant(nss == _idToNss[id]);
_idToNss.erase(id);
}
bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool checkAuth) {
// Figure out what the namespace of this cursor is.
NamespaceString nss;
if (CursorManager::isGloballyManagedCursor(id)) {
auto pin = globalCursorManager->pinCursor(opCtx, id, CursorManager::kNoCheckSession);
if (!pin.isOK()) {
invariant(pin == ErrorCodes::CursorNotFound || pin == ErrorCodes::Unauthorized);
// No such cursor. TODO: Consider writing to audit log here (even though we don't
// have a namespace).
return false;
}
nss = pin.getValue().getCursor()->nss();
} else {
stdx::lock_guard lk(_mutex);
uint32_t nsid = idFromCursorId(id);
IdToNssMap::const_iterator it = _idToNss.find(nsid);
if (it == _idToNss.end()) {
// No namespace corresponding to this cursor id prefix. TODO: Consider writing to
// audit log here (even though we don't have a namespace).
return false;
}
nss = it->second;
}
invariant(nss.isValid());
// Check if we are authorized to kill this cursor.
if (checkAuth) {
auto status = CursorManager::withCursorManager(
opCtx, id, nss, [nss, id, opCtx](CursorManager* manager) {
auto ccPin = manager->pinCursor(opCtx, id, CursorManager::kNoCheckSession);
if (!ccPin.isOK()) {
return ccPin.getStatus();
}
AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient());
auto cursorOwner = ccPin.getValue().getCursor()->getAuthenticatedUsers();
return as->checkAuthForKillCursors(nss, cursorOwner);
});
if (!status.isOK()) {
audit::logKillCursorsAuthzCheck(opCtx->getClient(), nss, id, status.code());
return false;
}
}
// If this cursor is owned by the global cursor manager, ask it to kill the cursor for us.
if (CursorManager::isGloballyManagedCursor(id)) {
Status killStatus = globalCursorManager->killCursor(opCtx, id, checkAuth);
massert(28697,
killStatus.reason(),
killStatus.code() == ErrorCodes::OK ||
killStatus.code() == ErrorCodes::CursorNotFound);
return killStatus.isOK();
}
// If not, then the cursor must be owned by a collection. Kill the cursor under the
// collection lock (to prevent the collection from going away during the erase).
AutoGetCollectionForReadCommand ctx(opCtx, nss);
Collection* collection = ctx.getCollection();
if (!collection) {
if (checkAuth)
audit::logKillCursorsAuthzCheck(
opCtx->getClient(), nss, id, ErrorCodes::CursorNotFound);
return false;
}
Status eraseStatus = collection->getCursorManager()->killCursor(opCtx, id, checkAuth);
uassert(16089,
eraseStatus.reason(),
eraseStatus.code() == ErrorCodes::OK ||
eraseStatus.code() == ErrorCodes::CursorNotFound);
return eraseStatus.isOK();
}
std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t now) {
size_t totalTimedOut = 0;
// Time out the cursors from the global cursor manager.
totalTimedOut += globalCursorManager->timeoutCursors(opCtx, now);
// Compute the set of collection names that we have to time out cursors for.
vector todo;
{
stdx::lock_guard lk(_mutex);
for (auto&& entry : _idToNss) {
todo.push_back(entry.second);
}
}
// For each collection, time out its cursors under the collection lock (to prevent the
// collection from going away during the erase).
for (const auto& nsTodo : todo) {
// We need to be careful to not use an AutoGet* helper, since we only need the lock to
// protect potential access to the Collection's CursorManager, and those helpers may
// do things we don't want here, like check the shard version or throw an exception if this
// namespace has since turned into a view. Using Database::getCollection() will simply
// return nullptr if the collection has since turned into a view. In this case, the cursors
// will already have been cleaned up when the collection was dropped, so there will be none
// left to time out.
//
// Additionally, we need to use the UninterruptibleLockGuard to ensure the lock acquisition
// will not throw due to an interrupt. This method can be called from a background thread so
// we do not want to throw any exceptions.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
AutoGetDb dbLock(opCtx, nsTodo.db(), MODE_IS);
Lock::CollectionLock collLock(opCtx->lockState(), nsTodo.ns(), MODE_IS);
if (!dbLock.getDb()) {
continue;
}
Collection* const collection = dbLock.getDb()->getCollection(opCtx, nsTodo);
if (!collection) {
// The 'nsTodo' collection has been dropped since we held _mutex. We can safely skip it.
continue;
}
totalTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, now);
}
return totalTimedOut;
}
} // namespace
template
void GlobalCursorIdCache::visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor) {
(*visitor)(*globalCursorManager);
// Compute the set of collection names that we have to get sessions for
vector namespaces;
{
stdx::lock_guard lk(_mutex);
for (auto&& entry : _idToNss) {
namespaces.push_back(entry.second);
}
}
// For each collection, get its sessions under the collection lock (to prevent the
// collection from going away during the erase).
for (auto&& ns : namespaces) {
AutoGetCollection ctx(opCtx, ns, MODE_IS);
if (!ctx.getDb()) {
continue;
}
Collection* collection = ctx.getCollection();
if (!collection) {
continue;
}
(*visitor)(*(collection->getCursorManager()));
}
}
// ---
CursorManager* CursorManager::getGlobalCursorManager() {
return globalCursorManager.get();
}
void CursorManager::appendAllActiveSessions(OperationContext* opCtx, LogicalSessionIdSet* lsids) {
auto visitor = [&](CursorManager& mgr) { mgr.appendActiveSessions(lsids); };
globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
}
std::vector CursorManager::getAllCursors(OperationContext* opCtx) {
std::vector cursors;
auto visitor = [&](CursorManager& mgr) { mgr.appendActiveCursors(&cursors); };
globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
return cursors;
}
std::pair CursorManager::killCursorsWithMatchingSessions(
OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
auto eraser = [&](CursorManager& mgr, CursorId id) {
uassertStatusOK(mgr.killCursor(opCtx, id, true));
};
auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser));
globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled());
}
std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) {
return globalCursorIdCache->timeoutCursors(opCtx, now);
}
int CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) {
ConstDataCursor ids(_ids);
int numDeleted = 0;
for (int i = 0; i < n; i++) {
if (killCursorGlobalIfAuthorized(opCtx, ids.readAndAdvance>()))
numDeleted++;
if (globalInShutdownDeprecated())
break;
}
return numDeleted;
}
bool CursorManager::killCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id) {
return globalCursorIdCache->killCursor(opCtx, id, true);
}
bool CursorManager::killCursorGlobal(OperationContext* opCtx, CursorId id) {
return globalCursorIdCache->killCursor(opCtx, id, false);
}
Status CursorManager::withCursorManager(OperationContext* opCtx,
CursorId id,
const NamespaceString& nss,
stdx::function callback) {
boost::optional readLock;
CursorManager* cursorManager = nullptr;
if (CursorManager::isGloballyManagedCursor(id)) {
cursorManager = CursorManager::getGlobalCursorManager();
} else {
readLock.emplace(opCtx, nss);
Collection* collection = readLock->getCollection();
if (!collection) {
return {ErrorCodes::CursorNotFound,
str::stream() << "collection does not exist: " << nss.ns()};
}
cursorManager = collection->getCursorManager();
}
invariant(cursorManager);
return callback(cursorManager);
}
// --------------------------
std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec,
const std::size_t nPartitions) {
auto token = exec->getRegistrationToken();
invariant(token);
return (*token) % nPartitions;
}
CursorManager::CursorManager(NamespaceString nss)
: _nss(std::move(nss)),
_collectionCacheRuntimeId(_nss.isEmpty() ? 0
: globalCursorIdCache->registerCursorManager(_nss)),
_random(stdx::make_unique(globalCursorIdCache->nextSeed())),
_registeredPlanExecutors(),
_cursorMap(stdx::make_unique>>()) {}
CursorManager::~CursorManager() {
// All cursors and PlanExecutors should have been deleted already.
invariant(_registeredPlanExecutors.empty());
invariant(_cursorMap->empty());
if (!isGlobalManager()) {
globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss);
}
}
void CursorManager::invalidateAll(OperationContext* opCtx,
bool collectionGoingAway,
const std::string& reason) {
invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors.
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
fassert(28819, !BackgroundOperation::inProgForNs(_nss));
auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
for (auto&& partition : allExecPartitions) {
for (auto&& exec : partition) {
// The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be
// cleaned up later.
exec->markAsKilled({ErrorCodes::QueryPlanKilled, reason});
}
}
allExecPartitions.clear();
// Mark all cursors as killed, but keep around those we can in order to provide a useful error
// message to the user when they attempt to use it next time.
std::vector> toDisposeWithoutMutex;
{
auto allCurrentPartitions = _cursorMap->lockAllPartitions();
for (auto&& partition : allCurrentPartitions) {
for (auto it = partition.begin(); it != partition.end();) {
auto* cursor = it->second;
cursor->markAsKilled({ErrorCodes::QueryPlanKilled, reason});
// If there's an operation actively using the cursor, then that operation is now
// responsible for cleaning it up. Otherwise we can immediately dispose of it.
if (cursor->_operationUsingCursor) {
it = partition.erase(it);
continue;
}
if (!collectionGoingAway) {
// We keep around unpinned cursors so that future attempts to use the cursor
// will result in a useful error message.
++it;
} else {
toDisposeWithoutMutex.emplace_back(cursor);
it = partition.erase(it);
}
}
}
}
// Dispose of the cursors we can now delete. This might involve lock acquisitions for safe
// cleanup, so avoid doing it while holding mutexes.
for (auto&& cursor : toDisposeWithoutMutex) {
cursor->dispose(opCtx);
}
}
void CursorManager::invalidateDocument(OperationContext* opCtx,
const RecordId& dl,
InvalidationType type) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations.
if (supportsDocLocking()) {
// If a storage engine supports doc locking, then we do not need to invalidate.
// The transactional boundaries of the operation protect us.
return;
}
auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
for (auto&& partition : allExecPartitions) {
for (auto&& exec : partition) {
exec->invalidate(opCtx, dl, type);
}
}
auto allPartitions = _cursorMap->lockAllPartitions();
for (auto&& partition : allPartitions) {
for (auto&& entry : partition) {
auto exec = entry.second->getExecutor();
exec->invalidate(opCtx, dl, type);
}
}
}
bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now) {
if (cursor->isNoTimeout() || cursor->_operationUsingCursor) {
return false;
}
return (now - cursor->_lastUseDate) >= Milliseconds(getCursorTimeoutMillis());
}
std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) {
std::vector> toDisposeWithoutMutex;
for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) {
auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId);
for (auto it = lockedPartition->begin(); it != lockedPartition->end();) {
auto* cursor = it->second;
if (cursorShouldTimeout_inlock(cursor, now)) {
toDisposeWithoutMutex.emplace_back(cursor);
it = lockedPartition->erase(it);
} else {
++it;
}
}
}
// Be careful not to dispose of cursors while holding the partition lock.
for (auto&& cursor : toDisposeWithoutMutex) {
cursor->dispose(opCtx);
}
return toDisposeWithoutMutex.size();
}
namespace {
static AtomicUInt32 registeredPlanExecutorId;
} // namespace
Partitioned>::PartitionId CursorManager::registerExecutor(
PlanExecutor* exec) {
auto partitionId = registeredPlanExecutorId.fetchAndAdd(1);
exec->setRegistrationToken(partitionId);
_registeredPlanExecutors.insert(exec);
return partitionId;
}
void CursorManager::deregisterExecutor(PlanExecutor* exec) {
if (auto partitionId = exec->getRegistrationToken()) {
_registeredPlanExecutors.erase(exec);
}
}
StatusWith CursorManager::pinCursor(OperationContext* opCtx,
CursorId id,
AuthCheck checkSessionAuth) {
auto lockedPartition = _cursorMap->lockOnePartition(id);
auto it = lockedPartition->find(id);
if (it == lockedPartition->end()) {
return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"};
}
ClientCursor* cursor = it->second;
uassert(ErrorCodes::CursorInUse,
str::stream() << "cursor id " << id << " is already in use",
!cursor->_operationUsingCursor);
if (cursor->getExecutor()->isMarkedAsKilled()) {
// This cursor was killed while it was idle.
Status error = cursor->getExecutor()->getKillStatus();
deregisterAndDestroyCursor(std::move(lockedPartition),
opCtx,
std::unique_ptr(cursor));
return error;
}
if (checkSessionAuth == kCheckSession) {
auto cursorPrivilegeStatus = checkCursorSessionPrivilege(opCtx, cursor->getSessionId());
if (!cursorPrivilegeStatus.isOK()) {
return cursorPrivilegeStatus;
}
}
cursor->_operationUsingCursor = opCtx;
// We use pinning of a cursor as a proxy for active, user-initiated use of a cursor. Therefor,
// we pass down to the logical session cache and vivify the record (updating last use).
if (cursor->getSessionId()) {
auto vivifyCursorStatus =
LogicalSessionCache::get(opCtx)->vivify(opCtx, cursor->getSessionId().get());
if (!vivifyCursorStatus.isOK()) {
return vivifyCursorStatus;
}
}
return ClientCursorPin(opCtx, cursor);
}
void CursorManager::unpin(OperationContext* opCtx,
std::unique_ptr cursor) {
// Avoid computing the current time within the critical section.
auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();
auto partition = _cursorMap->lockOnePartition(cursor->cursorid());
invariant(cursor->_operationUsingCursor);
// We must verify that no interrupts have occurred since we finished building the current
// batch. Otherwise, the cursor will be checked back in, the interrupted opCtx will be
// destroyed, and subsequent getMores with a fresh opCtx will succeed.
auto interruptStatus = cursor->_operationUsingCursor->checkForInterruptNoAssert();
cursor->_operationUsingCursor = nullptr;
cursor->_lastUseDate = now;
// If someone was trying to kill this cursor with a killOp or a killCursors, they are likely
// interesting in proactively cleaning up that cursor's resources. In these cases, we
// proactively delete the cursor. In other cases we preserve the error code so that the client
// will see the reason the cursor was killed when asking for the next batch.
if (interruptStatus == ErrorCodes::Interrupted || interruptStatus == ErrorCodes::CursorKilled) {
LOG(0) << "removing cursor " << cursor->cursorid()
<< " after completing batch: " << interruptStatus;
return deregisterAndDestroyCursor(std::move(partition), opCtx, std::move(cursor));
} else if (!interruptStatus.isOK()) {
cursor->markAsKilled(interruptStatus);
}
// The cursor will stay around in '_cursorMap', so release the unique pointer to avoid deleting
// it.
cursor.release();
}
void CursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) const {
auto allPartitions = _cursorMap->lockAllPartitions();
for (auto&& partition : allPartitions) {
for (auto&& entry : partition) {
auto cursor = entry.second;
if (auto id = cursor->getSessionId()) {
lsids->insert(id.value());
}
}
}
}
void CursorManager::appendActiveCursors(std::vector* cursors) const {
auto allPartitions = _cursorMap->lockAllPartitions();
for (auto&& partition : allPartitions) {
for (auto&& entry : partition) {
auto cursor = entry.second;
cursors->emplace_back();
auto& gc = cursors->back();
gc.setId(cursor->_cursorid);
gc.setNs(cursor->nss());
gc.setLsid(cursor->getSessionId());
}
}
}
stdx::unordered_set CursorManager::getCursorsForSession(LogicalSessionId lsid) const {
stdx::unordered_set cursors;
auto allPartitions = _cursorMap->lockAllPartitions();
for (auto&& partition : allPartitions) {
for (auto&& entry : partition) {
auto cursor = entry.second;
if (cursor->getSessionId() == lsid) {
cursors.insert(cursor->cursorid());
}
}
}
return cursors;
}
size_t CursorManager::numCursors() const {
return _cursorMap->size();
}
CursorId CursorManager::allocateCursorId_inlock() {
for (int i = 0; i < 10000; i++) {
// The leading two bits of a CursorId are used to determine if the cursor is registered on
// the global cursor manager.
CursorId id;
if (isGlobalManager()) {
// This is the global cursor manager, so generate a random number and make sure the
// first two bits are 01.
uint64_t mask = 0x3FFFFFFFFFFFFFFF;
uint64_t bitToSet = 1ULL << 62;
id = ((_random->nextInt64() & mask) | bitToSet);
} else {
// The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32
// bits are random.
uint32_t myPart = static_cast(_random->nextInt32());
id = cursorIdFromParts(_collectionCacheRuntimeId, myPart);
}
auto partition = _cursorMap->lockOnePartition(id);
if (partition->count(id) == 0)
return id;
}
fassertFailed(17360);
}
ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
ClientCursorParams&& cursorParams) {
// Avoid computing the current time within the critical section.
auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();
// Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping
// it.
invariant(cursorParams.exec);
deregisterExecutor(cursorParams.exec.get());
cursorParams.exec.get_deleter().dismissDisposal();
cursorParams.exec->unsetRegistered();
// Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure
// we don't insert two cursors with the same cursor id.
stdx::lock_guard lock(_registrationLock);
CursorId cursorId = allocateCursorId_inlock();
std::unique_ptr clientCursor(
new ClientCursor(std::move(cursorParams), this, cursorId, opCtx, now));
// Register this cursor for lookup by transaction.
if (opCtx->getLogicalSessionId() && opCtx->getTxnNumber()) {
invariant(opCtx->getLogicalSessionId());
}
// Transfer ownership of the cursor to '_cursorMap'.
auto partition = _cursorMap->lockOnePartition(cursorId);
ClientCursor* unownedCursor = clientCursor.release();
partition->emplace(cursorId, unownedCursor);
return ClientCursorPin(opCtx, unownedCursor);
}
void CursorManager::deregisterCursor(ClientCursor* cursor) {
_cursorMap->erase(cursor->cursorid());
}
void CursorManager::deregisterAndDestroyCursor(
Partitioned, kNumPartitions>::OnePartition&& lk,
OperationContext* opCtx,
std::unique_ptr cursor) {
{
auto lockWithRestrictedScope = std::move(lk);
lockWithRestrictedScope->erase(cursor->cursorid());
}
// Dispose of the cursor without holding any cursor manager mutexes. Disposal of a cursor can
// require taking lock manager locks, which we want to avoid while holding a mutex. If we did
// so, any caller of a CursorManager method which already held a lock manager lock could induce
// a deadlock when trying to acquire a CursorManager lock.
cursor->dispose(opCtx);
}
Status CursorManager::killCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) {
auto lockedPartition = _cursorMap->lockOnePartition(id);
auto it = lockedPartition->find(id);
if (it == lockedPartition->end()) {
if (shouldAudit) {
audit::logKillCursorsAuthzCheck(
opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound);
}
return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
}
auto cursor = it->second;
if (cursor->_operationUsingCursor) {
// Rather than removing the cursor directly, kill the operation that's currently using the
// cursor. It will stop on its own (and remove the cursor) when it sees that it's been
// interrupted.
{
stdx::unique_lock lk(*cursor->_operationUsingCursor->getClient());
cursor->_operationUsingCursor->getServiceContext()->killOperation(
cursor->_operationUsingCursor, ErrorCodes::CursorKilled);
}
if (shouldAudit) {
audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK);
}
return Status::OK();
}
std::unique_ptr ownedCursor(cursor);
if (shouldAudit) {
audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK);
}
deregisterAndDestroyCursor(std::move(lockedPartition), opCtx, std::move(ownedCursor));
return Status::OK();
}
Status CursorManager::checkAuthForKillCursors(OperationContext* opCtx, CursorId id) {
auto lockedPartition = _cursorMap->lockOnePartition(id);
auto it = lockedPartition->find(id);
if (it == lockedPartition->end()) {
return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"};
}
ClientCursor* cursor = it->second;
// Note that we're accessing the cursor without having pinned it! This is okay since we're only
// accessing nss() and getAuthenticatedUsers() both of which return values that don't change
// after the cursor's creation. We're guaranteed that the cursor won't get destroyed while we're
// reading from it because we hold the partition's lock.
AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient());
return as->checkAuthForKillCursors(cursor->nss(), cursor->getAuthenticatedUsers());
}
} // namespace mongo