summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2014-01-24 15:47:07 -0500
committerEliot Horowitz <eliot@10gen.com>2014-01-24 15:47:07 -0500
commit7349ba70a0e68627dc322113c561afe3a9ed37a1 (patch)
treeafe597cf004f191288999d8efad785b42833809d
parented58b0dfe564253067b4cab11ab75477b7e48388 (diff)
downloadmongo-7349ba70a0e68627dc322113c561afe3a9ed37a1.tar.gz
SERVER-12392: Move cursor/runner cache into Collection lifecycle via CollectionCursorCache
-rw-r--r--jstests/aggregation/testshard1.js2
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/db/cap.cpp2
-rw-r--r--src/mongo/db/catalog/collection.cpp10
-rw-r--r--src/mongo/db/catalog/collection.h8
-rw-r--r--src/mongo/db/catalog/collection_cursor_cache.cpp459
-rw-r--r--src/mongo/db/catalog/collection_cursor_cache.h129
-rw-r--r--src/mongo/db/catalog/database.cpp6
-rw-r--r--src/mongo/db/catalog/index_catalog.cpp14
-rw-r--r--src/mongo/db/catalog/index_catalog.h5
-rw-r--r--src/mongo/db/clientcursor.cpp512
-rw-r--r--src/mongo/db/clientcursor.h138
-rw-r--r--src/mongo/db/commands/mr.cpp4
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp114
-rw-r--r--src/mongo/db/index/btree_based_access_method.cpp2
-rw-r--r--src/mongo/db/index/btree_based_access_method.h2
-rw-r--r--src/mongo/db/instance.cpp7
-rw-r--r--src/mongo/db/ops/update.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source.h7
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp31
-rw-r--r--src/mongo/db/pipeline/pipeline.h2
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp15
-rw-r--r--src/mongo/db/query/cached_plan_runner.cpp7
-rw-r--r--src/mongo/db/query/cached_plan_runner.h6
-rw-r--r--src/mongo/db/query/eof_runner.h3
-rw-r--r--src/mongo/db/query/get_runner.cpp27
-rw-r--r--src/mongo/db/query/idhack_runner.cpp11
-rw-r--r--src/mongo/db/query/idhack_runner.h8
-rw-r--r--src/mongo/db/query/internal_plans.h13
-rw-r--r--src/mongo/db/query/internal_runner.cpp24
-rw-r--r--src/mongo/db/query/internal_runner.h8
-rw-r--r--src/mongo/db/query/multi_plan_runner.cpp6
-rw-r--r--src/mongo/db/query/multi_plan_runner.h6
-rw-r--r--src/mongo/db/query/new_find.cpp22
-rw-r--r--src/mongo/db/query/runner.h8
-rw-r--r--src/mongo/db/query/runner_yield_policy.h20
-rw-r--r--src/mongo/db/query/single_solution_runner.cpp7
-rw-r--r--src/mongo/db/query/single_solution_runner.h8
-rw-r--r--src/mongo/db/range_deleter_db_env.cpp7
-rw-r--r--src/mongo/db/range_preserver.h7
-rw-r--r--src/mongo/db/restapi.cpp2
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp19
-rw-r--r--src/mongo/dbtests/query_multi_plan_runner.cpp2
-rw-r--r--src/mongo/dbtests/query_single_solution_runner.cpp91
-rw-r--r--src/mongo/dbtests/querytests.cpp44
-rw-r--r--src/mongo/dbtests/runner_registry.cpp39
47 files changed, 1143 insertions, 733 deletions
diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js
index 259c6b8ce1a..a71fb2db6be 100644
--- a/jstests/aggregation/testshard1.js
+++ b/jstests/aggregation/testshard1.js
@@ -206,7 +206,7 @@ assert.eq(db.ts1.find().sort({_id:1}).toArray(),
outCollection.find().sort({_id:1}).toArray());
// Make sure we error out if $out collection is sharded
-assertErrorCode(db.outCollection, [{$out: db.ts1.getName()}], 17017);
+assertErrorCode(outCollection, [{$out: db.ts1.getName()}], 17017);
db.literal.save({dollar:false});
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 78e41bbf7c9..2fd5a3d74ef 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -588,6 +588,7 @@ serverOnlyFiles = [ "db/curop.cpp",
"db/catalog/index_create.cpp",
"db/catalog/collection.cpp",
"db/structure/collection_compact.cpp",
+ "db/catalog/collection_cursor_cache.cpp",
"db/catalog/collection_info_cache.cpp",
"db/structure/collection_iterator.cpp",
"db/catalog/database_holder.cpp",
diff --git a/src/mongo/db/cap.cpp b/src/mongo/db/cap.cpp
index b0fb51d69b5..8492c02bfd6 100644
--- a/src/mongo/db/cap.cpp
+++ b/src/mongo/db/cap.cpp
@@ -466,8 +466,8 @@ namespace mongo {
}
// Clear all references to this namespace.
- ClientCursor::invalidate( ns );
Collection* collection = cc().database()->getCollection( ns );
+ collection->cursorCache()->invalidateAll( false );
verify( collection->details() == this );
collection->infoCache()->reset();
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index e135f6b8e95..018c4c71480 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -75,7 +75,8 @@ namespace mongo {
: _ns( fullNS ),
_recordStore( _ns.ns() ),
_infoCache( this ),
- _indexCatalog( this, details ) {
+ _indexCatalog( this, details ),
+ _cursorCache( fullNS ) {
_details = details;
_database = database;
_recordStore.init( _details,
@@ -232,7 +233,7 @@ namespace mongo {
}
/* check if any cursors point to us. if so, advance them. */
- ClientCursor::invalidateDocument(_ns.ns(), _details, loc, INVALIDATION_DELETION);
+ _cursorCache.invalidateDocument(loc, INVALIDATION_DELETION);
_indexCatalog.unindexRecord( doc, loc, noWarn);
@@ -304,8 +305,7 @@ namespace mongo {
// unindex old record, don't delete
// this way, if inserting new doc fails, we can re-index this one
- ClientCursor::invalidateDocument(_ns.ns(), _details, oldLocation,
- INVALIDATION_DELETION);
+ _cursorCache.invalidateDocument(oldLocation, INVALIDATION_DELETION);
_indexCatalog.unindexRecord( objOld, oldLocation, true );
if ( debug ) {
@@ -350,7 +350,7 @@ namespace mongo {
}
// Broadcast the mutation so that query results stay correct.
- ClientCursor::invalidateDocument(_ns.ns(), _details, oldLocation, INVALIDATION_MUTATION);
+ _cursorCache.invalidateDocument(oldLocation, INVALIDATION_MUTATION);
// update in place
int sz = objNew.objsize();
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index d2ec9ac5cc9..2472eed68b1 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -33,6 +33,7 @@
#include <string>
#include "mongo/base/string_data.h"
+#include "mongo/db/catalog/collection_cursor_cache.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/diskloc.h"
#include "mongo/db/exec/collection_scan_common.h"
@@ -124,6 +125,8 @@ namespace mongo {
const IndexCatalog* getIndexCatalog() const { return &_indexCatalog; }
IndexCatalog* getIndexCatalog() { return &_indexCatalog; }
+ CollectionCursorCache* cursorCache() const { return &_cursorCache; }
+
bool requiresIdIndex() const;
BSONObj docFor( const DiskLoc& loc );
@@ -227,6 +230,11 @@ namespace mongo {
CollectionInfoCache _infoCache;
IndexCatalog _indexCatalog;
+ // this is mutable because read only users of the Collection class
+        // use it to keep state. This seems valid as const correctness of Collection
+ // should be about the data.
+ mutable CollectionCursorCache _cursorCache;
+
friend class Database;
friend class FlatIterator;
friend class CappedIterator;
diff --git a/src/mongo/db/catalog/collection_cursor_cache.cpp b/src/mongo/db/catalog/collection_cursor_cache.cpp
new file mode 100644
index 00000000000..4a10e0664dd
--- /dev/null
+++ b/src/mongo/db/catalog/collection_cursor_cache.cpp
@@ -0,0 +1,459 @@
+// collection_cursor_cache.cpp
+
+/**
+* Copyright (C) 2013 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#include "mongo/db/catalog/collection_cursor_cache.h"
+
+#include "mongo/db/audit.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/client.h"
+#include "mongo/db/query/runner.h"
+#include "mongo/platform/random.h"
+#include "mongo/util/startup_test.h"
+
+namespace mongo {
+
+ namespace {
+ unsigned idFromCursorId( CursorId id ) {
+ uint64_t x = static_cast<uint64_t>(id);
+ x = x >> 32;
+ return static_cast<unsigned>( x );
+ }
+
+ CursorId cursorIdFromParts( unsigned collection,
+ unsigned cursor ) {
+ CursorId x = static_cast<CursorId>( collection ) << 32;
+ x |= cursor;
+ return x;
+ }
+
+ class IdWorkTest : public StartupTest {
+ public:
+ void _run( unsigned a, unsigned b) {
+ CursorId x = cursorIdFromParts( a, b );
+ invariant( a == idFromCursorId( x ) );
+ CursorId y = cursorIdFromParts( a, b + 1 );
+ invariant( x != y );
+ }
+
+ void run() {
+ _run( 123, 456 );
+ _run( 0xdeadbeef, 0xcafecafe );
+ _run( 0, 0 );
+ _run( 99999999, 999 );
+ _run( 0xFFFFFFFF, 1 );
+ _run( 0xFFFFFFFF, 0 );
+ _run( 0xFFFFFFFF, 0xFFFFFFFF );
+ }
+ } idWorkTest;
+ }
+
+ class GlobalCursorIdCache {
+ public:
+
+ GlobalCursorIdCache();
+ ~GlobalCursorIdCache();
+
+ /**
+ * this gets called when a CollectionCursorCache gets created
+ * @return the id the CollectionCursorCache should use when generating
+ * cursor ids
+ */
+ unsigned created( const std::string& ns );
+
+ /**
+         * called by CollectionCursorCache when it's going away
+ */
+ void destroyed( unsigned id, const std::string& ns );
+
+ /**
+ * works globally
+ */
+ bool eraseCursor( CursorId id, bool checkAuth );
+
+ void appendStats( BSONObjBuilder& builder );
+
+ std::size_t timeoutCursors( unsigned millisSinceLastCall );
+
+ int64_t nextSeed();
+
+ private:
+ SimpleMutex _mutex;
+
+ typedef unordered_map<unsigned,string> Map;
+ Map _idToNS;
+ unsigned _nextId;
+
+ SecureRandom* _secureRandom;
+ } _globalCursorIdCache;
+
+ GlobalCursorIdCache::GlobalCursorIdCache()
+ : _mutex( "GlobalCursorIdCache" ),
+ _nextId( 0 ),
+ _secureRandom( NULL ) {
+ }
+
+ GlobalCursorIdCache::~GlobalCursorIdCache() {
+ // we're just going to leak everything, as it doesn't matter
+ }
+
+ int64_t GlobalCursorIdCache::nextSeed() {
+ SimpleMutex::scoped_lock lk( _mutex );
+ if ( !_secureRandom )
+ _secureRandom = SecureRandom::create();
+ return _secureRandom->nextInt64();
+ }
+
+ unsigned GlobalCursorIdCache::created( const std::string& ns ) {
+ static const unsigned MAX_IDS = 1000 * 1000 * 1000;
+
+ SimpleMutex::scoped_lock lk( _mutex );
+
+ fassert( 17359, _idToNS.size() < MAX_IDS );
+
+ for ( unsigned i = 0; i <= MAX_IDS; i++ ) {
+ unsigned id = ++_nextId;
+ if ( id == 0 )
+ continue;
+ if ( _idToNS.count( id ) > 0 )
+ continue;
+ _idToNS[id] = ns;
+ return id;
+ }
+
+ invariant( false );
+ }
+
+ void GlobalCursorIdCache::destroyed( unsigned id, const std::string& ns ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+ invariant( ns == _idToNS[id] );
+ _idToNS.erase( id );
+ }
+
+ bool GlobalCursorIdCache::eraseCursor(CursorId id, bool checkAuth) {
+ string ns;
+ {
+ SimpleMutex::scoped_lock lk( _mutex );
+ unsigned nsid = idFromCursorId( id );
+ Map::const_iterator it = _idToNS.find( nsid );
+ if ( it == _idToNS.end() ) {
+ return false;
+ }
+ ns = it->second;
+ }
+
+ NamespaceString nss( ns );
+
+ if ( checkAuth ) {
+ AuthorizationSession* as = cc().getAuthorizationSession();
+ bool isAuthorized = as->isAuthorizedForActionsOnNamespace(nss,
+ ActionType::killCursors);
+ if ( !isAuthorized ) {
+ audit::logKillCursorsAuthzCheck( currentClient.get(),
+ nss,
+ id,
+ ErrorCodes::Unauthorized );
+ return false;
+ }
+ }
+
+ Client::ReadContext ctx( ns );
+ Collection* collection = ctx.ctx().db()->getCollection( ns );
+ ClientCursor* cursor = NULL;
+ if ( collection ) {
+ cursor = collection->cursorCache()->find( id );
+ }
+
+ if ( !cursor ) {
+ if ( checkAuth )
+ audit::logKillCursorsAuthzCheck( currentClient.get(),
+ nss,
+ id,
+ ErrorCodes::CursorNotFound );
+ return false;
+ }
+
+ if ( checkAuth )
+ audit::logKillCursorsAuthzCheck( currentClient.get(),
+ nss,
+ id,
+ ErrorCodes::OK );
+
+ massert( 16089,
+ str::stream() << "Cannot kill active cursor " << id,
+ cursor->pinValue() < 100 );
+
+ cursor->kill();
+ collection->cursorCache()->deregisterCursor( cursor );
+ return true;
+ }
+
+ std::size_t GlobalCursorIdCache::timeoutCursors( unsigned millisSinceLastCall ) {
+ vector<string> todo;
+ {
+ SimpleMutex::scoped_lock lk( _mutex );
+ for ( Map::const_iterator i = _idToNS.begin(); i != _idToNS.end(); ++i )
+ todo.push_back( i->second );
+ }
+
+ size_t totalTimedOut = 0;
+
+ for ( unsigned i = 0; i < todo.size(); i++ ) {
+ const string& ns = todo[i];
+ Client::ReadContext context( ns );
+ Collection* collection = context.ctx().db()->getCollection( ns );
+ if ( collection == NULL ) {
+ continue;
+ }
+
+ totalTimedOut += collection->cursorCache()->timeoutCursors( millisSinceLastCall );
+
+ }
+
+ return totalTimedOut;
+ }
+
+ // ---
+
+ std::size_t CollectionCursorCache::timeoutCursorsGlobal( unsigned millisSinceLastCall ) {
+ return _globalCursorIdCache.timeoutCursors( millisSinceLastCall );
+ }
+
+ int CollectionCursorCache::eraseCursorGlobalIfAuthorized(int n, long long* ids) {
+ int numDeleted = 0;
+ for ( int i = 0; i < n; i++ ) {
+ if ( eraseCursorGlobalIfAuthorized( ids[i] ) )
+ numDeleted++;
+ if ( inShutdown() )
+ break;
+ }
+ return numDeleted;
+ }
+ bool CollectionCursorCache::eraseCursorGlobalIfAuthorized(CursorId id) {
+ return _globalCursorIdCache.eraseCursor( id, true );
+ }
+ bool CollectionCursorCache::eraseCursorGlobal( CursorId id ) {
+ return _globalCursorIdCache.eraseCursor( id, false );
+ }
+
+
+ // --------------------------
+
+
+ CollectionCursorCache::CollectionCursorCache( const StringData& ns )
+ : _ns( ns.toString() ),
+ _mutex( "CollectionCursorCache" ) {
+ _collectionCacheRuntimeId = _globalCursorIdCache.created( _ns );
+ _random.reset( new PseudoRandom( _globalCursorIdCache.nextSeed() ) );
+ }
+
+ CollectionCursorCache::~CollectionCursorCache() {
+ invalidateAll( true );
+ _globalCursorIdCache.destroyed( _collectionCacheRuntimeId, _ns );
+ }
+
+ void CollectionCursorCache::invalidateAll( bool collectionGoingAway ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+
+ for ( RunnerSet::iterator it = _nonCachedRunners.begin();
+ it != _nonCachedRunners.end();
+ ++it ) {
+
+ // we kill the runner, but it deletes itself
+ Runner* runner = *it;
+ runner->kill();
+ invariant( runner->collection() == NULL );
+ }
+ _nonCachedRunners.clear();
+
+ if ( collectionGoingAway ) {
+ // we're going to wipe out the world
+ for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) {
+ ClientCursor* cc = i->second;
+
+ cc->kill();
+
+ invariant( cc->getRunner() == NULL || cc->getRunner()->collection() == NULL );
+
+ // If there is a pinValue >= 100, somebody is actively using the CC and we do
+ // not delete it. Instead we notify the holder that we killed it. The holder
+ // will then delete the CC.
+ // pinvalue is <100, so there is nobody actively holding the CC. We can
+ // safely delete it as nobody is holding the CC.
+
+ if (cc->pinValue() < 100) {
+ delete cc;
+ }
+ }
+ }
+ else {
+ CursorMap newMap;
+
+ // collection will still be around, just all Runners are invalid
+ for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) {
+ ClientCursor* cc = i->second;
+
+ if (cc->isAggCursor) {
+ // Aggregation cursors don't have their lifetime bound to the underlying collection.
+ newMap[i->first] = i->second;
+ continue;
+ }
+
+ // Note that a valid ClientCursor state is "no cursor no runner." This is because
+ // the set of active cursor IDs in ClientCursor is used as representation of query
+ // state. See sharding_block.h. TODO(greg,hk): Move this out.
+ if (NULL == cc->getRunner() ) {
+ newMap.insert( *i );
+ continue;
+ }
+
+ if (cc->pinValue() < 100) {
+ cc->kill();
+ delete cc;
+ }
+ else {
+ // this is pinned, so still alive, so we leave around
+ // we kill the Runner to signal
+ if ( cc->getRunner() )
+ cc->getRunner()->kill();
+ newMap.insert( *i );
+ }
+
+ }
+
+ _cursors = newMap;
+ }
+ }
+
+ void CollectionCursorCache::invalidateDocument( const DiskLoc& dl,
+ InvalidationType type ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+
+ for ( RunnerSet::iterator it = _nonCachedRunners.begin();
+ it != _nonCachedRunners.end();
+ ++it ) {
+
+ Runner* runner = *it;
+ runner->invalidate(dl, type);
+ }
+
+ for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) {
+ Runner* runner = i->second->getRunner();
+ if ( runner ) {
+ runner->invalidate(dl, type);
+ }
+ }
+ }
+
+ std::size_t CollectionCursorCache::timeoutCursors( unsigned millisSinceLastCall ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+
+ vector<ClientCursor*> toDelete;
+
+ for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) {
+ ClientCursor* cc = i->second;
+ if ( cc->shouldTimeout( millisSinceLastCall ) )
+ toDelete.push_back( cc );
+ }
+
+ for ( vector<ClientCursor*>::const_iterator i = toDelete.begin(); i != toDelete.end(); ++i ) {
+ ClientCursor* cc = *i;
+ _deregisterCursor_inlock( cc );
+ cc->kill();
+ delete cc;
+ }
+
+ return toDelete.size();
+ }
+
+ void CollectionCursorCache::registerRunner( Runner* runner ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+ const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner);
+ invariant(result.second); // make sure this was inserted
+ }
+
+ void CollectionCursorCache::deregisterRunner( Runner* runner ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+ _nonCachedRunners.erase( runner );
+ }
+
+ ClientCursor* CollectionCursorCache::find( CursorId id ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+ CursorMap::const_iterator it = _cursors.find( id );
+ if ( it == _cursors.end() )
+ return NULL;
+ return it->second;
+ }
+
+ void CollectionCursorCache::getCursorIds( std::set<CursorId>* openCursors ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+
+ for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) {
+ ClientCursor* cc = i->second;
+ openCursors->insert( cc->cursorid() );
+ }
+ }
+
+ size_t CollectionCursorCache::numCursors(){
+ SimpleMutex::scoped_lock lk( _mutex );
+ return _cursors.size();
+ }
+
+ CursorId CollectionCursorCache::_allocateCursorId_inlock() {
+ for ( int i = 0; i < 10000; i++ ) {
+ unsigned mypart = static_cast<unsigned>( _random->nextInt32() );
+ CursorId id = cursorIdFromParts( _collectionCacheRuntimeId, mypart );
+ if ( _cursors.count( id ) == 0 )
+ return id;
+ }
+ fassertFailed( 17360 );
+ }
+
+ CursorId CollectionCursorCache::registerCursor( ClientCursor* cc ) {
+ invariant( cc );
+ SimpleMutex::scoped_lock lk( _mutex );
+ CursorId id = _allocateCursorId_inlock();
+ _cursors[id] = cc;
+ return id;
+ }
+
+ void CollectionCursorCache::deregisterCursor( ClientCursor* cc ) {
+ SimpleMutex::scoped_lock lk( _mutex );
+ _deregisterCursor_inlock( cc );
+ }
+
+ void CollectionCursorCache::_deregisterCursor_inlock( ClientCursor* cc ) {
+ invariant( cc );
+ CursorId id = cc->cursorid();
+ _cursors.erase( id );
+ }
+
+}
diff --git a/src/mongo/db/catalog/collection_cursor_cache.h b/src/mongo/db/catalog/collection_cursor_cache.h
new file mode 100644
index 00000000000..91bb80e84f6
--- /dev/null
+++ b/src/mongo/db/catalog/collection_cursor_cache.h
@@ -0,0 +1,129 @@
+// collection_cursor_cache.h
+
+/**
+* Copyright (C) 2013 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#pragma once
+
+#include "mongo/db/clientcursor.h"
+#include "mongo/db/diskloc.h"
+#include "mongo/db/invalidation_type.h"
+#include "mongo/platform/unordered_set.h"
+#include "mongo/util/concurrency/mutex.h"
+
+namespace mongo {
+
+ class PseudoRandom;
+ class Runner;
+
+ class CollectionCursorCache {
+ public:
+ CollectionCursorCache( const StringData& ns );
+
+ /**
+ * will kill() all Runner instances it has
+ */
+ ~CollectionCursorCache();
+
+ // -----------------
+
+ /**
+         * @param collectionGoingAway should be true if the Collection instance is going away
+ * this could be because the db is being closed, or the
+ * collection/db is being dropped.
+ */
+ void invalidateAll( bool collectionGoingAway );
+
+ /**
+         * Broadcast a document invalidation to all relevant Runner(s). invalidateDocument must be
+ * called *before* the provided DiskLoc is about to be deleted or mutated.
+ */
+ void invalidateDocument( const DiskLoc& dl,
+ InvalidationType type );
+
+ /*
+         * times out cursors that have been idle for too long
+         * note: must have a read lock on the collection
+         * @return number of cursors timed out
+ */
+ std::size_t timeoutCursors( unsigned millisSinceLastCall );
+
+ // -----------------
+
+ /**
+ * Register a runner so that it can be notified of deletion/invalidation during yields.
+ * Must be called before a runner yields. If a runner is cached (inside a ClientCursor) it
+ * MUST NOT be registered; the two are mutually exclusive.
+ */
+ void registerRunner(Runner* runner);
+
+ /**
+ * Remove a runner from the runner registry.
+ */
+ void deregisterRunner(Runner* runner);
+
+ // -----------------
+
+ CursorId registerCursor( ClientCursor* cc );
+ void deregisterCursor( ClientCursor* cc );
+
+ void getCursorIds( std::set<CursorId>* openCursors );
+ std::size_t numCursors();
+
+ ClientCursor* find( CursorId id );
+
+ // ----------------------
+
+ static int eraseCursorGlobalIfAuthorized( int n, long long* ids );
+ static bool eraseCursorGlobalIfAuthorized( CursorId id );
+
+ static bool eraseCursorGlobal( CursorId id );
+
+ /**
+ * @return number timed out
+ */
+ static std::size_t timeoutCursorsGlobal( unsigned millisSinceLastCall );
+
+ private:
+ CursorId _allocateCursorId_inlock();
+ void _deregisterCursor_inlock( ClientCursor* cc );
+
+ string _ns;
+ unsigned _collectionCacheRuntimeId;
+ scoped_ptr<PseudoRandom> _random;
+
+ SimpleMutex _mutex;
+
+ typedef unordered_set<Runner*> RunnerSet;
+ RunnerSet _nonCachedRunners;
+
+ typedef unordered_map<CursorId,ClientCursor*> CursorMap;
+ CursorMap _cursors;
+ };
+
+}
diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp
index 74c5bccbcbc..f6d6815cbee 100644
--- a/src/mongo/db/catalog/database.cpp
+++ b/src/mongo/db/catalog/database.cpp
@@ -321,7 +321,6 @@ namespace mongo {
verify( collection->_details->getTotalIndexCount() == 0 );
LOG(1) << "\t dropIndexes done" << endl;
- ClientCursor::invalidate( fullns );
Top::global.collectionDropped( fullns );
Status s = _dropNS( fullns );
@@ -361,7 +360,7 @@ namespace mongo {
if ( it == _collections.end() )
return;
- delete it->second;
+ delete it->second; // this also deletes all cursors + runners
_collections.erase( it );
}
@@ -475,9 +474,6 @@ namespace mongo {
_clearCollectionCache_inlock( toNSString );
}
- ClientCursor::invalidate( fromNSString.c_str() );
- ClientCursor::invalidate( toNSString.c_str() );
-
// at this point, we haven't done anything destructive yet
// ----
diff --git a/src/mongo/db/catalog/index_catalog.cpp b/src/mongo/db/catalog/index_catalog.cpp
index 2e40107a044..1ca58122c40 100644
--- a/src/mongo/db/catalog/index_catalog.cpp
+++ b/src/mongo/db/catalog/index_catalog.cpp
@@ -167,7 +167,7 @@ namespace mongo {
<< " ns: " << _collection->ns().ns() );
}
- bool IndexCatalog::_shouldOverridePlugin(const BSONObj& keyPattern) {
+ bool IndexCatalog::_shouldOverridePlugin(const BSONObj& keyPattern) const {
string pluginName = IndexNames::findPluginName(keyPattern);
bool known = IndexNames::isKnownName(pluginName);
@@ -200,7 +200,7 @@ namespace mongo {
return false;
}
- string IndexCatalog::_getAccessMethodName(const BSONObj& keyPattern) {
+ string IndexCatalog::_getAccessMethodName(const BSONObj& keyPattern) const {
if ( _shouldOverridePlugin(keyPattern) ) {
return "";
}
@@ -703,7 +703,7 @@ namespace mongo {
// there may be pointers pointing at keys in the btree(s). kill them.
// TODO: can this can only clear cursors on this index?
- ClientCursor::invalidate( _collection->ns().ns() );
+ _collection->cursorCache()->invalidateAll( false );
// make sure nothing in progress
massert( 17348,
@@ -808,7 +808,7 @@ namespace mongo {
// there may be pointers pointing at keys in the btree(s). kill them.
// TODO: can this can only clear cursors on this index?
- ClientCursor::invalidate( _collection->ns().ns() );
+ _collection->cursorCache()->invalidateAll( false );
// wipe out stats
_collection->infoCache()->reset();
@@ -1076,6 +1076,12 @@ namespace mongo {
return entry->accessMethod();
}
+ const IndexAccessMethod* IndexCatalog::getIndex( const IndexDescriptor* desc ) const {
+ const IndexCatalogEntry* entry = _entries.find( desc );
+ massert( 17357, "cannot find index entry", entry );
+ return entry->accessMethod();
+ }
+
IndexAccessMethod* IndexCatalog::_createAccessMethod( const IndexDescriptor* desc,
IndexCatalogEntry* entry ) {
string type = _getAccessMethodName(desc->keyPattern());
diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h
index 9056b9af975..3cdb8361292 100644
--- a/src/mongo/db/catalog/index_catalog.h
+++ b/src/mongo/db/catalog/index_catalog.h
@@ -100,6 +100,7 @@ namespace mongo {
// never returns NULL
IndexAccessMethod* getIndex( const IndexDescriptor* desc );
+ const IndexAccessMethod* getIndex( const IndexDescriptor* desc ) const;
class IndexIterator {
public:
@@ -251,14 +252,14 @@ namespace mongo {
int _removeFromSystemIndexes( const StringData& indexName );
- bool _shouldOverridePlugin( const BSONObj& keyPattern );
+ bool _shouldOverridePlugin( const BSONObj& keyPattern ) const;
/**
* This differs from IndexNames::findPluginName in that returns the plugin name we *should*
* use, not the plugin name inside of the provided key pattern. To understand when these
* differ, see shouldOverridePlugin.
*/
- string _getAccessMethodName(const BSONObj& keyPattern);
+ string _getAccessMethodName(const BSONObj& keyPattern) const;
IndexDetails* _getIndexDetails( const IndexDescriptor* descriptor ) const;
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 643d1f4aafd..8c6ca3be192 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -34,6 +34,7 @@
#include <time.h>
#include <vector>
+#include "mongo/base/counter.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_set.h"
@@ -55,35 +56,53 @@
namespace mongo {
- ClientCursor::CCById ClientCursor::clientCursorsById;
- boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) );
- long long ClientCursor::numberTimedOut = 0;
- ClientCursor::RunnerSet ClientCursor::nonCachedRunners;
+ static Counter64 cursorStatsOpen; // gauge
+ static Counter64 cursorStatsOpenPinned; // gauge
+ static Counter64 cursorStatsOpenNoTimeout; // gauge
+ static Counter64 cursorStatsTimedOut;
+
+ static ServerStatusMetricField<Counter64> dCursorStatsOpen( "cursor.open.total",
+ &cursorStatsOpen );
+ static ServerStatusMetricField<Counter64> dCursorStatsOpenPinned( "cursor.open.pinned",
+ &cursorStatsOpenPinned );
+ static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout( "cursor.open.noTimeout",
+ &cursorStatsOpenNoTimeout );
+ static ServerStatusMetricField<Counter64> dCursorStatusTimedout( "cursor.timedOut",
+ &cursorStatsTimedOut );
void aboutToDeleteForSharding(const StringData& ns,
const Database* db,
const NamespaceDetails* nsd,
const DiskLoc& dl ); // from s/d_logic.h
- ClientCursor::ClientCursor(Runner* runner, int qopts, const BSONObj query) {
+ long long ClientCursor::totalOpen() {
+ return cursorStatsOpen.get();
+ }
+
+ ClientCursor::ClientCursor(const Collection* collection, Runner* runner,
+ int qopts, const BSONObj query)
+ : _collection( collection ),
+ _countedYet( false ) {
_runner.reset(runner);
_ns = runner->ns();
_query = query;
_queryOptions = qopts;
+ if ( runner->collection() ) {
+ invariant( collection == runner->collection() );
+ }
init();
}
- ClientCursor::ClientCursor(const string& ns)
- : _ns(ns),
+ ClientCursor::ClientCursor(const Collection* collection)
+ : _ns(collection->ns().ns()),
+ _collection(collection),
+ _countedYet( false ),
_queryOptions(QueryOption_NoCursorTimeout) {
-
init();
}
void ClientCursor::init() {
- _db = cc().database();
- verify( _db );
- verify( _db->ownsNS( _ns ) );
+ invariant( _collection );
isAggCursor = false;
@@ -91,18 +110,20 @@ namespace mongo {
_leftoverMaxTimeMicros = 0;
_pinValue = 0;
_pos = 0;
-
+
Lock::assertAtLeastReadLocked(_ns);
if (_queryOptions & QueryOption_NoCursorTimeout) {
// cursors normally timeout after an inactivity period to prevent excess memory use
// setting this prevents timeout of the cursor in question.
++_pinValue;
+ cursorStatsOpenNoTimeout.increment();
}
- recursive_scoped_lock lock(ccmutex);
- _cursorid = allocCursorId_inlock();
- clientCursorsById.insert( make_pair(_cursorid, this) );
+ _cursorid = _collection->cursorCache()->registerCursor( this );
+
+ cursorStatsOpen.increment();
+ _countedYet = true;
}
ClientCursor::~ClientCursor() {
@@ -112,160 +133,30 @@ namespace mongo {
return;
}
- {
- recursive_scoped_lock lock(ccmutex);
- clientCursorsById.erase(_cursorid);
-
- // defensive:
- _cursorid = INVALID_CURSOR_ID;
- _pos = -2;
- _pinValue = 0;
+ if ( _countedYet ) {
+ _countedYet = false;
+ cursorStatsOpen.decrement();
+ if ( _pinValue == 1 )
+ cursorStatsOpenNoTimeout.decrement();
}
- }
-
- void ClientCursor::invalidate(const StringData& ns) {
- Lock::assertWriteLocked(ns);
-
- size_t dot = ns.find( '.' );
- verify( dot != string::npos );
- // first (and only) dot is the last char
- bool isDB = dot == ns.size() - 1;
-
- Database *db = cc().database();
- verify(db);
- verify(ns.startsWith(db->name()));
-
- recursive_scoped_lock cclock(ccmutex);
- // Look at all active non-cached Runners. These are the runners that are in auto-yield mode
- // that are not attached to the the client cursor. For example, all internal runners don't
- // need to be cached -- there will be no getMore.
- for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end();
- ++it) {
-
- Runner* runner = *it;
- const string& runnerNS = runner->ns();
- if ( ( isDB && StringData(runnerNS).startsWith(ns) ) || ns == runnerNS ) {
- runner->kill();
- }
+ if ( _collection ) {
+ // this could be null if kill() was called
+ _collection->cursorCache()->deregisterCursor( this );
}
- // Look at all cached ClientCursor(s). The CC may have a Runner, a Cursor, or nothing (see
- // sharding_block.h).
- CCById::const_iterator it = clientCursorsById.begin();
- while (it != clientCursorsById.end()) {
- ClientCursor* cc = it->second;
-
- // Aggregation cursors don't have their lifetime bound to the underlying collection.
- if (cc->isAggCursor) {
- ++it;
- continue;
- }
-
- // We're only interested in cursors over one db.
- if (cc->_db != db) {
- ++it;
- continue;
- }
-
- // Note that a valid ClientCursor state is "no cursor no runner." This is because
- // the set of active cursor IDs in ClientCursor is used as representation of query
- // state. See sharding_block.h. TODO(greg,hk): Move this out.
- if (NULL == cc->_runner.get()) {
- ++it;
- continue;
- }
-
- bool shouldDelete = false;
-
- // We will only delete CCs with runners that are not actively in use. The runners that
- // are actively in use are instead kill()-ed.
- if (NULL != cc->_runner.get()) {
- if (isDB || cc->_runner->ns() == ns) {
- // If there is a pinValue >= 100, somebody is actively using the CC and we do
- // not delete it. Instead we notify the holder that we killed it. The holder
- // will then delete the CC.
- if (cc->_pinValue >= 100) {
- cc->_runner->kill();
- }
- else {
- // pinvalue is <100, so there is nobody actively holding the CC. We can
- // safely delete it as nobody is holding the CC.
- shouldDelete = true;
- }
- }
- }
-
- if (shouldDelete) {
- ClientCursor* toDelete = it->second;
- CursorId id = toDelete->cursorid();
- delete toDelete;
- // We're not following the usual paradigm of saving it, ++it, and deleting the saved
- // 'it' because deleting 'it' might invalidate the next thing in clientCursorsById.
- // TODO: Why?
- it = clientCursorsById.upper_bound(id);
- }
- else {
- ++it;
- }
- }
- }
-
- void ClientCursor::invalidateDocument(const StringData& ns,
- const NamespaceDetails* nsd,
- const DiskLoc& dl,
- InvalidationType type) {
- // TODO: Do we need this pagefault thing still
- NoPageFaultsAllowed npfa;
- recursive_scoped_lock lock(ccmutex);
-
- Database *db = cc().database();
- verify(db);
- aboutToDeleteForSharding( ns, db, nsd, dl );
-
- // Check our non-cached active runner list.
- for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end();
- ++it) {
-
- Runner* runner = *it;
- if (0 == ns.compare(runner->ns())) {
- runner->invalidate(dl, type);
- }
- }
-
- // TODO: This requires optimization. We walk through *all* CCs and send the delete to every
- // CC open on the db we're deleting from. We could:
- // 1. Map from ns to open runners,
- // 2. Map from ns -> (a map of DiskLoc -> runners who care about that DL)
- //
- // We could also queue invalidations somehow and have them processed later in the runner's
- // read locks.
- for (CCById::const_iterator it = clientCursorsById.begin(); it != clientCursorsById.end();
- ++it) {
-
- ClientCursor* cc = it->second;
- // We're only interested in cursors over one db.
- if (cc->_db != db) { continue; }
- if (NULL == cc->_runner.get()) { continue; }
- cc->_runner->invalidate(dl, type);
- }
+ // defensive:
+ _collection = NULL;
+ _cursorid = INVALID_CURSOR_ID;
+ _pos = -2;
+ _pinValue = 0;
}
- void ClientCursor::registerRunner(Runner* runner) {
- recursive_scoped_lock lock(ccmutex);
- // The second part of the pair returned by unordered_map::insert tells us whether the
- // insert was performed or not, so we can use that to ensure that we have not been
- // double registered.
- const std::pair<RunnerSet::iterator, bool> result = nonCachedRunners.insert(runner);
- verify(result.second);
- }
+ void ClientCursor::kill() {
+ if ( _runner.get() )
+ _runner->kill();
- void ClientCursor::deregisterRunner(Runner* runner) {
- recursive_scoped_lock lock(ccmutex);
- // unordered_set::erase returns a count of how many elements were erased, so we can
- // validate that our de-registration matched an existing register call by ensuring that
- // exactly one item was erased.
- verify(nonCachedRunners.erase(runner) == 1);
+ _collection = NULL;
}
void yieldOrSleepFor1Microsecond() {
@@ -352,238 +243,12 @@ namespace mongo {
_idleAgeMillis = millis;
}
- void ClientCursor::idleTimeReport(unsigned millis) {
- bool foundSomeToTimeout = false;
-
- // two passes so that we don't need to readlock unless we really do some timeouts
- // we assume here that incrementing _idleAgeMillis outside readlock is ok.
- {
- recursive_scoped_lock lock(ccmutex);
- {
- unsigned sz = clientCursorsById.size();
- static time_t last;
- if( sz >= 100000 ) {
- if( time(0) - last > 300 ) {
- last = time(0);
- log() << "warning number of open cursors is very large: " << sz << endl;
- }
- }
- }
- for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) {
- CCById::iterator j = i;
- i++;
- if( j->second->shouldTimeout( millis ) ) {
- foundSomeToTimeout = true;
- }
- }
- }
-
- if( foundSomeToTimeout ) {
- Lock::GlobalRead lk;
-
- recursive_scoped_lock cclock(ccmutex);
- CCById::const_iterator it = clientCursorsById.begin();
- while (it != clientCursorsById.end()) {
- ClientCursor* cc = it->second;
- if( cc->shouldTimeout(0) ) {
- numberTimedOut++;
- LOG(1) << "killing old cursor " << cc->_cursorid << ' ' << cc->_ns
- << " idle:" << cc->idleTime() << "ms\n";
- ClientCursor* toDelete = it->second;
- CursorId id = toDelete->cursorid();
- // This is what winds up removing it from the map.
- delete toDelete;
- it = clientCursorsById.upper_bound(id);
- }
- else {
- ++it;
- }
- }
- }
- }
-
void ClientCursor::updateSlaveLocation( CurOp& curop ) {
if ( _slaveReadTill.isNull() )
return;
mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill );
}
- void ClientCursor::appendStats( BSONObjBuilder& result ) {
- recursive_scoped_lock lock(ccmutex);
- result.appendNumber("totalOpen", clientCursorsById.size() );
- result.appendNumber("clientCursors_size", (int) numCursors());
- result.appendNumber("timedOut" , numberTimedOut);
- unsigned pinned = 0;
- unsigned notimeout = 0;
- for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); i++ ) {
- unsigned p = i->second->_pinValue;
- if( p >= 100 )
- pinned++;
- else if( p > 0 )
- notimeout++;
- }
- if( pinned )
- result.append("pinned", pinned);
- if( notimeout )
- result.append("totalNoTimeout", notimeout);
- }
-
- //
- // ClientCursor creation/deletion/access.
- //
-
- // Some statics used by allocCursorId_inlock().
- namespace {
- // so we don't have to do find() which is a little slow very often.
- long long cursorGenTSLast = 0;
- PseudoRandom* cursorGenRandom = NULL;
- }
-
- long long ClientCursor::allocCursorId_inlock() {
- // It is important that cursor IDs not be reused within a short period of time.
- if (!cursorGenRandom) {
- scoped_ptr<SecureRandom> sr( SecureRandom::create() );
- cursorGenRandom = new PseudoRandom( sr->nextInt64() );
- }
-
- const long long ts = Listener::getElapsedTimeMillis();
- long long x;
- while ( 1 ) {
- x = ts << 32;
- x |= cursorGenRandom->nextInt32();
-
- if ( x == 0 ) { continue; }
-
- if ( x < 0 ) { x *= -1; }
-
- if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 )
- break;
- }
-
- cursorGenTSLast = ts;
- return x;
- }
-
- // static
- ClientCursor* ClientCursor::find_inlock(CursorId id, bool warn) {
- CCById::iterator it = clientCursorsById.find(id);
- if ( it == clientCursorsById.end() ) {
- if ( warn ) {
- OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map '" << id
- << "' (ok after a drop)" << endl;
- }
- return 0;
- }
- return it->second;
- }
-
- void ClientCursor::find( const string& ns , set<CursorId>& all ) {
- recursive_scoped_lock lock(ccmutex);
-
- for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) {
- if ( i->second->_ns == ns )
- all.insert( i->first );
- }
- }
-
- // static
- ClientCursor* ClientCursor::find(CursorId id, bool warn) {
- recursive_scoped_lock lock(ccmutex);
- ClientCursor *c = find_inlock(id, warn);
- // if this asserts, your code was not thread safe - you either need to set no timeout
- // for the cursor or keep a ClientCursor::Pointer in scope for it.
- massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue );
- return c;
- }
-
- void ClientCursor::_erase_inlock(ClientCursor* cursor) {
- // Must not have an active ClientCursor::Pin.
- massert( 16089,
- str::stream() << "Cannot kill active cursor " << cursor->cursorid(),
- cursor->_pinValue < 100 );
-
- delete cursor;
- }
-
- bool ClientCursor::erase(CursorId id) {
- recursive_scoped_lock lock(ccmutex);
- ClientCursor* cursor = find_inlock(id);
- if (!cursor) { return false; }
- _erase_inlock(cursor);
- return true;
- }
-
- int ClientCursor::erase(int n, long long *ids) {
- int found = 0;
- for ( int i = 0; i < n; i++ ) {
- if ( erase(ids[i]))
- found++;
-
- if ( inShutdown() )
- break;
- }
- return found;
- }
-
- bool ClientCursor::eraseIfAuthorized(CursorId id) {
- NamespaceString ns;
- {
- recursive_scoped_lock lock(ccmutex);
- ClientCursor* cursor = find_inlock(id);
- if (!cursor) {
- audit::logKillCursorsAuthzCheck(
- &cc(),
- NamespaceString(),
- id,
- ErrorCodes::CursorNotFound);
- return false;
- }
- ns = NamespaceString(cursor->ns());
- }
-
- // Can't be in a lock when checking authorization
- const bool isAuthorized = cc().getAuthorizationSession()->isAuthorizedForActionsOnNamespace(
- ns, ActionType::killCursors);
- audit::logKillCursorsAuthzCheck(
- &cc(),
- ns,
- id,
- isAuthorized ? ErrorCodes::OK : ErrorCodes::Unauthorized);
- if (!isAuthorized) {
- return false;
- }
-
- // It is safe to lookup the cursor again after temporarily releasing the mutex because
- // of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that
- // the namespace associated with a cursor cannot change.
- recursive_scoped_lock lock(ccmutex);
- ClientCursor* cursor = find_inlock(id);
- if (!cursor) {
- // Cursor was deleted in another thread since we found it earlier in this function.
- return false;
- }
- if (ns != cursor->ns()) {
- warning() << "Cursor namespace changed. Previous ns: " << ns << ", current ns: "
- << cursor->ns() << endl;
- return false;
- }
-
- _erase_inlock(cursor);
- return true;
- }
-
- int ClientCursor::eraseIfAuthorized(int n, long long *ids) {
- int found = 0;
- for ( int i = 0; i < n; i++ ) {
- if ( eraseIfAuthorized(ids[i]))
- found++;
-
- if ( inShutdown() )
- break;
- }
- return found;
- }
-
int ClientCursor::suggestYieldMicros() {
int writers = 0;
int readers = 0;
@@ -606,42 +271,45 @@ namespace mongo {
// deleted from underneath us, so we can save the pointer and ignore the ID.
//
- ClientCursorPin::ClientCursorPin(long long cursorid) : _cursorid( INVALID_CURSOR_ID ) {
- recursive_scoped_lock lock( ClientCursor::ccmutex );
- ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true );
- if (NULL != cursor) {
- uassert( 12051, "clientcursor already in use? driver problem?",
- cursor->_pinValue < 100 );
- cursor->_pinValue += 100;
- _cursorid = cursorid;
+ ClientCursorPin::ClientCursorPin( const Collection* collection, long long cursorid )
+ : _cursor( NULL ) {
+ cursorStatsOpenPinned.increment();
+ _cursor = collection->cursorCache()->find( cursorid );
+ if ( _cursor ) {
+ uassert( 12051,
+ "clientcursor already in use? driver problem?",
+ _cursor->_pinValue < 100 );
+ _cursor->_pinValue += 100;
}
}
- ClientCursorPin::~ClientCursorPin() { DESTRUCTOR_GUARD( release(); ) }
+ ClientCursorPin::~ClientCursorPin() {
+ cursorStatsOpenPinned.decrement();
+ DESTRUCTOR_GUARD( release(); );
+ }
void ClientCursorPin::release() {
- if ( _cursorid == INVALID_CURSOR_ID ) {
+ if ( !_cursor )
return;
- }
- ClientCursor *cursor = c();
- _cursorid = INVALID_CURSOR_ID;
- if ( cursor ) {
- verify( cursor->_pinValue >= 100 );
- cursor->_pinValue -= 100;
+
+ invariant( _cursor->_pinValue >= 100 );
+ _cursor->_pinValue -= 100;
+
+ if ( _cursor->collection() == NULL ) {
+ // the ClientCursor was killed while we had it
+ // therefore it's our responsibility to delete it
+ delete _cursor;
+ _cursor = NULL; // defensive
}
}
void ClientCursorPin::deleteUnderlying() {
- if (_cursorid == INVALID_CURSOR_ID) {
- return;
- }
- ClientCursor *cursor = c();
- _cursorid = INVALID_CURSOR_ID;
- delete cursor;
+ delete _cursor;
+ _cursor = NULL;
}
ClientCursor* ClientCursorPin::c() const {
- return ClientCursor::find( _cursorid );
+ return _cursor;
}
//
@@ -708,9 +376,9 @@ namespace mongo {
const int Secs = 4;
unsigned n = 0;
while ( ! inShutdown() ) {
- ClientCursor::idleTimeReport( t.millisReset() );
+ cursorStatsTimedOut.increment( CollectionCursorCache::timeoutCursorsGlobal( t.millisReset() ) );
sleepsecs(Secs);
- if( ++n % (60/4) == 0 /*once a minute*/ ) {
+ if( ++n % (60/Secs) == 0 /*once a minute*/ ) {
sayMemoryStatus();
}
}
@@ -723,14 +391,25 @@ namespace mongo {
// cursorInfo command.
//
+ void _appendCursorStats( BSONObjBuilder& b ) {
+ b.append( "note" , "deprecated, use server status metrics" );
+
+ b.appendNumber("totalOpen", cursorStatsOpen.get() );
+ b.appendNumber("pinned", cursorStatsOpenPinned.get() );
+ b.appendNumber("totalNoTimeout", cursorStatsOpenNoTimeout.get() );
+
+ b.appendNumber("timedOut" , cursorStatsTimedOut.get());
+ }
+
// QUESTION: Restrict to the namespace from which this command was issued?
// Alternatively, make this command admin-only?
+ // TODO: remove this for 2.8
class CmdCursorInfo : public Command {
public:
CmdCursorInfo() : Command( "cursorInfo", true ) {}
virtual bool slaveOk() const { return true; }
virtual void help( stringstream& help ) const {
- help << " example: { cursorInfo : 1 }";
+ help << " example: { cursorInfo : 1 }, deprecated";
}
virtual LockType locktype() const { return NONE; }
virtual void addRequiredPrivileges(const std::string& dbname,
@@ -742,7 +421,8 @@ namespace mongo {
}
bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result,
bool fromRepl ) {
- ClientCursor::appendStats( result );
+ _appendCursorStats( result );
+ result.append( "note", "deprecated, get from serverStatus().cursors" );
return true;
}
} cmdCursorInfo;
@@ -758,7 +438,7 @@ namespace mongo {
BSONObj generateSection(const BSONElement& configElement) const {
BSONObjBuilder b;
- ClientCursor::appendStats( b );
+ _appendCursorStats( b );
return b.obj();
}
} cursorServerStats;
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 0d15fca29c0..78075bde52b 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -42,6 +42,7 @@ namespace mongo {
typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
class ClientCursor;
+ class Collection;
class CurOp;
class Database;
class NamespaceDetails;
@@ -56,9 +57,10 @@ namespace mongo {
*/
class ClientCursor : private boost::noncopyable {
public:
- ClientCursor(Runner* runner, int qopts = 0, const BSONObj query = BSONObj());
+ ClientCursor(const Collection* collection, Runner* runner,
+ int qopts = 0, const BSONObj query = BSONObj());
- ClientCursor(const string& ns);
+ ClientCursor(const Collection* collection);
~ClientCursor();
@@ -68,90 +70,29 @@ namespace mongo {
CursorId cursorid() const { return _cursorid; }
string ns() const { return _ns; }
- Database* db() const { return _db; }
-
- //
- // Mutation/Invalidation of DiskLocs and dropping of namespaces
- //
+ const Collection* collection() const { return _collection; }
/**
- * Get rid of cursors for namespaces 'ns'. When dropping a db, ns is "dbname." Used by drop,
- * dropIndexes, dropDatabase.
+ * This is called when someone is dropping a collection or something else that
+ * goes through killing cursors.
+ * It removes the responsibility of de-registering from ClientCursor.
+ * Responsibility for deleting the ClientCursor doesn't change from this call
+ * see Runner::kill.
*/
- static void invalidate(const StringData& ns);
-
- /**
- * Broadcast a document invalidation to all relevant Runner(s). invalidateDocument must
- * called *before* the provided DiskLoc is about to be deleted or mutated.
- */
- static void invalidateDocument(const StringData& ns,
- const NamespaceDetails* nsd,
- const DiskLoc& dl,
- InvalidationType type);
-
- /**
- * Register a runner so that it can be notified of deletion/invalidation during yields.
- * Must be called before a runner yields. If a runner is cached (inside a ClientCursor) it
- * MUST NOT be registered; the two are mutually exclusive.
- */
- static void registerRunner(Runner* runner);
-
- /**
- * Remove a runner from the runner registry.
- */
- static void deregisterRunner(Runner* runner);
+ void kill();
//
// Yielding.
- //
+ //
static void staticYield(int micros, const StringData& ns, const Record* rec);
static int suggestYieldMicros();
//
- // Static methods about all ClientCursors TODO: Document.
- //
-
- static void appendStats( BSONObjBuilder& result );
-
- //
- // ClientCursor creation/deletion.
- //
-
- static unsigned numCursors() { return clientCursorsById.size(); }
- static void find( const string& ns , set<CursorId>& all );
- static ClientCursor* find(CursorId id, bool warn = true);
-
- // Same as erase but checks to make sure this thread has read permission on the cursor's
- // namespace. This should be called when receiving killCursors from a client. This should
- // not be called when ccmutex is held.
- static int eraseIfAuthorized(int n, long long* ids);
- static bool eraseIfAuthorized(CursorId id);
-
- /**
- * @return number of cursors found
- */
- static int erase(int n, long long* ids);
-
- /**
- * Deletes the cursor with the provided @param 'id' if one exists.
- * @throw if the cursor with the provided id is pinned.
- * This does not do any auth checking and should be used only when erasing cursors as part
- * of cleaning up internal operations.
- */
- static bool erase(CursorId id);
-
- //
// Timing and timeouts
//
/**
- * called every 4 seconds. millis is amount of idle time passed since the last call --
- * could be zero
- */
- static void idleTimeReport(unsigned millis);
-
- /**
* @param millis amount of idle passed time since last call
* note called outside of locks (other than ccmutex) so care must be exercised
*/
@@ -202,51 +143,20 @@ namespace mongo {
*/
bool isAggCursor;
+ unsigned pinValue() const { return _pinValue; }
+
+ static long long totalOpen();
+
private:
+ friend class ClientCursorMonitor;
friend class ClientCursorPin;
friend class CmdCursorInfo;
- // A map from the CursorId to the ClientCursor behind it.
- // TODO: Consider making this per-connection.
- typedef map<CursorId, ClientCursor*> CCById;
- static CCById clientCursorsById;
-
- // A list of NON-CACHED runners. Any runner that yields must be put into this map before
- // yielding in order to be notified of invalidation and namespace deletion. Before the
- // runner is deleted, it must be removed from this map.
- //
- // TODO: This is temporary and as such is highly NOT optimized.
- typedef unordered_set<Runner*> RunnerSet;
- static RunnerSet nonCachedRunners;
-
- // How many cursors have timed out?
- static long long numberTimedOut;
-
- // This must be held when modifying any static member.
- static boost::recursive_mutex& ccmutex;
-
/**
* Initialization common between both constructors for the ClientCursor.
*/
void init();
- /**
- * Allocates a new CursorId.
- * Called from init(...). Assumes ccmutex held.
- */
- static CursorId allocCursorId_inlock();
-
- /**
- * Find the ClientCursor with the provided ID. Optionally warn if it's not found.
- * Assumes ccmutex is held.
- */
- static ClientCursor* find_inlock(CursorId id, bool warn = true);
-
- /**
- * Delete the ClientCursor with the provided ID. masserts if the cursor is pinned.
- */
- static void _erase_inlock(ClientCursor* cursor);
-
//
// ClientCursor-specific data, independent of the underlying execution type.
//
@@ -263,8 +173,10 @@ namespace mongo {
// The namespace we're operating on.
string _ns;
- // The database we're operating on.
- Database* _db;
+ const Collection* _collection;
+
+ // if we've added it to the total open counter yet
+ bool _countedYet;
// How many objects have been returned by the find() so far?
int _pos;
@@ -303,18 +215,20 @@ namespace mongo {
* against two getMore requests on the same cursor executing at the same time - which might be
* bad. That should never happen, but if a client driver had a bug, it could (or perhaps some
* sort of attack situation).
+ * Must have a read lock on the collection already
*/
class ClientCursorPin : boost::noncopyable {
public:
- ClientCursorPin( long long cursorid );
+ ClientCursorPin( const Collection* collection, long long cursorid );
~ClientCursorPin();
- // This just releases the pin, does not delete the underlying.
+ // This just releases the pin, does not delete the underlying
+ // unless ownership has passed to us after kill
void release();
// Call this to delete the underlying ClientCursor.
void deleteUnderlying();
ClientCursor *c() const;
private:
- CursorId _cursorid;
+ ClientCursor* _cursor;
};
/** thread for timing out old cursors */
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 1da4c5828dc..eebc3fb3330 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1215,7 +1215,9 @@ namespace mongo {
auto_ptr<RangePreserver> rangePreserver;
{
Client::ReadContext ctx(config.ns);
- rangePreserver.reset(new RangePreserver(config.ns));
+ Collection* collection = ctx.ctx().db()->getCollection( config.ns );
+ if ( collection )
+ rangePreserver.reset(new RangePreserver(collection));
// Get metadata before we check our version, to make sure it doesn't increment
// in the meantime. Need to do this in the same lock scope as the block.
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index f249e2f8174..fa3a083a53d 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/privilege.h"
+#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/commands.h"
@@ -102,9 +103,12 @@ namespace {
// These are all no-ops for PipelineRunners
virtual void setYieldPolicy(YieldPolicy policy) {}
virtual void invalidate(const DiskLoc& dl, InvalidationType type) {}
- virtual void kill() {}
+ virtual void kill() {
+ _pipeline->output()->kill();
+ }
virtual void saveState() {}
virtual bool restoreState() { return true; }
+ virtual const Collection* collection() { return NULL; }
/**
* Make obj the next object returned by getNext().
@@ -160,45 +164,80 @@ namespace {
return true;
}
- static void handleCursorCommand(CursorId id, BSONObj& cmdObj, BSONObjBuilder& result) {
+ static void handleCursorCommand(const string& ns,
+ intrusive_ptr<Pipeline>& pPipeline,
+ BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+
+ scoped_ptr<ClientCursorPin> pin;
+ string cursorNs = ns;
+
+ {
+ // Set up cursor
+ Client::ReadContext ctx(ns);
+ Collection* collection = ctx.ctx().db()->getCollection( ns );
+ if ( collection ) {
+ ClientCursor* cc = new ClientCursor(collection,
+ new PipelineRunner(pPipeline));
+ // enable special locking and ns deletion behavior
+ cc->isAggCursor = true;
+
+ pin.reset( new ClientCursorPin( collection, cc->cursorid() ) );
+
+ // we need this after cursor may have been deleted
+ cursorNs = cc->ns();
+ }
+ }
+
BSONElement batchSizeElem = cmdObj.getFieldDotted("cursor.batchSize");
const long long batchSize = batchSizeElem.isNumber()
? batchSizeElem.numberLong()
: 101; // same as query
- ClientCursorPin pin(id);
- ClientCursor* cursor = pin.c();
-
- massert(16958, "Cursor shouldn't have been deleted",
- cursor);
+ ClientCursor* cursor = NULL;
+ PipelineRunner* runner = NULL;
+ if ( pin ) {
+ cursor = pin->c();
+ massert(16958,
+ "Cursor shouldn't have been deleted",
+ cursor);
+ verify(cursor->isAggCursor);
+
+ runner = dynamic_cast<PipelineRunner*>(cursor->getRunner());
+ verify(runner);
+ }
- verify(cursor->isAggCursor);
- PipelineRunner* runner = dynamic_cast<PipelineRunner*>(cursor->getRunner());
- verify(runner);
try {
- const string cursorNs = cursor->ns(); // we need this after cursor may have been deleted
// can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
BSONArrayBuilder resultsArray;
const int byteLimit = MaxBytesToReturnToClientAtOnce;
BSONObj next;
- for (int objCount = 0; objCount < batchSize; objCount++) {
- // The initial getNext() on a PipelineRunner may be very expensive so we don't do it
- // when batchSize is 0 since that indicates a desire for a fast return.
- if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
- pin.deleteUnderlying();
- id = 0;
- cursor = NULL; // make it an obvious error to use cursor after this point
- break;
+ if ( runner ) {
+ for (int objCount = 0; objCount < batchSize; objCount++) {
+ // The initial getNext() on a PipelineRunner may be very expensive so we don't
+ // do it when batchSize is 0 since that indicates a desire for a fast return.
+ if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) {
+ pin->deleteUnderlying();
+ cursor = NULL; // make it an obvious error to use cursor after this point
+ break;
+ }
+
+ if (resultsArray.len() + next.objsize() > byteLimit) {
+ // too big. next will be the first doc in the second batch
+ runner->pushBack(next);
+ break;
+ }
+
+ resultsArray.append(next);
}
-
- if (resultsArray.len() + next.objsize() > byteLimit) {
- // too big. next will be the first doc in the second batch
- runner->pushBack(next);
- break;
- }
-
- resultsArray.append(next);
+ }
+ else {
+ // this is to ensure that side-effects such as $out occur,
+ // and that an empty output set is the correct result of this pipeline
+ invariant( pPipeline.get() );
+ invariant( pPipeline->output() );
+ invariant( !pPipeline->output()->getNext() );
}
if (cursor) {
@@ -208,14 +247,19 @@ namespace {
}
BSONObjBuilder cursorObj(result.subobjStart("cursor"));
- cursorObj.append("id", id);
+ if ( cursor )
+ cursorObj.append("id", cursor->cursorid() );
+ else
+ cursorObj.append("id", 0LL );
cursorObj.append("ns", cursorNs);
cursorObj.append("firstBatch", resultsArray.arr());
cursorObj.done();
}
catch (...) {
// Clean up cursor on way out of scope.
- pin.deleteUnderlying();
+ if ( pin ) {
+ pin->deleteUnderlying();
+ }
throw;
}
}
@@ -278,6 +322,7 @@ namespace {
// This does the mongod-specific stuff like creating a cursor
PipelineD::prepareCursorSource(pPipeline, pCtx);
+
pPipeline->stitch();
if (pPipeline->isExplain()) {
@@ -286,16 +331,7 @@ namespace {
}
if (isCursorCommand(cmdObj)) {
- CursorId id;
- {
- // Set up cursor
- Client::ReadContext ctx(ns);
- ClientCursor* cc = new ClientCursor(new PipelineRunner(pPipeline));
- cc->isAggCursor = true; // enable special locking and ns deletion behavior
- id = cc->cursorid();
- }
-
- handleCursorCommand(id, cmdObj, result);
+ handleCursorCommand(ns, pPipeline, cmdObj, result);
}
else {
pPipeline->run(result);
diff --git a/src/mongo/db/index/btree_based_access_method.cpp b/src/mongo/db/index/btree_based_access_method.cpp
index 150e1e35a47..0df6c12adad 100644
--- a/src/mongo/db/index/btree_based_access_method.cpp
+++ b/src/mongo/db/index/btree_based_access_method.cpp
@@ -210,7 +210,7 @@ namespace mongo {
return Status::OK();
}
- DiskLoc BtreeBasedAccessMethod::findSingle( const BSONObj& key ) {
+ DiskLoc BtreeBasedAccessMethod::findSingle( const BSONObj& key ) const {
DiskLoc head = _btreeState->head();
Record* record = _btreeState->recordStore()->recordFor( head );
diff --git a/src/mongo/db/index/btree_based_access_method.h b/src/mongo/db/index/btree_based_access_method.h
index 76cd8bef706..a99db4db6a1 100644
--- a/src/mongo/db/index/btree_based_access_method.h
+++ b/src/mongo/db/index/btree_based_access_method.h
@@ -94,7 +94,7 @@ namespace mongo {
virtual Status validate(int64_t* numKeys);
// XXX: consider migrating callers to use IndexCursor instead
- virtual DiskLoc findSingle( const BSONObj& key );
+ virtual DiskLoc findSingle( const BSONObj& key ) const;
// exposed for testing, used for bulk commit
static ExternalSortComparison* getComparison(int version,
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 09f6a569bb7..c443de1c1b1 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -532,7 +532,7 @@ namespace mongo {
verify( n < 30000 );
}
- int found = ClientCursor::eraseIfAuthorized(n, (long long *) x);
+ int found = CollectionCursorCache::eraseCursorGlobalIfAuthorized(n, (long long *) x);
if ( logger::globalLogDomain()->shouldLog(logger::LogSeverity::Debug(1)) || found != n ) {
LOG( found == n ? 1 : 0 ) << "killcursors: found " << found << " of " << n << endl;
@@ -561,7 +561,6 @@ namespace mongo {
/* important: kill all open cursors on the database */
string prefix(db);
prefix += '.';
- ClientCursor::invalidate(prefix.c_str());
dbHolderW().erase( db, path );
ctx->_clear();
@@ -788,7 +787,7 @@ namespace mongo {
// because it may now be out of sync with the client's iteration state.
// SERVER-7952
// TODO Temporary code, see SERVER-4563 for a cleanup overview.
- ClientCursor::erase( cursorid );
+ CollectionCursorCache::eraseCursorGlobal( cursorid );
}
ex.reset( new AssertionException( e.getInfo().msg, e.getCode() ) );
ok = false;
@@ -1063,7 +1062,7 @@ namespace {
}
void DBDirectClient::killCursor( long long id ) {
- ClientCursor::erase( id );
+ CollectionCursorCache::eraseCursorGlobal( id );
}
HostAndPort DBDirectClient::_clientHost = HostAndPort( "0.0.0.0" , 0 );
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index 4e466e4f189..4c7b3bc61c3 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -707,10 +707,7 @@ namespace mongo {
if (!damages.empty() ) {
// Broadcast the mutation so that query results stay correct.
- ClientCursor::invalidateDocument(nsString.ns(),
- collection->details(),
- loc,
- INVALIDATION_MUTATION);
+ collection->cursorCache()->invalidateDocument(loc, INVALIDATION_MUTATION);
collection->details()->paddingFits();
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index d4c21fca2fd..60ec8772db1 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -63,6 +63,12 @@ namespace mongo {
}
}
+ void DocumentSource::kill() {
+ if ( pSource ) {
+ pSource->kill();
+ }
+ }
+
void DocumentSource::serializeToArray(vector<Value>& array, bool explain) const {
Value entry = serialize(explain);
if (!entry.missing()) {
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index a6bd0e8727b..8965516ab55 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -76,6 +76,11 @@ namespace mongo {
virtual void dispose();
/**
+ * See ClientCursor::kill()
+ */
+ virtual void kill();
+
+ /**
Get the source's name.
@returns the string name of the source as a constant string;
@@ -345,6 +350,7 @@ namespace mongo {
virtual bool coalesce(const intrusive_ptr<DocumentSource>& nextSource);
virtual bool isValidInitialSource() const { return true; }
virtual void dispose();
+ virtual void kill();
/**
* Create a document source based on a passed-in cursor.
@@ -423,6 +429,7 @@ namespace mongo {
string _ns; // namespace
CursorId _cursorId;
+ bool _killed;
};
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 6be57a990d3..3388c31bcb3 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -30,6 +30,8 @@
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/instance.h"
#include "mongo/db/pipeline/document.h"
@@ -64,9 +66,21 @@ namespace mongo {
return out;
}
+ void DocumentSourceCursor::kill() {
+ _killed = true;
+ _cursorId = 0;
+ }
+
void DocumentSourceCursor::dispose() {
if (_cursorId) {
- ClientCursor::erase(_cursorId);
+ Lock::DBRead lk(_ns);
+ Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false);
+ Collection* collection = ctx.db()->getCollection( _ns );
+ if ( collection ) {
+ ClientCursor* cc = collection->cursorCache()->find( _cursorId );
+ if ( cc )
+ collection->cursorCache()->deregisterCursor( cc );
+ }
_cursorId = 0;
}
@@ -74,6 +88,11 @@ namespace mongo {
}
void DocumentSourceCursor::loadBatch() {
+
+ Lock::DBRead lk(_ns);
+
+ uassert( 17361, "collection or index disappeared when cursor yielded", !_killed );
+
if (!_cursorId) {
dispose();
return;
@@ -81,10 +100,11 @@ namespace mongo {
// We have already validated the sharding version when we constructed the cursor
// so we shouldn't check it again.
- Lock::DBRead lk(_ns);
Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false);
+ Collection* collection = ctx.db()->getCollection( _ns );
+ uassert( 17358, "Collection dropped.", collection );
- ClientCursorPin pin(_cursorId);
+ ClientCursorPin pin(collection, _cursorId);
ClientCursor* cursor = pin.c();
uassert(16950, "Cursor deleted. Was the collection or database dropped?",
@@ -211,8 +231,10 @@ namespace {
{
Lock::DBRead lk(_ns);
Client::Context ctx(_ns, storageGlobalParams.dbpath, /*doVersion=*/false);
+ Collection* collection = ctx.db()->getCollection( _ns );
+ uassert( 17362, "Collection dropped.", collection );
- ClientCursorPin pin(_cursorId);
+ ClientCursorPin pin(collection, _cursorId);
ClientCursor* cursor = pin.c();
uassert(17135, "Cursor deleted. Was the collection or database dropped?",
@@ -258,6 +280,7 @@ namespace {
, _docsAddedToBatches(0)
, _ns(ns)
, _cursorId(cursorId)
+ , _killed(false)
{}
intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create(
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index e9424098e5a..4d11f8eed29 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -116,7 +116,7 @@ namespace mongo {
void addInitialSource(intrusive_ptr<DocumentSource> source);
/// The source that represents the output. Returns a non-owning pointer.
- DocumentSource* output() { return sources.back().get(); }
+ DocumentSource* output() { invariant( !sources.empty() ); return sources.back().get(); }
/// Returns true if this pipeline only uses features that work in mongos.
bool canRunInMongos() const;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index de595ebaf28..a3ce12edd18 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -141,6 +141,16 @@ namespace {
// Create the necessary context to use a Runner, including taking a namespace read lock.
// Note: this may throw if the sharding version for this connection is out of date.
Client::ReadContext context(fullName);
+ Collection* collection = context.ctx().db()->getCollection(fullName);
+ if ( !collection ) {
+ intrusive_ptr<DocumentSource> source(DocumentSourceBsonArray::create(BSONObj(),
+ pExpCtx));
+ while (!sources.empty() && source->coalesce(sources.front())) {
+ sources.pop_front();
+ }
+ pPipeline->addInitialSource( source );
+ return;
+ }
// Create the Runner.
//
@@ -205,8 +215,9 @@ namespace {
}
// Now wrap the Runner in ClientCursor
- auto_ptr<ClientCursor> cursor(
- new ClientCursor(runner.release(), QueryOption_NoCursorTimeout));
+ auto_ptr<ClientCursor> cursor(new ClientCursor(collection,
+ runner.release(),
+ QueryOption_NoCursorTimeout));
verify(cursor->getRunner());
CursorId cursorId = cursor->cursorid();
diff --git a/src/mongo/db/query/cached_plan_runner.cpp b/src/mongo/db/query/cached_plan_runner.cpp
index 0139c0bdacf..27d32fae16f 100644
--- a/src/mongo/db/query/cached_plan_runner.cpp
+++ b/src/mongo/db/query/cached_plan_runner.cpp
@@ -45,11 +45,13 @@
namespace mongo {
- CachedPlanRunner::CachedPlanRunner(CanonicalQuery* canonicalQuery,
+ CachedPlanRunner::CachedPlanRunner(const Collection* collection,
+ CanonicalQuery* canonicalQuery,
QuerySolution* solution,
PlanStage* root,
WorkingSet* ws)
- : _canonicalQuery(canonicalQuery),
+ : _collection(collection),
+ _canonicalQuery(canonicalQuery),
_solution(solution),
_exec(new PlanExecutor(ws, root)),
_alreadyProduced(false),
@@ -127,6 +129,7 @@ namespace mongo {
void CachedPlanRunner::kill() {
_killed = true;
+ _collection = NULL;
_exec->kill();
if (NULL != _backupPlan.get()) {
_backupPlan->kill();
diff --git a/src/mongo/db/query/cached_plan_runner.h b/src/mongo/db/query/cached_plan_runner.h
index 5522f49756c..fc0af460188 100644
--- a/src/mongo/db/query/cached_plan_runner.h
+++ b/src/mongo/db/query/cached_plan_runner.h
@@ -58,7 +58,8 @@ namespace mongo {
* Takes ownership of all arguments.
* XXX: what args should this really take? probably a cachekey as well?
*/
- CachedPlanRunner(CanonicalQuery* canonicalQuery, QuerySolution* solution,
+ CachedPlanRunner(const Collection* collection,
+ CanonicalQuery* canonicalQuery, QuerySolution* solution,
PlanStage* root, WorkingSet* ws);
virtual ~CachedPlanRunner();
@@ -79,6 +80,7 @@ namespace mongo {
virtual void kill();
+ virtual const Collection* collection() { return _collection; }
/**
* Returns OK, allocating and filling in '*explain' with details of the cached
* plan. Caller takes ownership of '*explain'. Otherwise, return a status describing
@@ -94,6 +96,8 @@ namespace mongo {
private:
void updateCache();
+ const Collection* _collection;
+
boost::scoped_ptr<CanonicalQuery> _canonicalQuery;
boost::scoped_ptr<QuerySolution> _solution;
boost::scoped_ptr<PlanExecutor> _exec;
diff --git a/src/mongo/db/query/eof_runner.h b/src/mongo/db/query/eof_runner.h
index e155b174293..154e0524644 100644
--- a/src/mongo/db/query/eof_runner.h
+++ b/src/mongo/db/query/eof_runner.h
@@ -69,6 +69,9 @@ namespace mongo {
virtual void kill();
+        // This can return NULL since we never yield or register anything over it.
+        virtual const Collection* collection() { return NULL; }
+
/**
* Always returns OK, allocating and filling in '*explain' with a fake ("zeroed")
* collection scan plan. Caller owns '*explain', though.
diff --git a/src/mongo/db/query/get_runner.cpp b/src/mongo/db/query/get_runner.cpp
index b34993f3c3f..f230f4c95dc 100644
--- a/src/mongo/db/query/get_runner.cpp
+++ b/src/mongo/db/query/get_runner.cpp
@@ -243,7 +243,8 @@ namespace mongo {
WorkingSet* ws;
PlanStage* root;
verify(StageBuilder::build(*qs, &root, &ws));
- CachedPlanRunner* cpr = new CachedPlanRunner(canonicalQuery.release(), qs,
+ CachedPlanRunner* cpr = new CachedPlanRunner(collection,
+ canonicalQuery.release(), qs,
root, ws);
if (NULL != backupQs) {
@@ -277,9 +278,11 @@ namespace mongo {
// We cannot figure out how to answer the query. Should this ever happen?
if (0 == solutions.size()) {
- return Status(ErrorCodes::BadValue,
- "error processing query: " + canonicalQuery->toString() +
- " No query solutions");
+ return Status(ErrorCodes::BadValue,
+ str::stream()
+ << "error processing query: "
+ << canonicalQuery->toString()
+ << " No query solutions");
}
if (1 == solutions.size()) {
@@ -289,12 +292,13 @@ namespace mongo {
verify(StageBuilder::build(*solutions[0], &root, &ws));
// And, run the plan.
- *out = new SingleSolutionRunner(canonicalQuery.release(), solutions[0], root, ws);
+ *out = new SingleSolutionRunner(collection,
+ canonicalQuery.release(),solutions[0], root, ws);
return Status::OK();
}
else {
// Many solutions. Let the MultiPlanRunner pick the best, update the cache, and so on.
- auto_ptr<MultiPlanRunner> mpr(new MultiPlanRunner(canonicalQuery.release()));
+ auto_ptr<MultiPlanRunner> mpr(new MultiPlanRunner(collection,canonicalQuery.release()));
for (size_t i = 0; i < solutions.size(); ++i) {
WorkingSet* ws;
PlanStage* root;
@@ -453,7 +457,7 @@ namespace mongo {
WorkingSet* ws;
PlanStage* root;
verify(StageBuilder::build(*soln, &root, &ws));
- *out = new SingleSolutionRunner(cq, soln, root, ws);
+ *out = new SingleSolutionRunner(collection, cq, soln, root, ws);
return Status::OK();
}
@@ -481,7 +485,7 @@ namespace mongo {
WorkingSet* ws;
PlanStage* root;
verify(StageBuilder::build(*solutions[i], &root, &ws));
- *out = new SingleSolutionRunner(cq, solutions[i], root, ws);
+ *out = new SingleSolutionRunner(collection, cq, solutions[i], root, ws);
return Status::OK();
}
}
@@ -498,11 +502,14 @@ namespace mongo {
ScopedRunnerRegistration::ScopedRunnerRegistration(Runner* runner)
: _runner(runner) {
- ClientCursor::registerRunner(_runner);
+ // Collection can be null for EOFRunner, or other places where registration is not needed
+ if ( _runner->collection() )
+ _runner->collection()->cursorCache()->registerRunner( runner );
}
ScopedRunnerRegistration::~ScopedRunnerRegistration() {
- ClientCursor::deregisterRunner(_runner);
+ if ( _runner->collection() )
+ _runner->collection()->cursorCache()->deregisterRunner( _runner );
}
} // namespace mongo
diff --git a/src/mongo/db/query/idhack_runner.cpp b/src/mongo/db/query/idhack_runner.cpp
index 64b7861a7a7..afdb82b45ed 100644
--- a/src/mongo/db/query/idhack_runner.cpp
+++ b/src/mongo/db/query/idhack_runner.cpp
@@ -44,7 +44,7 @@
namespace mongo {
- IDHackRunner::IDHackRunner(Collection* collection, CanonicalQuery* query)
+ IDHackRunner::IDHackRunner(const Collection* collection, CanonicalQuery* query)
: _collection(collection),
_key(query->getQueryObj()["_id"].wrap()),
_query(query),
@@ -67,18 +67,18 @@ namespace mongo {
if (_done) { return Runner::RUNNER_EOF; }
// Use the index catalog to get the id index.
- IndexCatalog* catalog = _collection->getIndexCatalog();
+ const IndexCatalog* catalog = _collection->getIndexCatalog();
// Find the index we use.
- const IndexDescriptor* idDesc = catalog->findIdIndex();
+ IndexDescriptor* idDesc = catalog->findIdIndex();
if (NULL == idDesc) {
_done = true;
return Runner::RUNNER_EOF;
}
// XXX: This may not be valid always. See SERVER-12397.
- BtreeBasedAccessMethod* accessMethod =
- static_cast<BtreeBasedAccessMethod*>(catalog->getIndex(idDesc));
+ const BtreeBasedAccessMethod* accessMethod =
+ static_cast<const BtreeBasedAccessMethod*>(catalog->getIndex(idDesc));
// Look up the key by going directly to the Btree.
DiskLoc loc = accessMethod->findSingle( _key );
@@ -175,6 +175,7 @@ namespace mongo {
void IDHackRunner::kill() {
_killed = true;
+ _collection = NULL;
}
Status IDHackRunner::getExplainPlan(TypeExplain** explain) const {
diff --git a/src/mongo/db/query/idhack_runner.h b/src/mongo/db/query/idhack_runner.h
index 7b268e94c32..bbe4d767b1c 100644
--- a/src/mongo/db/query/idhack_runner.h
+++ b/src/mongo/db/query/idhack_runner.h
@@ -49,8 +49,8 @@ namespace mongo {
class IDHackRunner : public Runner {
public:
- /** Takes ownership of all the arguments. */
- IDHackRunner(Collection* collection, CanonicalQuery* query);
+        /** Takes ownership of all the arguments except collection. */
+ IDHackRunner(const Collection* collection, CanonicalQuery* query);
IDHackRunner(Collection* collection, const BSONObj& key);
@@ -72,13 +72,15 @@ namespace mongo {
virtual void kill();
+ virtual const Collection* collection() { return _collection; }
+
/**
*/
virtual Status getExplainPlan(TypeExplain** explain) const;
private:
// Not owned here.
- Collection* _collection;
+ const Collection* _collection;
// The value to match against the _id field.
BSONObj _key;
diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h
index be09789d8bb..fa5597dfbc1 100644
--- a/src/mongo/db/query/internal_plans.h
+++ b/src/mongo/db/query/internal_plans.h
@@ -62,7 +62,7 @@ namespace mongo {
/**
* Return a collection scan. Caller owns pointer.
*/
- static Runner* collectionScan(const StringData& ns,
+ static Runner* collectionScan(const StringData& ns, // TODO: make this a Collection*
const Direction direction = FORWARD,
const DiskLoc startLoc = DiskLoc()) {
Collection* collection = cc().database()->getCollection(ns);
@@ -81,7 +81,7 @@ namespace mongo {
WorkingSet* ws = new WorkingSet();
CollectionScan* cs = new CollectionScan(params, ws, NULL);
- return new InternalRunner(ns.toString(), cs, ws);
+ return new InternalRunner(collection, cs, ws);
}
/**
@@ -92,9 +92,8 @@ namespace mongo {
const BSONObj& startKey, const BSONObj& endKey,
bool endKeyInclusive, Direction direction = FORWARD,
int options = 0) {
- verify(descriptor);
-
- const NamespaceString& ns = collection->ns();
+ invariant(collection);
+ invariant(descriptor);
IndexScanParams params;
params.descriptor = descriptor;
@@ -108,10 +107,10 @@ namespace mongo {
IndexScan* ix = new IndexScan(params, ws, NULL);
if (IXSCAN_FETCH & options) {
- return new InternalRunner(ns.toString(), new FetchStage(ws, ix, NULL), ws);
+ return new InternalRunner(collection, new FetchStage(ws, ix, NULL), ws);
}
else {
- return new InternalRunner(ns.toString(), ix, ws);
+ return new InternalRunner(collection, ix, ws);
}
}
};
diff --git a/src/mongo/db/query/internal_runner.cpp b/src/mongo/db/query/internal_runner.cpp
index f5d1b392830..504e33e2056 100644
--- a/src/mongo/db/query/internal_runner.cpp
+++ b/src/mongo/db/query/internal_runner.cpp
@@ -28,10 +28,11 @@
#include "mongo/db/query/internal_runner.h"
+#include "mongo/db/catalog/collection.h"
#include "mongo/db/diskloc.h"
-#include "mongo/db/jsobj.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/working_set.h"
+#include "mongo/db/jsobj.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/explain_plan.h"
#include "mongo/db/query/plan_executor.h"
@@ -39,14 +40,16 @@
namespace mongo {
- /** Takes ownership of all arguments. */
- InternalRunner::InternalRunner(const std::string& ns, PlanStage* root, WorkingSet* ws)
- : _ns(ns), _exec(new PlanExecutor(ws, root)), _policy(Runner::YIELD_MANUAL) {
+ InternalRunner::InternalRunner(const Collection* collection, PlanStage* root, WorkingSet* ws)
+ : _collection(collection),
+ _exec(new PlanExecutor(ws, root)),
+ _policy(Runner::YIELD_MANUAL) {
+ invariant( collection );
}
InternalRunner::~InternalRunner() {
- if (Runner::YIELD_AUTO == _policy) {
- ClientCursor::deregisterRunner(this);
+ if (Runner::YIELD_AUTO == _policy && _collection) {
+ _collection->cursorCache()->deregisterRunner(this);
}
}
@@ -67,7 +70,7 @@ namespace mongo {
}
const std::string& InternalRunner::ns() {
- return _ns;
+ return _collection->ns().ns();
}
void InternalRunner::invalidate(const DiskLoc& dl, InvalidationType type) {
@@ -78,13 +81,15 @@ namespace mongo {
// No-op.
if (_policy == policy) { return; }
+ invariant( _collection );
+
if (Runner::YIELD_AUTO == policy) {
// Going from manual to auto.
- ClientCursor::registerRunner(this);
+ _collection->cursorCache()->registerRunner(this);
}
else {
// Going from auto to manual.
- ClientCursor::deregisterRunner(this);
+ _collection->cursorCache()->deregisterRunner(this);
}
_policy = policy;
@@ -93,6 +98,7 @@ namespace mongo {
void InternalRunner::kill() {
_exec->kill();
+ _collection = NULL;
}
Status InternalRunner::getExplainPlan(TypeExplain** explain) const {
diff --git a/src/mongo/db/query/internal_runner.h b/src/mongo/db/query/internal_runner.h
index 699814e1a2a..441224d7c2b 100644
--- a/src/mongo/db/query/internal_runner.h
+++ b/src/mongo/db/query/internal_runner.h
@@ -56,8 +56,8 @@ namespace mongo {
class InternalRunner : public Runner {
public:
- /** Takes ownership of all arguments. */
- InternalRunner(const string& ns, PlanStage* root, WorkingSet* ws);
+ /** Takes ownership of root and ws. */
+ InternalRunner(const Collection* collection, PlanStage* root, WorkingSet* ws);
virtual ~InternalRunner();
@@ -77,6 +77,8 @@ namespace mongo {
virtual void kill();
+ virtual const Collection* collection() { return _collection; }
+
/**
* Returns OK, allocating and filling in '*explain' with details of the plan used by
* this runner. Caller takes ownership of '*explain'. Otherwise, return a status
@@ -89,7 +91,7 @@ namespace mongo {
virtual Status getExplainPlan(TypeExplain** explain) const;
private:
- std::string _ns;
+ const Collection* _collection;
boost::scoped_ptr<PlanExecutor> _exec;
Runner::YieldPolicy _policy;
diff --git a/src/mongo/db/query/multi_plan_runner.cpp b/src/mongo/db/query/multi_plan_runner.cpp
index 5a74050e948..6716d15d2dc 100644
--- a/src/mongo/db/query/multi_plan_runner.cpp
+++ b/src/mongo/db/query/multi_plan_runner.cpp
@@ -47,8 +47,9 @@
namespace mongo {
- MultiPlanRunner::MultiPlanRunner(CanonicalQuery* query)
- : _killed(false),
+ MultiPlanRunner::MultiPlanRunner(const Collection* collection, CanonicalQuery* query)
+ : _collection(collection),
+ _killed(false),
_failure(false),
_failureCount(0),
_policy(Runner::YIELD_MANUAL),
@@ -208,6 +209,7 @@ namespace mongo {
void MultiPlanRunner::kill() {
_killed = true;
+ _collection = NULL;
if (NULL != _bestPlan) { _bestPlan->kill(); }
}
diff --git a/src/mongo/db/query/multi_plan_runner.h b/src/mongo/db/query/multi_plan_runner.h
index ca4910e1f6f..d1c13c616d8 100644
--- a/src/mongo/db/query/multi_plan_runner.h
+++ b/src/mongo/db/query/multi_plan_runner.h
@@ -57,7 +57,7 @@ namespace mongo {
/**
* Takes ownership of query.
*/
- MultiPlanRunner(CanonicalQuery* query);
+ MultiPlanRunner(const Collection* collection, CanonicalQuery* query);
virtual ~MultiPlanRunner();
/**
@@ -93,6 +93,8 @@ namespace mongo {
virtual void kill();
+ virtual const Collection* collection() { return _collection; }
+
/**
* Returns OK, allocating and filling in '*explain' with details of the "winner"
* plan. Caller takes ownership of '*explain'. Otherwise, return a status describing
@@ -110,6 +112,8 @@ namespace mongo {
void allPlansSaveState();
void allPlansRestoreState();
+ const Collection* _collection;
+
// Were we killed by an invalidate?
bool _killed;
diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp
index b40a1f37073..26807bf1b3d 100644
--- a/src/mongo/db/query/new_find.cpp
+++ b/src/mongo/db/query/new_find.cpp
@@ -126,6 +126,8 @@ namespace mongo {
// This is a read lock.
scoped_ptr<Client::ReadContext> ctx(new Client::ReadContext(ns));
+ Collection* collection = ctx->ctx().db()->getCollection(ns);
+ uassert( 17356, "collection dropped between getMore calls", collection );
QLOG() << "running getMore in new system, cursorid " << cursorid << endl;
@@ -138,7 +140,7 @@ namespace mongo {
// A pin performs a CC lookup and if there is a CC, increments the CC's pin value so it
// doesn't time out. Also informs ClientCursor that there is somebody actively holding the
// CC, so don't delete it.
- ClientCursorPin ccPin(cursorid);
+ ClientCursorPin ccPin(collection, cursorid);
ClientCursor* cc = ccPin.c();
// These are set in the QueryResult msg we return.
@@ -291,13 +293,17 @@ namespace mongo {
return qr;
}
- Status getOplogStartHack(CanonicalQuery* cq, Runner** runnerOut) {
+ Status getOplogStartHack(Collection* collection, CanonicalQuery* cq, Runner** runnerOut) {
+ if ( collection == NULL )
+ return Status(ErrorCodes::InternalError,
+ "getOplogStartHack called with a NULL collection" );
+
// Make an oplog start finding stage.
WorkingSet* oplogws = new WorkingSet();
OplogStart* stage = new OplogStart(cq->ns(), cq->root(), oplogws);
// Takes ownership of ws and stage.
- auto_ptr<InternalRunner> runner(new InternalRunner(cq->ns(), stage, oplogws));
+ auto_ptr<InternalRunner> runner(new InternalRunner(collection, stage, oplogws));
runner->setYieldPolicy(Runner::YIELD_AUTO);
// The stage returns a DiskLoc of where to start.
@@ -325,7 +331,7 @@ namespace mongo {
WorkingSet* ws = new WorkingSet();
CollectionScan* cs = new CollectionScan(params, ws, cq->root());
// Takes ownership of cq, cs, ws.
- *runnerOut = new SingleSolutionRunner(cq, NULL, cs, ws);
+ *runnerOut = new SingleSolutionRunner(collection, cq, NULL, cs, ws);
return Status::OK();
}
@@ -381,6 +387,7 @@ namespace mongo {
// where-specific parsing code assumes we have a lock and creates execution machinery that
// requires it.
Client::ReadContext ctx(q.ns);
+ Collection* collection = ctx.ctx().db()->getCollection( ns );
// Parse the qm into a CanonicalQuery.
CanonicalQuery* cq;
@@ -412,11 +419,11 @@ namespace mongo {
// Otherwise we go through the selection of which runner is most suited to the
// query + run-time context at hand.
Status status = Status::OK();
- if (ctx.ctx().db()->getCollection(cq->ns()) == NULL) {
+ if (collection == NULL) {
rawRunner = new EOFRunner(cq, cq->ns());
}
else if (pq.hasOption(QueryOption_OplogReplay)) {
- status = getOplogStartHack(cq, &rawRunner);
+ status = getOplogStartHack(collection, cq, &rawRunner);
}
else {
// Takes ownership of cq.
@@ -617,7 +624,8 @@ namespace mongo {
// Allocate a new ClientCursor. We don't have to worry about leaking it as it's
// inserted into a global map by its ctor.
- ClientCursor* cc = new ClientCursor(runner.get(), cq->getParsed().getOptions(),
+ ClientCursor* cc = new ClientCursor(collection, runner.get(),
+ cq->getParsed().getOptions(),
cq->getParsed().getFilter());
ccId = cc->cursorid();
diff --git a/src/mongo/db/query/runner.h b/src/mongo/db/query/runner.h
index 01ad9988338..4e39c6c5f41 100644
--- a/src/mongo/db/query/runner.h
+++ b/src/mongo/db/query/runner.h
@@ -34,6 +34,7 @@
namespace mongo {
+ class Collection;
class DiskLoc;
class TypeExplain;
@@ -162,7 +163,7 @@ namespace mongo {
* The runner must take any actions required to continue operating correctly, including
* broadcasting the invalidation request to the PlanStage tree being run.
*
- * Called from ClientCursor::invalidateDocument.
+ * Called from CollectionCursorCache::invalidateDocument.
*
* See db/invalidation_type.h for InvalidationType.
*/
@@ -192,6 +193,11 @@ namespace mongo {
virtual const string& ns() = 0;
/**
+ * Return the Collection that the query is running over.
+ */
+ virtual const Collection* collection() = 0;
+
+ /**
* Returns OK, allocating and filling '*explain' with a description of the chosen plan.
* Caller takes onwership of '*explain'. Otherwise, returns false with a detailed error
* status.
diff --git a/src/mongo/db/query/runner_yield_policy.h b/src/mongo/db/query/runner_yield_policy.h
index 4058108c3bd..4f164c55b01 100644
--- a/src/mongo/db/query/runner_yield_policy.h
+++ b/src/mongo/db/query/runner_yield_policy.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/clientcursor.h"
+#include "mongo/db/catalog/collection.h"
#include "mongo/util/elapsed_tracker.h"
namespace mongo {
@@ -41,7 +42,9 @@ namespace mongo {
if (NULL != _runnerYielding) {
// We were destructed mid-yield. Since we're being used to yield a runner, we have
// to deregister the runner.
- ClientCursor::deregisterRunner(_runnerYielding);
+ if ( _runnerYielding->collection() ) {
+ _runnerYielding->collection()->cursorCache()->deregisterRunner(_runnerYielding);
+ }
}
}
@@ -56,7 +59,9 @@ namespace mongo {
* Provided runner MUST be YIELD_MANUAL.
*/
bool yieldAndCheckIfOK(Runner* runner, Record* record = NULL) {
- verify(runner);
+ invariant(runner);
+ invariant(runner->collection()); // XXX: should this just return true?
+
int micros = ClientCursor::suggestYieldMicros();
// If micros is not positive, no point in yielding, nobody waiting.
@@ -66,9 +71,16 @@ namespace mongo {
// If micros > 0, we should yield.
runner->saveState();
_runnerYielding = runner;
- ClientCursor::registerRunner(_runnerYielding);
+
+ runner->collection()->cursorCache()->registerRunner( _runnerYielding );
+
staticYield(micros, record);
- ClientCursor::deregisterRunner(_runnerYielding);
+
+ if ( runner->collection() ) {
+ // if the runner was killed, runner->collection() will return NULL
+ // so we don't deregister as it was done when killed
+                runner->collection()->cursorCache()->deregisterRunner( _runnerYielding );
+ }
_runnerYielding = NULL;
_elapsedTracker.resetLastTime();
return runner->restoreState();
diff --git a/src/mongo/db/query/single_solution_runner.cpp b/src/mongo/db/query/single_solution_runner.cpp
index 3196fc831b9..40c6e875257 100644
--- a/src/mongo/db/query/single_solution_runner.cpp
+++ b/src/mongo/db/query/single_solution_runner.cpp
@@ -40,11 +40,13 @@
namespace mongo {
- SingleSolutionRunner::SingleSolutionRunner(CanonicalQuery* canonicalQuery,
+ SingleSolutionRunner::SingleSolutionRunner(const Collection* collection,
+ CanonicalQuery* canonicalQuery,
QuerySolution* soln,
PlanStage* root,
WorkingSet* ws)
- : _canonicalQuery(canonicalQuery),
+ : _collection( collection ),
+ _canonicalQuery(canonicalQuery),
_solution(soln),
_exec(new PlanExecutor(ws, root)) { }
@@ -84,6 +86,7 @@ namespace mongo {
void SingleSolutionRunner::kill() {
_exec->kill();
+ _collection = NULL;
}
Status SingleSolutionRunner::getExplainPlan(TypeExplain** explain) const {
diff --git a/src/mongo/db/query/single_solution_runner.h b/src/mongo/db/query/single_solution_runner.h
index 7062a70c153..83aa9f8f869 100644
--- a/src/mongo/db/query/single_solution_runner.h
+++ b/src/mongo/db/query/single_solution_runner.h
@@ -52,8 +52,9 @@ namespace mongo {
class SingleSolutionRunner : public Runner {
public:
- /** Takes ownership of all the arguments. */
- SingleSolutionRunner(CanonicalQuery* canonicalQuery, QuerySolution* soln,
+ /** Takes ownership of all the arguments except collection */
+ SingleSolutionRunner(const Collection* collection,
+ CanonicalQuery* canonicalQuery, QuerySolution* soln,
PlanStage* root, WorkingSet* ws);
virtual ~SingleSolutionRunner();
@@ -74,6 +75,7 @@ namespace mongo {
virtual void kill();
+ virtual const Collection* collection() { return _collection; }
/**
* Returns OK, allocating and filling in '*explain' with the details of the plan used
* by this runner. Caller takes ownership of '*explain'. Otherwise, return a status
@@ -82,6 +84,8 @@ namespace mongo {
virtual Status getExplainPlan(TypeExplain** explain) const;
private:
+ const Collection* _collection;
+
boost::scoped_ptr<CanonicalQuery> _canonicalQuery;
boost::scoped_ptr<QuerySolution> _solution;
boost::scoped_ptr<PlanExecutor> _exec;
diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp
index 1be9f169c74..e7c524128f0 100644
--- a/src/mongo/db/range_deleter_db_env.cpp
+++ b/src/mongo/db/range_deleter_db_env.cpp
@@ -150,6 +150,11 @@ namespace mongo {
void RangeDeleterDBEnv::getCursorIds(const StringData& ns,
std::set<CursorId>* openCursors) {
- ClientCursor::find(ns.toString(), *openCursors);
+ Client::ReadContext ctx(ns.toString());
+ Collection* collection = ctx.ctx().db()->getCollection( ns );
+ if ( !collection )
+ return;
+
+ collection->cursorCache()->getCursorIds( openCursors );
}
}
diff --git a/src/mongo/db/range_preserver.h b/src/mongo/db/range_preserver.h
index ed44dc2bc33..dac32c30fd6 100644
--- a/src/mongo/db/range_preserver.h
+++ b/src/mongo/db/range_preserver.h
@@ -48,12 +48,13 @@ namespace mongo {
* object does. The ClientCursorPin guarantees that the underlying ClientCursor is not
* deleted until this object goes out of scope.
*/
- RangePreserver(const string& ns) {
+ RangePreserver(const Collection* collection) {
+ invariant( collection );
// Not a memory leak. Cached in a static structure by CC's ctor.
- ClientCursor* cc = new ClientCursor(ns);
+ ClientCursor* cc = new ClientCursor(collection);
// Pin keeps the CC from being deleted while it's in scope. We delete it ourselves.
- _pin.reset(new ClientCursorPin(cc->cursorid()));
+ _pin.reset(new ClientCursorPin(collection, cc->cursorid()));
}
~RangePreserver() {
diff --git a/src/mongo/db/restapi.cpp b/src/mongo/db/restapi.cpp
index 2585681796d..30f82f4c591 100644
--- a/src/mongo/db/restapi.cpp
+++ b/src/mongo/db/restapi.cpp
@@ -275,7 +275,7 @@ namespace mongo {
ss << "<pre>\n";
ss << "time to get readlock: " << millis << "ms\n";
ss << "# databases: " << dbHolder().sizeInfo() << '\n';
- ss << "# Cursors: " << ClientCursor::numCursors() << '\n';
+ ss << "# Cursors: " << ClientCursor::totalOpen() << '\n';
ss << "replication: ";
if( *replInfo )
ss << "\nreplInfo: " << replInfo << "\n\n";
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 48d13049d9e..3d8ffa28a71 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -32,6 +32,8 @@
#include <boost/thread/thread.hpp>
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
#include "mongo/db/interrupt_status_mongod.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
@@ -158,12 +160,15 @@ namespace DocumentSourceTests {
{ _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; }
protected:
void createSource() {
- Client::ReadContext ctx (ns);
+ Client::WriteContext ctx (ns);
+ Collection* collection = ctx.ctx().db()->getOrCreateCollection( ns );
CanonicalQuery* cq;
uassertStatusOK(CanonicalQuery::canonicalize(ns, /*query=*/BSONObj(), &cq));
Runner* runner;
uassertStatusOK(getRunner(cq, &runner));
- auto_ptr<ClientCursor> cc(new ClientCursor(runner, QueryOption_NoCursorTimeout));
+ auto_ptr<ClientCursor> cc(new ClientCursor(collection,
+ runner,
+ QueryOption_NoCursorTimeout));
verify(cc->getRunner());
cc->getRunner()->setYieldPolicy(Runner::YIELD_AUTO);
CursorId cursorId = cc->cursorid();
@@ -196,9 +201,13 @@ namespace DocumentSourceTests {
}
private:
void assertNumClientCursors( unsigned int expected ) {
- set<CursorId> nsCursors;
- ClientCursor::find( ns, nsCursors );
- ASSERT_EQUALS( expected, nsCursors.size() );
+ Client::ReadContext ctx( ns );
+ Collection* collection = ctx.ctx().db()->getCollection( ns );
+ if ( !collection ) {
+ ASSERT( 0 == expected );
+ return;
+ }
+ ASSERT_EQUALS( expected, collection->cursorCache()->numCursors() );
}
};
diff --git a/src/mongo/dbtests/query_multi_plan_runner.cpp b/src/mongo/dbtests/query_multi_plan_runner.cpp
index 114cf792c3e..e12241ece6c 100644
--- a/src/mongo/dbtests/query_multi_plan_runner.cpp
+++ b/src/mongo/dbtests/query_multi_plan_runner.cpp
@@ -130,7 +130,7 @@ namespace QueryMultiPlanRunner {
CanonicalQuery* cq = NULL;
verify(CanonicalQuery::canonicalize(ns(), BSON("foo" << 7), &cq).isOK());
verify(NULL != cq);
- MultiPlanRunner mpr(cq);
+ MultiPlanRunner mpr(ctx.ctx().db()->getCollection(ns()),cq);
mpr.addPlan(createQuerySolution(), firstRoot.release(), firstWs.release());
mpr.addPlan(createQuerySolution(), secondRoot.release(), secondWs.release());
diff --git a/src/mongo/dbtests/query_single_solution_runner.cpp b/src/mongo/dbtests/query_single_solution_runner.cpp
index 77af61b685f..b11bc35aaed 100644
--- a/src/mongo/dbtests/query_single_solution_runner.cpp
+++ b/src/mongo/dbtests/query_single_solution_runner.cpp
@@ -77,7 +77,8 @@ namespace QuerySingleSolutionRunner {
*
* The caller takes ownership of the returned SingleSolutionRunner*.
*/
- SingleSolutionRunner* makeCollScanRunner(BSONObj& filterObj) {
+ SingleSolutionRunner* makeCollScanRunner(Client::Context& ctx,
+ BSONObj& filterObj) {
CollectionScanParams csparams;
csparams.ns = ns();
csparams.direction = CollectionScanParams::FORWARD;
@@ -94,8 +95,11 @@ namespace QuerySingleSolutionRunner {
verify(NULL != cq);
// Hand the plan off to the single solution runner.
- SingleSolutionRunner* ssr = new SingleSolutionRunner(cq, new QuerySolution(),
- root.release(), ws.release());
+ SingleSolutionRunner* ssr = new SingleSolutionRunner(ctx.db()->getCollection(ns()),
+ cq,
+ new QuerySolution(),
+ root.release(),
+ ws.release());
return ssr;
}
@@ -112,7 +116,8 @@ namespace QuerySingleSolutionRunner {
*
* The caller takes ownership of the returned SingleSolutionRunner*.
*/
- SingleSolutionRunner* makeIndexScanRunner(BSONObj& indexSpec, int start, int end) {
+ SingleSolutionRunner* makeIndexScanRunner(Client::Context& context,
+ BSONObj& indexSpec, int start, int end) {
// Build the index scan stage.
IndexScanParams ixparams;
ixparams.descriptor = getIndex(indexSpec);
@@ -130,12 +135,33 @@ namespace QuerySingleSolutionRunner {
verify(NULL != cq);
// Hand the plan off to the single solution runner.
- return new SingleSolutionRunner(cq, new QuerySolution(),
+ return new SingleSolutionRunner(context.db()->getCollection(ns()),
+ cq, new QuerySolution(),
root.release(), ws.release());
}
static const char* ns() { return "unittests.QueryStageSingleSolutionRunner"; }
+ size_t numCursors() {
+ Client::ReadContext ctx( ns() );
+ Collection* collection = ctx.ctx().db()->getCollection( ns() );
+ if ( !collection )
+ return 0;
+ return collection->cursorCache()->numCursors();
+ }
+
+ void registerRunner( Runner* runner ) {
+ Client::ReadContext ctx( ns() );
+ Collection* collection = ctx.ctx().db()->getOrCreateCollection( ns() );
+ return collection->cursorCache()->registerRunner( runner );
+ }
+
+ void deregisterRunner( Runner* runner ) {
+ Client::ReadContext ctx( ns() );
+ Collection* collection = ctx.ctx().db()->getOrCreateCollection( ns() );
+ return collection->cursorCache()->deregisterRunner( runner );
+ }
+
private:
IndexDescriptor* getIndex(const BSONObj& obj) {
Collection* collection = cc().database()->getCollection( ns() );
@@ -159,10 +185,10 @@ namespace QuerySingleSolutionRunner {
insert(BSON("_id" << 2));
BSONObj filterObj = fromjson("{_id: {$gt: 0}}");
- scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(filterObj));
+ scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(ctx.ctx(),filterObj));
// Set up autoyielding.
- ClientCursor::registerRunner(ssr.get());
+ registerRunner(ssr.get());
ssr->setYieldPolicy(Runner::YIELD_AUTO);
BSONObj objOut;
@@ -174,7 +200,7 @@ namespace QuerySingleSolutionRunner {
dropCollection();
ASSERT_EQUALS(Runner::RUNNER_DEAD, ssr->getNext(&objOut, NULL));
- ClientCursor::deregisterRunner(ssr.get());
+ deregisterRunner(ssr.get());
}
};
@@ -192,10 +218,10 @@ namespace QuerySingleSolutionRunner {
BSONObj indexSpec = BSON("a" << 1);
addIndex(indexSpec);
- scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(indexSpec, 7, 10));
+ scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(ctx.ctx(), indexSpec, 7, 10));
// Set up autoyielding.
- ClientCursor::registerRunner(ssr.get());
+ registerRunner(ssr.get());
ssr->setYieldPolicy(Runner::YIELD_AUTO);
BSONObj objOut;
@@ -207,7 +233,7 @@ namespace QuerySingleSolutionRunner {
dropCollection();
ASSERT_EQUALS(Runner::RUNNER_DEAD, ssr->getNext(&objOut, NULL));
- ClientCursor::deregisterRunner(ssr.get());
+ deregisterRunner(ssr.get());
}
};
@@ -263,7 +289,7 @@ namespace QuerySingleSolutionRunner {
setupCollection();
BSONObj filterObj = fromjson("{a: {$gte: 2}}");
- scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(filterObj));
+ scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(ctx.ctx(),filterObj));
BSONObj objOut;
ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL));
@@ -290,7 +316,7 @@ namespace QuerySingleSolutionRunner {
addIndex(indexSpec);
BSONObj filterObj = fromjson("{a: {$gte: 2}}");
- scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(indexSpec, 2, 5));
+ scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(ctx.ctx(), indexSpec, 2, 5));
BSONObj objOut;
ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL));
@@ -319,16 +345,17 @@ namespace QuerySingleSolutionRunner {
insert(BSON("a" << 1 << "b" << 1));
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- SingleSolutionRunner* ssr = makeCollScanRunner(filterObj);
+ SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj);
// Make a client cursor from the runner.
- new ClientCursor(ssr, 0, BSONObj());
+ new ClientCursor(ctx.ctx().db()->getCollection(ns()),
+ ssr, 0, BSONObj());
// There should be one cursor before invalidation,
// and zero cursors after invalidation.
- ASSERT_EQUALS(1U, ClientCursor::numCursors());
- ClientCursor::invalidate(ns());
- ASSERT_EQUALS(0U, ClientCursor::numCursors());
+ ASSERT_EQUALS(1U, numCursors());
+ ctx.ctx().db()->getCollection( ns() )->cursorCache()->invalidateAll(false);
+ ASSERT_EQUALS(0U, numCursors());
}
};
@@ -342,18 +369,21 @@ namespace QuerySingleSolutionRunner {
Client::WriteContext ctx(ns());
insert(BSON("a" << 1 << "b" << 1));
+ Collection* collection = ctx.ctx().db()->getCollection(ns());
+
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- SingleSolutionRunner* ssr = makeCollScanRunner(filterObj);
+ SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj);
// Make a client cursor from the runner.
- ClientCursor* cc = new ClientCursor(ssr, 0, BSONObj());
- ClientCursorPin ccPin(cc->cursorid());
+ ClientCursor* cc = new ClientCursor(collection,
+ ssr, 0, BSONObj());
+ ClientCursorPin ccPin(collection,cc->cursorid());
// If the cursor is pinned, it sticks around,
// even after invalidation.
- ASSERT_EQUALS(1U, ClientCursor::numCursors());
- ClientCursor::invalidate(ns());
- ASSERT_EQUALS(1U, ClientCursor::numCursors());
+ ASSERT_EQUALS(1U, numCursors());
+ collection->cursorCache()->invalidateAll(false);
+ ASSERT_EQUALS(1U, numCursors());
// The invalidation should have killed the runner.
BSONObj objOut;
@@ -362,7 +392,7 @@ namespace QuerySingleSolutionRunner {
// Deleting the underlying cursor should cause the
// number of cursors to return to 0.
ccPin.deleteUnderlying();
- ASSERT_EQUALS(0U, ClientCursor::numCursors());
+ ASSERT_EQUALS(0U, numCursors());
}
};
@@ -380,19 +410,20 @@ namespace QuerySingleSolutionRunner {
{
Client::ReadContext ctx(ns());
+ Collection* collection = ctx.ctx().db()->getCollection(ns());
BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- SingleSolutionRunner* ssr = makeCollScanRunner(filterObj);
+ SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj);
// Make a client cursor from the runner.
- new ClientCursor(ssr, 0, BSONObj());
+ new ClientCursor(collection, ssr, 0, BSONObj());
}
// There should be one cursor before timeout,
// and zero cursors after timeout.
- ASSERT_EQUALS(1U, ClientCursor::numCursors());
- ClientCursor::idleTimeReport(600001);
- ASSERT_EQUALS(0U, ClientCursor::numCursors());
+ ASSERT_EQUALS(1U, numCursors());
+ CollectionCursorCache::timeoutCursorsGlobal(600001);
+ ASSERT_EQUALS(0U, numCursors());
}
};
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index 12a25aba5ec..10f3807bc54 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -238,7 +238,7 @@ namespace QueryTests {
// Check internal server handoff to getmore.
Lock::DBWrite lk(ns);
Client::Context ctx( ns );
- ClientCursorPin clientCursor( cursorId );
+ ClientCursorPin clientCursor( ctx.db()->getCollection(ns), cursorId );
// pq doesn't exist if it's a runner inside of the clientcursor.
// ASSERT( clientCursor.c()->pq );
// ASSERT_EQUALS( 2, clientCursor.c()->pq->getNumToReturn() );
@@ -290,9 +290,11 @@ namespace QueryTests {
killCurrentOp.reset();
// Check that the cursor has been removed.
- set<CursorId> ids;
- ClientCursor::find( ns, ids );
- ASSERT_EQUALS( 0U, ids.count( cursorId ) );
+ {
+ Client::ReadContext ctx( ns );
+ ASSERT( 0 == ctx.ctx().db()->getCollection( ns )->cursorCache()->numCursors() );
+ }
+ ASSERT_FALSE( CollectionCursorCache::eraseCursorGlobal( cursorId ) );
// Check that a subsequent get more fails with the cursor removed.
ASSERT_THROWS( client().getMore( ns, cursorId ), UserException );
@@ -337,10 +339,12 @@ namespace QueryTests {
cursor->getCursorId() );
// Check that the cursor still exists
- set<CursorId> ids;
- ClientCursor::find( ns, ids );
- ASSERT_EQUALS( 1U, ids.count( cursorId ) );
-
+ {
+ Client::ReadContext ctx( ns );
+ ASSERT( 1 == ctx.ctx().db()->getCollection( ns )->cursorCache()->numCursors() );
+ ASSERT( ctx.ctx().db()->getCollection( ns )->cursorCache()->find( cursorId ) );
+ }
+
// Check that the cursor can be iterated until all documents are returned.
while( cursor->more() ) {
cursor->next();
@@ -597,7 +601,7 @@ namespace QueryTests {
ASSERT_EQUALS( two, c->next()["ts"].Date() );
long long cursorId = c->getCursorId();
- ClientCursorPin clientCursor( cursorId );
+ ClientCursorPin clientCursor( ctx.db()->getCollection( ns ), cursorId );
ASSERT_EQUALS( three.millis, clientCursor.c()->getSlaveReadTill().asDate() );
}
};
@@ -1096,6 +1100,14 @@ namespace QueryTests {
return (int) client().count( ns() );
}
+ size_t numCursorsOpen() {
+ Client::ReadContext ctx( _ns );
+ Collection* collection = ctx.ctx().db()->getCollection( _ns );
+ if ( !collection )
+ return 0;
+ return collection->cursorCache()->numCursors();
+ }
+
const char * ns() {
return _ns.c_str();
}
@@ -1297,7 +1309,7 @@ namespace QueryTests {
}
void run() {
- unsigned startNumCursors = ClientCursor::numCursors();
+ size_t startNumCursors = numCursorsOpen();
BSONObj info;
ASSERT( client().runCommand( "unittests", BSON( "create" << "querytests.findingstart" << "capped" << true << "$nExtents" << 5 << "autoIndexId" << false ), info ) );
@@ -1317,7 +1329,7 @@ namespace QueryTests {
}
}
- ASSERT_EQUALS( startNumCursors, ClientCursor::numCursors() );
+ ASSERT_EQUALS( startNumCursors, numCursorsOpen() );
}
};
@@ -1330,7 +1342,7 @@ namespace QueryTests {
FindingStartStale() : CollectionBase( "findingstart" ) {}
void run() {
- unsigned startNumCursors = ClientCursor::numCursors();
+ size_t startNumCursors = numCursorsOpen();
// Check OplogReplay mode with missing collection.
auto_ptr< DBClientCursor > c0 = client().query( ns(), QUERY( "ts" << GTE << 50 ), 0, 0, 0, QueryOption_OplogReplay );
@@ -1350,7 +1362,7 @@ namespace QueryTests {
ASSERT_EQUALS( 100, c->next()[ "ts" ].numberInt() );
// Check that no persistent cursors outlast our queries above.
- ASSERT_EQUALS( startNumCursors, ClientCursor::numCursors() );
+ ASSERT_EQUALS( startNumCursors, numCursorsOpen() );
}
};
@@ -1412,7 +1424,9 @@ namespace QueryTests {
ClientCursor *clientCursor = 0;
{
- ClientCursorPin clientCursorPointer( cursorId );
+ Client::ReadContext ctx( ns() );
+ ClientCursorPin clientCursorPointer( ctx.ctx().db()->getCollection( ns() ),
+ cursorId );
clientCursor = clientCursorPointer.c();
// clientCursorPointer destructor unpins the cursor.
}
@@ -1449,7 +1463,7 @@ namespace QueryTests {
{
Client::WriteContext ctx( ns() );
- ClientCursorPin pinCursor( cursorId );
+ ClientCursorPin pinCursor( ctx.ctx().db()->getCollection( ns() ), cursorId );
ASSERT_THROWS( client().killCursor( cursorId ), MsgAssertionException );
string expectedAssertion =
diff --git a/src/mongo/dbtests/runner_registry.cpp b/src/mongo/dbtests/runner_registry.cpp
index 30cd9971997..9b53453afd4 100644
--- a/src/mongo/dbtests/runner_registry.cpp
+++ b/src/mongo/dbtests/runner_registry.cpp
@@ -71,10 +71,19 @@ namespace RunnerRegistry {
CanonicalQuery* cq;
ASSERT(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK());
// Owns all args
- auto_ptr<Runner> run(new SingleSolutionRunner(cq, NULL, scan.release(), ws.release()));
+ auto_ptr<Runner> run(new SingleSolutionRunner(_ctx->ctx().db()->getCollection( ns() ),
+ cq, NULL, scan.release(), ws.release()));
return run.release();
}
+ void registerRunner( Runner* runner ) {
+ _ctx->ctx().db()->getOrCreateCollection( ns() )->cursorCache()->registerRunner( runner );
+ }
+
+ void deregisterRunner( Runner* runner ) {
+ _ctx->ctx().db()->getOrCreateCollection( ns() )->cursorCache()->deregisterRunner( runner );
+ }
+
int N() { return 50; }
static const char* ns() { return "unittests.RunnerRegistryDiskLocInvalidation"; }
@@ -99,7 +108,7 @@ namespace RunnerRegistry {
// Register it.
run->saveState();
- ClientCursor::registerRunner(run.get());
+ registerRunner(run.get());
// At this point it's safe to yield. forceYield would do that. Let's now simulate some
// stuff going on in the yield.
@@ -110,7 +119,7 @@ namespace RunnerRegistry {
// At this point, we're done yielding. We recover our lock.
// Unregister the runner.
- ClientCursor::deregisterRunner(run.get());
+ deregisterRunner(run.get());
// And clean up anything that happened before.
run->restoreState();
@@ -141,13 +150,13 @@ namespace RunnerRegistry {
// Save state and register.
run->saveState();
- ClientCursor::registerRunner(run.get());
+ registerRunner(run.get());
// Drop a collection that's not ours.
_client.dropCollection("unittests.someboguscollection");
// Unregister and restore state.
- ClientCursor::deregisterRunner(run.get());
+ deregisterRunner(run.get());
run->restoreState();
ASSERT_EQUALS(Runner::RUNNER_ADVANCED, run->getNext(&obj, NULL));
@@ -155,13 +164,13 @@ namespace RunnerRegistry {
// Save state and register.
run->saveState();
- ClientCursor::registerRunner(run.get());
+ registerRunner(run.get());
// Drop our collection.
_client.dropCollection(ns());
// Unregister and restore state.
- ClientCursor::deregisterRunner(run.get());
+ deregisterRunner(run.get());
run->restoreState();
// Runner was killed.
@@ -186,13 +195,13 @@ namespace RunnerRegistry {
// Save state and register.
run->saveState();
- ClientCursor::registerRunner(run.get());
+ registerRunner(run.get());
// Drop all indices.
_client.dropIndexes(ns());
// Unregister and restore state.
- ClientCursor::deregisterRunner(run.get());
+ deregisterRunner(run.get());
run->restoreState();
// Runner was killed.
@@ -217,13 +226,13 @@ namespace RunnerRegistry {
// Save state and register.
run->saveState();
- ClientCursor::registerRunner(run.get());
+ registerRunner(run.get());
// Drop a specific index.
_client.dropIndex(ns(), BSON("foo" << 1));
// Unregister and restore state.
- ClientCursor::deregisterRunner(run.get());
+ deregisterRunner(run.get());
run->restoreState();
// Runner was killed.
@@ -246,7 +255,7 @@ namespace RunnerRegistry {
// Save state and register.
run->saveState();
- ClientCursor::registerRunner(run.get());
+ registerRunner(run.get());
// Drop a DB that's not ours. We can't have a lock at all to do this as dropping a DB
// requires a "global write lock."
@@ -255,7 +264,7 @@ namespace RunnerRegistry {
_ctx.reset(new Client::WriteContext(ns()));
// Unregister and restore state.
- ClientCursor::deregisterRunner(run.get());
+ deregisterRunner(run.get());
run->restoreState();
ASSERT_EQUALS(Runner::RUNNER_ADVANCED, run->getNext(&obj, NULL));
@@ -263,7 +272,7 @@ namespace RunnerRegistry {
// Save state and register.
run->saveState();
- ClientCursor::registerRunner(run.get());
+ registerRunner(run.get());
// Drop our DB. Once again, must give up the lock.
_ctx.reset();
@@ -271,7 +280,7 @@ namespace RunnerRegistry {
_ctx.reset(new Client::WriteContext(ns()));
// Unregister and restore state.
- ClientCursor::deregisterRunner(run.get());
+ deregisterRunner(run.get());
run->restoreState();
// Runner was killed.