/**
* Copyright (C) 2008, 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/pch.h"
#include "mongo/db/clientcursor.h"
#include
#include
#include
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/server_status.h"
#include "mongo/db/db.h"
#include "mongo/db/introspect.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/kill_current_op.h"
#include "mongo/db/pagefault.h"
#include "mongo/db/parsed_query.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/write_concern.h"
#include "mongo/db/scanandorder.h"
#include "mongo/platform/random.h"
#include "mongo/util/processinfo.h"
#include "mongo/util/timer.h"
namespace mongo {
ClientCursor::CCById ClientCursor::clientCursorsById;
boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) );
long long ClientCursor::numberTimedOut = 0;
set ClientCursor::nonCachedRunners;
void aboutToDeleteForSharding(const StringData& ns,
const Database* db,
const NamespaceDetails* nsd,
const DiskLoc& dl ); // from s/d_logic.h
ClientCursor::ClientCursor(int qopts, const shared_ptr& c, const StringData& ns,
BSONObj query)
: _ns(ns.toString()), _query(query), _runner(NULL), _c(c), _yieldSometimesTracker(128, 10) {
_queryOptions = qopts;
_doingDeletes = false;
init();
}
ClientCursor::ClientCursor(Runner* runner, int qopts, const BSONObj query)
: _yieldSometimesTracker(128, 10) {
_runner.reset(runner);
_ns = runner->ns();
_query = query;
_queryOptions = qopts;
init();
}
void ClientCursor::init() {
_db = cc().database();
verify( _db );
verify( _db->ownsNS( _ns ) );
_idleAgeMillis = 0;
_leftoverMaxTimeMicros = 0;
_pinValue = 0;
_pos = 0;
Lock::assertAtLeastReadLocked(_ns);
if (_queryOptions & QueryOption_NoCursorTimeout) {
// cursors normally timeout after an inactivity period to prevent excess memory use
// setting this prevents timeout of the cursor in question.
++_pinValue;
}
recursive_scoped_lock lock(ccmutex);
_cursorid = allocCursorId_inlock();
clientCursorsById.insert( make_pair(_cursorid, this) );
}
ClientCursor::~ClientCursor() {
if( _pos == -2 ) {
// defensive: destructor called twice
wassert(false);
return;
}
{
recursive_scoped_lock lock(ccmutex);
if (NULL != _c.get()) {
// Removes 'this' from bylocation map
setLastLoc_inlock( DiskLoc() );
}
clientCursorsById.erase(_cursorid);
// defensive:
_cursorid = INVALID_CURSOR_ID;
_pos = -2;
_pinValue = 0;
}
}
// static
void ClientCursor::assertNoCursors() {
recursive_scoped_lock lock(ccmutex);
if (clientCursorsById.size() > 0) {
log() << "ERROR clientcursors exist but should not at this point" << endl;
ClientCursor *cc = clientCursorsById.begin()->second;
log() << "first one: " << cc->_cursorid << ' ' << cc->_ns << endl;
clientCursorsById.clear();
verify(false);
}
}
void ClientCursor::invalidate(const StringData& ns) {
Lock::assertWriteLocked(ns);
size_t dot = ns.find( '.' );
verify( dot != string::npos );
// first (and only) dot is the last char
bool isDB = dot == ns.size() - 1;
Database *db = cc().database();
verify(db);
verify(ns.startsWith(db->name()));
recursive_scoped_lock cclock(ccmutex);
// Look at all active non-cached Runners. These are the runners that are in auto-yield mode
// that are not attached to the the client cursor. For example, all internal runners don't
// need to be cached -- there will be no getMore.
for (set::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end();
++it) {
Runner* runner = *it;
const string& runnerNS = runner->ns();
if ( ( isDB && StringData(runnerNS).startsWith(ns) ) || ns == runnerNS ) {
runner->kill();
}
}
CCById::const_iterator it = clientCursorsById.begin();
while (it != clientCursorsById.end()) {
ClientCursor* cc = it->second;
// We're only interested in cursors over one db.
if (cc->_db != db) {
++it;
continue;
}
bool shouldDelete = false;
// We will only delete CCs with runners that are not actively in use. The runners that
// are actively in use are instead kill()-ed.
if (NULL != cc->_runner.get()) {
verify(NULL == cc->c());
if (isDB || cc->_runner->ns() == ns) {
// If there is a pinValue >= 100, somebody is actively using the CC and we do
// not delete it. Instead we notify the holder that we killed it. The holder
// will then delete the CC.
if (cc->_pinValue >= 100) {
cc->_runner->kill();
}
else {
// pinvalue is <100, so there is nobody actively holding the CC. We can
// safely delete it as nobody is holding the CC.
shouldDelete = true;
}
}
}
// Begin cursor-only DEPRECATED
else if (cc->c()->shouldDestroyOnNSDeletion()) {
verify(NULL == cc->_runner.get());
if (isDB) {
// already checked that db matched above
dassert( StringData(cc->_ns).startsWith( ns ) );
shouldDelete = true;
}
else {
if ( ns == cc->_ns ) {
shouldDelete = true;
}
}
}
// End cursor-only DEPRECATED
if (shouldDelete) {
ClientCursor* toDelete = it->second;
CursorId id = toDelete->cursorid();
delete toDelete;
// We're not following the usual paradigm of saving it, ++it, and deleting the saved
// 'it' because deleting 'it' might invalidate the next thing in clientCursorsById.
// TODO: Why?
it = clientCursorsById.upper_bound(id);
}
else {
++it;
}
}
}
/* must call this on a delete so we clean up the cursors. */
void ClientCursor::aboutToDelete(const StringData& ns,
const NamespaceDetails* nsd,
const DiskLoc& dl) {
// Begin cursor-only
NoPageFaultsAllowed npfa;
// End cursor-only
recursive_scoped_lock lock(ccmutex);
Database *db = cc().database();
verify(db);
aboutToDeleteForSharding( ns, db, nsd, dl );
// Check our non-cached active runner list.
for (set::iterator it = nonCachedRunners.begin(); it != nonCachedRunners.end();
++it) {
Runner* runner = *it;
if (0 == ns.compare(runner->ns())) {
runner->invalidate(dl);
}
}
// TODO: This requires optimization. We walk through *all* CCs and send the delete to every
// CC open on the db we're deleting from. We could:
// 1. Map from ns to open runners,
// 2. Map from ns -> (a map of DiskLoc -> runners who care about that DL)
//
// We could also queue invalidations somehow and have them processed later in the runner's
// read locks.
for (CCById::const_iterator it = clientCursorsById.begin(); it != clientCursorsById.end();
++it) {
ClientCursor* cc = it->second;
// We're only interested in cursors over one db.
if (cc->_db != db) { continue; }
if (NULL == cc->_runner.get()) { continue; }
cc->_runner->invalidate(dl);
}
// Begin cursor-only
CCByLoc& bl = db->ccByLoc();
CCByLoc::iterator j = bl.lower_bound(ByLocKey::min(dl));
CCByLoc::iterator stop = bl.upper_bound(ByLocKey::max(dl));
if ( j == stop )
return;
vector toAdvance;
while ( 1 ) {
toAdvance.push_back(j->second);
DEV verify( j->first.loc == dl );
++j;
if ( j == stop )
break;
}
if( toAdvance.size() >= 3000 ) {
log() << "perf warning MPW101: " << toAdvance.size() << " cursors for one diskloc "
<< dl.toString()
<< ' ' << toAdvance[1000]->_ns
<< ' ' << toAdvance[2000]->_ns
<< ' ' << toAdvance[1000]->_pinValue
<< ' ' << toAdvance[2000]->_pinValue
<< ' ' << toAdvance[1000]->_pos
<< ' ' << toAdvance[2000]->_pos
<< ' ' << toAdvance[1000]->_idleAgeMillis
<< ' ' << toAdvance[2000]->_idleAgeMillis
<< ' ' << toAdvance[1000]->_doingDeletes
<< ' ' << toAdvance[2000]->_doingDeletes
<< endl;
//wassert( toAdvance.size() < 5000 );
}
for ( vector::iterator i = toAdvance.begin(); i != toAdvance.end(); ++i ) {
ClientCursor* cc = *i;
wassert(cc->_db == db);
if ( cc->_doingDeletes ) continue;
Cursor *c = cc->_c.get();
if ( c->capped() ) {
/* note we cannot advance here. if this condition occurs, writes to the oplog
have "caught" the reader. skipping ahead, the reader would miss postentially
important data.
*/
delete cc;
continue;
}
c->recoverFromYield();
DiskLoc tmp1 = c->refLoc();
if ( tmp1 != dl ) {
// This might indicate a failure to call ClientCursor::prepareToYield() but it can
// also happen during correct operation, see SERVER-2009.
problem() << "warning: cursor loc " << tmp1 << " does not match byLoc position " << dl << " !" << endl;
}
else {
c->advance();
}
while (!c->eof() && c->refLoc() == dl) {
/* We don't delete at EOF because we want to return "no more results" rather than "no such cursor".
* The loop is to handle MultiKey indexes where the deleted record is pointed to by multiple adjacent keys.
* In that case we need to advance until we get to the next distinct record or EOF.
* SERVER-4154
* SERVER-5198
* But see SERVER-5725.
*/
c->advance();
}
cc->updateLocation();
}
// End cursor-only
}
void ClientCursor::registerRunner(Runner* runner) {
recursive_scoped_lock lock(ccmutex);
verify(nonCachedRunners.end() == nonCachedRunners.find(runner));
nonCachedRunners.insert(runner);
}
void ClientCursor::deregisterRunner(Runner* runner) {
recursive_scoped_lock lock(ccmutex);
verify(nonCachedRunners.end() != nonCachedRunners.find(runner));
nonCachedRunners.erase(runner);
}
void yieldOrSleepFor1Microsecond() {
#ifdef _WIN32
SwitchToThread();
#elif defined(__linux__)
pthread_yield();
#else
sleepmicros(1);
#endif
}
void ClientCursor::staticYield(int micros, const StringData& ns, Record* rec) {
bool haveReadLock = Lock::isReadLocked();
killCurrentOp.checkForInterrupt( false );
{
auto_ptr lk;
if ( rec ) {
// need to lock this else rec->touch won't be safe file could disappear
lk.reset( new LockMongoFilesShared() );
}
dbtempreleasecond unlock;
if ( unlock.unlocked() ) {
if ( haveReadLock ) {
// This sleep helps reader threads yield to writer threads.
// Without this, the underlying reader/writer lock implementations
// are not sufficiently writer-greedy.
#ifdef _WIN32
SwitchToThread();
#else
if ( micros == 0 ) {
yieldOrSleepFor1Microsecond();
}
else {
sleepmicros(1);
}
#endif
}
else {
if ( micros == -1 ) {
sleepmicros(Client::recommendedYieldMicros());
}
else if ( micros == 0 ) {
yieldOrSleepFor1Microsecond();
}
else if ( micros > 0 ) {
sleepmicros( micros );
}
}
}
else if ( Listener::getTimeTracker() == 0 ) {
// we aren't running a server, so likely a repair, so don't complain
}
else {
CurOp * c = cc().curop();
while ( c->parent() )
c = c->parent();
warning() << "ClientCursor::yield can't unlock b/c of recursive lock"
<< " ns: " << ns
<< " top: " << c->info()
<< endl;
}
if ( rec )
rec->touch();
lk.reset(0); // need to release this before dbtempreleasecond
}
}
//
// Timing and timeouts
//
bool ClientCursor::shouldTimeout(unsigned millis) {
_idleAgeMillis += millis;
return _idleAgeMillis > 600000 && _pinValue == 0;
}
void ClientCursor::idleTimeReport(unsigned millis) {
bool foundSomeToTimeout = false;
// two passes so that we don't need to readlock unless we really do some timeouts
// we assume here that incrementing _idleAgeMillis outside readlock is ok.
{
recursive_scoped_lock lock(ccmutex);
{
unsigned sz = clientCursorsById.size();
static time_t last;
if( sz >= 100000 ) {
if( time(0) - last > 300 ) {
last = time(0);
log() << "warning number of open cursors is very large: " << sz << endl;
}
}
}
for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); ) {
CCById::iterator j = i;
i++;
if( j->second->shouldTimeout( millis ) ) {
foundSomeToTimeout = true;
}
}
}
if( foundSomeToTimeout ) {
Lock::GlobalRead lk;
recursive_scoped_lock cclock(ccmutex);
CCById::const_iterator it = clientCursorsById.begin();
while (it != clientCursorsById.end()) {
ClientCursor* cc = it->second;
if( cc->shouldTimeout(0) ) {
numberTimedOut++;
LOG(1) << "killing old cursor " << cc->_cursorid << ' ' << cc->_ns
<< " idle:" << cc->idleTime() << "ms\n";
ClientCursor* toDelete = it->second;
CursorId id = toDelete->cursorid();
// This is what winds up removing it from the map.
delete toDelete;
it = clientCursorsById.upper_bound(id);
}
else {
++it;
}
}
}
}
// DEPRECATED only used by Cursor.
void ClientCursor::storeOpForSlave( DiskLoc last ) {
verify(NULL == _runner.get());
if ( ! ( _queryOptions & QueryOption_OplogReplay ))
return;
if ( last.isNull() )
return;
BSONElement e = last.obj()["ts"];
if ( e.type() == Date || e.type() == Timestamp )
_slaveReadTill = e._opTime();
}
void ClientCursor::updateSlaveLocation( CurOp& curop ) {
if ( _slaveReadTill.isNull() )
return;
mongo::updateSlaveLocation( curop , _ns.c_str() , _slaveReadTill );
}
void ClientCursor::appendStats( BSONObjBuilder& result ) {
recursive_scoped_lock lock(ccmutex);
result.appendNumber("totalOpen", clientCursorsById.size() );
result.appendNumber("clientCursors_size", (int) numCursors());
result.appendNumber("timedOut" , numberTimedOut);
unsigned pinned = 0;
unsigned notimeout = 0;
for ( CCById::iterator i = clientCursorsById.begin(); i != clientCursorsById.end(); i++ ) {
unsigned p = i->second->_pinValue;
if( p >= 100 )
pinned++;
else if( p > 0 )
notimeout++;
}
if( pinned )
result.append("pinned", pinned);
if( notimeout )
result.append("totalNoTimeout", notimeout);
}
//
// ClientCursor creation/deletion/access.
//
// Some statics used by allocCursorId_inlock().
namespace {
// so we don't have to do find() which is a little slow very often.
long long cursorGenTSLast = 0;
PseudoRandom* cursorGenRandom = NULL;
}
long long ClientCursor::allocCursorId_inlock() {
// It is important that cursor IDs not be reused within a short period of time.
if (!cursorGenRandom) {
scoped_ptr sr( SecureRandom::create() );
cursorGenRandom = new PseudoRandom( sr->nextInt64() );
}
const long long ts = Listener::getElapsedTimeMillis();
long long x;
while ( 1 ) {
x = ts << 32;
x |= cursorGenRandom->nextInt32();
if ( x == 0 ) { continue; }
if ( x < 0 ) { x *= -1; }
if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 )
break;
}
cursorGenTSLast = ts;
return x;
}
// static
ClientCursor* ClientCursor::find_inlock(CursorId id, bool warn) {
CCById::iterator it = clientCursorsById.find(id);
if ( it == clientCursorsById.end() ) {
if ( warn ) {
OCCASIONALLY out() << "ClientCursor::find(): cursor not found in map '" << id
<< "' (ok after a drop)" << endl;
}
return 0;
}
return it->second;
}
void ClientCursor::find( const string& ns , set& all ) {
recursive_scoped_lock lock(ccmutex);
for ( CCById::iterator i=clientCursorsById.begin(); i!=clientCursorsById.end(); ++i ) {
if ( i->second->_ns == ns )
all.insert( i->first );
}
}
// static
ClientCursor* ClientCursor::find(CursorId id, bool warn) {
recursive_scoped_lock lock(ccmutex);
ClientCursor *c = find_inlock(id, warn);
// if this asserts, your code was not thread safe - you either need to set no timeout
// for the cursor or keep a ClientCursor::Pointer in scope for it.
massert( 12521, "internal error: use of an unlocked ClientCursor", c == 0 || c->_pinValue );
return c;
}
void ClientCursor::_erase_inlock(ClientCursor* cursor) {
// Must not have an active ClientCursor::Pin.
massert( 16089,
str::stream() << "Cannot kill active cursor " << cursor->cursorid(),
cursor->_pinValue < 100 );
delete cursor;
}
bool ClientCursor::erase(CursorId id) {
recursive_scoped_lock lock(ccmutex);
ClientCursor* cursor = find_inlock(id);
if (!cursor) { return false; }
_erase_inlock(cursor);
return true;
}
int ClientCursor::erase(int n, long long *ids) {
int found = 0;
for ( int i = 0; i < n; i++ ) {
if ( erase(ids[i]))
found++;
if ( inShutdown() )
break;
}
return found;
}
bool ClientCursor::eraseIfAuthorized(CursorId id) {
NamespaceString ns;
{
recursive_scoped_lock lock(ccmutex);
ClientCursor* cursor = find_inlock(id);
if (!cursor) {
audit::logKillCursorsAuthzCheck(
&cc(),
NamespaceString(),
id,
ErrorCodes::CursorNotFound);
return false;
}
ns = NamespaceString(cursor->ns());
}
// Can't be in a lock when checking authorization
const bool isAuthorized = cc().getAuthorizationSession()->isAuthorizedForActionsOnNamespace(
ns, ActionType::killCursors);
audit::logKillCursorsAuthzCheck(
&cc(),
ns,
id,
isAuthorized ? ErrorCodes::OK : ErrorCodes::Unauthorized);
if (!isAuthorized) {
return false;
}
// It is safe to lookup the cursor again after temporarily releasing the mutex because
// of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that
// the namespace associated with a cursor cannot change.
recursive_scoped_lock lock(ccmutex);
ClientCursor* cursor = find_inlock(id);
if (!cursor) {
// Cursor was deleted in another thread since we found it earlier in this function.
return false;
}
if (ns != cursor->ns()) {
warning() << "Cursor namespace changed. Previous ns: " << ns << ", current ns: "
<< cursor->ns() << endl;
return false;
}
_erase_inlock(cursor);
return true;
}
int ClientCursor::eraseIfAuthorized(int n, long long *ids) {
int found = 0;
for ( int i = 0; i < n; i++ ) {
if ( eraseIfAuthorized(ids[i]))
found++;
if ( inShutdown() )
break;
}
return found;
}
//
// Yielding that is DEPRECATED. Will be removed when we use runners and they yield internally.
//
Record* ClientCursor::_recordForYield( ClientCursor::RecordNeeds need ) {
if ( ! ok() )
return 0;
if ( need == DontNeed ) {
return 0;
}
else if ( need == MaybeCovered ) {
// TODO
return 0;
}
else if ( need == WillNeed ) {
// no-op
}
else {
warning() << "don't understand RecordNeeds: " << (int)need << endl;
return 0;
}
DiskLoc l = currLoc();
if ( l.isNull() )
return 0;
Record * rec = l.rec();
if ( rec->likelyInPhysicalMemory() )
return 0;
return rec;
}
void ClientCursor::updateLocation() {
verify( _cursorid );
_idleAgeMillis = 0;
// Cursor-specific
_c->prepareToYield();
DiskLoc cl = _c->refLoc();
if ( lastLoc() == cl ) {
//log() << "info: lastloc==curloc " << ns << endl;
}
else {
recursive_scoped_lock lock(ccmutex);
setLastLoc_inlock(cl);
}
}
void ClientCursor::setLastLoc_inlock(DiskLoc L) {
verify(NULL == _runner.get());
verify( _pos != -2 ); // defensive - see ~ClientCursor
if (L == _lastLoc) { return; }
CCByLoc& bl = _db->ccByLoc();
if (!_lastLoc.isNull()) {
bl.erase(ByLocKey(_lastLoc, _cursorid));
}
if (!L.isNull()) {
bl[ByLocKey(L,_cursorid)] = this;
}
_lastLoc = L;
}
bool ClientCursor::yield( int micros , Record * recordToLoad ) {
// some cursors (geo@oct2011) don't support yielding
if (!_c->supportYields()) { return true; }
YieldData data;
prepareToYield( data );
staticYield( micros , _ns , recordToLoad );
return ClientCursor::recoverFromYield( data );
}
bool ClientCursor::yieldSometimes(RecordNeeds need, bool* yielded) {
if (yielded) { *yielded = false; }
if ( ! _yieldSometimesTracker.intervalHasElapsed() ) {
Record* rec = _recordForYield( need );
if ( rec ) {
// yield for page fault
if ( yielded ) {
*yielded = true;
}
bool res = yield( suggestYieldMicros() , rec );
if ( res )
_yieldSometimesTracker.resetLastTime();
return res;
}
return true;
}
int micros = suggestYieldMicros();
if ( micros > 0 ) {
if ( yielded ) {
*yielded = true;
}
bool res = yield( micros , _recordForYield( need ) );
if ( res )
_yieldSometimesTracker.resetLastTime();
return res;
}
return true;
}
bool ClientCursor::prepareToYield( YieldData &data ) {
if (!_c->supportYields()) { return false; }
// need to store in case 'this' gets deleted
data._id = _cursorid;
data._doingDeletes = _doingDeletes;
_doingDeletes = false;
updateLocation();
return true;
}
bool ClientCursor::recoverFromYield( const YieldData &data ) {
ClientCursor *cc = ClientCursor::find( data._id , false );
if ( cc == 0 ) {
// id was deleted
return false;
}
cc->_doingDeletes = data._doingDeletes;
cc->_c->recoverFromYield();
return true;
}
int ClientCursor::suggestYieldMicros() {
int writers = 0;
int readers = 0;
int micros = Client::recommendedYieldMicros( &writers , &readers );
if ( micros > 0 && writers == 0 && Lock::isR() ) {
// we have a read lock, and only reads are coming on, so why bother unlocking
return 0;
}
wassert( micros < 10000000 );
dassert( micros < 1000001 );
return micros;
}
//
// Pin methods
// TODO: Simplify when we kill Cursor. In particular, once we've pinned a CC, it won't be
// deleted from underneath us, so we can save the pointer and ignore the ID.
//
ClientCursorPin::ClientCursorPin(long long cursorid) : _cursorid( INVALID_CURSOR_ID ) {
recursive_scoped_lock lock( ClientCursor::ccmutex );
ClientCursor *cursor = ClientCursor::find_inlock( cursorid, true );
if (NULL != cursor) {
uassert( 12051, "clientcursor already in use? driver problem?",
cursor->_pinValue < 100 );
cursor->_pinValue += 100;
_cursorid = cursorid;
}
}
ClientCursorPin::~ClientCursorPin() { DESTRUCTOR_GUARD( release(); ) }
void ClientCursorPin::release() {
if ( _cursorid == INVALID_CURSOR_ID ) {
return;
}
ClientCursor *cursor = c();
_cursorid = INVALID_CURSOR_ID;
if ( cursor ) {
verify( cursor->_pinValue >= 100 );
cursor->_pinValue -= 100;
}
}
void ClientCursorPin::free() {
if (_cursorid == INVALID_CURSOR_ID) {
return;
}
ClientCursor *cursor = c();
_cursorid = INVALID_CURSOR_ID;
delete cursor;
}
ClientCursor* ClientCursorPin::c() const {
return ClientCursor::find( _cursorid );
}
//
// Holder methods DEPRECATED
//
ClientCursorHolder::ClientCursorHolder(ClientCursor *c) : _c(0), _id(INVALID_CURSOR_ID) {
reset(c);
}
ClientCursorHolder::~ClientCursorHolder() {
DESTRUCTOR_GUARD(reset(););
}
void ClientCursorHolder::reset(ClientCursor *c) {
if ( c == _c )
return;
if ( _c ) {
// be careful in case cursor was deleted by someone else
ClientCursor::erase( _id );
}
if ( c ) {
_c = c;
_id = c->_cursorid;
}
else {
_c = 0;
_id = INVALID_CURSOR_ID;
}
}
ClientCursor* ClientCursorHolder::get() { return _c; }
ClientCursor * ClientCursorHolder::operator-> () { return _c; }
const ClientCursor * ClientCursorHolder::operator-> () const { return _c; }
void ClientCursorHolder::release() {
_c = 0;
_id = INVALID_CURSOR_ID;
}
//
// ClientCursorMonitor
//
// Used by sayMemoryStatus below.
struct Mem {
Mem() { res = virt = mapped = 0; }
long long res;
long long virt;
long long mapped;
bool grew(const Mem& r) {
return (r.res && (((double)res)/r.res)>1.1 ) ||
(r.virt && (((double)virt)/r.virt)>1.1 ) ||
(r.mapped && (((double)mapped)/r.mapped)>1.1 );
}
};
/**
* called once a minute from killcursors thread
*/
void sayMemoryStatus() {
static time_t last;
static Mem mlast;
try {
ProcessInfo p;
if (!serverGlobalParams.quiet && p.supported()) {
Mem m;
m.res = p.getResidentSize();
m.virt = p.getVirtualMemorySize();
m.mapped = MemoryMappedFile::totalMappedLength() / (1024 * 1024);
time_t now = time(0);
if( now - last >= 300 || m.grew(mlast) ) {
log() << "mem (MB) res:" << m.res << " virt:" << m.virt;
long long totalMapped = m.mapped;
if (storageGlobalParams.dur) {
totalMapped *= 2;
log() << " mapped (incl journal view):" << totalMapped;
}
else {
log() << " mapped:" << totalMapped;
}
log() << " connections:" << Listener::globalTicketHolder.used();
if (theReplSet) {
log() << " replication threads:" <<
ReplSetImpl::replWriterThreadCount +
ReplSetImpl::replPrefetcherThreadCount;
}
last = now;
mlast = m;
}
}
}
catch(const std::exception&) {
log() << "ProcessInfo exception" << endl;
}
}
void ClientCursorMonitor::run() {
Client::initThread("clientcursormon");
Client& client = cc();
Timer t;
const int Secs = 4;
unsigned n = 0;
while ( ! inShutdown() ) {
ClientCursor::idleTimeReport( t.millisReset() );
sleepsecs(Secs);
if( ++n % (60/4) == 0 /*once a minute*/ ) {
sayMemoryStatus();
}
}
client.shutdown();
}
ClientCursorMonitor clientCursorMonitor;
//
// cursorInfo command.
//
// QUESTION: Restrict to the namespace from which this command was issued?
// Alternatively, make this command admin-only?
class CmdCursorInfo : public Command {
public:
CmdCursorInfo() : Command( "cursorInfo", true ) {}
virtual bool slaveOk() const { return true; }
virtual void help( stringstream& help ) const {
help << " example: { cursorInfo : 1 }";
}
virtual LockType locktype() const { return NONE; }
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector* out) {
ActionSet actions;
actions.addAction(ActionType::cursorInfo);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result,
bool fromRepl ) {
ClientCursor::appendStats( result );
return true;
}
} cmdCursorInfo;
//
// cursors stats.
//
class CursorServerStats : public ServerStatusSection {
public:
CursorServerStats() : ServerStatusSection( "cursors" ){}
virtual bool includeByDefault() const { return true; }
BSONObj generateSection(const BSONElement& configElement) const {
BSONObjBuilder b;
ClientCursor::appendStats( b );
return b.obj();
}
} cursorServerStats;
//
// YieldLock
//
ClientCursorYieldLock::ClientCursorYieldLock( ptr cc )
: _canYield(cc->_c->supportYields()) {
if ( _canYield ) {
cc->prepareToYield( _data );
_unlock.reset(new dbtempreleasecond());
}
}
ClientCursorYieldLock::~ClientCursorYieldLock() {
if ( _unlock ) {
warning() << "ClientCursorYieldLock not closed properly" << endl;
relock();
}
}
bool ClientCursorYieldLock::stillOk() {
if ( ! _canYield )
return true;
relock();
return ClientCursor::recoverFromYield( _data );
}
void ClientCursorYieldLock::relock() {
_unlock.reset();
}
} // namespace mongo