From 7226d27bb57301fcfb995b22bcb3b0eee23ba540 Mon Sep 17 00:00:00 2001 From: ADAM David Alan Martin Date: Mon, 5 Jun 2017 11:07:00 -0400 Subject: SERVER-29253 Migrate cursor manager to the client cursor lib. Putting `CursorManager` and `ClientCursor` in the same library allows us to remove a cycle exemption in cursor and to fully resolve mmap_v1 against dependencies. --- src/mongo/db/SConscript | 7 +- src/mongo/db/assemble_response.cpp | 2 +- src/mongo/db/catalog/SConscript | 3 +- src/mongo/db/catalog/collection.h | 4 +- src/mongo/db/catalog/cursor_manager.cpp | 575 ----------------------------- src/mongo/db/catalog/cursor_manager.h | 245 ------------ src/mongo/db/clientcursor.cpp | 2 +- src/mongo/db/commands/getmore_cmd.cpp | 2 +- src/mongo/db/commands/killcursors_cmd.cpp | 2 +- src/mongo/db/commands/list_collections.cpp | 2 +- src/mongo/db/commands/list_indexes.cpp | 2 +- src/mongo/db/cursor_manager.cpp | 573 ++++++++++++++++++++++++++++ src/mongo/db/cursor_manager.h | 243 ++++++++++++ src/mongo/db/storage/mmap_v1/SConscript | 9 +- src/mongo/dbtests/cursor_manager_test.cpp | 2 +- 15 files changed, 832 insertions(+), 841 deletions(-) delete mode 100644 src/mongo/db/catalog/cursor_manager.cpp delete mode 100644 src/mongo/db/catalog/cursor_manager.h create mode 100644 src/mongo/db/cursor_manager.cpp create mode 100644 src/mongo/db/cursor_manager.h (limited to 'src/mongo') diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index e6b85a9329e..90cf669054b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -606,16 +606,13 @@ env.Library( target="clientcursor", source=[ "clientcursor.cpp", + "cursor_manager.cpp", ], LIBDEPS=[ "$BUILD_DIR/mongo/base", "$BUILD_DIR/mongo/util/background_job", "query/query", - #"catalog/catalog", # CYCLE (CursorManager) - ], - LIBDEPS_TAGS=[ - # TODO(ADAM, 2017-03-10): See `CYCLE` tags above - 'illegal_cyclic_or_unresolved_dependencies_whitelisted', + "background", ], ) diff --git a/src/mongo/db/assemble_response.cpp b/src/mongo/db/assemble_response.cpp index 1eda99f8c94..abbb666e8d0 100644 --- a/src/mongo/db/assemble_response.cpp +++ b/src/mongo/db/assemble_response.cpp @@ -34,10 +34,10 @@ #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/curop.h" #include "mongo/db/curop_metrics.h" +#include "mongo/db/cursor_manager.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/diag_log.h" #include "mongo/db/introspect.h" diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 7e19fa604cb..20180689ef7 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -154,7 +154,6 @@ env.Library( "collection_compact.cpp", "collection_impl.cpp", "collection_info_cache_impl.cpp", - "cursor_manager.cpp", "database_impl.cpp", "database_holder_impl.cpp", "index_catalog_impl.cpp", @@ -173,6 +172,7 @@ env.Library( 'index_key_validate', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/authmongod', + '$BUILD_DIR/mongo/db/clientcursor', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/query/query', @@ -187,7 +187,6 @@ env.Library( '$BUILD_DIR/mongo/db/background', '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/index/index_access_methods', - '$BUILD_DIR/mongo/db/clientcursor', '$BUILD_DIR/mongo/db/s/balancer', '$BUILD_DIR/mongo/db/views/views_mongod', ], diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index caf3a32288e..002a5c21366 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -1,4 +1,4 @@ -/*- +/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify @@ -38,8 +38,8 @@ #include "mongo/bson/mutable/damage_vector.h" #include "mongo/db/catalog/coll_mod.h" #include "mongo/db/catalog/collection_info_cache.h" -#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/cursor_manager.h" #include "mongo/db/exec/collection_scan_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp deleted file mode 100644 index 918134b057d..00000000000 --- a/src/mongo/db/catalog/cursor_manager.cpp +++ /dev/null @@ -1,575 +0,0 @@ -// cursor_manager.cpp - -/** -* Copyright (C) 2013 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see . -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#include "mongo/db/catalog/cursor_manager.h" - -#include "mongo/base/data_cursor.h" -#include "mongo/base/init.h" -#include "mongo/db/audit.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/background.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/client.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/query/plan_executor.h" -#include "mongo/db/server_parameters.h" -#include "mongo/db/service_context.h" -#include "mongo/platform/random.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/exit.h" -#include "mongo/util/startup_test.h" - -namespace mongo { -using std::vector; - -constexpr Minutes CursorManager::kDefaultCursorTimeoutMinutes; - -MONGO_EXPORT_SERVER_PARAMETER( - cursorTimeoutMillis, - int, - durationCount(CursorManager::kDefaultCursorTimeoutMinutes)); - -constexpr int CursorManager::kNumPartitions; - -namespace { -uint32_t idFromCursorId(CursorId id) { - uint64_t x = static_cast(id); - x = x >> 32; - return static_cast(x); -} - -CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) { - // The leading two bits of a non-global CursorId should be 0. - invariant((collectionIdentifier & (0b11 << 30)) == 0); - CursorId x = static_cast(collectionIdentifier) << 32; - x |= cursor; - return x; -} -} // namespace - -class GlobalCursorIdCache { -public: - GlobalCursorIdCache(); - ~GlobalCursorIdCache(); - - /** - * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a - * new CursorManager. - */ - uint32_t registerCursorManager(const NamespaceString& nss); - - /** - * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by - * registerCursorManager(). - */ - void deregisterCursorManager(uint32_t id, const NamespaceString& nss); - - /** - * works globally - */ - bool eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth); - - void appendStats(BSONObjBuilder& builder); - - std::size_t timeoutCursors(OperationContext* opCtx, Date_t now); - - int64_t nextSeed(); - -private: - SimpleMutex _mutex; - - typedef unordered_map Map; - Map _idToNss; - unsigned _nextId; - - std::unique_ptr _secureRandom; -}; - -// Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter -// calls into the former during destruction. -std::unique_ptr globalCursorIdCache; -std::unique_ptr globalCursorManager; - -MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) { - globalCursorIdCache.reset(new GlobalCursorIdCache()); - return Status::OK(); -} - -MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache")) -(InitializerContext* context) { - globalCursorManager.reset(new CursorManager({})); - return Status::OK(); -} - -GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {} - -GlobalCursorIdCache::~GlobalCursorIdCache() {} - -int64_t GlobalCursorIdCache::nextSeed() { - stdx::lock_guard lk(_mutex); - if (!_secureRandom) - _secureRandom.reset(SecureRandom::create()); - return _secureRandom->nextInt64(); -} - -uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) { - static const uint32_t kMaxIds = 1000 * 1000 * 1000; - static_assert((kMaxIds & (0b11 << 30)) == 0, - "the first two bits of a collection identifier must always be zeroes"); - - stdx::lock_guard lk(_mutex); - - fassert(17359, _idToNss.size() < kMaxIds); - - for (uint32_t i = 0; i <= kMaxIds; i++) { - uint32_t id = ++_nextId; - if (id == 0) - continue; - if (_idToNss.count(id) > 0) - continue; - _idToNss[id] = nss; - return id; - } - - MONGO_UNREACHABLE; -} - -void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) { - stdx::lock_guard lk(_mutex); - invariant(nss == _idToNss[id]); - _idToNss.erase(id); -} - -bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) { - // Figure out what the namespace of this cursor is. - NamespaceString nss; - if (CursorManager::isGloballyManagedCursor(id)) { - auto pin = globalCursorManager->pinCursor(opCtx, id); - if (!pin.isOK()) { - invariant(pin == ErrorCodes::CursorNotFound); - // No such cursor. TODO: Consider writing to audit log here (even though we don't - // have a namespace). - return false; - } - nss = pin.getValue().getCursor()->nss(); - } else { - stdx::lock_guard lk(_mutex); - uint32_t nsid = idFromCursorId(id); - Map::const_iterator it = _idToNss.find(nsid); - if (it == _idToNss.end()) { - // No namespace corresponding to this cursor id prefix. TODO: Consider writing to - // audit log here (even though we don't have a namespace). - return false; - } - nss = it->second; - } - invariant(nss.isValid()); - - // Check if we are authorized to erase this cursor. - if (checkAuth) { - AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient()); - Status authorizationStatus = as->checkAuthForKillCursors(nss, id); - if (!authorizationStatus.isOK()) { - audit::logKillCursorsAuthzCheck(opCtx->getClient(), nss, id, ErrorCodes::Unauthorized); - return false; - } - } - - // If this cursor is owned by the global cursor manager, ask it to erase the cursor for us. - if (CursorManager::isGloballyManagedCursor(id)) { - Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth); - massert(28697, - eraseStatus.reason(), - eraseStatus.code() == ErrorCodes::OK || - eraseStatus.code() == ErrorCodes::CursorNotFound); - return eraseStatus.isOK(); - } - - // If not, then the cursor must be owned by a collection. Erase the cursor under the - // collection lock (to prevent the collection from going away during the erase). - AutoGetCollectionForReadCommand ctx(opCtx, nss); - Collection* collection = ctx.getCollection(); - if (!collection) { - if (checkAuth) - audit::logKillCursorsAuthzCheck( - opCtx->getClient(), nss, id, ErrorCodes::CursorNotFound); - return false; - } - - Status eraseStatus = collection->getCursorManager()->eraseCursor(opCtx, id, checkAuth); - uassert(16089, - eraseStatus.reason(), - eraseStatus.code() == ErrorCodes::OK || - eraseStatus.code() == ErrorCodes::CursorNotFound); - return eraseStatus.isOK(); -} - -std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t now) { - size_t totalTimedOut = 0; - - // Time out the cursors from the global cursor manager. - totalTimedOut += globalCursorManager->timeoutCursors(opCtx, now); - - // Compute the set of collection names that we have to time out cursors for. - vector todo; - { - stdx::lock_guard lk(_mutex); - for (auto&& entry : _idToNss) { - todo.push_back(entry.second); - } - } - - // For each collection, time out its cursors under the collection lock (to prevent the - // collection from going away during the erase). - for (unsigned i = 0; i < todo.size(); i++) { - AutoGetCollectionOrViewForReadCommand ctx(opCtx, NamespaceString(todo[i])); - if (!ctx.getDb()) { - continue; - } - - Collection* collection = ctx.getCollection(); - if (collection == NULL) { - continue; - } - - totalTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, now); - } - - return totalTimedOut; -} - -// --- - -CursorManager* CursorManager::getGlobalCursorManager() { - return globalCursorManager.get(); -} - -std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) { - return globalCursorIdCache->timeoutCursors(opCtx, now); -} - -int CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) { - ConstDataCursor ids(_ids); - int numDeleted = 0; - for (int i = 0; i < n; i++) { - if (eraseCursorGlobalIfAuthorized(opCtx, ids.readAndAdvance>())) - numDeleted++; - if (globalInShutdownDeprecated()) - break; - } - return numDeleted; -} -bool CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id) { - return globalCursorIdCache->eraseCursor(opCtx, id, true); -} -bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) { - return globalCursorIdCache->eraseCursor(opCtx, id, false); -} - - -// -------------------------- - -std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec, - const std::size_t nPartitions) { - auto token = exec->getRegistrationToken(); - invariant(token); - return (*token) % nPartitions; -} - -CursorManager::CursorManager(NamespaceString nss) - : _nss(std::move(nss)), - _collectionCacheRuntimeId(_nss.isEmpty() ? 0 - : globalCursorIdCache->registerCursorManager(_nss)), - _random(stdx::make_unique(globalCursorIdCache->nextSeed())), - _registeredPlanExecutors(), - _cursorMap(stdx::make_unique>>()) {} - -CursorManager::~CursorManager() { - // All cursors and PlanExecutors should have been deleted already. - invariant(_registeredPlanExecutors.empty()); - invariant(_cursorMap->empty()); - - if (!isGlobalManager()) { - globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss); - } -} - -void CursorManager::invalidateAll(OperationContext* opCtx, - bool collectionGoingAway, - const std::string& reason) { - invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors. - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); - fassert(28819, !BackgroundOperation::inProgForNs(_nss)); - auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); - for (auto&& partition : allExecPartitions) { - for (auto&& exec : partition) { - // The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be - // cleaned up later. - exec->markAsKilled(reason); - } - } - allExecPartitions.clear(); - - // Mark all cursors as killed, but keep around those we can in order to provide a useful error - // message to the user when they attempt to use it next time. - auto allCurrentPartitions = _cursorMap->lockAllPartitions(); - for (auto&& partition : allCurrentPartitions) { - for (auto it = partition.begin(); it != partition.end();) { - auto* cursor = it->second; - cursor->markAsKilled(reason); - - // If pinned, there is an active user of this cursor, who is now responsible for - // cleaning it up. Otherwise, we can immediately dispose of it. - if (cursor->_isPinned) { - it = partition.erase(it); - continue; - } - - if (!collectionGoingAway) { - // We keep around unpinned cursors so that future attempts to use the cursor will - // result in a useful error message. - ++it; - } else { - cursor->dispose(opCtx); - delete cursor; - it = partition.erase(it); - } - } - } -} - -void CursorManager::invalidateDocument(OperationContext* opCtx, - const RecordId& dl, - InvalidationType type) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations. - if (supportsDocLocking()) { - // If a storage engine supports doc locking, then we do not need to invalidate. - // The transactional boundaries of the operation protect us. - return; - } - - auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); - for (auto&& partition : allExecPartitions) { - for (auto&& exec : partition) { - exec->invalidate(opCtx, dl, type); - } - } - - auto allPartitions = _cursorMap->lockAllPartitions(); - for (auto&& partition : allPartitions) { - for (auto&& entry : partition) { - auto exec = entry.second->getExecutor(); - exec->invalidate(opCtx, dl, type); - } - } -} - -bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now) { - if (cursor->isNoTimeout() || cursor->_isPinned) { - return false; - } - return (now - cursor->_lastUseDate) >= Milliseconds(cursorTimeoutMillis.load()); -} - -std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) { - std::vector> toDelete; - - for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) { - auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId); - for (auto it = lockedPartition->begin(); it != lockedPartition->end();) { - auto* cursor = it->second; - if (cursorShouldTimeout_inlock(cursor, now)) { - // Dispose of the cursor and remove it from the partition. - cursor->dispose(opCtx); - toDelete.push_back(std::unique_ptr{cursor}); - it = lockedPartition->erase(it); - } else { - ++it; - } - } - } - - return toDelete.size(); -} - -namespace { -static AtomicUInt32 registeredPlanExecutorId; -} // namespace - -Partitioned>::PartitionId CursorManager::registerExecutor( - PlanExecutor* exec) { - auto partitionId = registeredPlanExecutorId.fetchAndAdd(1); - exec->setRegistrationToken(partitionId); - _registeredPlanExecutors.insert(exec); - return partitionId; -} - -void CursorManager::deregisterExecutor(PlanExecutor* exec) { - if (auto partitionId = exec->getRegistrationToken()) { - _registeredPlanExecutors.erase(exec); - } -} - -StatusWith CursorManager::pinCursor(OperationContext* opCtx, CursorId id) { - auto lockedPartition = _cursorMap->lockOnePartition(id); - auto it = lockedPartition->find(id); - if (it == lockedPartition->end()) { - return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"}; - } - - ClientCursor* cursor = it->second; - uassert(12051, str::stream() << "cursor id " << id << " is already in use", !cursor->_isPinned); - if (cursor->getExecutor()->isMarkedAsKilled()) { - // This cursor was killed while it was idle. - Status error{ErrorCodes::QueryPlanKilled, - str::stream() << "cursor killed because: " - << cursor->getExecutor()->getKillReason()}; - lockedPartition->erase(cursor->cursorid()); - cursor->dispose(opCtx); - delete cursor; - return error; - } - cursor->_isPinned = true; - return ClientCursorPin(opCtx, cursor); -} - -void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) { - // Avoid computing the current time within the critical section. - auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); - - auto partitionLock = _cursorMap->lockOnePartition(cursor->cursorid()); - invariant(cursor->_isPinned); - cursor->_isPinned = false; - cursor->_lastUseDate = now; -} - -void CursorManager::getCursorIds(std::set* openCursors) const { - auto allPartitions = _cursorMap->lockAllPartitions(); - for (auto&& partition : allPartitions) { - for (auto&& entry : partition) { - openCursors->insert(entry.first); - } - } -} - -size_t CursorManager::numCursors() const { - return _cursorMap->size(); -} - -CursorId CursorManager::allocateCursorId_inlock() { - for (int i = 0; i < 10000; i++) { - // The leading two bits of a CursorId are used to determine if the cursor is registered on - // the global cursor manager. - CursorId id; - if (isGlobalManager()) { - // This is the global cursor manager, so generate a random number and make sure the - // first two bits are 01. - uint64_t mask = 0x3FFFFFFFFFFFFFFF; - uint64_t bitToSet = 1ULL << 62; - id = ((_random->nextInt64() & mask) | bitToSet); - } else { - // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32 - // bits are random. - uint32_t myPart = static_cast(_random->nextInt32()); - id = cursorIdFromParts(_collectionCacheRuntimeId, myPart); - } - auto partition = _cursorMap->lockOnePartition(id); - if (partition->count(id) == 0) - return id; - } - fassertFailed(17360); -} - -ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, - ClientCursorParams&& cursorParams) { - // Avoid computing the current time within the critical section. - auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); - - // Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping - // it. - invariant(cursorParams.exec); - deregisterExecutor(cursorParams.exec.get()); - cursorParams.exec.get_deleter().dismissDisposal(); - cursorParams.exec->unsetRegistered(); - - // Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure - // we don't insert two cursors with the same cursor id. - stdx::lock_guard lock(_registrationLock); - CursorId cursorId = allocateCursorId_inlock(); - std::unique_ptr clientCursor( - new ClientCursor(std::move(cursorParams), this, cursorId, now)); - - // Transfer ownership of the cursor to '_cursorMap'. - auto partition = _cursorMap->lockOnePartition(cursorId); - ClientCursor* unownedCursor = clientCursor.release(); - partition->emplace(cursorId, unownedCursor); - return ClientCursorPin(opCtx, unownedCursor); -} - -void CursorManager::deregisterCursor(ClientCursor* cc) { - _cursorMap->erase(cc->cursorid()); -} - -Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) { - auto lockedPartition = _cursorMap->lockOnePartition(id); - auto it = lockedPartition->find(id); - if (it == lockedPartition->end()) { - if (shouldAudit) { - audit::logKillCursorsAuthzCheck( - opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound); - } - return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id}; - } - auto cursor = it->second; - - if (cursor->_isPinned) { - if (shouldAudit) { - audit::logKillCursorsAuthzCheck( - opCtx->getClient(), _nss, id, ErrorCodes::OperationFailed); - } - return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id}; - } - std::unique_ptr ownedCursor(cursor); - - if (shouldAudit) { - audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK); - } - - lockedPartition->erase(ownedCursor->cursorid()); - ownedCursor->dispose(opCtx); - return Status::OK(); -} - -} // namespace mongo diff --git a/src/mongo/db/catalog/cursor_manager.h b/src/mongo/db/catalog/cursor_manager.h deleted file mode 100644 index 70fce8f3804..00000000000 --- a/src/mongo/db/catalog/cursor_manager.h +++ /dev/null @@ -1,245 +0,0 @@ -// cursor_manager.h - -/** -* Copyright (C) 2013 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see . -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/db/catalog/util/partitioned.h" -#include "mongo/db/clientcursor.h" -#include "mongo/db/cursor_id.h" -#include "mongo/db/invalidation_type.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/record_id.h" -#include "mongo/platform/unordered_map.h" -#include "mongo/platform/unordered_set.h" -#include "mongo/util/concurrency/mutex.h" -#include "mongo/util/duration.h" - -namespace mongo { - -class OperationContext; -class PseudoRandom; -class PlanExecutor; - -/** - * A container which owns ClientCursor objects. This class is used to create, access, and delete - * ClientCursors. It is also responsible for allocating the cursor ids that are passed back to - * clients. - * - * In addition to managing the lifetime of ClientCursors, the CursorManager is responsible for - * notifying yielded queries of write operations and collection drops. For this reason, query - * PlanExecutor objects which are not contained within a ClientCursor are also registered with the - * CursorManager. Query executors must be registered with the CursorManager, either as a bare - * PlanExecutor or inside a ClientCursor (but cannot be registered in both ways). - * - * There is a CursorManager per-collection and a global CursorManager. The global CursorManager owns - * cursors whose lifetime is not tied to that of the collection and which do not need to receive - * notifications about writes for a particular collection. In contrast, cursors owned by a - * collection's CursorManager, unless pinned, are destroyed when the collection is destroyed. Such - * cursors receive notifications about writes to the collection. - * - * Callers must hold the collection lock in at least MODE_IS in order to access a collection's - * CursorManager, which guards against the CursorManager being concurrently deleted due to a - * catalog-level operation such as a collection drop. No locks are required to access the global - * cursor manager. - * - * The CursorManager is internally synchronized; operations on a given collection may call methods - * concurrently on that collection's CursorManager. - * - * See clientcursor.h for more information. - */ -class CursorManager { -public: - // The number of minutes a cursor is allowed to be idle before timing out. - static constexpr Minutes kDefaultCursorTimeoutMinutes{10}; - using RegistrationToken = Partitioned>::PartitionId; - - CursorManager(NamespaceString nss); - - /** - * Destroys the CursorManager. All cursors and PlanExecutors must be cleaned up via - * invalidateAll() before destruction. - */ - ~CursorManager(); - - /** - * Kills all managed query executors and ClientCursors. Callers must have exclusive access to - * the collection (i.e. must have the collection, databse, or global resource locked in MODE_X). - * - * 'collectionGoingAway' indicates whether the Collection instance is being deleted. This could - * be because the db is being closed, or the collection/db is being dropped. - * - * The 'reason' is the motivation for invalidating all cursors. This will be used for error - * reporting and logging when an operation finds that the cursor it was operating on has been - * killed. - */ - void invalidateAll(OperationContext* opCtx, - bool collectionGoingAway, - const std::string& reason); - - /** - * Broadcast a document invalidation to all relevant PlanExecutor(s). invalidateDocument - * must called *before* the provided RecordId is about to be deleted or mutated. - */ - void invalidateDocument(OperationContext* opCtx, const RecordId& dl, InvalidationType type); - - /** - * Destroys cursors that have been inactive for too long. - * - * Returns the number of cursors that were timed out. - */ - std::size_t timeoutCursors(OperationContext* opCtx, Date_t now); - - /** - * Register an executor so that it can be notified of deletions, invalidations, collection - * drops, or the like during yields. Must be called before an executor yields. Registration - * happens automatically for yielding PlanExecutors, so this should only be called by a - * PlanExecutor itself. Returns a token that must be stored for use during deregistration. - */ - Partitioned>::PartitionId registerExecutor(PlanExecutor* exec); - - /** - * Remove an executor from the registry. It is legal to call this even if 'exec' is not - * registered. - */ - void deregisterExecutor(PlanExecutor* exec); - - /** - * Constructs a new ClientCursor according to the given 'cursorParams'. The cursor is atomically - * registered with the manager and returned in pinned state. - */ - ClientCursorPin registerCursor(OperationContext* opCtx, ClientCursorParams&& cursorParams); - - /** - * Pins and returns the cursor with the given id. - * - * Returns ErrorCodes::CursorNotFound if the cursor does not exist or - * ErrorCodes::QueryPlanKilled if the cursor was killed in between uses. - * - * Throws a UserException if the cursor is already pinned. Callers need not specially handle - * this error, as it should only happen if a misbehaving client attempts to simultaneously issue - * two operations against the same cursor id. - */ - StatusWith pinCursor(OperationContext* opCtx, CursorId id); - - /** - * Returns an OK status if the cursor was successfully erased. - * - * Returns ErrorCodes::CursorNotFound if the cursor id is not owned by this manager. Returns - * ErrorCodes::OperationFailed if attempting to erase a pinned cursor. - * - * If 'shouldAudit' is true, will perform audit logging. - */ - Status eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit); - - void getCursorIds(std::set* openCursors) const; - - /** - * Returns the number of ClientCursors currently registered. Excludes any registered bare - * PlanExecutors. - */ - std::size_t numCursors() const; - - static CursorManager* getGlobalCursorManager(); - - /** - * Returns true if this CursorId would be registered with the global CursorManager. Note that if - * this method returns true it does not imply the cursor exists. - */ - static bool isGloballyManagedCursor(CursorId cursorId) { - // The first two bits are 01 for globally managed cursors, and 00 for cursors owned by a - // collection. The leading bit is always 0 so that CursorIds do not appear as negative. - const long long mask = static_cast(0b11) << 62; - return (cursorId & mask) == (static_cast(0b01) << 62); - } - - static int eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* ids); - - static bool eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id); - - static bool eraseCursorGlobal(OperationContext* opCtx, CursorId id); - - /** - * Deletes inactive cursors from the global cursor manager and from all per-collection cursor - * managers. Returns the number of cursors that were timed out. - */ - static std::size_t timeoutCursorsGlobal(OperationContext* opCtx, Date_t now); - -private: - static constexpr int kNumPartitions = 16; - friend class ClientCursorPin; - - struct PlanExecutorPartitioner { - std::size_t operator()(const PlanExecutor* exec, std::size_t nPartitions); - }; - CursorId allocateCursorId_inlock(); - - ClientCursorPin _registerCursor( - OperationContext* opCtx, std::unique_ptr clientCursor); - - void deregisterCursor(ClientCursor* cc); - - void unpin(OperationContext* opCtx, ClientCursor* cursor); - - bool cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now); - - bool isGlobalManager() const { - return _nss.isEmpty(); - } - - // No locks are needed to consult these data members. - const NamespaceString _nss; - const uint32_t _collectionCacheRuntimeId; - - // A CursorManager holds a pointer to all open PlanExecutors and all open ClientCursors. All - // pointers to PlanExecutors are unowned, and a PlanExecutor will notify the CursorManager when - // it is being destroyed. ClientCursors are owned by the CursorManager, except when they are in - // use by a ClientCursorPin. When in use by a pin, an unowned pointer remains to ensure they - // still receive invalidations while in use. - // - // There are several mutexes at work to protect concurrent access to data structures managed by - // this cursor manager. The two registration data structures '_registeredPlanExecutors' and - // '_cursorMap' are partitioned to decrease contention, and each partition of the structure is - // protected by its own mutex. Separately, there is a '_registrationLock' which protects - // concurrent access to '_random' for cursor id generation, and must be held from cursor id - // generation until insertion into '_cursorMap'. If you ever need to acquire more than one of - // these mutexes at once, you must follow the following rules: - // - '_registrationLock' must be acquired first, if at all. - // - Mutex(es) for '_registeredPlanExecutors' must be acquired next. - // - Mutex(es) for '_cursorMap' must be acquired next. - // - If you need to access multiple partitions within '_registeredPlanExecutors' or '_cursorMap' - // at once, you must acquire the mutexes for those partitions in ascending order, or use the - // partition helpers to acquire mutexes for all partitions. - mutable SimpleMutex _registrationLock; - std::unique_ptr _random; - Partitioned, kNumPartitions, PlanExecutorPartitioner> - _registeredPlanExecutors; - std::unique_ptr, kNumPartitions>> _cursorMap; -}; -} // namespace mongo diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index b181d8d4d6c..b6a686ba717 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -259,6 +259,7 @@ ClientCursor* ClientCursorPin::getCursor() const { return _cursor; } +namespace { // // ClientCursorMonitor // @@ -287,7 +288,6 @@ public: } }; -namespace { // Only one instance of the ClientCursorMonitor exists ClientCursorMonitor clientCursorMonitor; diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index d14f77b249b..e9c46da135a 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -36,11 +36,11 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/curop.h" +#include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/query/cursor_response.h" diff --git a/src/mongo/db/commands/killcursors_cmd.cpp b/src/mongo/db/commands/killcursors_cmd.cpp index 394039ab124..7e2aaa91cbc 100644 --- a/src/mongo/db/commands/killcursors_cmd.cpp +++ b/src/mongo/db/commands/killcursors_cmd.cpp @@ -30,9 +30,9 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/commands/killcursors_common.h" #include "mongo/db/curop.h" +#include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/query/killcursors_request.h" #include "mongo/db/stats/top.h" diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index 13bb793ff8b..bbf9e37e18d 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -37,12 +37,12 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog_entry.h" -#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_catalog_entry.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/commands/list_collections_filter.h" +#include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/exec/working_set.h" diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 0e639d07108..3d2b5577388 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -31,13 +31,13 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog_entry.h" -#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/catalog/database.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" +#include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/exec/working_set.h" diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp new file mode 100644 index 00000000000..f1bdd8a9442 --- /dev/null +++ b/src/mongo/db/cursor_manager.cpp @@ -0,0 +1,573 @@ +/** +* Copyright (C) 2013 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#include "mongo/db/cursor_manager.h" + +#include "mongo/base/data_cursor.h" +#include "mongo/base/init.h" +#include "mongo/db/audit.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/background.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/client.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/plan_executor.h" +#include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/random.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/exit.h" +#include "mongo/util/startup_test.h" + +namespace mongo { +using std::vector; + +constexpr Minutes CursorManager::kDefaultCursorTimeoutMinutes; + +MONGO_EXPORT_SERVER_PARAMETER( + cursorTimeoutMillis, + int, + durationCount(CursorManager::kDefaultCursorTimeoutMinutes)); + +constexpr int CursorManager::kNumPartitions; + +namespace { +uint32_t idFromCursorId(CursorId id) { + uint64_t x = static_cast(id); + x = x >> 32; + return static_cast(x); +} + +CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) { + // The leading two bits of a non-global CursorId should be 0. + invariant((collectionIdentifier & (0b11 << 30)) == 0); + CursorId x = static_cast(collectionIdentifier) << 32; + x |= cursor; + return x; +} + +class GlobalCursorIdCache { +public: + GlobalCursorIdCache(); + ~GlobalCursorIdCache(); + + /** + * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a + * new CursorManager. + */ + uint32_t registerCursorManager(const NamespaceString& nss); + + /** + * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by + * registerCursorManager(). + */ + void deregisterCursorManager(uint32_t id, const NamespaceString& nss); + + /** + * works globally + */ + bool eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth); + + void appendStats(BSONObjBuilder& builder); + + std::size_t timeoutCursors(OperationContext* opCtx, Date_t now); + + int64_t nextSeed(); + +private: + SimpleMutex _mutex; + + typedef unordered_map Map; + Map _idToNss; + unsigned _nextId; + + std::unique_ptr _secureRandom; +}; + +// Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter +// calls into the former during destruction. +std::unique_ptr globalCursorIdCache; +std::unique_ptr globalCursorManager; + +MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) { + globalCursorIdCache.reset(new GlobalCursorIdCache()); + return Status::OK(); +} + +MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache")) +(InitializerContext* context) { + globalCursorManager.reset(new CursorManager({})); + return Status::OK(); +} + +GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {} + +GlobalCursorIdCache::~GlobalCursorIdCache() {} + +int64_t GlobalCursorIdCache::nextSeed() { + stdx::lock_guard lk(_mutex); + if (!_secureRandom) + _secureRandom.reset(SecureRandom::create()); + return _secureRandom->nextInt64(); +} + +uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) { + static const uint32_t kMaxIds = 1000 * 1000 * 1000; + static_assert((kMaxIds & (0b11 << 30)) == 0, + "the first two bits of a collection identifier must always be zeroes"); + + stdx::lock_guard lk(_mutex); + + fassert(17359, _idToNss.size() < kMaxIds); + + for (uint32_t i = 0; i <= kMaxIds; i++) { + uint32_t id = ++_nextId; + if (id == 0) + continue; + if (_idToNss.count(id) > 0) + continue; + _idToNss[id] = nss; + return id; + } + + MONGO_UNREACHABLE; +} + +void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) { + stdx::lock_guard lk(_mutex); + invariant(nss == _idToNss[id]); + _idToNss.erase(id); +} + +bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) { + // Figure out what the namespace of this cursor is. + NamespaceString nss; + if (CursorManager::isGloballyManagedCursor(id)) { + auto pin = globalCursorManager->pinCursor(opCtx, id); + if (!pin.isOK()) { + invariant(pin == ErrorCodes::CursorNotFound); + // No such cursor. TODO: Consider writing to audit log here (even though we don't + // have a namespace). + return false; + } + nss = pin.getValue().getCursor()->nss(); + } else { + stdx::lock_guard lk(_mutex); + uint32_t nsid = idFromCursorId(id); + Map::const_iterator it = _idToNss.find(nsid); + if (it == _idToNss.end()) { + // No namespace corresponding to this cursor id prefix. TODO: Consider writing to + // audit log here (even though we don't have a namespace). + return false; + } + nss = it->second; + } + invariant(nss.isValid()); + + // Check if we are authorized to erase this cursor. + if (checkAuth) { + AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient()); + Status authorizationStatus = as->checkAuthForKillCursors(nss, id); + if (!authorizationStatus.isOK()) { + audit::logKillCursorsAuthzCheck(opCtx->getClient(), nss, id, ErrorCodes::Unauthorized); + return false; + } + } + + // If this cursor is owned by the global cursor manager, ask it to erase the cursor for us. + if (CursorManager::isGloballyManagedCursor(id)) { + Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth); + massert(28697, + eraseStatus.reason(), + eraseStatus.code() == ErrorCodes::OK || + eraseStatus.code() == ErrorCodes::CursorNotFound); + return eraseStatus.isOK(); + } + + // If not, then the cursor must be owned by a collection. Erase the cursor under the + // collection lock (to prevent the collection from going away during the erase). + AutoGetCollectionForReadCommand ctx(opCtx, nss); + Collection* collection = ctx.getCollection(); + if (!collection) { + if (checkAuth) + audit::logKillCursorsAuthzCheck( + opCtx->getClient(), nss, id, ErrorCodes::CursorNotFound); + return false; + } + + Status eraseStatus = collection->getCursorManager()->eraseCursor(opCtx, id, checkAuth); + uassert(16089, + eraseStatus.reason(), + eraseStatus.code() == ErrorCodes::OK || + eraseStatus.code() == ErrorCodes::CursorNotFound); + return eraseStatus.isOK(); +} + +std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t now) { + size_t totalTimedOut = 0; + + // Time out the cursors from the global cursor manager. + totalTimedOut += globalCursorManager->timeoutCursors(opCtx, now); + + // Compute the set of collection names that we have to time out cursors for. + vector todo; + { + stdx::lock_guard lk(_mutex); + for (auto&& entry : _idToNss) { + todo.push_back(entry.second); + } + } + + // For each collection, time out its cursors under the collection lock (to prevent the + // collection from going away during the erase). + for (unsigned i = 0; i < todo.size(); i++) { + AutoGetCollectionOrViewForReadCommand ctx(opCtx, NamespaceString(todo[i])); + if (!ctx.getDb()) { + continue; + } + + Collection* collection = ctx.getCollection(); + if (collection == NULL) { + continue; + } + + totalTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, now); + } + + return totalTimedOut; +} +} // namespace + +// --- + +CursorManager* CursorManager::getGlobalCursorManager() { + return globalCursorManager.get(); +} + +std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) { + return globalCursorIdCache->timeoutCursors(opCtx, now); +} + +int CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) { + ConstDataCursor ids(_ids); + int numDeleted = 0; + for (int i = 0; i < n; i++) { + if (eraseCursorGlobalIfAuthorized(opCtx, ids.readAndAdvance>())) + numDeleted++; + if (globalInShutdownDeprecated()) + break; + } + return numDeleted; +} +bool CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id) { + return globalCursorIdCache->eraseCursor(opCtx, id, true); +} +bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) { + return globalCursorIdCache->eraseCursor(opCtx, id, false); +} + + +// -------------------------- + +std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec, + const std::size_t nPartitions) { + auto token = exec->getRegistrationToken(); + invariant(token); + return (*token) % nPartitions; +} + +CursorManager::CursorManager(NamespaceString nss) + : _nss(std::move(nss)), + _collectionCacheRuntimeId(_nss.isEmpty() ? 0 + : globalCursorIdCache->registerCursorManager(_nss)), + _random(stdx::make_unique(globalCursorIdCache->nextSeed())), + _registeredPlanExecutors(), + _cursorMap(stdx::make_unique>>()) {} + +CursorManager::~CursorManager() { + // All cursors and PlanExecutors should have been deleted already. + invariant(_registeredPlanExecutors.empty()); + invariant(_cursorMap->empty()); + + if (!isGlobalManager()) { + globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss); + } +} + +void CursorManager::invalidateAll(OperationContext* opCtx, + bool collectionGoingAway, + const std::string& reason) { + invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors. + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); + fassert(28819, !BackgroundOperation::inProgForNs(_nss)); + auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); + for (auto&& partition : allExecPartitions) { + for (auto&& exec : partition) { + // The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be + // cleaned up later. + exec->markAsKilled(reason); + } + } + allExecPartitions.clear(); + + // Mark all cursors as killed, but keep around those we can in order to provide a useful error + // message to the user when they attempt to use it next time. + auto allCurrentPartitions = _cursorMap->lockAllPartitions(); + for (auto&& partition : allCurrentPartitions) { + for (auto it = partition.begin(); it != partition.end();) { + auto* cursor = it->second; + cursor->markAsKilled(reason); + + // If pinned, there is an active user of this cursor, who is now responsible for + // cleaning it up. Otherwise, we can immediately dispose of it. + if (cursor->_isPinned) { + it = partition.erase(it); + continue; + } + + if (!collectionGoingAway) { + // We keep around unpinned cursors so that future attempts to use the cursor will + // result in a useful error message. + ++it; + } else { + cursor->dispose(opCtx); + delete cursor; + it = partition.erase(it); + } + } + } +} + +void CursorManager::invalidateDocument(OperationContext* opCtx, + const RecordId& dl, + InvalidationType type) { + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations. + if (supportsDocLocking()) { + // If a storage engine supports doc locking, then we do not need to invalidate. + // The transactional boundaries of the operation protect us. + return; + } + + auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions(); + for (auto&& partition : allExecPartitions) { + for (auto&& exec : partition) { + exec->invalidate(opCtx, dl, type); + } + } + + auto allPartitions = _cursorMap->lockAllPartitions(); + for (auto&& partition : allPartitions) { + for (auto&& entry : partition) { + auto exec = entry.second->getExecutor(); + exec->invalidate(opCtx, dl, type); + } + } +} + +bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now) { + if (cursor->isNoTimeout() || cursor->_isPinned) { + return false; + } + return (now - cursor->_lastUseDate) >= Milliseconds(cursorTimeoutMillis.load()); +} + +std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) { + std::vector> toDelete; + + for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) { + auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId); + for (auto it = lockedPartition->begin(); it != lockedPartition->end();) { + auto* cursor = it->second; + if (cursorShouldTimeout_inlock(cursor, now)) { + // Dispose of the cursor and remove it from the partition. + cursor->dispose(opCtx); + toDelete.push_back(std::unique_ptr{cursor}); + it = lockedPartition->erase(it); + } else { + ++it; + } + } + } + + return toDelete.size(); +} + +namespace { +static AtomicUInt32 registeredPlanExecutorId; +} // namespace + +Partitioned>::PartitionId CursorManager::registerExecutor( + PlanExecutor* exec) { + auto partitionId = registeredPlanExecutorId.fetchAndAdd(1); + exec->setRegistrationToken(partitionId); + _registeredPlanExecutors.insert(exec); + return partitionId; +} + +void CursorManager::deregisterExecutor(PlanExecutor* exec) { + if (auto partitionId = exec->getRegistrationToken()) { + _registeredPlanExecutors.erase(exec); + } +} + +StatusWith CursorManager::pinCursor(OperationContext* opCtx, CursorId id) { + auto lockedPartition = _cursorMap->lockOnePartition(id); + auto it = lockedPartition->find(id); + if (it == lockedPartition->end()) { + return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"}; + } + + ClientCursor* cursor = it->second; + uassert(12051, str::stream() << "cursor id " << id << " is already in use", !cursor->_isPinned); + if (cursor->getExecutor()->isMarkedAsKilled()) { + // This cursor was killed while it was idle. + Status error{ErrorCodes::QueryPlanKilled, + str::stream() << "cursor killed because: " + << cursor->getExecutor()->getKillReason()}; + lockedPartition->erase(cursor->cursorid()); + cursor->dispose(opCtx); + delete cursor; + return error; + } + cursor->_isPinned = true; + return ClientCursorPin(opCtx, cursor); +} + +void CursorManager::unpin(OperationContext* opCtx, ClientCursor* cursor) { + // Avoid computing the current time within the critical section. + auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); + + auto partitionLock = _cursorMap->lockOnePartition(cursor->cursorid()); + invariant(cursor->_isPinned); + cursor->_isPinned = false; + cursor->_lastUseDate = now; +} + +void CursorManager::getCursorIds(std::set* openCursors) const { + auto allPartitions = _cursorMap->lockAllPartitions(); + for (auto&& partition : allPartitions) { + for (auto&& entry : partition) { + openCursors->insert(entry.first); + } + } +} + +size_t CursorManager::numCursors() const { + return _cursorMap->size(); +} + +CursorId CursorManager::allocateCursorId_inlock() { + for (int i = 0; i < 10000; i++) { + // The leading two bits of a CursorId are used to determine if the cursor is registered on + // the global cursor manager. + CursorId id; + if (isGlobalManager()) { + // This is the global cursor manager, so generate a random number and make sure the + // first two bits are 01. + uint64_t mask = 0x3FFFFFFFFFFFFFFF; + uint64_t bitToSet = 1ULL << 62; + id = ((_random->nextInt64() & mask) | bitToSet); + } else { + // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32 + // bits are random. + uint32_t myPart = static_cast(_random->nextInt32()); + id = cursorIdFromParts(_collectionCacheRuntimeId, myPart); + } + auto partition = _cursorMap->lockOnePartition(id); + if (partition->count(id) == 0) + return id; + } + fassertFailed(17360); +} + +ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx, + ClientCursorParams&& cursorParams) { + // Avoid computing the current time within the critical section. + auto now = opCtx->getServiceContext()->getPreciseClockSource()->now(); + + // Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping + // it. + invariant(cursorParams.exec); + deregisterExecutor(cursorParams.exec.get()); + cursorParams.exec.get_deleter().dismissDisposal(); + cursorParams.exec->unsetRegistered(); + + // Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure + // we don't insert two cursors with the same cursor id. + stdx::lock_guard lock(_registrationLock); + CursorId cursorId = allocateCursorId_inlock(); + std::unique_ptr clientCursor( + new ClientCursor(std::move(cursorParams), this, cursorId, now)); + + // Transfer ownership of the cursor to '_cursorMap'. + auto partition = _cursorMap->lockOnePartition(cursorId); + ClientCursor* unownedCursor = clientCursor.release(); + partition->emplace(cursorId, unownedCursor); + return ClientCursorPin(opCtx, unownedCursor); +} + +void CursorManager::deregisterCursor(ClientCursor* cc) { + _cursorMap->erase(cc->cursorid()); +} + +Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) { + auto lockedPartition = _cursorMap->lockOnePartition(id); + auto it = lockedPartition->find(id); + if (it == lockedPartition->end()) { + if (shouldAudit) { + audit::logKillCursorsAuthzCheck( + opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound); + } + return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id}; + } + auto cursor = it->second; + + if (cursor->_isPinned) { + if (shouldAudit) { + audit::logKillCursorsAuthzCheck( + opCtx->getClient(), _nss, id, ErrorCodes::OperationFailed); + } + return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id}; + } + std::unique_ptr ownedCursor(cursor); + + if (shouldAudit) { + audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK); + } + + lockedPartition->erase(ownedCursor->cursorid()); + ownedCursor->dispose(opCtx); + return Status::OK(); +} + +} // namespace mongo diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h new file mode 100644 index 00000000000..40408c48830 --- /dev/null +++ b/src/mongo/db/cursor_manager.h @@ -0,0 +1,243 @@ +/** + * Copyright (C) 2013 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/catalog/util/partitioned.h" +#include "mongo/db/clientcursor.h" +#include "mongo/db/cursor_id.h" +#include "mongo/db/invalidation_type.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/record_id.h" +#include "mongo/platform/unordered_map.h" +#include "mongo/platform/unordered_set.h" +#include "mongo/util/concurrency/mutex.h" +#include "mongo/util/duration.h" + +namespace mongo { + +class OperationContext; +class PseudoRandom; +class PlanExecutor; + +/** + * A container which owns ClientCursor objects. This class is used to create, access, and delete + * ClientCursors. It is also responsible for allocating the cursor ids that are passed back to + * clients. + * + * In addition to managing the lifetime of ClientCursors, the CursorManager is responsible for + * notifying yielded queries of write operations and collection drops. For this reason, query + * PlanExecutor objects which are not contained within a ClientCursor are also registered with the + * CursorManager. Query executors must be registered with the CursorManager, either as a bare + * PlanExecutor or inside a ClientCursor (but cannot be registered in both ways). + * + * There is a CursorManager per-collection and a global CursorManager. The global CursorManager owns + * cursors whose lifetime is not tied to that of the collection and which do not need to receive + * notifications about writes for a particular collection. In contrast, cursors owned by a + * collection's CursorManager, unless pinned, are destroyed when the collection is destroyed. Such + * cursors receive notifications about writes to the collection. + * + * Callers must hold the collection lock in at least MODE_IS in order to access a collection's + * CursorManager, which guards against the CursorManager being concurrently deleted due to a + * catalog-level operation such as a collection drop. No locks are required to access the global + * cursor manager. + * + * The CursorManager is internally synchronized; operations on a given collection may call methods + * concurrently on that collection's CursorManager. + * + * See clientcursor.h for more information. + */ +class CursorManager { +public: + // The number of minutes a cursor is allowed to be idle before timing out. + static constexpr Minutes kDefaultCursorTimeoutMinutes{10}; + using RegistrationToken = Partitioned>::PartitionId; + + CursorManager(NamespaceString nss); + + /** + * Destroys the CursorManager. All cursors and PlanExecutors must be cleaned up via + * invalidateAll() before destruction. + */ + ~CursorManager(); + + /** + * Kills all managed query executors and ClientCursors. Callers must have exclusive access to + * the collection (i.e. must have the collection, databse, or global resource locked in MODE_X). + * + * 'collectionGoingAway' indicates whether the Collection instance is being deleted. This could + * be because the db is being closed, or the collection/db is being dropped. + * + * The 'reason' is the motivation for invalidating all cursors. This will be used for error + * reporting and logging when an operation finds that the cursor it was operating on has been + * killed. + */ + void invalidateAll(OperationContext* opCtx, + bool collectionGoingAway, + const std::string& reason); + + /** + * Broadcast a document invalidation to all relevant PlanExecutor(s). invalidateDocument + * must called *before* the provided RecordId is about to be deleted or mutated. + */ + void invalidateDocument(OperationContext* opCtx, const RecordId& dl, InvalidationType type); + + /** + * Destroys cursors that have been inactive for too long. + * + * Returns the number of cursors that were timed out. + */ + std::size_t timeoutCursors(OperationContext* opCtx, Date_t now); + + /** + * Register an executor so that it can be notified of deletions, invalidations, collection + * drops, or the like during yields. Must be called before an executor yields. Registration + * happens automatically for yielding PlanExecutors, so this should only be called by a + * PlanExecutor itself. Returns a token that must be stored for use during deregistration. + */ + Partitioned>::PartitionId registerExecutor(PlanExecutor* exec); + + /** + * Remove an executor from the registry. It is legal to call this even if 'exec' is not + * registered. + */ + void deregisterExecutor(PlanExecutor* exec); + + /** + * Constructs a new ClientCursor according to the given 'cursorParams'. The cursor is atomically + * registered with the manager and returned in pinned state. + */ + ClientCursorPin registerCursor(OperationContext* opCtx, ClientCursorParams&& cursorParams); + + /** + * Pins and returns the cursor with the given id. + * + * Returns ErrorCodes::CursorNotFound if the cursor does not exist or + * ErrorCodes::QueryPlanKilled if the cursor was killed in between uses. + * + * Throws a UserException if the cursor is already pinned. Callers need not specially handle + * this error, as it should only happen if a misbehaving client attempts to simultaneously issue + * two operations against the same cursor id. + */ + StatusWith pinCursor(OperationContext* opCtx, CursorId id); + + /** + * Returns an OK status if the cursor was successfully erased. + * + * Returns ErrorCodes::CursorNotFound if the cursor id is not owned by this manager. Returns + * ErrorCodes::OperationFailed if attempting to erase a pinned cursor. + * + * If 'shouldAudit' is true, will perform audit logging. + */ + Status eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit); + + void getCursorIds(std::set* openCursors) const; + + /** + * Returns the number of ClientCursors currently registered. Excludes any registered bare + * PlanExecutors. + */ + std::size_t numCursors() const; + + static CursorManager* getGlobalCursorManager(); + + /** + * Returns true if this CursorId would be registered with the global CursorManager. Note that if + * this method returns true it does not imply the cursor exists. + */ + static bool isGloballyManagedCursor(CursorId cursorId) { + // The first two bits are 01 for globally managed cursors, and 00 for cursors owned by a + // collection. The leading bit is always 0 so that CursorIds do not appear as negative. + const long long mask = static_cast(0b11) << 62; + return (cursorId & mask) == (static_cast(0b01) << 62); + } + + static int eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* ids); + + static bool eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id); + + static bool eraseCursorGlobal(OperationContext* opCtx, CursorId id); + + /** + * Deletes inactive cursors from the global cursor manager and from all per-collection cursor + * managers. Returns the number of cursors that were timed out. + */ + static std::size_t timeoutCursorsGlobal(OperationContext* opCtx, Date_t now); + +private: + static constexpr int kNumPartitions = 16; + friend class ClientCursorPin; + + struct PlanExecutorPartitioner { + std::size_t operator()(const PlanExecutor* exec, std::size_t nPartitions); + }; + CursorId allocateCursorId_inlock(); + + ClientCursorPin _registerCursor( + OperationContext* opCtx, std::unique_ptr clientCursor); + + void deregisterCursor(ClientCursor* cc); + + void unpin(OperationContext* opCtx, ClientCursor* cursor); + + bool cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now); + + bool isGlobalManager() const { + return _nss.isEmpty(); + } + + // No locks are needed to consult these data members. + const NamespaceString _nss; + const uint32_t _collectionCacheRuntimeId; + + // A CursorManager holds a pointer to all open PlanExecutors and all open ClientCursors. All + // pointers to PlanExecutors are unowned, and a PlanExecutor will notify the CursorManager when + // it is being destroyed. ClientCursors are owned by the CursorManager, except when they are in + // use by a ClientCursorPin. When in use by a pin, an unowned pointer remains to ensure they + // still receive invalidations while in use. + // + // There are several mutexes at work to protect concurrent access to data structures managed by + // this cursor manager. The two registration data structures '_registeredPlanExecutors' and + // '_cursorMap' are partitioned to decrease contention, and each partition of the structure is + // protected by its own mutex. Separately, there is a '_registrationLock' which protects + // concurrent access to '_random' for cursor id generation, and must be held from cursor id + // generation until insertion into '_cursorMap'. If you ever need to acquire more than one of + // these mutexes at once, you must follow the following rules: + // - '_registrationLock' must be acquired first, if at all. + // - Mutex(es) for '_registeredPlanExecutors' must be acquired next. + // - Mutex(es) for '_cursorMap' must be acquired next. + // - If you need to access multiple partitions within '_registeredPlanExecutors' or '_cursorMap' + // at once, you must acquire the mutexes for those partitions in ascending order, or use the + // partition helpers to acquire mutexes for all partitions. + mutable SimpleMutex _registrationLock; + std::unique_ptr _random; + Partitioned, kNumPartitions, PlanExecutorPartitioner> + _registeredPlanExecutors; + std::unique_ptr, kNumPartitions>> _cursorMap; +}; +} // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/SConscript b/src/mongo/db/storage/mmap_v1/SConscript index dfea5c764da..fd2b47b426f 100644 --- a/src/mongo/db/storage/mmap_v1/SConscript +++ b/src/mongo/db/storage/mmap_v1/SConscript @@ -57,20 +57,19 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/catalog/index_catalog', '$BUILD_DIR/mongo/db/catalog/index_create', + '$BUILD_DIR/mongo/db/clientcursor', '$BUILD_DIR/mongo/db/commands', + '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/diag_log', '$BUILD_DIR/mongo/db/index_names', '$BUILD_DIR/mongo/db/index/index_descriptor', '$BUILD_DIR/mongo/db/storage/journal_listener', + '$BUILD_DIR/mongo/db/storage/kv/kv_prefix', '$BUILD_DIR/mongo/db/storage/storage_engine_lock_file', '$BUILD_DIR/mongo/db/storage/storage_engine_metadata', '$BUILD_DIR/mongo/db/index/index_access_methods', - #'$BUILD_DIR/mongo/db/catalog/catalog', # CYCLE (CursorManager & applyUpdateOperators) - ], - LIBDEPS_TAGS=[ - # TODO(ADAM, 2017-04-25): See `CYCLE` tags above - 'illegal_cyclic_or_unresolved_dependencies_whitelisted', + '$BUILD_DIR/mongo/db/ops/write_ops', ], ) diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp index dc26a74b2e0..07ec00df8fc 100644 --- a/src/mongo/dbtests/cursor_manager_test.cpp +++ b/src/mongo/dbtests/cursor_manager_test.cpp @@ -28,9 +28,9 @@ #include "mongo/platform/basic.h" -#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" +#include "mongo/db/cursor_manager.h" #include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" -- cgit v1.2.1