// cursor_manager.cpp
/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/catalog/cursor_manager.h"
#include "mongo/base/data_cursor.h"
#include "mongo/base/init.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/random.h"
#include "mongo/util/exit.h"
#include "mongo/util/startup_test.h"
namespace mongo {
using std::vector;
namespace {

/**
 * Returns the upper 32 bits of 'id'. For a cursor owned by a per-collection CursorManager this
 * is the collection identifier handed out by GlobalCursorIdCache::registerCursorManager().
 */
uint32_t idFromCursorId(CursorId id) {
    const uint64_t bits = static_cast<uint64_t>(id);
    return static_cast<uint32_t>(bits >> 32);
}

/**
 * Builds a non-global CursorId: 'collectionIdentifier' in the upper 32 bits and 'cursor' in the
 * lower 32 bits. 'collectionIdentifier' must have its two leading bits clear.
 */
CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) {
    // The leading two bits of a non-global CursorId should be 0. The mask is built from an
    // unsigned literal: '0b11 << 30' on a signed int shifts a 1 into the sign bit, which is
    // undefined (C++11) or implementation-defined (C++14/17).
    constexpr uint32_t kLeadingTwoBits = 0b11u << 30;
    invariant((collectionIdentifier & kLeadingTwoBits) == 0);

    CursorId x = static_cast<CursorId>(collectionIdentifier) << 32;
    x |= cursor;
    return x;
}

}  // namespace
class GlobalCursorIdCache {
public:
GlobalCursorIdCache();
~GlobalCursorIdCache();
/**
* Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a
* new CursorManager.
*/
uint32_t registerCursorManager(const NamespaceString& nss);
/**
* Must be called when a CursorManager is deleted. 'id' must be the identifier returned by
* registerCursorManager().
*/
void deregisterCursorManager(uint32_t id, const NamespaceString& nss);
/**
* works globally
*/
bool eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth);
void appendStats(BSONObjBuilder& builder);
std::size_t timeoutCursors(OperationContext* opCtx, int millisSinceLastCall);
int64_t nextSeed();
private:
SimpleMutex _mutex;
typedef unordered_map Map;
Map _idToNss;
unsigned _nextId;
std::unique_ptr _secureRandom;
};
// Process-wide singletons for cursor bookkeeping. Both are created by the MONGO_INITIALIZERs
// below.
// Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter
// calls into the former during destruction.
// NOTE(review): ~CursorManager() in this file skips deregisterCursorManager() for the global
// manager; confirm whether this destruction-order constraint is still required.
std::unique_ptr globalCursorIdCache;
std::unique_ptr globalCursorManager;
// Creates the id cache. It must exist before any CursorManager is constructed, because the
// CursorManager constructor always calls globalCursorIdCache->nextSeed().
MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) {
globalCursorIdCache.reset(new GlobalCursorIdCache());
return Status::OK();
}
// Creates the global cursor manager from a value-initialized NamespaceString. "GlobalCursorIdCache"
// is listed as a prerequisite so the id cache already exists when this constructor runs.
MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache"))
(InitializerContext* context) {
globalCursorManager.reset(new CursorManager({}));
return Status::OK();
}
GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {}
// Nothing to release beyond what the members' own destructors handle.
GlobalCursorIdCache::~GlobalCursorIdCache() = default;
int64_t GlobalCursorIdCache::nextSeed() {
stdx::lock_guard lk(_mutex);
if (!_secureRandom)
_secureRandom.reset(SecureRandom::create());
return _secureRandom->nextInt64();
}
uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
static const uint32_t kMaxIds = 1000 * 1000 * 1000;
static_assert((kMaxIds & (0b11 << 30)) == 0,
"the first two bits of a collection identifier must always be zeroes");
stdx::lock_guard lk(_mutex);
fassert(17359, _idToNss.size() < kMaxIds);
for (uint32_t i = 0; i <= kMaxIds; i++) {
uint32_t id = ++_nextId;
if (id == 0)
continue;
if (_idToNss.count(id) > 0)
continue;
_idToNss[id] = nss;
return id;
}
MONGO_UNREACHABLE;
}
void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) {
stdx::lock_guard lk(_mutex);
invariant(nss == _idToNss[id]);
_idToNss.erase(id);
}
// Erases cursor 'id' from whichever CursorManager owns it: the global manager when
// CursorManager::isGloballyManagedCursor(id) is true, otherwise the collection manager whose
// identifier is encoded in the upper 32 bits of 'id'.
// When 'checkAuth' is true, the caller must pass checkAuthForKillCursors(); failures are audited.
// Returns true only if the cursor was actually erased.
bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) {
// Figure out what the namespace of this cursor is.
NamespaceString nss;
if (CursorManager::isGloballyManagedCursor(id)) {
// Global cursors carry their namespace on the ClientCursor itself, so briefly pin it to read
// nss(). pinCursor() throws (uassert 12051) if the cursor is already pinned. 'pin' is scoped to
// this branch, so it is released before the erase further down; CursorManager::eraseCursor()
// refuses to erase a pinned cursor.
auto pin = globalCursorManager->pinCursor(opCtx, id);
if (!pin.isOK()) {
invariant(pin == ErrorCodes::CursorNotFound);
// No such cursor. TODO: Consider writing to audit log here (even though we don't
// have a namespace).
return false;
}
nss = pin.getValue().getCursor()->nss();
} else {
// Collection cursors: map the upper 32 bits of the id back to the registered namespace.
stdx::lock_guard lk(_mutex);
uint32_t nsid = idFromCursorId(id);
Map::const_iterator it = _idToNss.find(nsid);
if (it == _idToNss.end()) {
// No namespace corresponding to this cursor id prefix. TODO: Consider writing to
// audit log here (even though we don't have a namespace).
return false;
}
nss = it->second;
}
invariant(nss.isValid());
// Check if we are authorized to erase this cursor.
if (checkAuth) {
AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient());
Status authorizationStatus = as->checkAuthForKillCursors(nss, id);
if (!authorizationStatus.isOK()) {
audit::logKillCursorsAuthzCheck(opCtx->getClient(), nss, id, ErrorCodes::Unauthorized);
return false;
}
}
// If this cursor is owned by the global cursor manager, ask it to erase the cursor for us.
// Any status other than OK / CursorNotFound (e.g. OperationFailed for a pinned cursor) throws.
// NOTE(review): another thread could pin the cursor between the pin release above and this
// call, which would make the massert fire; confirm this is acceptable.
if (CursorManager::isGloballyManagedCursor(id)) {
Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth);
massert(28697,
eraseStatus.reason(),
eraseStatus.code() == ErrorCodes::OK ||
eraseStatus.code() == ErrorCodes::CursorNotFound);
return eraseStatus.isOK();
}
// If not, then the cursor must be owned by a collection. Erase the cursor under the
// collection lock (to prevent the collection from going away during the erase).
AutoGetCollectionForReadCommand ctx(opCtx, nss);
Collection* collection = ctx.getCollection();
if (!collection) {
// The namespace no longer has a collection, so neither does the cursor.
if (checkAuth)
audit::logKillCursorsAuthzCheck(
opCtx->getClient(), nss, id, ErrorCodes::CursorNotFound);
return false;
}
// The collection's manager audits the outcome itself when 'checkAuth' is true.
Status eraseStatus = collection->getCursorManager()->eraseCursor(opCtx, id, checkAuth);
massert(16089,
eraseStatus.reason(),
eraseStatus.code() == ErrorCodes::OK ||
eraseStatus.code() == ErrorCodes::CursorNotFound);
return eraseStatus.isOK();
}
/**
 * Times out idle cursors process-wide and returns how many were removed.
 */
