diff options
Diffstat (limited to 'src/mongo/db/catalog/cursor_manager.cpp')
-rw-r--r-- | src/mongo/db/catalog/cursor_manager.cpp | 785 |
1 files changed, 375 insertions, 410 deletions
diff --git a/src/mongo/db/catalog/cursor_manager.cpp b/src/mongo/db/catalog/cursor_manager.cpp index b5bf0f49f73..97426228ca9 100644 --- a/src/mongo/db/catalog/cursor_manager.cpp +++ b/src/mongo/db/catalog/cursor_manager.cpp @@ -48,514 +48,479 @@ namespace mongo { - using std::string; - using std::vector; - - namespace { - unsigned idFromCursorId( CursorId id ) { - uint64_t x = static_cast<uint64_t>(id); - x = x >> 32; - return static_cast<unsigned>( x ); - } - - CursorId cursorIdFromParts( unsigned collection, - unsigned cursor ) { - CursorId x = static_cast<CursorId>( collection ) << 32; - x |= cursor; - return x; - } - - class IdWorkTest : public StartupTest { - public: - void _run( unsigned a, unsigned b) { - CursorId x = cursorIdFromParts( a, b ); - invariant( a == idFromCursorId( x ) ); - CursorId y = cursorIdFromParts( a, b + 1 ); - invariant( x != y ); - } +using std::string; +using std::vector; + +namespace { +unsigned idFromCursorId(CursorId id) { + uint64_t x = static_cast<uint64_t>(id); + x = x >> 32; + return static_cast<unsigned>(x); +} - void run() { - _run( 123, 456 ); - _run( 0xdeadbeef, 0xcafecafe ); - _run( 0, 0 ); - _run( 99999999, 999 ); - _run( 0xFFFFFFFF, 1 ); - _run( 0xFFFFFFFF, 0 ); - _run( 0xFFFFFFFF, 0xFFFFFFFF ); - } - } idWorkTest; - } +CursorId cursorIdFromParts(unsigned collection, unsigned cursor) { + CursorId x = static_cast<CursorId>(collection) << 32; + x |= cursor; + return x; +} - class GlobalCursorIdCache { - public: +class IdWorkTest : public StartupTest { +public: + void _run(unsigned a, unsigned b) { + CursorId x = cursorIdFromParts(a, b); + invariant(a == idFromCursorId(x)); + CursorId y = cursorIdFromParts(a, b + 1); + invariant(x != y); + } + + void run() { + _run(123, 456); + _run(0xdeadbeef, 0xcafecafe); + _run(0, 0); + _run(99999999, 999); + _run(0xFFFFFFFF, 1); + _run(0xFFFFFFFF, 0); + _run(0xFFFFFFFF, 0xFFFFFFFF); + } +} idWorkTest; +} - GlobalCursorIdCache(); - ~GlobalCursorIdCache(); +class GlobalCursorIdCache { +public: + GlobalCursorIdCache(); + ~GlobalCursorIdCache(); - /** - * this gets called when a CursorManager gets created - * @return the id the CursorManager should use when generating - * cursor ids - */ - unsigned created( const std::string& ns ); + /** + * this gets called when a CursorManager gets created + * @return the id the CursorManager should use when generating + * cursor ids + */ + unsigned created(const std::string& ns); - /** - * called by CursorManager when its going away - */ - void destroyed( unsigned id, const std::string& ns ); + /** + * called by CursorManager when its going away + */ + void destroyed(unsigned id, const std::string& ns); - /** - * works globally - */ - bool eraseCursor(OperationContext* txn, CursorId id, bool checkAuth); + /** + * works globally + */ + bool eraseCursor(OperationContext* txn, CursorId id, bool checkAuth); - void appendStats( BSONObjBuilder& builder ); + void appendStats(BSONObjBuilder& builder); - std::size_t timeoutCursors(OperationContext* txn, int millisSinceLastCall); + std::size_t timeoutCursors(OperationContext* txn, int millisSinceLastCall); - int64_t nextSeed(); - private: - SimpleMutex _mutex; + int64_t nextSeed(); - typedef unordered_map<unsigned,string> Map; - Map _idToNS; - unsigned _nextId; +private: + SimpleMutex _mutex; - std::unique_ptr<SecureRandom> _secureRandom; - }; + typedef unordered_map<unsigned, string> Map; + Map _idToNS; + unsigned _nextId; - // Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter - // calls into the former during destruction. - std::unique_ptr<GlobalCursorIdCache> globalCursorIdCache; - std::unique_ptr<CursorManager> globalCursorManager; + std::unique_ptr<SecureRandom> _secureRandom; +}; - MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) { - globalCursorIdCache.reset(new GlobalCursorIdCache()); - return Status::OK(); - } +// Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter +// calls into the former during destruction. +std::unique_ptr<GlobalCursorIdCache> globalCursorIdCache; +std::unique_ptr<CursorManager> globalCursorManager; - MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache")) - (InitializerContext* context) { - globalCursorManager.reset(new CursorManager("")); - return Status::OK(); - } +MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) { + globalCursorIdCache.reset(new GlobalCursorIdCache()); + return Status::OK(); +} - GlobalCursorIdCache::GlobalCursorIdCache() - : _nextId( 0 ), - _secureRandom() { - } +MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache")) +(InitializerContext* context) { + globalCursorManager.reset(new CursorManager("")); + return Status::OK(); +} - GlobalCursorIdCache::~GlobalCursorIdCache() { - } +GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {} - int64_t GlobalCursorIdCache::nextSeed() { - stdx::lock_guard<SimpleMutex> lk( _mutex ); - if ( !_secureRandom ) - _secureRandom.reset(SecureRandom::create()); - return _secureRandom->nextInt64(); - } +GlobalCursorIdCache::~GlobalCursorIdCache() {} - unsigned GlobalCursorIdCache::created( const std::string& ns ) { - static const unsigned MAX_IDS = 1000 * 1000 * 1000; +int64_t GlobalCursorIdCache::nextSeed() { + stdx::lock_guard<SimpleMutex> lk(_mutex); + if (!_secureRandom) + _secureRandom.reset(SecureRandom::create()); + return _secureRandom->nextInt64(); +} - stdx::lock_guard<SimpleMutex> lk( _mutex ); +unsigned GlobalCursorIdCache::created(const std::string& ns) { + static const unsigned MAX_IDS = 1000 * 1000 * 1000; - fassert( 17359, _idToNS.size() < MAX_IDS ); + stdx::lock_guard<SimpleMutex> lk(_mutex); - for ( unsigned i = 0; i <= MAX_IDS; i++ ) { - unsigned id = ++_nextId; - if ( id == 0 ) - continue; - if ( _idToNS.count( id ) > 0 ) - continue; - _idToNS[id] = ns; - return id; - } + fassert(17359, _idToNS.size() < MAX_IDS); - invariant( false ); + for (unsigned i = 0; i <= MAX_IDS; i++) { + unsigned id = ++_nextId; + if (id == 0) + continue; + if (_idToNS.count(id) > 0) + continue; + _idToNS[id] = ns; + return id; } - void GlobalCursorIdCache::destroyed( unsigned id, const std::string& ns ) { - stdx::lock_guard<SimpleMutex> lk( _mutex ); - invariant( ns == _idToNS[id] ); - _idToNS.erase( id ); - } + invariant(false); +} - bool GlobalCursorIdCache::eraseCursor(OperationContext* txn, CursorId id, bool checkAuth) { - // Figure out what the namespace of this cursor is. - std::string ns; - if (globalCursorManager->ownsCursorId(id)) { - ClientCursorPin pin(globalCursorManager.get(), id); - if (!pin.c()) { - // No such cursor. TODO: Consider writing to audit log here (even though we don't - // have a namespace). - return false; - } - ns = pin.c()->ns(); - } - else { - stdx::lock_guard<SimpleMutex> lk(_mutex); - unsigned nsid = idFromCursorId(id); - Map::const_iterator it = _idToNS.find(nsid); - if (it == _idToNS.end()) { - // No namespace corresponding to this cursor id prefix. TODO: Consider writing to - // audit log here (even though we don't have a namespace). - return false; - } - ns = it->second; - } - const NamespaceString nss(ns); - invariant(nss.isValid()); - - // Check if we are authorized to erase this cursor. - if (checkAuth) { - AuthorizationSession* as = AuthorizationSession::get(txn->getClient()); - Status authorizationStatus = as->checkAuthForKillCursors(nss, id); - if (!authorizationStatus.isOK()) { - audit::logKillCursorsAuthzCheck(txn->getClient(), - nss, - id, - ErrorCodes::Unauthorized); - return false; - } - } +void GlobalCursorIdCache::destroyed(unsigned id, const std::string& ns) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + invariant(ns == _idToNS[id]); + _idToNS.erase(id); +} - // If this cursor is owned by the global cursor manager, ask it to erase the cursor for us. - if (globalCursorManager->ownsCursorId(id)) { - return globalCursorManager->eraseCursor(txn, id, checkAuth); +bool GlobalCursorIdCache::eraseCursor(OperationContext* txn, CursorId id, bool checkAuth) { + // Figure out what the namespace of this cursor is. + std::string ns; + if (globalCursorManager->ownsCursorId(id)) { + ClientCursorPin pin(globalCursorManager.get(), id); + if (!pin.c()) { + // No such cursor. TODO: Consider writing to audit log here (even though we don't + // have a namespace). + return false; } - - // If not, then the cursor must be owned by a collection. Erase the cursor under the - // collection lock (to prevent the collection from going away during the erase). - AutoGetCollectionForRead ctx(txn, nss); - Collection* collection = ctx.getCollection(); - if (!collection) { - if (checkAuth) - audit::logKillCursorsAuthzCheck(txn->getClient(), - nss, - id, - ErrorCodes::CursorNotFound); + ns = pin.c()->ns(); + } else { + stdx::lock_guard<SimpleMutex> lk(_mutex); + unsigned nsid = idFromCursorId(id); + Map::const_iterator it = _idToNS.find(nsid); + if (it == _idToNS.end()) { + // No namespace corresponding to this cursor id prefix. TODO: Consider writing to + // audit log here (even though we don't have a namespace). return false; } - return collection->getCursorManager()->eraseCursor(txn, id, checkAuth); + ns = it->second; } + const NamespaceString nss(ns); + invariant(nss.isValid()); - std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* txn, int millisSinceLastCall) { - size_t totalTimedOut = 0; - - // Time out the cursors from the global cursor manager. - totalTimedOut += globalCursorManager->timeoutCursors( millisSinceLastCall ); - - // Compute the set of collection names that we have to time out cursors for. - vector<string> todo; - { - stdx::lock_guard<SimpleMutex> lk( _mutex ); - for ( Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i ) { - if (globalCursorManager->ownsCursorId(cursorIdFromParts(i->first, 0))) { - // Skip the global cursor manager, since we handle it above (and it's not - // associated with a collection). - continue; - } - todo.push_back( i->second ); - } + // Check if we are authorized to erase this cursor. + if (checkAuth) { + AuthorizationSession* as = AuthorizationSession::get(txn->getClient()); + Status authorizationStatus = as->checkAuthForKillCursors(nss, id); + if (!authorizationStatus.isOK()) { + audit::logKillCursorsAuthzCheck(txn->getClient(), nss, id, ErrorCodes::Unauthorized); + return false; } + } + + // If this cursor is owned by the global cursor manager, ask it to erase the cursor for us. + if (globalCursorManager->ownsCursorId(id)) { + return globalCursorManager->eraseCursor(txn, id, checkAuth); + } - // For each collection, time out its cursors under the collection lock (to prevent the - // collection from going away during the erase). - for ( unsigned i = 0; i < todo.size(); i++ ) { - const std::string& ns = todo[i]; + // If not, then the cursor must be owned by a collection. Erase the cursor under the + // collection lock (to prevent the collection from going away during the erase). + AutoGetCollectionForRead ctx(txn, nss); + Collection* collection = ctx.getCollection(); + if (!collection) { + if (checkAuth) + audit::logKillCursorsAuthzCheck(txn->getClient(), nss, id, ErrorCodes::CursorNotFound); + return false; + } + return collection->getCursorManager()->eraseCursor(txn, id, checkAuth); +} - AutoGetCollectionForRead ctx(txn, ns); - if (!ctx.getDb()) { - continue; - } +std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* txn, int millisSinceLastCall) { + size_t totalTimedOut = 0; + + // Time out the cursors from the global cursor manager. + totalTimedOut += globalCursorManager->timeoutCursors(millisSinceLastCall); - Collection* collection = ctx.getCollection(); - if ( collection == NULL ) { + // Compute the set of collection names that we have to time out cursors for. + vector<string> todo; + { + stdx::lock_guard<SimpleMutex> lk(_mutex); + for (Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i) { + if (globalCursorManager->ownsCursorId(cursorIdFromParts(i->first, 0))) { + // Skip the global cursor manager, since we handle it above (and it's not + // associated with a collection). continue; } + todo.push_back(i->second); + } + } + + // For each collection, time out its cursors under the collection lock (to prevent the + // collection from going away during the erase). + for (unsigned i = 0; i < todo.size(); i++) { + const std::string& ns = todo[i]; + + AutoGetCollectionForRead ctx(txn, ns); + if (!ctx.getDb()) { + continue; + } - totalTimedOut += collection->getCursorManager()->timeoutCursors( millisSinceLastCall ); + Collection* collection = ctx.getCollection(); + if (collection == NULL) { + continue; } - return totalTimedOut; + totalTimedOut += collection->getCursorManager()->timeoutCursors(millisSinceLastCall); } - // --- + return totalTimedOut; +} - CursorManager* CursorManager::getGlobalCursorManager() { - return globalCursorManager.get(); - } +// --- - std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* txn, - int millisSinceLastCall) { - return globalCursorIdCache->timeoutCursors(txn, millisSinceLastCall); - } +CursorManager* CursorManager::getGlobalCursorManager() { + return globalCursorManager.get(); +} - int CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* txn, int n, - const char* _ids) { - ConstDataCursor ids(_ids); - int numDeleted = 0; - for ( int i = 0; i < n; i++ ) { - if ( eraseCursorGlobalIfAuthorized(txn, ids.readAndAdvance<LittleEndian<int64_t>>())) - numDeleted++; - if ( inShutdown() ) - break; - } - return numDeleted; - } - bool CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* txn, CursorId id) { - return globalCursorIdCache->eraseCursor(txn, id, true); - } - bool CursorManager::eraseCursorGlobal(OperationContext* txn, CursorId id) { - return globalCursorIdCache->eraseCursor(txn, id, false ); +std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* txn, int millisSinceLastCall) { + return globalCursorIdCache->timeoutCursors(txn, millisSinceLastCall); +} + +int CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* txn, int n, const char* _ids) { + ConstDataCursor ids(_ids); + int numDeleted = 0; + for (int i = 0; i < n; i++) { + if (eraseCursorGlobalIfAuthorized(txn, ids.readAndAdvance<LittleEndian<int64_t>>())) + numDeleted++; + if (inShutdown()) + break; } + return numDeleted; +} +bool CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* txn, CursorId id) { + return globalCursorIdCache->eraseCursor(txn, id, true); +} +bool CursorManager::eraseCursorGlobal(OperationContext* txn, CursorId id) { + return globalCursorIdCache->eraseCursor(txn, id, false); +} - // -------------------------- +// -------------------------- - CursorManager::CursorManager( StringData ns ) - : _nss( ns ) { - _collectionCacheRuntimeId = globalCursorIdCache->created( _nss.ns() ); - _random.reset( new PseudoRandom( globalCursorIdCache->nextSeed() ) ); - } +CursorManager::CursorManager(StringData ns) : _nss(ns) { + _collectionCacheRuntimeId = globalCursorIdCache->created(_nss.ns()); + _random.reset(new PseudoRandom(globalCursorIdCache->nextSeed())); +} - CursorManager::~CursorManager() { - invalidateAll(true, "collection going away"); - globalCursorIdCache->destroyed( _collectionCacheRuntimeId, _nss.ns() ); - } +CursorManager::~CursorManager() { + invalidateAll(true, "collection going away"); + globalCursorIdCache->destroyed(_collectionCacheRuntimeId, _nss.ns()); +} - void CursorManager::invalidateAll(bool collectionGoingAway, - const std::string& reason) { - stdx::lock_guard<SimpleMutex> lk( _mutex ); +void CursorManager::invalidateAll(bool collectionGoingAway, const std::string& reason) { + stdx::lock_guard<SimpleMutex> lk(_mutex); - for ( ExecSet::iterator it = _nonCachedExecutors.begin(); - it != _nonCachedExecutors.end(); - ++it ) { + for (ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end(); + ++it) { + // we kill the executor, but it deletes itself + PlanExecutor* exec = *it; + exec->kill(reason); + invariant(exec->collection() == NULL); + } + _nonCachedExecutors.clear(); - // we kill the executor, but it deletes itself - PlanExecutor* exec = *it; - exec->kill(reason); - invariant( exec->collection() == NULL ); - } - _nonCachedExecutors.clear(); + if (collectionGoingAway) { + // we're going to wipe out the world + for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + ClientCursor* cc = i->second; - if ( collectionGoingAway ) { - // we're going to wipe out the world - for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { - ClientCursor* cc = i->second; + cc->kill(); - cc->kill(); + invariant(cc->getExecutor() == NULL || cc->getExecutor()->collection() == NULL); - invariant( cc->getExecutor() == NULL || cc->getExecutor()->collection() == NULL ); - - // If the CC is pinned, somebody is actively using it and we do not delete it. - // Instead we notify the holder that we killed it. The holder will then delete the - // CC. - // - // If the CC is not pinned, there is nobody actively holding it. We can safely - // delete it. - if (!cc->isPinned()) { - delete cc; - } + // If the CC is pinned, somebody is actively using it and we do not delete it. + // Instead we notify the holder that we killed it. The holder will then delete the + // CC. + // + // If the CC is not pinned, there is nobody actively holding it. We can safely + // delete it. + if (!cc->isPinned()) { + delete cc; } } - else { - CursorMap newMap; - - // collection will still be around, just all PlanExecutors are invalid - for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { - ClientCursor* cc = i->second; - - // Note that a valid ClientCursor state is "no cursor no executor." This is because - // the set of active cursor IDs in ClientCursor is used as representation of query - // state. See sharding_block.h. TODO(greg,hk): Move this out. - if (NULL == cc->getExecutor() ) { - newMap.insert( *i ); - continue; - } - - if (cc->isPinned() || cc->isAggCursor()) { - // Pinned cursors need to stay alive, so we leave them around. Aggregation - // cursors also can stay alive (since they don't have their lifetime bound to - // the underlying collection). However, if they have an associated executor, we - // need to kill it, because it's now invalid. - if ( cc->getExecutor() ) - cc->getExecutor()->kill(reason); - newMap.insert( *i ); - } - else { - cc->kill(); - delete cc; - } + } else { + CursorMap newMap; + + // collection will still be around, just all PlanExecutors are invalid + for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + ClientCursor* cc = i->second; + // Note that a valid ClientCursor state is "no cursor no executor." This is because + // the set of active cursor IDs in ClientCursor is used as representation of query + // state. See sharding_block.h. TODO(greg,hk): Move this out. + if (NULL == cc->getExecutor()) { + newMap.insert(*i); + continue; } - _cursors = newMap; + if (cc->isPinned() || cc->isAggCursor()) { + // Pinned cursors need to stay alive, so we leave them around. Aggregation + // cursors also can stay alive (since they don't have their lifetime bound to + // the underlying collection). However, if they have an associated executor, we + // need to kill it, because it's now invalid. + if (cc->getExecutor()) + cc->getExecutor()->kill(reason); + newMap.insert(*i); + } else { + cc->kill(); + delete cc; + } } + + _cursors = newMap; } +} - void CursorManager::invalidateDocument( OperationContext* txn, - const RecordId& dl, - InvalidationType type ) { - if ( supportsDocLocking() ) { - // If a storage engine supports doc locking, then we do not need to invalidate. - // The transactional boundaries of the operation protect us. - return; - } +void CursorManager::invalidateDocument(OperationContext* txn, + const RecordId& dl, + InvalidationType type) { + if (supportsDocLocking()) { + // If a storage engine supports doc locking, then we do not need to invalidate. + // The transactional boundaries of the operation protect us. + return; + } - stdx::lock_guard<SimpleMutex> lk( _mutex ); + stdx::lock_guard<SimpleMutex> lk(_mutex); - for ( ExecSet::iterator it = _nonCachedExecutors.begin(); - it != _nonCachedExecutors.end(); - ++it ) { + for (ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end(); + ++it) { + PlanExecutor* exec = *it; + exec->invalidate(txn, dl, type); + } - PlanExecutor* exec = *it; + for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + PlanExecutor* exec = i->second->getExecutor(); + if (exec) { exec->invalidate(txn, dl, type); } - - for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { - PlanExecutor* exec = i->second->getExecutor(); - if ( exec ) { - exec->invalidate(txn, dl, type); - } - } } +} - std::size_t CursorManager::timeoutCursors( int millisSinceLastCall ) { - stdx::lock_guard<SimpleMutex> lk( _mutex ); - - vector<ClientCursor*> toDelete; - - for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { - ClientCursor* cc = i->second; - if ( cc->shouldTimeout( millisSinceLastCall ) ) - toDelete.push_back( cc ); - } +std::size_t CursorManager::timeoutCursors(int millisSinceLastCall) { + stdx::lock_guard<SimpleMutex> lk(_mutex); - for ( vector<ClientCursor*>::const_iterator i = toDelete.begin(); - i != toDelete.end(); ++i ) { - ClientCursor* cc = *i; - _deregisterCursor_inlock( cc ); - cc->kill(); - delete cc; - } + vector<ClientCursor*> toDelete; - return toDelete.size(); + for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + ClientCursor* cc = i->second; + if (cc->shouldTimeout(millisSinceLastCall)) + toDelete.push_back(cc); } - void CursorManager::registerExecutor( PlanExecutor* exec ) { - stdx::lock_guard<SimpleMutex> lk(_mutex); - const std::pair<ExecSet::iterator, bool> result = _nonCachedExecutors.insert(exec); - invariant(result.second); // make sure this was inserted + for (vector<ClientCursor*>::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i) { + ClientCursor* cc = *i; + _deregisterCursor_inlock(cc); + cc->kill(); + delete cc; } - void CursorManager::deregisterExecutor( PlanExecutor* exec ) { - stdx::lock_guard<SimpleMutex> lk(_mutex); - _nonCachedExecutors.erase(exec); - } + return toDelete.size(); +} - ClientCursor* CursorManager::find( CursorId id, bool pin ) { - stdx::lock_guard<SimpleMutex> lk( _mutex ); - CursorMap::const_iterator it = _cursors.find( id ); - if ( it == _cursors.end() ) - return NULL; - - ClientCursor* cursor = it->second; - if ( pin ) { - uassert( 12051, - "clientcursor already in use? driver problem?", - !cursor->isPinned() ); - cursor->setPinned(); - } +void CursorManager::registerExecutor(PlanExecutor* exec) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + const std::pair<ExecSet::iterator, bool> result = _nonCachedExecutors.insert(exec); + invariant(result.second); // make sure this was inserted +} - return cursor; - } +void CursorManager::deregisterExecutor(PlanExecutor* exec) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + _nonCachedExecutors.erase(exec); +} - void CursorManager::unpin( ClientCursor* cursor ) { - stdx::lock_guard<SimpleMutex> lk( _mutex ); +ClientCursor* CursorManager::find(CursorId id, bool pin) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + CursorMap::const_iterator it = _cursors.find(id); + if (it == _cursors.end()) + return NULL; - invariant( cursor->isPinned() ); - cursor->unsetPinned(); + ClientCursor* cursor = it->second; + if (pin) { + uassert(12051, "clientcursor already in use? driver problem?", !cursor->isPinned()); + cursor->setPinned(); } - bool CursorManager::ownsCursorId( CursorId cursorId ) const { - return _collectionCacheRuntimeId == idFromCursorId( cursorId ); - } + return cursor; +} - void CursorManager::getCursorIds( std::set<CursorId>* openCursors ) const { - stdx::lock_guard<SimpleMutex> lk( _mutex ); +void CursorManager::unpin(ClientCursor* cursor) { + stdx::lock_guard<SimpleMutex> lk(_mutex); - for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { - ClientCursor* cc = i->second; - openCursors->insert( cc->cursorid() ); - } - } + invariant(cursor->isPinned()); + cursor->unsetPinned(); +} - size_t CursorManager::numCursors() const { - stdx::lock_guard<SimpleMutex> lk( _mutex ); - return _cursors.size(); - } +bool CursorManager::ownsCursorId(CursorId cursorId) const { + return _collectionCacheRuntimeId == idFromCursorId(cursorId); +} - CursorId CursorManager::_allocateCursorId_inlock() { - for ( int i = 0; i < 10000; i++ ) { - unsigned mypart = static_cast<unsigned>( _random->nextInt32() ); - CursorId id = cursorIdFromParts( _collectionCacheRuntimeId, mypart ); - if ( _cursors.count( id ) == 0 ) - return id; - } - fassertFailed( 17360 ); - } +void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const { + stdx::lock_guard<SimpleMutex> lk(_mutex); - CursorId CursorManager::registerCursor( ClientCursor* cc ) { - invariant( cc ); - stdx::lock_guard<SimpleMutex> lk( _mutex ); - CursorId id = _allocateCursorId_inlock(); - _cursors[id] = cc; - return id; + for (CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + ClientCursor* cc = i->second; + openCursors->insert(cc->cursorid()); } +} + +size_t CursorManager::numCursors() const { + stdx::lock_guard<SimpleMutex> lk(_mutex); + return _cursors.size(); +} - void CursorManager::deregisterCursor( ClientCursor* cc ) { - stdx::lock_guard<SimpleMutex> lk( _mutex ); - _deregisterCursor_inlock( cc ); +CursorId CursorManager::_allocateCursorId_inlock() { + for (int i = 0; i < 10000; i++) { + unsigned mypart = static_cast<unsigned>(_random->nextInt32()); + CursorId id = cursorIdFromParts(_collectionCacheRuntimeId, mypart); + if (_cursors.count(id) == 0) + return id; } + fassertFailed(17360); +} - bool CursorManager::eraseCursor(OperationContext* txn, CursorId id, bool checkAuth) { - stdx::lock_guard<SimpleMutex> lk( _mutex ); +CursorId CursorManager::registerCursor(ClientCursor* cc) { + invariant(cc); + stdx::lock_guard<SimpleMutex> lk(_mutex); + CursorId id = _allocateCursorId_inlock(); + _cursors[id] = cc; + return id; +} - CursorMap::iterator it = _cursors.find( id ); - if ( it == _cursors.end() ) { - if ( checkAuth ) - audit::logKillCursorsAuthzCheck( txn->getClient(), - _nss, - id, - ErrorCodes::CursorNotFound ); - return false; - } +void CursorManager::deregisterCursor(ClientCursor* cc) { + stdx::lock_guard<SimpleMutex> lk(_mutex); + _deregisterCursor_inlock(cc); +} - ClientCursor* cursor = it->second; +bool CursorManager::eraseCursor(OperationContext* txn, CursorId id, bool checkAuth) { + stdx::lock_guard<SimpleMutex> lk(_mutex); - if ( checkAuth ) - audit::logKillCursorsAuthzCheck( txn->getClient(), - _nss, - id, - ErrorCodes::OK ); + CursorMap::iterator it = _cursors.find(id); + if (it == _cursors.end()) { + if (checkAuth) + audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::CursorNotFound); + return false; + } - massert( 16089, - str::stream() << "Cannot kill active cursor " << id, - !cursor->isPinned() ); + ClientCursor* cursor = it->second; - cursor->kill(); - _deregisterCursor_inlock( cursor ); - delete cursor; - return true; - } + if (checkAuth) + audit::logKillCursorsAuthzCheck(txn->getClient(), _nss, id, ErrorCodes::OK); - void CursorManager::_deregisterCursor_inlock( ClientCursor* cc ) { - invariant( cc ); - CursorId id = cc->cursorid(); - _cursors.erase( id ); - } + massert(16089, str::stream() << "Cannot kill active cursor " << id, !cursor->isPinned()); + + cursor->kill(); + _deregisterCursor_inlock(cursor); + delete cursor; + return true; +} +void CursorManager::_deregisterCursor_inlock(ClientCursor* cc) { + invariant(cc); + CursorId id = cc->cursorid(); + _cursors.erase(id); +} } |