summary refs log tree commit diff
path: root/src/mongo/db/clientcursor.cpp
diff options
context:
space:
mode:
author Eliot Horowitz <eliot@10gen.com> 2014-01-24 15:47:07 -0500
committer Eliot Horowitz <eliot@10gen.com> 2014-01-24 15:47:07 -0500
commit 7349ba70a0e68627dc322113c561afe3a9ed37a1 (patch)
tree afe597cf004f191288999d8efad785b42833809d /src/mongo/db/clientcursor.cpp
parent ed58b0dfe564253067b4cab11ab75477b7e48388 (diff)
download mongo-7349ba70a0e68627dc322113c561afe3a9ed37a1.tar.gz
SERVER-12392: Move cursor/runner cache into Collection lifecycle via CollectionCursorCache
Diffstat (limited to 'src/mongo/db/clientcursor.cpp')
-rw-r--r-- src/mongo/db/clientcursor.cpp 512
1 files changed, 96 insertions, 416 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 643d1f4aafd..8c6ca3be192 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -34,6 +34,7 @@
#include <time.h>
#include <vector>
+#include "mongo/base/counter.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_set.h"
@@ -55,35 +56,53 @@
namespace mongo {
- ClientCursor::CCById ClientCursor::clientCursorsById;
- boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) );
- long long ClientCursor::numberTimedOut = 0;
- ClientCursor::RunnerSet ClientCursor::nonCachedRunners;
+ static Counter64 cursorStatsOpen; // gauge
+ static Counter64 cursorStatsOpenPinned; // gauge
+ static Counter64 cursorStatsOpenNoTimeout; // gauge
+ static Counter64 cursorStatsTimedOut;
+
+ static ServerStatusMetricField<Counter64> dCursorStatsOpen( "cursor.open.total",
+ &cursorStatsOpen );
+ static ServerStatusMetricField<Counter64> dCursorStatsOpenPinned( "cursor.open.pinned",
+ &cursorStatsOpenPinned );
+ static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout( "cursor.open.noTimeout",
+ &cursorStatsOpenNoTimeout );
+ static ServerStatusMetricField<Counter64> dCursorStatusTimedout( "cursor.timedOut",
+ &cursorStatsTimedOut );
void aboutToDeleteForSharding(const StringData& ns,
const Database* db,
const NamespaceDetails* nsd,
const DiskLoc& dl ); // from s/d_logic.h
- ClientCursor::ClientCursor(Runner* runner, int qopts, const BSONObj query) {
+ long long ClientCursor::totalOpen() {
+ return cursorStatsOpen.get();
+ }
+
+ ClientCursor::ClientCursor(const Collection* collection, Runner* runner,
+ int qopts, const BSONObj query)
+ : _collection( collection ),
+ _countedYet( false ) {
_runner.reset(runner);
_ns = runner->ns();
_query = query;
_queryOptions = qopts;
+ if ( runner->collection() ) {
+ invariant( collection == runner->collection() );
+ }
init();
}
- ClientCursor::ClientCursor(const string& ns)
- : _ns(ns),
+ ClientCursor::ClientCursor(const Collection* collection)
+ : _ns(collection->ns().ns()),
+ _collection(collection),
+ _countedYet( false ),
_queryOptions(QueryOption_NoCursorTimeout) {
-
init();
}
void ClientCursor::init() {
- _db = cc().database();
- verify( _db );
- verify( _db->ownsNS( _ns ) );
+ invariant( _collection );
isAggCursor = false;
@@ -91,18 +110,20 @@ namespace mongo {
_leftoverMaxTimeMicros = 0;
_pinValue = 0;
_pos = 0;
-
+
Lock::assertAtLeastReadLocked(_ns);
if (_queryOptions & QueryOption_NoCursorTimeout) {
// cursors normally timeout after an inactivity period to prevent excess memory use
// setting this prevents timeout of the cursor in question.
++_pinValue;
+ cursorStatsOpenNoTimeout.increment();
}
- recursive_scoped_lock lock(ccmutex);
- _cursorid = allocCursorId_inlock();
- clientCursorsById.insert( make_pair(_cursorid, this) );
+ _cursorid = _collection->cursorCache()->registerCursor( this );
+
+ cursorStatsOpen.increment();
+ _countedYet = true;
}
ClientCursor::~ClientCursor() {
@@ -112,160 +133,30 @@ namespace mongo {
return;
}
- {
- recursive_scoped_lock lock(ccmutex);
- clientCursorsById.erase(_cursorid);
-
- // defensive:
- _cursorid = INVALID_CURSOR_ID;
- _pos = -2;
- _pinValue = 0;
+ if ( _countedYet ) {
+ _countedYet = false;
+ cursorStatsOpen.decrement();
+ if ( _pinValue == 1 )
+ cursorStatsOpenNoTimeout.decrement();
}
- }
-
- void ClientCursor::invalidate(const StringData& ns) {
- Lock::assertWriteLocked(ns);
-
- size_t dot = ns.find( '.' );
- verify( dot != string::npos );
- // first (and only) dot is the last char
- bool isDB = dot == ns.size() - 1;
-
- Database *db = cc().database();
- verify(db);
- verify(ns.startsWith(db->name()));
-
- recursive_scoped_lock cclock(ccmutex);
- // Look at all active non-cached Runners. These are the runners that are in auto-yield mode
- // that are not attached to the the client cursor. For example, all internal runners don't
- // need to be cached -- there will be no getMore.
- for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end();
- ++it) {
-
- Runner* runner = *it;
- const string& runnerNS = runner->ns();
- if ( ( isDB && StringData(runnerNS).startsWith(ns) ) || ns == runnerNS ) {
- runner->kill();
- }
+ if ( _collection ) {
+ // this could be null if kill() was called
+ _collection->cursorCache()->deregisterCursor( this );
}
- // Look at all cached ClientCursor(s). The CC may have a Runner, a Cursor, or nothing (see
- // sharding_block.h).
- CCById::const_iterator it = clientCursorsById.begin();
- while (it != clientCursorsById.end()) {
- ClientCursor* cc = it->second;
-
- // Aggregation cursors don't have their lifetime bound to the underlying collection.
- if (cc->isAggCursor) {
- ++it;
- continue;
- }
-
- // We're only interested in cursors over one db.
- if (cc->_db != db) {
- ++it;
- continue;
- }
-
- // Note that a valid ClientCursor state is "no cursor no runner." This is because
- // the set of active cursor IDs in ClientCursor is used as representation of query
- // state. See sharding_block.h. TODO(greg,hk): Move this out.
- if (NULL == cc->_runner.get()) {
- ++it;
- continue;
- }
-
- bool shouldDelete = false;
-
- // We will only delete CCs with runners that are not actively in use. The runners that
- // are actively in use are instead kill()-ed.
- if (NULL != cc->_runner.get()) {
- if (isDB || cc->_runner->ns() == ns) {
- // If there is a pinValue >= 100, somebody is actively using the CC and we do
- // not delete it. Instead we notify the holder that we killed it. The holder
- // will then delete the CC.
- if (cc->_pinValue >= 100) {
- cc->_runner->kill();
- }
- else {
- // pinvalue is <100, so there is nobody actively holding the CC. We can
- // safely delete it as nobody is holding the CC.
- shouldDelete = true;
- }
- }
- }
-
- if (shouldDelete) {
- ClientCursor* toDelete = it->second;
- CursorId id = toDelete->cursorid();
- delete toDelete;
- // We're not following the usual paradigm of saving it, ++it, and deleting the saved
- // 'it' because deleting 'it' might invalidate the next thing in clientCursorsById.
- // TODO: Why?
- it = clientCursorsById.upper_bound(id);
- }
- else {
- ++it;
- }
- }
- }
-
- void ClientCursor::invalidateDocument(const StringData& ns,
- const NamespaceDetails* nsd,
- const DiskLoc& dl,
- InvalidationType type) {
- // TODO: Do we need this pagefault thing still
- NoPageFaultsAllowed npfa;
- recursive_scoped_lock lock(ccmutex);
-
- Database *db = cc().database();
- verify(db);
- aboutToDeleteForSharding( ns, db, nsd, dl );
-
- // Check our non-cached active runner list.
- for (RunnerSet::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end();
- ++it) {
-
- Runner* runner = *it;
- if (0 == ns.compare(runner->ns())) {
- runner->invalidate(dl, type);
- }
- }
-
- // TODO: This requires optimization. We walk through *all* CCs and send the delete to every
- // CC open on the db we're deleting from. We could:
- // 1. Map from ns to open runners,
- // 2. Map from ns -> (a map of DiskLoc -> runners who care about that DL)
- //
- // We could also queue invalidations somehow and have them processed later in the runner's
- // read locks.
- for (CCById::const_iterator it = clientCursorsById.begin(); it != clientCursorsById.end();
- ++it) {
-
- ClientCursor* cc = it->second;
- // We're only interested in cursors over one db.
- if (cc->_db != db) { continue; }
- if (NULL == cc->_runner.get()) { continue; }
- cc->_runner->invalidate(dl, type);
- }
+ // defensive:
+ _collection = NULL;
+ _cursorid = INVALID_CURSOR_ID;
+ _pos = -2;
+ _pinValue = 0;
}
- void ClientCursor::registerRunner(Runner* runner) {
- recursive_scoped_lock lock(ccmutex);
- // The second part of the pair returned by unordered_map::insert tells us whether the
- // insert was performed or not, so we can use that to ensure that we have not been
- // double registered.
- const std::pair<RunnerSet::iterator, bool> result = nonCachedRunners.insert(runner);
- verify(result.second);
- }
+ void ClientCursor::kill() {
+ if ( _runner.get() )
+ _runner->kill();
- void ClientCursor::deregisterRunner(Runner* runner) {
- recursive_scoped_lock lock(ccmutex);
- // unordered_set::erase returns a count of how many elements were erased, so we can
- // validate that our de-registration matched an existing register call by ensuring that
- // exactly one item was erased.
- verify(nonCachedRunners.erase(runner) == 1);
+ _collection = NULL;
}
void yieldOrSleepFor1Microsecond() {
@@ -352,238 +243,12 @@ namespace mongo {
_idleAgeMillis = millis;
}
- void ClientCursor::idleTimeReport(unsigned millis) {
- bool foundSomeToTimeout = false;
-
- // two passes so that we don't need to readlock unless we really do some timeouts
- // we assume here that incrementing _idleAgeMillis outside readlock is ok.
- {
- recursive_scoped_lock lock(ccmutex);
- {
- unsigned sz = clientCursorsById.size();
- static time_t last;
- if( sz >= 100000 ) {
- if( time(0) - last > 300 ) {
- last = time(0);
- log() << "warning number of open cursors is very large: " << sz << endl;
- }
- }
- }
- for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) {
- CCById::iterator j = i;
- i++;
- if( j->second->shouldTimeout( millis ) ) {
- foundSomeToTimeout = true;
- }
- }
- }
-
- if( foundSomeToTimeout ) {
- Lock::GlobalRead lk;
-
- recursive_scoped_lock cclock(ccmutex);
- CCById::const_iterator it = clientCursorsById.begin();
- while (it != clientCursorsById.end()) {
- ClientCursor* cc = it->second;
- if( cc->shouldTimeout(0) ) {
- numberTimedOut++;
- LOG(1) << "killing old cursor " << cc->_cursorid << ' ' << cc->_ns
- << " idle:" << cc->idleTime() << "ms\n";
- ClientCursor* toDelete = it->second;
- CursorId id = toDelete->cursorid();
- // This is what winds up removing it from the map.
- delete toDelete;
- it = clientCursorsById.upper_bound(id);
- }
- else {
- ++it;
- }
- }
- }
- }
-
void ClientCursor::updateSlaveLocation( CurOp& curop ) {
if ( _slaveReadTill.isNull() )
return;
mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill );
}
- void ClientCursor::appendStats( BSONObjBuilder& result ) {
- recursive_scoped_lock lock(ccmutex);
- result.appendNumber("totalOpen", clientCursorsById.size() );
- result.appendNumber("clientCursors_size", (int) numCursors());
- result.appendNumber("timedOut" , numberTimedOut);
- unsigned pinned = 0;
- unsigned notimeout = 0;
- for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); i++ ) {
- unsigned p = i->second->_pinValue;
- if( p >= 100 )
- pinned++;
- else if( p > 0 )
- notimeout++;
- }
- if( pinned )
- result.append("pinned", pinned);
- if( notimeout )
- result.append("totalNoTimeout", notimeout);
- }
-
- //
- // ClientCursor creation/deletion/access.
- //
-
- // Some statics used by allocCursorId_inlock().
- namespace {
- // so we don't have to do find() which is a little slow very often.
- long long cursorGenTSLast = 0;
- PseudoRandom* cursorGenRandom = NULL;
- }
-
- long long ClientCursor::allocCursorId_inlock() {
- // It is important that cursor IDs not be reused within a short period of time.
- if (!cursorGenRandom) {
- scoped_ptr<SecureRandom> sr( SecureRandom::create() );
- cursorGenRandom = new PseudoRandom( sr->nextInt64() );
- }
-
- const long long ts = Listener::getElapsedTimeMillis();
- long long x;
- while ( 1 ) {
- x = ts << 32;
- x |= cursorGenRandom->nextInt32();
-
- if ( x == 0 ) { continue; }
-
- if ( x < 0 ) { x *= -1; }
-
- if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 )
- break;
- }
-
- cursorGenTSLast = ts;
- return x;
- }
-
- // static
- ClientCursor* ClientCursor::find_inlock(CursorId id, bool warn) {
- CCById::iterator it = clientCursorsById.find(id);
- if ( it == clientCursorsById.end() ) {
- if ( warn ) {
- OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map '" << id
- << "' (ok after a drop)" << endl;
- }
- return 0;
- }
- return it->second;
- }
-
- void ClientCursor::find( const string& ns , set<CursorId>& all ) {
- recursive_scoped_lock lock(ccmutex);
-
- for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) {
- if ( i->second->_ns == ns )
- all.insert( i->first );
- }
- }
-
- // static
- ClientCursor* ClientCursor::find(CursorId id, bool warn) {
- recursive_scoped_lock lock(ccmutex);
- ClientCursor *c = find_inlock(id, warn);
- // if this asserts, your code was not thread safe - you either need to set no timeout
- // for the cursor or keep a ClientCursor::Pointer in scope for it.
- massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue );
- return c;
- }
-
- void ClientCursor::_erase_inlock(ClientCursor* cursor) {
- // Must not have an active ClientCursor::Pin.
- massert( 16089,
- str::stream() << "Cannot kill active cursor " << cursor->cursorid(),
- cursor->_pinValue < 100 );
-
- delete cursor;
- }
-
- bool ClientCursor::erase(CursorId id) {
- recursive_scoped_lock lock(ccmutex);
- ClientCursor* cursor = find_inlock(id);
- if (!cursor) { return false; }
- _erase_inlock(cursor);
- return true;
- }
-
- int ClientCursor::erase(int n, long long *ids) {
- int found = 0;
- for ( int i = 0; i < n; i++ ) {
- if ( erase(ids[i]))
- found++;
-
- if ( inShutdown() )
- break;
- }
- return found;
- }
-
- bool ClientCursor::eraseIfAuthorized(CursorId id) {
- NamespaceString ns;
- {
- recursive_scoped_lock lock(ccmutex);
- ClientCursor* cursor = find_inlock(id);
- if (!cursor) {
- audit::logKillCursorsAuthzCheck(
- &cc(),
- NamespaceString(),
- id,
- ErrorCodes::CursorNotFound);
- return false;
- }
- ns = NamespaceString(cursor->ns());
- }
-
- // Can't be in a lock when checking authorization
- const bool isAuthorized = cc().getAuthorizationSession()->isAuthorizedForActionsOnNamespace(
- ns, ActionType::killCursors);
- audit::logKillCursorsAuthzCheck(
- &cc(),
- ns,
- id,
- isAuthorized ? ErrorCodes::OK : ErrorCodes::Unauthorized);
- if (!isAuthorized) {
- return false;
- }
-
- // It is safe to lookup the cursor again after temporarily releasing the mutex because
- // of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that
- // the namespace associated with a cursor cannot change.
- recursive_scoped_lock lock(ccmutex);
- ClientCursor* cursor = find_inlock(id);
- if (!cursor) {
- // Cursor was deleted in another thread since we found it earlier in this function.
- return false;
- }
- if (ns != cursor->ns()) {
- warning() << "Cursor namespace changed. Previous ns: " << ns << ", current ns: "
- << cursor->ns() << endl;
- return false;
- }
-
- _erase_inlock(cursor);
- return true;
- }
-
- int ClientCursor::eraseIfAuthorized(int n, long long *ids) {
- int found = 0;
- for ( int i = 0; i < n; i++ ) {
- if ( eraseIfAuthorized(ids[i]))
- found++;
-
- if ( inShutdown() )
- break;
- }
- return found;
- }
-
int ClientCursor::suggestYieldMicros() {
int writers = 0;
int readers = 0;
@@ -606,42 +271,45 @@ namespace mongo {
// deleted from underneath us, so we can save the pointer and ignore the ID.
//
- ClientCursorPin::ClientCursorPin(long long cursorid) : _cursorid( INVALID_CURSOR_ID ) {
- recursive_scoped_lock lock( ClientCursor::ccmutex );
- ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true );
- if (NULL != cursor) {
- uassert( 12051, "clientcursor already in use? driver problem?",
- cursor->_pinValue < 100 );
- cursor->_pinValue += 100;
- _cursorid = cursorid;
+ ClientCursorPin::ClientCursorPin( const Collection* collection, long long cursorid )
+ : _cursor( NULL ) {
+ cursorStatsOpenPinned.increment();
+ _cursor = collection->cursorCache()->find( cursorid );
+ if ( _cursor ) {
+ uassert( 12051,
+ "clientcursor already in use? driver problem?",
+ _cursor->_pinValue < 100 );
+ _cursor->_pinValue += 100;
}
}
- ClientCursorPin::~ClientCursorPin() { DESTRUCTOR_GUARD( release(); ) }
+ ClientCursorPin::~ClientCursorPin() {
+ cursorStatsOpenPinned.decrement();
+ DESTRUCTOR_GUARD( release(); );
+ }
void ClientCursorPin::release() {
- if ( _cursorid == INVALID_CURSOR_ID ) {
+ if ( !_cursor )
return;
- }
- ClientCursor *cursor = c();
- _cursorid = INVALID_CURSOR_ID;
- if ( cursor ) {
- verify( cursor->_pinValue >= 100 );
- cursor->_pinValue -= 100;
+
+ invariant( _cursor->_pinValue >= 100 );
+ _cursor->_pinValue -= 100;
+
+ if ( _cursor->collection() == NULL ) {
+ // the ClientCursor was killed while we had it
+ // therefore it's our responsibility to delete it
+ delete _cursor;
+ _cursor = NULL; // defensive
}
}
void ClientCursorPin::deleteUnderlying() {
- if (_cursorid == INVALID_CURSOR_ID) {
- return;
- }
- ClientCursor *cursor = c();
- _cursorid = INVALID_CURSOR_ID;
- delete cursor;
+ delete _cursor;
+ _cursor = NULL;
}
ClientCursor* ClientCursorPin::c() const {
- return ClientCursor::find( _cursorid );
+ return _cursor;
}
//
@@ -708,9 +376,9 @@ namespace mongo {
const int Secs = 4;
unsigned n = 0;
while ( ! inShutdown() ) {
- ClientCursor::idleTimeReport( t.millisReset() );
+ cursorStatsTimedOut.increment( CollectionCursorCache::timeoutCursorsGlobal( t.millisReset() ) );
sleepsecs(Secs);
- if( ++n % (60/4) == 0 /*once a minute*/ ) {
+ if( ++n % (60/Secs) == 0 /*once a minute*/ ) {
sayMemoryStatus();
}
}
@@ -723,14 +391,25 @@ namespace mongo {
// cursorInfo command.
//
+ void _appendCursorStats( BSONObjBuilder& b ) {
+ b.append( "note" , "deprecated, use server status metrics" );
+
+ b.appendNumber("totalOpen", cursorStatsOpen.get() );
+ b.appendNumber("pinned", cursorStatsOpenPinned.get() );
+ b.appendNumber("totalNoTimeout", cursorStatsOpenNoTimeout.get() );
+
+ b.appendNumber("timedOut" , cursorStatsTimedOut.get());
+ }
+
// QUESTION: Restrict to the namespace from which this command was issued?
// Alternatively, make this command admin-only?
+ // TODO: remove this for 2.8
class CmdCursorInfo : public Command {
public:
CmdCursorInfo() : Command( "cursorInfo", true ) {}
virtual bool slaveOk() const { return true; }
virtual void help( stringstream& help ) const {
- help << " example: { cursorInfo : 1 }";
+ help << " example: { cursorInfo : 1 }, deprecated";
}
virtual LockType locktype() const { return NONE; }
virtual void addRequiredPrivileges(const std::string& dbname,
@@ -742,7 +421,8 @@ namespace mongo {
}
bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result,
bool fromRepl ) {
- ClientCursor::appendStats( result );
+ _appendCursorStats( result );
+ result.append( "note", "deprecated, get from serverStatus().cursors" );
return true;
}
} cmdCursorInfo;
@@ -758,7 +438,7 @@ namespace mongo {
BSONObj generateSection(const BSONElement& configElement) const {
BSONObjBuilder b;
- ClientCursor::appendStats( b );
+ _appendCursorStats( b );
return b.obj();
}
} cursorServerStats;