std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, int millisSinceLastCall) {
    // The global manager is handled first, without taking any collection lock.
    std::size_t numTimedOut = globalCursorManager->timeoutCursors(opCtx, millisSinceLastCall);

    // Snapshot the registered namespaces under '_mutex', then drop it before taking any
    // collection locks below.
    vector<NamespaceString> namespaces;
    {
        stdx::lock_guard<SimpleMutex> lk(_mutex);
        for (const auto& idAndNss : _idToNss) {
            namespaces.push_back(idAndNss.second);
        }
    }

    // Time out each collection's cursors while holding that collection's lock, so the
    // collection cannot go away mid-erase. Missing databases or collections are skipped.
    for (const NamespaceString& nss : namespaces) {
        AutoGetCollectionOrViewForReadCommand ctx(opCtx, nss);
        if (!ctx.getDb()) {
            continue;
        }
        Collection* const collection = ctx.getCollection();
        if (!collection) {
            continue;
        }
        numTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, millisSinceLastCall);
    }
    return numTimedOut;
}
// ---
// Returns the process-wide manager created by the "GlobalCursorManager" initializer.
// The caller does not take ownership.
CursorManager* CursorManager::getGlobalCursorManager() {
    CursorManager* const globalManager = globalCursorManager.get();
    return globalManager;
}
// Times out idle cursors on the global manager and on every registered collection manager;
// returns the total number timed out.
std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, int millisSinceLastCall) {
    GlobalCursorIdCache* const idCache = globalCursorIdCache.get();
    return idCache->timeoutCursors(opCtx, millisSinceLastCall);
}
/**
 * Attempts to kill 'n' cursors whose ids are stored back to back in '_ids' as little-endian
 * 64-bit integers. Each kill is subject to an authorization check. Stops early if the server
 * begins shutting down. Returns the number of cursors actually erased.
 */
int CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) {
    ConstDataCursor cursorIds(_ids);
    int erasedCount = 0;
    for (int remaining = n; remaining > 0; --remaining) {
        const CursorId id = cursorIds.readAndAdvance<LittleEndian<int64_t>>();
        if (eraseCursorGlobalIfAuthorized(opCtx, id)) {
            ++erasedCount;
        }
        if (globalInShutdownDeprecated()) {
            break;
        }
    }
    return erasedCount;
}
// Kills cursor 'id' wherever it lives, only if the current client is authorized to do so.
bool CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id) {
    const bool checkAuth = true;
    return globalCursorIdCache->eraseCursor(opCtx, id, checkAuth);
}
// Kills cursor 'id' wherever it lives, skipping the authorization check.
bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) {
    const bool checkAuth = false;
    return globalCursorIdCache->eraseCursor(opCtx, id, checkAuth);
}
// --------------------------
/**
 * Creates a manager for cursors on 'nss'. A per-collection manager reserves a collection
 * identifier for its CursorIds; the global manager does not. Every manager gets its own
 * PseudoRandom, seeded from the global id cache.
 */
CursorManager::CursorManager(NamespaceString nss) : _nss(std::move(nss)) {
    const bool needsCollectionId = !isGlobalManager();
    if (needsCollectionId) {
        _collectionCacheRuntimeId = globalCursorIdCache->registerCursorManager(_nss);
    }
    const int64_t seed = globalCursorIdCache->nextSeed();
    _random.reset(new PseudoRandom(seed));
}
/**
 * Returns this manager's collection identifier to the global id cache (per-collection managers
 * only). All cursors and standalone executors must already be gone.
 */
CursorManager::~CursorManager() {
    const bool ownsCollectionId = !isGlobalManager();
    if (ownsCollectionId) {
        globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss);
    }
    invariant(_cursors.empty());
    invariant(_nonCachedExecutors.empty());
}
/**
 * Kills every executor and cursor registered on this (per-collection) manager with 'reason'.
 * Requires the collection to be locked in MODE_X.
 *
 * - Standalone executors are marked killed and forgotten; they delete themselves.
 * - Pinned cursors are marked killed and dropped from tracking; their active user cleans them up.
 * - Unpinned cursors are kept (so later use reports a useful error) unless
 *   'collectionGoingAway' is true, in which case they are disposed and deleted here.
 */
void CursorManager::invalidateAll(OperationContext* opCtx,
                                  bool collectionGoingAway,
                                  const std::string& reason) {
    invariant(!isGlobalManager());  // The global cursor manager should never need to kill cursors.
    dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));

    stdx::lock_guard<SimpleMutex> lk(_mutex);
    fassert(28819, !BackgroundOperation::inProgForNs(_nss));

    for (auto&& exec : _nonCachedExecutors) {
        // We kill the executor, but it deletes itself.
        exec->markAsKilled(reason);
    }
    _nonCachedExecutors.clear();

    CursorMap newMap;
    for (auto&& entry : _cursors) {
        auto* cursor = entry.second;
        cursor->markAsKilled(reason);

        if (cursor->_isPinned) {
            // There is an active user of this cursor, who is now responsible for cleaning it up.
            // This CursorManager will no longer track this cursor.
            continue;
        }

        if (!collectionGoingAway) {
            // We keep around unpinned cursors so that future attempts to use the cursor will result
            // in a useful error message.
            newMap.insert(entry);
        } else {
            // The collection is going away, so there's no point in keeping any state.
            cursor->dispose(opCtx);
            delete cursor;
        }
    }

    // Move instead of copy: the old contents are either carried over in 'newMap' or intentionally
    // untracked, so copying the map while holding '_mutex' is pure waste.
    _cursors = std::move(newMap);
}
/**
 * Tells every executor tracked by this (per-collection) manager, standalone or held by a
 * cursor, that record 'dl' is being invalidated with 'type'. This is a no-op on storage engines
 * that support document-level locking.
 */
