// @file curop.h /* * Copyright (C) 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #pragma once #include "namespace-inl.h" #include "client.h" #include "../bson/util/atomic_int.h" #include "../util/concurrency/spin_lock.h" #include "../util/time_support.h" #include "../util/net/hostandport.h" namespace mongo { class CurOp; /* lifespan is different than CurOp because of recursives with DBDirectClient */ class OpDebug { public: OpDebug() : ns(""){ reset(); } void reset(); string toString() const; void append( const CurOp& curop, BSONObjBuilder& b ) const; // ------------------- StringBuilder extra; // weird things we need to fix later // basic options int op; bool iscommand; Namespace ns; BSONObj query; BSONObj updateobj; // detailed options long long cursorid; int ntoreturn; int ntoskip; bool exhaust; // debugging/profile info int nscanned; bool idhack; bool scanAndOrder; bool moved; bool fastmod; bool fastmodinsert; bool upsert; unsigned keyUpdates; // error handling ExceptionInfo exceptionInfo; // response info int executionTime; int nreturned; int responseLength; }; /** * stores a copy of a bson obj in a fixed size buffer * if its too big for the buffer, says "too big" * useful for keeping a copy around indefinitely without wasting a lot of space or doing malloc */ class CachedBSONObj { public: enum { TOO_BIG_SENTINEL = 1 } ; static BSONObj _tooBig; // { $msg : "query not recording (too large)" } CachedBSONObj() { _size = (int*)_buf; reset(); } void reset( int sz = 0 ) { _lock.lock(); _reset( sz ); _lock.unlock(); } void set( const BSONObj& o ) { _lock.lock(); try { int sz = o.objsize(); if ( sz > (int) sizeof(_buf) ) { _reset(TOO_BIG_SENTINEL); } else { memcpy(_buf, o.objdata(), sz ); } _lock.unlock(); } catch ( ... ) { _lock.unlock(); throw; } } int size() const { return *_size; } bool have() const { return size() > 0; } BSONObj get() const { _lock.lock(); BSONObj o; try { o = _get(); _lock.unlock(); } catch ( ... ) { _lock.unlock(); throw; } return o; } void append( BSONObjBuilder& b , const StringData& name ) const { scoped_spinlock lk(_lock); BSONObj temp = _get(); b.append( name , temp ); } private: /** you have to be locked when you call this */ BSONObj _get() const { int sz = size(); if ( sz == 0 ) return BSONObj(); if ( sz == TOO_BIG_SENTINEL ) return _tooBig; return BSONObj( _buf ).copy(); } /** you have to be locked when you call this */ void _reset( int sz ) { _size[0] = sz; } mutable SpinLock _lock; int * _size; char _buf[512]; }; /* Current operation (for the current Client). an embedded member of Client class, and typically used from within the mutex there. */ class CurOp : boost::noncopyable { public: CurOp( Client * client , CurOp * wrapped = 0 ); ~CurOp(); bool haveQuery() const { return _query.have(); } BSONObj query() { return _query.get(); } void appendQuery( BSONObjBuilder& b , const StringData& name ) const { _query.append( b , name ); } void ensureStarted() { if ( _start == 0 ) _start = _checkpoint = curTimeMicros64(); } bool isStarted() const { return _start > 0; } void enter( Client::Context * context ); void leave( Client::Context * context ); void reset() { _reset(); _start = _checkpoint = 0; _opNum = _nextOpNum++; _ns[0] = 0; _debug.reset(); _query.reset(); _active = true; // this should be last for ui clarity } void reset( const HostAndPort& remote, int op ) { reset(); _remote = remote; _op = op; } void markCommand() { _command = true; } void waitingForLock( int type ) { _waitingForLock = true; if ( type > 0 ) _lockType = 1; else _lockType = -1; } void gotLock() { _waitingForLock = false; } OpDebug& debug() { return _debug; } int profileLevel() const { return _dbprofile; } const char * getNS() const { return _ns; } bool shouldDBProfile( int ms ) const { if ( _dbprofile <= 0 ) return false; return _dbprofile >= 2 || ms >= cmdLine.slowMS; } AtomicUInt opNum() const { return _opNum; } /** if this op is running */ bool active() const { return _active; } int getLockType() const { return _lockType; } bool isWaitingForLock() const { return _waitingForLock; } int getOp() const { return _op; } /** micros */ unsigned long long startTime() { ensureStarted(); return _start; } void done() { _active = false; _end = curTimeMicros64(); } unsigned long long totalTimeMicros() { massert( 12601 , "CurOp not marked done yet" , ! _active ); return _end - startTime(); } int totalTimeMillis() { return (int) (totalTimeMicros() / 1000); } int elapsedMillis() { unsigned long long total = curTimeMicros64() - startTime(); return (int) (total / 1000); } int elapsedSeconds() { return elapsedMillis() / 1000; } void setQuery(const BSONObj& query) { _query.set( query ); } Client * getClient() const { return _client; } BSONObj info() { if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) { BSONObjBuilder b; b.append("err", "unauthorized"); return b.obj(); } return infoNoauth(); } BSONObj infoNoauth(); string getRemoteString( bool includePort = true ) { return _remote.toString(includePort); } ProgressMeter& setMessage( const char * msg , unsigned long long progressMeterTotal = 0 , int secondsBetween = 3 ) { if ( progressMeterTotal ) { if ( _progressMeter.isActive() ) { cout << "about to assert, old _message: " << _message << " new message:" << msg << endl; assert( ! _progressMeter.isActive() ); } _progressMeter.reset( progressMeterTotal , secondsBetween ); } else { _progressMeter.finished(); } _message = msg; return _progressMeter; } string getMessage() const { return _message.toString(); } ProgressMeter& getProgressMeter() { return _progressMeter; } CurOp *parent() const { return _wrapped; } void kill() { _killed = true; } bool killed() const { return _killed; } void yielded() { _numYields++; } void setNS(const char *ns) { strncpy(_ns, ns, Namespace::MaxNsLen); _ns[Namespace::MaxNsLen] = 0; } friend class Client; private: static AtomicUInt _nextOpNum; Client * _client; CurOp * _wrapped; unsigned long long _start; unsigned long long _checkpoint; unsigned long long _end; bool _active; int _op; bool _command; int _lockType; // see concurrency.h for values bool _waitingForLock; int _dbprofile; // 0=off, 1=slow, 2=all AtomicUInt _opNum; char _ns[Namespace::MaxNsLen+2]; HostAndPort _remote; CachedBSONObj _query; OpDebug _debug; ThreadSafeString _message; ProgressMeter _progressMeter; volatile bool _killed; int _numYields; void _reset() { _command = false; _lockType = 0; _dbprofile = 0; _end = 0; _waitingForLock = false; _message = ""; _progressMeter.finished(); _killed = false; _numYields = 0; } }; /* _globalKill: we are shutting down otherwise kill attribute set on specified CurOp this class does not handle races between interruptJs and the checkForInterrupt functions - those must be handled by the client of this class */ extern class KillCurrentOp { public: void killAll(); void kill(AtomicUInt i); /** @return true if global interrupt and should terminate the operation */ bool globalInterruptCheck() const { return _globalKill; } void checkForInterrupt( bool heedMutex = true ) { if ( heedMutex && dbMutex.isWriteLocked() ) return; if( _globalKill ) uasserted(11600,"interrupted at shutdown"); if( cc().curop()->killed() ) uasserted(11601,"interrupted"); } /** @return "" if not interrupted. otherwise, you should stop. */ const char *checkForInterruptNoAssert( bool heedMutex = true ) { if ( heedMutex && dbMutex.isWriteLocked() ) return ""; if( _globalKill ) return "interrupted at shutdown"; if( cc().curop()->killed() ) return "interrupted"; return ""; } private: void interruptJs( AtomicUInt *op ); volatile bool _globalKill; } killCurrentOp; }