diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-06-26 10:38:26 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-06-26 16:15:50 -0400 |
commit | 9e93c8d95e7b7fb56f8746fd691b514d7957045f (patch) | |
tree | 55db70a570be4398b0e6a8a508e93e7b5a9823d5 /src/mongo | |
parent | 6937b273a7a9db13279161ba3e74259eb89b0d15 (diff) | |
download | mongo-9e93c8d95e7b7fb56f8746fd691b514d7957045f.tar.gz |
SERVER-13961 Add capability to register and discover OperationContexts
This replaces the need to iterate through the list of registered Clients
for diagnostics/reporting purposes. Also moves some of the per-client
information to be under OperationContext.
This is in preparation for removing LockState from TLS.
Diffstat (limited to 'src/mongo')
42 files changed, 353 insertions, 232 deletions
diff --git a/src/mongo/client/clientAndShell.cpp b/src/mongo/client/clientAndShell.cpp index 66092e70e2e..23ebd04ac63 100644 --- a/src/mongo/client/clientAndShell.cpp +++ b/src/mongo/client/clientAndShell.cpp @@ -85,8 +85,4 @@ namespace mongo { ClientBasic* ClientBasic::getCurrent() { return 0; } - - bool ClientBasic::hasCurrent() { - return false; - } } diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 2abfc67e358..f427b73052a 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -32,7 +32,7 @@ to an open socket (or logical connection if pooling on sockets) from a client. */ -#include "mongo/pch.h" +#include "mongo/platform/basic.h" #include "mongo/db/client.h" @@ -67,6 +67,7 @@ #include "mongo/util/mongoutils/checksum.h" #include "mongo/util/mongoutils/str.h" + namespace mongo { mongo::mutex& Client::clientsMutex = *(new mutex("clientsMutex")); @@ -293,20 +294,6 @@ namespace mongo { return ""; } - string Client::toString() const { - stringstream ss; - if ( _curOp ) - ss << _curOp->info().jsonString(); - return ss.str(); - } - - string sayClientState() { - Client* c = currentClient.get(); - if ( !c ) - return "no client"; - return c->toString(); - } - bool Client::gotHandshake( const BSONObj& o ) { BSONObjIterator i(o); @@ -332,10 +319,6 @@ namespace mongo { return repl::theReplSet->registerSlave(_remoteId, o["member"].Int()); } - bool ClientBasic::hasCurrent() { - return currentClient.get(); - } - ClientBasic* ClientBasic::getCurrent() { return currentClient.get(); } @@ -362,57 +345,7 @@ namespace mongo { } handshakeCmd; - int Client::recommendedYieldMicros( int * writers , int * readers, bool needExact ) { - int num = 0; - int w = 0; - int r = 0; - { - scoped_lock bl(clientsMutex); - for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) { - Client* c = *i; - if ( c->lockState().hasLockPending() ) { - num++; - if ( c->lockState().isWriteLocked() ) - w++; - else - r++; - } - if (num > 100 && !needExact) - break; - } - } - - if ( writers ) - *writers = w; - if ( readers ) - *readers = r; - - int time = r * 10; // we have to be nice to readers since they don't have priority - time += w; // writers are greedy, so we can be mean tot hem - - time = min( time , 1000000 ); - - return time; - } - int Client::getActiveClientCount( int& writers, int& readers ) { - writers = 0; - readers = 0; - - scoped_lock bl(clientsMutex); - for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) { - Client* c = *i; - if ( ! c->curop()->active() ) - continue; - - if ( c->lockState().isWriteLocked() ) - writers++; - if ( c->lockState().hasAnyReadLock() ) - readers++; - } - - return writers + readers; - } void OpDebug::reset() { extra.reset(); diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 13dcaab77c9..48a8d74dd18 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -59,6 +59,7 @@ namespace mongo { class AbstractMessagingPort; class LockCollectionForReading; + TSP_DECLARE(Client, currentClient) typedef long long ConnectionId; @@ -69,11 +70,9 @@ namespace mongo { // always be in clientsMutex when manipulating this. killop stuff uses these. static std::set<Client*>& clients; static mongo::mutex& clientsMutex; - static int getActiveClientCount( int& writers , int& readers ); - class Context; + ~Client(); - static int recommendedYieldMicros( int * writers = 0 , int * readers = 0, - bool needExact = false ); + /** each thread which does db operations has a Client object in TLS. * call this when your thread starts. */ @@ -101,11 +100,11 @@ namespace mongo { bool isGod() const { return _god; } /* this is for map/reduce writes */ bool setGod(bool newVal) { const bool prev = _god; _god = newVal; return prev; } - std::string toString() const; bool gotHandshake( const BSONObj& o ); BSONObj getRemoteID() const { return _remoteId; } BSONObj getHandshake() const { return _handshake; } ConnectionId getConnectionId() const { return _connectionId; } + const std::string& getThreadId() const { return _threadId; } // XXX(hk): this is per-thread mmapv1 recovery unit stuff, move into that // impl of recovery unit @@ -139,6 +138,8 @@ namespace mongo { public: + class Context; + /** "read lock, and set my context, all in one operation" * This handles (if not recursively locked) opening an unopened database. */ diff --git a/src/mongo/db/client_basic.h b/src/mongo/db/client_basic.h index 0a41400f3df..c8a5f611aac 100644 --- a/src/mongo/db/client_basic.h +++ b/src/mongo/db/client_basic.h @@ -72,7 +72,6 @@ namespace mongo { AbstractMessagingPort * port() const { return _messagingPort; } static ClientBasic* getCurrent(); - static bool hasCurrent(); protected: ClientBasic(AbstractMessagingPort* messagingPort); diff --git a/src/mongo/db/clientlistplugin.cpp b/src/mongo/db/clientlistplugin.cpp index 7cf07358056..37046a3c05a 100644 --- a/src/mongo/db/clientlistplugin.cpp +++ b/src/mongo/db/clientlistplugin.cpp @@ -30,10 +30,59 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" +#include "mongo/db/global_environment_experiment.h" +#include "mongo/db/operation_context.h" #include "mongo/db/dbwebserver.h" #include "mongo/util/mongoutils/html.h" + namespace mongo { + + class OperationsDataBuilder : public GlobalEnvironmentExperiment::ProcessOperationContext { + public: + OperationsDataBuilder(std::stringstream& stringStream) + : _stringStream(stringStream) { + + } + + virtual void processOpContext(OperationContext* txn) { + using namespace mongoutils::html; + + CurOp& co = *(txn->getCurOp()); + + _stringStream << "<tr><td>" << txn->getClient()->desc() << "</td>"; + + tablecell(_stringStream, co.opNum()); + tablecell(_stringStream, co.active()); + tablecell(_stringStream, txn->lockState()->reportState()); + if (co.active()) { + tablecell(_stringStream, co.elapsedSeconds()); + } + else { + tablecell(_stringStream, ""); + } + + tablecell(_stringStream, co.getOp()); + tablecell(_stringStream, html::escape(co.getNS())); + if (co.haveQuery()) { + tablecell(_stringStream, html::escape(co.query().toString())); + } + else { + tablecell(_stringStream, ""); + } + + tablecell(_stringStream, co.getRemoteString()); + + tablecell(_stringStream, co.getMessage()); + tablecell(_stringStream, co.getProgressMeter().toString()); + + _stringStream << "</tr>\n"; + } + + private: + std::stringstream& _stringStream; + }; + namespace { class ClientListPlugin : public WebStatusPlugin { public: @@ -58,35 +107,10 @@ namespace { << "<th>progress</th>" << "</tr>\n"; - { - scoped_lock bl(Client::clientsMutex); - for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) { - Client *c = *i; - CurOp& co = *(c->curop()); - ss << "<tr><td>" << c->desc() << "</td>"; - - tablecell( ss , co.opNum() ); - tablecell( ss , co.active() ); - tablecell( ss , c->lockState().reportState() ); - if ( co.active() ) - tablecell( ss , co.elapsedSeconds() ); - else - tablecell( ss , "" ); - tablecell( ss , co.getOp() ); - tablecell( ss , html::escape( co.getNS() ) ); - if ( co.haveQuery() ) - tablecell( ss , html::escape( co.query().toString() ) ); - else - tablecell( ss , "" ); - tablecell( ss , co.getRemoteString() ); - - tablecell( ss , co.getMessage() ); - tablecell( ss , co.getProgressMeter().toString() ); - - - ss << "</tr>\n"; - } - } + + OperationsDataBuilder opCtxDataBuilder(ss); + getGlobalEnvironment()->forEachOperationContext(&opCtxDataBuilder); + ss << "</table>\n"; } diff --git a/src/mongo/db/commands/apply_ops.cpp b/src/mongo/db/commands/apply_ops.cpp index 733cdfd1dd6..88c8c66f484 100644 --- a/src/mongo/db/commands/apply_ops.cpp +++ b/src/mongo/db/commands/apply_ops.cpp @@ -85,6 +85,8 @@ namespace mongo { // ns used so locking individually requires more analysis Lock::GlobalWrite globalWriteLock(txn->lockState()); + DBDirectClient db(txn); + // Preconditions check reads the database state, so needs to be done locked if ( cmdObj["preCondition"].type() == Array ) { BSONObjIterator i( cmdObj["preCondition"].Obj() ); @@ -172,7 +174,6 @@ namespace mongo { return errors == 0; } - } applyOpsCmd; } diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes.cpp index 9c11baab7f7..c659454d684 100644 --- a/src/mongo/db/commands/drop_indexes.cpp +++ b/src/mongo/db/commands/drop_indexes.cpp @@ -214,7 +214,7 @@ namespace mongo { } bool run(OperationContext* txn, const string& dbname , BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) { - DBDirectClient db; + DBDirectClient db(txn); BSONElement e = jsobj.firstElement(); string toDeleteNs = dbname + '.' + e.valuestr(); diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 3eca6ce9259..38dbceefe4d 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -665,6 +665,7 @@ namespace mongo { State::State(OperationContext* txn, const Config& c) : _config(c), + _db(txn), _useIncremental(true), _txn(txn), _size(0), diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index b5b535b3ba9..178166402d3 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -166,67 +166,55 @@ namespace mongo { } } - BSONObj CurOp::info() { - BSONObjBuilder b; - b.append("opid", _opNum); + void CurOp::reportState(BSONObjBuilder* builder) { + builder->append("opid", _opNum); bool a = _active && _start; - b.append("active", a); + builder->append("active", a); if( a ) { - b.append("secs_running", elapsedSeconds() ); - b.append("microsecs_running", static_cast<long long int>(elapsedMicros()) ); + builder->append("secs_running", elapsedSeconds() ); + builder->append("microsecs_running", static_cast<long long int>(elapsedMicros()) ); } - b.append( "op" , opToString( _op ) ); + builder->append( "op" , opToString( _op ) ); - b.append("ns", _ns); + builder->append("ns", _ns); if (_op == dbInsert) { - _query.append(b, "insert"); + _query.append(*builder, "insert"); } else { - _query.append(b , "query"); + _query.append(*builder, "query"); } if ( !debug().planSummary.empty() ) { - b.append( "planSummary" , debug().planSummary.toString() ); + builder->append( "planSummary" , debug().planSummary.toString() ); } if( !_remote.empty() ) { - b.append("client", _remote.toString()); - } - - if ( _client ) { - b.append( "desc" , _client->desc() ); - if ( _client->_threadId.size() ) - b.append( "threadId" , _client->_threadId ); - if ( _client->_connectionId ) - b.appendNumber( "connectionId" , _client->_connectionId ); - _client->_ls.reportState(b); + builder->append("client", _remote.toString()); } if ( ! _message.empty() ) { if ( _progressMeter.isActive() ) { StringBuilder buf; buf << _message.toString() << " " << _progressMeter.toString(); - b.append( "msg" , buf.str() ); - BSONObjBuilder sub( b.subobjStart( "progress" ) ); + builder->append( "msg" , buf.str() ); + BSONObjBuilder sub( builder->subobjStart( "progress" ) ); sub.appendNumber( "done" , (long long)_progressMeter.done() ); sub.appendNumber( "total" , (long long)_progressMeter.total() ); sub.done(); } else { - b.append( "msg" , _message.toString() ); + builder->append( "msg" , _message.toString() ); } } if( killPending() ) - b.append("killPending", true); - - b.append( "numYields" , _numYields ); - b.append( "lockStats" , _lockStat.report() ); + builder->append("killPending", true); - return b.obj(); + builder->append( "numYields" , _numYields ); + builder->append( "lockStats" , _lockStat.report() ); } BSONObj CurOp::description() { diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index 62a92215737..f24da292c98 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -283,7 +283,7 @@ namespace mongo { Command * getCommand() const { return _command; } void setCommand(Command* command) { _command = command; } - BSONObj info(); + void reportState(BSONObjBuilder* builder); // Fetches less information than "info()"; used to search for ops with certain criteria BSONObj description(); diff --git a/src/mongo/db/d_concurrency.cpp b/src/mongo/db/d_concurrency.cpp index a8651645303..530dc4d3879 100644 --- a/src/mongo/db/d_concurrency.cpp +++ b/src/mongo/db/d_concurrency.cpp @@ -36,9 +36,11 @@ #include "mongo/db/commands/server_status.h" #include "mongo/db/curop.h" #include "mongo/db/d_globals.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/lockstat.h" #include "mongo/db/namespace_string.h" #include "mongo/db/server_parameters.h" +#include "mongo/db/operation_context.h" #include "mongo/server.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/mapsf.h" @@ -743,6 +745,40 @@ namespace mongo { } + /** + * This is passed to the iterator for global environments and aggregates information about the + * locks which are currently being held or waited on. + */ + class LockStateAggregator : public GlobalEnvironmentExperiment::ProcessOperationContext { + public: + LockStateAggregator(bool blockedOnly) + : numWriteLocked(0), + numReadLocked(0), + _blockedOnly(blockedOnly) { + + } + + virtual void processOpContext(OperationContext* txn) { + if (_blockedOnly && !txn->lockState()->hasLockPending()) { + return; + } + + if (txn->lockState()->isWriteLocked()) { + numWriteLocked++; + } + else { + numReadLocked++; + } + } + + int numWriteLocked; + int numReadLocked; + + private: + const bool _blockedOnly; + }; + + class GlobalLockServerStatusSection : public ServerStatusSection { public: GlobalLockServerStatusSection() : ServerStatusSection( "globalLock" ){ @@ -757,23 +793,29 @@ namespace mongo { t.append( "totalTime" , (long long)(1000 * ( curTimeMillis64() - _started ) ) ); t.append( "lockTime" , Lock::globalLockStat()->getTimeLocked( 'W' ) ); + // This returns the blocked lock states { BSONObjBuilder ttt( t.subobjStart( "currentQueue" ) ); - int w=0, r=0; - Client::recommendedYieldMicros( &w , &r, true ); - ttt.append( "total" , w + r ); - ttt.append( "readers" , r ); - ttt.append( "writers" , w ); + + LockStateAggregator blocked(true); + getGlobalEnvironment()->forEachOperationContext(&blocked); + + ttt.append("total", blocked.numReadLocked + blocked.numWriteLocked); + ttt.append("readers", blocked.numReadLocked); + ttt.append("writers", blocked.numWriteLocked); ttt.done(); } + // This returns all the active clients (including those holding locks) { BSONObjBuilder ttt( t.subobjStart( "activeClients" ) ); - int w=0, r=0; - Client::getActiveClientCount( w , r ); - ttt.append( "total" , w + r ); - ttt.append( "readers" , r ); - ttt.append( "writers" , w ); + + LockStateAggregator active(false); + getGlobalEnvironment()->forEachOperationContext(&active); + + ttt.append("total", active.numReadLocked + active.numWriteLocked); + ttt.append("readers", active.numReadLocked); + ttt.append("writers", active.numWriteLocked); ttt.done(); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index bb57f42023e..29a5f1bf85c 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -178,11 +178,6 @@ namespace mongo { }; #endif - /* if server is really busy, wait a bit */ - void beNice() { - sleepmicros( Client::recommendedYieldMicros() ); - } - class MyMessageHandler : public MessageHandler { public: virtual void connected( AbstractMessagingPort* p ) { @@ -224,7 +219,6 @@ namespace mongo { m.appendData(b.buf(), b.len()); b.decouple(); DEV log() << "exhaust=true sending more" << endl; - beNice(); continue; // this goes back to top loop } } diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 2e5fadf2c70..8d833cb12b6 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -826,7 +826,7 @@ namespace mongo { break; // skipped chunk is probably on another shard } log() << "should have chunk: " << n << " have:" << myn << endl; - dumpChunks( ns , query , sort ); + dumpChunks(txn, ns, query, sort); uassert( 10040 , "chunks out of order" , n == myn ); } @@ -850,8 +850,11 @@ namespace mongo { return true; } - void dumpChunks( const string& ns , const BSONObj& query , const BSONObj& sort ) { - DBDirectClient client; + void dumpChunks(OperationContext* txn, + const string& ns, + const BSONObj& query, + const BSONObj& sort) { + DBDirectClient client(txn); Query q(query); q.sort(sort); auto_ptr<DBClientCursor> c = client.query(ns, q); @@ -1250,8 +1253,7 @@ namespace mongo { const BSONObj& cmdObj, std::vector<Privilege>* out) {} // No auth required virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { - BSONObj info = txn->getCurOp()->info(); - result << "you" << info[ "client" ]; + result << "you" << txn->getCurOp()->getRemoteString(); return true; } } cmdWhatsMyUri; diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 272e527c6c2..abbeea15a86 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -446,14 +446,6 @@ namespace mongo { } millisWaitingForReplication += replStatus.duration.total_milliseconds(); } - - if (!txn->lockState()->isLocked()) { - int micros = ( 2 * Client::recommendedYieldMicros() ) - secondaryThrottleTime.micros(); - if ( micros > 0 ) { - LOG(1) << "Helpers::removeRangeUnlocked going to sleep for " << micros << " micros" << endl; - sleepmicros( micros ); - } - } } if ( secondaryThrottle ) diff --git a/src/mongo/db/global_environment_d.cpp b/src/mongo/db/global_environment_d.cpp index dcbc758d5d6..cc0a2577190 100644 --- a/src/mongo/db/global_environment_d.cpp +++ b/src/mongo/db/global_environment_d.cpp @@ -38,7 +38,17 @@ namespace mongo { - GlobalEnvironmentMongoD::GlobalEnvironmentMongoD() : _globalKill(false) { } + GlobalEnvironmentMongoD::GlobalEnvironmentMongoD() + : _globalKill(false), + _registeredOpContextsMutex("RegisteredOpContextsMutex") { + + } + + GlobalEnvironmentMongoD::~GlobalEnvironmentMongoD() { + if (!_registeredOpContexts.empty()) { + warning() << "Terminating with outstanding operation contexts." << endl; + } + } namespace { void interruptJs(AtomicUInt* op) { @@ -97,6 +107,34 @@ namespace mongo { _globalKill = false; } + void GlobalEnvironmentMongoD::registerOperationContext(OperationContext* txn) { + scoped_lock lock(_registeredOpContextsMutex); + + // It is an error to register twice + pair<OperationContextSet::const_iterator, bool> inserted + = _registeredOpContexts.insert(txn); + invariant(inserted.second); + } + + void GlobalEnvironmentMongoD::unregisterOperationContext(OperationContext* txn) { + scoped_lock lock(_registeredOpContextsMutex); + + // It is an error to unregister twice or to unregister something that's not been registered + OperationContextSet::const_iterator it = _registeredOpContexts.find(txn); + invariant(it != _registeredOpContexts.end()); + + _registeredOpContexts.erase(it); + } + + void GlobalEnvironmentMongoD::forEachOperationContext(ProcessOperationContext* procOpCtx) { + scoped_lock lock(_registeredOpContextsMutex); + + OperationContextSet::iterator it; + for (it = _registeredOpContexts.begin(); it != _registeredOpContexts.end(); it++) { + procOpCtx->processOpContext(*it); + } + } + OperationContext* GlobalEnvironmentMongoD::newOpCtx() { return new OperationContextImpl(); } diff --git a/src/mongo/db/global_environment_d.h b/src/mongo/db/global_environment_d.h index 540a01715f5..35a8d039902 100644 --- a/src/mongo/db/global_environment_d.h +++ b/src/mongo/db/global_environment_d.h @@ -28,7 +28,11 @@ #pragma once +#include <set> + #include "mongo/db/global_environment_experiment.h" +#include "mongo/util/concurrency/mutex.h" + namespace mongo { @@ -36,6 +40,8 @@ namespace mongo { public: GlobalEnvironmentMongoD(); + ~GlobalEnvironmentMongoD(); + void setKillAllOperations(); void unsetKillAllOperations(); @@ -44,10 +50,21 @@ namespace mongo { bool killOperation(AtomicUInt opId); + void registerOperationContext(OperationContext* txn); + + void unregisterOperationContext(OperationContext* txn); + + void forEachOperationContext(ProcessOperationContext* procOpCtx); + OperationContext* newOpCtx(); private: bool _globalKill; + + typedef std::set<OperationContext*> OperationContextSet; + + mongo::mutex _registeredOpContextsMutex; + OperationContextSet _registeredOpContexts; }; } // namespace mongo diff --git a/src/mongo/db/global_environment_experiment.h b/src/mongo/db/global_environment_experiment.h index ab923baf833..a7d1aaac81a 100644 --- a/src/mongo/db/global_environment_experiment.h +++ b/src/mongo/db/global_environment_experiment.h @@ -67,6 +67,48 @@ namespace mongo { **/ virtual bool killOperation(AtomicUInt opId) = 0; + /** + * Registers the specified operation context on the global environment, so it is + * discoverable by diagnostics tools. + * + * This function must be thread-safe. + */ + virtual void registerOperationContext(OperationContext* txn) = 0; + + /** + * Unregisters a previously-registered operation context. It is an error to unregister the + * same context twice or to unregister a context, which has not previously been registered. + * + * This function must be thread-safe. + */ + virtual void unregisterOperationContext(OperationContext* txn) = 0; + + /** + * Notification object to be passed to forEachOperationContext so that certain processing + * can be done on all registered contexts. + */ + class ProcessOperationContext { + public: + + /** + * Invoked for each registered OperationContext. The pointer is guaranteed to be stable + * until the call returns. + * + * Implementations of this method should not acquire locks or do any operations, which + * might block and should generally do as little work as possible in order to not block + * the iteration or the release of the OperationContext. + */ + virtual void processOpContext(OperationContext* txn) = 0; + + virtual ~ProcessOperationContext() { } + }; + + /** + * Iterates over all registered operation contexts and invokes + * ProcessOperationContext::processOpContext for each. + */ + virtual void forEachOperationContext(ProcessOperationContext* procOpCtx) = 0; + // // Factories for storage interfaces // diff --git a/src/mongo/db/global_environment_noop.cpp b/src/mongo/db/global_environment_noop.cpp index 1d9c7e5edf1..f7af618c5ca 100644 --- a/src/mongo/db/global_environment_noop.cpp +++ b/src/mongo/db/global_environment_noop.cpp @@ -44,6 +44,18 @@ namespace mongo { return false; } + void GlobalEnvironmentNoop::registerOperationContext(OperationContext* txn) { + + } + + void GlobalEnvironmentNoop::unregisterOperationContext(OperationContext* txn) { + + } + + void GlobalEnvironmentNoop::forEachOperationContext(ProcessOperationContext* procOpCtx) { + + } + OperationContext* GlobalEnvironmentNoop::newOpCtx() { return new OperationContextNoop(); } diff --git a/src/mongo/db/global_environment_noop.h b/src/mongo/db/global_environment_noop.h index 2788c839afb..68644a303c1 100644 --- a/src/mongo/db/global_environment_noop.h +++ b/src/mongo/db/global_environment_noop.h @@ -40,6 +40,12 @@ namespace mongo { bool getKillAllOperations(); + void registerOperationContext(OperationContext* txn); + + void unregisterOperationContext(OperationContext* txn); + + void forEachOperationContext(ProcessOperationContext* procOpCtx); + OperationContext* newOpCtx(); }; diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 17111a87b73..3894d07c10d 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -115,7 +115,7 @@ namespace mongo { MONGO_FP_DECLARE(rsStopGetMore); - void inProgCmd( Message &m, DbResponse &dbresponse ) { + static void inProgCmd( Message &m, DbResponse &dbresponse ) { DbMessage d(m); QueryMessage q(d); BSONObjBuilder b; @@ -160,7 +160,10 @@ namespace mongo { } verify( co ); if( all || co->displayInCurop() ) { - BSONObj info = co->info(); + BSONObjBuilder infoBuilder; + co->reportState(&infoBuilder); + + const BSONObj info = infoBuilder.obj(); if ( all || m.matches( info )) { vals.push_back( info ); } @@ -1013,19 +1016,10 @@ namespace { return (unsigned long long )res; } - DBClientBase * createDirectClient() { + DBClientBase* createDirectClient() { return new DBDirectClient(); } - MONGO_INITIALIZER_GENERAL(CreateJSDirectClient, - ("StorageEngineInit"), - MONGO_NO_DEPENDENTS) - (InitializerContext* context) { - - directDBClient = createDirectClient(); - - return Status::OK(); - } mongo::mutex exitMutex("exit"); AtomicUInt numExitCalls = 0; diff --git a/src/mongo/db/lockstate.cpp b/src/mongo/db/lockstate.cpp index 30168ea8623..372a34c5641 100644 --- a/src/mongo/db/lockstate.cpp +++ b/src/mongo/db/lockstate.cpp @@ -151,7 +151,8 @@ namespace mongo { BSONObj LockState::reportState() { BSONObjBuilder b; - reportState( b ); + reportState(&b); + return b.obj(); } @@ -159,7 +160,7 @@ namespace mongo { thread. So be careful about thread safety here. For example reading this->otherName would not be safe as-is! */ - void LockState::reportState(BSONObjBuilder& res) { + void LockState::reportState(BSONObjBuilder* res) { BSONObjBuilder b; if( _threadState ) { char buf[2]; @@ -184,9 +185,10 @@ namespace mongo { } } BSONObj o = b.obj(); - if( !o.isEmpty() ) - res.append("locks", o); - res.append( "waitingForLock" , _lockPending ); + if (!o.isEmpty()) { + res->append("locks", o); + } + res->append("waitingForLock", _lockPending); } void LockState::dump() const { diff --git a/src/mongo/db/lockstate.h b/src/mongo/db/lockstate.h index 8f17c793632..7f484c592a6 100644 --- a/src/mongo/db/lockstate.h +++ b/src/mongo/db/lockstate.h @@ -45,7 +45,7 @@ namespace mongo { void dump() const; BSONObj reportState(); - void reportState(BSONObjBuilder& b); + void reportState(BSONObjBuilder* b); unsigned recursiveCount() const { return _recursive; } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 2fa4275ca5d..bab1b555e51 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -39,7 +39,7 @@ namespace mongo { - + class Client; class CurOp; class ProgressMeter; @@ -95,9 +95,10 @@ namespace mongo { */ virtual const char * getNS() const = 0; - // - // CurOp - // + /** + * Returns the client under which this context runs. + */ + virtual Client* getClient() const = 0; /** * Returns CurOp. Caller does not own pointer diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index 44b613be389..4af1def603c 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -42,6 +42,12 @@ namespace mongo { OperationContextImpl::OperationContextImpl() { invariant( globalStorageEngine ); _recovery.reset(globalStorageEngine->newRecoveryUnit(this)); + + getGlobalEnvironment()->registerOperationContext(this); + } + + OperationContextImpl::~OperationContextImpl() { + getGlobalEnvironment()->unregisterOperationContext(this); } RecoveryUnit* OperationContextImpl::recoveryUnit() const { @@ -64,6 +70,10 @@ namespace mongo { return getCurOp()->getNS(); } + Client* OperationContextImpl::getClient() const { + return &cc(); + } + CurOp* OperationContextImpl::getCurOp() const { return cc().curop(); } diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h index d4dd56867c5..3acadbca4a5 100644 --- a/src/mongo/db/operation_context_impl.h +++ b/src/mongo/db/operation_context_impl.h @@ -39,7 +39,7 @@ namespace mongo { public: OperationContextImpl(); - virtual ~OperationContextImpl() { } + virtual ~OperationContextImpl(); virtual RecoveryUnit* recoveryUnit() const; @@ -52,6 +52,8 @@ namespace mongo { virtual const char * getNS() const; + virtual Client* getClient() const; + virtual CurOp* getCurOp() const; virtual void checkForInterrupt(bool heedMutex = true) const; diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h index 26c497e6fce..527348b1537 100644 --- a/src/mongo/db/operation_context_noop.h +++ b/src/mongo/db/operation_context_noop.h @@ -47,7 +47,12 @@ namespace mongo { virtual ~OperationContextNoop() { } - CurOp* getCurOp() const { + virtual Client* getClient() const { + invariant(false); + return NULL; + } + + virtual CurOp* getCurOp() const { invariant(false); return NULL; } diff --git a/src/mongo/db/restapi.cpp b/src/mongo/db/restapi.cpp index c01dfeb90e0..47f9e0d1421 100644 --- a/src/mongo/db/restapi.cpp +++ b/src/mongo/db/restapi.cpp @@ -108,11 +108,11 @@ namespace mongo { if ( method == "GET" ) { responseCode = 200; - html = handleRESTQuery( db, fullns, action, params, responseCode, ss ); + html = handleRESTQuery(txn, fullns, action, params, responseCode, ss); } else if ( method == "POST" ) { responseCode = 201; - handlePost( db, fullns, MiniWebServer::body( rq ), params, responseCode, ss ); + handlePost(txn, fullns, MiniWebServer::body(rq), params, responseCode, ss); } else { responseCode = 400; @@ -129,7 +129,7 @@ namespace mongo { responseMsg = ss.str(); } - bool handleRESTQuery( DBDirectClient& db, + bool handleRESTQuery( OperationContext* txn, const std::string& ns, const std::string& action, BSONObj & params, @@ -170,6 +170,8 @@ namespace mongo { } BSONObj query = queryBuilder.obj(); + + DBDirectClient db(txn); auto_ptr<DBClientCursor> cursor = db.query( ns.c_str() , query, num , skip ); uassert( 13085 , "query failed for dbwebserver" , cursor.get() ); @@ -232,7 +234,7 @@ namespace mongo { } // TODO Generate id and revision per couch POST spec - void handlePost( DBDirectClient& db, + void handlePost( OperationContext* txn, const std::string& ns, const char *body, BSONObj& params, @@ -240,6 +242,8 @@ namespace mongo { stringstream & out ) { try { BSONObj obj = fromjson( body ); + + DBDirectClient db(txn); db.insert( ns.c_str(), obj ); } catch ( ... ) { @@ -259,7 +263,6 @@ namespace mongo { return atoi( e.valuestr() ); return def; } - } restHandler; bool RestAdminAccess::haveAdminUsers(OperationContext* txn) const { diff --git a/src/mongo/db/storage/heap1/heap1_test.cpp b/src/mongo/db/storage/heap1/heap1_test.cpp index e343d4b956d..b41b3122898 100644 --- a/src/mongo/db/storage/heap1/heap1_test.cpp +++ b/src/mongo/db/storage/heap1/heap1_test.cpp @@ -50,6 +50,11 @@ namespace { virtual ~MyOperationContext() { } + Client* getClient() const { + invariant(false); + return NULL; + } + CurOp* getCurOp() const { invariant(false); return NULL; diff --git a/src/mongo/dbtests/dbhelper_tests.cpp b/src/mongo/dbtests/dbhelper_tests.cpp index 06354880e42..e0aeb507e36 100644 --- a/src/mongo/dbtests/dbhelper_tests.cpp +++ b/src/mongo/dbtests/dbhelper_tests.cpp @@ -49,15 +49,17 @@ namespace mongo { _min( 4 ), _max( 8 ) { } + void run() { - DBDirectClient client; + OperationContextImpl txn; + DBDirectClient client(&txn); + for ( int i = 0; i < 10; ++i ) { client.insert( ns, BSON( "_id" << i ) ); } { // Remove _id range [_min, _max). - OperationContextImpl txn; Lock::DBWrite lk(txn.lockState(), ns); Client::Context ctx( ns ); @@ -69,7 +71,7 @@ namespace mongo { } // Check that the expected documents remain. - ASSERT_EQUALS( expected(), docs() ); + ASSERT_EQUALS( expected(), docs(&txn) ); } private: BSONArray expected() const { @@ -82,8 +84,9 @@ namespace mongo { } return bab.arr(); } - BSONArray docs() const { - DBDirectClient client; + + BSONArray docs(OperationContext* txn) const { + DBDirectClient client(txn); auto_ptr<DBClientCursor> cursor = client.query( ns, Query().hint( BSON( "_id" << 1 ) ) ); BSONArrayBuilder bab; diff --git a/src/mongo/dbtests/gle_test.cpp b/src/mongo/dbtests/gle_test.cpp index e804b167ed3..71b3642e3af 100644 --- a/src/mongo/dbtests/gle_test.cpp +++ b/src/mongo/dbtests/gle_test.cpp @@ -45,6 +45,7 @@ namespace { void run() { DBDirectClient client; client.insert(_ns, BSON( "test" << "test")); + // Cannot mix fsync + j, will make command fail string gleString = client.getLastError(true, true, 10, 10); ASSERT_NOT_EQUALS(gleString, ""); @@ -59,6 +60,7 @@ namespace { void run() { DBDirectClient client; client.insert(_ns, BSON( "test" << "test")); + // Make sure there was no error string gleString = client.getLastError(); ASSERT_EQUALS(gleString, ""); @@ -73,6 +75,7 @@ namespace { void run() { DBDirectClient client; client.insert(_ns, BSON( "_id" << 1)); + // Make sure there was no error string gleString = client.getLastError(); ASSERT_EQUALS(gleString, ""); diff --git a/src/mongo/dbtests/gridfstest.cpp b/src/mongo/dbtests/gridfstest.cpp index edabefa1d32..768db4cc307 100644 --- a/src/mongo/dbtests/gridfstest.cpp +++ b/src/mongo/dbtests/gridfstest.cpp @@ -42,7 +42,8 @@ namespace { public: virtual void run() { DBDirectClient client; - GridFS grid( client, "gridtest" ); + + GridFS grid(client, "gridtest"); grid.setChunkSize( 5 ); ASSERT_EQUALS( 5U, grid.getChunkSize() ); diff --git a/src/mongo/dbtests/indexupdatetests.cpp b/src/mongo/dbtests/indexupdatetests.cpp index f0cdf72e32e..3458181887a 100644 --- a/src/mongo/dbtests/indexupdatetests.cpp +++ b/src/mongo/dbtests/indexupdatetests.cpp @@ -43,9 +43,6 @@ namespace IndexUpdateTests { static const char* const _ns = "unittests.indexupdate"; -#if 0 - ExternalSortComparison* _aFirstSort = BtreeBasedBulkAccessMethod::getComparison(0, BSON("a" << 1)); -#endif /** * Test fixture for a write locked test using collection _ns. Includes functionality to @@ -55,9 +52,10 @@ namespace IndexUpdateTests { class IndexBuildBase { public: IndexBuildBase() : - _ctx(&_txn, _ns) { + _ctx(&_txn, _ns), + _client(&_txn) { + _client.createCollection( _ns ); - setGlobalEnvironment(new GlobalEnvironmentMongoD()); } ~IndexBuildBase() { _client.dropCollection( _ns ); @@ -93,9 +91,9 @@ namespace IndexUpdateTests { } #endif - DBDirectClient _client; OperationContextImpl _txn; Client::WriteContext _ctx; + DBDirectClient _client; }; /** addKeysToPhaseOne() adds keys from a collection's documents to an external sorter. */ diff --git a/src/mongo/dbtests/jstests.cpp b/src/mongo/dbtests/jstests.cpp index 136d590b09f..400aceffd40 100644 --- a/src/mongo/dbtests/jstests.cpp +++ b/src/mongo/dbtests/jstests.cpp @@ -904,6 +904,7 @@ namespace JSTests { verify(0); } + class Utf8Check { public: Utf8Check() { reset(); } @@ -930,6 +931,7 @@ namespace JSTests { client.dropCollection( ns() ); } static const char *ns() { return "unittest.jstests.utf8check"; } + DBDirectClient client; }; @@ -947,6 +949,7 @@ namespace JSTests { client.dropCollection( ns() ); } static const char *ns() { return "unittest.jstests.longutf8string"; } + DBDirectClient client; }; @@ -1077,6 +1080,7 @@ namespace JSTests { virtual string jsonOut() const { return json(); } + DBDirectClient client; }; @@ -1982,13 +1986,14 @@ namespace JSTests { class InvalidStoredJS { public: void run() { - DBDirectClient client; BSONObjBuilder query; query.append( "_id" , "invalidstoredjs1" ); BSONObjBuilder update; update.append( "_id" , "invalidstoredjs1" ); update.appendCode( "value" , "function () { db.test.find().forEach(function(obj) { continue; }); }" ); + + DBDirectClient client; client.update( "test.system.js" , query.obj() , update.obj() , true /* upsert */ ); scoped_ptr<Scope> s( globalScriptEngine->newScope() ); diff --git a/src/mongo/dbtests/perftests.cpp b/src/mongo/dbtests/perftests.cpp index 672e9b99a62..74eb31e8f68 100644 --- a/src/mongo/dbtests/perftests.cpp +++ b/src/mongo/dbtests/perftests.cpp @@ -68,18 +68,14 @@ namespace PerfTests { const bool profiling = false; - typedef DBDirectClient DBClientType; - //typedef DBClientConnection DBClientType; - class ClientBase { public: // NOTE: Not bothering to backup the old error record. ClientBase() { - //_client.connect("localhost"); mongo::lastError.reset( new LastError() ); } virtual ~ClientBase() { - //mongo::lastError.release(); + } protected: void insert( const char *ns, BSONObj o ) { @@ -91,9 +87,11 @@ namespace PerfTests { bool error() { return !_client.getPrevError().getField( "err" ).isNull(); } + DBClientBase* client() { return &_client; } + private: - DBClientType _client; + DBDirectClient _client; }; /* if you want recording of the timings, place the password for the perf database @@ -358,7 +356,7 @@ namespace PerfTests { static int z; srand( ++z ^ (unsigned) time(0)); #endif - DBClientType c; + DBDirectClient c; Client::initThreadIfNotAlready("perftestthr"); const unsigned int Batch = batchSize(); while( 1 ) { diff --git a/src/mongo/dbtests/query_stage_fetch.cpp b/src/mongo/dbtests/query_stage_fetch.cpp index 272e3ad822e..3414475baf9 100644 --- a/src/mongo/dbtests/query_stage_fetch.cpp +++ b/src/mongo/dbtests/query_stage_fetch.cpp @@ -78,6 +78,7 @@ namespace QueryStageFetch { DBDirectClient _client; }; + // // Test that a WSM with an obj is passed through verbatim. // diff --git a/src/mongo/dbtests/query_stage_keep.cpp b/src/mongo/dbtests/query_stage_keep.cpp index 5959ee17bee..3be756ae2a5 100644 --- a/src/mongo/dbtests/query_stage_keep.cpp +++ b/src/mongo/dbtests/query_stage_keep.cpp @@ -93,6 +93,7 @@ namespace QueryStageKeep { DBDirectClient _client; }; + // Test that we actually merge flagged results. // diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index 0ef81145036..ac70e48555f 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -190,10 +190,9 @@ namespace QueryTests { public: ClientBase() { mongo::lastError.reset( new LastError() ); - setGlobalEnvironment(new GlobalEnvironmentMongoD()); } ~ClientBase() { - //mongo::lastError.release(); + } protected: diff --git a/src/mongo/dbtests/runner_registry.cpp b/src/mongo/dbtests/runner_registry.cpp index 2964e14be00..b7c5981a27f 100644 --- a/src/mongo/dbtests/runner_registry.cpp +++ b/src/mongo/dbtests/runner_registry.cpp @@ -49,7 +49,9 @@ namespace RunnerRegistry { class RunnerRegistryBase { public: - RunnerRegistryBase() { + RunnerRegistryBase() + : _client(&_opCtx) + { _ctx.reset(new Client::WriteContext(&_opCtx, ns())); _client.dropCollection(ns()); @@ -94,11 +96,14 @@ namespace RunnerRegistry { } static const char* ns() { return "unittests.RunnerRegistryDiskLocInvalidation"; } - DBDirectClient _client; - auto_ptr<Client::WriteContext> _ctx; + + // Order of these is important for initialization OperationContextImpl _opCtx; + auto_ptr<Client::WriteContext> _ctx; + DBDirectClient _client; }; + // Test that a registered runner receives invalidation notifications. class RunnerRegistryDiskLocInvalid : public RunnerRegistryBase { public: diff --git a/src/mongo/dbtests/updatetests.cpp b/src/mongo/dbtests/updatetests.cpp index f486dd0ce3a..e048ebc2d9f 100644 --- a/src/mongo/dbtests/updatetests.cpp +++ b/src/mongo/dbtests/updatetests.cpp @@ -61,7 +61,9 @@ namespace UpdateTests { bool error() { return !client_.getPrevError().getField( "err" ).isNull(); } - DBDirectClient &client() { return client_; } + + DBDirectClient& client() { return client_; } + private: DBDirectClient client_; }; diff --git a/src/mongo/s/client_info.cpp b/src/mongo/s/client_info.cpp index adffade0e8e..900dc70e84a 100644 --- a/src/mongo/s/client_info.cpp +++ b/src/mongo/s/client_info.cpp @@ -132,10 +132,6 @@ namespace mongo { return _tlInfo.get(); } - bool ClientBasic::hasCurrent() { - return ClientInfo::exists(); - } - ClientBasic* ClientBasic::getCurrent() { return ClientInfo::get(); } diff --git a/src/mongo/scripting/engine.cpp b/src/mongo/scripting/engine.cpp index 5a7789be8b3..c0c6d30f577 100644 --- a/src/mongo/scripting/engine.cpp +++ b/src/mongo/scripting/engine.cpp @@ -27,7 +27,7 @@ * then also delete it in the license file. */ -#include "mongo/pch.h" +#include "mongo/platform/basic.h" #include "mongo/scripting/engine.h" @@ -44,7 +44,6 @@ namespace mongo { long long Scope::_lastVersion = 1; static const unsigned kMaxJsFileLength = std::numeric_limits<unsigned>::max() - 1; - DBClientBase* directDBClient; ScriptEngine::ScriptEngine() : _scopeInitCallback() { } @@ -192,6 +191,7 @@ namespace mongo { _loadedVersion = _lastVersion; string coll = _localDBName + ".system.js"; + DBClientBase* directDBClient = createDirectClient(); auto_ptr<DBClientCursor> c = directDBClient->query(coll, Query(), 0, 0, NULL, QueryOption_SlaveOk, 0); massert(16669, "unable to get db client cursor from query", c.get()); diff --git a/src/mongo/scripting/engine.h b/src/mongo/scripting/engine.h index 05061e82a19..1fb512a6a44 100644 --- a/src/mongo/scripting/engine.h +++ b/src/mongo/scripting/engine.h @@ -258,5 +258,4 @@ namespace mongo { const char* jsSkipWhiteSpace(const char* raw); extern ScriptEngine* globalScriptEngine; - extern DBClientBase* directDBClient; } |