summaryrefslogtreecommitdiff
path: root/src/mongo/db/clientcursor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/clientcursor.cpp')
-rw-r--r--src/mongo/db/clientcursor.cpp563
1 files changed, 283 insertions, 280 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index c35c75dd88d..c0b61f99404 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -53,319 +53,322 @@
namespace mongo {
- using std::string;
- using std::stringstream;
-
- static Counter64 cursorStatsOpen; // gauge
- static Counter64 cursorStatsOpenPinned; // gauge
- static Counter64 cursorStatsOpenNoTimeout; // gauge
- static Counter64 cursorStatsTimedOut;
-
- static ServerStatusMetricField<Counter64> dCursorStatsOpen( "cursor.open.total",
- &cursorStatsOpen );
- static ServerStatusMetricField<Counter64> dCursorStatsOpenPinned( "cursor.open.pinned",
- &cursorStatsOpenPinned );
- static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout( "cursor.open.noTimeout",
- &cursorStatsOpenNoTimeout );
- static ServerStatusMetricField<Counter64> dCursorStatusTimedout( "cursor.timedOut",
- &cursorStatsTimedOut );
-
- MONGO_EXPORT_SERVER_PARAMETER(cursorTimeoutMillis, int, 10 * 60 * 1000 /* 10 minutes */);
-
- long long ClientCursor::totalOpen() {
- return cursorStatsOpen.get();
+using std::string;
+using std::stringstream;
+
+static Counter64 cursorStatsOpen; // gauge
+static Counter64 cursorStatsOpenPinned; // gauge
+static Counter64 cursorStatsOpenNoTimeout; // gauge
+static Counter64 cursorStatsTimedOut;
+
+static ServerStatusMetricField<Counter64> dCursorStatsOpen("cursor.open.total", &cursorStatsOpen);
+static ServerStatusMetricField<Counter64> dCursorStatsOpenPinned("cursor.open.pinned",
+ &cursorStatsOpenPinned);
+static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout("cursor.open.noTimeout",
+ &cursorStatsOpenNoTimeout);
+static ServerStatusMetricField<Counter64> dCursorStatusTimedout("cursor.timedOut",
+ &cursorStatsTimedOut);
+
+MONGO_EXPORT_SERVER_PARAMETER(cursorTimeoutMillis, int, 10 * 60 * 1000 /* 10 minutes */);
+
+long long ClientCursor::totalOpen() {
+ return cursorStatsOpen.get();
+}
+
+ClientCursor::ClientCursor(CursorManager* cursorManager,
+ PlanExecutor* exec,
+ const std::string& ns,
+ int qopts,
+ const BSONObj query,
+ bool isAggCursor)
+ : _ns(ns),
+ _cursorManager(cursorManager),
+ _countedYet(false),
+ _isAggCursor(isAggCursor),
+ _unownedRU(NULL) {
+ _exec.reset(exec);
+ _query = query;
+ _queryOptions = qopts;
+ if (exec->collection()) {
+ invariant(cursorManager == exec->collection()->getCursorManager());
}
-
- ClientCursor::ClientCursor(CursorManager* cursorManager,
- PlanExecutor* exec,
- const std::string& ns,
- int qopts,
- const BSONObj query,
- bool isAggCursor)
- : _ns(ns),
- _cursorManager(cursorManager),
- _countedYet(false),
- _isAggCursor(isAggCursor),
- _unownedRU(NULL) {
-
- _exec.reset(exec);
- _query = query;
- _queryOptions = qopts;
- if (exec->collection()) {
- invariant(cursorManager == exec->collection()->getCursorManager());
- }
- init();
- }
-
- ClientCursor::ClientCursor(const Collection* collection)
- : _ns(collection->ns().ns()),
- _cursorManager(collection->getCursorManager()),
- _countedYet(false),
- _queryOptions(QueryOption_NoCursorTimeout),
- _isAggCursor(false),
- _unownedRU(NULL) {
- init();
+ init();
+}
+
+ClientCursor::ClientCursor(const Collection* collection)
+ : _ns(collection->ns().ns()),
+ _cursorManager(collection->getCursorManager()),
+ _countedYet(false),
+ _queryOptions(QueryOption_NoCursorTimeout),
+ _isAggCursor(false),
+ _unownedRU(NULL) {
+ init();
+}
+
+void ClientCursor::init() {
+ invariant(_cursorManager);
+
+ _isPinned = false;
+ _isNoTimeout = false;
+
+ _idleAgeMillis = 0;
+ _leftoverMaxTimeMicros = 0;
+ _pos = 0;
+
+ if (_queryOptions & QueryOption_NoCursorTimeout) {
+ // cursors normally timeout after an inactivity period to prevent excess memory use
+ // setting this prevents timeout of the cursor in question.
+ _isNoTimeout = true;
+ cursorStatsOpenNoTimeout.increment();
}
- void ClientCursor::init() {
- invariant( _cursorManager );
-
- _isPinned = false;
- _isNoTimeout = false;
-
- _idleAgeMillis = 0;
- _leftoverMaxTimeMicros = 0;
- _pos = 0;
+ _cursorid = _cursorManager->registerCursor(this);
- if (_queryOptions & QueryOption_NoCursorTimeout) {
- // cursors normally timeout after an inactivity period to prevent excess memory use
- // setting this prevents timeout of the cursor in question.
- _isNoTimeout = true;
- cursorStatsOpenNoTimeout.increment();
- }
-
- _cursorid = _cursorManager->registerCursor( this );
+ cursorStatsOpen.increment();
+ _countedYet = true;
+}
- cursorStatsOpen.increment();
- _countedYet = true;
+ClientCursor::~ClientCursor() {
+ if (_pos == -2) {
+ // defensive: destructor called twice
+ wassert(false);
+ return;
}
- ClientCursor::~ClientCursor() {
- if( _pos == -2 ) {
- // defensive: destructor called twice
- wassert(false);
- return;
- }
-
- invariant( !_isPinned ); // Must call unsetPinned() before invoking destructor.
-
- if ( _countedYet ) {
- _countedYet = false;
- cursorStatsOpen.decrement();
- if ( _isNoTimeout )
- cursorStatsOpenNoTimeout.decrement();
- }
-
- if ( _cursorManager ) {
- // this could be null if kill() was killed
- _cursorManager->deregisterCursor( this );
- }
+ invariant(!_isPinned); // Must call unsetPinned() before invoking destructor.
- // defensive:
- _cursorManager = NULL;
- _cursorid = INVALID_CURSOR_ID;
- _pos = -2;
- _isNoTimeout = false;
+ if (_countedYet) {
+ _countedYet = false;
+ cursorStatsOpen.decrement();
+ if (_isNoTimeout)
+ cursorStatsOpenNoTimeout.decrement();
}
- void ClientCursor::kill() {
- if ( _exec.get() )
- _exec->kill("cursor killed");
-
- _cursorManager = NULL;
+ if (_cursorManager) {
+ // this could be null if kill() was killed
+ _cursorManager->deregisterCursor(this);
}
- //
- // Timing and timeouts
- //
-
- bool ClientCursor::shouldTimeout(int millis) {
- _idleAgeMillis += millis;
- if (_isNoTimeout || _isPinned) {
- return false;
- }
- return _idleAgeMillis > cursorTimeoutMillis;
- }
+ // defensive:
+ _cursorManager = NULL;
+ _cursorid = INVALID_CURSOR_ID;
+ _pos = -2;
+ _isNoTimeout = false;
+}
- void ClientCursor::setIdleTime( int millis ) {
- _idleAgeMillis = millis;
- }
+void ClientCursor::kill() {
+ if (_exec.get())
+ _exec->kill("cursor killed");
- void ClientCursor::updateSlaveLocation(OperationContext* txn) {
- if (_slaveReadTill.isNull())
- return;
+ _cursorManager = NULL;
+}
- verify(str::startsWith(_ns.c_str(), "local.oplog."));
+//
+// Timing and timeouts
+//
- Client* c = txn->getClient();
- verify(c);
- OID rid = repl::ReplClientInfo::forClient(c).getRemoteID();
- if (!rid.isSet())
- return;
+bool ClientCursor::shouldTimeout(int millis) {
+ _idleAgeMillis += millis;
+ if (_isNoTimeout || _isPinned) {
+ return false;
+ }
+ return _idleAgeMillis > cursorTimeoutMillis;
+}
+
+void ClientCursor::setIdleTime(int millis) {
+ _idleAgeMillis = millis;
+}
+
+void ClientCursor::updateSlaveLocation(OperationContext* txn) {
+ if (_slaveReadTill.isNull())
+ return;
+
+ verify(str::startsWith(_ns.c_str(), "local.oplog."));
+
+ Client* c = txn->getClient();
+ verify(c);
+ OID rid = repl::ReplClientInfo::forClient(c).getRemoteID();
+ if (!rid.isSet())
+ return;
+
+ repl::getGlobalReplicationCoordinator()->setLastOptimeForSlave(rid, _slaveReadTill);
+}
+
+//
+// Storage engine state for getMore.
+//
+
+void ClientCursor::setUnownedRecoveryUnit(RecoveryUnit* ru) {
+ invariant(!_unownedRU);
+ invariant(!_ownedRU.get());
+ _unownedRU = ru;
+}
+
+RecoveryUnit* ClientCursor::getUnownedRecoveryUnit() const {
+ return _unownedRU;
+}
+
+void ClientCursor::setOwnedRecoveryUnit(RecoveryUnit* ru) {
+ invariant(!_unownedRU);
+ invariant(!_ownedRU.get());
+ _ownedRU.reset(ru);
+}
+
+RecoveryUnit* ClientCursor::releaseOwnedRecoveryUnit() {
+ return _ownedRU.release();
+}
+
+//
+// Pin methods
+//
+
+ClientCursorPin::ClientCursorPin(CursorManager* cursorManager, long long cursorid) : _cursor(NULL) {
+ cursorStatsOpenPinned.increment();
+ _cursor = cursorManager->find(cursorid, true);
+}
+
+ClientCursorPin::~ClientCursorPin() {
+ cursorStatsOpenPinned.decrement();
+ release();
+}
+
+void ClientCursorPin::release() {
+ if (!_cursor)
+ return;
+
+ invariant(_cursor->isPinned());
+
+ if (_cursor->cursorManager() == NULL) {
+ // The ClientCursor was killed while we had it. Therefore, it is our responsibility to
+ // kill it.
+ deleteUnderlying();
+ } else {
+ // Unpin the cursor under the collection cursor manager lock.
+ _cursor->cursorManager()->unpin(_cursor);
+ }
- repl::getGlobalReplicationCoordinator()->setLastOptimeForSlave(rid, _slaveReadTill);
+ _cursor = NULL;
+}
+
+void ClientCursorPin::deleteUnderlying() {
+ invariant(_cursor);
+ invariant(_cursor->isPinned());
+ // Note the following subtleties of this method's implementation:
+ // - We must unpin the cursor before destruction, since it is an error to destroy a pinned
+ // cursor.
+ // - In addition, we must deregister the cursor before unpinning, since it is an
+ // error to unpin a registered cursor without holding the cursor manager lock (note that
+ // we can't simply unpin with the cursor manager lock here, since we need to guarantee
+ // exclusive ownership of the cursor when we are deleting it).
+ if (_cursor->cursorManager()) {
+ _cursor->cursorManager()->deregisterCursor(_cursor);
+ _cursor->kill();
}
+ _cursor->unsetPinned();
+ delete _cursor;
+ _cursor = NULL;
+}
- //
- // Storage engine state for getMore.
- //
+ClientCursor* ClientCursorPin::c() const {
+ return _cursor;
+}
- void ClientCursor::setUnownedRecoveryUnit(RecoveryUnit* ru) {
- invariant(!_unownedRU);
- invariant(!_ownedRU.get());
- _unownedRU = ru;
- }
+//
+// ClientCursorMonitor
+//
- RecoveryUnit* ClientCursor::getUnownedRecoveryUnit() const {
- return _unownedRU;
+/**
+ * Thread for timing out old cursors
+ */
+class ClientCursorMonitor : public BackgroundJob {
+public:
+ std::string name() const {
+ return "ClientCursorMonitor";
}
- void ClientCursor::setOwnedRecoveryUnit(RecoveryUnit* ru) {
- invariant(!_unownedRU);
- invariant(!_ownedRU.get());
- _ownedRU.reset(ru);
+ void run() {
+ Client::initThread("clientcursormon");
+ Timer t;
+ const int Secs = 4;
+ while (!inShutdown()) {
+ {
+ OperationContextImpl txn;
+ cursorStatsTimedOut.increment(
+ CursorManager::timeoutCursorsGlobal(&txn, t.millisReset()));
+ }
+ sleepsecs(Secs);
+ }
}
-
- RecoveryUnit* ClientCursor::releaseOwnedRecoveryUnit() {
- return _ownedRU.release();
+};
+
+namespace {
+// Only one instance of the ClientCursorMonitor exists
+ClientCursorMonitor clientCursorMonitor;
+
+void _appendCursorStats(BSONObjBuilder& b) {
+ b.append("note", "deprecated, use server status metrics");
+ b.appendNumber("clientCursors_size", cursorStatsOpen.get());
+ b.appendNumber("totalOpen", cursorStatsOpen.get());
+ b.appendNumber("pinned", cursorStatsOpenPinned.get());
+ b.appendNumber("totalNoTimeout", cursorStatsOpenNoTimeout.get());
+ b.appendNumber("timedOut", cursorStatsTimedOut.get());
+}
+}
+
+void startClientCursorMonitor() {
+ clientCursorMonitor.go();
+}
+
+// QUESTION: Restrict to the namespace from which this command was issued?
+// Alternatively, make this command admin-only?
+// TODO: remove this for 3.0
+class CmdCursorInfo : public Command {
+public:
+ CmdCursorInfo() : Command("cursorInfo") {}
+ virtual bool slaveOk() const {
+ return true;
}
-
- //
- // Pin methods
- //
-
- ClientCursorPin::ClientCursorPin( CursorManager* cursorManager, long long cursorid )
- : _cursor( NULL ) {
- cursorStatsOpenPinned.increment();
- _cursor = cursorManager->find( cursorid, true );
+ virtual void help(stringstream& help) const {
+ help << " example: { cursorInfo : 1 }, deprecated";
}
-
- ClientCursorPin::~ClientCursorPin() {
- cursorStatsOpenPinned.decrement();
- release();
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
}
-
- void ClientCursorPin::release() {
- if ( !_cursor )
- return;
-
- invariant( _cursor->isPinned() );
-
- if ( _cursor->cursorManager() == NULL ) {
- // The ClientCursor was killed while we had it. Therefore, it is our responsibility to
- // kill it.
- deleteUnderlying();
- }
- else {
- // Unpin the cursor under the collection cursor manager lock.
- _cursor->cursorManager()->unpin( _cursor );
- }
-
- _cursor = NULL;
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::cursorInfo);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
-
- void ClientCursorPin::deleteUnderlying() {
- invariant( _cursor );
- invariant( _cursor->isPinned() );
- // Note the following subtleties of this method's implementation:
- // - We must unpin the cursor before destruction, since it is an error to destroy a pinned
- // cursor.
- // - In addition, we must deregister the cursor before unpinning, since it is an
- // error to unpin a registered cursor without holding the cursor manager lock (note that
- // we can't simply unpin with the cursor manager lock here, since we need to guarantee
- // exclusive ownership of the cursor when we are deleting it).
- if ( _cursor->cursorManager() ) {
- _cursor->cursorManager()->deregisterCursor( _cursor );
- _cursor->kill();
- }
- _cursor->unsetPinned();
- delete _cursor;
- _cursor = NULL;
+ bool run(OperationContext* txn,
+ const string& dbname,
+ BSONObj& jsobj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ _appendCursorStats(result);
+ return true;
}
+} cmdCursorInfo;
- ClientCursor* ClientCursorPin::c() const {
- return _cursor;
- }
+//
+// cursors stats.
+//
- //
- // ClientCursorMonitor
- //
-
- /**
- * Thread for timing out old cursors
- */
- class ClientCursorMonitor : public BackgroundJob {
- public:
- std::string name() const { return "ClientCursorMonitor"; }
-
- void run() {
- Client::initThread("clientcursormon");
- Timer t;
- const int Secs = 4;
- while (!inShutdown()) {
- {
- OperationContextImpl txn;
- cursorStatsTimedOut.increment(
- CursorManager::timeoutCursorsGlobal(&txn, t.millisReset()));
- }
- sleepsecs(Secs);
- }
- }
- };
-
- namespace {
- // Only one instance of the ClientCursorMonitor exists
- ClientCursorMonitor clientCursorMonitor;
-
- void _appendCursorStats( BSONObjBuilder& b ) {
- b.append( "note" , "deprecated, use server status metrics" );
- b.appendNumber("clientCursors_size", cursorStatsOpen.get() );
- b.appendNumber("totalOpen", cursorStatsOpen.get() );
- b.appendNumber("pinned", cursorStatsOpenPinned.get() );
- b.appendNumber("totalNoTimeout", cursorStatsOpenNoTimeout.get() );
- b.appendNumber("timedOut" , cursorStatsTimedOut.get());
- }
+class CursorServerStats : public ServerStatusSection {
+public:
+ CursorServerStats() : ServerStatusSection("cursors") {}
+ virtual bool includeByDefault() const {
+ return true;
}
- void startClientCursorMonitor() {
- clientCursorMonitor.go();
+ BSONObj generateSection(OperationContext* txn, const BSONElement& configElement) const {
+ BSONObjBuilder b;
+ _appendCursorStats(b);
+ return b.obj();
}
- // QUESTION: Restrict to the namespace from which this command was issued?
- // Alternatively, make this command admin-only?
- // TODO: remove this for 3.0
- class CmdCursorInfo : public Command {
- public:
- CmdCursorInfo() : Command( "cursorInfo" ) {}
- virtual bool slaveOk() const { return true; }
- virtual void help( stringstream& help ) const {
- help << " example: { cursorInfo : 1 }, deprecated";
- }
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::cursorInfo);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- bool run(OperationContext* txn,
- const string& dbname,
- BSONObj& jsobj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- _appendCursorStats( result );
- return true;
- }
- } cmdCursorInfo;
-
- //
- // cursors stats.
- //
-
- class CursorServerStats : public ServerStatusSection {
- public:
- CursorServerStats() : ServerStatusSection( "cursors" ){}
- virtual bool includeByDefault() const { return true; }
-
- BSONObj generateSection(OperationContext* txn,
- const BSONElement& configElement) const {
- BSONObjBuilder b;
- _appendCursorStats( b );
- return b.obj();
- }
-
- } cursorServerStats;
+} cursorServerStats;
-} // namespace mongo
+} // namespace mongo