diff options
author | Eliot Horowitz <eliot@10gen.com> | 2014-01-24 15:47:07 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2014-01-24 15:47:07 -0500 |
commit | 7349ba70a0e68627dc322113c561afe3a9ed37a1 (patch) | |
tree | afe597cf004f191288999d8efad785b42833809d /src/mongo/db/clientcursor.cpp | |
parent | ed58b0dfe564253067b4cab11ab75477b7e48388 (diff) | |
download | mongo-7349ba70a0e68627dc322113c561afe3a9ed37a1.tar.gz |
SERVER-12392: Move cursor/runner cache into Collection lifecycle via CollectionCursorCache
Diffstat (limited to 'src/mongo/db/clientcursor.cpp')
-rw-r--r-- | src/mongo/db/clientcursor.cpp | 512 |
1 file changed, 96 insertions, 416 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 643d1f4aafd..8c6ca3be192 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -34,6 +34,7 @@ #include <time.h> #include <vector> +#include "mongo/base/counter.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/audit.h" #include "mongo/db/auth/action_set.h" @@ -55,35 +56,53 @@ namespace mongo { - ClientCursor::CCById ClientCursor::clientCursorsById; - boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) ); - long long ClientCursor::numberTimedOut = 0; - ClientCursor::RunnerSet ClientCursor::nonCachedRunners; + static Counter64 cursorStatsOpen; // gauge + static Counter64 cursorStatsOpenPinned; // gauge + static Counter64 cursorStatsOpenNoTimeout; // gauge + static Counter64 cursorStatsTimedOut; + + static ServerStatusMetricField<Counter64> dCursorStatsOpen( "cursor.open.total", + &cursorStatsOpen ); + static ServerStatusMetricField<Counter64> dCursorStatsOpenPinned( "cursor.open.pinned", + &cursorStatsOpenPinned ); + static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout( "cursor.open.noTimeout", + &cursorStatsOpenNoTimeout ); + static ServerStatusMetricField<Counter64> dCursorStatusTimedout( "cursor.timedOut", + &cursorStatsTimedOut ); void aboutToDeleteForSharding(const StringData& ns, const Database* db, const NamespaceDetails* nsd, const DiskLoc& dl ); // from s/d_logic.h - ClientCursor::ClientCursor(Runner* runner, int qopts, const BSONObj query) { + long long ClientCursor::totalOpen() { + return cursorStatsOpen.get(); + } + + ClientCursor::ClientCursor(const Collection* collection, Runner* runner, + int qopts, const BSONObj query) + : _collection( collection ), + _countedYet( false ) { _runner.reset(runner); _ns = runner->ns(); _query = query; _queryOptions = qopts; + if ( runner->collection() ) { + invariant( collection == runner->collection() ); + } init(); } - ClientCursor::ClientCursor(const 
string& ns) - : _ns(ns), + ClientCursor::ClientCursor(const Collection* collection) + : _ns(collection->ns().ns()), + _collection(collection), + _countedYet( false ), _queryOptions(QueryOption_NoCursorTimeout) { - init(); } void ClientCursor::init() { - _db = cc().database(); - verify( _db ); - verify( _db->ownsNS( _ns ) ); + invariant( _collection ); isAggCursor = false; @@ -91,18 +110,20 @@ namespace mongo { _leftoverMaxTimeMicros = 0; _pinValue = 0; _pos = 0; - + Lock::assertAtLeastReadLocked(_ns); if (_queryOptions & QueryOption_NoCursorTimeout) { // cursors normally timeout after an inactivity period to prevent excess memory use // setting this prevents timeout of the cursor in question. ++_pinValue; + cursorStatsOpenNoTimeout.increment(); } - recursive_scoped_lock lock(ccmutex); - _cursorid = allocCursorId_inlock(); - clientCursorsById.insert( make_pair(_cursorid, this) ); + _cursorid = _collection->cursorCache()->registerCursor( this ); + + cursorStatsOpen.increment(); + _countedYet = true; } ClientCursor::~ClientCursor() { @@ -112,160 +133,30 @@ namespace mongo { return; } - { - recursive_scoped_lock lock(ccmutex); - clientCursorsById.erase(_cursorid); - - // defensive: - _cursorid = INVALID_CURSOR_ID; - _pos = -2; - _pinValue = 0; + if ( _countedYet ) { + _countedYet = false; + cursorStatsOpen.decrement(); + if ( _pinValue == 1 ) + cursorStatsOpenNoTimeout.decrement(); } - } - - void ClientCursor::invalidate(const StringData& ns) { - Lock::assertWriteLocked(ns); - - size_t dot = ns.find( '.' ); - verify( dot != string::npos ); - // first (and only) dot is the last char - bool isDB = dot == ns.size() - 1; - - Database *db = cc().database(); - verify(db); - verify(ns.startsWith(db->name())); - - recursive_scoped_lock cclock(ccmutex); - // Look at all active non-cached Runners. These are the runners that are in auto-yield mode - // that are not attached to the the client cursor. 
For example, all internal runners don't - // need to be cached -- there will be no getMore. - for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end(); - ++it) { - - Runner* runner = *it; - const string& runnerNS = runner->ns(); - if ( ( isDB && StringData(runnerNS).startsWith(ns) ) || ns == runnerNS ) { - runner->kill(); - } + if ( _collection ) { + // this could be null if kill() was killed + _collection->cursorCache()->deregisterCursor( this ); } - // Look at all cached ClientCursor(s). The CC may have a Runner, a Cursor, or nothing (see - // sharding_block.h). - CCById::const_iterator it = clientCursorsById.begin(); - while (it != clientCursorsById.end()) { - ClientCursor* cc = it->second; - - // Aggregation cursors don't have their lifetime bound to the underlying collection. - if (cc->isAggCursor) { - ++it; - continue; - } - - // We're only interested in cursors over one db. - if (cc->_db != db) { - ++it; - continue; - } - - // Note that a valid ClientCursor state is "no cursor no runner." This is because - // the set of active cursor IDs in ClientCursor is used as representation of query - // state. See sharding_block.h. TODO(greg,hk): Move this out. - if (NULL == cc->_runner.get()) { - ++it; - continue; - } - - bool shouldDelete = false; - - // We will only delete CCs with runners that are not actively in use. The runners that - // are actively in use are instead kill()-ed. - if (NULL != cc->_runner.get()) { - if (isDB || cc->_runner->ns() == ns) { - // If there is a pinValue >= 100, somebody is actively using the CC and we do - // not delete it. Instead we notify the holder that we killed it. The holder - // will then delete the CC. - if (cc->_pinValue >= 100) { - cc->_runner->kill(); - } - else { - // pinvalue is <100, so there is nobody actively holding the CC. We can - // safely delete it as nobody is holding the CC. 
- shouldDelete = true; - } - } - } - - if (shouldDelete) { - ClientCursor* toDelete = it->second; - CursorId id = toDelete->cursorid(); - delete toDelete; - // We're not following the usual paradigm of saving it, ++it, and deleting the saved - // 'it' because deleting 'it' might invalidate the next thing in clientCursorsById. - // TODO: Why? - it = clientCursorsById.upper_bound(id); - } - else { - ++it; - } - } - } - - void ClientCursor::invalidateDocument(const StringData& ns, - const NamespaceDetails* nsd, - const DiskLoc& dl, - InvalidationType type) { - // TODO: Do we need this pagefault thing still - NoPageFaultsAllowed npfa; - recursive_scoped_lock lock(ccmutex); - - Database *db = cc().database(); - verify(db); - aboutToDeleteForSharding( ns, db, nsd, dl ); - - // Check our non-cached active runner list. - for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end(); - ++it) { - - Runner* runner = *it; - if (0 == ns.compare(runner->ns())) { - runner->invalidate(dl, type); - } - } - - // TODO: This requires optimization. We walk through *all* CCs and send the delete to every - // CC open on the db we're deleting from. We could: - // 1. Map from ns to open runners, - // 2. Map from ns -> (a map of DiskLoc -> runners who care about that DL) - // - // We could also queue invalidations somehow and have them processed later in the runner's - // read locks. - for (CCById::const_iterator it = clientCursorsById.begin(); it != clientCursorsById.end(); - ++it) { - - ClientCursor* cc = it->second; - // We're only interested in cursors over one db. 
- if (cc->_db != db) { continue; } - if (NULL == cc->_runner.get()) { continue; } - cc->_runner->invalidate(dl, type); - } + // defensive: + _collection = NULL; + _cursorid = INVALID_CURSOR_ID; + _pos = -2; + _pinValue = 0; } - void ClientCursor::registerRunner(Runner* runner) { - recursive_scoped_lock lock(ccmutex); - // The second part of the pair returned by unordered_map::insert tells us whether the - // insert was performed or not, so we can use that to ensure that we have not been - // double registered. - const std::pair<RunnerSet::iterator, bool> result = nonCachedRunners.insert(runner); - verify(result.second); - } + void ClientCursor::kill() { + if ( _runner.get() ) + _runner->kill(); - void ClientCursor::deregisterRunner(Runner* runner) { - recursive_scoped_lock lock(ccmutex); - // unordered_set::erase returns a count of how many elements were erased, so we can - // validate that our de-registration matched an existing register call by ensuring that - // exactly one item was erased. - verify(nonCachedRunners.erase(runner) == 1); + _collection = NULL; } void yieldOrSleepFor1Microsecond() { @@ -352,238 +243,12 @@ namespace mongo { _idleAgeMillis = millis; } - void ClientCursor::idleTimeReport(unsigned millis) { - bool foundSomeToTimeout = false; - - // two passes so that we don't need to readlock unless we really do some timeouts - // we assume here that incrementing _idleAgeMillis outside readlock is ok. 
- { - recursive_scoped_lock lock(ccmutex); - { - unsigned sz = clientCursorsById.size(); - static time_t last; - if( sz >= 100000 ) { - if( time(0) - last > 300 ) { - last = time(0); - log() << "warning number of open cursors is very large: " << sz << endl; - } - } - } - for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) { - CCById::iterator j = i; - i++; - if( j->second->shouldTimeout( millis ) ) { - foundSomeToTimeout = true; - } - } - } - - if( foundSomeToTimeout ) { - Lock::GlobalRead lk; - - recursive_scoped_lock cclock(ccmutex); - CCById::const_iterator it = clientCursorsById.begin(); - while (it != clientCursorsById.end()) { - ClientCursor* cc = it->second; - if( cc->shouldTimeout(0) ) { - numberTimedOut++; - LOG(1) << "killing old cursor " << cc->_cursorid << ' ' << cc->_ns - << " idle:" << cc->idleTime() << "ms\n"; - ClientCursor* toDelete = it->second; - CursorId id = toDelete->cursorid(); - // This is what winds up removing it from the map. - delete toDelete; - it = clientCursorsById.upper_bound(id); - } - else { - ++it; - } - } - } - } - void ClientCursor::updateSlaveLocation( CurOp& curop ) { if ( _slaveReadTill.isNull() ) return; mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill ); } - void ClientCursor::appendStats( BSONObjBuilder& result ) { - recursive_scoped_lock lock(ccmutex); - result.appendNumber("totalOpen", clientCursorsById.size() ); - result.appendNumber("clientCursors_size", (int) numCursors()); - result.appendNumber("timedOut" , numberTimedOut); - unsigned pinned = 0; - unsigned notimeout = 0; - for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); i++ ) { - unsigned p = i->second->_pinValue; - if( p >= 100 ) - pinned++; - else if( p > 0 ) - notimeout++; - } - if( pinned ) - result.append("pinned", pinned); - if( notimeout ) - result.append("totalNoTimeout", notimeout); - } - - // - // ClientCursor creation/deletion/access. 
- // - - // Some statics used by allocCursorId_inlock(). - namespace { - // so we don't have to do find() which is a little slow very often. - long long cursorGenTSLast = 0; - PseudoRandom* cursorGenRandom = NULL; - } - - long long ClientCursor::allocCursorId_inlock() { - // It is important that cursor IDs not be reused within a short period of time. - if (!cursorGenRandom) { - scoped_ptr<SecureRandom> sr( SecureRandom::create() ); - cursorGenRandom = new PseudoRandom( sr->nextInt64() ); - } - - const long long ts = Listener::getElapsedTimeMillis(); - long long x; - while ( 1 ) { - x = ts << 32; - x |= cursorGenRandom->nextInt32(); - - if ( x == 0 ) { continue; } - - if ( x < 0 ) { x *= -1; } - - if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 ) - break; - } - - cursorGenTSLast = ts; - return x; - } - - // static - ClientCursor* ClientCursor::find_inlock(CursorId id, bool warn) { - CCById::iterator it = clientCursorsById.find(id); - if ( it == clientCursorsById.end() ) { - if ( warn ) { - OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map '" << id - << "' (ok after a drop)" << endl; - } - return 0; - } - return it->second; - } - - void ClientCursor::find( const string& ns , set<CursorId>& all ) { - recursive_scoped_lock lock(ccmutex); - - for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) { - if ( i->second->_ns == ns ) - all.insert( i->first ); - } - } - - // static - ClientCursor* ClientCursor::find(CursorId id, bool warn) { - recursive_scoped_lock lock(ccmutex); - ClientCursor *c = find_inlock(id, warn); - // if this asserts, your code was not thread safe - you either need to set no timeout - // for the cursor or keep a ClientCursor::Pointer in scope for it. - massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue ); - return c; - } - - void ClientCursor::_erase_inlock(ClientCursor* cursor) { - // Must not have an active ClientCursor::Pin. 
- massert( 16089, - str::stream() << "Cannot kill active cursor " << cursor->cursorid(), - cursor->_pinValue < 100 ); - - delete cursor; - } - - bool ClientCursor::erase(CursorId id) { - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { return false; } - _erase_inlock(cursor); - return true; - } - - int ClientCursor::erase(int n, long long *ids) { - int found = 0; - for ( int i = 0; i < n; i++ ) { - if ( erase(ids[i])) - found++; - - if ( inShutdown() ) - break; - } - return found; - } - - bool ClientCursor::eraseIfAuthorized(CursorId id) { - NamespaceString ns; - { - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { - audit::logKillCursorsAuthzCheck( - &cc(), - NamespaceString(), - id, - ErrorCodes::CursorNotFound); - return false; - } - ns = NamespaceString(cursor->ns()); - } - - // Can't be in a lock when checking authorization - const bool isAuthorized = cc().getAuthorizationSession()->isAuthorizedForActionsOnNamespace( - ns, ActionType::killCursors); - audit::logKillCursorsAuthzCheck( - &cc(), - ns, - id, - isAuthorized ? ErrorCodes::OK : ErrorCodes::Unauthorized); - if (!isAuthorized) { - return false; - } - - // It is safe to lookup the cursor again after temporarily releasing the mutex because - // of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that - // the namespace associated with a cursor cannot change. - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { - // Cursor was deleted in another thread since we found it earlier in this function. - return false; - } - if (ns != cursor->ns()) { - warning() << "Cursor namespace changed. 
Previous ns: " << ns << ", current ns: " - << cursor->ns() << endl; - return false; - } - - _erase_inlock(cursor); - return true; - } - - int ClientCursor::eraseIfAuthorized(int n, long long *ids) { - int found = 0; - for ( int i = 0; i < n; i++ ) { - if ( eraseIfAuthorized(ids[i])) - found++; - - if ( inShutdown() ) - break; - } - return found; - } - int ClientCursor::suggestYieldMicros() { int writers = 0; int readers = 0; @@ -606,42 +271,45 @@ namespace mongo { // deleted from underneath us, so we can save the pointer and ignore the ID. // - ClientCursorPin::ClientCursorPin(long long cursorid) : _cursorid( INVALID_CURSOR_ID ) { - recursive_scoped_lock lock( ClientCursor::ccmutex ); - ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true ); - if (NULL != cursor) { - uassert( 12051, "clientcursor already in use? driver problem?", - cursor->_pinValue < 100 ); - cursor->_pinValue += 100; - _cursorid = cursorid; + ClientCursorPin::ClientCursorPin( const Collection* collection, long long cursorid ) + : _cursor( NULL ) { + cursorStatsOpenPinned.increment(); + _cursor = collection->cursorCache()->find( cursorid ); + if ( _cursor ) { + uassert( 12051, + "clientcursor already in use? 
driver problem?", + _cursor->_pinValue < 100 ); + _cursor->_pinValue += 100; } } - ClientCursorPin::~ClientCursorPin() { DESTRUCTOR_GUARD( release(); ) } + ClientCursorPin::~ClientCursorPin() { + cursorStatsOpenPinned.decrement(); + DESTRUCTOR_GUARD( release(); ); + } void ClientCursorPin::release() { - if ( _cursorid == INVALID_CURSOR_ID ) { + if ( !_cursor ) return; - } - ClientCursor *cursor = c(); - _cursorid = INVALID_CURSOR_ID; - if ( cursor ) { - verify( cursor->_pinValue >= 100 ); - cursor->_pinValue -= 100; + + invariant( _cursor->_pinValue >= 100 ); + _cursor->_pinValue -= 100; + + if ( _cursor->collection() == NULL ) { + // the ClientCursor was killed while we had it + // therefore its our responsibility to kill it + delete _cursor; + _cursor = NULL; // defensive } } void ClientCursorPin::deleteUnderlying() { - if (_cursorid == INVALID_CURSOR_ID) { - return; - } - ClientCursor *cursor = c(); - _cursorid = INVALID_CURSOR_ID; - delete cursor; + delete _cursor; + _cursor = NULL; } ClientCursor* ClientCursorPin::c() const { - return ClientCursor::find( _cursorid ); + return _cursor; } // @@ -708,9 +376,9 @@ namespace mongo { const int Secs = 4; unsigned n = 0; while ( ! inShutdown() ) { - ClientCursor::idleTimeReport( t.millisReset() ); + cursorStatsTimedOut.increment( CollectionCursorCache::timeoutCursorsGlobal( t.millisReset() ) ); sleepsecs(Secs); - if( ++n % (60/4) == 0 /*once a minute*/ ) { + if( ++n % (60/Secs) == 0 /*once a minute*/ ) { sayMemoryStatus(); } } @@ -723,14 +391,25 @@ namespace mongo { // cursorInfo command. 
// + void _appendCursorStats( BSONObjBuilder& b ) { + b.append( "note" , "deprecated, use server status metrics" ); + + b.appendNumber("totalOpen", cursorStatsOpen.get() ); + b.appendNumber("pinned", cursorStatsOpenPinned.get() ); + b.appendNumber("totalNoTimeout", cursorStatsOpenNoTimeout.get() ); + + b.appendNumber("timedOut" , cursorStatsTimedOut.get()); + } + // QUESTION: Restrict to the namespace from which this command was issued? // Alternatively, make this command admin-only? + // TODO: remove this for 2.8 class CmdCursorInfo : public Command { public: CmdCursorInfo() : Command( "cursorInfo", true ) {} virtual bool slaveOk() const { return true; } virtual void help( stringstream& help ) const { - help << " example: { cursorInfo : 1 }"; + help << " example: { cursorInfo : 1 }, deprecated"; } virtual LockType locktype() const { return NONE; } virtual void addRequiredPrivileges(const std::string& dbname, @@ -742,7 +421,8 @@ namespace mongo { } bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { - ClientCursor::appendStats( result ); + _appendCursorStats( result ); + result.append( "note", "deprecated, get from serverStatus().cursors" ); return true; } } cmdCursorInfo; @@ -758,7 +438,7 @@ namespace mongo { BSONObj generateSection(const BSONElement& configElement) const { BSONObjBuilder b; - ClientCursor::appendStats( b ); + _appendCursorStats( b ); return b.obj(); } } cursorServerStats; |