/** * Copyright (C) 2009 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/pch.h" #include "mongo/base/counter.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/curop.h" #include "mongo/db/catalog/database.h" #include "mongo/db/kill_current_op.h" #include "mongo/util/fail_point_service.h" namespace mongo { // Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a valid // non-zero max time to fail immediately. Any getmore operation on a cursor already created // with a valid non-zero max time will also fail immediately. // // This fail point cannot be used with the maxTimeNeverTimeOut fail point. MONGO_FP_DECLARE(maxTimeAlwaysTimeOut); // Enabling the maxTimeNeverTimeOut fail point will cause the server to never time out any // query, command, or getmore operation, regardless of whether a max time is set. // // This fail point cannot be used with the maxTimeAlwaysTimeOut fail point. MONGO_FP_DECLARE(maxTimeNeverTimeOut); // todo : move more here CurOp::CurOp( Client * client , CurOp * wrapped ) : _client(client), _wrapped(wrapped) { if ( _wrapped ) _client->_curOp = this; _start = 0; _active = false; _reset(); _op = 0; _opNum = _nextOpNum++; _command = NULL; // These addresses should never be written to again. The zeroes are // placed here as a precaution because currentOp may be accessed // without the db mutex. memset(_ns, 0, sizeof(_ns)); } void CurOp::_reset() { _suppressFromCurop = false; _isCommand = false; _dbprofile = 0; _end = 0; _maxTimeMicros = 0; _maxTimeTracker.reset(); _message = ""; _progressMeter.finished(); _killPending.store(0); killCurrentOp.notifyAllWaiters(); _numYields = 0; _expectedLatencyMs = 0; _lockStat.reset(); } void CurOp::reset() { _reset(); _start = 0; _opNum = _nextOpNum++; _ns[0] = 0; _debug.reset(); _query.reset(); _active = true; // this should be last for ui clarity } void CurOp::reset( const HostAndPort& remote, int op ) { reset(); if( _remote != remote ) { // todo : _remote is not thread safe yet is used as such! _remote = remote; } _op = op; } ProgressMeter& CurOp::setMessage(const char * msg, std::string name, unsigned long long progressMeterTotal, int secondsBetween) { if ( progressMeterTotal ) { if ( _progressMeter.isActive() ) { cout << "about to assert, old _message: " << _message << " new message:" << msg << endl; verify( ! _progressMeter.isActive() ); } _progressMeter.reset( progressMeterTotal , secondsBetween ); _progressMeter.setName(name); } else { _progressMeter.finished(); } _message = msg; return _progressMeter; } CurOp::~CurOp() { killCurrentOp.notifyAllWaiters(); if ( _wrapped ) { scoped_lock bl(Client::clientsMutex); _client->_curOp = _wrapped; } _client = 0; } void CurOp::setNS( const StringData& ns ) { ns.substr( 0, Namespace::MaxNsLen ).copyTo( _ns, true ); } void CurOp::ensureStarted() { if ( _start == 0 ) { _start = curTimeMicros64(); // If ensureStarted() is invoked after setMaxTimeMicros(), then time limit tracking will // start here. This is because time limit tracking can only commence after the // operation is assigned a start time. if (_maxTimeMicros > 0) { _maxTimeTracker.setTimeLimit(_start, _maxTimeMicros); } } } void CurOp::enter( Client::Context * context ) { ensureStarted(); strncpy( _ns, context->ns(), Namespace::MaxNsLen); _ns[Namespace::MaxNsLen] = 0; _dbprofile = std::max( context->_db ? context->_db->getProfilingLevel() : 0 , _dbprofile ); } void CurOp::leave( Client::Context * context ) { } void CurOp::recordGlobalTime( long long micros ) const { if ( _client ) { const LockState& ls = _client->lockState(); verify( ls.threadState() ); Top::global.record( _ns , _op , ls.hasAnyWriteLock() ? 1 : -1 , micros , _isCommand ); } } BSONObj CurOp::info() { BSONObjBuilder b; b.append("opid", _opNum); bool a = _active && _start; b.append("active", a); if( a ) { b.append("secs_running", elapsedSeconds() ); b.append("microsecs_running", static_cast(elapsedMicros()) ); } b.append( "op" , opToString( _op ) ); b.append("ns", _ns); if (_op == dbInsert) { _query.append(b, "insert"); } else { _query.append(b , "query"); } if ( !debug().planSummary.empty() ) { b.append( "planSummary" , debug().planSummary.toString() ); } if( !_remote.empty() ) { b.append("client", _remote.toString()); } if ( _client ) { b.append( "desc" , _client->desc() ); if ( _client->_threadId.size() ) b.append( "threadId" , _client->_threadId ); if ( _client->_connectionId ) b.appendNumber( "connectionId" , _client->_connectionId ); _client->_ls.reportState(b); } if ( ! _message.empty() ) { if ( _progressMeter.isActive() ) { StringBuilder buf; buf << _message.toString() << " " << _progressMeter.toString(); b.append( "msg" , buf.str() ); BSONObjBuilder sub( b.subobjStart( "progress" ) ); sub.appendNumber( "done" , (long long)_progressMeter.done() ); sub.appendNumber( "total" , (long long)_progressMeter.total() ); sub.done(); } else { b.append( "msg" , _message.toString() ); } } if( killPending() ) b.append("killPending", true); b.append( "numYields" , _numYields ); b.append( "lockStats" , _lockStat.report() ); return b.obj(); } BSONObj CurOp::description() { BSONObjBuilder bob; bool a = _active && _start; bob.append("active", a); bob.append( "op" , opToString( _op ) ); bob.append("ns", _ns); if (_op == dbInsert) { _query.append(bob, "insert"); } else { _query.append(bob, "query"); } if( killPending() ) bob.append("killPending", true); return bob.obj(); } void CurOp::setKillWaiterFlags() { for (size_t i = 0; i < _notifyList.size(); ++i) *(_notifyList[i]) = true; _notifyList.clear(); } void CurOp::kill(bool* pNotifyFlag /* = NULL */) { _killPending.store(1); if (pNotifyFlag) { _notifyList.push_back(pNotifyFlag); } } void CurOp::setMaxTimeMicros(uint64_t maxTimeMicros) { _maxTimeMicros = maxTimeMicros; if (_maxTimeMicros == 0) { // 0 is "allow to run indefinitely". return; } // If the operation has a start time, then enable the tracker. // // If the operation has no start time yet, then ensureStarted() will take responsibility for // enabling the tracker. if (isStarted()) { _maxTimeTracker.setTimeLimit(startTime(), _maxTimeMicros); } } bool CurOp::maxTimeHasExpired() { if (MONGO_FAIL_POINT(maxTimeNeverTimeOut)) { return false; } if (_maxTimeMicros > 0 && MONGO_FAIL_POINT(maxTimeAlwaysTimeOut)) { return true; } return _maxTimeTracker.checkTimeLimit(); } uint64_t CurOp::getRemainingMaxTimeMicros() const { return _maxTimeTracker.getRemainingMicros(); } AtomicUInt CurOp::_nextOpNum; static Counter64 returnedCounter; static Counter64 insertedCounter; static Counter64 updatedCounter; static Counter64 deletedCounter; static Counter64 scannedCounter; static Counter64 scannedObjectCounter; static ServerStatusMetricField displayReturned( "document.returned", &returnedCounter ); static ServerStatusMetricField displayUpdated( "document.updated", &updatedCounter ); static ServerStatusMetricField displayInserted( "document.inserted", &insertedCounter ); static ServerStatusMetricField displayDeleted( "document.deleted", &deletedCounter ); static ServerStatusMetricField displayScanned( "queryExecutor.scanned", &scannedCounter ); static ServerStatusMetricField displayScannedObjects( "queryExecutor.scannedObjects", &scannedObjectCounter ); static Counter64 idhackCounter; static Counter64 scanAndOrderCounter; static Counter64 fastmodCounter; static ServerStatusMetricField displayIdhack( "operation.idhack", &idhackCounter ); static ServerStatusMetricField displayScanAndOrder( "operation.scanAndOrder", &scanAndOrderCounter ); static ServerStatusMetricField displayFastMod( "operation.fastmod", &fastmodCounter ); void OpDebug::recordStats() { if ( nreturned > 0 ) returnedCounter.increment( nreturned ); if ( ninserted > 0 ) insertedCounter.increment( ninserted ); if ( nMatched > 0 ) updatedCounter.increment( nMatched ); if ( ndeleted > 0 ) deletedCounter.increment( ndeleted ); if ( nscanned > 0 ) scannedCounter.increment( nscanned ); if ( nscannedObjects > 0 ) scannedObjectCounter.increment( nscannedObjects ); if ( idhack ) idhackCounter.increment(); if ( scanAndOrder ) scanAndOrderCounter.increment(); if ( fastmod ) fastmodCounter.increment(); } CurOp::MaxTimeTracker::MaxTimeTracker() { reset(); } void CurOp::MaxTimeTracker::reset() { _enabled = false; _targetEpochMicros = 0; _approxTargetServerMillis = 0; } void CurOp::MaxTimeTracker::setTimeLimit(uint64_t startEpochMicros, uint64_t durationMicros) { dassert(durationMicros != 0); _enabled = true; _targetEpochMicros = startEpochMicros + durationMicros; uint64_t now = curTimeMicros64(); // If our accurate time source thinks time is not up yet, calculate the next target for // our approximate time source. if (_targetEpochMicros > now) { _approxTargetServerMillis = Listener::getElapsedTimeMillis() + static_cast((_targetEpochMicros - now) / 1000); } // Otherwise, set our approximate time source target such that it thinks time is already // up. else { _approxTargetServerMillis = Listener::getElapsedTimeMillis(); } } bool CurOp::MaxTimeTracker::checkTimeLimit() { if (!_enabled) { return false; } // Does our approximate time source think time is not up yet? If so, return early. if (_approxTargetServerMillis > Listener::getElapsedTimeMillis()) { return false; } uint64_t now = curTimeMicros64(); // Does our accurate time source think time is not up yet? If so, readjust the target for // our approximate time source and return early. if (_targetEpochMicros > now) { _approxTargetServerMillis = Listener::getElapsedTimeMillis() + static_cast((_targetEpochMicros - now) / 1000); return false; } // Otherwise, time is up. return true; } uint64_t CurOp::MaxTimeTracker::getRemainingMicros() const { if (!_enabled) { // 0 is "allow to run indefinitely". return 0; } // Does our accurate time source think time is up? If so, claim there is 1 microsecond // left for this operation. uint64_t now = curTimeMicros64(); if (_targetEpochMicros <= now) { return 1; } // Otherwise, calculate remaining time. return _targetEpochMicros - now; } }