/**
* Copyright (C) 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/pch.h"
#include "mongo/base/counter.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/curop.h"
#include "mongo/db/catalog/database.h"
#include "mongo/util/fail_point_service.h"
namespace mongo {
// Enabling the maxTimeAlwaysTimeOut fail point will cause any query or command run with a valid
// non-zero max time to fail immediately. Any getmore operation on a cursor already created
// with a valid non-zero max time will also fail immediately.
//
// This fail point cannot be used with the maxTimeNeverTimeOut fail point.
MONGO_FP_DECLARE(maxTimeAlwaysTimeOut);
// Enabling the maxTimeNeverTimeOut fail point will cause the server to never time out any
// query, command, or getmore operation, regardless of whether a max time is set.
//
// This fail point cannot be used with the maxTimeAlwaysTimeOut fail point.
MONGO_FP_DECLARE(maxTimeNeverTimeOut);
// todo : move more here
CurOp::CurOp( Client * client , CurOp * wrapped ) :
_client(client),
_wrapped(wrapped)
{
if ( _wrapped )
_client->_curOp = this;
_start = 0;
_active = false;
_reset();
_op = 0;
_opNum = _nextOpNum++;
_command = NULL;
}
void CurOp::_reset() {
_suppressFromCurop = false;
_isCommand = false;
_dbprofile = 0;
_end = 0;
_maxTimeMicros = 0;
_maxTimeTracker.reset();
_message = "";
_progressMeter.finished();
_killPending.store(0);
_numYields = 0;
_expectedLatencyMs = 0;
_lockStat.reset();
}
void CurOp::reset() {
_reset();
_start = 0;
_opNum = _nextOpNum++;
_debug.reset();
_query.reset();
_active = true; // this should be last for ui clarity
}
void CurOp::reset( const HostAndPort& remote, int op ) {
reset();
if( _remote != remote ) {
// todo : _remote is not thread safe yet is used as such!
_remote = remote;
}
_op = op;
}
ProgressMeter& CurOp::setMessage(const char * msg,
std::string name,
unsigned long long progressMeterTotal,
int secondsBetween) {
if ( progressMeterTotal ) {
if ( _progressMeter.isActive() ) {
cout << "about to assert, old _message: " << _message << " new message:" << msg << endl;
verify( ! _progressMeter.isActive() );
}
_progressMeter.reset( progressMeterTotal , secondsBetween );
_progressMeter.setName(name);
}
else {
_progressMeter.finished();
}
_message = msg;
return _progressMeter;
}
CurOp::~CurOp() {
if ( _wrapped ) {
scoped_lock bl(Client::clientsMutex);
_client->_curOp = _wrapped;
}
_client = 0;
}
void CurOp::setNS( const StringData& ns ) {
// _ns copies the data in the null-terminated ptr it's given
_ns = ns.toString().c_str();
}
void CurOp::ensureStarted() {
if ( _start == 0 ) {
_start = curTimeMicros64();
// If ensureStarted() is invoked after setMaxTimeMicros(), then time limit tracking will
// start here. This is because time limit tracking can only commence after the
// operation is assigned a start time.
if (_maxTimeMicros > 0) {
_maxTimeTracker.setTimeLimit(_start, _maxTimeMicros);
}
}
}
void CurOp::enter( Client::Context * context ) {
ensureStarted();
_ns = context->ns();
_dbprofile = std::max( context->_db ? context->_db->getProfilingLevel() : 0 , _dbprofile );
}
void CurOp::recordGlobalTime(bool isWriteLocked, long long micros) const {
string nsStr = _ns.toString();
Top::global.record(nsStr, _op, isWriteLocked ? 1 : -1, micros, _isCommand);
}
void CurOp::reportState(BSONObjBuilder* builder) {
builder->append("opid", _opNum);
bool a = _active && _start;
builder->append("active", a);
if( a ) {
builder->append("secs_running", elapsedSeconds() );
builder->append("microsecs_running", static_cast(elapsedMicros()) );
}
builder->append( "op" , opToString( _op ) );
builder->append("ns", _ns.toString());
if (_op == dbInsert) {
_query.append(*builder, "insert");
}
else {
_query.append(*builder, "query");
}
if ( !debug().planSummary.empty() ) {
builder->append( "planSummary" , debug().planSummary.toString() );
}
if( !_remote.empty() ) {
builder->append("client", _remote.toString());
}
if ( ! _message.empty() ) {
if ( _progressMeter.isActive() ) {
StringBuilder buf;
buf << _message.toString() << " " << _progressMeter.toString();
builder->append( "msg" , buf.str() );
BSONObjBuilder sub( builder->subobjStart( "progress" ) );
sub.appendNumber( "done" , (long long)_progressMeter.done() );
sub.appendNumber( "total" , (long long)_progressMeter.total() );
sub.done();
}
else {
builder->append( "msg" , _message.toString() );
}
}
if( killPending() )
builder->append("killPending", true);
builder->append( "numYields" , _numYields );
builder->append( "lockStats" , _lockStat.report() );
}
BSONObj CurOp::description() {
BSONObjBuilder bob;
bool a = _active && _start;
bob.append("active", a);
bob.append( "op" , opToString( _op ) );
bob.append("ns", _ns.toString());
if (_op == dbInsert) {
_query.append(bob, "insert");
}
else {
_query.append(bob, "query");
}
if( killPending() )
bob.append("killPending", true);
return bob.obj();
}
void CurOp::kill() {
_killPending.store(1);
}
void CurOp::setMaxTimeMicros(uint64_t maxTimeMicros) {
_maxTimeMicros = maxTimeMicros;
if (_maxTimeMicros == 0) {
// 0 is "allow to run indefinitely".
return;
}
// If the operation has a start time, then enable the tracker.
//
// If the operation has no start time yet, then ensureStarted() will take responsibility for
// enabling the tracker.
if (isStarted()) {
_maxTimeTracker.setTimeLimit(startTime(), _maxTimeMicros);
}
}
bool CurOp::maxTimeHasExpired() {
if (MONGO_FAIL_POINT(maxTimeNeverTimeOut)) {
return false;
}
if (_maxTimeMicros > 0 && MONGO_FAIL_POINT(maxTimeAlwaysTimeOut)) {
return true;
}
return _maxTimeTracker.checkTimeLimit();
}
uint64_t CurOp::getRemainingMaxTimeMicros() const {
return _maxTimeTracker.getRemainingMicros();
}
AtomicUInt CurOp::_nextOpNum;
static Counter64 returnedCounter;
static Counter64 insertedCounter;
static Counter64 updatedCounter;
static Counter64 deletedCounter;
static Counter64 scannedCounter;
static Counter64 scannedObjectCounter;
static ServerStatusMetricField displayReturned( "document.returned", &returnedCounter );
static ServerStatusMetricField displayUpdated( "document.updated", &updatedCounter );
static ServerStatusMetricField displayInserted( "document.inserted", &insertedCounter );
static ServerStatusMetricField displayDeleted( "document.deleted", &deletedCounter );
static ServerStatusMetricField displayScanned( "queryExecutor.scanned", &scannedCounter );
static ServerStatusMetricField displayScannedObjects( "queryExecutor.scannedObjects",
&scannedObjectCounter );
static Counter64 idhackCounter;
static Counter64 scanAndOrderCounter;
static Counter64 fastmodCounter;
static ServerStatusMetricField displayIdhack( "operation.idhack", &idhackCounter );
static ServerStatusMetricField displayScanAndOrder( "operation.scanAndOrder", &scanAndOrderCounter );
static ServerStatusMetricField displayFastMod( "operation.fastmod", &fastmodCounter );
void OpDebug::recordStats() {
if ( nreturned > 0 )
returnedCounter.increment( nreturned );
if ( ninserted > 0 )
insertedCounter.increment( ninserted );
if ( nMatched > 0 )
updatedCounter.increment( nMatched );
if ( ndeleted > 0 )
deletedCounter.increment( ndeleted );
if ( nscanned > 0 )
scannedCounter.increment( nscanned );
if ( nscannedObjects > 0 )
scannedObjectCounter.increment( nscannedObjects );
if ( idhack )
idhackCounter.increment();
if ( scanAndOrder )
scanAndOrderCounter.increment();
if ( fastmod )
fastmodCounter.increment();
}
CurOp::MaxTimeTracker::MaxTimeTracker() {
reset();
}
void CurOp::MaxTimeTracker::reset() {
_enabled = false;
_targetEpochMicros = 0;
_approxTargetServerMillis = 0;
}
void CurOp::MaxTimeTracker::setTimeLimit(uint64_t startEpochMicros, uint64_t durationMicros) {
dassert(durationMicros != 0);
_enabled = true;
_targetEpochMicros = startEpochMicros + durationMicros;
uint64_t now = curTimeMicros64();
// If our accurate time source thinks time is not up yet, calculate the next target for
// our approximate time source.
if (_targetEpochMicros > now) {
_approxTargetServerMillis = Listener::getElapsedTimeMillis() +
static_cast((_targetEpochMicros - now) / 1000);
}
// Otherwise, set our approximate time source target such that it thinks time is already
// up.
else {
_approxTargetServerMillis = Listener::getElapsedTimeMillis();
}
}
bool CurOp::MaxTimeTracker::checkTimeLimit() {
if (!_enabled) {
return false;
}
// Does our approximate time source think time is not up yet? If so, return early.
if (_approxTargetServerMillis > Listener::getElapsedTimeMillis()) {
return false;
}
uint64_t now = curTimeMicros64();
// Does our accurate time source think time is not up yet? If so, readjust the target for
// our approximate time source and return early.
if (_targetEpochMicros > now) {
_approxTargetServerMillis = Listener::getElapsedTimeMillis() +
static_cast((_targetEpochMicros - now) / 1000);
return false;
}
// Otherwise, time is up.
return true;
}
uint64_t CurOp::MaxTimeTracker::getRemainingMicros() const {
if (!_enabled) {
// 0 is "allow to run indefinitely".
return 0;
}
// Does our accurate time source think time is up? If so, claim there is 1 microsecond
// left for this operation.
uint64_t now = curTimeMicros64();
if (_targetEpochMicros <= now) {
return 1;
}
// Otherwise, calculate remaining time.
return _targetEpochMicros - now;
}
}