diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2015-05-27 19:36:59 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2015-06-05 14:21:56 -0400 |
commit | 51c2064d518140fbeae62f9d7ba29f1d69fb530f (patch) | |
tree | b6d97eb91b4ac76279eb983234c6ec63ac44a8c6 | |
parent | 1cf11a282870c26ce7d963fb3a6c3329b39d90a2 (diff) | |
download | mongo-51c2064d518140fbeae62f9d7ba29f1d69fb530f.tar.gz |
SERVER-18277 Stronger locking rules for CurOp and OpDebug.
23 files changed, 304 insertions, 170 deletions
diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp index 01dfb1037e4..67498ac19db 100644 --- a/src/mongo/db/catalog/index_create.cpp +++ b/src/mongo/db/catalog/index_create.cpp @@ -49,6 +49,7 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/operation_context.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/log.h" #include "mongo/util/processinfo.h" #include "mongo/util/progress_meter.h" @@ -225,9 +226,11 @@ namespace mongo { Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsOut) { const char* curopMessage = _buildInBackground ? "Index Build (background)" : "Index Build"; - ProgressMeterHolder progress(*_txn->setMessage(curopMessage, - curopMessage, - _collection->numRecords(_txn))); + stdx::unique_lock<Client> lk(*_txn->getClient()); + ProgressMeterHolder progress(*_txn->setMessage_inlock(curopMessage, + curopMessage, + _collection->numRecords(_txn))); + lk.unlock(); Timer t; diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 371df966580..744a8eb88ca 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -202,7 +202,7 @@ namespace mongo { // Fill out curop information. int ntoreturn = lpq->getBatchSize().value_or(0); - beginQueryOp(nss, cmdObj, ntoreturn, lpq->getSkip(), CurOp::get(txn)); + beginQueryOp(txn, nss, cmdObj, ntoreturn, lpq->getSkip()); // 1b) Finish the parsing step by using the LiteParsedQuery to create a CanonicalQuery. std::unique_ptr<CanonicalQuery> cq; @@ -263,8 +263,7 @@ namespace mongo { // there is no ClientCursor id, and then return. const int numResults = 0; const CursorId cursorId = 0; - endQueryOp(execHolder.get(), dbProfilingLevel, numResults, cursorId, - CurOp::get(txn)); + endQueryOp(txn, execHolder.get(), dbProfilingLevel, numResults, cursorId); appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &result); return true; } @@ -348,7 +347,7 @@ namespace mongo { } // Fill out curop based on the results. - endQueryOp(exec, dbProfilingLevel, numResults, cursorId, CurOp::get(txn)); + endQueryOp(txn, exec, dbProfilingLevel, numResults, cursorId); // 7) Generate the response object to send to the client. appendCursorResponseObject(cursorId, nss.ns(), firstBatch.arr(), &result); diff --git a/src/mongo/db/commands/get_last_error.cpp b/src/mongo/db/commands/get_last_error.cpp index be88e092683..ac80e1de823 100644 --- a/src/mongo/db/commands/get_last_error.cpp +++ b/src/mongo/db/commands/get_last_error.cpp @@ -251,7 +251,10 @@ namespace mongo { } txn->setWriteConcern(writeConcern); - txn->setMessage( "waiting for write concern" ); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + txn->setMessage_inlock( "waiting for write concern" ); + } WriteConcernResult wcResult; status = waitForWriteConcern( txn, lastOpTime, &wcResult ); diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 5462e7abb93..07eda7d0d22 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -66,6 +66,7 @@ #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" #include "mongo/scripting/engine.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -615,9 +616,13 @@ namespace mongo { } else if ( _config.outputOptions.outType == Config::MERGE ) { // merge: upsert new docs into old collection - op->setMessage("m/r: merge post processing", - "M/R Merge Post Processing Progress", - _safeCount(_db, _config.tempNamespace, BSONObj())); + { + const auto count = _safeCount(_db, _config.tempNamespace, BSONObj()); + stdx::lock_guard<Client> lk(*txn->getClient()); + op->setMessage_inlock("m/r: merge post processing", + "M/R Merge Post Processing Progress", + count); + } auto_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace , BSONObj()); while (cursor->more()) { ScopedTransaction scopedXact(_txn, MODE_IX); @@ -635,9 +640,13 @@ namespace mongo { // reduce: apply reduce op on new result and existing one BSONList values; - op->setMessage("m/r: reduce post processing", - "M/R Reduce Post Processing Progress", - _safeCount(_db, _config.tempNamespace, BSONObj())); + { + const auto count = _safeCount(_db, _config.tempNamespace, BSONObj()); + stdx::lock_guard<Client> lk(*txn->getClient()); + op->setMessage_inlock("m/r: reduce post processing", + "M/R Reduce Post Processing Progress", + count); + } auto_ptr<DBClientCursor> cursor = _db.query( _config.tempNamespace , BSONObj() ); while ( cursor->more() ) { ScopedTransaction transaction(txn, MODE_X); @@ -1018,9 +1027,13 @@ namespace mongo { BSONObj prev; BSONList all; - verify(pm == op->setMessage("m/r: (3/3) final reduce to collection", - "M/R: (3/3) Final Reduce Progress", - _db.count(_config.incLong, BSONObj(), QueryOption_SlaveOk))); + { + const auto count = _db.count(_config.incLong, BSONObj(), QueryOption_SlaveOk); + stdx::lock_guard<Client> lk(*_txn->getClient()); + verify(pm == op->setMessage_inlock("m/r: (3/3) final reduce to collection", + "M/R: (3/3) Final Reduce Progress", + count)); + } const NamespaceString nss(_config.incLong); const WhereCallbackReal whereCallback(_txn, nss.db()); @@ -1361,10 +1374,12 @@ namespace mongo { progressTotal = 1; } - ProgressMeter& progress( op->setMessage("m/r: (1/3) emit phase", - "M/R: (1/3) Emit Progress", - progressTotal )); + stdx::unique_lock<Client> lk(*txn->getClient()); + ProgressMeter& progress( op->setMessage_inlock("m/r: (1/3) emit phase", + "M/R: (1/3) Emit Progress", + progressTotal )); progress.showTotal(showTotal); + lk.unlock(); ProgressMeterHolder pm(progress); // See cast on next line to 32 bit unsigned @@ -1494,8 +1509,11 @@ namespace mongo { timingBuilder.appendNumber( "mapTime" , mapTime / 1000 ); timingBuilder.append( "emitLoop" , t.millis() ); - op->setMessage("m/r: (2/3) final reduce in memory", - "M/R: (2/3) Final In-Memory Reduce Progress"); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + op->setMessage_inlock("m/r: (2/3) final reduce in memory", + "M/R: (2/3) Final In-Memory Reduce Progress"); + } Timer rt; // do reduce in memory // this will be the last reduce needed for inline mode @@ -1603,8 +1621,10 @@ namespace mongo { BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck(); BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck(); - ProgressMeterHolder pm(op->setMessage("m/r: merge sort and reduce", - "M/R Merge Sort and Reduce Progress")); + stdx::unique_lock<Client> lk(*txn->getClient()); + ProgressMeterHolder pm(op->setMessage_inlock("m/r: merge sort and reduce", + "M/R Merge Sort and Reduce Progress")); + lk.unlock(); set<string> servers; { diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 64661939af6..0469027060b 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -75,6 +75,7 @@ #include "mongo/s/write_ops/batched_upsert_detail.h" #include "mongo/s/write_ops/write_error_detail.h" #include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/elapsed_tracker.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -250,8 +251,10 @@ namespace mongo { && writeErrors.size() < request.sizeWriteOps() ); if ( needToEnforceWC ) { - - CurOp::get(_txn)->setMessage( "waiting for write concern" ); + { + stdx::lock_guard<Client> lk(*_txn->getClient()); + CurOp::get(_txn)->setMessage_inlock( "waiting for write concern" ); + } WriteConcernResult res; Status status = waitForWriteConcern( @@ -375,17 +378,17 @@ namespace mongo { // Translates write item type to wire protocol op code. // Helper for WriteBatchExecutor::applyWriteItem(). - static int getOpCode( BatchedCommandRequest::BatchType writeType ) { - switch ( writeType ) { + static int getOpCode(const BatchItemRef& currWrite) { + switch (currWrite.getRequest()->getBatchType()) { case BatchedCommandRequest::BatchType_Insert: return dbInsert; case BatchedCommandRequest::BatchType_Update: return dbUpdate; - default: - dassert( writeType == BatchedCommandRequest::BatchType_Delete ); + case BatchedCommandRequest::BatchType_Delete: return dbDelete; + default: + MONGO_UNREACHABLE; } - return 0; } static void buildStaleError( const ChunkVersion& shardVersionRecvd, @@ -494,31 +497,31 @@ namespace mongo { // HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS // - static void beginCurrentOp( CurOp* currentOp, Client* client, const BatchItemRef& currWrite ) { - - // Execute the write item as a child operation of the current operation. - // This is not done by out callers + static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) { + stdx::lock_guard<Client> lk(*txn->getClient()); + CurOp* const currentOp = CurOp::get(txn); + currentOp->setOp_inlock(getOpCode(currWrite)); currentOp->ensureStarted(); - currentOp->setNS( currWrite.getRequest()->getNS() ); + currentOp->setNS_inlock( currWrite.getRequest()->getNS() ); currentOp->debug().ns = currentOp->getNS(); currentOp->debug().op = currentOp->getOp(); if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) { - currentOp->setQuery( currWrite.getDocument() ); + currentOp->setQuery_inlock( currWrite.getDocument() ); currentOp->debug().query = currWrite.getDocument(); currentOp->debug().ninserted = 0; } else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) { - currentOp->setQuery( currWrite.getUpdate()->getQuery() ); + currentOp->setQuery_inlock( currWrite.getUpdate()->getQuery() ); currentOp->debug().query = currWrite.getUpdate()->getQuery(); currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr(); // Note: debug().nMatched, nModified and nmoved are set internally in update } else { dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete ); - currentOp->setQuery( currWrite.getDelete()->getQuery() ); + currentOp->setQuery_inlock( currWrite.getDelete()->getQuery() ); currentOp->debug().query = currWrite.getDelete()->getQuery(); currentOp->debug().ndeleted = 0; } @@ -580,10 +583,9 @@ namespace mongo { } } - static void finishCurrentOp( OperationContext* txn, - CurOp* currentOp, - WriteErrorDetail* opError ) { + static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) { + CurOp* currentOp = CurOp::get(txn); currentOp->done(); int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis(); currentOp->debug().recordStats(); @@ -877,8 +879,7 @@ namespace mongo { // BEGIN CURRENT OP CurOp currentOp(_txn); - currentOp.setOp(dbUpdate); - beginCurrentOp( ¤tOp, _txn->getClient(), updateItem ); + beginCurrentOp(_txn, updateItem); incOpStats( updateItem ); ShardedConnectionInfo* info = ShardedConnectionInfo::get(false); @@ -904,7 +905,7 @@ namespace mongo { } // END CURRENT OP incWriteStats( updateItem, result.getStats(), result.getError(), ¤tOp ); - finishCurrentOp( _txn, ¤tOp, result.getError() ); + finishCurrentOp(_txn, result.getError()); // End current transaction and release snapshot. _txn->recoveryUnit()->abandonSnapshot(); @@ -922,8 +923,7 @@ namespace mongo { // BEGIN CURRENT OP CurOp currentOp(_txn); - currentOp.setOp(dbDelete); - beginCurrentOp( ¤tOp, _txn->getClient(), removeItem ); + beginCurrentOp(_txn, removeItem); incOpStats( removeItem ); ShardedConnectionInfo* info = ShardedConnectionInfo::get(false); @@ -946,7 +946,7 @@ namespace mongo { // END CURRENT OP incWriteStats( removeItem, result.getStats(), result.getError(), ¤tOp ); - finishCurrentOp( _txn, ¤tOp, result.getError() ); + finishCurrentOp(_txn, result.getError()); // End current transaction and release snapshot. _txn->recoveryUnit()->abandonSnapshot(); @@ -1112,8 +1112,7 @@ namespace mongo { void WriteBatchExecutor::execOneInsert(ExecInsertsState* state, WriteErrorDetail** error) { BatchItemRef currInsertItem(state->request, state->currIndex); CurOp currentOp(_txn); - currentOp.setOp(dbInsert); - beginCurrentOp( ¤tOp, _txn->getClient(), currInsertItem ); + beginCurrentOp(_txn, currInsertItem); incOpStats(currInsertItem); WriteOpResult result; @@ -1123,7 +1122,7 @@ namespace mongo { result.getStats(), result.getError(), ¤tOp); - finishCurrentOp(_txn, ¤tOp, result.getError()); + finishCurrentOp(_txn, result.getError()); if (result.getError()) { *error = result.releaseError(); diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index c10488da705..a23011d6ae9 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -171,14 +171,14 @@ namespace mongo { _command = NULL; } - void CurOp::setOp(int op) { + void CurOp::setOp_inlock(int op) { _op = op; } - ProgressMeter& CurOp::setMessage(const char * msg, - std::string name, - unsigned long long progressMeterTotal, - int secondsBetween) { + ProgressMeter& CurOp::setMessage_inlock(const char * msg, + std::string name, + unsigned long long progressMeterTotal, + int secondsBetween) { if ( progressMeterTotal ) { if ( _progressMeter.isActive() ) { error() << "old _message: " << _message << " new message:" << msg; @@ -198,9 +198,8 @@ namespace mongo { invariant(this == _stack->pop()); } - void CurOp::setNS( StringData ns ) { - // _ns copies the data in the null-terminated ptr it's given - _ns = ns; + void CurOp::setNS_inlock(StringData ns) { + _ns = ns.toString(); } void CurOp::ensureStarted() { @@ -216,7 +215,7 @@ namespace mongo { } } - void CurOp::enter(const char* ns, int dbProfileLevel) { + void CurOp::enter_inlock(const char* ns, int dbProfileLevel) { ensureStarted(); _ns = ns; raiseDbProfileLevel(dbProfileLevel); @@ -227,9 +226,8 @@ namespace mongo { } void CurOp::recordGlobalTime(bool isWriteLocked, long long micros) const { - string nsStr = _ns.toString(); int lockType = isWriteLocked ? 1 : -1; - Top::get(getGlobalServiceContext()).record(nsStr, _op, lockType, micros, _isCommand); + Top::get(getGlobalServiceContext()).record(_ns, _op, lockType, micros, _isCommand); } void CurOp::reportState(BSONObjBuilder* builder) { @@ -239,11 +237,13 @@ namespace mongo { builder->append("microsecs_running", static_cast<long long int>(elapsedMicros()) ); } - builder->append( "op" , opToString( _op ) ); + builder->append("op", opToString(_op)); // Fill out "ns" from our namespace member (and if it's not available, fall back to the - // OpDebug namespace member). - builder->append("ns", !_ns.empty() ? _ns.toString() : _debug.ns.toString()); + // OpDebug namespace member). We prefer our ns when set because it changes to match each + // accessed namespace, while _debug.ns is set once at the start of the operation. However, + // sometimes _ns is not yet set. + builder->append("ns", !_ns.empty() ? _ns : _debug.ns); if (_op == dbInsert) { _query.append(*builder, "insert"); @@ -259,7 +259,7 @@ namespace mongo { if ( ! _message.empty() ) { if ( _progressMeter.isActive() ) { StringBuilder buf; - buf << _message.toString() << " " << _progressMeter.toString(); + buf << _message << " " << _progressMeter.toString(); builder->append( "msg" , buf.str() ); BSONObjBuilder sub( builder->subobjStart( "progress" ) ); sub.appendNumber( "done" , (long long)_progressMeter.done() ); @@ -267,7 +267,7 @@ namespace mongo { sub.done(); } else { - builder->append( "msg" , _message.toString() ); + builder->append("msg" , _message); } } diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index 07f0c7a024b..19a307b2288 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -148,7 +148,7 @@ namespace mongo { // basic options int op; bool iscommand; - ThreadSafeString ns; + std::string ns; BSONObj query; BSONObj updateobj; @@ -189,31 +189,65 @@ namespace mongo { int responseLength; }; - /* Current operation (for the current Client). - an embedded member of Client class, and typically used from within the mutex there. - */ + /** + * Container for data used to report information about an OperationContext. + * + * Every OperationContext in a server with CurOp support has a stack of CurOp + * objects. The entry at the top of the stack is used to record timing and + * resource statistics for the executing operation or suboperation. + * + * All of the accessor methods on CurOp may be called by the thread executing + * the associated OperationContext at any time, or by other threads that have + * locked the context's owning Client object. + * + * The mutator methods on CurOp whose names end _inlock may only be called by the thread + * executing the associated OperationContext and Client, and only when that thread has also + * locked the Client object. All other mutators may only be called by the thread executing + * CurOp, but do not require holding the Client lock. The exception to this is the kill() + * method, which is self-synchronizing. + * + * The OpDebug member of a CurOp, accessed via the debug() accessor should *only* be accessed + * from the thread executing an operation, and as a result its fields may be accessed without + * any synchronization. + */ class CurOp { MONGO_DISALLOW_COPYING(CurOp); public: static CurOp* get(const OperationContext* opCtx); static CurOp* get(const OperationContext& opCtx); - explicit CurOp(OperationContext* client); + /** + * Constructs a nested CurOp at the top of the given "opCtx"'s CurOp stack. + */ + explicit CurOp(OperationContext* opCtx); ~CurOp(); bool haveQuery() const { return _query.have(); } BSONObj query() const { return _query.get(); } void appendQuery( BSONObjBuilder& b , StringData name ) const { _query.append( b , name ); } - void enter(const char* ns, int dbProfileLevel); + void enter_inlock(const char* ns, int dbProfileLevel); /** * Sets the type of the current operation to "op". */ - void setOp(int op); - void markCommand() { _isCommand = true; } + void setOp_inlock(int op); + + /** + * Marks the current operation as being a command. + */ + void markCommand_inlock() { _isCommand = true; } + + /** + * Returns a structure containing data used for profiling, accessed only by a thread + * currently executing the operation context associated with this CurOp. + */ OpDebug& debug() { return _debug; } - std::string getNS() const { return _ns.toString(); } + + /** + * Gets the name of the namespace on which the current operation operates. + */ + std::string getNS() const { return _ns; } bool shouldDBProfile( int ms ) const { if ( _dbprofile <= 0 ) @@ -225,9 +259,14 @@ namespace mongo { /** * Raises the profiling level for this operation to "dbProfileLevel" if it was previously * less than "dbProfileLevel". + * + * This belongs on OpDebug, and so does not have the _inlock suffix. */ void raiseDbProfileLevel(int dbProfileLevel); + /** + * Gets the type of the current operation. + */ int getOp() const { return _op; } // @@ -287,21 +326,43 @@ namespace mongo { } int elapsedSeconds() { return elapsedMillis() / 1000; } - void setQuery(const BSONObj& query) { _query.set( query ); } + void setQuery_inlock(const BSONObj& query) { _query.set( query ); } Command * getCommand() const { return _command; } - void setCommand(Command* command) { _command = command; } + void setCommand_inlock(Command* command) { _command = command; } + /** + * Appends information about this CurOp to "builder". + * + * If called from a thread other than the one executing the operation associated with this + * CurOp, it is necessary to lock the associated Client object before executing this method. + */ void reportState(BSONObjBuilder* builder); - ProgressMeter& setMessage(const char * msg, + /** + * Sets the message and the progress meter for this CurOp. + * + * While it is necessary to hold the lock while this method executes, the + * "hit" and "finished" methods of ProgressMeter may be called safely from + * the thread executing the operation without locking the Client. + */ + ProgressMeter& setMessage_inlock(const char * msg, std::string name = "Progress", unsigned long long progressMeterTotal = 0, int secondsBetween = 3); - std::string getMessage() const { return _message.toString(); } - ProgressMeter& getProgressMeter() { return _progressMeter; } + + /** + * Gets the message for this CurOp. + */ + const std::string& getMessage() const { return _message; } + const ProgressMeter& getProgressMeter() { return _progressMeter; } CurOp *parent() const { return _parent; } - void yielded() { _numYields++; } + void yielded() { _numYields++; } // Should be _inlock()? + + /** + * Returns the number of times yielded() was called. Callers on threads other + * than the one executing the operation must lock the client. + */ int numYields() const { return _numYields; } long long getExpectedLatencyMs() const { return _expectedLatencyMs; } @@ -314,7 +375,7 @@ namespace mongo { * generally the Context should set this up * but sometimes you want to do it ahead of time */ - void setNS( StringData ns ); + void setNS_inlock( StringData ns ); private: class CurOpStack; @@ -331,10 +392,10 @@ namespace mongo { int _op; bool _isCommand; int _dbprofile; // 0=off, 1=slow, 2=all - ThreadSafeString _ns; + std::string _ns; CachedBSONObj<512> _query; // CachedBSONObj is thread safe OpDebug _debug; - ThreadSafeString _message; + std::string _message; ProgressMeter _progressMeter; int _numYields; diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 5a78f2d7d90..a4c52b97cc6 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -31,6 +31,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/s/d_state.h" @@ -85,19 +86,21 @@ namespace mongo { void AutoGetCollectionForRead::_init(const std::string& ns, StringData coll) { massert(28535, "need a non-empty collection name", !coll.empty()); - // TODO: OldClientContext legacy, needs to be removed - CurOp::get(_txn)->ensureStarted(); - CurOp::get(_txn)->setNS(ns); - // We have both the DB and collection locked, which the prerequisite to do a stable shard // version check. ensureShardVersionOKOrThrow(ns); + auto curOp = CurOp::get(_txn); + stdx::lock_guard<Client> lk(*_txn->getClient()); + // TODO: OldClientContext legacy, needs to be removed + curOp->ensureStarted(); + curOp->setNS_inlock(ns); + // At this point, we are locked in shared mode for the database by the DB lock in the // constructor, so it is safe to load the DB pointer. if (_db.getDb()) { // TODO: OldClientContext legacy, needs to be removed - CurOp::get(_txn)->enter(ns.c_str(), _db.getDb()->getProfilingLevel()); + curOp->enter_inlock(ns.c_str(), _db.getDb()->getProfilingLevel()); _coll = _db.getDb()->getCollection(ns); } @@ -156,7 +159,8 @@ namespace mongo { _checkNotStale(); } - CurOp::get(_txn)->enter(_ns.c_str(), _db->getProfilingLevel()); + stdx::lock_guard<Client> lk(*_txn->getClient()); + CurOp::get(_txn)->enter_inlock(_ns.c_str(), _db->getProfilingLevel()); } void OldClientContext::_checkNotStale() const { diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 828bfa03215..3009e3d3699 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1000,7 +1000,10 @@ namespace mongo { // TODO: OldClientContext legacy, needs to be removed CurOp::get(txn)->ensureStarted(); - CurOp::get(txn)->setNS(dbname); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + CurOp::get(txn)->setNS_inlock(dbname); + } // We lock the entire database in S-mode in order to ensure that the contents will not // change for the stats snapshot. This might be unnecessary and if it becomes a @@ -1028,8 +1031,11 @@ namespace mongo { result.appendNumber("fileSize", 0); } else { - // TODO: OldClientContext legacy, needs to be removed - CurOp::get(txn)->enter(dbname.c_str(), db->getProfilingLevel()); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + // TODO: OldClientContext legacy, needs to be removed + CurOp::get(txn)->enter_inlock(dbname.c_str(), db->getProfilingLevel()); + } db->getStats(txn, &result, scale); } @@ -1299,8 +1305,6 @@ namespace { LOG(2) << "command: " << request.getCommandName(); } - - if (command->maintenanceMode()) { mmSetter.reset(new MaintenanceModeSetter); } diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index e04593cd27e..2e1490f1f4d 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -36,6 +36,7 @@ #include "mongo/base/error_codes.h" #include "mongo/base/status.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/jsobj.h" @@ -375,10 +376,12 @@ namespace mongo { std::unique_ptr<BulkBuilder::Sorter::Iterator> i(bulk->_sorter->done()); - ProgressMeterHolder pm(*txn->setMessage("Index Bulk Build: (2/3) btree bottom up", - "Index: (2/3) BTree Bottom Up Progress", - bulk->_keysInserted, - 10)); + stdx::unique_lock<Client> lk(*txn->getClient()); + ProgressMeterHolder pm(*txn->setMessage_inlock("Index Bulk Build: (2/3) btree bottom up", + "Index: (2/3) BTree Bottom Up Progress", + bulk->_keysInserted, + 10)); + lk.unlock(); std::unique_ptr<SortedDataBuilderInterface> builder; @@ -436,8 +439,11 @@ namespace mongo { pm.finished(); - CurOp::get(txn)->setMessage("Index Bulk Build: (3/3) btree-middle", - "Index: (3/3) BTree Middle Progress"); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + CurOp::get(txn)->setMessage_inlock("Index Bulk Build: (3/3) btree-middle", + "Index: (3/3) BTree Middle Progress"); + } LOG(timer.seconds() > 10 ? 0 : 1 ) << "\t done building bottom layer, going to commit"; diff --git a/src/mongo/db/index_builder.cpp b/src/mongo/db/index_builder.cpp index 6d205c45da9..e5939343265 100644 --- a/src/mongo/db/index_builder.cpp +++ b/src/mongo/db/index_builder.cpp @@ -87,7 +87,10 @@ namespace { AuthorizationSession::get(txn.getClient())->grantInternalAuthorization(); - CurOp::get(txn)->setOp(dbInsert); + { + stdx::lock_guard<Client> lk(*txn.getClient()); + CurOp::get(txn)->setOp_inlock(dbInsert); + } NamespaceString ns(_index["ns"].String()); ScopedTransaction transaction(&txn, MODE_IX); @@ -141,8 +144,11 @@ namespace { } } - // Show which index we're building in the curop display. - CurOp::get(txn)->setQuery(_index); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + // Show which index we're building in the curop display. + CurOp::get(txn)->setQuery_inlock(_index); + } bool haveSetBgIndexStarting = false; while (true) { diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index d11549258b4..1d14670a640 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -254,8 +254,11 @@ namespace { // Auth checking for Commands happens later. int nToReturn = queryMessage.ntoreturn; - beginQueryOp(nss, queryMessage.query, nToReturn, queryMessage.ntoskip, op); - op->markCommand(); + beginQueryOp(txn, nss, queryMessage.query, nToReturn, queryMessage.ntoskip); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + op->markCommand_inlock(); + } uassert(16979, str::stream() << "bad numberToReturn (" << nToReturn << ") for $cmd type ns - can only be 1 or -1", @@ -301,8 +304,11 @@ namespace { // We construct a legacy $cmd namespace so we can fill in curOp using // the existing logic that existed for OP_QUERY commands NamespaceString nss(request.getDatabase(), "$cmd"); - beginQueryOp(nss, request.getCommandArgs(), 1, 0, curOp); - curOp->markCommand(); + beginQueryOp(txn, nss, request.getCommandArgs(), 1, 0); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + curOp->markCommand_inlock(); + } runCommands(txn, request, &replyBuilder); @@ -393,7 +399,7 @@ namespace { audit::logQueryAuthzCheck(client, nss, q.query, status.code()); uassertStatusOK(status); - dbResponse.exhaustNS = runQuery(txn, q, nss, op, *resp); + dbResponse.exhaustNS = runQuery(txn, q, nss, *resp); verify( !resp->empty() ); } catch (const AssertionException& exception) { @@ -504,7 +510,10 @@ namespace { } CurOp& currentOp = *CurOp::get(txn); - currentOp.setOp(op); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + currentOp.setOp_inlock(op); + } OpDebug& debug = currentOp.debug(); debug.op = op; @@ -700,7 +709,10 @@ namespace { uassertStatusOK(status); op.debug().query = query; - op.setQuery(query); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + op.setQuery_inlock(query); + } UpdateRequest request(nsString); request.setUpsert(upsert); @@ -823,7 +835,10 @@ namespace { uassertStatusOK(status); op.debug().query = pattern; - op.setQuery(pattern); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + op.setQuery_inlock(pattern); + } DeleteRequest request(nsString); request.setQuery(pattern); @@ -922,7 +937,6 @@ namespace { ns, ntoreturn, cursorid, - curop, pass, exhaust, &isCursorAuthorized); diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 291fd3b6923..e7a05b65b20 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -119,11 +119,13 @@ namespace mongo { /** * Delegates to CurOp, but is included here to break dependencies. * Caller does not own the pointer. + * + * Caller must have locked the "Client" associated with this context. */ - virtual ProgressMeter* setMessage(const char* msg, - const std::string& name = "Progress", - unsigned long long progressMeterTotal = 0, - int secondsBetween = 3) = 0; + virtual ProgressMeter* setMessage_inlock(const char* msg, + const std::string& name = "Progress", + unsigned long long progressMeterTotal = 0, + int secondsBetween = 3) = 0; /** * Delegates to CurOp, but is included here to break dependencies. diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index 3f048b598d2..44dfbf7dc84 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -115,11 +115,11 @@ namespace { return oldState; } - ProgressMeter* OperationContextImpl::setMessage(const char * msg, - const std::string &name, - unsigned long long progressMeterTotal, - int secondsBetween) { - return &CurOp::get(this)->setMessage(msg, name, progressMeterTotal, secondsBetween); + ProgressMeter* OperationContextImpl::setMessage_inlock(const char * msg, + const std::string &name, + unsigned long long progressMeterTotal, + int secondsBetween) { + return &CurOp::get(this)->setMessage_inlock(msg, name, progressMeterTotal, secondsBetween); } string OperationContextImpl::getNS() const { diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h index e2065a1ba84..39fbd95e43f 100644 --- a/src/mongo/db/operation_context_impl.h +++ b/src/mongo/db/operation_context_impl.h @@ -47,10 +47,10 @@ namespace mongo { virtual RecoveryUnitState setRecoveryUnit(RecoveryUnit* unit, RecoveryUnitState state) override; - virtual ProgressMeter* setMessage(const char* msg, - const std::string& name, - unsigned long long progressMeterTotal, - int secondsBetween) override; + virtual ProgressMeter* setMessage_inlock(const char* msg, + const std::string& name, + unsigned long long progressMeterTotal, + int secondsBetween) override; virtual std::string getNS() const override; diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h index cd73c42dee0..0ee7c2001ca 100644 --- a/src/mongo/db/operation_context_noop.h +++ b/src/mongo/db/operation_context_noop.h @@ -81,10 +81,10 @@ namespace mongo { return oldState; } - virtual ProgressMeter* setMessage(const char * msg, - const std::string &name, - unsigned long long progressMeterTotal, - int secondsBetween) override { + virtual ProgressMeter* setMessage_inlock(const char * msg, + const std::string &name, + unsigned long long progressMeterTotal, + int secondsBetween) override { return &_pm; } diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index eb2b62f8d4a..274a4761b3e 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -181,25 +181,27 @@ namespace mongo { return !exec->isEOF(); } - void beginQueryOp(const NamespaceString& nss, + void beginQueryOp(OperationContext* txn, + const NamespaceString& nss, const BSONObj& queryObj, int ntoreturn, - int ntoskip, - CurOp* curop) { + int ntoskip) { + auto curop = CurOp::get(txn); curop->debug().ns = nss.ns(); curop->debug().query = queryObj; curop->debug().ntoreturn = ntoreturn; curop->debug().ntoskip = ntoskip; - curop->setQuery(queryObj); + stdx::lock_guard<Client> lk(*txn->getClient()); + curop->setQuery_inlock(queryObj); } - void endQueryOp(PlanExecutor* exec, + void endQueryOp(OperationContext* txn, + PlanExecutor* exec, int dbProfilingLevel, int numResults, - CursorId cursorId, - CurOp* curop) { + CursorId cursorId) { + auto curop = CurOp::get(txn); invariant(exec); - invariant(curop); // Fill out basic curop query exec properties. curop->debug().nreturned = numResults; @@ -253,11 +255,12 @@ namespace mongo { const char* ns, int ntoreturn, long long cursorid, - CurOp& curop, int pass, bool& exhaust, bool* isCursorAuthorized) { + CurOp& curop = *CurOp::get(txn); + // For testing, we may want to fail if we receive a getmore. if (MONGO_FAIL_POINT(failReceivedGetmore)) { invariant(0); @@ -378,7 +381,10 @@ namespace mongo { // Ensure that the original query or command object is available in the slow query log, // profiler, and currentOp. curop.debug().query = cc->getQuery(); - curop.setQuery(cc->getQuery()); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + curop.setQuery_inlock(cc->getQuery()); + } if (0 == pass) { cc->updateSlaveLocation(txn); @@ -526,14 +532,14 @@ namespace mongo { std::string runQuery(OperationContext* txn, QueryMessage& q, const NamespaceString& nss, - CurOp& curop, Message &result) { + CurOp& curop = *CurOp::get(txn); // Validate the namespace. uassert(16256, str::stream() << "Invalid ns [" << nss.ns() << "]", nss.isValid()); invariant(!nss.isCommand()); // Set curop information. - beginQueryOp(nss, q.query, q.ntoreturn, q.ntoskip, &curop); + beginQueryOp(txn, nss, q.query, q.ntoreturn, q.ntoskip); // Parse the qm into a CanonicalQuery. std::auto_ptr<CanonicalQuery> cq; @@ -745,11 +751,11 @@ namespace mongo { // use by future getmore ops). cc->setLeftoverMaxTimeMicros(curop.getRemainingMaxTimeMicros()); - endQueryOp(cc->getExecutor(), dbProfilingLevel, numResults, ccId, &curop); + endQueryOp(txn, cc->getExecutor(), dbProfilingLevel, numResults, ccId); } else { LOG(5) << "Not caching executor but returning " << numResults << " results.\n"; - endQueryOp(exec.get(), dbProfilingLevel, numResults, ccId, &curop); + endQueryOp(txn, exec.get(), dbProfilingLevel, numResults, ccId); } // Add the results from the query into the output buffer. diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h index 650a6f0e4fd..81803b66364 100644 --- a/src/mongo/db/query/find.h +++ b/src/mongo/db/query/find.h @@ -38,7 +38,6 @@ namespace mongo { - class CurOp; class NamespaceString; class OperationContext; @@ -111,27 +110,27 @@ namespace mongo { bool isTailable); /** - * Fills out CurOp with information about this query. + * Fills out the CurOp for "txn" with information about this query. */ - void beginQueryOp(const NamespaceString& nss, + void beginQueryOp(OperationContext* txn, + const NamespaceString& nss, const BSONObj& queryObj, int ntoreturn, - int ntoskip, - CurOp* curop); + int ntoskip); /** - * Fills out CurOp with information regarding this query's execution. + * Fills out CurOp for "txn" with information regarding this query's execution. * * Uses explain functionality to extract stats from 'exec'. * * The database profiling level, 'dbProfilingLevel', is used to conditionalize whether or not we * do expensive stats gathering. */ - void endQueryOp(PlanExecutor* exec, + void endQueryOp(OperationContext* txn, + PlanExecutor* exec, int dbProfilingLevel, int numResults, - CursorId cursorId, - CurOp* curop); + CursorId cursorId); /** * Constructs a PlanExecutor for a query with the oplogReplay option set to true, @@ -155,7 +154,6 @@ namespace mongo { const char* ns, int ntoreturn, long long cursorid, - CurOp& curop, int pass, bool& exhaust, bool* isCursorAuthorized); @@ -166,7 +164,6 @@ namespace mongo { std::string runQuery(OperationContext* txn, QueryMessage& q, const NamespaceString& ns, - CurOp& curop, Message &result); } // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp index 785d4f45b01..3d73f35b2e9 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp @@ -35,6 +35,7 @@ #include <boost/scoped_ptr.hpp> #include "mongo/db/catalog/collection.h" +#include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/mmap_v1/extent.h" #include "mongo/db/storage/mmap_v1/extent_manager.h" @@ -900,9 +901,11 @@ namespace mongo { } std::string progress_msg = "touch " + std::string(txn->getNS()) + " extents"; - ProgressMeterHolder pm(*txn->setMessage(progress_msg.c_str(), - "Touch Progress", - ranges.size())); + stdx::unique_lock<Client> lk(*txn->getClient()); + ProgressMeterHolder pm(*txn->setMessage_inlock(progress_msg.c_str(), + "Touch Progress", + ranges.size())); + lk.unlock(); for ( std::vector<touch_location>::iterator it = ranges.begin(); it != ranges.end(); ++it ) { touch_pages( it->root, it->length ); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp index 281a9ee1d76..10bbc79b7fd 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp @@ -36,6 +36,7 @@ #include "mongo/base/counter.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/storage/mmap_v1/extent.h" @@ -466,9 +467,11 @@ namespace mongo { wunit.commit(); } - ProgressMeterHolder pm(*txn->setMessage("compact extent", - "Extent Compacting Progress", - extents.size())); + stdx::unique_lock<Client> lk(*txn->getClient()); + ProgressMeterHolder pm(*txn->setMessage_inlock("compact extent", + "Extent Compacting Progress", + extents.size())); + lk.unlock(); // Go through all old extents and move each record to a new set of extents. int extentNumber = 0; diff --git a/src/mongo/db/storage/record_store_test_harness.h b/src/mongo/db/storage/record_store_test_harness.h index cecaea06c55..003e7c398a1 100644 --- a/src/mongo/db/storage/record_store_test_harness.h +++ b/src/mongo/db/storage/record_store_test_harness.h @@ -31,6 +31,7 @@ #pragma once #include "mongo/db/operation_context_noop.h" +#include "mongo/db/service_context_noop.h" namespace mongo { @@ -39,15 +40,19 @@ namespace mongo { class HarnessHelper { public: - HarnessHelper(){} + HarnessHelper() : _serviceContext(), _client(_serviceContext.makeClient("hh")) {} virtual ~HarnessHelper(){} virtual RecordStore* newNonCappedRecordStore() = 0; virtual RecoveryUnit* newRecoveryUnit() = 0; virtual OperationContext* newOperationContext() { - return new OperationContextNoop( newRecoveryUnit() ); + return new OperationContextNoop(_client.get(), 1, newRecoveryUnit()); } + + private: + ServiceContextNoop _serviceContext; + ServiceContext::UniqueClient _client; }; HarnessHelper* newHarnessHelper(); diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index d1da41ed617..ffe22a37d2b 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -1499,8 +1499,7 @@ namespace QueryTests { DbMessage dbMessage( message ); QueryMessage queryMessage( dbMessage ); Message result; - string exhaust = runQuery(&_txn, queryMessage, NamespaceString(ns()), *CurOp::get(_txn), - result); + string exhaust = runQuery(&_txn, queryMessage, NamespaceString(ns()), result); ASSERT( exhaust.size() ); ASSERT_EQUALS( string( ns() ), exhaust ); } diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 82b25e9eb13..24985dd4f81 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -191,10 +191,10 @@ namespace { const string s = str::stream() << "step " << step << " of " << _total; CurOp * op = CurOp::get(_txn); - if (op) - op->setMessage(s.c_str()); - else - warning() << "op is null in MoveTimingHelper::done" << migrateLog; + { + stdx::lock_guard<Client> lk(*_txn->getClient()); + op->setMessage_inlock(s.c_str()); + } _b.appendNumber(s, _t.millis()); _t.reset(); |