From 3d3719bd3881b437f136d8ea8a8dfd22de2f3d52 Mon Sep 17 00:00:00 2001 From: Hari Khalsa Date: Mon, 5 Aug 2013 16:13:06 -0400 Subject: SERVER-10026 SERVER-10376 scrub clientcursor vigorously --- src/mongo/db/clientcursor.cpp | 1173 +++++++++++----------- src/mongo/db/clientcursor.h | 587 +++++------ src/mongo/db/commands/distinct.cpp | 45 +- src/mongo/db/commands/group.cpp | 2 +- src/mongo/db/commands/mr.cpp | 10 +- src/mongo/db/commands/pipeline_command.cpp | 2 +- src/mongo/db/dbcommands.cpp | 2 +- src/mongo/db/ops/count.cpp | 2 +- src/mongo/db/ops/query.cpp | 30 +- src/mongo/db/pipeline/document_source_cursor.cpp | 4 +- src/mongo/db/pipeline/pipeline_d.cpp | 2 +- src/mongo/db/query/cached_plan_runner.h | 10 + src/mongo/db/query/multi_plan_runner.cpp | 29 + src/mongo/db/query/multi_plan_runner.h | 4 + src/mongo/db/query/runner.h | 24 +- src/mongo/db/query/simple_plan_runner.h | 17 +- src/mongo/db/query_optimizer_internal.h | 2 +- src/mongo/db/repl/finding_start_cursor.h | 2 +- src/mongo/dbtests/cursortests.cpp | 14 +- src/mongo/dbtests/documentsourcetests.cpp | 2 +- src/mongo/dbtests/queryoptimizercursortests.cpp | 24 +- src/mongo/dbtests/querytests.cpp | 8 +- 22 files changed, 1051 insertions(+), 944 deletions(-) diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 48cdd69ed28..54bbcf97e84 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -1,26 +1,18 @@ /** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see . -*/ - -/* clientcursor.cpp - - ClientCursor is a wrapper that represents a cursorid from our database - application's perspective. - - Cursor -- and its derived classes -- are our internal cursors. -*/ + * Copyright (C) 2008, 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #include "mongo/pch.h" @@ -53,167 +45,150 @@ namespace mongo { - CCById ClientCursor::clientCursorsById; + ClientCursor::CCById ClientCursor::clientCursorsById; boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) ); long long ClientCursor::numberTimedOut = 0; - void aboutToDeleteForSharding( const StringData& ns, - const Database* db, - const NamespaceDetails* nsd, - const DiskLoc& dl ); // from s/d_logic.h + void aboutToDeleteForSharding(const StringData& ns, + const Database* db, + const NamespaceDetails* nsd, + const DiskLoc& dl ); // from s/d_logic.h - /*static*/ void ClientCursor::assertNoCursors() { - recursive_scoped_lock lock(ccmutex); - if( clientCursorsById.size() ) { - log() << "ERROR clientcursors exist but should not at this point" << endl; - ClientCursor *cc = clientCursorsById.begin()->second; - log() << "first one: " << cc->_cursorid << ' ' << cc->_ns << endl; - clientCursorsById.clear(); - verify(false); - } + ClientCursor::ClientCursor(int qopts, const shared_ptr& c, const string& ns, + BSONObj query) : _ns(ns), _query(query), _c(c), + _yieldSometimesTracker(128, 10) { + init(qopts); + _doingDeletes = false; } + ClientCursor::ClientCursor(int qopts, Runner* runner, const string& ns, BSONObj query) + : _ns(ns), _query(query), _runner(runner), _yieldSometimesTracker(128, 10) { + init(qopts); + } - void ClientCursor::setLastLoc_inlock(DiskLoc L) { - verify( _pos != -2 ); // defensive - see ~ClientCursor + void ClientCursor::init(int qopts) { + _db = cc().database(); + verify( _db ); + verify( _db->ownsNS( _ns ) ); - if ( L == _lastLoc ) - return; + _queryOptions = qopts; + _idleAgeMillis = 0; + _leftoverMaxTimeMicros = 0; + _pinValue = 0; + _pos = 0; + + Lock::assertAtLeastReadLocked(_ns); - CCByLoc& bl = _db->ccByLoc(); + if (qopts & QueryOption_NoCursorTimeout) { + // cursors normally timeout after an inactivity period to prevent excess memory use + // setting this prevents timeout of the cursor in question. + ++_pinValue; + } + recursive_scoped_lock lock(ccmutex); + _cursorid = allocCursorId_inlock(); + clientCursorsById.insert( make_pair(_cursorid, this) ); + } - if ( !_lastLoc.isNull() ) { - bl.erase( ByLocKey( _lastLoc, _cursorid ) ); + ClientCursor::~ClientCursor() { + if( _pos == -2 ) { + // defensive: destructor called twice + wassert(false); + return; } - if ( !L.isNull() ) - bl[ByLocKey(L,_cursorid)] = this; - _lastLoc = L; - } + { + recursive_scoped_lock lock(ccmutex); + if (NULL != _c.get()) { + // Removes 'this' from bylocation map + setLastLoc_inlock( DiskLoc() ); + } + + clientCursorsById.erase(_cursorid); - /* ------------------------------------------- */ + // defensive: + _cursorid = INVALID_CURSOR_ID; + _pos = -2; + _pinValue = 0; + } + } - /* must call this when a btree node is updated */ - //void removedKey(const DiskLoc& btreeLoc, int keyPos) { - //} + // static + void ClientCursor::assertNoCursors() { + recursive_scoped_lock lock(ccmutex); + if (clientCursorsById.size() > 0) { + log() << "ERROR clientcursors exist but should not at this point" << endl; + ClientCursor *cc = clientCursorsById.begin()->second; + log() << "first one: " << cc->_cursorid << ' ' << cc->_ns << endl; + clientCursorsById.clear(); + verify(false); + } + } - // ns is either a full namespace or "dbname." when invalidating for a whole db void ClientCursor::invalidate(const char *ns) { Lock::assertWriteLocked(ns); int len = strlen(ns); const char* dot = strchr(ns, '.'); - verify( len > 0 && dot); - - bool isDB = (dot == &ns[len-1]); // first (and only) dot is the last char + verify(len > 0 && dot); - { - //cout << "\nTEMP invalidate " << ns << endl; - Database *db = cc().database(); - verify(db); - verify( str::startsWith(ns, db->name()) ); - - for( LockedIterator i; i.ok(); ) { - ClientCursor *cc = i.current(); - - bool shouldDelete = false; - if (cc->c()->shouldDestroyOnNSDeletion() && cc->_db == db) { - if (isDB) { - // already checked that db matched above - dassert( str::startsWith(cc->_ns.c_str(), ns) ); - shouldDelete = true; - } - else { - if ( str::equals(cc->_ns.c_str(), ns) ) - shouldDelete = true; - } - } + // first (and only) dot is the last char + bool isDB = (dot == &ns[len-1]); - if ( shouldDelete ) { - i.deleteAndAdvance(); - } - else { - i.advance(); - } - } + Database *db = cc().database(); + verify(db); + verify(str::startsWith(ns, db->name())); - /* - note : we can't iterate byloc because clientcursors may exist with a loc of null in which case - they are not in the map. perhaps they should not exist though in the future? something to - change??? - - CCByLoc& bl = db->ccByLoc; - for ( CCByLoc::iterator i = bl.begin(); i != bl.end(); ++i ) { - ClientCursor *cc = i->second; - if ( strncmp(ns, cc->ns.c_str(), len) == 0 ) { - verify( cc->_db == db ); - toDelete.push_back(i->second); - } - }*/ + recursive_scoped_lock cclock(ccmutex); + CCById::const_iterator it = clientCursorsById.begin(); + while (it != clientCursorsById.end()) { + ClientCursor* cc = it->second; - /*cout << "TEMP after invalidate " << endl; - for( auto i = clientCursorsById.begin(); i != clientCursorsById.end(); ++i ) { - cout << " " << i->second->ns << endl; + // We're only interested in cursors over one db. + if (cc->_db != db) { + ++it; + continue; } - cout << "TEMP after invalidate done" << endl;*/ - } - } - - /* note called outside of locks (other than ccmutex) so care must be exercised */ - bool ClientCursor::shouldTimeout( unsigned millis ) { - _idleAgeMillis += millis; - return _idleAgeMillis > 600000 && _pinValue == 0; - } - - /* called every 4 seconds. millis is amount of idle time passed since the last call -- could be zero */ - void ClientCursor::idleTimeReport(unsigned millis) { - bool foundSomeToTimeout = false; - // two passes so that we don't need to readlock unless we really do some timeouts - // we assume here that incrementing _idleAgeMillis outside readlock is ok. - { - recursive_scoped_lock lock(ccmutex); - { - unsigned sz = clientCursorsById.size(); - static time_t last; - if( sz >= 100000 ) { - if( time(0) - last > 300 ) { - last = time(0); - log() << "warning number of open cursors is very large: " << sz << endl; - } - } + bool shouldDelete = false; + if (NULL != cc->_runner.get()) { + shouldDelete = true; } - for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) { - CCById::iterator j = i; - i++; - if( j->second->shouldTimeout( millis ) ) { - foundSomeToTimeout = true; - } - } - } - - if( foundSomeToTimeout ) { - Lock::GlobalRead lk; - for( LockedIterator i; i.ok(); ) { - ClientCursor *cc = i.current(); - if( cc->shouldTimeout(0) ) { - numberTimedOut++; - LOG(1) << "killing old cursor " << cc->_cursorid << ' ' << cc->_ns - << " idle:" << cc->idleTime() << "ms\n"; - i.deleteAndAdvance(); + // Begin cursor-only + else if (cc->c()->shouldDestroyOnNSDeletion()) { + if (isDB) { + // already checked that db matched above + dassert(str::startsWith(cc->_ns.c_str(), ns)); + shouldDelete = true; } else { - i.advance(); + if (str::equals(cc->_ns.c_str(), ns)) { + shouldDelete = true; + } } } + // End cursor-only + + if (shouldDelete) { + ClientCursor* toDelete = it->second; + CursorId id = toDelete->cursorid(); + delete toDelete; + // We're not following the usual paradigm of saving it, ++it, and deleting the saved + // 'it' because deleting 'it' might invalidate the next thing in clientCursorsById. + // TODO: Why? + it = clientCursorsById.upper_bound(id); + } + else { + ++it; + } } } /* must call this on a delete so we clean up the cursors. */ - void ClientCursor::aboutToDelete( const StringData& ns, - const NamespaceDetails* nsd, - const DiskLoc& dl ) - { + void ClientCursor::aboutToDelete(const StringData& ns, + const NamespaceDetails* nsd, + const DiskLoc& dl) { + // Begin cursor-only NoPageFaultsAllowed npfa; + // End cursor-only recursive_scoped_lock lock(ccmutex); @@ -222,6 +197,24 @@ namespace mongo { aboutToDeleteForSharding( ns, db, nsd, dl ); + // TODO: This requires optimization. We walk through *all* CCs and send the delete to every + // CC open on the db we're deleting from. We could: + // 1. Map from ns to open runners, + // 2. Map from ns -> (a map of DiskLoc -> runners who care about that DL) + // + // We could also queue invalidations somehow and have them processed later in the runner's + // read locks. + for (CCById::const_iterator it = clientCursorsById.begin(); it != clientCursorsById.end(); + ++it) { + + ClientCursor* cc = it->second; + // We're only interested in cursors over one db. + if (cc->_db != db) { continue; } + if (NULL == cc->_runner.get()) { continue; } + cc->_runner->invalidate(dl); + } + + // Begin cursor-only CCByLoc& bl = db->ccByLoc(); CCByLoc::iterator j = bl.lower_bound(ByLocKey::min(dl)); CCByLoc::iterator stop = bl.upper_bound(ByLocKey::max(dl)); @@ -293,197 +286,338 @@ namespace mongo { } cc->updateLocation(); } + // End cursor-only } - void ClientCursor::LockedIterator::deleteAndAdvance() { - ClientCursor *cc = current(); - CursorId id = cc->cursorid(); - delete cc; - _i = clientCursorsById.upper_bound( id ); + void yieldOrSleepFor1Microsecond() { +#ifdef _WIN32 + SwitchToThread(); +#elif defined(__linux__) + pthread_yield(); +#else + sleepmicros(1); +#endif } - - ClientCursor::ClientCursor(int queryOptions, const shared_ptr& c, const string& ns, BSONObj query ) : - _ns(ns), _db( cc().database() ), - _c(c), _pos(0), - _query(query), _queryOptions(queryOptions), - _idleAgeMillis(0), _leftoverMaxTimeMicros(0), _pinValue(0), - _doingDeletes(false), _yieldSometimesTracker(128,10) { - Lock::assertAtLeastReadLocked(ns); + void ClientCursor::staticYield(int micros, const StringData& ns, Record* rec) { + bool haveReadLock = Lock::isReadLocked(); - verify( _db ); - verify( _db->ownsNS( _ns ) ); - if( queryOptions & QueryOption_NoCursorTimeout ) - noTimeout(); - recursive_scoped_lock lock(ccmutex); - _cursorid = allocCursorId_inlock(); - clientCursorsById.insert( make_pair(_cursorid, this) ); + killCurrentOp.checkForInterrupt( false ); + { + auto_ptr lk; + if ( rec ) { + // need to lock this else rec->touch won't be safe file could disappear + lk.reset( new LockMongoFilesShared() ); + } - if ( ! _c->modifiedKeys() ) { - // store index information so we can decide if we can - // get something out of the index key rather than full object - - int x = 0; - BSONObjIterator i( _c->indexKeyPattern() ); - while ( i.more() ) { - BSONElement e = i.next(); - if ( e.isNumber() ) { - // only want basic index fields, not "2d" etc - _indexedFields[e.fieldName()] = x; + dbtempreleasecond unlock; + if ( unlock.unlocked() ) { + if ( haveReadLock ) { + // This sleep helps reader threads yield to writer threads. + // Without this, the underlying reader/writer lock implementations + // are not sufficiently writer-greedy. +#ifdef _WIN32 + SwitchToThread(); +#else + if ( micros == 0 ) { + yieldOrSleepFor1Microsecond(); + } + else { + sleepmicros(1); + } +#endif + } + else { + if ( micros == -1 ) { + micros = Client::recommendedYieldMicros(); + } + else if ( micros == 0 ) { + yieldOrSleepFor1Microsecond(); + } + else if ( micros > 0 ) { + sleepmicros( micros ); + } } - x++; + } - } + else if ( Listener::getTimeTracker() == 0 ) { + // we aren't running a server, so likely a repair, so don't complain + } + else { + CurOp * c = cc().curop(); + while ( c->parent() ) + c = c->parent(); + warning() << "ClientCursor::yield can't unlock b/c of recursive lock" + << " ns: " << ns + << " top: " << c->info() + << endl; + } + + if ( rec ) + rec->touch(); + lk.reset(0); // need to release this before dbtempreleasecond + } } + // + // Timing and timeouts + // - ClientCursor::~ClientCursor() { - if( _pos == -2 ) { - // defensive: destructor called twice - wassert(false); - return; - } + bool ClientCursor::shouldTimeout(unsigned millis) { + _idleAgeMillis += millis; + return _idleAgeMillis > 600000 && _pinValue == 0; + } + void ClientCursor::idleTimeReport(unsigned millis) { + bool foundSomeToTimeout = false; + + // two passes so that we don't need to readlock unless we really do some timeouts + // we assume here that incrementing _idleAgeMillis outside readlock is ok. { recursive_scoped_lock lock(ccmutex); - setLastLoc_inlock( DiskLoc() ); // removes us from bylocation multimap - clientCursorsById.erase(_cursorid); + { + unsigned sz = clientCursorsById.size(); + static time_t last; + if( sz >= 100000 ) { + if( time(0) - last > 300 ) { + last = time(0); + log() << "warning number of open cursors is very large: " << sz << endl; + } + } + } + for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) { + CCById::iterator j = i; + i++; + if( j->second->shouldTimeout( millis ) ) { + foundSomeToTimeout = true; + } + } + } - // defensive: - _cursorid = INVALID_CURSOR_ID; - _pos = -2; - _pinValue = 0; + if( foundSomeToTimeout ) { + Lock::GlobalRead lk; + + recursive_scoped_lock cclock(ccmutex); + CCById::const_iterator it = clientCursorsById.begin(); + while (it != clientCursorsById.end()) { + ClientCursor* cc = it->second; + if( cc->shouldTimeout(0) ) { + numberTimedOut++; + LOG(1) << "killing old cursor " << cc->_cursorid << ' ' << cc->_ns + << " idle:" << cc->idleTime() << "ms\n"; + ClientCursor* toDelete = it->second; + CursorId id = toDelete->cursorid(); + // This is what winds up removing it from the map. + delete toDelete; + it = clientCursorsById.upper_bound(id); + } + else { + ++it; + } + } } } - bool ClientCursor::getFieldsDotted( const string& name, BSONElementSet &ret, BSONObj& holder ) { + void ClientCursor::storeOpForSlave( DiskLoc last ) { + if ( ! ( _queryOptions & QueryOption_OplogReplay )) + return; - map::const_iterator i = _indexedFields.find( name ); - if ( i == _indexedFields.end() ) { - current().getFieldsDotted( name , ret ); - return false; - } + if ( last.isNull() ) + return; - int x = i->second; + BSONElement e = last.obj()["ts"]; + if ( e.type() == Date || e.type() == Timestamp ) + _slaveReadTill = e._opTime(); + } - holder = currKey(); - BSONObjIterator it( holder ); - while ( x && it.more() ) { - it.next(); - x--; + void ClientCursor::updateSlaveLocation( CurOp& curop ) { + if ( _slaveReadTill.isNull() ) + return; + mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill ); + } + + void ClientCursor::appendStats( BSONObjBuilder& result ) { + recursive_scoped_lock lock(ccmutex); + result.appendNumber("totalOpen", clientCursorsById.size() ); + result.appendNumber("clientCursors_size", (int) numCursors()); + result.appendNumber("timedOut" , numberTimedOut); + unsigned pinned = 0; + unsigned notimeout = 0; + for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); i++ ) { + unsigned p = i->second->_pinValue; + if( p >= 100 ) + pinned++; + else if( p > 0 ) + notimeout++; } - verify( x == 0 ); - ret.insert( it.next() ); - return true; + if( pinned ) + result.append("pinned", pinned); + if( notimeout ) + result.append("totalNoTimeout", notimeout); } - BSONElement ClientCursor::getFieldDotted( const string& name , BSONObj& holder , bool * fromKey ) { + // + // ClientCursor creation/deletion/access. + // - map::const_iterator i = _indexedFields.find( name ); - if ( i == _indexedFields.end() ) { - if ( fromKey ) - *fromKey = false; - holder = current(); - return holder.getFieldDotted( name ); - } - - int x = i->second; + // Some statics used by allocCursorId_inlock(). + namespace { + // so we don't have to do find() which is a little slow very often. + long long cursorGenTSLast = 0; + PseudoRandom* cursorGenRandom = NULL; + } - holder = currKey(); - BSONObjIterator it( holder ); - while ( x && it.more() ) { - it.next(); - x--; + long long ClientCursor::allocCursorId_inlock() { + // It is important that cursor IDs not be reused within a short period of time. + if (!cursorGenRandom) { + scoped_ptr sr( SecureRandom::create() ); + cursorGenRandom = new PseudoRandom( sr->nextInt64() ); } - verify( x == 0 ); - if ( fromKey ) - *fromKey = true; - return it.next(); - } + const long long ts = Listener::getElapsedTimeMillis(); + long long x; + while ( 1 ) { + x = ts << 32; + x |= cursorGenRandom->nextInt32(); - BSONObj ClientCursor::extractFields(const BSONObj &pattern , bool fillWithNull ) { - BSONObjBuilder b( pattern.objsize() * 2 ); + if ( x == 0 ) { continue; } - BSONObj holder; - - BSONObjIterator i( pattern ); - while ( i.more() ) { - BSONElement key = i.next(); - BSONElement value = getFieldDotted( key.fieldName() , holder ); + if ( x < 0 ) { x *= -1; } - if ( value.type() ) { - b.appendAs( value , key.fieldName() ); - continue; + if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 ) + break; + } + + cursorGenTSLast = ts; + return x; + } + + // static + ClientCursor* ClientCursor::find_inlock(CursorId id, bool warn) { + CCById::iterator it = clientCursorsById.find(id); + if ( it == clientCursorsById.end() ) { + if ( warn ) { + OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map '" << id + << "' (ok after a drop)" << endl; } + return 0; + } + return it->second; + } + + void ClientCursor::find( const string& ns , set& all ) { + recursive_scoped_lock lock(ccmutex); - if ( fillWithNull ) - b.appendNull( key.fieldName() ); - + for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) { + if ( i->second->_ns == ns ) + all.insert( i->first ); } + } - return b.obj(); + // static + ClientCursor* ClientCursor::find(CursorId id, bool warn) { + recursive_scoped_lock lock(ccmutex); + ClientCursor *c = find_inlock(id, warn); + // if this asserts, your code was not thread safe - you either need to set no timeout + // for the cursor or keep a ClientCursor::Pointer in scope for it. + massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue ); + return c; } - - BSONObj ClientCursor::extractKey( const KeyPattern& usingKeyPattern ) const { - KeyPattern currentIndex( _c->indexKeyPattern() ); - if ( usingKeyPattern.isCoveredBy( currentIndex ) && ! currentIndex.isSpecial() ){ - BSONObj currKey = _c->currKey(); - BSONObj prettyKey = currKey.replaceFieldNames( currentIndex.toBSON() ); - return usingKeyPattern.extractSingleKey( prettyKey ); + + void ClientCursor::_erase_inlock(ClientCursor* cursor) { + // Must not have an active ClientCursor::Pin. + massert( 16089, + str::stream() << "Cannot kill active cursor " << cursor->cursorid(), + cursor->_pinValue < 100 ); + + delete cursor; + } + + bool ClientCursor::erase(CursorId id) { + recursive_scoped_lock lock(ccmutex); + ClientCursor* cursor = find_inlock(id); + if (!cursor) { return false; } + _erase_inlock(cursor); + return true; + } + + int ClientCursor::erase(int n, long long *ids) { + int found = 0; + for ( int i = 0; i < n; i++ ) { + if ( erase(ids[i])) + found++; + + if ( inShutdown() ) + break; } - return usingKeyPattern.extractSingleKey( _c->current() ); + return found; } - void ClientCursor::fillQueryResultFromObj( BufBuilder &b, const MatchDetails* details ) const { - const Projection::KeyOnly *keyFieldsOnly = c()->keyFieldsOnly(); - if ( keyFieldsOnly ) { - mongo::fillQueryResultFromObj( b, 0, keyFieldsOnly->hydrate( c()->currKey() ), details ); + bool ClientCursor::eraseIfAuthorized(CursorId id) { + std::string ns; + { + recursive_scoped_lock lock(ccmutex); + ClientCursor* cursor = find_inlock(id); + if (!cursor) { + audit::logKillCursorsAuthzCheck( + &cc(), + NamespaceString(""), + id, + ErrorCodes::CursorNotFound); + return false; + } + ns = cursor->ns(); } - else { - DiskLoc loc = c()->currLoc(); - mongo::fillQueryResultFromObj( b, fields.get(), c()->current(), details, - ( ( pq && pq->showDiskLoc() ) ? &loc : 0 ) ); + + // Can't be in a lock when checking authorization + const bool isAuthorized = cc().getAuthorizationSession()->checkAuthorization( + ns, ActionType::killCursors); + audit::logKillCursorsAuthzCheck( + &cc(), + NamespaceString(ns), + id, + isAuthorized ? ErrorCodes::OK : ErrorCodes::Unauthorized); + if (!isAuthorized) { + return false; } - } - /* call when cursor's location changes so that we can update the - cursorsbylocation map. if you are locked and internally iterating, only - need to call when you are ready to "unlock". - */ - void ClientCursor::updateLocation() { - verify( _cursorid ); - _idleAgeMillis = 0; - _c->prepareToYield(); - DiskLoc cl = _c->refLoc(); - if ( lastLoc() == cl ) { - //log() << "info: lastloc==curloc " << ns << endl; + // It is safe to lookup the cursor again after temporarily releasing the mutex because + // of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that + // the namespace associated with a cursor cannot change. + recursive_scoped_lock lock(ccmutex); + ClientCursor* cursor = find_inlock(id); + if (!cursor) { + // Cursor was deleted in another thread since we found it earlier in this function. + return false; } - else { - recursive_scoped_lock lock(ccmutex); - setLastLoc_inlock(cl); + if (cursor->ns() != ns) { + warning() << "Cursor namespace changed. Previous ns: " << ns << ", current ns: " + << cursor->ns() << endl; + return false; } - } - int ClientCursor::suggestYieldMicros() { - int writers = 0; - int readers = 0; + _erase_inlock(cursor); + return true; + } - int micros = Client::recommendedYieldMicros( &writers , &readers ); + int ClientCursor::eraseIfAuthorized(int n, long long *ids) { + int found = 0; + for ( int i = 0; i < n; i++ ) { + if ( eraseIfAuthorized(ids[i])) + found++; - if ( micros > 0 && writers == 0 && Lock::isR() ) { - // we have a read lock, and only reads are coming on, so why bother unlocking - return 0; + if ( inShutdown() ) + break; } - - wassert( micros < 10000000 ); - dassert( micros < 1000001 ); - return micros; + return found; } - + + // + // Yielding that is DEPRECATED. Will be removed when we use runners and they yield internally. + // + Record* ClientCursor::_recordForYield( ClientCursor::RecordNeeds need ) { - if ( ! ok() ) return 0; @@ -513,116 +647,85 @@ namespace mongo { return rec; } - bool ClientCursor::yieldSometimes( RecordNeeds need, bool *yielded ) { - if ( yielded ) { - *yielded = false; + void ClientCursor::updateLocation() { + verify( _cursorid ); + _idleAgeMillis = 0; + // Cursor-specific + _c->prepareToYield(); + DiskLoc cl = _c->refLoc(); + if ( lastLoc() == cl ) { + //log() << "info: lastloc==curloc " << ns << endl; } - if ( ! _yieldSometimesTracker.intervalHasElapsed() ) { - Record* rec = _recordForYield( need ); - if ( rec ) { - // yield for page fault - if ( yielded ) { - *yielded = true; - } - bool res = yield( suggestYieldMicros() , rec ); - if ( res ) - _yieldSometimesTracker.resetLastTime(); - return res; - } - return true; + else { + recursive_scoped_lock lock(ccmutex); + setLastLoc_inlock(cl); } + } - int micros = suggestYieldMicros(); - if ( micros > 0 ) { - if ( yielded ) { - *yielded = true; - } - bool res = yield( micros , _recordForYield( need ) ); - if ( res ) - _yieldSometimesTracker.resetLastTime(); - return res; + void ClientCursor::setLastLoc_inlock(DiskLoc L) { + verify(NULL == _runner.get()); + verify( _pos != -2 ); // defensive - see ~ClientCursor + + if (L == _lastLoc) { return; } + CCByLoc& bl = _db->ccByLoc(); + + if (!_lastLoc.isNull()) { + bl.erase(ByLocKey(_lastLoc, _cursorid)); } - return true; - } - void yieldOrSleepFor1Microsecond() { -#ifdef _WIN32 - SwitchToThread(); -#elif defined(__linux__) - pthread_yield(); -#else - sleepmicros(1); -#endif + if (!L.isNull()) { + bl[ByLocKey(L,_cursorid)] = this; + } + + _lastLoc = L; } - void ClientCursor::staticYield( int micros , const StringData& ns , Record * rec ) { - bool haveReadLock = Lock::isReadLocked(); + bool ClientCursor::yield( int micros , Record * recordToLoad ) { + // some cursors (geo@oct2011) don't support yielding + if (!_c->supportYields()) { return true; } - killCurrentOp.checkForInterrupt( false ); - { - auto_ptr lk; - if ( rec ) { - // need to lock this else rec->touch won't be safe file could disappear - lk.reset( new LockMongoFilesShared() ); - } + YieldData data; + prepareToYield( data ); + staticYield( micros , _ns , recordToLoad ); + return ClientCursor::recoverFromYield( data ); + } - dbtempreleasecond unlock; - if ( unlock.unlocked() ) { - if ( haveReadLock ) { - // This sleep helps reader threads yield to writer threads. - // Without this, the underlying reader/writer lock implementations - // are not sufficiently writer-greedy. -#ifdef _WIN32 - SwitchToThread(); -#else - if ( micros == 0 ) { - yieldOrSleepFor1Microsecond(); - } - else { - sleepmicros(1); - } -#endif - } - else { - if ( micros == -1 ) { - micros = Client::recommendedYieldMicros(); - } - else if ( micros == 0 ) { - yieldOrSleepFor1Microsecond(); - } - else if ( micros > 0 ) { - sleepmicros( micros ); - } - } + bool ClientCursor::yieldSometimes(RecordNeeds need, bool* yielded) { + if (yielded) { *yielded = false; } + if ( ! _yieldSometimesTracker.intervalHasElapsed() ) { + Record* rec = _recordForYield( need ); + if ( rec ) { + // yield for page fault + if ( yielded ) { + *yielded = true; + } + bool res = yield( suggestYieldMicros() , rec ); + if ( res ) + _yieldSometimesTracker.resetLastTime(); + return res; } - else if ( Listener::getTimeTracker() == 0 ) { - // we aren't running a server, so likely a repair, so don't complain - } - else { - CurOp * c = cc().curop(); - while ( c->parent() ) - c = c->parent(); - warning() << "ClientCursor::yield can't unlock b/c of recursive lock" - << " ns: " << ns - << " top: " << c->info() - << endl; - } - - if ( rec ) - rec->touch(); + return true; + } - lk.reset(0); // need to release this before dbtempreleasecond + int micros = suggestYieldMicros(); + if ( micros > 0 ) { + if ( yielded ) { + *yielded = true; + } + bool res = yield( micros , _recordForYield( need ) ); + if ( res ) + _yieldSometimesTracker.resetLastTime(); + return res; } + return true; } bool ClientCursor::prepareToYield( YieldData &data ) { - if ( ! _c->supportYields() ) - return false; + if (!_c->supportYields()) { return false; } // need to store in case 'this' gets deleted data._id = _cursorid; - data._doingDeletes = _doingDeletes; _doingDeletes = false; @@ -664,131 +767,95 @@ namespace mongo { return true; } - /** @return true if cursor is still ok */ - bool ClientCursor::yield( int micros , Record * recordToLoad ) { - - if ( ! _c->supportYields() ) // so me cursors (geo@oct2011) don't support yielding - return true; - - YieldData data; - prepareToYield( data ); - staticYield( micros , _ns , recordToLoad ); - return ClientCursor::recoverFromYield( data ); - } - - namespace { - // so we don't have to do find() which is a little slow very often. - long long cursorGenTSLast = 0; - PseudoRandom* cursorGenRandom = NULL; - } + int ClientCursor::suggestYieldMicros() { + int writers = 0; + int readers = 0; - long long ClientCursor::allocCursorId_inlock() { - // It is important that cursor IDs not be reused within a short period of time. + int micros = Client::recommendedYieldMicros( &writers , &readers ); - if ( ! cursorGenRandom ) { - scoped_ptr sr( SecureRandom::create() ); - cursorGenRandom = new PseudoRandom( sr->nextInt64() ); + if ( micros > 0 && writers == 0 && Lock::isR() ) { + // we have a read lock, and only reads are coming on, so why bother unlocking + return 0; } - const long long ts = Listener::getElapsedTimeMillis(); - - long long x; - - while ( 1 ) { - x = ts << 32; - x |= cursorGenRandom->nextInt32(); - - if ( x == 0 ) - continue; + wassert( micros < 10000000 ); + dassert( micros < 1000001 ); + return micros; + } - if ( x < 0 ) - x *= -1; + // + // Pin methods + // - if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 ) - break; + ClientCursorPin::ClientCursorPin(long long cursorid) : _cursorid( INVALID_CURSOR_ID ) { + recursive_scoped_lock lock( ClientCursor::ccmutex ); + ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true ); + if (NULL != cursor) { + uassert( 12051, "clientcursor already in use? driver problem?", + cursor->_pinValue < 100 ); + cursor->_pinValue += 100; + _cursorid = cursorid; } - - cursorGenTSLast = ts; - - return x; } - void ClientCursor::storeOpForSlave( DiskLoc last ) { - if ( ! ( _queryOptions & QueryOption_OplogReplay )) - return; + ClientCursorPin::~ClientCursorPin() { DESTRUCTOR_GUARD( release(); ) } - if ( last.isNull() ) + void ClientCursorPin::release() { + if ( _cursorid == INVALID_CURSOR_ID ) { return; - - BSONElement e = last.obj()["ts"]; - if ( e.type() == Date || e.type() == Timestamp ) - _slaveReadTill = e._opTime(); + } + ClientCursor *cursor = c(); + _cursorid = INVALID_CURSOR_ID; + if ( cursor ) { + verify( cursor->_pinValue >= 100 ); + cursor->_pinValue -= 100; + } } - void ClientCursor::updateSlaveLocation( CurOp& curop ) { - if ( _slaveReadTill.isNull() ) - return; - mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill ); + ClientCursor* ClientCursorPin::c() const { + return ClientCursor::find( _cursorid ); } + // + // Holder methods + // + ClientCursorHolder::ClientCursorHolder(ClientCursor *c) : _c(0), _id(INVALID_CURSOR_ID) { + reset(c); + } - void ClientCursor::appendStats( BSONObjBuilder& result ) { - recursive_scoped_lock lock(ccmutex); - result.appendNumber("totalOpen", clientCursorsById.size() ); - result.appendNumber("clientCursors_size", (int) numCursors()); - result.appendNumber("timedOut" , numberTimedOut); - unsigned pinned = 0; - unsigned notimeout = 0; - for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); i++ ) { - unsigned p = i->second->_pinValue; - if( p >= 100 ) - pinned++; - else if( p > 0 ) - notimeout++; - } - if( pinned ) - result.append("pinned", pinned); - if( notimeout ) - result.append("totalNoTimeout", notimeout); + ClientCursorHolder::~ClientCursorHolder() { + DESTRUCTOR_GUARD(reset();); } - // QUESTION: Restrict to the namespace from which this command was issued? - // Alternatively, make this command admin-only? - class CmdCursorInfo : public Command { - public: - CmdCursorInfo() : Command( "cursorInfo", true ) {} - virtual bool slaveOk() const { return true; } - virtual void help( stringstream& help ) const { - help << " example: { cursorInfo : 1 }"; - } - virtual LockType locktype() const { return NONE; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector* out) { - ActionSet actions; - actions.addAction(ActionType::cursorInfo); - out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions)); + void ClientCursorHolder::reset(ClientCursor *c) { + if ( c == _c ) + return; + if ( _c ) { + // be careful in case cursor was deleted by someone else + ClientCursor::erase( _id ); } - bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { - ClientCursor::appendStats( result ); - return true; + if ( c ) { + _c = c; + _id = c->_cursorid; } - } cmdCursorInfo; - - class CursorServerStats : public ServerStatusSection { - public: - - CursorServerStats() : ServerStatusSection( "cursors" ){} - virtual bool includeByDefault() const { return true; } - - BSONObj generateSection(const BSONElement& configElement) const { - BSONObjBuilder b; - ClientCursor::appendStats( b ); - return b.obj(); + else { + _c = 0; + _id = INVALID_CURSOR_ID; } + } + ClientCursor* ClientCursorHolder::get() { return _c; } + ClientCursor * ClientCursorHolder::operator-> () { return _c; } + const ClientCursor * ClientCursorHolder::operator-> () const { return _c; } + void ClientCursorHolder::release() { + _c = 0; + _id = INVALID_CURSOR_ID; + } - } cursorServerStats; + // + // ClientCursorMonitor + // + // Used by sayMemoryStatus below. struct Mem { Mem() { res = virt = mapped = 0; } long long res; @@ -801,7 +868,9 @@ namespace mongo { } }; - /** called once a minute from killcursors thread */ + /** + * called once a minute from killcursors thread + */ void sayMemoryStatus() { static time_t last; static Mem mlast; @@ -839,7 +908,6 @@ namespace mongo { } } - /** thread for timing out old cursors */ void ClientCursorMonitor::run() { Client::initThread("clientcursormon"); Client& client = cc(); @@ -856,106 +924,57 @@ namespace mongo { client.shutdown(); } - void ClientCursor::find( const string& ns , set& all ) { - recursive_scoped_lock lock(ccmutex); - - for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) { - if ( i->second->_ns == ns ) - all.insert( i->first ); - } - } - - bool ClientCursor::_erase_inlock(ClientCursor* cursor) { - // Must not have an active ClientCursor::Pin. - massert( 16089, - str::stream() << "Cannot kill active cursor " << cursor->cursorid(), - cursor->_pinValue < 100 ); - - delete cursor; - return true; - } - - bool ClientCursor::erase(CursorId id) { - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { - return false; - } - - return _erase_inlock(cursor); - } + ClientCursorMonitor clientCursorMonitor; - bool ClientCursor::eraseIfAuthorized(CursorId id) { - std::string ns; - { - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { - audit::logKillCursorsAuthzCheck( - &cc(), - NamespaceString(""), - id, - ErrorCodes::CursorNotFound); - return false; - } - ns = cursor->ns(); - } + // + // cursorInfo command. + // - // Can't be in a lock when checking authorization - const bool isAuthorized = cc().getAuthorizationSession()->checkAuthorization( - ns, ActionType::killCursors); - audit::logKillCursorsAuthzCheck( - &cc(), - NamespaceString(ns), - id, - isAuthorized ? ErrorCodes::OK : ErrorCodes::Unauthorized); - if (!isAuthorized) { - return false; + // QUESTION: Restrict to the namespace from which this command was issued? + // Alternatively, make this command admin-only? + class CmdCursorInfo : public Command { + public: + CmdCursorInfo() : Command( "cursorInfo", true ) {} + virtual bool slaveOk() const { return true; } + virtual void help( stringstream& help ) const { + help << " example: { cursorInfo : 1 }"; } - - // It is safe to lookup the cursor again after temporarily releasing the mutex because - // of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that - // the namespace associated with a cursor cannot change. - recursive_scoped_lock lock(ccmutex); - ClientCursor* cursor = find_inlock(id); - if (!cursor) { - // Cursor was deleted in another thread since we found it earlier in this function. - return false; + virtual LockType locktype() const { return NONE; } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector* out) { + ActionSet actions; + actions.addAction(ActionType::cursorInfo); + out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions)); } - if (cursor->ns() != ns) { - warning() << "Cursor namespace changed. Previous ns: " << ns << ", current ns: " - << cursor->ns() << endl; - return false; + bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, + bool fromRepl ) { + ClientCursor::appendStats( result ); + return true; } + } cmdCursorInfo; - return _erase_inlock(cursor); - } + // + // cursors stats. + // - int ClientCursor::erase(int n, long long *ids) { - int found = 0; - for ( int i = 0; i < n; i++ ) { - if ( erase(ids[i])) - found++; + class CursorServerStats : public ServerStatusSection { + public: + CursorServerStats() : ServerStatusSection( "cursors" ){} + virtual bool includeByDefault() const { return true; } - if ( inShutdown() ) - break; + BSONObj generateSection(const BSONElement& configElement) const { + BSONObjBuilder b; + ClientCursor::appendStats( b ); + return b.obj(); } - return found; - } - - int ClientCursor::eraseIfAuthorized(int n, long long *ids) { - int found = 0; - for ( int i = 0; i < n; i++ ) { - if ( eraseIfAuthorized(ids[i])) - found++; + } cursorServerStats; - if ( inShutdown() ) - break; - } - return found; - } + // + // YieldLock + // - ClientCursor::YieldLock::YieldLock( ptr cc ) + ClientCursorYieldLock::ClientCursorYieldLock( ptr cc ) : _canYield(cc->_c->supportYields()) { if ( _canYield ) { @@ -965,25 +984,23 @@ namespace mongo { } - ClientCursor::YieldLock::~YieldLock() { + ClientCursorYieldLock::~ClientCursorYieldLock() { if ( _unlock ) { - warning() << "ClientCursor::YieldLock not closed properly" << endl; + warning() << "ClientCursorYieldLock not closed properly" << endl; relock(); } } - bool ClientCursor::YieldLock::stillOk() { + bool ClientCursorYieldLock::stillOk() { if ( ! _canYield ) return true; relock(); return ClientCursor::recoverFromYield( _data ); } - void ClientCursor::YieldLock::relock() { + void ClientCursorYieldLock::relock() { _unlock.reset(); } - ClientCursorMonitor clientCursorMonitor; - } // namespace mongo diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 12a7ff9c5c3..4f739a126b3 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -1,26 +1,18 @@ -/* clientcursor.h */ - /** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see . -*/ - -/* Cursor -- and its derived classes -- are our internal cursors. - - ClientCursor is a wrapper that represents a cursorid from our database - application's perspective. -*/ + * Copyright (C) 2008 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #pragma once @@ -36,6 +28,7 @@ #include "mongo/db/keypattern.h" #include "mongo/db/matcher.h" #include "mongo/db/projection.h" +#include "mongo/db/query/runner.h" #include "mongo/s/collection_metadata.h" #include "mongo/util/net/message.h" #include "mongo/util/background.h" @@ -44,139 +37,149 @@ namespace mongo { typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; - class Cursor; /* internal server cursor base class */ class ClientCursor; class ParsedQuery; - /* todo: make this map be per connection. this will prevent cursor hijacking security attacks perhaps. - * ERH: 9/2010 this may not work since some drivers send getMore over a different connection - */ - typedef map CCById; - - extern BSONObj id_obj; - + /** + * ClientCursor is a wrapper that represents a cursorid from our database application's + * perspective. + */ class ClientCursor : private boost::noncopyable { - friend class CmdCursorInfo; public: - static void assertNoCursors(); + ClientCursor(int qopts, const shared_ptr& c, const string& ns, + BSONObj query = BSONObj()); - /* use this to assure we don't in the background time out cursor while it is under use. - if you are using noTimeout() already, there is no risk anyway. - Further, this mechanism guards against two getMore requests on the same cursor executing - at the same time - which might be bad. That should never happen, but if a client driver - had a bug, it could (or perhaps some sort of attack situation). - */ - class Pin : boost::noncopyable { - public: - Pin( long long cursorid ) : - _cursorid( INVALID_CURSOR_ID ) { - recursive_scoped_lock lock( ccmutex ); - ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true ); - if ( cursor ) { - uassert( 12051, "clientcursor already in use? driver problem?", - cursor->_pinValue < 100 ); - cursor->_pinValue += 100; - _cursorid = cursorid; - } - } - void release() { - if ( _cursorid == INVALID_CURSOR_ID ) { - return; - } - ClientCursor *cursor = c(); - _cursorid = INVALID_CURSOR_ID; - if ( cursor ) { - verify( cursor->_pinValue >= 100 ); - cursor->_pinValue -= 100; - } - } - ~Pin() { DESTRUCTOR_GUARD( release(); ) } - ClientCursor *c() const { return ClientCursor::find( _cursorid ); } - private: - CursorId _cursorid; - }; + ClientCursor(int qopts, Runner* runner, const string& ns, BSONObj query = BSONObj()); - /** Assures safe and reliable cleanup of a ClientCursor. */ - class Holder : boost::noncopyable { - public: - Holder( ClientCursor *c = 0 ) : - _c( 0 ), - _id( INVALID_CURSOR_ID ) { - reset( c ); - } - void reset( ClientCursor *c = 0 ) { - if ( c == _c ) - return; - if ( _c ) { - // be careful in case cursor was deleted by someone else - ClientCursor::erase( _id ); - } - if ( c ) { - _c = c; - _id = c->_cursorid; - } - else { - _c = 0; - _id = INVALID_CURSOR_ID; - } - } - ~Holder() { - DESTRUCTOR_GUARD ( reset(); ); - } - ClientCursor* get() { return _c; } - operator bool() { return _c; } - ClientCursor * operator-> () { return _c; } - const ClientCursor * operator-> () const { return _c; } - /** Release ownership of the ClientCursor. */ - void release() { - _c = 0; - _id = INVALID_CURSOR_ID; - } - private: - ClientCursor *_c; - CursorId _id; - }; + ~ClientCursor(); /** - * Iterates through all ClientCursors, under its own ccmutex lock. - * Also supports deletion on the fly. + * Assert that there are no open cursors. + * Called from DatabaseHolder::closeAll. */ - class LockedIterator : boost::noncopyable { - public: - LockedIterator() : _lock( ccmutex ), _i( clientCursorsById.begin() ) {} - bool ok() const { return _i != clientCursorsById.end(); } - ClientCursor *current() const { return _i->second; } - void advance() { ++_i; } - /** - * Delete 'current' and advance. Properly handles cascading deletions that may occur - * when one ClientCursor is directly deleted. - */ - void deleteAndAdvance(); - private: - recursive_scoped_lock _lock; - CCById::const_iterator _i; - }; - - ClientCursor(int queryOptions, const shared_ptr& c, const string& ns, BSONObj query = BSONObj() ); - - ~ClientCursor(); + static void assertNoCursors(); - // *************** basic accessors ******************* + // + // Basic accessors + // CursorId cursorid() const { return _cursorid; } string ns() const { return _ns; } Database * db() const { return _db; } - const BSONObj& query() const { return _query; } - int queryOptions() const { return _queryOptions; } - DiskLoc lastLoc() const { return _lastLoc; } + // + // Invalidation of DiskLocs and dropping of namespaces + // - /* Get rid of cursors for namespaces 'ns'. When dropping a db, ns is "dbname." - Used by drop, dropIndexes, dropDatabase. - */ + /** + * Get rid of cursors for namespaces 'ns'. When dropping a db, ns is "dbname." Used by drop, + * dropIndexes, dropDatabase. + */ static void invalidate(const char *ns); /** + * Called when the provided DiskLoc is about to change state via a deletion or an update. + * All runners/cursors that might be using that DiskLoc must adapt. + */ + static void aboutToDelete(const StringData& ns, + const NamespaceDetails* nsd, + const DiskLoc& dl); + + // + // Yielding. + // + + static void staticYield(int micros, const StringData& ns, Record* rec ); + + // + // Static methods about all ClientCursors TODO: Document. + // + + static void appendStats( BSONObjBuilder& result ); + + // + // ClientCursor creation/deletion. + // + + static unsigned numCursors() { return clientCursorsById.size(); } + static void find( const string& ns , set& all ); + static ClientCursor* find(CursorId id, bool warn = true); + + // Same as erase but checks to make sure this thread has read permission on the cursor's + // namespace. This should be called when receiving killCursors from a client. This should + // not be called when ccmutex is held. + static int eraseIfAuthorized(int n, long long* ids); + static bool eraseIfAuthorized(CursorId id); + + /** + * @return number of cursors found + */ + static int erase(int n, long long* ids); + + /** + * Deletes the cursor with the provided @param 'id' if one exists. + * @throw if the cursor with the provided id is pinned. + * This does not do any auth checking and should be used only when erasing cursors as part + * of cleaning up internal operations. + */ + static bool erase(CursorId id); + + // + // Timing and timeouts + // + + /** + * called every 4 seconds. millis is amount of idle time passed since the last call -- + * could be zero + */ + static void idleTimeReport(unsigned millis); + + /** + * @param millis amount of idle passed time since last call + * note called outside of locks (other than ccmutex) so care must be exercised + */ + bool shouldTimeout( unsigned millis ); + unsigned idleTime() const { return _idleAgeMillis; } + + uint64_t getLeftoverMaxTimeMicros() const { return _leftoverMaxTimeMicros; } + void setLeftoverMaxTimeMicros( uint64_t leftoverMaxTimeMicros ) { + _leftoverMaxTimeMicros = leftoverMaxTimeMicros; + } + + // + // Sharding-specific data. TODO: Document. + // + + // future getMore. + void setCollMetadata( CollectionMetadataPtr metadata ){ _collMetadata = metadata; } + CollectionMetadataPtr getCollMetadata(){ return _collMetadata; } + + // + // Replication-related stuff. TODO: Document and clean. + // + + void storeOpForSlave( DiskLoc last ); + void updateSlaveLocation( CurOp& curop ); + void slaveReadTill( const OpTime& t ) { _slaveReadTill = t; } + /** Just for testing. */ + OpTime getSlaveReadTill() const { return _slaveReadTill; } + + // + // Query-specific functionality that may be adapted for the Runner. + // + + // Used by ops/query.cpp to stash how many results hvae been returned by a query. + int pos() const { return _pos; } + void incPos(int n) { _pos += n; } + void setPos(int n) { _pos = n; } + + // + // Yielding that is DEPRECATED. Will be removed when we use runners and they yield + // internally. + // + + /** + * DEPRECATED * @param microsToSleep -1 : ask client * 0 : pthread_yield or equivilant * >0 : sleep for that amount @@ -202,248 +205,208 @@ namespace mongo { * @return same as yield() */ bool yieldSometimes( RecordNeeds need, bool *yielded = 0 ); - - static int suggestYieldMicros(); - static void staticYield( int micros , const StringData& ns , Record * rec ); - struct YieldData { CursorId _id; bool _doingDeletes; }; bool prepareToYield( YieldData &data ); static bool recoverFromYield( const YieldData &data ); - - struct YieldLock : boost::noncopyable { - - explicit YieldLock( ptr cc ); - - ~YieldLock(); - - /** - * @return if the cursor is still ok - * if it is, we also relock - */ - bool stillOk(); - - void relock(); + static int suggestYieldMicros(); - private: - const bool _canYield; - YieldData _data; - scoped_ptr _unlock; - }; + // + // Cursor-only DEPRECATED methods. + // - // --- some pass through helpers for Cursor --- + // Only used by ops/query.cpp, which will stop using them when queries are answered only by + // a runner. + const BSONObj& query() const { return _query; } + int queryOptions() const { return _queryOptions; } + shared_ptr pq; + // This one is used also by pipeline/document_source_cursor.cpp + shared_ptr fields; // which fields query wants returned + DiskLoc lastLoc() const { return _lastLoc; } Cursor* c() const { return _c.get(); } - int pos() const { return _pos; } - - void incPos( int n ) { _pos += n; } // TODO: this is bad - void setPos( int n ) { _pos = n; } // TODO : this is bad too - - BSONObj indexKeyPattern() { return _c->indexKeyPattern(); } - bool modifiedKeys() const { return _c->modifiedKeys(); } - bool isMultiKey() const { return _c->isMultiKey(); } - bool ok() { return _c->ok(); } bool advance() { return _c->advance(); } BSONObj current() { return _c->current(); } DiskLoc currLoc() { return _c->currLoc(); } BSONObj currKey() const { return _c->currKey(); } - /** - * same as BSONObj::getFieldsDotted - * if it can be retrieved from key, it is - * @param holder keeps the currKey in scope by keeping a reference to it here. generally you'll want - * holder and ret to destruct about the same time. - * @return if this was retrieved from key - */ - bool getFieldsDotted( const string& name, BSONElementSet &ret, BSONObj& holder ); - - /** - * same as BSONObj::getFieldDotted - * if it can be retrieved from key, it is - * @return if this was retrieved from key - */ - BSONElement getFieldDotted( const string& name , BSONObj& holder , bool * fromKey = 0 ) ; - - /** extract items from object which match a pattern object. - * e.g., if pattern is { x : 1, y : 1 }, builds an object with - * x and y elements of this object, if they are present. - * returns elements with original field names - * NOTE: copied from BSONObj::extractFields - */ - BSONObj extractFields(const BSONObj &pattern , bool fillWithNull = false) ; - - /** Extract elements from the object this cursor currently points to, using the expression - * specified in KeyPattern. Will use a covered index if the one in this cursor is usable. - * TODO: there are some cases where a covered index could be used but is not, for instance - * if both this index and the keyPattern are {a : "hashed"} - */ - BSONObj extractKey( const KeyPattern& usingKeyPattern ) const; - - void fillQueryResultFromObj( BufBuilder &b, const MatchDetails* details = NULL ) const; - bool currentIsDup() { return _c->getsetdup( _c->currLoc() ); } - bool currentMatches() { if ( ! _c->matcher() ) return true; return _c->matcher()->matchesCurrent( _c.get() ); } - void setCollMetadata( CollectionMetadataPtr metadata ){ _collMetadata = metadata; } - CollectionMetadataPtr getCollMetadata(){ return _collMetadata; } + void setDoingDeletes( bool doingDeletes ) {_doingDeletes = doingDeletes; } private: - void setLastLoc_inlock(DiskLoc); + friend class ClientCursorHolder; + friend class ClientCursorPin; + friend class ClientCursorYieldLock; + friend class CmdCursorInfo; - static ClientCursor* find_inlock(CursorId id, bool warn = true) { - CCById::iterator it = clientCursorsById.find(id); - if ( it == clientCursorsById.end() ) { - if ( warn ) { - OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map '" << id - << "' (ok after a drop)" << endl; - } - return 0; - } - return it->second; - } - - /* call when cursor's location changes so that we can update the - cursorsbylocation map. if you are locked and internally iterating, only - need to call when you are ready to "unlock". - */ - void updateLocation(); + // A map from the CursorId to the ClientCursor behind it. + // TODO: Consider making this per-connection. + typedef map CCById; + static CCById clientCursorsById; - public: - static ClientCursor* find(CursorId id, bool warn = true) { - recursive_scoped_lock lock(ccmutex); - ClientCursor *c = find_inlock(id, warn); - // if this asserts, your code was not thread safe - you either need to set no timeout - // for the cursor or keep a ClientCursor::Pointer in scope for it. - massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue ); - return c; - } + // How many cursors have timed out? + static long long numberTimedOut; + + // This must be held when modifying any static member. + static boost::recursive_mutex& ccmutex; /** - * Deletes the cursor with the provided @param 'id' if one exists. - * @throw if the cursor with the provided id is pinned. - * This does not do any auth checking and should be used only when erasing cursors as part - * of cleaning up internal operations. + * Initialization common between Cursor and Runner. + * TODO: Remove when we're all-runner. */ - static bool erase(CursorId id); - // Same as erase but checks to make sure this thread has read permission on the cursor's - // namespace. This should be called when receiving killCursors from a client. This should - // not be called when ccmutex is held. - static bool eraseIfAuthorized(CursorId id); + void init(int qopts); /** - * @return number of cursors found + * Allocates a new CursorId. + * Called from init(...). Assumes ccmutex held. */ - static int erase(int n, long long* ids); - static int eraseIfAuthorized(int n, long long* ids); - - void mayUpgradeStorage() { - /* if ( !ids_.get() ) - return; - stringstream ss; - ss << ns << "." << cursorid; - ids_->mayUpgradeStorage( ss.str() );*/ - } + static CursorId allocCursorId_inlock(); /** - * @param millis amount of idle passed time since last call + * Find the ClientCursor with the provided ID. Optionally warn if it's not found. + * Assumes ccmutex is held. */ - bool shouldTimeout( unsigned millis ); - void storeOpForSlave( DiskLoc last ); - void updateSlaveLocation( CurOp& curop ); + static ClientCursor* find_inlock(CursorId id, bool warn = true); - unsigned idleTime() const { return _idleAgeMillis; } + /** + * Delete the ClientCursor with the provided ID. masserts if the cursor is pinned. + */ + static void _erase_inlock(ClientCursor* cursor); - uint64_t getLeftoverMaxTimeMicros() const { return _leftoverMaxTimeMicros; } - void setLeftoverMaxTimeMicros( uint64_t leftoverMaxTimeMicros ) { - _leftoverMaxTimeMicros = leftoverMaxTimeMicros; - } + // + // ClientCursor-specific data, independent of the underlying execution type. + // - void setDoingDeletes( bool doingDeletes ) {_doingDeletes = doingDeletes; } + // The ID of the ClientCursor. + CursorId _cursorid; - void slaveReadTill( const OpTime& t ) { _slaveReadTill = t; } - - /** Just for testing. */ - OpTime getSlaveReadTill() const { return _slaveReadTill; } + // A variable indicating the state of the ClientCursor. Possible values: + // 0: Normal behavior. May time out. + // 1: No timing out of this ClientCursor. + // 100: Currently in use (via ClientCursorPin). + unsigned _pinValue; - public: // static methods + // The namespace we're operating on. + const string _ns; - static void idleTimeReport(unsigned millis); + // The database we're operating on. + Database* _db; - static void appendStats( BSONObjBuilder& result ); - static unsigned numCursors() { return clientCursorsById.size(); } - static void aboutToDelete( const StringData& ns, - const NamespaceDetails* nsd, - const DiskLoc& dl ); - static void find( const string& ns , set& all ); + // How many objects have been returned by the find() so far? + int _pos; + // The query that prompted this ClientCursor. Only used for debugging. + const BSONObj _query; - private: // methods + // See the QueryOptions enum in dbclient.h + int _queryOptions; - // cursors normally timeout after an inactivity period to prevent excess memory use - // setting this prevents timeout of the cursor in question. - void noTimeout() { _pinValue++; } + // TODO: document better. + OpTime _slaveReadTill; - Record* _recordForYield( RecordNeeds need ); - static bool _erase_inlock(ClientCursor* cursor); + // How long has the cursor been idle? + unsigned _idleAgeMillis; - private: + // TODO: Document. + uint64_t _leftoverMaxTimeMicros; - CursorId _cursorid; + // TODO: document better. Somehow used in sharding. + CollectionMetadataPtr _collMetadata; - const string _ns; - Database * _db; + // + // The underlying execution machinery. + // - const shared_ptr _c; - map _indexedFields; // map from indexed field to offset in key object - int _pos; // # objects into the cursor so far + // The new world: a runner. + scoped_ptr _runner; - const BSONObj _query; // used for logging diags only; optional in constructor - int _queryOptions; // see enum QueryOptions dbclient.h + // + // Cursor-only private data and methods. DEPRECATED. + // - OpTime _slaveReadTill; + // The old world: a cursor. DEPRECATED. + const shared_ptr _c; + + /** + * call when cursor's location changes so that we can update the cursorsbylocation map. if + * you are locked and internally iterating, only need to call when you are ready to + * "unlock". + */ + void updateLocation(); + void setLastLoc_inlock(DiskLoc); + Record* _recordForYield( RecordNeeds need ); DiskLoc _lastLoc; // use getter and setter not this (important) - unsigned _idleAgeMillis; // how long has the cursor been around, relative to server idle time - - // For time-limited operations ($maxTimeMS): time remaining for future getmore operations - // on the cursor. 0 if original operation had no time limit set. - uint64_t _leftoverMaxTimeMicros; - - /* 0 = normal - 1 = no timeout allowed - 100 = in use (pinned) -- see Pointer class - */ - unsigned _pinValue; - bool _doingDeletes; // when true we are the delete and aboutToDelete shouldn't manipulate us - ElapsedTracker _yieldSometimesTracker; - CollectionMetadataPtr _collMetadata; + // TODO: This will be moved into the runner. + ElapsedTracker _yieldSometimesTracker; + }; + /** + * use this to assure we don't in the background time out cursor while it is under use. if you + * are using noTimeout() already, there is no risk anyway. Further, this mechanism guards + * against two getMore requests on the same cursor executing at the same time - which might be + * bad. That should never happen, but if a client driver had a bug, it could (or perhaps some + * sort of attack situation). + */ + class ClientCursorPin : boost::noncopyable { public: - shared_ptr pq; - shared_ptr fields; // which fields query wants returned - - private: // static members - - static CCById clientCursorsById; - static long long numberTimedOut; - static boost::recursive_mutex& ccmutex; // must use this for all statics above! - static CursorId allocCursorId_inlock(); + ClientCursorPin( long long cursorid ); + ~ClientCursorPin(); + void release(); + ClientCursor *c() const; + private: + CursorId _cursorid; + }; + /** Assures safe and reliable cleanup of a ClientCursor. */ + class ClientCursorHolder : boost::noncopyable { + public: + ClientCursorHolder( ClientCursor *c = 0 ); + ~ClientCursorHolder(); + void reset( ClientCursor *c = 0 ); + ClientCursor* get(); + operator bool() { return _c; } + ClientCursor * operator-> (); + const ClientCursor * operator-> () const; + /** Release ownership of the ClientCursor. */ + void release(); + private: + ClientCursor *_c; + CursorId _id; }; + /** thread for timing out old cursors */ class ClientCursorMonitor : public BackgroundJob { public: string name() const { return "ClientCursorMonitor"; } void run(); }; + + struct ClientCursorYieldLock : boost::noncopyable { + explicit ClientCursorYieldLock( ptr cc ); + ~ClientCursorYieldLock(); + + /** + * @return if the cursor is still ok + * if it is, we also relock + */ + bool stillOk(); + void relock(); + + private: + const bool _canYield; + ClientCursor::YieldData _data; + scoped_ptr _unlock; + }; } // namespace mongo diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index b3fc69062aa..f9303964e71 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -110,6 +110,24 @@ namespace mongo { auto_ptr cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns)); + // map from indexed field to offset in key object + map indexedFields; + if (!cursor->modifiedKeys()) { + // store index information so we can decide if we can + // get something out of the index key rather than full object + + int x = 0; + BSONObjIterator i( cursor->indexKeyPattern() ); + while ( i.more() ) { + BSONElement e = i.next(); + if ( e.isNumber() ) { + // only want basic index fields, not "2d" etc + indexedFields[e.fieldName()] = x; + } + x++; + } + } + while ( cursor->ok() ) { nscanned++; bool loadedRecord = false; @@ -119,7 +137,8 @@ namespace mongo { BSONObj holder; BSONElementSet temp; - loadedRecord = ! cc->getFieldsDotted( key , temp, holder ); + // Try to get the record from the key fields. + loadedRecord = !getFieldsDotted(indexedFields, cursor, key, temp, holder); for ( BSONElementSet::iterator i=temp.begin(); i!=temp.end(); ++i ) { BSONElement e = *i; @@ -166,6 +185,30 @@ namespace mongo { return true; } + private: + /** + * Tries to get the fields from the key first, then the object if the keys don't have it. + */ + bool getFieldsDotted(const map& indexedFields, shared_ptr cursor, + const string& name, BSONElementSet &ret, BSONObj& holder) { + map::const_iterator i = indexedFields.find( name ); + if ( i == indexedFields.end() ) { + cursor->current().getFieldsDotted( name , ret ); + return false; + } + + int x = i->second; + + holder = cursor->currKey(); + BSONObjIterator it( holder ); + while ( x && it.more() ) { + it.next(); + x--; + } + verify( x == 0 ); + ret.insert( it.next() ); + return true; + } } distinctCmd; diff --git a/src/mongo/db/commands/group.cpp b/src/mongo/db/commands/group.cpp index 25680882c3e..979fcd4f9aa 100644 --- a/src/mongo/db/commands/group.cpp +++ b/src/mongo/db/commands/group.cpp @@ -107,7 +107,7 @@ namespace mongo { list blah; shared_ptr cursor = getOptimizedCursor(ns.c_str() , query); - ClientCursor::Holder ccPointer( new ClientCursor( QueryOption_NoCursorTimeout, cursor, + ClientCursorHolder ccPointer( new ClientCursor( QueryOption_NoCursorTimeout, cursor, ns ) ); while ( cursor->ok() ) { diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 6d8ddd6fe7b..3e66c57e04d 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -868,7 +868,7 @@ namespace mongo { shared_ptr temp = getBestGuessCursor(_config.incLong.c_str(), BSONObj(), sortKey); - ClientCursor::Holder cursor(new ClientCursor(QueryOption_NoCursorTimeout, + ClientCursorHolder cursor(new ClientCursor(QueryOption_NoCursorTimeout, temp, _config.incLong.c_str())); // iterate over all sorted objects @@ -890,7 +890,7 @@ namespace mongo { continue; } - ClientCursor::YieldLock yield (cursor.get()); + ClientCursorYieldLock yield (cursor.get()); try { // reduce a finalize array @@ -1123,7 +1123,7 @@ namespace mongo { uassert( 16149 , "cannot run map reduce without the js engine", globalScriptEngine ); - ClientCursor::Holder holdCursor; + ClientCursorHolder holdCursor; CollectionMetadataPtr collMetadata; { @@ -1206,7 +1206,7 @@ namespace mongo { config.filter, config.sort ); uassert( 16052, str::stream() << "could not create cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, temp.get() ); - ClientCursor::Holder cursor(new ClientCursor(QueryOption_NoCursorTimeout, + ClientCursorHolder cursor(new ClientCursor(QueryOption_NoCursorTimeout, temp, config.ns.c_str())); uassert( 16053, str::stream() << "could not create client cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, cursor.get() ); @@ -1251,7 +1251,7 @@ namespace mongo { num++; if ( num % 100 == 0 ) { // try to yield lock regularly - ClientCursor::YieldLock yield (cursor.get()); + ClientCursorYieldLock yield (cursor.get()); Timer t; // check if map needs to be dumped to disk state.checkSize(); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index fbac8eb328d..e4ad99ca0d1 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -67,7 +67,7 @@ namespace mongo { : 101; // same as query // Using limited cursor API that ignores many edge cases. Should be sufficient for commands. - ClientCursor::Pin pin(id); + ClientCursorPin pin(id); ClientCursor* cursor = pin.c(); massert(16958, "Cursor shouldn't have been deleted", diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 9a2465c44a8..1357ae347fd 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -989,7 +989,7 @@ namespace mongo { int len; const char * data = owned["data"].binDataClean( len ); - ClientCursor::YieldLock yield (cc.get()); + ClientCursorYieldLock yield (cc.get()); try { md5_append( &st , (const md5_byte_t*)(data) , len ); n++; diff --git a/src/mongo/db/ops/count.cpp b/src/mongo/db/ops/count.cpp index a22a6f67d41..6868206198d 100644 --- a/src/mongo/db/ops/count.cpp +++ b/src/mongo/db/ops/count.cpp @@ -77,7 +77,7 @@ namespace mongo { } shared_ptr cursor = getOptimizedCursor( ns, query, BSONObj(), _countPlanPolicies ); - ClientCursor::Holder ccPointer; + ClientCursorHolder ccPointer; ElapsedTracker timeToStartYielding( 256, 20 ); try { while( cursor->ok() ) { diff --git a/src/mongo/db/ops/query.cpp b/src/mongo/db/ops/query.cpp index 5ea41e38c4e..41a390d397e 100644 --- a/src/mongo/db/ops/query.cpp +++ b/src/mongo/db/ops/query.cpp @@ -97,6 +97,17 @@ namespace mongo { return qr; } + static BSONObj extractKey(Cursor* c, const KeyPattern& usingKeyPattern ) { + KeyPattern currentIndex( c->indexKeyPattern() ); + if ( usingKeyPattern.isCoveredBy( currentIndex ) && ! currentIndex.isSpecial() ){ + BSONObj currKey = c->currKey(); + BSONObj prettyKey = currKey.replaceFieldNames( currentIndex.toBSON() ); + return usingKeyPattern.extractSingleKey( prettyKey ); + } + + return usingKeyPattern.extractSingleKey( c->current() ); + } + QueryResult* processGetMore(const char* ns, int ntoreturn, long long cursorid, @@ -118,7 +129,7 @@ namespace mongo { // call this readlocked so state can't change replVerifyReadsOk(); - ClientCursor::Pin p(cursorid); + ClientCursorPin p(cursorid); ClientCursor *cc = p.c(); @@ -200,7 +211,7 @@ namespace mongo { // in some cases (clone collection) there won't be a matcher if ( !c->currentMatches( &details ) ) { } - else if ( metadata && !metadata->keyBelongsToMe( cc->extractKey( keyPattern ) ) ) { + else if ( metadata && !metadata->keyBelongsToMe( extractKey(c, keyPattern ) ) ) { LOG(2) << "cursor skipping document in un-owned chunk: " << c->current() << endl; } @@ -212,7 +223,17 @@ namespace mongo { last = c->currLoc(); n++; - cc->fillQueryResultFromObj( b, &details ); + // Fill out the fields requested by the query. + const Projection::KeyOnly *keyFieldsOnly = c->keyFieldsOnly(); + if ( keyFieldsOnly ) { + fillQueryResultFromObj( b, 0, keyFieldsOnly->hydrate( + c->currKey() ), &details ); + } + else { + DiskLoc loc = c->currLoc(); + fillQueryResultFromObj( b, cc->fields.get(), c->current(), &details, + ( ( cc->pq.get() && cc->pq->showDiskLoc() ) ? &loc : 0 ) ); + } if ( ( ntoreturn && n >= ntoreturn ) || b.len() > MaxBytesToReturnToClientAtOnce ) { c->advance(); @@ -240,7 +261,6 @@ namespace mongo { else { cc->c()->noteLocation(); } - cc->mayUpgradeStorage(); cc->storeOpForSlave( last ); exhaust = cc->queryOptions() & QueryOption_Exhaust; @@ -740,7 +760,7 @@ namespace mongo { ( QueryResponseBuilder::make( pq, cursor, queryPlan, oldPlan ) ); bool saveClientCursor = false; OpTime slaveReadTill; - ClientCursor::Holder ccPointer( new ClientCursor( QueryOption_NoCursorTimeout, cursor, + ClientCursorHolder ccPointer( new ClientCursor( QueryOption_NoCursorTimeout, cursor, ns ) ); for( ; cursor->ok(); cursor->advance() ) { diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index a93fd0032dc..cdc3cc0070d 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -108,7 +108,7 @@ namespace mongo { Lock::DBRead lk(ns); Client::Context ctx(ns, dbpath, /*doVersion=*/false); - ClientCursor::Pin pin(_cursorId); + ClientCursorPin pin(_cursorId); ClientCursor* cursor = pin.c(); uassert(16950, "Cursor deleted. Was the collection or database dropped?", @@ -272,7 +272,7 @@ namespace mongo { _projection.reset(new Projection); _projection->init(projection); - ClientCursor::Pin pin (_cursorId); + ClientCursorPin pin (_cursorId); verify(pin.c()); pin.c()->fields = _projection; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 9d57250e381..f1fa7f2f22c 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -197,7 +197,7 @@ namespace mongo { } // Now wrap the Cursor in ClientCursor - ClientCursor::Holder cursor( + ClientCursorHolder cursor( new ClientCursor(QueryOption_NoCursorTimeout, pCursor, fullName)); CursorId cursorId = cursor->cursorid(); massert(16917, str::stream() diff --git a/src/mongo/db/query/cached_plan_runner.h b/src/mongo/db/query/cached_plan_runner.h index 23faee9412c..c034b3659fa 100644 --- a/src/mongo/db/query/cached_plan_runner.h +++ b/src/mongo/db/query/cached_plan_runner.h @@ -72,6 +72,16 @@ namespace mongo { return false; } + /** + * TODO: Explicit yielding is deprecated pending a ClientCursor rewrite. + */ + virtual void yield() { _runner->yield(); } + virtual void unYield() { _runner->unYield(); } + + virtual void invalidate(const DiskLoc& dl) { + _runner->invalidate(dl); + } + private: scoped_ptr _canonicalQuery; scoped_ptr _cachedQuery; diff --git a/src/mongo/db/query/multi_plan_runner.cpp b/src/mongo/db/query/multi_plan_runner.cpp index 97e643ab667..fc4eb33adf7 100644 --- a/src/mongo/db/query/multi_plan_runner.cpp +++ b/src/mongo/db/query/multi_plan_runner.cpp @@ -38,6 +38,35 @@ namespace mongo { _candidates.push_back(CandidatePlan(solution, root, ws)); } + void MultiPlanRunner::yield() { + if (NULL != _bestPlanRunner) { + _bestPlanRunner->yield(); + } + else { + yieldAllPlans(); + } + } + + void MultiPlanRunner::unYield() { + if (NULL != _bestPlanRunner) { + _bestPlanRunner->yield(); + } + else { + unyieldAllPlans(); + } + } + + void MultiPlanRunner::invalidate(const DiskLoc& dl) { + if (NULL != _bestPlanRunner) { + _bestPlanRunner->invalidate(dl); + } + else { + for (size_t i = 0; i < _candidates.size(); ++i) { + _candidates[i].root->invalidate(dl); + } + } + } + bool MultiPlanRunner::getNext(BSONObj* objOut) { verify(!_failure); diff --git a/src/mongo/db/query/multi_plan_runner.h b/src/mongo/db/query/multi_plan_runner.h index d365a843e0b..c96121987e2 100644 --- a/src/mongo/db/query/multi_plan_runner.h +++ b/src/mongo/db/query/multi_plan_runner.h @@ -67,6 +67,10 @@ namespace mongo { */ bool pickBestPlan(size_t* out); + virtual void yield(); + virtual void unYield(); + virtual void invalidate(const DiskLoc& dl); + private: /** * Have all our candidate plans do something. diff --git a/src/mongo/db/query/runner.h b/src/mongo/db/query/runner.h index 148a956f861..43ad7500ac8 100644 --- a/src/mongo/db/query/runner.h +++ b/src/mongo/db/query/runner.h @@ -19,21 +19,31 @@ namespace mongo { /** - * A runner runs a query. All yielding, fetching, and other query details are taken care of by - * the runner. - * - * TODO: Do we want to expand the interface to allow yielding? IE, if update is running a query - * and updating at the same time? + * A runner runs a query. */ class Runner { public: + virtual ~Runner() { } + /** * Get the next result from the query. */ - // TODO: This is inefficient and should probably append to some message buffer or similar. virtual bool getNext(BSONObj* objOut) = 0; - virtual ~Runner() { } + /** + * Inform the runner that the provided DiskLoc is about to disappear (or change entirely). + * The runner then takes any actions required to continue operating correctly, including + * broadcasting the invalidation request to the PlanStage tree being run. + * + * Called from ClientCursor::aboutToDelete. + */ + virtual void invalidate(const DiskLoc& dl) = 0; + + /** + * TODO: Kill these once yielding is controlled inside of a runner. + */ + virtual void yield() = 0; + virtual void unYield() = 0; }; } // namespace mongo diff --git a/src/mongo/db/query/simple_plan_runner.h b/src/mongo/db/query/simple_plan_runner.h index f1ca6902736..d83f286d30b 100644 --- a/src/mongo/db/query/simple_plan_runner.h +++ b/src/mongo/db/query/simple_plan_runner.h @@ -47,10 +47,18 @@ namespace mongo { _root.reset(root); } - PlanStageStats* getStats() { - return _root->getStats(); + /** + * TODO: Explicit yielding is deprecated pending a ClientCursor rewrite. + */ + virtual void yield() { _root->prepareToYield(); } + virtual void unYield() { _root->recoverFromYield(); } + + virtual void invalidate(const DiskLoc& dl) { + _root->invalidate(dl); } + PlanStageStats* getStats() { return _root->getStats(); } + bool getNext(BSONObj* objOut) { for (;;) { WorkingSetID id; @@ -65,7 +73,8 @@ namespace mongo { return true; } else if (code == PlanStage::NEED_TIME) { - // TODO: Occasionally yield. For now, we run until we get another result. + // TODO: Runners can't yield themselves until we rework ClientCursor to not + // delete itself. } else if (PlanStage::NEED_FETCH == code) { // id has a loc and refers to an obj we need to fetch. @@ -78,6 +87,8 @@ namespace mongo { // Actually bring record into memory. Record* record = member->loc.rec(); + // TODO: We would yield ourselves here and call ClientCursor::staticYield once + // we rework ClientCursor to not delete itself. record->touch(); // Record should be in memory now. Log if it's not. diff --git a/src/mongo/db/query_optimizer_internal.h b/src/mongo/db/query_optimizer_internal.h index d3326d8d980..0f09cb46484 100644 --- a/src/mongo/db/query_optimizer_internal.h +++ b/src/mongo/db/query_optimizer_internal.h @@ -242,7 +242,7 @@ namespace mongo { bool _mustAdvance; bool _capped; shared_ptr _c; - ClientCursor::Holder _cc; + ClientCursorHolder _cc; DiskLoc _posBeforeYield; ClientCursor::YieldData _yieldData; const QueryPlanSelectionPolicy& _selectionPolicy; diff --git a/src/mongo/db/repl/finding_start_cursor.h b/src/mongo/db/repl/finding_start_cursor.h index e493a36c51a..2b1bc2268f1 100644 --- a/src/mongo/db/repl/finding_start_cursor.h +++ b/src/mongo/db/repl/finding_start_cursor.h @@ -95,7 +95,7 @@ namespace mongo { FindingStartMode _findingStartMode; auto_ptr< CoveredIndexMatcher > _matcher; Timer _findingStartTimer; - ClientCursor::Holder _findingStartCursor; + ClientCursorHolder _findingStartCursor; shared_ptr _c; ClientCursor::YieldData _yieldData; static int _initialTimeout; diff --git a/src/mongo/dbtests/cursortests.cpp b/src/mongo/dbtests/cursortests.cpp index 382d7c7e9d9..1fae3b47014 100644 --- a/src/mongo/dbtests/cursortests.cpp +++ b/src/mongo/dbtests/cursortests.cpp @@ -579,7 +579,7 @@ namespace CursorTests { } boost::shared_ptr cursor; - ClientCursor::Holder clientCursor; + ClientCursorHolder clientCursor; ClientCursor::YieldData yieldData; { @@ -629,7 +629,7 @@ namespace CursorTests { while( !isExpectedIterate( cursor->current() ) ) { ASSERT( cursor->advance() ); } - ClientCursor::Holder clientCursor( new ClientCursor( QueryOption_NoCursorTimeout, + ClientCursorHolder clientCursor( new ClientCursor( QueryOption_NoCursorTimeout, cursor, ns() ) ); DiskLoc loc = clientCursor->currLoc(); ASSERT( !loc.isNull() ); @@ -712,7 +712,7 @@ namespace CursorTests { private: Client::WriteContext _ctx; boost::shared_ptr _cursor; - ClientCursor::Holder _clientCursor; + ClientCursorHolder _clientCursor; }; /** Pin pins a ClientCursor over its lifetime. */ @@ -721,7 +721,7 @@ namespace CursorTests { void run() { assertNotPinned(); { - ClientCursor::Pin pin( cursorid() ); + ClientCursorPin pin( cursorid() ); assertPinned(); ASSERT_THROWS( erase(), AssertionException ); } @@ -744,12 +744,12 @@ namespace CursorTests { class PinTwice : public Base { public: void run() { - ClientCursor::Pin pin( cursorid() ); + ClientCursorPin pin( cursorid() ); ASSERT_THROWS( pinCursor(), AssertionException ); } private: void pinCursor() const { - ClientCursor::Pin pin( cursorid() ); + ClientCursorPin pin( cursorid() ); } }; @@ -757,7 +757,7 @@ namespace CursorTests { class CursorDeleted : public Base { public: void run() { - ClientCursor::Pin pin( cursorid() ); + ClientCursorPin pin( cursorid() ); ASSERT( pin.c() ); // Delete the pinned cursor. ClientCursor::invalidate( ns() ); diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index ef0aec52a8d..f235e05a2b3 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -111,7 +111,7 @@ namespace DocumentSourceTests { void createSource() { Client::ReadContext ctx (ns); boost::shared_ptr cursor = theDataFileMgr.findAll( ns ); - ClientCursor::Holder cc( + ClientCursorHolder cc( new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns)); CursorId cursorId = cc->cursorid(); cc->c()->prepareToYield(); diff --git a/src/mongo/dbtests/queryoptimizercursortests.cpp b/src/mongo/dbtests/queryoptimizercursortests.cpp index 32bc7f7787b..e9dec89d023 100644 --- a/src/mongo/dbtests/queryoptimizercursortests.cpp +++ b/src/mongo/dbtests/queryoptimizercursortests.cpp @@ -2417,7 +2417,7 @@ namespace QueryOptimizerCursorTests { setQueryOptimizerCursor( BSON( "x" << GT << 0 ) ); ASSERT_EQUALS( 1, current().getIntField( "x" ) ); - ClientCursor::Holder p( new ClientCursor( QueryOption_NoCursorTimeout, c(), ns() ) ); + ClientCursorHolder p( new ClientCursor( QueryOption_NoCursorTimeout, c(), ns() ) ); ClientCursor::YieldData yieldData; p->prepareToYield( yieldData ); @@ -2441,7 +2441,7 @@ namespace QueryOptimizerCursorTests { _cli.insert( ns(), BSON( "_id" << i << "x" << i ) ); } - ClientCursor::Holder p; + ClientCursorHolder p; ClientCursor::YieldData yieldData; { Lock::GlobalWrite lk; @@ -2482,7 +2482,7 @@ namespace QueryOptimizerCursorTests { Lock::GlobalWrite lk; Client::Context ctx( ns() ); - ClientCursor::Holder p + ClientCursorHolder p ( new ClientCursor ( QueryOption_NoCursorTimeout, getOptimizedCursor @@ -2502,7 +2502,7 @@ namespace QueryOptimizerCursorTests { _cli.insert( ns(), BSON( "a" << 1 << "b" << 1 ) ); Lock::GlobalWrite lk; Client::Context ctx( ns() ); - ClientCursor::Holder p + ClientCursorHolder p ( new ClientCursor ( 0, getOptimizedCursor @@ -2529,7 +2529,7 @@ namespace QueryOptimizerCursorTests { Lock::GlobalWrite lk; Client::Context ctx( ns() ); - ClientCursor::Holder p + ClientCursorHolder p ( new ClientCursor ( 0, getOptimizedCursor @@ -2565,7 +2565,7 @@ namespace QueryOptimizerCursorTests { Lock::GlobalWrite lk; Client::Context ctx( ns() ); - ClientCursor::Holder p + ClientCursorHolder p ( new ClientCursor ( QueryOption_NoCursorTimeout, getOptimizedCursor @@ -2579,7 +2579,7 @@ namespace QueryOptimizerCursorTests { } // Check that the btree plan was picked. - ASSERT_EQUALS( BSON( "a" << 1 ), p->indexKeyPattern() ); + ASSERT_EQUALS( BSON( "a" << 1 ), p->c()->indexKeyPattern() ); // Yield the cursor. ClientCursor::YieldData yieldData; @@ -2626,7 +2626,7 @@ namespace QueryOptimizerCursorTests { Lock::GlobalWrite lk; Client::Context ctx( ns() ); - ClientCursor::Holder p + ClientCursorHolder p ( new ClientCursor ( QueryOption_NoCursorTimeout, getOptimizedCursor @@ -2640,7 +2640,7 @@ namespace QueryOptimizerCursorTests { } // Check the key pattern. - ASSERT_EQUALS( BSON( "a" << 1 ), p->indexKeyPattern() ); + ASSERT_EQUALS( BSON( "a" << 1 ), p->c()->indexKeyPattern() ); // Yield the cursor. ClientCursor::YieldData yieldData; @@ -2674,7 +2674,7 @@ namespace QueryOptimizerCursorTests { { Lock::DBWrite lk(ns()); Client::Context ctx( ns() ); - ClientCursor::Holder p + ClientCursorHolder p ( new ClientCursor ( QueryOption_NoCursorTimeout, getOptimizedCursor @@ -4738,7 +4738,7 @@ namespace QueryOptimizerCursorTests { // The cursor was yielded once. ASSERT_EQUALS( 1, _explain[ "nYields" ].number() ); } - mongo::ClientCursor::Holder _clientCursor; + mongo::ClientCursorHolder _clientCursor; }; /** nYields reporting of a QueryOptimizerCursor before it enters takeover mode. */ @@ -4815,7 +4815,7 @@ namespace QueryOptimizerCursorTests { ASSERT( 0 < plans.next().Obj()[ "n" ].number() ); } } - mongo::ClientCursor::Holder _clientCursor; + mongo::ClientCursorHolder _clientCursor; }; } // namespace Explain diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index 2e7e7e59216..b3a96279e59 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -205,7 +205,7 @@ namespace QueryTests { // Check internal server handoff to getmore. Lock::DBWrite lk(ns); Client::Context ctx( ns ); - ClientCursor::Pin clientCursor( cursorId ); + ClientCursorPin clientCursor( cursorId ); ASSERT( clientCursor.c()->pq ); ASSERT_EQUALS( 2, clientCursor.c()->pq->getNumToReturn() ); ASSERT_EQUALS( 2, clientCursor.c()->pos() ); @@ -552,7 +552,7 @@ namespace QueryTests { ASSERT_EQUALS( two, c->next()["ts"].Date() ); long long cursorId = c->getCursorId(); - ClientCursor::Pin clientCursor( cursorId ); + ClientCursorPin clientCursor( cursorId ); ASSERT_EQUALS( three.millis, clientCursor.c()->getSlaveReadTill().asDate() ); } }; @@ -1386,7 +1386,7 @@ namespace QueryTests { ClientCursor *clientCursor = 0; { - ClientCursor::Pin clientCursorPointer( cursorId ); + ClientCursorPin clientCursorPointer( cursorId ); clientCursor = clientCursorPointer.c(); // clientCursorPointer destructor unpins the cursor. } @@ -1423,7 +1423,7 @@ namespace QueryTests { { Client::WriteContext ctx( ns() ); - ClientCursor::Pin pinCursor( cursorId ); + ClientCursorPin pinCursor( cursorId ); ASSERT_THROWS( client().killCursor( cursorId ), MsgAssertionException ); string expectedAssertion = -- cgit v1.2.1