diff options
author | Eliot Horowitz <eliot@10gen.com> | 2014-01-24 15:47:07 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2014-01-24 15:47:07 -0500 |
commit | 7349ba70a0e68627dc322113c561afe3a9ed37a1 (patch) | |
tree | afe597cf004f191288999d8efad785b42833809d | |
parent | ed58b0dfe564253067b4cab11ab75477b7e48388 (diff) | |
download | mongo-7349ba70a0e68627dc322113c561afe3a9ed37a1.tar.gz |
SERVER-12392: Move cursor/runner cache into Collection lifecycle via CollectionCursorCache
47 files changed, 1143 insertions, 733 deletions
diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js index 259c6b8ce1a..a71fb2db6be 100644 --- a/jstests/aggregation/testshard1.js +++ b/jstests/aggregation/testshard1.js @@ -206,7 +206,7 @@ assert.eq(db.ts1.find().sort({_id:1}).toArray(), outCollection.find().sort({_id:1}).toArray()); // Make sure we error out if $out collection is sharded -assertErrorCode(db.outCollection, [{$out: db.ts1.getName()}], 17017); +assertErrorCode(outCollection, [{$out: db.ts1.getName()}], 17017); db.literal.save({dollar:false}); diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 78e41bbf7c9..2fd5a3d74ef 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -588,6 +588,7 @@ serverOnlyFiles = [ "db/curop.cpp", "db/catalog/index_create.cpp", "db/catalog/collection.cpp", "db/structure/collection_compact.cpp", + "db/catalog/collection_cursor_cache.cpp", "db/catalog/collection_info_cache.cpp", "db/structure/collection_iterator.cpp", "db/catalog/database_holder.cpp", diff --git a/src/mongo/db/cap.cpp b/src/mongo/db/cap.cpp index b0fb51d69b5..8492c02bfd6 100644 --- a/src/mongo/db/cap.cpp +++ b/src/mongo/db/cap.cpp @@ -466,8 +466,8 @@ namespace mongo { } // Clear all references to this namespace. - ClientCursor::invalidate( ns ); Collection* collection = cc().database()->getCollection( ns ); + collection->cursorCache()->invalidateAll( false ); verify( collection->details() == this ); collection->infoCache()->reset(); diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index e135f6b8e95..018c4c71480 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -75,7 +75,8 @@ namespace mongo { : _ns( fullNS ), _recordStore( _ns.ns() ), _infoCache( this ), - _indexCatalog( this, details ) { + _indexCatalog( this, details ), + _cursorCache( fullNS ) { _details = details; _database = database; _recordStore.init( _details, @@ -232,7 +233,7 @@ namespace mongo { } /* check if any cursors point to us. if so, advance them. */ - ClientCursor::invalidateDocument(_ns.ns(), _details, loc, INVALIDATION_DELETION); + _cursorCache.invalidateDocument(loc, INVALIDATION_DELETION); _indexCatalog.unindexRecord( doc, loc, noWarn); @@ -304,8 +305,7 @@ namespace mongo { // unindex old record, don't delete // this way, if inserting new doc fails, we can re-index this one - ClientCursor::invalidateDocument(_ns.ns(), _details, oldLocation, - INVALIDATION_DELETION); + _cursorCache.invalidateDocument(oldLocation, INVALIDATION_DELETION); _indexCatalog.unindexRecord( objOld, oldLocation, true ); if ( debug ) { @@ -350,7 +350,7 @@ namespace mongo { } // Broadcast the mutation so that query results stay correct. - ClientCursor::invalidateDocument(_ns.ns(), _details, oldLocation, INVALIDATION_MUTATION); + _cursorCache.invalidateDocument(oldLocation, INVALIDATION_MUTATION); // update in place int sz = objNew.objsize(); diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index d2ec9ac5cc9..2472eed68b1 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -33,6 +33,7 @@ #include <string> #include "mongo/base/string_data.h" +#include "mongo/db/catalog/collection_cursor_cache.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/diskloc.h" #include "mongo/db/exec/collection_scan_common.h" @@ -124,6 +125,8 @@ namespace mongo { const IndexCatalog* getIndexCatalog() const { return &_indexCatalog; } IndexCatalog* getIndexCatalog() { return &_indexCatalog; } + CollectionCursorCache* cursorCache() const { return &_cursorCache; } + bool requiresIdIndex() const; BSONObj docFor( const DiskLoc& loc ); @@ -227,6 +230,11 @@ namespace mongo { CollectionInfoCache _infoCache; IndexCatalog _indexCatalog; + // this is mutable because read only users of the Collection class + // use it keep state. This seems valid as const correctness of Collection + // should be about the data. + mutable CollectionCursorCache _cursorCache; + friend class Database; friend class FlatIterator; friend class CappedIterator; diff --git a/src/mongo/db/catalog/collection_cursor_cache.cpp b/src/mongo/db/catalog/collection_cursor_cache.cpp new file mode 100644 index 00000000000..4a10e0664dd --- /dev/null +++ b/src/mongo/db/catalog/collection_cursor_cache.cpp @@ -0,0 +1,459 @@ +// collection_cursor_cache.h + +/** +* Copyright (C) 2013 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#include "mongo/db/catalog/collection_cursor_cache.h" + +#include "mongo/db/audit.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/client.h" +#include "mongo/db/query/runner.h" +#include "mongo/platform/random.h" +#include "mongo/util/startup_test.h" + +namespace mongo { + + namespace { + unsigned idFromCursorId( CursorId id ) { + uint64_t x = static_cast<uint64_t>(id); + x = x >> 32; + return static_cast<unsigned>( x ); + } + + CursorId cursorIdFromParts( unsigned collection, + unsigned cursor ) { + CursorId x = static_cast<CursorId>( collection ) << 32; + x |= cursor; + return x; + } + + class IdWorkTest : public StartupTest { + public: + void _run( unsigned a, unsigned b) { + CursorId x = cursorIdFromParts( a, b ); + invariant( a == idFromCursorId( x ) ); + CursorId y = cursorIdFromParts( a, b + 1 ); + invariant( x != y ); + } + + void run() { + _run( 123, 456 ); + _run( 0xdeadbeef, 0xcafecafe ); + _run( 0, 0 ); + _run( 99999999, 999 ); + _run( 0xFFFFFFFF, 1 ); + _run( 0xFFFFFFFF, 0 ); + _run( 0xFFFFFFFF, 0xFFFFFFFF ); + } + } idWorkTest; + } + + class GlobalCursorIdCache { + public: + + GlobalCursorIdCache(); + ~GlobalCursorIdCache(); + + /** + * this gets called when a CollectionCursorCache gets created + * @return the id the CollectionCursorCache should use when generating + * cursor ids + */ + unsigned created( const std::string& ns ); + + /** + * called by CollectionCursorCache when its going away + */ + void destroyed( unsigned id, const std::string& ns ); + + /** + * works globally + */ + bool eraseCursor( CursorId id, bool checkAuth ); + + void appendStats( BSONObjBuilder& builder ); + + std::size_t timeoutCursors( unsigned millisSinceLastCall ); + + int64_t nextSeed(); + + private: + SimpleMutex _mutex; + + typedef unordered_map<unsigned,string> Map; + Map _idToNS; + unsigned _nextId; + + SecureRandom* _secureRandom; + } _globalCursorIdCache; + + GlobalCursorIdCache::GlobalCursorIdCache() + : _mutex( "GlobalCursorIdCache" ), + _nextId( 0 ), + _secureRandom( NULL ) { + } + + GlobalCursorIdCache::~GlobalCursorIdCache() { + // we're just going to leak everything, as it doesn't matter + } + + int64_t GlobalCursorIdCache::nextSeed() { + SimpleMutex::scoped_lock lk( _mutex ); + if ( !_secureRandom ) + _secureRandom = SecureRandom::create(); + return _secureRandom->nextInt64(); + } + + unsigned GlobalCursorIdCache::created( const std::string& ns ) { + static const unsigned MAX_IDS = 1000 * 1000 * 1000; + + SimpleMutex::scoped_lock lk( _mutex ); + + fassert( 17359, _idToNS.size() < MAX_IDS ); + + for ( unsigned i = 0; i <= MAX_IDS; i++ ) { + unsigned id = ++_nextId; + if ( id == 0 ) + continue; + if ( _idToNS.count( id ) > 0 ) + continue; + _idToNS[id] = ns; + return id; + } + + invariant( false ); + } + + void GlobalCursorIdCache::destroyed( unsigned id, const std::string& ns ) { + SimpleMutex::scoped_lock lk( _mutex ); + invariant( ns == _idToNS[id] ); + _idToNS.erase( id ); + } + + bool GlobalCursorIdCache::eraseCursor(CursorId id, bool checkAuth) { + string ns; + { + SimpleMutex::scoped_lock lk( _mutex ); + unsigned nsid = idFromCursorId( id ); + Map::const_iterator it = _idToNS.find( nsid ); + if ( it == _idToNS.end() ) { + return false; + } + ns = it->second; + } + + NamespaceString nss( ns ); + + if ( checkAuth ) { + AuthorizationSession* as = cc().getAuthorizationSession(); + bool isAuthorized = as->isAuthorizedForActionsOnNamespace(nss, + ActionType::killCursors); + if ( !isAuthorized ) { + audit::logKillCursorsAuthzCheck( currentClient.get(), + nss, + id, + ErrorCodes::Unauthorized ); + return false; + } + } + + Client::ReadContext ctx( ns ); + Collection* collection = ctx.ctx().db()->getCollection( ns ); + ClientCursor* cursor = NULL; + if ( collection ) { + cursor = collection->cursorCache()->find( id ); + } + + if ( !cursor ) { + if ( checkAuth ) + audit::logKillCursorsAuthzCheck( currentClient.get(), + nss, + id, + ErrorCodes::CursorNotFound ); + return false; + } + + if ( checkAuth ) + audit::logKillCursorsAuthzCheck( currentClient.get(), + nss, + id, + ErrorCodes::OK ); + + massert( 16089, + str::stream() << "Cannot kill active cursor " << id, + cursor->pinValue() < 100 ); + + cursor->kill(); + collection->cursorCache()->deregisterCursor( cursor ); + return true; + } + + std::size_t GlobalCursorIdCache::timeoutCursors( unsigned millisSinceLastCall ) { + vector<string> todo; + { + SimpleMutex::scoped_lock lk( _mutex ); + for ( Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i ) + todo.push_back( i->second ); + } + + size_t totalTimedOut = 0; + + for ( unsigned i = 0; i < todo.size(); i++ ) { + const string& ns = todo[i]; + Client::ReadContext context( ns ); + Collection* collection = context.ctx().db()->getCollection( ns ); + if ( collection == NULL ) { + continue; + } + + totalTimedOut += collection->cursorCache()->timeoutCursors( millisSinceLastCall ); + + } + + return totalTimedOut; + } + + // --- + + std::size_t CollectionCursorCache::timeoutCursorsGlobal( unsigned millisSinceLastCall ) { + return _globalCursorIdCache.timeoutCursors( millisSinceLastCall ); + } + + int CollectionCursorCache::eraseCursorGlobalIfAuthorized(int n, long long* ids) { + int numDeleted = 0; + for ( int i = 0; i < n; i++ ) { + if ( eraseCursorGlobalIfAuthorized( ids[i] ) ) + numDeleted++; + if ( inShutdown() ) + break; + } + return numDeleted; + } + bool CollectionCursorCache::eraseCursorGlobalIfAuthorized(CursorId id) { + return _globalCursorIdCache.eraseCursor( id, true ); + } + bool CollectionCursorCache::eraseCursorGlobal( CursorId id ) { + return _globalCursorIdCache.eraseCursor( id, false ); + } + + + // -------------------------- + + + CollectionCursorCache::CollectionCursorCache( const StringData& ns ) + : _ns( ns.toString() ), + _mutex( "CollectionCursorCache" ) { + _collectionCacheRuntimeId = _globalCursorIdCache.created( _ns ); + _random.reset( new PseudoRandom( _globalCursorIdCache.nextSeed() ) ); + } + + CollectionCursorCache::~CollectionCursorCache() { + invalidateAll( true ); + _globalCursorIdCache.destroyed( _collectionCacheRuntimeId, _ns ); + } + + void CollectionCursorCache::invalidateAll( bool collectionGoingAway ) { + SimpleMutex::scoped_lock lk( _mutex ); + + for ( RunnerSet::iterator it = _nonCachedRunners.begin(); + it != _nonCachedRunners.end(); + ++it ) { + + // we kill the runner, but it deletes itself + Runner* runner = *it; + runner->kill(); + invariant( runner->collection() == NULL ); + } + _nonCachedRunners.clear(); + + if ( collectionGoingAway ) { + // we're going to wipe out the world + for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { + ClientCursor* cc = i->second; + + cc->kill(); + + invariant( cc->getRunner() == NULL || cc->getRunner()->collection() == NULL ); + + // If there is a pinValue >= 100, somebody is actively using the CC and we do + // not delete it. Instead we notify the holder that we killed it. The holder + // will then delete the CC. + // pinvalue is <100, so there is nobody actively holding the CC. We can + // safely delete it as nobody is holding the CC. + + if (cc->pinValue() < 100) { + delete cc; + } + } + } + else { + CursorMap newMap; + + // collection will still be around, just all Runners are invalid + for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { + ClientCursor* cc = i->second; + + if (cc->isAggCursor) { + // Aggregation cursors don't have their lifetime bound to the underlying collection. + newMap[i->first] = i->second; + continue; + } + + // Note that a valid ClientCursor state is "no cursor no runner." This is because + // the set of active cursor IDs in ClientCursor is used as representation of query + // state. See sharding_block.h. TODO(greg,hk): Move this out. + if (NULL == cc->getRunner() ) { + newMap.insert( *i ); + continue; + } + + if (cc->pinValue() < 100) { + cc->kill(); + delete cc; + } + else { + // this is pinned, so still alive, so we leave around + // we kill the Runner to signal + if ( cc->getRunner() ) + cc->getRunner()->kill(); + newMap.insert( *i ); + } + + } + + _cursors = newMap; + } + } + + void CollectionCursorCache::invalidateDocument( const DiskLoc& dl, + InvalidationType type ) { + SimpleMutex::scoped_lock lk( _mutex ); + + for ( RunnerSet::iterator it = _nonCachedRunners.begin(); + it != _nonCachedRunners.end(); + ++it ) { + + Runner* runner = *it; + runner->invalidate(dl, type); + } + + for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { + Runner* runner = i->second->getRunner(); + if ( runner ) { + runner->invalidate(dl, type); + } + } + } + + std::size_t CollectionCursorCache::timeoutCursors( unsigned millisSinceLastCall ) { + SimpleMutex::scoped_lock lk( _mutex ); + + vector<ClientCursor*> toDelete; + + for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { + ClientCursor* cc = i->second; + if ( cc->shouldTimeout( millisSinceLastCall ) ) + toDelete.push_back( cc ); + } + + for ( vector<ClientCursor*>::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i ) { + ClientCursor* cc = *i; + _deregisterCursor_inlock( cc ); + cc->kill(); + delete cc; + } + + return toDelete.size(); + } + + void CollectionCursorCache::registerRunner( Runner* runner ) { + SimpleMutex::scoped_lock lk( _mutex ); + const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner); + invariant(result.second); // make sure this was inserted + } + + void CollectionCursorCache::deregisterRunner( Runner* runner ) { + SimpleMutex::scoped_lock lk( _mutex ); + _nonCachedRunners.erase( runner ); + } + + ClientCursor* CollectionCursorCache::find( CursorId id ) { + SimpleMutex::scoped_lock lk( _mutex ); + CursorMap::const_iterator it = _cursors.find( id ); + if ( it == _cursors.end() ) + return NULL; + return it->second; + } + + void CollectionCursorCache::getCursorIds( std::set<CursorId>* openCursors ) { + SimpleMutex::scoped_lock lk( _mutex ); + + for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { + ClientCursor* cc = i->second; + openCursors->insert( cc->cursorid() ); + } + } + + size_t CollectionCursorCache::numCursors(){ + SimpleMutex::scoped_lock lk( _mutex ); + return _cursors.size(); + } + + CursorId CollectionCursorCache::_allocateCursorId_inlock() { + for ( int i = 0; i < 10000; i++ ) { + unsigned mypart = static_cast<unsigned>( _random->nextInt32() ); + CursorId id = cursorIdFromParts( _collectionCacheRuntimeId, mypart ); + if ( _cursors.count( id ) == 0 ) + return id; + } + fassertFailed( 17360 ); + } + + CursorId CollectionCursorCache::registerCursor( ClientCursor* cc ) { + invariant( cc ); + SimpleMutex::scoped_lock lk( _mutex ); + CursorId id = _allocateCursorId_inlock(); + _cursors[id] = cc; + return id; + } + + void CollectionCursorCache::deregisterCursor( ClientCursor* cc ) { + SimpleMutex::scoped_lock lk( _mutex ); + _deregisterCursor_inlock( cc ); + } + + void CollectionCursorCache::_deregisterCursor_inlock( ClientCursor* cc ) { + invariant( cc ); + CursorId id = cc->cursorid(); + _cursors.erase( id ); + } + +} diff --git a/src/mongo/db/catalog/collection_cursor_cache.h b/src/mongo/db/catalog/collection_cursor_cache.h new file mode 100644 index 00000000000..91bb80e84f6 --- /dev/null +++ b/src/mongo/db/catalog/collection_cursor_cache.h @@ -0,0 +1,129 @@ +// collection_cursor_cache.h + +/** +* Copyright (C) 2013 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#pragma once + +#include "mongo/db/clientcursor.h" +#include "mongo/db/diskloc.h" +#include "mongo/db/invalidation_type.h" +#include "mongo/platform/unordered_set.h" +#include "mongo/util/concurrency/mutex.h" + +namespace mongo { + + class PseudoRandom; + class Runner; + + class CollectionCursorCache { + public: + CollectionCursorCache( const StringData& ns ); + + /** + * will kill() all Runner instances it has + */ + ~CollectionCursorCache(); + + // ----------------- + + /** + * @param collectionGoingAway should be tru if the Collection instance is going away + * this could be because the db is being closed, or the + * collection/db is being dropped. + */ + void invalidateAll( bool collectionGoingAway ); + + /** + * Broadcast a document invalidation to all relevant Runner(s). invalidateDocument must + * called *before* the provided DiskLoc is about to be deleted or mutated. + */ + void invalidateDocument( const DiskLoc& dl, + InvalidationType type ); + + /* + * timesout cursors that have been idle for too long + * note: must have a readlock on the collection + * @return number timed out + */ + std::size_t timeoutCursors( unsigned millisSinceLastCall ); + + // ----------------- + + /** + * Register a runner so that it can be notified of deletion/invalidation during yields. + * Must be called before a runner yields. If a runner is cached (inside a ClientCursor) it + * MUST NOT be registered; the two are mutually exclusive. + */ + void registerRunner(Runner* runner); + + /** + * Remove a runner from the runner registry. + */ + void deregisterRunner(Runner* runner); + + // ----------------- + + CursorId registerCursor( ClientCursor* cc ); + void deregisterCursor( ClientCursor* cc ); + + void getCursorIds( std::set<CursorId>* openCursors ); + std::size_t numCursors(); + + ClientCursor* find( CursorId id ); + + // ---------------------- + + static int eraseCursorGlobalIfAuthorized( int n, long long* ids ); + static bool eraseCursorGlobalIfAuthorized( CursorId id ); + + static bool eraseCursorGlobal( CursorId id ); + + /** + * @return number timed out + */ + static std::size_t timeoutCursorsGlobal( unsigned millisSinceLastCall ); + + private: + CursorId _allocateCursorId_inlock(); + void _deregisterCursor_inlock( ClientCursor* cc ); + + string _ns; + unsigned _collectionCacheRuntimeId; + scoped_ptr<PseudoRandom> _random; + + SimpleMutex _mutex; + + typedef unordered_set<Runner*> RunnerSet; + RunnerSet _nonCachedRunners; + + typedef unordered_map<CursorId,ClientCursor*> CursorMap; + CursorMap _cursors; + }; + +} diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp index 74c5bccbcbc..f6d6815cbee 100644 --- a/src/mongo/db/catalog/database.cpp +++ b/src/mongo/db/catalog/database.cpp @@ -321,7 +321,6 @@ namespace mongo { verify( collection->_details->getTotalIndexCount() == 0 ); LOG(1) << "\t dropIndexes done" << endl; - ClientCursor::invalidate( fullns ); Top::global.collectionDropped( fullns ); Status s = _dropNS( fullns ); @@ -361,7 +360,7 @@ namespace mongo { if ( it == _collections.end() ) return; - delete it->second; + delete it->second; // this also deletes all cursors + runners _collections.erase( it ); } @@ -475,9 +474,6 @@ namespace mongo { _clearCollectionCache_inlock( toNSString ); } - ClientCursor::invalidate( fromNSString.c_str() ); - ClientCursor::invalidate( toNSString.c_str() ); - // at this point, we haven't done anything destructive yet // ---- diff --git a/src/mongo/db/catalog/index_catalog.cpp b/src/mongo/db/catalog/index_catalog.cpp index 2e40107a044..1ca58122c40 100644 --- a/src/mongo/db/catalog/index_catalog.cpp +++ b/src/mongo/db/catalog/index_catalog.cpp @@ -167,7 +167,7 @@ namespace mongo { << " ns: " << _collection->ns().ns() ); } - bool IndexCatalog::_shouldOverridePlugin(const BSONObj& keyPattern) { + bool IndexCatalog::_shouldOverridePlugin(const BSONObj& keyPattern) const { string pluginName = IndexNames::findPluginName(keyPattern); bool known = IndexNames::isKnownName(pluginName); @@ -200,7 +200,7 @@ namespace mongo { return false; } - string IndexCatalog::_getAccessMethodName(const BSONObj& keyPattern) { + string IndexCatalog::_getAccessMethodName(const BSONObj& keyPattern) const { if ( _shouldOverridePlugin(keyPattern) ) { return ""; } @@ -703,7 +703,7 @@ namespace mongo { // there may be pointers pointing at keys in the btree(s). kill them. // TODO: can this can only clear cursors on this index? - ClientCursor::invalidate( _collection->ns().ns() ); + _collection->cursorCache()->invalidateAll( false ); // make sure nothing in progress massert( 17348, @@ -808,7 +808,7 @@ namespace mongo { // there may be pointers pointing at keys in the btree(s). kill them. // TODO: can this can only clear cursors on this index? - ClientCursor::invalidate( _collection->ns().ns() ); + _collection->cursorCache()->invalidateAll( false ); // wipe out stats _collection->infoCache()->reset(); @@ -1076,6 +1076,12 @@ namespace mongo { return entry->accessMethod(); } + const IndexAccessMethod* IndexCatalog::getIndex( const IndexDescriptor* desc ) const { + const IndexCatalogEntry* entry = _entries.find( desc ); + massert( 17357, "cannot find index entry", entry ); + return entry->accessMethod(); + } + IndexAccessMethod* IndexCatalog::_createAccessMethod( const IndexDescriptor* desc, IndexCatalogEntry* entry ) { string type = _getAccessMethodName(desc->keyPattern()); diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h index 9056b9af975..3cdb8361292 100644 --- a/src/mongo/db/catalog/index_catalog.h +++ b/src/mongo/db/catalog/index_catalog.h @@ -100,6 +100,7 @@ namespace mongo { // never returns NULL IndexAccessMethod* getIndex( const IndexDescriptor* desc ); + const IndexAccessMethod* getIndex( const IndexDescriptor* desc ) const; class IndexIterator { public: @@ -251,14 +252,14 @@ namespace mongo { int _removeFromSystemIndexes( const StringData& indexName ); - bool _shouldOverridePlugin( const BSONObj& keyPattern ); + bool _shouldOverridePlugin( const BSONObj& keyPattern ) const; /** * This differs from IndexNames::findPluginName in that returns the plugin name we *should* * use, not the plugin name inside of the provided key pattern. To understand when these * differ, see shouldOverridePlugin. */ - string _getAccessMethodName(const BSONObj& keyPattern); + string _getAccessMethodName(const BSONObj& keyPattern) const; IndexDetails* _getIndexDetails( const IndexDescriptor* descriptor ) const; diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 643d1f4aafd..8c6ca3be192 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -34,6 +34,7 @@ #include <time.h> #include <vector> +#include "mongo/base/counter.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/audit.h" #include "mongo/db/auth/action_set.h" @@ -55,35 +56,53 @@ namespace mongo { - ClientCursor::CCById ClientCursor::clientCursorsById; - boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) ); - long long ClientCursor::numberTimedOut = 0; - ClientCursor::RunnerSet ClientCursor::nonCachedRunners; + static Counter64 cursorStatsOpen; // gauge + static Counter64 cursorStatsOpenPinned; // gauge + static Counter64 cursorStatsOpenNoTimeout; // gauge + static Counter64 cursorStatsTimedOut; + + static ServerStatusMetricField<Counter64> dCursorStatsOpen( "cursor.open.total", + &cursorStatsOpen ); + static ServerStatusMetricField<Counter64> dCursorStatsOpenPinned( "cursor.open.pinned", + &cursorStatsOpenPinned ); + static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout( "cursor.open.noTimeout", + &cursorStatsOpenNoTimeout ); + static ServerStatusMetricField<Counter64> dCursorStatusTimedout( "cursor.timedOut", + &cursorStatsTimedOut ); void aboutToDeleteForSharding(const StringData& ns, const Database* db, const NamespaceDetails* nsd, const DiskLoc& dl ); // from s/d_logic.h - ClientCursor::ClientCursor(Runner* runner, int qopts, const BSONObj query) { + long long ClientCursor::totalOpen() { + return cursorStatsOpen.get(); + } + + ClientCursor::ClientCursor(const Collection* collection, Runner* runner, + int qopts, const BSONObj query) + : _collection( collection ), + _countedYet( false ) { _runner.reset(runner); _ns = runner->ns(); _query = query; _queryOptions = qopts; + if ( runner->collection() ) { + invariant( collection == runner->collection() ); + } init(); } - ClientCursor::ClientCursor(const string& ns) - : _ns(ns), + ClientCursor::ClientCursor(const Collection* collection) + : _ns(collection->ns().ns()), + _collection(collection), + _countedYet( false ), _queryOptions(QueryOption_NoCursorTimeout) { - init(); } void ClientCursor::init() { - _db = cc().database(); - verify( _db ); - verify( _db->ownsNS( _ns ) ); + invariant( _collection ); isAggCursor = false; @@ -91,18 +110,20 @@ namespace mongo { _leftoverMaxTimeMicros = 0; _pinValue = 0; _pos = 0; - + Lock::assertAtLeastReadLocked(_ns); if (_queryOptions & QueryOption_NoCursorTimeout) { // cursors normally timeout after an inactivity period to prevent excess memory use // setting this prevents timeout of the cursor in question. ++_pinValue; + cursorStatsOpenNoTimeout.increment(); } - recursive_scoped_lock lock(ccmutex); - _cursorid = allocCursorId_inlock(); - clientCursorsById.insert( make_pair(_cursorid, this) ); + _cursorid = _collection->cursorCache()->registerCursor( this ); + + cursorStatsOpen.increment(); + _countedYet = true; } ClientCursor::~ClientCursor() { @@ -112,160 +133,30 @@ namespace mongo { return; } - { - recursive_scoped_lock lock(ccmutex); - clientCursorsById.erase(_cursorid); - - // defensive: - _cursorid = INVALID_CURSOR_ID; - _pos = -2; - _pinValue = 0; + if ( _countedYet ) { + _countedYet = false; + cursorStatsOpen.decrement(); + if ( _pinValue == 1 ) + cursorStatsOpenNoTimeout.decrement(); } - } - - void ClientCursor::invalidate(const StringData& ns) { - Lock::assertWriteLocked(ns); - - size_t dot = ns.find( '.' ); - verify( dot != string::npos ); - // first (and only) dot is the last char - bool isDB = dot == ns.size() - 1; - - Database *db = cc().database(); - verify(db); - verify(ns.startsWith(db->name())); - - recursive_scoped_lock cclock(ccmutex); - // Look at all active non-cached Runners. These are the runners that are in auto-yield mode - // that are not attached to the the client cursor. For example, all internal runners don't - // need to be cached -- there will be no getMore. - for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end(); - ++it) { - - Runner* runner = *it; - const string& runnerNS = runner->ns(); - if ( ( isDB && StringData(runnerNS).startsWith(ns) ) || ns == runnerNS ) { - runner->kill(); - } + if ( _collection ) { + // this could be null if kill() was killed + _collection->cursorCache()->deregisterCursor( this ); } - // Look at all cached ClientCursor(s). The CC may have a Runner, a Cursor, or nothing (see - // sharding_block.h). - CCById::const_iterator it = clientCursorsById.begin(); - while (it != clientCursorsById.end()) { - ClientCursor* cc = it->second; - - // Aggregation cursors don't have their lifetime bound to the underlying collection. - if (cc->isAggCursor) { - ++it; - continue; - } - - // We're only interested in cursors over one db. - if (cc->_db != db) { - ++it; - continue; - } - - // Note that a valid ClientCursor state is "no cursor no runner." This is because - // the set of active cursor IDs in ClientCursor is used as representation of query - // state. See sharding_block.h. TODO(greg,hk): Move this out. - if (NULL == cc->_runner.get()) { - ++it; - continue; - } - - bool shouldDelete = false; - - // We will only delete CCs with runners that are not actively in use. The runners that - // are actively in use are instead kill()-ed. - if (NULL != cc->_runner.get()) { - if (isDB || cc->_runner->ns() == ns) { - // If there is a pinValue >= 100, somebody is actively using the CC and we do - // not delete it. Instead we notify the holder that we killed it. The holder - // will then delete the CC. - if (cc->_pinValue >= 100) { - cc->_runner->kill(); - } - else { - // pinvalue is <100, so there is nobody actively holding the CC. We can - // safely delete it as nobody is holding the CC. - shouldDelete = true; - } - } - } - - if (shouldDelete) { - ClientCursor* toDelete = it->second; - CursorId id = toDelete->cursorid(); - delete toDelete; - // We're not following the usual paradigm of saving it, ++it, and deleting the saved - // 'it' because deleting 'it' might invalidate the next thing in clientCursorsById. - // TODO: Why? - it = clientCursorsById.upper_bound(id); - } - else { - ++it; - } - } - } - - void ClientCursor::invalidateDocument(const StringData& ns, - const NamespaceDetails* nsd, - const DiskLoc& dl, - InvalidationType type) { - // TODO: Do we need this pagefault thing still - NoPageFaultsAllowed npfa; - recursive_scoped_lock lock(ccmutex); - - Database *db = cc().database(); - verify(db); - aboutToDeleteForSharding( ns, db, nsd, dl ); - - // Check our non-cached active runner list. - for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end(); - ++it) { - - Runner* runner = *it; - if (0 == ns.compare(runner->ns())) { - runner->invalidate(dl, type); - } - } - - // TODO: This requires optimization. We walk through *all* CCs and send the delete to every - // CC open on the db we're deleting from. We could: - // 1. Map from ns to open runners, - // 2. Map from ns -> (a map of DiskLoc -> runners who care about that DL) - // - // We could also queue invalidations somehow and have them processed later in the runner's - // read locks. - for (CCById::const_iterator it = clientCursorsById.begin(); it != clientCursorsById.end(); - ++it) { - - ClientCursor* cc = it->second; - // We're only interested in cursors over one db. - if (cc->_db != db) { continue; } - if (NULL == cc->_runner.get()) { continue; } - cc->_runner->invalidate(dl, type); - } + // defensive: + _collection = NULL; + _cursorid = INVALID_CURSOR_ID; + _pos = -2; + _pinValue = 0; } - void ClientCursor::registerRunner(Runner* runner) { - recursive_scoped_lock lock(ccmutex); - // The second part of the pair returned by unordered_map::insert tells us whether the - // insert was performed or not, so we can use that to ensure that we have not been - // double registered. - const std::pair<RunnerSet::iterator, bool> result = nonCachedRunners.insert(runner); - verify(result.second); - } + void ClientCursor::kill() { + if ( _runner.get() ) + _runner->kill(); - void ClientCursor::deregisterRunner(Runner* runner) { - recursive_scoped_lock lock(ccmutex); - // unordered_set::erase returns a count of how many elements were erased, so we can - // validate that our de-registration matched an existing register call by ensuring that - // exactly one item was erased. - verify(nonCachedRunners.erase(runner) == 1); + _collection = NULL; } void yieldOrSleepFor1Microsecond() { @@ -352,238 +243,12 @@ namespace mongo { _idleAgeMillis = millis; } - void ClientCursor::idleTimeReport(unsigned millis) { - bool foundSomeToTimeout = false; - - // two passes so that we don't need to readlock unless we really do some timeouts - // we assume here that incrementing _idleAgeMillis outside readlock is ok. - { - recursive_scoped_lock lock(ccmutex); - { - unsigned sz = clientCursorsById.size(); - static time_t last; - if( sz >= 100000 ) { - if( time(0) - last > 300 ) { - last = time(0); - log() << "warning number of open cursors is very large: " << sz << endl; - } - } - } - for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) { - CCById::iterator j = i; - i++; - if( j->second->shouldTimeout( millis ) ) { - foundSomeToTimeout = true; - } - } - } - - if( foundSomeToTimeout ) { - Lock::GlobalRead lk; - - recursive_scoped_lock cclock(ccmutex); - CCById::const_iterator it = clientCursorsById.begin(); - while (it != clientCursorsById.end()) { - ClientCursor* cc = it->second; - if( cc->shouldTimeout(0) ) { - numberTimedOut++; - LOG(1) << "killing old cursor " << cc->_cursorid << ' ' << cc->_ns - << " idle:" << cc->idleTime() << "ms\n"; - ClientCursor* toDelete = it->second; - CursorId id = toDelete->cursorid(); - // This is what winds up removing it from the map. - delete toDelete; - it = clientCursorsById.upper_bound(id); - } - else { - ++it; - } - } - } - } - void ClientCursor::updateSlaveLocation( CurOp& curop ) { if ( _slaveReadTill.isNull() ) return; mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill ); } - void ClientCursor::appendStats( BSONObjBuilder& result ) { - recursive_scoped_lock lock(ccmutex); - result.appendNumber("totalOpen", clientCursorsById.size() ); - result.appendNumber("clientCursors_size", (int) numCursors()); - result.appendNumber("timedOut" , numberTimedOut); - unsigned pinned = 0; - unsigned notimeout = 0; - for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); i++ ) { - unsigned p = i->second->_pinValue; - if( p >= 100 ) - pinned++; - else if( p > 0 ) - notimeout++; - } - if( pinned ) - result.append("pinned", pinned); - if( notimeout ) - result.append("totalNoTimeout", notimeout); - } - - // - // ClientCursor creation/deletion/access. - // - - // Some statics used by allocCursorId_inlock(). - namespace { - // so we don't have to do find() which is a little slow very often. - long long cursorGenTSLast = 0; - PseudoRandom* cursorGenRandom = NULL; - } - - long long ClientCursor::allocCursorId_inlock() { - // It is important that cursor IDs not be reused within a short period of time. - if (!cursorGenRandom) { - scoped_ptr<SecureRandom> sr( SecureRandom::create() ); - cursorGenRandom = new PseudoRandom( sr->nextInt64() ); - } - - const long long ts = Listener::getElapsedTimeMillis(); - long long x; - while ( 1 ) { - x = ts << 32; - x |= cursorGenRandom->nextInt32(); - - if ( x == 0 ) { continue; } - - if ( x < 0 ) { x *= -1; } - - if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 ) - break; - } - - cursorGenTSLast = ts; - return x; - } - - // static - ClientCursor* ClientCursor::find_inlock(CursorId id, bool warn) { - CCById::iterator it = clientCursorsById.find(id); - if ( it == clientCursorsById.end() ) { - if ( warn ) { - OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map '" << id - << "' (ok after a drop)" << endl; - } - return 0; - } - return it->second; - } - - void ClientCursor::find( const string& ns , set<CursorId>& all ) { - recursive_scoped_lock lock(ccmutex); - - for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) { - if ( i->second->_ns == ns ) - all.insert( i->first ); - } - } - - // static - ClientCursor* ClientCursor::find(CursorId id, bool warn) { - recursive_scoped_lock lock(ccmutex); - ClientCursor *c = find_inlock(id, warn); - // if this asserts, your code was not thread safe - you either need to set no timeout - // for the cursor or keep a ClientCursor::Pointer in scope for it. - massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue ); - return c; - } - - void ClientCursor::_erase_inlock(ClientCursor* cursor) { - // Must not have an active ClientCursor::Pin. - massert( 16089, - str::stream() << "Cannot kill active cursor " << cursor->cursorid(), - cursor->_pinValue < 100 ); - - delete cursor; - } - - bool ClientCursor::erase(CursorId id) { - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { return false; } - _erase_inlock(cursor); - return true; - } - - int ClientCursor::erase(int n, long long *ids) { - int found = 0; - for ( int i = 0; i < n; i++ ) { - if ( erase(ids[i])) - found++; - - if ( inShutdown() ) - break; - } - return found; - } - - bool ClientCursor::eraseIfAuthorized(CursorId id) { - NamespaceString ns; - { - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { - audit::logKillCursorsAuthzCheck( - &cc(), - NamespaceString(), - id, - ErrorCodes::CursorNotFound); - return false; - } - ns = NamespaceString(cursor->ns()); - } - - // Can't be in a lock when checking authorization - const bool isAuthorized = cc().getAuthorizationSession()->isAuthorizedForActionsOnNamespace( - ns, ActionType::killCursors); - audit::logKillCursorsAuthzCheck( - &cc(), - ns, - id, - isAuthorized ? ErrorCodes::OK : ErrorCodes::Unauthorized); - if (!isAuthorized) { - return false; - } - - // It is safe to lookup the cursor again after temporarily releasing the mutex because - // of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that - // the namespace associated with a cursor cannot change. - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { - // Cursor was deleted in another thread since we found it earlier in this function. - return false; - } - if (ns != cursor->ns()) { - warning() << "Cursor namespace changed. Previous ns: " << ns << ", current ns: " - << cursor->ns() << endl; - return false; - } - - _erase_inlock(cursor); - return true; - } - - int ClientCursor::eraseIfAuthorized(int n, long long *ids) { - int found = 0; - for ( int i = 0; i < n; i++ ) { - if ( eraseIfAuthorized(ids[i])) - found++; - - if ( inShutdown() ) - break; - } - return found; - } - int ClientCursor::suggestYieldMicros() { int writers = 0; int readers = 0; @@ -606,42 +271,45 @@ namespace mongo { // deleted from underneath us, so we can save the pointer and ignore the ID. // - ClientCursorPin::ClientCursorPin(long long cursorid) : _cursorid( INVALID_CURSOR_ID ) { - recursive_scoped_lock lock( ClientCursor::ccmutex ); - ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true ); - if (NULL != cursor) { - uassert( 12051, "clientcursor already in use? driver problem?", - cursor->_pinValue < 100 ); - cursor->_pinValue += 100; - _cursorid = cursorid; + ClientCursorPin::ClientCursorPin( const Collection* collection, long long cursorid ) + : _cursor( NULL ) { + cursorStatsOpenPinned.increment(); + _cursor = collection->cursorCache()->find( cursorid ); + if ( _cursor ) { + uassert( 12051, + "clientcursor already in use? driver problem?", + _cursor->_pinValue < 100 ); + _cursor->_pinValue += 100; } } - ClientCursorPin::~ClientCursorPin() { DESTRUCTOR_GUARD( release(); ) } + ClientCursorPin::~ClientCursorPin() { + cursorStatsOpenPinned.decrement(); + DESTRUCTOR_GUARD( release(); ); + } void ClientCursorPin::release() { - if ( _cursorid == INVALID_CURSOR_ID ) { + if ( !_cursor ) return; - } - ClientCursor *cursor = c(); - _cursorid = INVALID_CURSOR_ID; - if ( cursor ) { - verify( cursor->_pinValue >= 100 ); - cursor->_pinValue -= 100; + + invariant( _cursor->_pinValue >= 100 ); + _cursor->_pinValue -= 100; + + if ( _cursor->collection() == NULL ) { + // the ClientCursor was killed while we had it + // therefore its our responsibility to kill it + delete _cursor; + _cursor = NULL; // defensive } } void ClientCursorPin::deleteUnderlying() { - if (_cursorid == INVALID_CURSOR_ID) { - return; - } - ClientCursor *cursor = c(); - _cursorid = INVALID_CURSOR_ID; - delete cursor; + delete _cursor; + _cursor = NULL; } ClientCursor* ClientCursorPin::c() const { - return ClientCursor::find( _cursorid ); + return _cursor; } // @@ -708,9 +376,9 @@ namespace mongo { const int Secs = 4; unsigned n = 0; while ( ! inShutdown() ) { - ClientCursor::idleTimeReport( t.millisReset() ); + cursorStatsTimedOut.increment( CollectionCursorCache::timeoutCursorsGlobal( t.millisReset() ) ); sleepsecs(Secs); - if( ++n % (60/4) == 0 /*once a minute*/ ) { + if( ++n % (60/Secs) == 0 /*once a minute*/ ) { sayMemoryStatus(); } } @@ -723,14 +391,25 @@ namespace mongo { // cursorInfo command. // + void _appendCursorStats( BSONObjBuilder& b ) { + b.append( "note" , "deprecated, use server status metrics" ); + + b.appendNumber("totalOpen", cursorStatsOpen.get() ); + b.appendNumber("pinned", cursorStatsOpenPinned.get() ); + b.appendNumber("totalNoTimeout", cursorStatsOpenNoTimeout.get() ); + + b.appendNumber("timedOut" , cursorStatsTimedOut.get()); + } + // QUESTION: Restrict to the namespace from which this command was issued? // Alternatively, make this command admin-only? + // TODO: remove this for 2.8 class CmdCursorInfo : public Command { public: CmdCursorInfo() : Command( "cursorInfo", true ) {} virtual bool slaveOk() const { return true; } virtual void help( stringstream& help ) const { - help << " example: { cursorInfo : 1 }"; + help << " example: { cursorInfo : 1 }, deprecated"; } virtual LockType locktype() const { return NONE; } virtual void addRequiredPrivileges(const std::string& dbname, @@ -742,7 +421,8 @@ namespace mongo { } bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { - ClientCursor::appendStats( result ); + _appendCursorStats( result ); + result.append( "note", "deprecated, get from serverStatus().cursors" ); return true; } } cmdCursorInfo; @@ -758,7 +438,7 @@ namespace mongo { BSONObj generateSection(const BSONElement& configElement) const { BSONObjBuilder b; - ClientCursor::appendStats( b ); + _appendCursorStats( b ); return b.obj(); } } cursorServerStats; diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 0d15fca29c0..78075bde52b 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -42,6 +42,7 @@ namespace mongo { typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; class ClientCursor; + class Collection; class CurOp; class Database; class NamespaceDetails; @@ -56,9 +57,10 @@ namespace mongo { */ class ClientCursor : private boost::noncopyable { public: - ClientCursor(Runner* runner, int qopts = 0, const BSONObj query = BSONObj()); + ClientCursor(const Collection* collection, Runner* runner, + int qopts = 0, const BSONObj query = BSONObj()); - ClientCursor(const string& ns); + ClientCursor(const Collection* collection); ~ClientCursor(); @@ -68,90 +70,29 @@ namespace mongo { CursorId cursorid() const { return _cursorid; } string ns() const { return _ns; } - Database* db() const { return _db; } - - // - // Mutation/Invalidation of DiskLocs and dropping of namespaces - // + const Collection* collection() const { return _collection; } /** - * Get rid of cursors for namespaces 'ns'. When dropping a db, ns is "dbname." Used by drop, - * dropIndexes, dropDatabase. + * This is called when someone is dropping a collection or something else that + * goes through killing cursors. + * It removes the responsiilibty of de-registering from ClientCursor. + * Responsibility for deleting the ClientCursor doesn't change from this call + * see Runner::kill. */ - static void invalidate(const StringData& ns); - - /** - * Broadcast a document invalidation to all relevant Runner(s). invalidateDocument must - * called *before* the provided DiskLoc is about to be deleted or mutated. - */ - static void invalidateDocument(const StringData& ns, - const NamespaceDetails* nsd, - const DiskLoc& dl, - InvalidationType type); - - /** - * Register a runner so that it can be notified of deletion/invalidation during yields. - * Must be called before a runner yields. If a runner is cached (inside a ClientCursor) it - * MUST NOT be registered; the two are mutually exclusive. - */ - static void registerRunner(Runner* runner); - - /** - * Remove a runner from the runner registry. - */ - static void deregisterRunner(Runner* runner); + void kill(); // // Yielding. - // + // static void staticYield(int micros, const StringData& ns, const Record* rec); static int suggestYieldMicros(); // - // Static methods about all ClientCursors TODO: Document. - // - - static void appendStats( BSONObjBuilder& result ); - - // - // ClientCursor creation/deletion. - // - - static unsigned numCursors() { return clientCursorsById.size(); } - static void find( const string& ns , set<CursorId>& all ); - static ClientCursor* find(CursorId id, bool warn = true); - - // Same as erase but checks to make sure this thread has read permission on the cursor's - // namespace. This should be called when receiving killCursors from a client. This should - // not be called when ccmutex is held. - static int eraseIfAuthorized(int n, long long* ids); - static bool eraseIfAuthorized(CursorId id); - - /** - * @return number of cursors found - */ - static int erase(int n, long long* ids); - - /** - * Deletes the cursor with the provided @param 'id' if one exists. - * @throw if the cursor with the provided id is pinned. - * This does not do any auth checking and should be used only when erasing cursors as part - * of cleaning up internal operations. - */ - static bool erase(CursorId id); - - // // Timing and timeouts // /** - * called every 4 seconds. millis is amount of idle time passed since the last call -- - * could be zero - */ - static void idleTimeReport(unsigned millis); - - /** * @param millis amount of idle passed time since last call * note called outside of locks (other than ccmutex) so care must be exercised */ @@ -202,51 +143,20 @@ namespace mongo { */ bool isAggCursor; + unsigned pinValue() const { return _pinValue; } + + static long long totalOpen(); + private: + friend class ClientCursorMonitor; friend class ClientCursorPin; friend class CmdCursorInfo; - // A map from the CursorId to the ClientCursor behind it. - // TODO: Consider making this per-connection. - typedef map<CursorId, ClientCursor*> CCById; - static CCById clientCursorsById; - - // A list of NON-CACHED runners. Any runner that yields must be put into this map before - // yielding in order to be notified of invalidation and namespace deletion. Before the - // runner is deleted, it must be removed from this map. - // - // TODO: This is temporary and as such is highly NOT optimized. - typedef unordered_set<Runner*> RunnerSet; - static RunnerSet nonCachedRunners; - - // How many cursors have timed out? - static long long numberTimedOut; - - // This must be held when modifying any static member. - static boost::recursive_mutex& ccmutex; - /** * Initialization common between both constructors for the ClientCursor. */ void init(); - /** - * Allocates a new CursorId. - * Called from init(...). Assumes ccmutex held. - */ - static CursorId allocCursorId_inlock(); - - /** - * Find the ClientCursor with the provided ID. Optionally warn if it's not found. - * Assumes ccmutex is held. - */ - static ClientCursor* find_inlock(CursorId id, bool warn = true); - - /** - * Delete the ClientCursor with the provided ID. masserts if the cursor is pinned. - */ - static void _erase_inlock(ClientCursor* cursor); - // // ClientCursor-specific data, independent of the underlying execution type. // @@ -263,8 +173,10 @@ namespace mongo { // The namespace we're operating on. string _ns; - // The database we're operating on. - Database* _db; + const Collection* _collection; + + // if we've added it to the total open counter yet + bool _countedYet; // How many objects have been returned by the find() so far? int _pos; @@ -303,18 +215,20 @@ namespace mongo { * against two getMore requests on the same cursor executing at the same time - which might be * bad. That should never happen, but if a client driver had a bug, it could (or perhaps some * sort of attack situation). + * Must have a read lock on the collection already */ class ClientCursorPin : boost::noncopyable { public: - ClientCursorPin( long long cursorid ); + ClientCursorPin( const Collection* collection, long long cursorid ); ~ClientCursorPin(); - // This just releases the pin, does not delete the underlying. + // This just releases the pin, does not delete the underlying + // unless ownership has passed to us after kill void release(); // Call this to delete the underlying ClientCursor. void deleteUnderlying(); ClientCursor *c() const; private: - CursorId _cursorid; + ClientCursor* _cursor; }; /** thread for timing out old cursors */ diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 1da4c5828dc..eebc3fb3330 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1215,7 +1215,9 @@ namespace mongo { auto_ptr<RangePreserver> rangePreserver; { Client::ReadContext ctx(config.ns); - rangePreserver.reset(new RangePreserver(config.ns)); + Collection* collection = ctx.ctx().db()->getCollection( config.ns ); + if ( collection ) + rangePreserver.reset(new RangePreserver(collection)); // Get metadata before we check our version, to make sure it doesn't increment // in the meantime. Need to do this in the same lock scope as the block. diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index f249e2f8174..fa3a083a53d 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -33,6 +33,7 @@ #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/privilege.h" +#include "mongo/db/catalog/database.h" #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/commands.h" @@ -102,9 +103,12 @@ namespace { // These are all no-ops for PipelineRunners virtual void setYieldPolicy(YieldPolicy policy) {} virtual void invalidate(const DiskLoc& dl, InvalidationType type) {} - virtual void kill() {} + virtual void kill() { + _pipeline->output()->kill(); + } virtual void saveState() {} virtual bool restoreState() { return true; } + virtual const Collection* collection() { return NULL; } /** * Make obj the next object returned by getNext(). @@ -160,45 +164,80 @@ namespace { return true; } - static void handleCursorCommand(CursorId id, BSONObj& cmdObj, BSONObjBuilder& result) { + static void handleCursorCommand(const string& ns, + intrusive_ptr<Pipeline>& pPipeline, + BSONObj& cmdObj, + BSONObjBuilder& result) { + + scoped_ptr<ClientCursorPin> pin; + string cursorNs = ns; + + { + // Set up cursor + Client::ReadContext ctx(ns); + Collection* collection = ctx.ctx().db()->getCollection( ns ); + if ( collection ) { + ClientCursor* cc = new ClientCursor(collection, + new PipelineRunner(pPipeline)); + // enable special locking and ns deletion behavior + cc->isAggCursor = true; + + pin.reset( new ClientCursorPin( collection, cc->cursorid() ) ); + + // we need this after cursor may have been deleted + cursorNs = cc->ns(); + } + } + BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize"); const long long batchSize = batchSizeElem.isNumber() ? batchSizeElem.numberLong() : 101; // same as query - ClientCursorPin pin(id); - ClientCursor* cursor = pin.c(); - - massert(16958, "Cursor shouldn't have been deleted", - cursor); + ClientCursor* cursor = NULL; + PipelineRunner* runner = NULL; + if ( pin ) { + cursor = pin->c(); + massert(16958, + "Cursor shouldn't have been deleted", + cursor); + verify(cursor->isAggCursor); + + runner = dynamic_cast<PipelineRunner*>(cursor->getRunner()); + verify(runner); + } - verify(cursor->isAggCursor); - PipelineRunner* runner = dynamic_cast<PipelineRunner*>(cursor->getRunner()); - verify(runner); try { - const string cursorNs = cursor->ns(); // we need this after cursor may have been deleted // can't use result BSONObjBuilder directly since it won't handle exceptions correctly. BSONArrayBuilder resultsArray; const int byteLimit = MaxBytesToReturnToClientAtOnce; BSONObj next; - for (int objCount = 0; objCount < batchSize; objCount++) { - // The initial getNext() on a PipelineRunner may be very expensive so we don't do it - // when batchSize is 0 since that indicates a desire for a fast return. - if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { - pin.deleteUnderlying(); - id = 0; - cursor = NULL; // make it an obvious error to use cursor after this point - break; + if ( runner ) { + for (int objCount = 0; objCount < batchSize; objCount++) { + // The initial getNext() on a PipelineRunner may be very expensive so we don't + // do it when batchSize is 0 since that indicates a desire for a fast return. + if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { + pin->deleteUnderlying(); + cursor = NULL; // make it an obvious error to use cursor after this point + break; + } + + if (resultsArray.len() + next.objsize() > byteLimit) { + // too big. next will be the first doc in the second batch + runner->pushBack(next); + break; + } + + resultsArray.append(next); } - - if (resultsArray.len() + next.objsize() > byteLimit) { - // too big. next will be the first doc in the second batch - runner->pushBack(next); - break; - } - - resultsArray.append(next); + } + else { + // this is to ensure that side-effects such as $out occur, + // and that an empty output set is the correct result of this pipeline + invariant( pPipeline.get() ); + invariant( pPipeline->output() ); + invariant( !pPipeline->output()->getNext() ); } if (cursor) { @@ -208,14 +247,19 @@ namespace { } BSONObjBuilder cursorObj(result.subobjStart("cursor")); - cursorObj.append("id", id); + if ( cursor ) + cursorObj.append("id", cursor->cursorid() ); + else + cursorObj.append("id", 0LL ); cursorObj.append("ns", cursorNs); cursorObj.append("firstBatch", resultsArray.arr()); cursorObj.done(); } catch (...) { // Clean up cursor on way out of scope. - pin.deleteUnderlying(); + if ( pin ) { + pin->deleteUnderlying(); + } throw; } } @@ -278,6 +322,7 @@ namespace { // This does the mongod-specific stuff like creating a cursor PipelineD::prepareCursorSource(pPipeline, pCtx); + pPipeline->stitch(); if (pPipeline->isExplain()) { @@ -286,16 +331,7 @@ namespace { } if (isCursorCommand(cmdObj)) { - CursorId id; - { - // Set up cursor - Client::ReadContext ctx(ns); - ClientCursor* cc = new ClientCursor(new PipelineRunner(pPipeline)); - cc->isAggCursor = true; // enable special locking and ns deletion behavior - id = cc->cursorid(); - } - - handleCursorCommand(id, cmdObj, result); + handleCursorCommand(ns, pPipeline, cmdObj, result); } else { pPipeline->run(result); diff --git a/src/mongo/db/index/btree_based_access_method.cpp b/src/mongo/db/index/btree_based_access_method.cpp index 150e1e35a47..0df6c12adad 100644 --- a/src/mongo/db/index/btree_based_access_method.cpp +++ b/src/mongo/db/index/btree_based_access_method.cpp @@ -210,7 +210,7 @@ namespace mongo { return Status::OK(); } - DiskLoc BtreeBasedAccessMethod::findSingle( const BSONObj& key ) { + DiskLoc BtreeBasedAccessMethod::findSingle( const BSONObj& key ) const { DiskLoc head = _btreeState->head(); Record* record = _btreeState->recordStore()->recordFor( head ); diff --git a/src/mongo/db/index/btree_based_access_method.h b/src/mongo/db/index/btree_based_access_method.h index 76cd8bef706..a99db4db6a1 100644 --- a/src/mongo/db/index/btree_based_access_method.h +++ b/src/mongo/db/index/btree_based_access_method.h @@ -94,7 +94,7 @@ namespace mongo { virtual Status validate(int64_t* numKeys); // XXX: consider migrating callers to use IndexCursor instead - virtual DiskLoc findSingle( const BSONObj& key ); + virtual DiskLoc findSingle( const BSONObj& key ) const; // exposed for testing, used for bulk commit static ExternalSortComparison* getComparison(int version, diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 09f6a569bb7..c443de1c1b1 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -532,7 +532,7 @@ namespace mongo { verify( n < 30000 ); } - int found = ClientCursor::eraseIfAuthorized(n, (long long *) x); + int found = CollectionCursorCache::eraseCursorGlobalIfAuthorized(n, (long long *) x); if ( logger::globalLogDomain()->shouldLog(logger::LogSeverity::Debug(1)) || found != n ) { LOG( found == n ? 1 : 0 ) << "killcursors: found " << found << " of " << n << endl; @@ -561,7 +561,6 @@ namespace mongo { /* important: kill all open cursors on the database */ string prefix(db); prefix += '.'; - ClientCursor::invalidate(prefix.c_str()); dbHolderW().erase( db, path ); ctx->_clear(); @@ -788,7 +787,7 @@ namespace mongo { // because it may now be out of sync with the client's iteration state. // SERVER-7952 // TODO Temporary code, see SERVER-4563 for a cleanup overview. - ClientCursor::erase( cursorid ); + CollectionCursorCache::eraseCursorGlobal( cursorid ); } ex.reset( new AssertionException( e.getInfo().msg, e.getCode() ) ); ok = false; @@ -1063,7 +1062,7 @@ namespace { } void DBDirectClient::killCursor( long long id ) { - ClientCursor::erase( id ); + CollectionCursorCache::eraseCursorGlobal( id ); } HostAndPort DBDirectClient::_clientHost = HostAndPort( "0.0.0.0" , 0 ); diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index 4e466e4f189..4c7b3bc61c3 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -707,10 +707,7 @@ namespace mongo { if (!damages.empty() ) { // Broadcast the mutation so that query results stay correct. - ClientCursor::invalidateDocument(nsString.ns(), - collection->details(), - loc, - INVALIDATION_MUTATION); + collection->cursorCache()->invalidateDocument(loc, INVALIDATION_MUTATION); collection->details()->paddingFits(); diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index d4c21fca2fd..60ec8772db1 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -63,6 +63,12 @@ namespace mongo { } } + void DocumentSource::kill() { + if ( pSource ) { + pSource->kill(); + } + } + void DocumentSource::serializeToArray(vector<Value>& array, bool explain) const { Value entry = serialize(explain); if (!entry.missing()) { diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index a6bd0e8727b..8965516ab55 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -76,6 +76,11 @@ namespace mongo { virtual void dispose(); /** + * See ClientCursor::kill() + */ + virtual void kill(); + + /** Get the source's name. @returns the string name of the source as a constant string; @@ -345,6 +350,7 @@ namespace mongo { virtual bool coalesce(const intrusive_ptr<DocumentSource>& nextSource); virtual bool isValidInitialSource() const { return true; } virtual void dispose(); + virtual void kill(); /** * Create a document source based on a passed-in cursor. @@ -423,6 +429,7 @@ namespace mongo { string _ns; // namespace CursorId _cursorId; + bool _killed; }; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 6be57a990d3..3388c31bcb3 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -30,6 +30,8 @@ #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" #include "mongo/db/clientcursor.h" #include "mongo/db/instance.h" #include "mongo/db/pipeline/document.h" @@ -64,9 +66,21 @@ namespace mongo { return out; } + void DocumentSourceCursor::kill() { + _killed = true; + _cursorId = 0; + } + void DocumentSourceCursor::dispose() { if (_cursorId) { - ClientCursor::erase(_cursorId); + Lock::DBRead lk(_ns); + Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false); + Collection* collection = ctx.db()->getCollection( _ns ); + if ( collection ) { + ClientCursor* cc = collection->cursorCache()->find( _cursorId ); + if ( cc ) + collection->cursorCache()->deregisterCursor( cc ); + } _cursorId = 0; } @@ -74,6 +88,11 @@ namespace mongo { } void DocumentSourceCursor::loadBatch() { + + Lock::DBRead lk(_ns); + + uassert( 17361, "collection or index disappeared when cursor yielded", !_killed ); + if (!_cursorId) { dispose(); return; @@ -81,10 +100,11 @@ namespace mongo { // We have already validated the sharding version when we constructed the cursor // so we shouldn't check it again. - Lock::DBRead lk(_ns); Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false); + Collection* collection = ctx.db()->getCollection( _ns ); + uassert( 17358, "Collection dropped.", collection ); - ClientCursorPin pin(_cursorId); + ClientCursorPin pin(collection, _cursorId); ClientCursor* cursor = pin.c(); uassert(16950, "Cursor deleted. Was the collection or database dropped?", @@ -211,8 +231,10 @@ namespace { { Lock::DBRead lk(_ns); Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false); + Collection* collection = ctx.db()->getCollection( _ns ); + uassert( 17362, "Collection dropped.", collection ); - ClientCursorPin pin(_cursorId); + ClientCursorPin pin(collection, _cursorId); ClientCursor* cursor = pin.c(); uassert(17135, "Cursor deleted. Was the collection or database dropped?", @@ -258,6 +280,7 @@ namespace { , _docsAddedToBatches(0) , _ns(ns) , _cursorId(cursorId) + , _killed(false) {} intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index e9424098e5a..4d11f8eed29 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -116,7 +116,7 @@ namespace mongo { void addInitialSource(intrusive_ptr<DocumentSource> source); /// The source that represents the output. Returns a non-owning pointer. - DocumentSource* output() { return sources.back().get(); } + DocumentSource* output() { invariant( !sources.empty() ); return sources.back().get(); } /// Returns true if this pipeline only uses features that work in mongos. bool canRunInMongos() const; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index de595ebaf28..a3ce12edd18 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -141,6 +141,16 @@ namespace { // Create the necessary context to use a Runner, including taking a namespace read lock. // Note: this may throw if the sharding version for this connection is out of date. Client::ReadContext context(fullName); + Collection* collection = context.ctx().db()->getCollection(fullName); + if ( !collection ) { + intrusive_ptr<DocumentSource> source(DocumentSourceBsonArray::create(BSONObj(), + pExpCtx)); + while (!sources.empty() && source->coalesce(sources.front())) { + sources.pop_front(); + } + pPipeline->addInitialSource( source ); + return; + } // Create the Runner. // @@ -205,8 +215,9 @@ namespace { } // Now wrap the Runner in ClientCursor - auto_ptr<ClientCursor> cursor( - new ClientCursor(runner.release(), QueryOption_NoCursorTimeout)); + auto_ptr<ClientCursor> cursor(new ClientCursor(collection, + runner.release(), + QueryOption_NoCursorTimeout)); verify(cursor->getRunner()); CursorId cursorId = cursor->cursorid(); diff --git a/src/mongo/db/query/cached_plan_runner.cpp b/src/mongo/db/query/cached_plan_runner.cpp index 0139c0bdacf..27d32fae16f 100644 --- a/src/mongo/db/query/cached_plan_runner.cpp +++ b/src/mongo/db/query/cached_plan_runner.cpp @@ -45,11 +45,13 @@ namespace mongo { - CachedPlanRunner::CachedPlanRunner(CanonicalQuery* canonicalQuery, + CachedPlanRunner::CachedPlanRunner(const Collection* collection, + CanonicalQuery* canonicalQuery, QuerySolution* solution, PlanStage* root, WorkingSet* ws) - : _canonicalQuery(canonicalQuery), + : _collection(collection), + _canonicalQuery(canonicalQuery), _solution(solution), _exec(new PlanExecutor(ws, root)), _alreadyProduced(false), @@ -127,6 +129,7 @@ namespace mongo { void CachedPlanRunner::kill() { _killed = true; + _collection = NULL; _exec->kill(); if (NULL != _backupPlan.get()) { _backupPlan->kill(); diff --git a/src/mongo/db/query/cached_plan_runner.h b/src/mongo/db/query/cached_plan_runner.h index 5522f49756c..fc0af460188 100644 --- a/src/mongo/db/query/cached_plan_runner.h +++ b/src/mongo/db/query/cached_plan_runner.h @@ -58,7 +58,8 @@ namespace mongo { * Takes ownership of all arguments. * XXX: what args should this really take? probably a cachekey as well? */ - CachedPlanRunner(CanonicalQuery* canonicalQuery, QuerySolution* solution, + CachedPlanRunner(const Collection* collection, + CanonicalQuery* canonicalQuery, QuerySolution* solution, PlanStage* root, WorkingSet* ws); virtual ~CachedPlanRunner(); @@ -79,6 +80,7 @@ namespace mongo { virtual void kill(); + virtual const Collection* collection() { return _collection; } /** * Returns OK, allocating and filling in '*explain' with details of the cached * plan. Caller takes ownership of '*explain'. Otherwise, return a status describing @@ -94,6 +96,8 @@ namespace mongo { private: void updateCache(); + const Collection* _collection; + boost::scoped_ptr<CanonicalQuery> _canonicalQuery; boost::scoped_ptr<QuerySolution> _solution; boost::scoped_ptr<PlanExecutor> _exec; diff --git a/src/mongo/db/query/eof_runner.h b/src/mongo/db/query/eof_runner.h index e155b174293..154e0524644 100644 --- a/src/mongo/db/query/eof_runner.h +++ b/src/mongo/db/query/eof_runner.h @@ -69,6 +69,9 @@ namespace mongo { virtual void kill(); + // this can return NULL since we never yield or anything over it + virtual Collection* collection() { return NULL; } + /** * Always returns OK, allocating and filling in '*explain' with a fake ("zeroed") * collection scan plan. Caller owns '*explain', though. diff --git a/src/mongo/db/query/get_runner.cpp b/src/mongo/db/query/get_runner.cpp index b34993f3c3f..f230f4c95dc 100644 --- a/src/mongo/db/query/get_runner.cpp +++ b/src/mongo/db/query/get_runner.cpp @@ -243,7 +243,8 @@ namespace mongo { WorkingSet* ws; PlanStage* root; verify(StageBuilder::build(*qs, &root, &ws)); - CachedPlanRunner* cpr = new CachedPlanRunner(canonicalQuery.release(), qs, + CachedPlanRunner* cpr = new CachedPlanRunner(collection, + canonicalQuery.release(), qs, root, ws); if (NULL != backupQs) { @@ -277,9 +278,11 @@ namespace mongo { // We cannot figure out how to answer the query. Should this ever happen? if (0 == solutions.size()) { - return Status(ErrorCodes::BadValue, - "error processing query: " + canonicalQuery->toString() + - " No query solutions"); + return Status(ErrorCodes::BadValue, + str::stream() + << "error processing query: " + << canonicalQuery->toString() + << " No query solutions"); } if (1 == solutions.size()) { @@ -289,12 +292,13 @@ namespace mongo { verify(StageBuilder::build(*solutions[0], &root, &ws)); // And, run the plan. - *out = new SingleSolutionRunner(canonicalQuery.release(), solutions[0], root, ws); + *out = new SingleSolutionRunner(collection, + canonicalQuery.release(),solutions[0], root, ws); return Status::OK(); } else { // Many solutions. Let the MultiPlanRunner pick the best, update the cache, and so on. - auto_ptr<MultiPlanRunner> mpr(new MultiPlanRunner(canonicalQuery.release())); + auto_ptr<MultiPlanRunner> mpr(new MultiPlanRunner(collection,canonicalQuery.release())); for (size_t i = 0; i < solutions.size(); ++i) { WorkingSet* ws; PlanStage* root; @@ -453,7 +457,7 @@ namespace mongo { WorkingSet* ws; PlanStage* root; verify(StageBuilder::build(*soln, &root, &ws)); - *out = new SingleSolutionRunner(cq, soln, root, ws); + *out = new SingleSolutionRunner(collection, cq, soln, root, ws); return Status::OK(); } @@ -481,7 +485,7 @@ namespace mongo { WorkingSet* ws; PlanStage* root; verify(StageBuilder::build(*solutions[i], &root, &ws)); - *out = new SingleSolutionRunner(cq, solutions[i], root, ws); + *out = new SingleSolutionRunner(collection, cq, solutions[i], root, ws); return Status::OK(); } } @@ -498,11 +502,14 @@ namespace mongo { ScopedRunnerRegistration::ScopedRunnerRegistration(Runner* runner) : _runner(runner) { - ClientCursor::registerRunner(_runner); + // Collection can be null for EOFRunner, or other places where registration is not needed + if ( _runner->collection() ) + _runner->collection()->cursorCache()->registerRunner( runner ); } ScopedRunnerRegistration::~ScopedRunnerRegistration() { - ClientCursor::deregisterRunner(_runner); + if ( _runner->collection() ) + _runner->collection()->cursorCache()->deregisterRunner( _runner ); } } // namespace mongo diff --git a/src/mongo/db/query/idhack_runner.cpp b/src/mongo/db/query/idhack_runner.cpp index 64b7861a7a7..afdb82b45ed 100644 --- a/src/mongo/db/query/idhack_runner.cpp +++ b/src/mongo/db/query/idhack_runner.cpp @@ -44,7 +44,7 @@ namespace mongo { - IDHackRunner::IDHackRunner(Collection* collection, CanonicalQuery* query) + IDHackRunner::IDHackRunner(const Collection* collection, CanonicalQuery* query) : _collection(collection), _key(query->getQueryObj()["_id"].wrap()), _query(query), @@ -67,18 +67,18 @@ namespace mongo { if (_done) { return Runner::RUNNER_EOF; } // Use the index catalog to get the id index. - IndexCatalog* catalog = _collection->getIndexCatalog(); + const IndexCatalog* catalog = _collection->getIndexCatalog(); // Find the index we use. - const IndexDescriptor* idDesc = catalog->findIdIndex(); + IndexDescriptor* idDesc = catalog->findIdIndex(); if (NULL == idDesc) { _done = true; return Runner::RUNNER_EOF; } // XXX: This may not be valid always. See SERVER-12397. - BtreeBasedAccessMethod* accessMethod = - static_cast<BtreeBasedAccessMethod*>(catalog->getIndex(idDesc)); + const BtreeBasedAccessMethod* accessMethod = + static_cast<const BtreeBasedAccessMethod*>(catalog->getIndex(idDesc)); // Look up the key by going directly to the Btree. DiskLoc loc = accessMethod->findSingle( _key ); @@ -175,6 +175,7 @@ namespace mongo { void IDHackRunner::kill() { _killed = true; + _collection = NULL; } Status IDHackRunner::getExplainPlan(TypeExplain** explain) const { diff --git a/src/mongo/db/query/idhack_runner.h b/src/mongo/db/query/idhack_runner.h index 7b268e94c32..bbe4d767b1c 100644 --- a/src/mongo/db/query/idhack_runner.h +++ b/src/mongo/db/query/idhack_runner.h @@ -49,8 +49,8 @@ namespace mongo { class IDHackRunner : public Runner { public: - /** Takes ownership of all the arguments. */ - IDHackRunner(Collection* collection, CanonicalQuery* query); + /** Takes ownership of all the arguments -collection. */ + IDHackRunner(const Collection* collection, CanonicalQuery* query); IDHackRunner(Collection* collection, const BSONObj& key); @@ -72,13 +72,15 @@ namespace mongo { virtual void kill(); + virtual const Collection* collection() { return _collection; } + /** */ virtual Status getExplainPlan(TypeExplain** explain) const; private: // Not owned here. - Collection* _collection; + const Collection* _collection; // The value to match against the _id field. BSONObj _key; diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index be09789d8bb..fa5597dfbc1 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -62,7 +62,7 @@ namespace mongo { /** * Return a collection scan. Caller owns pointer. */ - static Runner* collectionScan(const StringData& ns, + static Runner* collectionScan(const StringData& ns, // TODO: make this a Collection* const Direction direction = FORWARD, const DiskLoc startLoc = DiskLoc()) { Collection* collection = cc().database()->getCollection(ns); @@ -81,7 +81,7 @@ namespace mongo { WorkingSet* ws = new WorkingSet(); CollectionScan* cs = new CollectionScan(params, ws, NULL); - return new InternalRunner(ns.toString(), cs, ws); + return new InternalRunner(collection, cs, ws); } /** @@ -92,9 +92,8 @@ namespace mongo { const BSONObj& startKey, const BSONObj& endKey, bool endKeyInclusive, Direction direction = FORWARD, int options = 0) { - verify(descriptor); - - const NamespaceString& ns = collection->ns(); + invariant(collection); + invariant(descriptor); IndexScanParams params; params.descriptor = descriptor; @@ -108,10 +107,10 @@ namespace mongo { IndexScan* ix = new IndexScan(params, ws, NULL); if (IXSCAN_FETCH & options) { - return new InternalRunner(ns.toString(), new FetchStage(ws, ix, NULL), ws); + return new InternalRunner(collection, new FetchStage(ws, ix, NULL), ws); } else { - return new InternalRunner(ns.toString(), ix, ws); + return new InternalRunner(collection, ix, ws); } } }; diff --git a/src/mongo/db/query/internal_runner.cpp b/src/mongo/db/query/internal_runner.cpp index f5d1b392830..504e33e2056 100644 --- a/src/mongo/db/query/internal_runner.cpp +++ b/src/mongo/db/query/internal_runner.cpp @@ -28,10 +28,11 @@ #include "mongo/db/query/internal_runner.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/diskloc.h" -#include "mongo/db/jsobj.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/working_set.h" +#include "mongo/db/jsobj.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/explain_plan.h" #include "mongo/db/query/plan_executor.h" @@ -39,14 +40,16 @@ namespace mongo { - /** Takes ownership of all arguments. */ - InternalRunner::InternalRunner(const std::string& ns, PlanStage* root, WorkingSet* ws) - : _ns(ns), _exec(new PlanExecutor(ws, root)), _policy(Runner::YIELD_MANUAL) { + InternalRunner::InternalRunner(const Collection* collection, PlanStage* root, WorkingSet* ws) + : _collection(collection), + _exec(new PlanExecutor(ws, root)), + _policy(Runner::YIELD_MANUAL) { + invariant( collection ); } InternalRunner::~InternalRunner() { - if (Runner::YIELD_AUTO == _policy) { - ClientCursor::deregisterRunner(this); + if (Runner::YIELD_AUTO == _policy && _collection) { + _collection->cursorCache()->deregisterRunner(this); } } @@ -67,7 +70,7 @@ namespace mongo { } const std::string& InternalRunner::ns() { - return _ns; + return _collection->ns().ns(); } void InternalRunner::invalidate(const DiskLoc& dl, InvalidationType type) { @@ -78,13 +81,15 @@ namespace mongo { // No-op. if (_policy == policy) { return; } + invariant( _collection ); + if (Runner::YIELD_AUTO == policy) { // Going from manual to auto. - ClientCursor::registerRunner(this); + _collection->cursorCache()->registerRunner(this); } else { // Going from auto to manual. - ClientCursor::deregisterRunner(this); + _collection->cursorCache()->deregisterRunner(this); } _policy = policy; @@ -93,6 +98,7 @@ namespace mongo { void InternalRunner::kill() { _exec->kill(); + _collection = NULL; } Status InternalRunner::getExplainPlan(TypeExplain** explain) const { diff --git a/src/mongo/db/query/internal_runner.h b/src/mongo/db/query/internal_runner.h index 699814e1a2a..441224d7c2b 100644 --- a/src/mongo/db/query/internal_runner.h +++ b/src/mongo/db/query/internal_runner.h @@ -56,8 +56,8 @@ namespace mongo { class InternalRunner : public Runner { public: - /** Takes ownership of all arguments. */ - InternalRunner(const string& ns, PlanStage* root, WorkingSet* ws); + /** Takes ownership of root and ws. */ + InternalRunner(const Collection* collection, PlanStage* root, WorkingSet* ws); virtual ~InternalRunner(); @@ -77,6 +77,8 @@ namespace mongo { virtual void kill(); + virtual const Collection* collection() { return _collection; } + /** * Returns OK, allocating and filling in '*explain' with details of the plan used by * this runner. Caller takes ownership of '*explain'. Otherwise, return a status @@ -89,7 +91,7 @@ namespace mongo { virtual Status getExplainPlan(TypeExplain** explain) const; private: - std::string _ns; + const Collection* _collection; boost::scoped_ptr<PlanExecutor> _exec; Runner::YieldPolicy _policy; diff --git a/src/mongo/db/query/multi_plan_runner.cpp b/src/mongo/db/query/multi_plan_runner.cpp index 5a74050e948..6716d15d2dc 100644 --- a/src/mongo/db/query/multi_plan_runner.cpp +++ b/src/mongo/db/query/multi_plan_runner.cpp @@ -47,8 +47,9 @@ namespace mongo { - MultiPlanRunner::MultiPlanRunner(CanonicalQuery* query) - : _killed(false), + MultiPlanRunner::MultiPlanRunner(const Collection* collection, CanonicalQuery* query) + : _collection(collection), + _killed(false), _failure(false), _failureCount(0), _policy(Runner::YIELD_MANUAL), @@ -208,6 +209,7 @@ namespace mongo { void MultiPlanRunner::kill() { _killed = true; + _collection = NULL; if (NULL != _bestPlan) { _bestPlan->kill(); } } diff --git a/src/mongo/db/query/multi_plan_runner.h b/src/mongo/db/query/multi_plan_runner.h index ca4910e1f6f..d1c13c616d8 100644 --- a/src/mongo/db/query/multi_plan_runner.h +++ b/src/mongo/db/query/multi_plan_runner.h @@ -57,7 +57,7 @@ namespace mongo { /** * Takes ownership of query. */ - MultiPlanRunner(CanonicalQuery* query); + MultiPlanRunner(const Collection* collection, CanonicalQuery* query); virtual ~MultiPlanRunner(); /** @@ -93,6 +93,8 @@ namespace mongo { virtual void kill(); + virtual const Collection* collection() { return _collection; } + /** * Returns OK, allocating and filling in '*explain' with details of the "winner" * plan. Caller takes ownership of '*explain'. Otherwise, return a status describing @@ -110,6 +112,8 @@ namespace mongo { void allPlansSaveState(); void allPlansRestoreState(); + const Collection* _collection; + // Were we killed by an invalidate? bool _killed; diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp index b40a1f37073..26807bf1b3d 100644 --- a/src/mongo/db/query/new_find.cpp +++ b/src/mongo/db/query/new_find.cpp @@ -126,6 +126,8 @@ namespace mongo { // This is a read lock. scoped_ptr<Client::ReadContext> ctx(new Client::ReadContext(ns)); + Collection* collection = ctx->ctx().db()->getCollection(ns); + uassert( 17356, "collection dropped between getMore calls", collection ); QLOG() << "running getMore in new system, cursorid " << cursorid << endl; @@ -138,7 +140,7 @@ namespace mongo { // A pin performs a CC lookup and if there is a CC, increments the CC's pin value so it // doesn't time out. Also informs ClientCursor that there is somebody actively holding the // CC, so don't delete it. - ClientCursorPin ccPin(cursorid); + ClientCursorPin ccPin(collection, cursorid); ClientCursor* cc = ccPin.c(); // These are set in the QueryResult msg we return. @@ -291,13 +293,17 @@ namespace mongo { return qr; } - Status getOplogStartHack(CanonicalQuery* cq, Runner** runnerOut) { + Status getOplogStartHack(Collection* collection, CanonicalQuery* cq, Runner** runnerOut) { + if ( collection == NULL ) + return Status(ErrorCodes::InternalError, + "getOplogStartHack called with a NULL collection" ); + // Make an oplog start finding stage. WorkingSet* oplogws = new WorkingSet(); OplogStart* stage = new OplogStart(cq->ns(), cq->root(), oplogws); // Takes ownership of ws and stage. - auto_ptr<InternalRunner> runner(new InternalRunner(cq->ns(), stage, oplogws)); + auto_ptr<InternalRunner> runner(new InternalRunner(collection, stage, oplogws)); runner->setYieldPolicy(Runner::YIELD_AUTO); // The stage returns a DiskLoc of where to start. @@ -325,7 +331,7 @@ namespace mongo { WorkingSet* ws = new WorkingSet(); CollectionScan* cs = new CollectionScan(params, ws, cq->root()); // Takes ownership of cq, cs, ws. - *runnerOut = new SingleSolutionRunner(cq, NULL, cs, ws); + *runnerOut = new SingleSolutionRunner(collection, cq, NULL, cs, ws); return Status::OK(); } @@ -381,6 +387,7 @@ namespace mongo { // where-specific parsing code assumes we have a lock and creates execution machinery that // requires it. Client::ReadContext ctx(q.ns); + Collection* collection = ctx.ctx().db()->getCollection( ns ); // Parse the qm into a CanonicalQuery. CanonicalQuery* cq; @@ -412,11 +419,11 @@ namespace mongo { // Otherwise we go through the selection of which runner is most suited to the // query + run-time context at hand. Status status = Status::OK(); - if (ctx.ctx().db()->getCollection(cq->ns()) == NULL) { + if (collection == NULL) { rawRunner = new EOFRunner(cq, cq->ns()); } else if (pq.hasOption(QueryOption_OplogReplay)) { - status = getOplogStartHack(cq, &rawRunner); + status = getOplogStartHack(collection, cq, &rawRunner); } else { // Takes ownership of cq. @@ -617,7 +624,8 @@ namespace mongo { // Allocate a new ClientCursor. We don't have to worry about leaking it as it's // inserted into a global map by its ctor. - ClientCursor* cc = new ClientCursor(runner.get(), cq->getParsed().getOptions(), + ClientCursor* cc = new ClientCursor(collection, runner.get(), + cq->getParsed().getOptions(), cq->getParsed().getFilter()); ccId = cc->cursorid(); diff --git a/src/mongo/db/query/runner.h b/src/mongo/db/query/runner.h index 01ad9988338..4e39c6c5f41 100644 --- a/src/mongo/db/query/runner.h +++ b/src/mongo/db/query/runner.h @@ -34,6 +34,7 @@ namespace mongo { + class Collection; class DiskLoc; class TypeExplain; @@ -162,7 +163,7 @@ namespace mongo { * The runner must take any actions required to continue operating correctly, including * broadcasting the invalidation request to the PlanStage tree being run. * - * Called from ClientCursor::invalidateDocument. + * Called from CollectionCursorCache::invalidateDocument. * * See db/invalidation_type.h for InvalidationType. */ @@ -192,6 +193,11 @@ namespace mongo { virtual const string& ns() = 0; /** + * Return the Collection that the query is running over. + */ + virtual const Collection* collection() = 0; + + /** * Returns OK, allocating and filling '*explain' with a description of the chosen plan. * Caller takes onwership of '*explain'. Otherwise, returns false with a detailed error * status. diff --git a/src/mongo/db/query/runner_yield_policy.h b/src/mongo/db/query/runner_yield_policy.h index 4058108c3bd..4f164c55b01 100644 --- a/src/mongo/db/query/runner_yield_policy.h +++ b/src/mongo/db/query/runner_yield_policy.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/clientcursor.h" +#include "mongo/db/catalog/collection.h" #include "mongo/util/elapsed_tracker.h" namespace mongo { @@ -41,7 +42,9 @@ namespace mongo { if (NULL != _runnerYielding) { // We were destructed mid-yield. Since we're being used to yield a runner, we have // to deregister the runner. - ClientCursor::deregisterRunner(_runnerYielding); + if ( _runnerYielding->collection() ) { + _runnerYielding->collection()->cursorCache()->deregisterRunner(_runnerYielding); + } } } @@ -56,7 +59,9 @@ namespace mongo { * Provided runner MUST be YIELD_MANUAL. */ bool yieldAndCheckIfOK(Runner* runner, Record* record = NULL) { - verify(runner); + invariant(runner); + invariant(runner->collection()); // XXX: should this just return true? + int micros = ClientCursor::suggestYieldMicros(); // If micros is not positive, no point in yielding, nobody waiting. @@ -66,9 +71,16 @@ namespace mongo { // If micros > 0, we should yield. runner->saveState(); _runnerYielding = runner; - ClientCursor::registerRunner(_runnerYielding); + + runner->collection()->cursorCache()->registerRunner( _runnerYielding ); + staticYield(micros, record); - ClientCursor::deregisterRunner(_runnerYielding); + + if ( runner->collection() ) { + // if the runner was killed, runner->collection() will return NULL + // so we don't deregister as it was done when killed + runner->collection()->cursorCache()->registerRunner( _runnerYielding ); + } _runnerYielding = NULL; _elapsedTracker.resetLastTime(); return runner->restoreState(); diff --git a/src/mongo/db/query/single_solution_runner.cpp b/src/mongo/db/query/single_solution_runner.cpp index 3196fc831b9..40c6e875257 100644 --- a/src/mongo/db/query/single_solution_runner.cpp +++ b/src/mongo/db/query/single_solution_runner.cpp @@ -40,11 +40,13 @@ namespace mongo { - SingleSolutionRunner::SingleSolutionRunner(CanonicalQuery* canonicalQuery, + SingleSolutionRunner::SingleSolutionRunner(const Collection* collection, + CanonicalQuery* canonicalQuery, QuerySolution* soln, PlanStage* root, WorkingSet* ws) - : _canonicalQuery(canonicalQuery), + : _collection( collection ), + _canonicalQuery(canonicalQuery), _solution(soln), _exec(new PlanExecutor(ws, root)) { } @@ -84,6 +86,7 @@ namespace mongo { void SingleSolutionRunner::kill() { _exec->kill(); + _collection = NULL; } Status SingleSolutionRunner::getExplainPlan(TypeExplain** explain) const { diff --git a/src/mongo/db/query/single_solution_runner.h b/src/mongo/db/query/single_solution_runner.h index 7062a70c153..83aa9f8f869 100644 --- a/src/mongo/db/query/single_solution_runner.h +++ b/src/mongo/db/query/single_solution_runner.h @@ -52,8 +52,9 @@ namespace mongo { class SingleSolutionRunner : public Runner { public: - /** Takes ownership of all the arguments. */ - SingleSolutionRunner(CanonicalQuery* canonicalQuery, QuerySolution* soln, + /** Takes ownership of all the arguments except collection */ + SingleSolutionRunner(const Collection* collection, + CanonicalQuery* canonicalQuery, QuerySolution* soln, PlanStage* root, WorkingSet* ws); virtual ~SingleSolutionRunner(); @@ -74,6 +75,7 @@ namespace mongo { virtual void kill(); + virtual const Collection* collection() { return _collection; } /** * Returns OK, allocating and filling in '*explain' with the details of the plan used * by this runner. Caller takes ownership of '*explain'. Otherwise, return a status @@ -82,6 +84,8 @@ namespace mongo { virtual Status getExplainPlan(TypeExplain** explain) const; private: + const Collection* _collection; + boost::scoped_ptr<CanonicalQuery> _canonicalQuery; boost::scoped_ptr<QuerySolution> _solution; boost::scoped_ptr<PlanExecutor> _exec; diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp index 1be9f169c74..e7c524128f0 100644 --- a/src/mongo/db/range_deleter_db_env.cpp +++ b/src/mongo/db/range_deleter_db_env.cpp @@ -150,6 +150,11 @@ namespace mongo { void RangeDeleterDBEnv::getCursorIds(const StringData& ns, std::set<CursorId>* openCursors) { - ClientCursor::find(ns.toString(), *openCursors); + Client::ReadContext ctx(ns.toString()); + Collection* collection = ctx.ctx().db()->getCollection( ns ); + if ( !collection ) + return; + + collection->cursorCache()->getCursorIds( openCursors ); } } diff --git a/src/mongo/db/range_preserver.h b/src/mongo/db/range_preserver.h index ed44dc2bc33..dac32c30fd6 100644 --- a/src/mongo/db/range_preserver.h +++ b/src/mongo/db/range_preserver.h @@ -48,12 +48,13 @@ namespace mongo { * object does. The ClientCursorPin guarantees that the underlying ClientCursor is not * deleted until this object goes out of scope. */ - RangePreserver(const string& ns) { + RangePreserver(const Collection* collection) { + invariant( collection ); // Not a memory leak. Cached in a static structure by CC's ctor. - ClientCursor* cc = new ClientCursor(ns); + ClientCursor* cc = new ClientCursor(collection); // Pin keeps the CC from being deleted while it's in scope. We delete it ourselves. - _pin.reset(new ClientCursorPin(cc->cursorid())); + _pin.reset(new ClientCursorPin(collection, cc->cursorid())); } ~RangePreserver() { diff --git a/src/mongo/db/restapi.cpp b/src/mongo/db/restapi.cpp index 2585681796d..30f82f4c591 100644 --- a/src/mongo/db/restapi.cpp +++ b/src/mongo/db/restapi.cpp @@ -275,7 +275,7 @@ namespace mongo { ss << "<pre>\n"; ss << "time to get readlock: " << millis << "ms\n"; ss << "# databases: " << dbHolder().sizeInfo() << '\n'; - ss << "# Cursors: " << ClientCursor::numCursors() << '\n'; + ss << "# Cursors: " << ClientCursor::totalOpen() << '\n'; ss << "replication: "; if( *replInfo ) ss << "\nreplInfo: " << replInfo << "\n\n"; diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 48d13049d9e..3d8ffa28a71 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -32,6 +32,8 @@ #include <boost/thread/thread.hpp> +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" #include "mongo/db/interrupt_status_mongod.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source.h" @@ -158,12 +160,15 @@ namespace DocumentSourceTests { { _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; } protected: void createSource() { - Client::ReadContext ctx (ns); + Client::WriteContext ctx (ns); + Collection* collection = ctx.ctx().db()->getOrCreateCollection( ns ); CanonicalQuery* cq; uassertStatusOK(CanonicalQuery::canonicalize(ns, /*query=*/BSONObj(), &cq)); Runner* runner; uassertStatusOK(getRunner(cq, &runner)); - auto_ptr<ClientCursor> cc(new ClientCursor(runner, QueryOption_NoCursorTimeout)); + auto_ptr<ClientCursor> cc(new ClientCursor(collection, + runner, + QueryOption_NoCursorTimeout)); verify(cc->getRunner()); cc->getRunner()->setYieldPolicy(Runner::YIELD_AUTO); CursorId cursorId = cc->cursorid(); @@ -196,9 +201,13 @@ namespace DocumentSourceTests { } private: void assertNumClientCursors( unsigned int expected ) { - set<CursorId> nsCursors; - ClientCursor::find( ns, nsCursors ); - ASSERT_EQUALS( expected, nsCursors.size() ); + Client::ReadContext ctx( ns ); + Collection* collection = ctx.ctx().db()->getCollection( ns ); + if ( !collection ) { + ASSERT( 0 == expected ); + return; + } + ASSERT_EQUALS( expected, collection->cursorCache()->numCursors() ); } }; diff --git a/src/mongo/dbtests/query_multi_plan_runner.cpp b/src/mongo/dbtests/query_multi_plan_runner.cpp index 114cf792c3e..e12241ece6c 100644 --- a/src/mongo/dbtests/query_multi_plan_runner.cpp +++ b/src/mongo/dbtests/query_multi_plan_runner.cpp @@ -130,7 +130,7 @@ namespace QueryMultiPlanRunner { CanonicalQuery* cq = NULL; verify(CanonicalQuery::canonicalize(ns(), BSON("foo" << 7), &cq).isOK()); verify(NULL != cq); - MultiPlanRunner mpr(cq); + MultiPlanRunner mpr(ctx.ctx().db()->getCollection(ns()),cq); mpr.addPlan(createQuerySolution(), firstRoot.release(), firstWs.release()); mpr.addPlan(createQuerySolution(), secondRoot.release(), secondWs.release()); diff --git a/src/mongo/dbtests/query_single_solution_runner.cpp b/src/mongo/dbtests/query_single_solution_runner.cpp index 77af61b685f..b11bc35aaed 100644 --- a/src/mongo/dbtests/query_single_solution_runner.cpp +++ b/src/mongo/dbtests/query_single_solution_runner.cpp @@ -77,7 +77,8 @@ namespace QuerySingleSolutionRunner { * * The caller takes ownership of the returned SingleSolutionRunner*. */ - SingleSolutionRunner* makeCollScanRunner(BSONObj& filterObj) { + SingleSolutionRunner* makeCollScanRunner(Client::Context& ctx, + BSONObj& filterObj) { CollectionScanParams csparams; csparams.ns = ns(); csparams.direction = CollectionScanParams::FORWARD; @@ -94,8 +95,11 @@ namespace QuerySingleSolutionRunner { verify(NULL != cq); // Hand the plan off to the single solution runner. - SingleSolutionRunner* ssr = new SingleSolutionRunner(cq, new QuerySolution(), - root.release(), ws.release()); + SingleSolutionRunner* ssr = new SingleSolutionRunner(ctx.db()->getCollection(ns()), + cq, + new QuerySolution(), + root.release(), + ws.release()); return ssr; } @@ -112,7 +116,8 @@ namespace QuerySingleSolutionRunner { * * The caller takes ownership of the returned SingleSolutionRunner*. */ - SingleSolutionRunner* makeIndexScanRunner(BSONObj& indexSpec, int start, int end) { + SingleSolutionRunner* makeIndexScanRunner(Client::Context& context, + BSONObj& indexSpec, int start, int end) { // Build the index scan stage. IndexScanParams ixparams; ixparams.descriptor = getIndex(indexSpec); @@ -130,12 +135,33 @@ namespace QuerySingleSolutionRunner { verify(NULL != cq); // Hand the plan off to the single solution runner. - return new SingleSolutionRunner(cq, new QuerySolution(), + return new SingleSolutionRunner(context.db()->getCollection(ns()), + cq, new QuerySolution(), root.release(), ws.release()); } static const char* ns() { return "unittests.QueryStageSingleSolutionRunner"; } + size_t numCursors() { + Client::ReadContext ctx( ns() ); + Collection* collection = ctx.ctx().db()->getCollection( ns() ); + if ( !collection ) + return 0; + return collection->cursorCache()->numCursors(); + } + + void registerRunner( Runner* runner ) { + Client::ReadContext ctx( ns() ); + Collection* collection = ctx.ctx().db()->getOrCreateCollection( ns() ); + return collection->cursorCache()->registerRunner( runner ); + } + + void deregisterRunner( Runner* runner ) { + Client::ReadContext ctx( ns() ); + Collection* collection = ctx.ctx().db()->getOrCreateCollection( ns() ); + return collection->cursorCache()->deregisterRunner( runner ); + } + private: IndexDescriptor* getIndex(const BSONObj& obj) { Collection* collection = cc().database()->getCollection( ns() ); @@ -159,10 +185,10 @@ namespace QuerySingleSolutionRunner { insert(BSON("_id" << 2)); BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); - scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(filterObj)); + scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(ctx.ctx(),filterObj)); // Set up autoyielding. - ClientCursor::registerRunner(ssr.get()); + registerRunner(ssr.get()); ssr->setYieldPolicy(Runner::YIELD_AUTO); BSONObj objOut; @@ -174,7 +200,7 @@ namespace QuerySingleSolutionRunner { dropCollection(); ASSERT_EQUALS(Runner::RUNNER_DEAD, ssr->getNext(&objOut, NULL)); - ClientCursor::deregisterRunner(ssr.get()); + deregisterRunner(ssr.get()); } }; @@ -192,10 +218,10 @@ namespace QuerySingleSolutionRunner { BSONObj indexSpec = BSON("a" << 1); addIndex(indexSpec); - scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(indexSpec, 7, 10)); + scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(ctx.ctx(), indexSpec, 7, 10)); // Set up autoyielding. - ClientCursor::registerRunner(ssr.get()); + registerRunner(ssr.get()); ssr->setYieldPolicy(Runner::YIELD_AUTO); BSONObj objOut; @@ -207,7 +233,7 @@ namespace QuerySingleSolutionRunner { dropCollection(); ASSERT_EQUALS(Runner::RUNNER_DEAD, ssr->getNext(&objOut, NULL)); - ClientCursor::deregisterRunner(ssr.get()); + deregisterRunner(ssr.get()); } }; @@ -263,7 +289,7 @@ namespace QuerySingleSolutionRunner { setupCollection(); BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(filterObj)); + scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(ctx.ctx(),filterObj)); BSONObj objOut; ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL)); @@ -290,7 +316,7 @@ namespace QuerySingleSolutionRunner { addIndex(indexSpec); BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(indexSpec, 2, 5)); + scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(ctx.ctx(), indexSpec, 2, 5)); BSONObj objOut; ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL)); @@ -319,16 +345,17 @@ namespace QuerySingleSolutionRunner { insert(BSON("a" << 1 << "b" << 1)); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - SingleSolutionRunner* ssr = makeCollScanRunner(filterObj); + SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj); // Make a client cursor from the runner. - new ClientCursor(ssr, 0, BSONObj()); + new ClientCursor(ctx.ctx().db()->getCollection(ns()), + ssr, 0, BSONObj()); // There should be one cursor before invalidation, // and zero cursors after invalidation. - ASSERT_EQUALS(1U, ClientCursor::numCursors()); - ClientCursor::invalidate(ns()); - ASSERT_EQUALS(0U, ClientCursor::numCursors()); + ASSERT_EQUALS(1U, numCursors()); + ctx.ctx().db()->getCollection( ns() )->cursorCache()->invalidateAll(false); + ASSERT_EQUALS(0U, numCursors()); } }; @@ -342,18 +369,21 @@ namespace QuerySingleSolutionRunner { Client::WriteContext ctx(ns()); insert(BSON("a" << 1 << "b" << 1)); + Collection* collection = ctx.ctx().db()->getCollection(ns()); + BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - SingleSolutionRunner* ssr = makeCollScanRunner(filterObj); + SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj); // Make a client cursor from the runner. - ClientCursor* cc = new ClientCursor(ssr, 0, BSONObj()); - ClientCursorPin ccPin(cc->cursorid()); + ClientCursor* cc = new ClientCursor(collection, + ssr, 0, BSONObj()); + ClientCursorPin ccPin(collection,cc->cursorid()); // If the cursor is pinned, it sticks around, // even after invalidation. - ASSERT_EQUALS(1U, ClientCursor::numCursors()); - ClientCursor::invalidate(ns()); - ASSERT_EQUALS(1U, ClientCursor::numCursors()); + ASSERT_EQUALS(1U, numCursors()); + collection->cursorCache()->invalidateAll(false); + ASSERT_EQUALS(1U, numCursors()); // The invalidation should have killed the runner. BSONObj objOut; @@ -362,7 +392,7 @@ namespace QuerySingleSolutionRunner { // Deleting the underlying cursor should cause the // number of cursors to return to 0. ccPin.deleteUnderlying(); - ASSERT_EQUALS(0U, ClientCursor::numCursors()); + ASSERT_EQUALS(0U, numCursors()); } }; @@ -380,19 +410,20 @@ namespace QuerySingleSolutionRunner { { Client::ReadContext ctx(ns()); + Collection* collection = ctx.ctx().db()->getCollection(ns()); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - SingleSolutionRunner* ssr = makeCollScanRunner(filterObj); + SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj); // Make a client cursor from the runner. - new ClientCursor(ssr, 0, BSONObj()); + new ClientCursor(collection, ssr, 0, BSONObj()); } // There should be one cursor before timeout, // and zero cursors after timeout. - ASSERT_EQUALS(1U, ClientCursor::numCursors()); - ClientCursor::idleTimeReport(600001); - ASSERT_EQUALS(0U, ClientCursor::numCursors()); + ASSERT_EQUALS(1U, numCursors()); + CollectionCursorCache::timeoutCursorsGlobal(600001); + ASSERT_EQUALS(0U, numCursors()); } }; diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index 12a25aba5ec..10f3807bc54 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -238,7 +238,7 @@ namespace QueryTests { // Check internal server handoff to getmore. Lock::DBWrite lk(ns); Client::Context ctx( ns ); - ClientCursorPin clientCursor( cursorId ); + ClientCursorPin clientCursor( ctx.db()->getCollection(ns), cursorId ); // pq doesn't exist if it's a runner inside of the clientcursor. // ASSERT( clientCursor.c()->pq ); // ASSERT_EQUALS( 2, clientCursor.c()->pq->getNumToReturn() ); @@ -290,9 +290,11 @@ namespace QueryTests { killCurrentOp.reset(); // Check that the cursor has been removed. - set<CursorId> ids; - ClientCursor::find( ns, ids ); - ASSERT_EQUALS( 0U, ids.count( cursorId ) ); + { + Client::ReadContext ctx( ns ); + ASSERT( 0 == ctx.ctx().db()->getCollection( ns )->cursorCache()->numCursors() ); + } + ASSERT_FALSE( CollectionCursorCache::eraseCursorGlobal( cursorId ) ); // Check that a subsequent get more fails with the cursor removed. ASSERT_THROWS( client().getMore( ns, cursorId ), UserException ); @@ -337,10 +339,12 @@ namespace QueryTests { cursor->getCursorId() ); // Check that the cursor still exists - set<CursorId> ids; - ClientCursor::find( ns, ids ); - ASSERT_EQUALS( 1U, ids.count( cursorId ) ); - + { + Client::ReadContext ctx( ns ); + ASSERT( 1 == ctx.ctx().db()->getCollection( ns )->cursorCache()->numCursors() ); + ASSERT( ctx.ctx().db()->getCollection( ns )->cursorCache()->find( cursorId ) ); + } + // Check that the cursor can be iterated until all documents are returned. while( cursor->more() ) { cursor->next(); @@ -597,7 +601,7 @@ namespace QueryTests { ASSERT_EQUALS( two, c->next()["ts"].Date() ); long long cursorId = c->getCursorId(); - ClientCursorPin clientCursor( cursorId ); + ClientCursorPin clientCursor( ctx.db()->getCollection( ns ), cursorId ); ASSERT_EQUALS( three.millis, clientCursor.c()->getSlaveReadTill().asDate() ); } }; @@ -1096,6 +1100,14 @@ namespace QueryTests { return (int) client().count( ns() ); } + size_t numCursorsOpen() { + Client::ReadContext ctx( _ns ); + Collection* collection = ctx.ctx().db()->getCollection( _ns ); + if ( !collection ) + return 0; + return collection->cursorCache()->numCursors(); + } + const char * ns() { return _ns.c_str(); } @@ -1297,7 +1309,7 @@ namespace QueryTests { } void run() { - unsigned startNumCursors = ClientCursor::numCursors(); + size_t startNumCursors = numCursorsOpen(); BSONObj info; ASSERT( client().runCommand( "unittests", BSON( "create" << "querytests.findingstart" << "capped" << true << "$nExtents" << 5 << "autoIndexId" << false ), info ) ); @@ -1317,7 +1329,7 @@ namespace QueryTests { } } - ASSERT_EQUALS( startNumCursors, ClientCursor::numCursors() ); + ASSERT_EQUALS( startNumCursors, numCursorsOpen() ); } }; @@ -1330,7 +1342,7 @@ namespace QueryTests { FindingStartStale() : CollectionBase( "findingstart" ) {} void run() { - unsigned startNumCursors = ClientCursor::numCursors(); + size_t startNumCursors = numCursorsOpen(); // Check OplogReplay mode with missing collection. auto_ptr< DBClientCursor > c0 = client().query( ns(), QUERY( "ts" << GTE << 50 ), 0, 0, 0, QueryOption_OplogReplay ); @@ -1350,7 +1362,7 @@ namespace QueryTests { ASSERT_EQUALS( 100, c->next()[ "ts" ].numberInt() ); // Check that no persistent cursors outlast our queries above. - ASSERT_EQUALS( startNumCursors, ClientCursor::numCursors() ); + ASSERT_EQUALS( startNumCursors, numCursorsOpen() ); } }; @@ -1412,7 +1424,9 @@ namespace QueryTests { ClientCursor *clientCursor = 0; { - ClientCursorPin clientCursorPointer( cursorId ); + Client::ReadContext ctx( ns() ); + ClientCursorPin clientCursorPointer( ctx.ctx().db()->getCollection( ns() ), + cursorId ); clientCursor = clientCursorPointer.c(); // clientCursorPointer destructor unpins the cursor. } @@ -1449,7 +1463,7 @@ namespace QueryTests { { Client::WriteContext ctx( ns() ); - ClientCursorPin pinCursor( cursorId ); + ClientCursorPin pinCursor( ctx.ctx().db()->getCollection( ns() ), cursorId ); ASSERT_THROWS( client().killCursor( cursorId ), MsgAssertionException ); string expectedAssertion = diff --git a/src/mongo/dbtests/runner_registry.cpp b/src/mongo/dbtests/runner_registry.cpp index 30cd9971997..9b53453afd4 100644 --- a/src/mongo/dbtests/runner_registry.cpp +++ b/src/mongo/dbtests/runner_registry.cpp @@ -71,10 +71,19 @@ namespace RunnerRegistry { CanonicalQuery* cq; ASSERT(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); // Owns all args - auto_ptr<Runner> run(new SingleSolutionRunner(cq, NULL, scan.release(), ws.release())); + auto_ptr<Runner> run(new SingleSolutionRunner(_ctx->ctx().db()->getCollection( ns() ), + cq, NULL, scan.release(), ws.release())); return run.release(); } + void registerRunner( Runner* runner ) { + _ctx->ctx().db()->getOrCreateCollection( ns() )->cursorCache()->registerRunner( runner ); + } + + void deregisterRunner( Runner* runner ) { + _ctx->ctx().db()->getOrCreateCollection( ns() )->cursorCache()->deregisterRunner( runner ); + } + int N() { return 50; } static const char* ns() { return "unittests.RunnerRegistryDiskLocInvalidation"; } @@ -99,7 +108,7 @@ namespace RunnerRegistry { // Register it. run->saveState(); - ClientCursor::registerRunner(run.get()); + registerRunner(run.get()); // At this point it's safe to yield. forceYield would do that. Let's now simulate some // stuff going on in the yield. @@ -110,7 +119,7 @@ namespace RunnerRegistry { // At this point, we're done yielding. We recover our lock. // Unregister the runner. - ClientCursor::deregisterRunner(run.get()); + deregisterRunner(run.get()); // And clean up anything that happened before. run->restoreState(); @@ -141,13 +150,13 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - ClientCursor::registerRunner(run.get()); + registerRunner(run.get()); // Drop a collection that's not ours. _client.dropCollection("unittests.someboguscollection"); // Unregister and restore state. - ClientCursor::deregisterRunner(run.get()); + deregisterRunner(run.get()); run->restoreState(); ASSERT_EQUALS(Runner::RUNNER_ADVANCED, run->getNext(&obj, NULL)); @@ -155,13 +164,13 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - ClientCursor::registerRunner(run.get()); + registerRunner(run.get()); // Drop our collection. _client.dropCollection(ns()); // Unregister and restore state. - ClientCursor::deregisterRunner(run.get()); + deregisterRunner(run.get()); run->restoreState(); // Runner was killed. @@ -186,13 +195,13 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - ClientCursor::registerRunner(run.get()); + registerRunner(run.get()); // Drop all indices. _client.dropIndexes(ns()); // Unregister and restore state. - ClientCursor::deregisterRunner(run.get()); + deregisterRunner(run.get()); run->restoreState(); // Runner was killed. @@ -217,13 +226,13 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - ClientCursor::registerRunner(run.get()); + registerRunner(run.get()); // Drop a specific index. _client.dropIndex(ns(), BSON("foo" << 1)); // Unregister and restore state. - ClientCursor::deregisterRunner(run.get()); + deregisterRunner(run.get()); run->restoreState(); // Runner was killed. @@ -246,7 +255,7 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - ClientCursor::registerRunner(run.get()); + registerRunner(run.get()); // Drop a DB that's not ours. We can't have a lock at all to do this as dropping a DB // requires a "global write lock." @@ -255,7 +264,7 @@ namespace RunnerRegistry { _ctx.reset(new Client::WriteContext(ns())); // Unregister and restore state. - ClientCursor::deregisterRunner(run.get()); + deregisterRunner(run.get()); run->restoreState(); ASSERT_EQUALS(Runner::RUNNER_ADVANCED, run->getNext(&obj, NULL)); @@ -263,7 +272,7 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - ClientCursor::registerRunner(run.get()); + registerRunner(run.get()); // Drop our DB. Once again, must give up the lock. _ctx.reset(); @@ -271,7 +280,7 @@ namespace RunnerRegistry { _ctx.reset(new Client::WriteContext(ns())); // Unregister and restore state. - ClientCursor::deregisterRunner(run.get()); + deregisterRunner(run.get()); run->restoreState(); // Runner was killed. |