// collection_cursor_cache.cpp

/**
*    Copyright (C) 2013 MongoDB Inc.
*
*    This program is free software: you can redistribute it and/or modify
*    it under the terms of the GNU Affero General Public License, version 3,
*    as published by the Free Software Foundation.
*
*    This program is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
*    GNU Affero General Public License for more details.
*
*    You should have received a copy of the GNU Affero General Public License
*    along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*    As a special exception, the copyright holders give permission to link the
*    code of portions of this program with the OpenSSL library under certain
*    conditions as described in each individual source file and distribute
*    linked combinations including the program with the OpenSSL library. You
*    must comply with the GNU Affero General Public License in all respects for
*    all of the code used other than as permitted herein. If you modify file(s)
*    with this exception, you may extend this exception to your version of the
*    file(s), but you are not obligated to do so. If you do not wish to do so,
*    delete this exception statement from your version. If you delete this
*    exception statement from all source files in the program, then also delete
*    it in the license file.
*/ #include "mongo/db/catalog/collection_cursor_cache.h" #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/query/plan_executor.h" #include "mongo/platform/random.h" #include "mongo/util/startup_test.h" namespace mongo { namespace { unsigned idFromCursorId( CursorId id ) { uint64_t x = static_cast(id); x = x >> 32; return static_cast( x ); } CursorId cursorIdFromParts( unsigned collection, unsigned cursor ) { CursorId x = static_cast( collection ) << 32; x |= cursor; return x; } class IdWorkTest : public StartupTest { public: void _run( unsigned a, unsigned b) { CursorId x = cursorIdFromParts( a, b ); invariant( a == idFromCursorId( x ) ); CursorId y = cursorIdFromParts( a, b + 1 ); invariant( x != y ); } void run() { _run( 123, 456 ); _run( 0xdeadbeef, 0xcafecafe ); _run( 0, 0 ); _run( 99999999, 999 ); _run( 0xFFFFFFFF, 1 ); _run( 0xFFFFFFFF, 0 ); _run( 0xFFFFFFFF, 0xFFFFFFFF ); } } idWorkTest; } class GlobalCursorIdCache { public: GlobalCursorIdCache(); ~GlobalCursorIdCache(); /** * this gets called when a CollectionCursorCache gets created * @return the id the CollectionCursorCache should use when generating * cursor ids */ unsigned created( const std::string& ns ); /** * called by CollectionCursorCache when its going away */ void destroyed( unsigned id, const std::string& ns ); /** * works globally */ bool eraseCursor(OperationContext* txn, CursorId id, bool checkAuth); void appendStats( BSONObjBuilder& builder ); std::size_t timeoutCursors(OperationContext* txn, int millisSinceLastCall); int64_t nextSeed(); private: SimpleMutex _mutex; typedef unordered_map Map; Map _idToNS; unsigned _nextId; SecureRandom* _secureRandom; } _globalCursorIdCache; GlobalCursorIdCache::GlobalCursorIdCache() : _mutex( "GlobalCursorIdCache" 
), _nextId( 0 ), _secureRandom( NULL ) { } GlobalCursorIdCache::~GlobalCursorIdCache() { // we're just going to leak everything, as it doesn't matter } int64_t GlobalCursorIdCache::nextSeed() { SimpleMutex::scoped_lock lk( _mutex ); if ( !_secureRandom ) _secureRandom = SecureRandom::create(); return _secureRandom->nextInt64(); } unsigned GlobalCursorIdCache::created( const std::string& ns ) { static const unsigned MAX_IDS = 1000 * 1000 * 1000; SimpleMutex::scoped_lock lk( _mutex ); fassert( 17359, _idToNS.size() < MAX_IDS ); for ( unsigned i = 0; i <= MAX_IDS; i++ ) { unsigned id = ++_nextId; if ( id == 0 ) continue; if ( _idToNS.count( id ) > 0 ) continue; _idToNS[id] = ns; return id; } invariant( false ); } void GlobalCursorIdCache::destroyed( unsigned id, const std::string& ns ) { SimpleMutex::scoped_lock lk( _mutex ); invariant( ns == _idToNS[id] ); _idToNS.erase( id ); } bool GlobalCursorIdCache::eraseCursor(OperationContext* txn, CursorId id, bool checkAuth) { string ns; { SimpleMutex::scoped_lock lk( _mutex ); unsigned nsid = idFromCursorId( id ); Map::const_iterator it = _idToNS.find( nsid ); if ( it == _idToNS.end() ) { return false; } ns = it->second; } NamespaceString nss( ns ); if ( checkAuth ) { AuthorizationSession* as = cc().getAuthorizationSession(); bool isAuthorized = as->isAuthorizedForActionsOnNamespace( nss, ActionType::killCursors); if ( !isAuthorized ) { audit::logKillCursorsAuthzCheck( currentClient.get(), nss, id, ErrorCodes::Unauthorized ); return false; } } Lock::DBRead lock(txn->lockState(), ns); Database* db = dbHolder().get(txn, ns); if ( !db ) return false; Client::Context context(txn, ns, db ); Collection* collection = db->getCollection( txn, ns ); if ( !collection ) { if ( checkAuth ) audit::logKillCursorsAuthzCheck( currentClient.get(), nss, id, ErrorCodes::CursorNotFound ); return false; } return collection->cursorCache()->eraseCursor( id, checkAuth ); } std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* txn, int 
millisSinceLastCall) { vector todo; { SimpleMutex::scoped_lock lk( _mutex ); for ( Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i ) todo.push_back( i->second ); } size_t totalTimedOut = 0; for ( unsigned i = 0; i < todo.size(); i++ ) { const string& ns = todo[i]; Lock::DBRead lock(txn->lockState(), ns); Database* db = dbHolder().get(txn, ns); if ( !db ) continue; Client::Context context(txn, ns, db ); Collection* collection = db->getCollection( txn, ns ); if ( collection == NULL ) { continue; } totalTimedOut += collection->cursorCache()->timeoutCursors( millisSinceLastCall ); } return totalTimedOut; } // --- std::size_t CollectionCursorCache::timeoutCursorsGlobal(OperationContext* txn, int millisSinceLastCall) {; return _globalCursorIdCache.timeoutCursors(txn, millisSinceLastCall); } int CollectionCursorCache::eraseCursorGlobalIfAuthorized(OperationContext* txn, int n, long long* ids) { int numDeleted = 0; for ( int i = 0; i < n; i++ ) { if ( eraseCursorGlobalIfAuthorized(txn, ids[i] ) ) numDeleted++; if ( inShutdown() ) break; } return numDeleted; } bool CollectionCursorCache::eraseCursorGlobalIfAuthorized(OperationContext* txn, CursorId id) { return _globalCursorIdCache.eraseCursor(txn, id, true); } bool CollectionCursorCache::eraseCursorGlobal(OperationContext* txn, CursorId id) { return _globalCursorIdCache.eraseCursor(txn, id, false ); } // -------------------------- CollectionCursorCache::CollectionCursorCache( const StringData& ns ) : _nss( ns ), _mutex( "CollectionCursorCache" ) { _collectionCacheRuntimeId = _globalCursorIdCache.created( _nss.ns() ); _random.reset( new PseudoRandom( _globalCursorIdCache.nextSeed() ) ); } CollectionCursorCache::~CollectionCursorCache() { invalidateAll( true ); _globalCursorIdCache.destroyed( _collectionCacheRuntimeId, _nss.ns() ); } void CollectionCursorCache::invalidateAll( bool collectionGoingAway ) { SimpleMutex::scoped_lock lk( _mutex ); for ( ExecSet::iterator it = _nonCachedExecutors.begin(); it != 
_nonCachedExecutors.end(); ++it ) { // we kill the executor, but it deletes itself PlanExecutor* exec = *it; exec->kill(); invariant( exec->collection() == NULL ); } _nonCachedExecutors.clear(); if ( collectionGoingAway ) { // we're going to wipe out the world for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { ClientCursor* cc = i->second; cc->kill(); invariant( cc->getExecutor() == NULL || cc->getExecutor()->collection() == NULL ); // If there is a pinValue >= 100, somebody is actively using the CC and we do // not delete it. Instead we notify the holder that we killed it. The holder // will then delete the CC. // pinvalue is <100, so there is nobody actively holding the CC. We can // safely delete it as nobody is holding the CC. if (cc->pinValue() < 100) { delete cc; } } } else { CursorMap newMap; // collection will still be around, just all PlanExecutors are invalid for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { ClientCursor* cc = i->second; if (cc->isAggCursor) { // Aggregation cursors don't have their lifetime bound to the underlying collection. newMap[i->first] = i->second; continue; } // Note that a valid ClientCursor state is "no cursor no executor." This is because // the set of active cursor IDs in ClientCursor is used as representation of query // state. See sharding_block.h. TODO(greg,hk): Move this out. 
if (NULL == cc->getExecutor() ) { newMap.insert( *i ); continue; } if (cc->pinValue() < 100) { cc->kill(); delete cc; } else { // this is pinned, so still alive, so we leave around // we kill the PlanExecutor to signal if ( cc->getExecutor() ) cc->getExecutor()->kill(); newMap.insert( *i ); } } _cursors = newMap; } } void CollectionCursorCache::invalidateDocument( const DiskLoc& dl, InvalidationType type ) { SimpleMutex::scoped_lock lk( _mutex ); for ( ExecSet::iterator it = _nonCachedExecutors.begin(); it != _nonCachedExecutors.end(); ++it ) { PlanExecutor* exec = *it; exec->invalidate(dl, type); } for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { PlanExecutor* exec = i->second->getExecutor(); if ( exec ) { exec->invalidate(dl, type); } } } std::size_t CollectionCursorCache::timeoutCursors( int millisSinceLastCall ) { SimpleMutex::scoped_lock lk( _mutex ); vector toDelete; for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { ClientCursor* cc = i->second; if ( cc->shouldTimeout( millisSinceLastCall ) ) toDelete.push_back( cc ); } for ( vector::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i ) { ClientCursor* cc = *i; _deregisterCursor_inlock( cc ); cc->kill(); delete cc; } return toDelete.size(); } void CollectionCursorCache::registerExecutor( PlanExecutor* exec ) { if (!useExperimentalDocLocking) { SimpleMutex::scoped_lock lk(_mutex); const std::pair result = _nonCachedExecutors.insert(exec); invariant(result.second); // make sure this was inserted } } void CollectionCursorCache::deregisterExecutor( PlanExecutor* exec ) { if (!useExperimentalDocLocking) { SimpleMutex::scoped_lock lk(_mutex); _nonCachedExecutors.erase(exec); } } ClientCursor* CollectionCursorCache::find( CursorId id, bool pin ) { SimpleMutex::scoped_lock lk( _mutex ); CursorMap::const_iterator it = _cursors.find( id ); if ( it == _cursors.end() ) return NULL; ClientCursor* cursor = it->second; if ( pin ) { uassert( 12051, 
"clientcursor already in use? driver problem?", cursor->_pinValue < 100 ); cursor->_pinValue += 100; } return cursor; } void CollectionCursorCache::unpin( ClientCursor* cursor ) { SimpleMutex::scoped_lock lk( _mutex ); invariant( cursor->_pinValue >= 100 ); cursor->_pinValue -= 100; } void CollectionCursorCache::getCursorIds( std::set* openCursors ) { SimpleMutex::scoped_lock lk( _mutex ); for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { ClientCursor* cc = i->second; openCursors->insert( cc->cursorid() ); } } size_t CollectionCursorCache::numCursors(){ SimpleMutex::scoped_lock lk( _mutex ); return _cursors.size(); } CursorId CollectionCursorCache::_allocateCursorId_inlock() { for ( int i = 0; i < 10000; i++ ) { unsigned mypart = static_cast( _random->nextInt32() ); CursorId id = cursorIdFromParts( _collectionCacheRuntimeId, mypart ); if ( _cursors.count( id ) == 0 ) return id; } fassertFailed( 17360 ); } CursorId CollectionCursorCache::registerCursor( ClientCursor* cc ) { invariant( cc ); SimpleMutex::scoped_lock lk( _mutex ); CursorId id = _allocateCursorId_inlock(); _cursors[id] = cc; return id; } void CollectionCursorCache::deregisterCursor( ClientCursor* cc ) { SimpleMutex::scoped_lock lk( _mutex ); _deregisterCursor_inlock( cc ); } bool CollectionCursorCache::eraseCursor( CursorId id, bool checkAuth ) { SimpleMutex::scoped_lock lk( _mutex ); CursorMap::iterator it = _cursors.find( id ); if ( it == _cursors.end() ) { if ( checkAuth ) audit::logKillCursorsAuthzCheck( currentClient.get(), _nss, id, ErrorCodes::CursorNotFound ); return false; } ClientCursor* cursor = it->second; if ( checkAuth ) audit::logKillCursorsAuthzCheck( currentClient.get(), _nss, id, ErrorCodes::OK ); massert( 16089, str::stream() << "Cannot kill active cursor " << id, cursor->pinValue() < 100 ); cursor->kill(); _deregisterCursor_inlock( cursor ); delete cursor; return true; } void CollectionCursorCache::_deregisterCursor_inlock( ClientCursor* cc ) { 
invariant( cc ); CursorId id = cc->cursorid(); _cursors.erase( id ); } }