/**
* Copyright (C) 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/pch.h"
#include "mongo/base/counter.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/curop.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/kill_current_op.h"
#include "mongo/util/fail_point_service.h"
namespace mongo {
// Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a valid
// non-zero max time to fail immediately. Any getmore operation on a cursor already created
// with a valid non-zero max time will also fail immediately.
//
// This fail point cannot be used with the maxTimeNeverTimeOut fail point.
//
// Consulted by CurOp::maxTimeHasExpired(), and only when the operation's max time is non-zero.
MONGO_FP_DECLARE(maxTimeAlwaysTimeOut);
// Enabling the maxTimeNeverTimeOut fail point will cause the server to never time out any
// query, command, or getmore operation, regardless of whether a max time is set.
//
// This fail point cannot be used with the maxTimeAlwaysTimeOut fail point.
//
// Consulted first by CurOp::maxTimeHasExpired(): if both fail points happen to be enabled,
// this one wins because it is checked before maxTimeAlwaysTimeOut.
MONGO_FP_DECLARE(maxTimeNeverTimeOut);
// todo : move more here
// Constructs the operation record for 'client'.
//
// When 'wrapped' is non-NULL, this object installs itself as the client's current operation
// (the destructor puts 'wrapped' back). The new object starts out inactive, with no start
// time, op code 0, no command, and a fresh op number drawn from _nextOpNum.
CurOp::CurOp( Client * client , CurOp * wrapped ) :
    _client(client),
    _wrapped(wrapped)
{
    if ( _wrapped ) {
        _client->_curOp = this;
    }

    _start = 0;
    _active = false;
    _reset();

    _op = 0;
    _opNum = _nextOpNum++;
    _command = NULL;

    // Zero-fill the namespace buffer as a precaution: currentOp may be accessed
    // without the db mutex, so the buffer must never hold garbage.
    memset(_ns, 0, sizeof(_ns));
}
// Clears the per-operation state shared by the constructor and reset(): flags, profiling
// level, end time, max-time tracking, status message, progress meter, kill flag, yield count,
// expected latency and lock statistics. Also notifies all killCurrentOp waiters.
//
// Leaves _start, _opNum, _ns, _op and _active alone; callers set those themselves.
void CurOp::_reset() {
    // Flags and profiling.
    _suppressFromCurop = false;
    _isCommand = false;
    _dbprofile = 0;

    // Timing.
    _end = 0;
    _maxTimeMicros = 0;
    _maxTimeTracker.reset();

    // Status message and progress.
    _message = "";
    _progressMeter.finished();

    // Kill state: clear the pending flag, then wake anyone waiting on a kill.
    _killPending.store(0);
    killCurrentOp.notifyAllWaiters();

    // Statistics.
    _numYields = 0;
    _expectedLatencyMs = 0;
    _lockStat.reset();
}
// Prepares this object for a new operation: clears the shared state via _reset(), drops the
// start time, takes a new op number, empties the namespace string, resets the debug info and
// the cached query, and finally marks the operation active.
void CurOp::reset() {
    _reset();

    _start = 0;
    _opNum = _nextOpNum++;
    _ns[0] = '\0';
    _debug.reset();
    _query.reset();

    // Set last, for ui clarity: the op only shows as active once the rest has been reset.
    _active = true;
}
// Resets this object for a new operation (see reset()), then records the remote endpoint and
// the op code. _remote is only assigned when it differs from 'remote'.
void CurOp::reset( const HostAndPort& remote, int op ) {
    reset();

    // todo : _remote is not thread safe yet is used as such!
    if ( _remote != remote ) {
        _remote = remote;
    }

    _op = op;
}
// Sets the status message for this operation and (re)configures its progress meter.
//
// msg                 new status message; stored in _message.
// name                name given to the progress meter (only used when a total is supplied).
// progressMeterTotal  when non-zero, the meter is reset to this total; when zero, the meter
//                     is marked finished instead.
// secondsBetween      passed through to ProgressMeter::reset().
//
// Returns a reference to this operation's progress meter. Starting a new meter while the
// previous one is still active trips a verify(), after printing both messages.
ProgressMeter& CurOp::setMessage(const char * msg,
                                 std::string name,
                                 unsigned long long progressMeterTotal,
                                 int secondsBetween) {
    if ( progressMeterTotal == 0 ) {
        // No total supplied: there is nothing to track.
        _progressMeter.finished();
    }
    else {
        if ( _progressMeter.isActive() ) {
            cout << "about to assert, old _message: " << _message << " new message:" << msg << endl;
            verify( ! _progressMeter.isActive() );
        }
        _progressMeter.reset( progressMeterTotal , secondsBetween );
        _progressMeter.setName(name);
    }

    _message = msg;
    return _progressMeter;
}
// Notifies all killCurrentOp waiters, and, if this operation wrapped another one, makes the
// wrapped operation the client's current operation again while holding Client::clientsMutex.
CurOp::~CurOp() {
    killCurrentOp.notifyAllWaiters();

    if ( _wrapped ) {
        scoped_lock clientsLock(Client::clientsMutex);
        _client->_curOp = _wrapped;
    }

    _client = NULL;
}
void CurOp::setNS( const StringData& ns ) {
ns.substr( 0, Namespace::MaxNsLen ).copyTo( _ns, true );
}
// Assigns a start time to the operation if it does not have one yet; otherwise does nothing.
void CurOp::ensureStarted() {
    if ( _start != 0 ) {
        // Already started.
        return;
    }

    _start = curTimeMicros64();

    // Time limit tracking can only commence once the operation has a start time. If
    // setMaxTimeMicros() was invoked before this point, tracking therefore starts here.
    if (_maxTimeMicros == 0) {
        return;
    }
    _maxTimeTracker.setTimeLimit(_start, _maxTimeMicros);
}
// Called when the operation enters 'context': makes sure the operation has a start time,
// copies the context's namespace into _ns, and raises _dbprofile to the database's profiling
// level if that is higher (a context without a database counts as level 0).
void CurOp::enter( Client::Context * context ) {
    ensureStarted();

    // strncpy() does not terminate the destination when the source is MaxNsLen characters or
    // longer, so the terminator is written explicitly.
    strncpy( _ns, context->ns(), Namespace::MaxNsLen );
    _ns[Namespace::MaxNsLen] = 0;

    const int contextLevel = context->_db ? context->_db->getProfilingLevel() : 0;
    _dbprofile = std::max( contextLevel , _dbprofile );
}
// Counterpart of enter(). Currently does nothing; 'context' is unused.
void CurOp::leave( Client::Context * context ) {
}
// Records 'micros' of time for this operation's namespace and op code in Top::global.
// The third argument passed to Top is 1 when the client holds any write lock and -1 otherwise.
// Does nothing when the operation has no client.
void CurOp::recordGlobalTime( long long micros ) const {
    if ( !_client ) {
        return;
    }

    const LockState& ls = _client->lockState();
    verify( ls.threadState() );

    const int lockDirection = ls.hasAnyWriteLock() ? 1 : -1;
    Top::global.record( _ns , _op , lockDirection , micros , _isCommand );
}
// Builds the BSON document describing this operation.
//
// Fields, in order: "opid", "active", then (only when active) "secs_running" and
// "microsecs_running"; "op", "ns", the cached query under "insert" (for inserts) or "query";
// "planSummary" when one is set; "client" when a remote endpoint is known; client details
// ("desc", optional "threadId" / "connectionId", and the lock state) when a client is attached;
// "msg" (plus a "progress" sub-document while the progress meter is active) when a message is
// set; "killPending" when a kill has been requested; and finally "numYields" and "lockStats".
BSONObj CurOp::info() {
    BSONObjBuilder b;
    b.append("opid", _opNum);

    // An operation only counts as active once it also has a start time.
    const bool a = _active && _start;
    b.append("active", a);
    if( a ) {
        b.append("secs_running", elapsedSeconds() );
        // The cast needs an explicit target type; long long mirrors the casts used for the
        // progress counters below.
        b.append("microsecs_running", static_cast<long long>(elapsedMicros()) );
    }

    b.append( "op" , opToString( _op ) );
    b.append("ns", _ns);

    if (_op == dbInsert) {
        _query.append(b, "insert");
    }
    else {
        _query.append(b , "query");
    }

    if ( !debug().planSummary.empty() ) {
        b.append( "planSummary" , debug().planSummary.toString() );
    }

    if( !_remote.empty() ) {
        b.append("client", _remote.toString());
    }

    if ( _client ) {
        b.append( "desc" , _client->desc() );
        if ( _client->_threadId.size() )
            b.append( "threadId" , _client->_threadId );
        if ( _client->_connectionId )
            b.appendNumber( "connectionId" , _client->_connectionId );
        _client->_ls.reportState(b);
    }

    if ( ! _message.empty() ) {
        if ( _progressMeter.isActive() ) {
            // While progress is being tracked, the message carries the meter's text as well,
            // and the raw counters are exposed in a "progress" sub-document.
            StringBuilder buf;
            buf << _message.toString() << " " << _progressMeter.toString();
            b.append( "msg" , buf.str() );

            BSONObjBuilder sub( b.subobjStart( "progress" ) );
            sub.appendNumber( "done" , static_cast<long long>(_progressMeter.done()) );
            sub.appendNumber( "total" , static_cast<long long>(_progressMeter.total()) );
            sub.done();
        }
        else {
            b.append( "msg" , _message.toString() );
        }
    }

    if( killPending() )
        b.append("killPending", true);

    b.append( "numYields" , _numYields );
    b.append( "lockStats" , _lockStat.report() );

    return b.obj();
}
// Builds a short BSON description of this operation: "active", "op", "ns", the cached query
// (under "insert" for inserts and "query" for everything else) and, when a kill has been
// requested, "killPending".
BSONObj CurOp::description() {
    BSONObjBuilder bob;

    // An operation only counts as active once it also has a start time.
    const bool isRunning = _active && _start;
    bob.append("active", isRunning);

    bob.append( "op" , opToString( _op ) );
    bob.append("ns", _ns);

    const char* queryFieldName = (_op == dbInsert) ? "insert" : "query";
    _query.append(bob, queryFieldName);

    if( killPending() ) {
        bob.append("killPending", true);
    }

    return bob.obj();
}
// Sets every flag registered through kill() to true, then forgets the registered flags.
void CurOp::setKillWaiterFlags() {
    for (size_t idx = 0; idx < _notifyList.size(); ++idx) {
        bool* waiterFlag = _notifyList[idx];
        *waiterFlag = true;
    }
    _notifyList.clear();
}
// Marks this operation as having a kill pending. If 'pNotifyFlag' is non-NULL it is remembered
// in _notifyList, so that setKillWaiterFlags() can later set the pointed-to flag to true.
void CurOp::kill(bool* pNotifyFlag /* = NULL */) {
    _killPending.store(1);

    if (!pNotifyFlag) {
        // Nobody asked to be notified.
        return;
    }
    _notifyList.push_back(pNotifyFlag);
}
// Sets the time limit for this operation, in microseconds. 0 is "allow to run indefinitely".
void CurOp::setMaxTimeMicros(uint64_t maxTimeMicros) {
    _maxTimeMicros = maxTimeMicros;

    // The tracker is enabled here only for a non-zero limit on an operation that already has a
    // start time. If the operation has no start time yet, ensureStarted() takes responsibility
    // for enabling the tracker.
    if (_maxTimeMicros != 0 && isStarted()) {
        _maxTimeTracker.setTimeLimit(startTime(), _maxTimeMicros);
    }
}
// Reports whether this operation has exceeded its time limit.
//
// The maxTimeNeverTimeOut fail point forces "not expired"; otherwise, for an operation with a
// non-zero max time, the maxTimeAlwaysTimeOut fail point forces "expired". When neither fail
// point applies, the answer comes from the max-time tracker.
bool CurOp::maxTimeHasExpired() {
    if (MONGO_FAIL_POINT(maxTimeNeverTimeOut)) {
        return false;
    }

    if (_maxTimeMicros > 0) {
        if (MONGO_FAIL_POINT(maxTimeAlwaysTimeOut)) {
            return true;
        }
    }

    return _maxTimeTracker.checkTimeLimit();
}
// Returns the remaining time for this operation, in microseconds, as reported by the max-time
// tracker (see MaxTimeTracker::getRemainingMicros() for the meaning of 0 and 1).
uint64_t CurOp::getRemainingMaxTimeMicros() const {
    const uint64_t remainingMicros = _maxTimeTracker.getRemainingMicros();
    return remainingMicros;
}
// Definition of the static counter from which each CurOp draws its op number
// (_opNum = _nextOpNum++).
AtomicUInt CurOp::_nextOpNum;
// Process-wide document and query-executor counters, accumulated by OpDebug::recordStats().
static Counter64 returnedCounter;
static Counter64 insertedCounter;
static Counter64 updatedCounter;
static Counter64 deletedCounter;
static Counter64 scannedCounter;
static Counter64 scannedObjectCounter;
// Each ServerStatusMetricField pairs a metric name with the address of the counter above that
// backs it. NOTE(review): presumably this is what publishes the counters in serverStatus
// output -- confirm against server_status_metric.h.
// Note that "document.updated" is backed by updatedCounter, which recordStats() increments by
// nMatched.
static ServerStatusMetricField displayReturned( "document.returned", &returnedCounter );
static ServerStatusMetricField displayUpdated( "document.updated", &updatedCounter );
static ServerStatusMetricField displayInserted( "document.inserted", &insertedCounter );
static ServerStatusMetricField displayDeleted( "document.deleted", &deletedCounter );
static ServerStatusMetricField displayScanned( "queryExecutor.scanned", &scannedCounter );
static ServerStatusMetricField displayScannedObjects( "queryExecutor.scannedObjects",
&scannedObjectCounter );
// Counters for per-operation flags (idhack, scanAndOrder, fastmod); recordStats() bumps each by
// one for every operation that has the corresponding flag set.
static Counter64 idhackCounter;
static Counter64 scanAndOrderCounter;
static Counter64 fastmodCounter;
static ServerStatusMetricField displayIdhack( "operation.idhack", &idhackCounter );
static ServerStatusMetricField displayScanAndOrder( "operation.scanAndOrder", &scanAndOrderCounter );
static ServerStatusMetricField displayFastMod( "operation.fastmod", &fastmodCounter );
void OpDebug::recordStats() {
if ( nreturned > 0 )
returnedCounter.increment( nreturned );
if ( ninserted > 0 )
insertedCounter.increment( ninserted );
if ( nMatched > 0 )
updatedCounter.increment( nMatched );
if ( ndeleted > 0 )
deletedCounter.increment( ndeleted );
if ( nscanned > 0 )
scannedCounter.increment( nscanned );
if ( nscannedObjects > 0 )
scannedObjectCounter.increment( nscannedObjects );
if ( idhack )
idhackCounter.increment();
if ( scanAndOrder )
scanAndOrderCounter.increment();
if ( fastmod )
fastmodCounter.increment();
}
// Constructs a tracker in the disabled state (see reset()).
CurOp::MaxTimeTracker::MaxTimeTracker() {
    reset();
}
// Disables the tracker and zeroes both of its deadlines.
void CurOp::MaxTimeTracker::reset() {
    _enabled = false;

    _targetEpochMicros = 0;          // deadline for the accurate time source
    _approxTargetServerMillis = 0;   // deadline for the approximate time source
}
// Enables the tracker with a deadline of 'startEpochMicros' + 'durationMicros'.
//
// startEpochMicros  start time of the operation, in microseconds.
// durationMicros    allowed running time, in microseconds; must be non-zero (dassert).
//
// Two deadlines are maintained: _targetEpochMicros, compared against curTimeMicros64() (the
// accurate time source), and _approxTargetServerMillis, compared against
// Listener::getElapsedTimeMillis() (the approximate time source).
void CurOp::MaxTimeTracker::setTimeLimit(uint64_t startEpochMicros, uint64_t durationMicros) {
    dassert(durationMicros != 0);

    _enabled = true;
    _targetEpochMicros = startEpochMicros + durationMicros;

    uint64_t now = curTimeMicros64();

    // If our accurate time source thinks time is not up yet, calculate the next target for
    // our approximate time source.
    if (_targetEpochMicros > now) {
        // The remaining microseconds are converted to milliseconds; the cast needs an explicit
        // target type to be well-formed.
        _approxTargetServerMillis = Listener::getElapsedTimeMillis() +
            static_cast<int64_t>((_targetEpochMicros - now) / 1000);
    }
    // Otherwise, set our approximate time source target such that it thinks time is already
    // up.
    else {
        _approxTargetServerMillis = Listener::getElapsedTimeMillis();
    }
}
// Returns true if the time limit has been reached, false otherwise (including when the
// tracker is disabled).
//
// The approximate time source is consulted first; the accurate one is only read once the
// approximate deadline has passed. If the accurate source then says time is not up yet, the
// approximate deadline is pushed out to match.
bool CurOp::MaxTimeTracker::checkTimeLimit() {
    if (!_enabled) {
        return false;
    }

    // Does our approximate time source think time is not up yet? If so, return early.
    if (_approxTargetServerMillis > Listener::getElapsedTimeMillis()) {
        return false;
    }

    uint64_t now = curTimeMicros64();

    // Does our accurate time source think time is not up yet? If so, readjust the target for
    // our approximate time source and return early.
    if (_targetEpochMicros > now) {
        // The remaining microseconds are converted to milliseconds; the cast needs an explicit
        // target type to be well-formed.
        _approxTargetServerMillis = Listener::getElapsedTimeMillis() +
            static_cast<int64_t>((_targetEpochMicros - now) / 1000);
        return false;
    }

    // Otherwise, time is up.
    return true;
}
// Returns the time left before the deadline, in microseconds.
//
// 0 is "allow to run indefinitely" and is returned when the tracker is disabled. Because 0 has
// that meaning, an operation whose time is already up is reported as having 1 microsecond left.
uint64_t CurOp::MaxTimeTracker::getRemainingMicros() const {
    if (!_enabled) {
        return 0;
    }

    const uint64_t now = curTimeMicros64();
    if (now < _targetEpochMicros) {
        // The accurate time source says there is still time left.
        return _targetEpochMicros - now;
    }

    // Time is up.
    return 1;
}
}