void CursorManager::invalidateDocument(OperationContext* opCtx,
                                       const RecordId& dl,
                                       InvalidationType type) {
    invariant(!isGlobalManager());  // The global cursor manager should never receive invalidations.

    // With doc-level locking, the operation's transactional boundaries already protect readers.
    if (supportsDocLocking()) {
        return;
    }

    stdx::lock_guard<SimpleMutex> lk(_mutex);

    for (PlanExecutor* executor : _nonCachedExecutors) {
        executor->invalidate(opCtx, dl, type);
    }

    for (auto&& idAndCursor : _cursors) {
        PlanExecutor* const cursorExec = idAndCursor.second->getExecutor();
        if (!cursorExec) {
            continue;  // Cursors without an executor have nothing to invalidate.
        }
        cursorExec->invalidate(opCtx, dl, type);
    }
}
/**
 * Disposes of and deletes every cursor whose shouldTimeout() reports it has been idle too long.
 * Returns how many were removed.
 */
std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, int millisSinceLastCall) {
    vector<ClientCursor*> expired;
    stdx::lock_guard<SimpleMutex> lk(_mutex);

    // Gather candidates first so '_cursors' is not modified while it is being iterated.
    for (auto&& idAndCursor : _cursors) {
        ClientCursor* const cursor = idAndCursor.second;
        // shouldTimeout() ensures that we skip pinned cursors.
        if (cursor->shouldTimeout(millisSinceLastCall)) {
            expired.push_back(cursor);
        }
    }

    for (ClientCursor* cursor : expired) {
        _deregisterCursor_inlock(cursor);
        cursor->dispose(opCtx);
        delete cursor;
    }
    return expired.size();
}
// Starts tracking 'exec' as a standalone executor. Registering the same executor twice is a
// programming error.
void CursorManager::registerExecutor(PlanExecutor* exec) {
    stdx::lock_guard<SimpleMutex> lk(_mutex);
    const bool wasInserted = _nonCachedExecutors.insert(exec).second;
    invariant(wasInserted);
}
// Stops tracking 'exec' as a standalone executor.
void CursorManager::deregisterExecutor(PlanExecutor* exec) {
    stdx::lock_guard<SimpleMutex> guard(_mutex);
    _nonCachedExecutors.erase(exec);
}
/**
 * Pins cursor 'id' for exclusive use by the caller.
 *
 * Returns CursorNotFound if no such cursor exists. Throws (uassert 12051) if it is already
 * pinned. If the cursor was killed while idle, it is deregistered, disposed and deleted, and
 * QueryPlanKilled is returned with the kill reason.
 */
StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx, CursorId id) {
    stdx::lock_guard<SimpleMutex> lk(_mutex);

    auto found = _cursors.find(id);
    if (found == _cursors.end()) {
        return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"};
    }

    ClientCursor* const cursor = found->second;
    uassert(12051, str::stream() << "cursor id " << id << " is already in use", !cursor->_isPinned);

    if (!cursor->_killed) {
        cursor->_isPinned = true;
        return ClientCursorPin(opCtx, cursor);
    }

    // This cursor was killed while it was idle.
    invariant(cursor->getExecutor());  // We should never unpin RangePreserver cursors.
    // Build the error while the cursor is still alive; it is deleted just below.
    Status error{ErrorCodes::QueryPlanKilled,
                 str::stream() << "cursor killed because: "
                               << cursor->getExecutor()->getKillReason()};
    _deregisterCursor_inlock(cursor);
    cursor->dispose(opCtx);
    delete cursor;
    return error;
}
// Releases the pin on 'cursor', which must currently be pinned.
void CursorManager::unpin(ClientCursor* cursor) {
    stdx::lock_guard<SimpleMutex> guard(_mutex);
    const bool wasPinned = cursor->_isPinned;
    invariant(wasPinned);
    cursor->_isPinned = false;
}
void CursorManager::getCursorIds(std::set* openCursors) const {
stdx::lock_guard lk(_mutex);
for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
ClientCursor* cc = i->second;
openCursors->insert(cc->cursorid());
}
}
size_t CursorManager::numCursors() const {
stdx::lock_guard lk(_mutex);
return _cursors.size();
}
/**
 * Picks a CursorId not currently in '_cursors'. Caller must hold '_mutex'.
 *
 * The two leading bits encode ownership: 01 for the global manager (followed by 62 random bits),
 * 00 for a collection manager (followed by the 30-bit collection identifier and 32 random bits).
 * Gives up fatally (fassert 17360) after 10000 colliding attempts.
 */
