// @file curop.h
/*
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include "mongo/db/client.h"
#include "mongo/db/server_options.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/util/concurrency/spin_lock.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/progress_meter.h"
#include "mongo/util/thread_safe_string.h"
#include "mongo/util/time_support.h"
namespace mongo {
class Client;
class Command;
class CurOp;
/**
* stores a copy of a bson obj in a fixed size buffer
* if its too big for the buffer, says "too big"
* useful for keeping a copy around indefinitely without wasting a lot of space or doing malloc
*/
class CachedBSONObjBase {
public:
static BSONObj _tooBig; // { $msg : "query not recording (too large)" }
};
template
class CachedBSONObj : public CachedBSONObjBase {
public:
enum { TOO_BIG_SENTINEL = 1 } ;
CachedBSONObj() {
_size = (int*)_buf;
reset();
}
void reset( int sz = 0 ) {
_lock.lock();
_reset( sz );
_lock.unlock();
}
void set( const BSONObj& o ) {
scoped_spinlock lk(_lock);
size_t sz = o.objsize();
if ( sz > sizeof(_buf) ) {
_reset(TOO_BIG_SENTINEL);
}
else {
memcpy(_buf, o.objdata(), sz );
}
}
int size() const { return *_size; }
bool have() const { return size() > 0; }
bool tooBig() const { return size() == TOO_BIG_SENTINEL; }
BSONObj get() const {
scoped_spinlock lk(_lock);
return _get();
}
void append( BSONObjBuilder& b , StringData name ) const {
scoped_spinlock lk(_lock);
BSONObj temp = _get();
b.append( name , temp );
}
private:
/** you have to be locked when you call this */
BSONObj _get() const {
int sz = size();
if ( sz == 0 )
return BSONObj();
if ( sz == TOO_BIG_SENTINEL )
return _tooBig;
return BSONObj( _buf ).copy();
}
/** you have to be locked when you call this */
void _reset( int sz ) { _size[0] = sz; }
mutable SpinLock _lock;
int * _size;
char _buf[BUFFER_SIZE];
};
/* lifespan is different than CurOp because of recursives with DBDirectClient */
class OpDebug {
public:
OpDebug() : planSummary(2048) { reset(); }
void reset();
void recordStats();
std::string report(const CurOp& curop, const SingleThreadedLockStats& lockStats) const;
/**
* Appends information about the current operation to "builder"
*
* @param curop reference to the CurOp that owns this OpDebug
* @param lockStats lockStats object containing locking information about the operation
*/
void append(const CurOp& curop,
const SingleThreadedLockStats& lockStats,
BSONObjBuilder& builder) const;
// -------------------
StringBuilder extra; // weird things we need to fix later
// basic options
int op;
bool iscommand;
ThreadSafeString ns;
BSONObj query;
BSONObj updateobj;
// detailed options
long long cursorid;
int ntoreturn;
int ntoskip;
bool exhaust;
// debugging/profile info
long long nscanned;
long long nscannedObjects;
bool idhack; // indicates short circuited code path on an update to make the update faster
bool scanAndOrder; // scanandorder query plan aspect was used
long long nMatched; // number of records that match the query
long long nModified; // number of records written (no no-ops)
long long nmoved; // updates resulted in a move (moves are expensive)
long long ninserted;
long long ndeleted;
bool fastmod;
bool fastmodinsert; // upsert of an $operation. builds a default object
bool upsert; // true if the update actually did an insert
bool cursorExhausted; // true if the cursor has been closed at end a find/getMore operation
int keyUpdates;
long long writeConflicts;
ThreadSafeString planSummary; // a brief std::string describing the query solution
// New Query Framework debugging/profiling info
// TODO: should this really be an opaque BSONObj? Not sure.
CachedBSONObj<4096> execStats;
// error handling
ExceptionInfo exceptionInfo;
// response info
int executionTime;
int nreturned;
int responseLength;
};
/* Current operation (for the current Client).
an embedded member of Client class, and typically used from within the mutex there.
*/
class CurOp : boost::noncopyable {
public:
static CurOp* get(const Client* client);
static CurOp* get(const Client& client);
explicit CurOp(Client* client);
~CurOp();
bool haveQuery() const { return _query.have(); }
BSONObj query() const { return _query.get(); }
void appendQuery( BSONObjBuilder& b , StringData name ) const { _query.append( b , name ); }
void enter(const char* ns, int dbProfileLevel);
void reset();
void reset( const HostAndPort& remote, int op );
void markCommand() { _isCommand = true; }
OpDebug& debug() { return _debug; }
std::string getNS() const { return _ns.toString(); }
bool shouldDBProfile( int ms ) const {
if ( _dbprofile <= 0 )
return false;
return _dbprofile >= 2 || ms >= serverGlobalParams.slowMS;
}
unsigned int opNum() const { return _opNum; }
/** if this op is running */
bool active() const { return _active; }
int getOp() const { return _op; }
//
// Methods for controlling CurOp "max time".
//
/**
* Sets the amount of time operation this should be allowed to run, units of microseconds.
* The special value 0 is "allow to run indefinitely".
*/
void setMaxTimeMicros(uint64_t maxTimeMicros);
/**
* Checks whether this operation has been running longer than its time limit. Returns
* false if not, or if the operation has no time limit.
*
* Note that KillCurrentOp objects are responsible for interrupting CurOp objects that
* have exceeded their allotted time; CurOp objects do not interrupt themselves.
*/
bool maxTimeHasExpired();
/**
* Returns the number of microseconds remaining for this operation's time limit, or the
* special value 0 if the operation has no time limit.
*
* Calling this method is more expensive than calling its sibling "maxTimeHasExpired()",
* since an accurate measure of remaining time needs to be calculated.
*/
uint64_t getRemainingMaxTimeMicros() const;
//
// Methods for getting/setting elapsed time.
//
void ensureStarted();
bool isStarted() const { return _start > 0; }
long long startTime() { // micros
ensureStarted();
return _start;
}
void done() {
_active = false;
_end = curTimeMicros64();
}
long long totalTimeMicros() {
massert( 12601 , "CurOp not marked done yet" , ! _active );
return _end - startTime();
}
int totalTimeMillis() { return (int) (totalTimeMicros() / 1000); }
long long elapsedMicros() {
return curTimeMicros64() - startTime();
}
int elapsedMillis() {
return (int) (elapsedMicros() / 1000);
}
int elapsedSeconds() { return elapsedMillis() / 1000; }
void setQuery(const BSONObj& query) { _query.set( query ); }
Command * getCommand() const { return _command; }
void setCommand(Command* command) { _command = command; }
void reportState(BSONObjBuilder* builder);
// Fetches less information than "info()"; used to search for ops with certain criteria
BSONObj description();
std::string getRemoteString( bool includePort = true ) {
if (includePort)
return _remote.toString();
return _remote.host();
}
ProgressMeter& setMessage(const char * msg,
std::string name = "Progress",
unsigned long long progressMeterTotal = 0,
int secondsBetween = 3);
std::string getMessage() const { return _message.toString(); }
ProgressMeter& getProgressMeter() { return _progressMeter; }
CurOp *parent() const { return _parent; }
void kill();
bool killPendingStrict() const { return _killPending.load(); }
bool killPending() const { return _killPending.loadRelaxed(); }
void yielded() { _numYields++; }
int numYields() const { return _numYields; }
long long getExpectedLatencyMs() const { return _expectedLatencyMs; }
void setExpectedLatencyMs( long long latency ) { _expectedLatencyMs = latency; }
void recordGlobalTime(bool isWriteLocked, long long micros) const;
/**
* this should be used very sparingly
* generally the Context should set this up
* but sometimes you want to do it ahead of time
*/
void setNS( StringData ns );
private:
class ClientCuropStack;
static const Client::Decoration _curopStack;
CurOp(Client*, ClientCuropStack*);
void _reset();
static AtomicUInt32 _nextOpNum;
ClientCuropStack* _stack;
CurOp* _parent = nullptr;
Command * _command;
long long _start;
long long _end;
bool _active;
int _op;
bool _isCommand;
int _dbprofile; // 0=off, 1=slow, 2=all
unsigned int _opNum;
ThreadSafeString _ns;
HostAndPort _remote; // CAREFUL here with thread safety
CachedBSONObj<512> _query; // CachedBSONObj is thread safe
OpDebug _debug;
ThreadSafeString _message;
ProgressMeter _progressMeter;
AtomicInt32 _killPending;
int _numYields;
// this is how much "extra" time a query might take
// a writebacklisten for example will block for 30s
// so this should be 30000 in that case
long long _expectedLatencyMs;
// Time limit for this operation. 0 if the operation has no time limit.
uint64_t _maxTimeMicros;
/** Nested class that implements tracking of a time limit for a CurOp object. */
class MaxTimeTracker {
MONGO_DISALLOW_COPYING(MaxTimeTracker);
public:
/** Newly-constructed MaxTimeTracker objects have the time limit disabled. */
MaxTimeTracker();
/** Disables the time tracker. */
void reset();
/** Returns whether or not time tracking is enabled. */
bool isEnabled() const { return _enabled; }
/**
* Enables time tracking. The time limit is set to be "durationMicros" microseconds
* from "startEpochMicros" (units of microseconds since the epoch).
*
* "durationMicros" must be nonzero.
*/
void setTimeLimit(uint64_t startEpochMicros, uint64_t durationMicros);
/**
* Checks whether the time limit has been hit. Returns false if not, or if time
* tracking is disabled.
*/
bool checkTimeLimit();
/**
* Returns the number of microseconds remaining for the time limit, or the special
* value 0 if time tracking is disabled.
*
* Calling this method is more expensive than calling its sibling "checkInterval()",
* since an accurate measure of remaining time needs to be calculated.
*/
uint64_t getRemainingMicros() const;
private:
// Whether or not time tracking is enabled for this operation.
bool _enabled;
// Point in time at which the time limit is hit. Units of microseconds since the
// epoch.
uint64_t _targetEpochMicros;
// Approximate point in time at which the time limit is hit. Units of milliseconds
// since the server process was started.
int64_t _approxTargetServerMillis;
} _maxTimeTracker;
};
}