CursorId CursorManager::_allocateCursorId_inlock() {
    const int kMaxAttempts = 10000;
    for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
        CursorId candidate;
        if (!isGlobalManager()) {
            const uint32_t randomPart = static_cast<uint32_t>(_random->nextInt32());
            candidate = cursorIdFromParts(_collectionCacheRuntimeId, randomPart);
        } else {
            const uint64_t kLow62Bits = 0x3FFFFFFFFFFFFFFF;
            const uint64_t kGlobalMarker = 1ULL << 62;
            candidate = ((_random->nextInt64() & kLow62Bits) | kGlobalMarker);
        }

        if (_cursors.count(candidate) == 0) {
            return candidate;
        }
    }
    fassertFailed(17360);
}
// Wraps the PlanExecutor carried by 'cursorParams' in a new ClientCursor, registers it under a
// freshly allocated CursorId and returns a ClientCursorPin for it. The executor stops being
// tracked as a standalone executor, because the ClientCursor now represents it.
ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
ClientCursorParams&& cursorParams) {
stdx::lock_guard lk(_mutex);
// Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping
// it.
invariant(cursorParams.exec);
_nonCachedExecutors.erase(cursorParams.exec.get());
// NOTE(review): presumably stops the executor's deleter from disposing it when released;
// confirm against the deleter's definition.
cursorParams.exec.get_deleter().dismissDisposal();
cursorParams.exec->unsetRegistered();
CursorId cursorId = _allocateCursorId_inlock();
std::unique_ptr clientCursor(
new ClientCursor(std::move(cursorParams), this, cursorId));
// Ownership moves into '_cursors' inside _registerCursor_inlock().
return _registerCursor_inlock(opCtx, std::move(clientCursor));
}
// Registers a ClientCursor built from 'collection' alone (no PlanExecutor is supplied) under a
// freshly allocated CursorId and returns a ClientCursorPin for it.
ClientCursorPin CursorManager::registerRangePreserverCursor(OperationContext* opCtx,
const Collection* collection) {
stdx::lock_guard lk(_mutex);
CursorId cursorId = _allocateCursorId_inlock();
std::unique_ptr clientCursor(
new ClientCursor(collection, this, cursorId));
return _registerCursor_inlock(opCtx, std::move(clientCursor));
}
// Inserts 'clientCursor' into '_cursors' under its (non-zero) id and returns a pin for it.
// Callers hold '_mutex'. After this call '_cursors' holds the only owning pointer.
ClientCursorPin CursorManager::_registerCursor_inlock(
OperationContext* opCtx, std::unique_ptr clientCursor) {
CursorId cursorId = clientCursor->cursorid();
invariant(cursorId);
// Transfer ownership of the cursor to '_cursors'.
ClientCursor* unownedCursor = clientCursor.release();
_cursors[cursorId] = unownedCursor;
return ClientCursorPin(opCtx, unownedCursor);
}
// Locking wrapper around _deregisterCursor_inlock(): stops tracking 'cc' without deleting it.
void CursorManager::deregisterCursor(ClientCursor* cc) {
    stdx::lock_guard<SimpleMutex> guard(_mutex);
    _deregisterCursor_inlock(cc);
}
/**
 * Kills the unpinned cursor 'id': deregisters, disposes and deletes it.
 *
 * Returns CursorNotFound if there is no such cursor and OperationFailed if it is pinned.
 * When 'shouldAudit' is true, every outcome is recorded via audit::logKillCursorsAuthzCheck.
 */
Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) {
    stdx::lock_guard<SimpleMutex> lk(_mutex);

    auto auditOutcome = [&](ErrorCodes::Error outcome) {
        if (shouldAudit) {
            audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, outcome);
        }
    };

    auto found = _cursors.find(id);
    if (found == _cursors.end()) {
        auditOutcome(ErrorCodes::CursorNotFound);
        return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
    }

    ClientCursor* const target = found->second;
    if (target->_isPinned) {
        auditOutcome(ErrorCodes::OperationFailed);
        return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id};
    }

    auditOutcome(ErrorCodes::OK);
    _deregisterCursor_inlock(target);
    target->dispose(opCtx);
    delete target;
    return Status::OK();
}
// Removes 'cc' from '_cursors' by its id. Caller must hold '_mutex'. This does not dispose or
// delete the cursor; that is left to the caller.
void CursorManager::_deregisterCursor_inlock(ClientCursor* cc) {
    invariant(cc);
    _cursors.erase(cc->cursorid());
}
} // namespace mongo