diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-05-28 13:49:34 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-06-02 22:52:46 -0400 |
commit | 8c9fcc939f9f1a2b593e606bd790cc87efd4064f (patch) | |
tree | beaa313f3e53cf72ca76aa5392946b97736ea6b3 /src/mongo/db | |
parent | 4add46aa8dd05a5c6d8af2c798eef6e9b5e4164b (diff) | |
download | mongo-8c9fcc939f9f1a2b593e606bd790cc87efd4064f.tar.gz |
SERVER-13961 Start using LockState from the OperationContext
Diffstat (limited to 'src/mongo/db')
55 files changed, 478 insertions, 412 deletions
diff --git a/src/mongo/db/auth/authz_manager_external_state.h b/src/mongo/db/auth/authz_manager_external_state.h index 397d7cb718f..1e73ae4974a 100644 --- a/src/mongo/db/auth/authz_manager_external_state.h +++ b/src/mongo/db/auth/authz_manager_external_state.h @@ -147,7 +147,8 @@ namespace mongo { * Puts into the *dbnames vector the name of every database in the cluster. * May take a global lock, so should only be called during startup. */ - virtual Status getAllDatabaseNames(std::vector<std::string>* dbnames) = 0; + virtual Status getAllDatabaseNames( + OperationContext* txn, std::vector<std::string>* dbnames) = 0; /** * Finds a document matching "query" in "collectionName", and store a shared-ownership diff --git a/src/mongo/db/auth/authz_manager_external_state_d.cpp b/src/mongo/db/auth/authz_manager_external_state_d.cpp index 0f80745769d..0878364f8a3 100644 --- a/src/mongo/db/auth/authz_manager_external_state_d.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_d.cpp @@ -98,8 +98,8 @@ namespace mongo { } Status AuthzManagerExternalStateMongod::getAllDatabaseNames( - std::vector<std::string>* dbnames) { - Lock::GlobalRead lk; + OperationContext* txn, std::vector<std::string>* dbnames) { + Lock::GlobalRead lk(txn->lockState()); getDatabaseNames(*dbnames); return Status::OK(); } diff --git a/src/mongo/db/auth/authz_manager_external_state_d.h b/src/mongo/db/auth/authz_manager_external_state_d.h index 213fcc56152..be24cba57cc 100644 --- a/src/mongo/db/auth/authz_manager_external_state_d.h +++ b/src/mongo/db/auth/authz_manager_external_state_d.h @@ -50,7 +50,8 @@ namespace mongo { AuthzManagerExternalStateMongod(); virtual ~AuthzManagerExternalStateMongod(); - virtual Status getAllDatabaseNames(std::vector<std::string>* dbnames); + virtual Status getAllDatabaseNames( + OperationContext* txn, std::vector<std::string>* dbnames); virtual Status findOne(OperationContext* txn, const NamespaceString& collectionName, diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp index 809d4ecb747..95bf9f5b1c7 100644 --- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp @@ -131,6 +131,7 @@ namespace { } Status AuthzManagerExternalStateMock::getAllDatabaseNames( + OperationContext* txn, std::vector<std::string>* dbnames) { unordered_set<std::string> dbnameSet; NamespaceDocumentMap::const_iterator it; diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.h b/src/mongo/db/auth/authz_manager_external_state_mock.h index 06db6b77890..fe3a37f6e0d 100644 --- a/src/mongo/db/auth/authz_manager_external_state_mock.h +++ b/src/mongo/db/auth/authz_manager_external_state_mock.h @@ -58,7 +58,8 @@ namespace mongo { void setAuthorizationManager(AuthorizationManager* authzManager); void setAuthzVersion(int version); - virtual Status getAllDatabaseNames(std::vector<std::string>* dbnames); + virtual Status getAllDatabaseNames( + OperationContext* txn, std::vector<std::string>* dbnames); virtual Status findOne(OperationContext* txn, const NamespaceString& collectionName, diff --git a/src/mongo/db/auth/authz_manager_external_state_s.cpp b/src/mongo/db/auth/authz_manager_external_state_s.cpp index 0cd3760258c..bf5430a0b8b 100644 --- a/src/mongo/db/auth/authz_manager_external_state_s.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_s.cpp @@ -228,6 +228,7 @@ namespace mongo { } Status AuthzManagerExternalStateMongos::getAllDatabaseNames( + OperationContext* txn, std::vector<std::string>* dbnames) { try { scoped_ptr<ScopedDbConnection> conn( diff --git a/src/mongo/db/auth/authz_manager_external_state_s.h b/src/mongo/db/auth/authz_manager_external_state_s.h index c19e3ed056e..6cf39d7e3e3 100644 --- a/src/mongo/db/auth/authz_manager_external_state_s.h +++ b/src/mongo/db/auth/authz_manager_external_state_s.h @@ -63,7 +63,7 @@ namespace mongo { bool showBuiltinRoles, std::vector<BSONObj>* result); - virtual Status getAllDatabaseNames(std::vector<std::string>* dbnames); + virtual Status getAllDatabaseNames(OperationContext* txn, std::vector<std::string>* dbnames); /** * Implements findOne of the AuthzManagerExternalState interface diff --git a/src/mongo/db/catalog/database_holder.cpp b/src/mongo/db/catalog/database_holder.cpp index aeded6b196b..1978c395600 100644 --- a/src/mongo/db/catalog/database_holder.cpp +++ b/src/mongo/db/catalog/database_holder.cpp @@ -58,7 +58,7 @@ namespace mongo { // todo: protect against getting sprayed with requests for different db names that DNE - // that would make the DBs map very large. not clear what to do to handle though, // perhaps just log it, which is what we do here with the "> 40" : - bool cant = !Lock::isWriteLocked(ns); + bool cant = !txn->lockState()->isWriteLocked(ns); if( logger::globalLogDomain()->shouldLog(logger::LogSeverity::Debug(1)) || m.size() > 40 || cant || DEBUG_BUILD ) { log() << "opening db: " diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp index f8a3662e5e6..8a16d7a4594 100644 --- a/src/mongo/db/catalog/index_create.cpp +++ b/src/mongo/db/catalog/index_create.cpp @@ -186,7 +186,9 @@ namespace mongo { IndexCatalogEntry* btreeState, bool mayInterrupt ) { - string ns = collection->ns().ns(); // our copy + const string ns = collection->ns().ns(); // our copy + verify(txn->lockState()->isWriteLocked(ns)); + const IndexDescriptor* idx = btreeState->descriptor(); const BSONObj& idxInfo = idx->infoObj(); @@ -196,7 +198,6 @@ namespace mongo { Timer t; - verify( Lock::isWriteLocked( ns ) ); // this is so that people know there are more keys to look at when doing // things like in place updates, etc... collection->infoCache()->addedIndex(); diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 97e5ba1c71e..4a28e280ad6 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -214,7 +214,7 @@ namespace mongo { else if( !Lock::nested() ) { _lk.reset(0); { - Lock::GlobalWrite w; + Lock::GlobalWrite w(txn->lockState()); Context c(ns, storageGlobalParams.dbpath, doVersion); } @@ -285,7 +285,9 @@ namespace mongo { _db = dbHolderUnchecked().getOrCreate(&txn, _ns, _path, _justCreated); verify(_db); if( _doVersion ) checkNotStale(); - massert( 16107 , str::stream() << "Don't have a lock on: " << _ns , Lock::atLeastReadLocked( _ns ) ); + massert(16107, + str::stream() << "Don't have a lock on: " << _ns, + txn.lockState()->isAtLeastReadLocked(_ns)); _client->_context = this; _client->_curOp->enter( this ); } diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index bf6d55cb3be..2dfba753a1a 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -104,7 +104,7 @@ namespace mongo { void operator()( DBClientCursorBatchIterator &i ) { // XXX: can probably take dblock instead - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); context.relocked(); bool createdCollection = false; @@ -221,7 +221,7 @@ namespace mongo { int options = QueryOption_NoCursorTimeout | ( slaveOk ? QueryOption_SlaveOk : 0 ); { - dbtemprelease r; + dbtemprelease r(txn->lockState()); _conn->query(stdx::function<void(DBClientCursorBatchIterator &)>(f), from_collection, query, 0, options); } @@ -389,7 +389,7 @@ namespace mongo { /* todo: we can put these releases inside dbclient or a dbclient specialization. or just wait until we get rid of global lock anyway. */ - dbtemprelease r; + dbtemprelease r(txn->lockState()); // just using exhaust for collection copying right now diff --git a/src/mongo/db/commands/apply_ops.cpp b/src/mongo/db/commands/apply_ops.cpp index 37c72610627..2a1fd60bba6 100644 --- a/src/mongo/db/commands/apply_ops.cpp +++ b/src/mongo/db/commands/apply_ops.cpp @@ -83,7 +83,7 @@ namespace mongo { // SERVER-4328 todo : is global ok or does this take a long time? i believe multiple // ns used so locking individually requires more analysis - Lock::GlobalWrite globalWriteLock; + Lock::GlobalWrite globalWriteLock(txn->lockState()); // Preconditions check reads the database state, so needs to be done locked if ( cmdObj["preCondition"].type() == Array ) { diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp index ceaaebefdcf..990ce9f5719 100644 --- a/src/mongo/db/commands/collection_to_capped.cpp +++ b/src/mongo/db/commands/collection_to_capped.cpp @@ -200,7 +200,7 @@ namespace mongo { bool run(OperationContext* txn, const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { // calls renamecollection which does a global lock, so we must too: // - Lock::GlobalWrite globalWriteLock; + Lock::GlobalWrite globalWriteLock(txn->lockState()); Client::Context ctx(dbname); Database* db = ctx.db(); diff --git a/src/mongo/db/commands/copydb.cpp b/src/mongo/db/commands/copydb.cpp index db0025f43b2..21dd7804d22 100644 --- a/src/mongo/db/commands/copydb.cpp +++ b/src/mongo/db/commands/copydb.cpp @@ -155,7 +155,7 @@ namespace mongo { // SERVER-4328 todo lock just the two db's not everything for the fromself case scoped_ptr<Lock::ScopedLock> lk( fromSelf ? - static_cast<Lock::ScopedLock*>(new Lock::GlobalWrite()) : + static_cast<Lock::ScopedLock*>(new Lock::GlobalWrite(txn->lockState())) : static_cast<Lock::ScopedLock*>(new Lock::DBWrite(txn->lockState(), todb))); Cloner cloner; @@ -166,7 +166,7 @@ namespace mongo { uassert( 13008, "must call copydbgetnonce first", authConn_.get() ); BSONObj ret; { - dbtemprelease t; + dbtemprelease t(txn->lockState()); if ( !authConn_->runCommand( cloneOptions.fromDB, BSON( "authenticate" << 1 << "user" << username << "nonce" << nonce << "key" << key ), ret ) ) { diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp index 428dfd2cf66..8849c821bd3 100644 --- a/src/mongo/db/commands/fsync.cpp +++ b/src/mongo/db/commands/fsync.cpp @@ -44,6 +44,7 @@ #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/db/client.h" #include "mongo/db/jsobj.h" +#include "mongo/db/operation_context_impl.h" #include "mongo/util/background.h" namespace mongo { @@ -130,7 +131,8 @@ namespace mongo { else { // the simple fsync command case if (sync) { - Lock::GlobalWrite w; // can this be GlobalRead? and if it can, it should be nongreedy. + // can this be GlobalRead? and if it can, it should be nongreedy. + Lock::GlobalWrite w(txn->lockState()); getDur().commitNow(); } // question : is it ok this is not in the dblock? i think so but this is a change from past behavior, @@ -145,7 +147,10 @@ namespace mongo { void FSyncLockThread::doRealWork() { SimpleMutex::scoped_lock lkf(filesLockedFsync); - Lock::GlobalWrite global(true/*stopGreed*/); + + OperationContextImpl txn; // XXX? + Lock::GlobalWrite global(txn.lockState()); + SimpleMutex::scoped_lock lk(fsyncCmd.m); verify( ! fsyncCmd.locked ); // impossible to get here if locked is true diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index c0004b02e20..d8d4b3253c2 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -495,14 +495,16 @@ namespace mongo { * Does post processing on output collection. * This may involve replacing, merging or reducing. */ - long long State::postProcessCollection(CurOp* op, ProgressMeterHolder& pm) { + long long State::postProcessCollection( + OperationContext* txn, CurOp* op, ProgressMeterHolder& pm) { + if ( _onDisk == false || _config.outputOptions.outType == Config::INMEMORY ) return numInMemKeys(); if (_config.outputOptions.outNonAtomic) - return postProcessCollectionNonAtomic(op, pm); - Lock::GlobalWrite lock; // TODO(erh): this is how it was, but seems it doesn't need to be global - return postProcessCollectionNonAtomic(op, pm); + return postProcessCollectionNonAtomic(txn, op, pm); + Lock::GlobalWrite lock(txn->lockState()); // TODO(erh): this is how it was, but seems it doesn't need to be global + return postProcessCollectionNonAtomic(txn, op, pm); } // @@ -530,14 +532,16 @@ namespace mongo { // End SERVER-6116 // - long long State::postProcessCollectionNonAtomic(CurOp* op, ProgressMeterHolder& pm) { + long long State::postProcessCollectionNonAtomic( + OperationContext* txn, CurOp* op, ProgressMeterHolder& pm) { if ( _config.outputOptions.finalNamespace == _config.tempNamespace ) return _safeCount( _db, _config.outputOptions.finalNamespace ); if (_config.outputOptions.outType == Config::REPLACE || _safeCount(_db, _config.outputOptions.finalNamespace) == 0) { - Lock::GlobalWrite lock; // TODO(erh): why global??? + + Lock::GlobalWrite lock(txn->lockState()); // TODO(erh): why global??? // replace: just rename from temp to final collection name, dropping previous collection _db.dropCollection( _config.outputOptions.finalNamespace ); BSONObj info; @@ -577,7 +581,7 @@ namespace mongo { _safeCount(_db, _config.tempNamespace, BSONObj())); auto_ptr<DBClientCursor> cursor = _db.query( _config.tempNamespace , BSONObj() ); while ( cursor->more() ) { - Lock::GlobalWrite lock; // TODO(erh) why global? + Lock::GlobalWrite lock(txn->lockState()); // TODO(erh) why global? BSONObj temp = cursor->nextSafe(); BSONObj old; @@ -1383,7 +1387,7 @@ namespace mongo { timingBuilder.appendNumber("reduceTime", reduceTime / 1000); timingBuilder.append( "mode" , state.jsMode() ? "js" : "mixed" ); - long long finalCount = state.postProcessCollection(op, pm); + long long finalCount = state.postProcessCollection(txn, op, pm); state.appendResults( result ); timingBuilder.appendNumber( "total" , t.millis() ); @@ -1576,7 +1580,7 @@ namespace mongo { result.append( "chunkSizes" , chunkSizes.arr() ); - long long outputCount = state.postProcessCollection(op, pm); + long long outputCount = state.postProcessCollection(txn, op, pm); state.appendResults( result ); BSONObjBuilder countsB(32); diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h index 04d3c6b70d7..06e1bcb66f4 100644 --- a/src/mongo/db/commands/mr.h +++ b/src/mongo/db/commands/mr.h @@ -296,8 +296,10 @@ namespace mongo { /** @return number objects in collection */ - long long postProcessCollection( CurOp* op , ProgressMeterHolder& pm ); - long long postProcessCollectionNonAtomic( CurOp* op , ProgressMeterHolder& pm ); + long long postProcessCollection( + OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); + long long postProcessCollectionNonAtomic( + OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); /** * if INMEMORY will append diff --git a/src/mongo/db/commands/oplog_note.cpp b/src/mongo/db/commands/oplog_note.cpp index 92ce1ebcd14..5cf4006908e 100644 --- a/src/mongo/db/commands/oplog_note.cpp +++ b/src/mongo/db/commands/oplog_note.cpp @@ -73,7 +73,7 @@ namespace mongo { return appendCommandStatus(result, status); } - repl::logOpComment(dataElement.Obj()); + repl::logOpComment(txn, dataElement.Obj()); return true; } diff --git a/src/mongo/db/commands/rename_collection.cpp b/src/mongo/db/commands/rename_collection.cpp index 1b9e61339b9..bae48fcc17f 100644 --- a/src/mongo/db/commands/rename_collection.cpp +++ b/src/mongo/db/commands/rename_collection.cpp @@ -93,7 +93,7 @@ namespace mongo { } virtual bool run(OperationContext* txn, const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - Lock::GlobalWrite globalWriteLock; + Lock::GlobalWrite globalWriteLock(txn->lockState()); bool ok = wrappedRun(txn, dbname, cmdObj, errmsg, result, fromRepl); if (ok && !fromRepl) repl::logOp(txn, "c",(dbname + ".$cmd").c_str(), cmdObj); diff --git a/src/mongo/db/commands/test_commands.cpp b/src/mongo/db/commands/test_commands.cpp index f3ba8efe67c..9b1d8b7cd3e 100644 --- a/src/mongo/db/commands/test_commands.cpp +++ b/src/mongo/db/commands/test_commands.cpp @@ -108,11 +108,11 @@ namespace mongo { } if(cmdObj.getBoolField("w")) { - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); sleepmillis(millis); } else { - Lock::GlobalRead lk; + Lock::GlobalRead lk(txn->lockState()); sleepmillis(millis); } diff --git a/src/mongo/db/d_concurrency.cpp b/src/mongo/db/d_concurrency.cpp index 15ab51528fe..77092d18d8c 100644 --- a/src/mongo/db/d_concurrency.cpp +++ b/src/mongo/db/d_concurrency.cpp @@ -28,15 +28,12 @@ * it in the license file. */ -#include "mongo/pch.h" - #include "mongo/db/d_concurrency.h" #include "mongo/db/client.h" #include "mongo/db/commands/server_status.h" #include "mongo/db/curop.h" #include "mongo/db/d_globals.h" -#include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/db/lockstat.h" #include "mongo/db/namespace_string.h" #include "mongo/server.h" @@ -52,18 +49,13 @@ // system.profile writing // oplog now // yielding -// commitIfNeeded namespace mongo { - inline LockState& lockState() { + inline LockState& lockStateTempOnly() { return cc().lockState(); } - char threadState() { - return lockState().threadState(); - } - class DBTryLockTimeoutException : public std::exception { public: DBTryLockTimeoutException() {} @@ -95,78 +87,57 @@ namespace mongo { } class WrapperForQLock { - QLock q; public: + QLock q; LockStat stats; - void lock_r() { - verify( threadState() == 0 ); - lockState().lockedStart( 'r' ); - q.lock_r(); - } - - void lock_w() { - verify( threadState() == 0 ); - getDur().commitIfNeeded(); - lockState().lockedStart( 'w' ); - q.lock_w(); - } - - void lock_R() { - LockState& ls = lockState(); - massert(16103, str::stream() << "can't lock_R, threadState=" << (int) ls.threadState(), ls.threadState() == 0); - ls.lockedStart( 'R' ); + void lock_R(LockState* lockState) { + massert(16103, + str::stream() << "can't lock_R, threadState=" + << (int)lockState->threadState(), + lockState->threadState() == 0); + lockState->lockedStart('R'); q.lock_R(); } - void lock_W() { - LockState& ls = lockState(); - if( ls.threadState() ) { - log() << "can't lock_W, threadState=" << (int) ls.threadState() << endl; + void lock_W(LockState* lockState) { + if (lockState->threadState()) { + log() << "can't lock_W, threadState=" << (int)lockState->threadState() << endl; fassert(16114,false); } - getDur().commitIfNeeded(); // check before locking - will use an R lock for the commit if need to do one, which is better than W - ls.lockedStart( 'W' ); - { - q.lock_W(); - } + + lockState->lockedStart('W'); + q.lock_W(); } // how to count try's that fail is an interesting question. we should get rid of try(). - bool lock_R_try(int millis) { - verify( threadState() == 0 ); + bool lock_R_try(LockState* lockState, int millis) { + verify(lockState->threadState() == 0); bool got = q.lock_R_try(millis); - if( got ) - lockState().lockedStart( 'R' ); + if (got) { + lockState->lockedStart('R'); + } return got; } - bool lock_W_try(int millis) { - verify( threadState() == 0 ); + bool lock_W_try(LockState* lockState, int millis) { + verify(lockState->threadState() == 0); bool got = q.lock_W_try(millis); if( got ) { - lockState().lockedStart( 'W' ); + lockState->lockedStart('W'); } return got; } - void unlock_r() { - wassert( threadState() == 'r' ); - lockState().unlocked(); - q.unlock_r(); - } - - void unlock_w() { - wassert( threadState() == 'w' ); - lockState().unlocked(); - q.unlock_w(); + void unlock_R(LockState* lockState) { + wassert(lockState->threadState() == 'R'); + lockState->unlocked(); + q.unlock_R(); } - void unlock_R() { _unlock_R(); } - - void unlock_W() { - wassert( threadState() == 'W' ); - lockState().unlocked(); + void unlock_W(LockState* lockState) { + wassert(lockState->threadState() == 'W'); + lockState->unlocked(); q.unlock_W(); } @@ -175,13 +146,6 @@ namespace mongo { void R_to_W() { q.R_to_W(); } bool w_to_X() { return q.w_to_X(); } void X_to_w() { q.X_to_w(); } - - private: - void _unlock_R() { - wassert( threadState() == 'R' ); - lockState().unlocked(); - q.unlock_R(); - } }; static WrapperForQLock& qlk = *new WrapperForQLock(); @@ -190,50 +154,36 @@ namespace mongo { } int Lock::isLocked() { - return threadState(); - } - int Lock::isReadLocked() { - return threadState() == 'R' || threadState() == 'r'; + return lockStateTempOnly().threadState(); } int Lock::somethingWriteLocked() { - return threadState() == 'W' || threadState() == 'w'; + return lockStateTempOnly().threadState() == 'W' || lockStateTempOnly().threadState() == 'w'; } bool Lock::isRW() { - return threadState() == 'W' || threadState() == 'R'; + return lockStateTempOnly().threadState() == 'W' || lockStateTempOnly().threadState() == 'R'; } bool Lock::isW() { - return threadState() == 'W'; + return lockStateTempOnly().threadState() == 'W'; } bool Lock::isR() { - return threadState() == 'R'; + return lockStateTempOnly().threadState() == 'R'; } bool Lock::nested() { // note this doesn't tell us much actually, it tells us if we are nesting locks but // they could be the a global lock twice or a global and a specific or two specifics // (such as including local) - return lockState().recursiveCount() > 1; + return lockStateTempOnly().recursiveCount() > 1; } bool Lock::isWriteLocked(const StringData& ns) { - LockState &ls = lockState(); - if( ls.threadState() == 'W' ) - return true; - if( ls.threadState() != 'w' ) - return false; - return ls.isLocked( ns ); - } - bool Lock::atLeastReadLocked(const StringData& ns) - { - LockState &ls = lockState(); - if( ls.threadState() == 'R' || ls.threadState() == 'W' ) - return true; // global - if( ls.threadState() == 0 ) - return false; - return ls.isLocked( ns ); + return lockStateTempOnly().isWriteLocked(ns); + } + bool Lock::atLeastReadLocked(const StringData& ns) { + return lockStateTempOnly().isAtLeastReadLocked(ns); } void Lock::assertAtLeastReadLocked(const StringData& ns) { if( !atLeastReadLocked(ns) ) { - LockState &ls = lockState(); + LockState &ls = lockStateTempOnly(); log() << "error expected " << ns << " to be locked " << endl; ls.dump(); msgasserted(16104, str::stream() << "expected to be read locked for " << ns); @@ -241,17 +191,18 @@ namespace mongo { } void Lock::assertWriteLocked(const StringData& ns) { if( !Lock::isWriteLocked(ns) ) { - lockState().dump(); + lockStateTempOnly().dump(); msgasserted(16105, str::stream() << "expected to be write locked for " << ns); } } RWLockRecursive &Lock::ParallelBatchWriterMode::_batchLock = *(new RWLockRecursive("special")); void Lock::ParallelBatchWriterMode::iAmABatchParticipant() { - lockState()._batchWriter = true; + lockStateTempOnly()._batchWriter = true; } - Lock::ScopedLock::ParallelBatchWriterSupport::ParallelBatchWriterSupport() { + Lock::ScopedLock::ParallelBatchWriterSupport::ParallelBatchWriterSupport(LockState* lockState) + : _lockState(lockState) { relock(); } @@ -260,23 +211,22 @@ namespace mongo { } void Lock::ScopedLock::ParallelBatchWriterSupport::relock() { - LockState& ls = lockState(); - if ( ! ls._batchWriter ) { - AcquiringParallelWriter a(ls); + if (!_lockState->_batchWriter) { + AcquiringParallelWriter a(*_lockState); _lk.reset( new RWLockRecursive::Shared(ParallelBatchWriterMode::_batchLock) ); } } - Lock::ScopedLock::ScopedLock( char type ) - : _type(type), _stat(0) { - LockState& ls = lockState(); - ls.enterScopedLock( this ); + Lock::ScopedLock::ScopedLock(LockState* lockState, char type) + : _lockState(lockState), _pbws_lk(lockState), _type(type), _stat(0) { + + _lockState->enterScopedLock(this); } + Lock::ScopedLock::~ScopedLock() { - LockState& ls = lockState(); - int prevCount = ls.recursiveCount(); - Lock::ScopedLock* what = ls.leaveScopedLock(); + int prevCount = _lockState->recursiveCount(); + Lock::ScopedLock* what = _lockState->leaveScopedLock(); fassert( 16171 , prevCount != 1 || what == this ); } @@ -318,18 +268,19 @@ namespace mongo { _relock(); } - Lock::TempRelease::TempRelease() : cant( Lock::nested() ) - { + Lock::TempRelease::TempRelease(LockState* lockState) + : cant(lockState->isNested()), _lockState(lockState) { + if( cant ) return; - LockState& ls = lockState(); - - fassert( 16116, ls.recursiveCount() == 1 ); - fassert( 16117, ls.threadState() != 0 ); + fassert(16116, _lockState->recursiveCount() == 1); + fassert(16117, _lockState->threadState() != 0); - scopedLk = ls.leaveScopedLock(); - fassert( 16118, scopedLk ); + scopedLk = _lockState->leaveScopedLock(); + invariant(_lockState == scopedLk->_lockState); + + fassert(16118, scopedLk); scopedLk->tempRelease(); } Lock::TempRelease::~TempRelease() @@ -337,42 +288,40 @@ namespace mongo { if( cant ) return; - LockState& ls = lockState(); + fassert(16119, scopedLk); + fassert(16120, _lockState->threadState() == 0); - fassert( 16119, scopedLk ); - fassert( 16120 , ls.threadState() == 0 ); - - ls.enterScopedLock( scopedLk ); + _lockState->enterScopedLock(scopedLk); scopedLk->relock(); } void Lock::GlobalWrite::_tempRelease() { fassert(16121, !noop); - char ts = threadState(); + char ts = _lockState->threadState(); fassert(16122, ts != 'R'); // indicates downgraded; not allowed with temprelease fassert(16123, ts == 'W'); - qlk.unlock_W(); + qlk.unlock_W(_lockState); } void Lock::GlobalWrite::_relock() { fassert(16125, !noop); - char ts = threadState(); + char ts = _lockState->threadState(); fassert(16126, ts == 0); - Acquiring a(this,lockState()); - qlk.lock_W(); + Acquiring a(this, *_lockState); + qlk.lock_W(_lockState); } void Lock::GlobalRead::_tempRelease() { fassert(16127, !noop); - char ts = threadState(); + char ts = _lockState->threadState(); fassert(16128, ts == 'R'); - qlk.unlock_R(); + qlk.unlock_R(_lockState); } void Lock::GlobalRead::_relock() { fassert(16129, !noop); - char ts = threadState(); + char ts = _lockState->threadState(); fassert(16130, ts == 0); - Acquiring a(this,lockState()); - qlk.lock_R(); + Acquiring a(this, *_lockState); + qlk.lock_R(_lockState); } void Lock::DBWrite::_tempRelease() { @@ -388,9 +337,10 @@ namespace mongo { lockDB(_what); } - Lock::GlobalWrite::GlobalWrite(bool sg, int timeoutms) - : ScopedLock('W') { - char ts = threadState(); + Lock::GlobalWrite::GlobalWrite(LockState* lockState, int timeoutms) + : ScopedLock(lockState, 'W') { + + char ts = _lockState->threadState(); noop = false; if( ts == 'W' ) { noop = true; @@ -398,14 +348,14 @@ namespace mongo { } dassert( ts == 0 ); - Acquiring a(this,lockState()); + Acquiring a(this, *_lockState); if ( timeoutms != -1 ) { - bool success = qlk.lock_W_try( timeoutms ); + bool success = qlk.lock_W_try(_lockState, timeoutms); if ( !success ) throw DBTryLockTimeoutException(); } else { - qlk.lock_W(); + qlk.lock_W(_lockState); } } Lock::GlobalWrite::~GlobalWrite() { @@ -413,80 +363,83 @@ namespace mongo { return; } recordTime(); // for lock stats - if( threadState() == 'R' ) { // we downgraded - qlk.unlock_R(); + if (_lockState->threadState() == 'R') { // we downgraded + qlk.unlock_R(_lockState); } else { - qlk.unlock_W(); + qlk.unlock_W(_lockState); } } void Lock::GlobalWrite::downgrade() { verify( !noop ); - verify( threadState() == 'W' ); + verify(_lockState->threadState() == 'W'); + qlk.W_to_R(); - lockState().changeLockState( 'R' ); + _lockState->changeLockState('R'); } // you will deadlock if 2 threads doing this void Lock::GlobalWrite::upgrade() { verify( !noop ); - verify( threadState() == 'R' ); + verify(_lockState->threadState() == 'R'); + qlk.R_to_W(); - lockState().changeLockState( 'W' ); + _lockState->changeLockState('W'); } - Lock::GlobalRead::GlobalRead( int timeoutms ) - : ScopedLock( 'R' ) { - LockState& ls = lockState(); - char ts = ls.threadState(); + Lock::GlobalRead::GlobalRead(LockState* lockState, int timeoutms) + : ScopedLock(lockState, 'R') { + + char ts = _lockState->threadState(); noop = false; if( ts == 'R' || ts == 'W' ) { noop = true; return; } - Acquiring a(this,ls); + Acquiring a(this, *_lockState); if ( timeoutms != -1 ) { - bool success = qlk.lock_R_try( timeoutms ); + bool success = qlk.lock_R_try(_lockState, timeoutms); if ( !success ) throw DBTryLockTimeoutException(); } else { - qlk.lock_R(); // we are unlocked in the qlock/top sense. lock_R will assert if we are in an in compatible state + // we are unlocked in the qlock/top sense. lock_R will assert if we are in an in compatible state + qlk.lock_R(_lockState); } } Lock::GlobalRead::~GlobalRead() { if( !noop ) { recordTime(); // for lock stats - qlk.unlock_R(); + qlk.unlock_R(_lockState); } } void Lock::DBWrite::lockNestable(Nestable db) { _nested = true; - LockState& ls = lockState(); - if( ls.nestableCount() ) { - if( db != ls.whichNestable() ) { - error() << "can't lock local and admin db at the same time " << (int) db << ' ' << (int) ls.whichNestable() << endl; + + if (_lockState->nestableCount()) { + if( db != _lockState->whichNestable() ) { + error() << "can't lock local and admin db at the same time " << (int) db << ' ' << (int) _lockState->whichNestable() << endl; fassert(16131,false); } - verify( ls.nestableCount() > 0 ); + verify( _lockState->nestableCount() > 0 ); } else { fassert(16132,_weLocked==0); - ls.lockedNestable(db, 1); + _lockState->lockedNestable(db, 1); _weLocked = nestableLocks[db]; _weLocked->lock(); } } void Lock::DBRead::lockNestable(Nestable db) { _nested = true; - LockState& ls = lockState(); - if( ls.nestableCount() ) { + + if (_lockState->nestableCount()) { // we are nested in our locking of local. previous lock could be read OR write lock on local. } else { - ls.lockedNestable(db,-1); + _lockState->lockedNestable(db, -1); fassert(16133,_weLocked==0); _weLocked = nestableLocks[db]; _weLocked->lock_shared(); @@ -495,36 +448,42 @@ namespace mongo { void Lock::DBWrite::lockOther(const StringData& db) { fassert(16252, !db.empty()); - LockState& ls = lockState(); // we do checks first, as on assert destructor won't be called so don't want to be half finished with our work. - if (ls.otherCount()) { + if (_lockState->otherCount()) { // nested. if/when we do temprelease with DBWrite we will need to increment here // (so we can not release or assert if nested). - massert(16106, str::stream() << "internal error tried to lock two databases at the same time. old:" << ls.otherName() << " new:" << db, db == ls.otherName()); + massert(16106, + str::stream() << "internal error tried to lock two databases at the same " + << "time. old:" << _lockState->otherName() << " new:" << db, + db == _lockState->otherName()); return; } // first lock for this db. check consistent order with local db lock so we never deadlock. local always comes last - massert(16098, str::stream() << "can't dblock:" << db << " when local or admin is already locked", ls.nestableCount() == 0); + massert(16098, + str::stream() << "can't dblock:" << db + << " when local or admin is already locked", + _lockState->nestableCount() == 0); - if (db != ls.otherName()) { + if (db != _lockState->otherName()) { DBLocksMap::ref r(dblocks); WrapperForRWLock*& lock = r[db]; if (lock == NULL) { lock = new WrapperForRWLock(db); } - ls.lockedOther( db , 1 , lock ); + _lockState->lockedOther(db, 1, lock); } else { - DEV OCCASIONALLY { dassert( dblocks.get(db) == ls.otherLock() ); } - ls.lockedOther(1); + DEV OCCASIONALLY{ dassert(dblocks.get(db) == _lockState->otherLock()); } + _lockState->lockedOther(1); } fassert(16134,_weLocked==0); - ls.otherLock()->lock(); - _weLocked = ls.otherLock(); + + _lockState->otherLock()->lock(); + _weLocked = _lockState->otherLock(); } static Lock::Nestable n(const StringData& db) { @@ -537,59 +496,57 @@ namespace mongo { void Lock::DBWrite::lockDB(const string& ns) { fassert( 16253, !ns.empty() ); - LockState& ls = lockState(); - - Acquiring a(this,ls); + + Acquiring a(this, *_lockState); _locked_W=false; _locked_w=false; _weLocked=0; - massert( 16186 , "can't get a DBWrite while having a read lock" , ! ls.hasAnyReadLock() ); - if( ls.isW() ) + massert(16186, "can't get a DBWrite while having a read lock", !_lockState->hasAnyReadLock()); + if (_lockState->isW()) return; StringData db = nsToDatabaseSubstring( ns ); Nestable nested = n(db); if( nested == admin ) { // we can't nestedly lock both admin and local as implemented. so lock_W. - qlk.lock_W(); + qlk.lock_W(_lockState); _locked_W = true; return; } if( !nested ) lockOther(db); - lockTop(ls); + lockTop(); if( nested ) lockNestable(nested); } void Lock::DBRead::lockDB(const string& ns) { fassert( 16254, !ns.empty() ); - LockState& ls = lockState(); - - Acquiring a(this,ls); + + Acquiring a(this, *_lockState); _locked_r=false; _weLocked=0; - if ( ls.isRW() ) + if (_lockState->isRW()) return; StringData db = nsToDatabaseSubstring(ns); Nestable nested = n(db); if( !nested ) lockOther(db); - lockTop(ls); + lockTop(); if( nested ) lockNestable(nested); } Lock::DBWrite::DBWrite(LockState* lockState, const StringData& ns) - : ScopedLock( 'w' ), _what(ns.toString()), _nested(false) { + : ScopedLock(lockState, 'w' ), _what(ns.toString()), _nested(false) { lockDB( _what ); } Lock::DBRead::DBRead(LockState* lockState, const StringData& ns) - : ScopedLock( 'r' ), _what(ns.toString()), _nested(false) { + : ScopedLock(lockState, 'r' ), _what(ns.toString()), _nested(false) { lockDB( _what ); } @@ -605,19 +562,21 @@ namespace mongo { recordTime(); // for lock stats if ( _nested ) - lockState().unlockedNestable(); + _lockState->unlockedNestable(); else - lockState().unlockedOther(); + _lockState->unlockedOther(); _weLocked->unlock(); } if( _locked_w ) { - qlk.unlock_w(); + wassert(_lockState->threadState() == 'w'); + _lockState->unlocked(); + qlk.q.unlock_w(); } if( _locked_W ) { - qlk.unlock_W(); + qlk.unlock_W(_lockState); } _weLocked = 0; @@ -628,135 +587,151 @@ namespace mongo { recordTime(); // for lock stats if( _nested ) - lockState().unlockedNestable(); + _lockState->unlockedNestable(); else - lockState().unlockedOther(); + _lockState->unlockedOther(); _weLocked->unlock_shared(); } if( _locked_r ) { - qlk.unlock_r(); + wassert(_lockState->threadState() == 'r'); + _lockState->unlocked(); + qlk.q.unlock_r(); } _weLocked = 0; _locked_r = false; } - void Lock::DBWrite::lockTop(LockState& ls) { - switch( ls.threadState() ) { + void Lock::DBWrite::lockTop() { + switch (_lockState->threadState()) { case 'w': break; default: verify(false); case 0 : - qlk.lock_w(); + verify(_lockState->threadState() == 0); + _lockState->lockedStart('w'); + qlk.q.lock_w(); _locked_w = true; } } - void Lock::DBRead::lockTop(LockState& ls) { - switch( ls.threadState() ) { + void Lock::DBRead::lockTop() { + switch (_lockState->threadState()) { case 'r': case 'w': break; default: verify(false); case 0 : - qlk.lock_r(); + verify(_lockState->threadState() == 0); + _lockState->lockedStart('r'); + qlk.q.lock_r(); _locked_r = true; } } void Lock::DBRead::lockOther(const StringData& db) { fassert( 16255, !db.empty() ); - LockState& ls = lockState(); // we do checks first, as on assert destructor won't be called so don't want to be half finished with our work. - if( ls.otherCount() ) { + if( _lockState->otherCount() ) { // nested. prev could be read or write. if/when we do temprelease with DBRead/DBWrite we will need to increment/decrement here // (so we can not release or assert if nested). temprelease we should avoid if we can though, it's a bit of an anti-pattern. - massert(16099, str::stream() << "internal error tried to lock two databases at the same time. old:" << ls.otherName() << " new:" << db, db == ls.otherName() ); + massert(16099, + str::stream() << "internal error tried to lock two databases at the same time. old:" + << _lockState->otherName() << " new:" << db, + db == _lockState->otherName()); return; } // first lock for this db. check consistent order with local db lock so we never deadlock. local always comes last - massert(16100, str::stream() << "can't dblock:" << db << " when local or admin is already locked", ls.nestableCount() == 0); + massert(16100, + str::stream() << "can't dblock:" << db + << " when local or admin is already locked", + _lockState->nestableCount() == 0); - if (db != ls.otherName()) { + if (db != _lockState->otherName()) { DBLocksMap::ref r(dblocks); WrapperForRWLock*& lock = r[db]; if (lock == NULL) { lock = new WrapperForRWLock(db); } - ls.lockedOther( db , -1 , lock ); + _lockState->lockedOther(db, -1, lock); } else { - DEV OCCASIONALLY { dassert( dblocks.get(db) == ls.otherLock() ); } - ls.lockedOther(-1); + DEV OCCASIONALLY{ dassert(dblocks.get(db) == _lockState->otherLock()); } + _lockState->lockedOther(-1); } fassert(16135,_weLocked==0); - ls.otherLock()->lock_shared(); - _weLocked = ls.otherLock(); + _lockState->otherLock()->lock_shared(); + _weLocked = _lockState->otherLock(); } - Lock::UpgradeGlobalLockToExclusive::UpgradeGlobalLockToExclusive() { - fassert( 16187, lockState().threadState() == 'w' ); + Lock::UpgradeGlobalLockToExclusive::UpgradeGlobalLockToExclusive(LockState* lockState) + : _lockState(lockState) { + fassert( 16187, _lockState->threadState() == 'w' ); // We're about to temporarily drop w, so stop the lock time stopwatch - lockState().recordLockTime(); + _lockState->recordLockTime(); _gotUpgrade = qlk.w_to_X(); if ( _gotUpgrade ) { - lockState().changeLockState('W'); - lockState().resetLockTime(); + _lockState->changeLockState('W'); + _lockState->resetLockTime(); } } Lock::UpgradeGlobalLockToExclusive::~UpgradeGlobalLockToExclusive() { if ( _gotUpgrade ) { - fassert( 16188, lockState().threadState() == 'W' ); - lockState().recordLockTime(); + fassert(16188, _lockState->threadState() == 'W'); + _lockState->recordLockTime(); qlk.X_to_w(); - lockState().changeLockState('w'); + _lockState->changeLockState('w'); } else { - fassert( 16189, lockState().threadState() == 'w' ); + fassert(16189, _lockState->threadState() == 'w'); } // Start recording lock time again - lockState().resetLockTime(); + _lockState->resetLockTime(); } - writelocktry::writelocktry( int tryms ) : + writelocktry::writelocktry(LockState* lockState, int tryms) : _got( false ), _dbwlock( NULL ) { try { - _dbwlock.reset(new Lock::GlobalWrite( false, tryms )); + _dbwlock.reset(new Lock::GlobalWrite(lockState, tryms)); } catch ( DBTryLockTimeoutException & ) { return; } _got = true; } - writelocktry::~writelocktry() { + + writelocktry::~writelocktry() { + } // note: the 'already' concept here might be a bad idea as a temprelease wouldn't notice it is nested then - readlocktry::readlocktry( int tryms ) : + readlocktry::readlocktry(LockState* lockState, int tryms) : _got( false ), _dbrlock( NULL ) { try { - _dbrlock.reset(new Lock::GlobalRead( tryms )); + _dbrlock.reset(new Lock::GlobalRead(lockState, tryms)); } catch ( DBTryLockTimeoutException & ) { return; } _got = true; } - readlocktry::~readlocktry() { + + readlocktry::~readlocktry() { + } class GlobalLockServerStatusSection : public ServerStatusSection { diff --git a/src/mongo/db/d_concurrency.h b/src/mongo/db/d_concurrency.h index 8359f23614d..5fac04e3a13 100644 --- a/src/mongo/db/d_concurrency.h +++ b/src/mongo/db/d_concurrency.h @@ -49,7 +49,6 @@ namespace mongo { public: enum Nestable { notnestable=0, local, admin }; static int isLocked(); // true if *anything* is locked (by us) - static int isReadLocked(); // r or R static int somethingWriteLocked(); // w or W static bool isW(); // W static bool isR(); @@ -67,9 +66,12 @@ namespace mongo { // note: avoid TempRelease when possible. not a good thing. struct TempRelease { - TempRelease(); + TempRelease(LockState* lockState); ~TempRelease(); const bool cant; // true if couldn't because of recursive locking + + // Not owned + LockState* _lockState; ScopedLock *scopedLk; }; @@ -101,7 +103,7 @@ namespace mongo { void resetTime(); protected: - explicit ScopedLock( char type ); + explicit ScopedLock(LockState* lockState, char type ); private: friend struct TempRelease; @@ -118,12 +120,13 @@ namespace mongo { class ParallelBatchWriterSupport : boost::noncopyable { public: - ParallelBatchWriterSupport(); + ParallelBatchWriterSupport(LockState* lockState); private: void tempRelease(); void relock(); + LockState* _lockState; scoped_ptr<RWLockRecursive::Shared> _lk; friend class ScopedLock; }; @@ -147,11 +150,12 @@ namespace mongo { public: // stopGreed is removed and does NOT work // timeoutms is only for writelocktry -- deprecated -- do not use - GlobalWrite(bool stopGreed = false, int timeoutms = -1 ); + GlobalWrite(LockState* lockState, int timeoutms = -1); virtual ~GlobalWrite(); void downgrade(); // W -> R void upgrade(); // caution see notes }; + class GlobalRead : public ScopedLock { // recursive is ok public: bool noop; @@ -160,7 +164,7 @@ namespace mongo { void _relock(); public: // timeoutms is only for readlocktry -- deprecated -- do not use - GlobalRead( int timeoutms = -1 ); + GlobalRead(LockState* lockState, int timeoutms = -1); virtual ~GlobalRead(); }; @@ -174,7 +178,7 @@ namespace mongo { * 2) unlockDB */ - void lockTop(LockState&); + void lockTop(); void lockNestable(Nestable db); void lockOther(const StringData& db); void lockDB(const std::string& ns); @@ -198,7 +202,7 @@ namespace mongo { // lock this database for reading. do not shared_lock globally first, that is handledin herein. class DBRead : public ScopedLock { - void lockTop(LockState&); + void lockTop(); void lockNestable(Nestable db); void lockOther(const StringData& db); void lockDB(const std::string& ns); @@ -226,12 +230,13 @@ namespace mongo { */ class UpgradeGlobalLockToExclusive : private boost::noncopyable { public: - UpgradeGlobalLockToExclusive(); + UpgradeGlobalLockToExclusive(LockState* lockState); ~UpgradeGlobalLockToExclusive(); bool gotUpgrade() const { return _gotUpgrade; } private: + LockState* _lockState; bool _gotUpgrade; }; }; @@ -240,7 +245,7 @@ namespace mongo { bool _got; scoped_ptr<Lock::GlobalRead> _dbrlock; public: - readlocktry( int tryms ); + readlocktry(LockState* lockState, int tryms); ~readlocktry(); bool got() const { return _got; } }; @@ -249,7 +254,7 @@ namespace mongo { bool _got; scoped_ptr<Lock::GlobalWrite> _dbwlock; public: - writelocktry( int tryms ); + writelocktry(LockState* lockState, int tryms); ~writelocktry(); bool got() const { return _got; } }; diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index c4fb3221ff5..bf1faf228fc 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -239,7 +239,7 @@ namespace mongo { }; - void logStartup() { + static void logStartup() { BSONObjBuilder toLog; stringstream id; id << getHostNameCached() << "-" << jsTime(); @@ -259,14 +259,17 @@ namespace mongo { BSONObj o = toLog.obj(); - Lock::GlobalWrite lk; - DBDirectClient c; - const char* name = "local.startup_log"; + OperationContextImpl txn; + + Lock::GlobalWrite lk(txn.lockState()); + DBDirectClient c(&txn); + + static const char* name = "local.startup_log"; c.createCollection( name, 10 * 1024 * 1024, true ); c.insert( name, o); } - void listen(int port) { + static void listen(int port) { //testTheDb(); MessageServer::Options options; options.port = port; @@ -356,11 +359,12 @@ namespace mongo { // ran at startup. static void repairDatabasesAndCheckVersion(bool shouldClearNonLocalTmpCollections) { - // LastError * le = lastError.get( true ); LOG(1) << "enter repairDatabases (to check pdfile version #)" << endl; - Lock::GlobalWrite lk; OperationContextImpl txn; + + Lock::GlobalWrite lk(txn.lockState()); + vector< string > dbNames; getDatabaseNames( dbNames ); for ( vector< string >::iterator i = dbNames.begin(); i != dbNames.end(); ++i ) { @@ -368,7 +372,7 @@ namespace mongo { LOG(1) << "\t" << dbName << endl; Client::Context ctx( dbName ); - DataFile *p = ctx.db()->getExtentManager()->getFile( &txn, 0 ); + DataFile *p = ctx.db()->getExtentManager()->getFile(&txn, 0); DataFileHeader *h = p->getHeader(); if (repl::replSettings.usingReplSets()) { @@ -476,10 +480,13 @@ namespace mongo { * @returns the number of documents in local.system.replset or 0 if this was started with * --replset. */ - unsigned long long checkIfReplMissingFromCommandLine() { - Lock::GlobalWrite lk; // this is helpful for the query below to work as you can't open files when readlocked + static unsigned long long checkIfReplMissingFromCommandLine() { + OperationContextImpl txn; + + // This is helpful for the query below to work as you can't open files when readlocked + Lock::GlobalWrite lk(txn.lockState()); if (!repl::replSettings.usingReplSets()) { - DBDirectClient c; + DBDirectClient c(&txn); return c.count("local.system.replset"); } return 0; diff --git a/src/mongo/db/db.h b/src/mongo/db/db.h index 63b1e38bbfb..32cadc2ac1c 100644 --- a/src/mongo/db/db.h +++ b/src/mongo/db/db.h @@ -50,7 +50,7 @@ namespace mongo { struct dbtemprelease { Client::Context * _context; scoped_ptr<Lock::TempRelease> tr; - dbtemprelease() { + dbtemprelease(LockState* lockState) { const Client& c = cc(); _context = c.getContext(); verify( Lock::isLocked() ); @@ -60,7 +60,7 @@ namespace mongo { if ( _context ) { _context->unlocked(); } - tr.reset(new Lock::TempRelease); + tr.reset(new Lock::TempRelease(lockState)); verify( c.curop() ); c.curop()->yielded(); } diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index f28397752da..a9d1da2431e 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -140,7 +140,7 @@ namespace mongo { // regardless of whether they caught up, we'll shut down } - writelocktry wlt( 2 * 60 * 1000 ); + writelocktry wlt(txn->lockState(), 2 * 60 * 1000); uassert( 13455 , "dbexit timed out getting lock" , wlt.got() ); return shutdownHelper(); } @@ -207,7 +207,7 @@ namespace mongo { // this is suboptimal but syncDataAndTruncateJournal is called from dropDatabase, // and that may need a global lock. - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); Client::Context context(dbname); log() << "dropDatabase " << dbname << " starting" << endl; @@ -284,7 +284,7 @@ namespace mongo { // SERVER-4328 todo don't lock globally. currently syncDataAndTruncateJournal is being // called within, and that requires a global lock i believe. - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); Client::Context context( dbname ); log() << "repairDatabase " << dbname; @@ -683,7 +683,7 @@ namespace mongo { // path != storageGlobalParams.dbpath ?? set<string> allShortNames; { - Lock::GlobalRead lk; + Lock::GlobalRead lk(txn->lockState()); dbHolder().getAllShortNames( allShortNames ); } @@ -737,7 +737,7 @@ namespace mongo { } bool run(OperationContext* txn, const string& dbname , BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) { - Lock::GlobalWrite globalWriteLock; + Lock::GlobalWrite globalWriteLock(txn->lockState()); Client::Context ctx(dbname); try { diff --git a/src/mongo/db/dbeval.cpp b/src/mongo/db/dbeval.cpp index 79207366e77..35a566a3a32 100644 --- a/src/mongo/db/dbeval.cpp +++ b/src/mongo/db/dbeval.cpp @@ -144,7 +144,7 @@ namespace mongo { return dbEval(dbname, cmdObj, result, errmsg); } - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); Client::Context ctx( dbname ); return dbEval(dbname, cmdObj, result, errmsg); diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index a138eac4f75..467217f0425 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -501,7 +501,7 @@ namespace mongo { if ( currentOp.shouldDBProfile( debug.executionTime ) ) { // performance profiling is on - if ( Lock::isReadLocked() ) { + if (txn->lockState()->hasAnyReadLock()) { LOG(1) << "note: not profiling because recursive read lock" << endl; } else if ( lockedForWriting() ) { @@ -1074,7 +1074,7 @@ namespace { while( 1 ) { // we may already be in a read lock from earlier in the call stack, so do read lock here // to be consistent with that. - readlocktry w(20000); + readlocktry w(&cc().lockState(), 20000); if( w.got() ) { log() << "shutdown: final commit..." << endl; getDur().commitNow(); @@ -1125,7 +1125,7 @@ namespace { } { - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(&cc().lockState()); log() << "now exiting" << endl; dbexit( code ); } diff --git a/src/mongo/db/lockstate.cpp b/src/mongo/db/lockstate.cpp index bd8fc4adb1e..852e4b89a83 100644 --- a/src/mongo/db/lockstate.cpp +++ b/src/mongo/db/lockstate.cpp @@ -66,7 +66,7 @@ namespace mongo { return _threadState == 'w' || _threadState == 'W'; } - bool LockState::isLocked( const StringData& ns ) { + bool LockState::isLocked( const StringData& ns ) const { char db[MaxDatabaseNameLen]; nsToDatabase(ns, db); @@ -84,6 +84,26 @@ namespace mongo { return false; } + bool LockState::isWriteLocked(const StringData& ns) { + if (threadState() == 'W') + return true; + if (threadState() != 'w') + return false; + return isLocked(ns); + } + + bool LockState::isAtLeastReadLocked(const StringData& ns) const { + if (threadState() == 'R' || threadState() == 'W') + return true; // global + if (threadState() == 0) + return false; + return isLocked(ns); + } + + bool LockState::isNested() const { + return recursiveCount() > 1; + } + void LockState::lockedStart( char newState ) { _threadState = newState; } diff --git a/src/mongo/db/lockstate.h b/src/mongo/db/lockstate.h index 569d6d2104d..693ae520a18 100644 --- a/src/mongo/db/lockstate.h +++ b/src/mongo/db/lockstate.h @@ -59,7 +59,10 @@ namespace mongo { bool hasAnyReadLock() const; // explicitly rR bool hasAnyWriteLock() const; // wW - bool isLocked( const StringData& ns ); // rwRW + bool isLocked(const StringData& ns) const; // rwRW + bool isWriteLocked(const StringData& ns); + bool isAtLeastReadLocked(const StringData& ns) const; + bool isNested() const; /** pending means we are currently trying to get a lock */ bool hasLockPending() const { return _lockPending || _lockPendingParallelWriter; } diff --git a/src/mongo/db/pdfile.cpp b/src/mongo/db/pdfile.cpp index 182493e802a..ed7630b4ea9 100644 --- a/src/mongo/db/pdfile.cpp +++ b/src/mongo/db/pdfile.cpp @@ -138,9 +138,8 @@ namespace mongo { return Status::OK(); } - void dropAllDatabasesExceptLocal() { - Lock::GlobalWrite lk; - OperationContextImpl txn; + void dropAllDatabasesExceptLocal(OperationContext* txn) { + Lock::GlobalWrite lk(txn->lockState()); vector<string> n; getDatabaseNames(n); @@ -149,7 +148,7 @@ namespace mongo { for( vector<string>::iterator i = n.begin(); i != n.end(); i++ ) { if( *i != "local" ) { Client::Context ctx(*i); - dropDatabase(&txn, ctx.db()); + dropDatabase(txn, ctx.db()); } } } diff --git a/src/mongo/db/pdfile.h b/src/mongo/db/pdfile.h index 9544d999840..43e3336f23e 100644 --- a/src/mongo/db/pdfile.h +++ b/src/mongo/db/pdfile.h @@ -49,7 +49,7 @@ namespace mongo { void dropDatabase(OperationContext* txn, Database* db); - void dropAllDatabasesExceptLocal(); + void dropAllDatabasesExceptLocal(OperationContext* txn); Status userCreateNS( OperationContext* txn, Database* db, diff --git a/src/mongo/db/repl/consensus.cpp b/src/mongo/db/repl/consensus.cpp index 34d295bfe52..9a23f9f559e 100644 --- a/src/mongo/db/repl/consensus.cpp +++ b/src/mongo/db/repl/consensus.cpp @@ -457,7 +457,7 @@ namespace repl { setElectionTime(getNextGlobalOptime()); - rs.assumePrimary(); + rs._assumePrimary(); } } } diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp index cfb1bd780de..aaf11da5759 100644 --- a/src/mongo/db/repl/health.cpp +++ b/src/mongo/db/repl/health.cpp @@ -249,7 +249,7 @@ namespace repl { string myMinValid; try { - readlocktry lk(/*"local.replset.minvalid", */300); + readlocktry lk(txn->lockState(), /*"local.replset.minvalid", */300); if( lk.got() ) { BSONObj mv; if( Helpers::getSingleton(txn, "local.replset.minvalid", mv) ) { diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index d03bf04f756..794f010fea3 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -30,10 +30,12 @@ #include "mongo/db/repl/initial_sync.h" +#include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replset_commands.h" #include "mongo/db/repl/rs.h" + namespace mongo { namespace repl { InitialSync::InitialSync(BackgroundSyncInterface *q) : @@ -52,7 +54,8 @@ namespace repl { } // create the initial oplog entry - syncApply(applyGTEObj); + OperationContextImpl txn; + syncApply(&txn, applyGTEObj); _logOpObjRS(applyGTEObj); return oplogApplySegment(applyGTEObj, minValidObj, multiInitialSyncApply); diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 93681b32d28..5405307dd5c 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -331,7 +331,7 @@ namespace repl { void ReplSource::forceResync( OperationContext* txn, const char *requester ) { BSONObj info; { - dbtemprelease t; + dbtemprelease t(txn->lockState()); if (!oplogReader.connect(hostName, _me)) { msgassertedNoTrace( 14051 , "unable to connect to resync"); } @@ -445,7 +445,7 @@ namespace repl { OpTime lastTime; bool dbOk = false; { - dbtemprelease release; + dbtemprelease release(txn->lockState()); // We always log an operation after executing it (never before), so // a database list will always be valid as of an oplog entry generated @@ -512,7 +512,7 @@ namespace repl { bool failedUpdate = applyOperation_inlock( txn, db, op ); if (failedUpdate) { Sync sync(hostName); - if (sync.shouldRetry(op)) { + if (sync.shouldRetry(txn, op)) { uassert(15914, "Failure retrying initial sync update", !applyOperation_inlock(txn, db, op)); @@ -535,7 +535,7 @@ namespace repl { @param alreadyLocked caller already put us in write lock if true */ - void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked) { + void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked) { LOG(6) << "processing op: " << op << endl; if( op.getStringField("op")[0] == 'n' ) @@ -562,8 +562,6 @@ namespace repl { if ( !only.empty() && only != clientName ) return; - OperationContextImpl txn; // XXX? - if (replSettings.pretouch && !alreadyLocked/*doesn't make sense if in write lock already*/) { if (replSettings.pretouch > 1) { @@ -592,17 +590,17 @@ namespace repl { a += m; } // we do one too... - pretouchOperation(&txn, op); + pretouchOperation(txn, op); tp->join(); countdown = v.size(); } } else { - pretouchOperation(&txn, op); + pretouchOperation(txn, op); } } - scoped_ptr<Lock::GlobalWrite> lk( alreadyLocked ? 0 : new Lock::GlobalWrite() ); + scoped_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(txn->lockState())); if ( replAllDead ) { // hmmm why is this check here and not at top of this function? does it get set between top and here? @@ -610,7 +608,7 @@ namespace repl { throw SyncException(); } - if ( !handleDuplicateDbName( &txn, op, ns, clientName ) ) { + if (!handleDuplicateDbName(txn, op, ns, clientName)) { return; } @@ -625,7 +623,7 @@ namespace repl { // always apply admin command command // this is a bit hacky -- the semantics of replication/commands aren't well specified if ( strcmp( clientName, "admin" ) == 0 && *op.getStringField( "op" ) == 'c' ) { - applyOperation( &txn, ctx.db(), op ); + applyOperation(txn, ctx.db(), op); return; } @@ -647,14 +645,14 @@ namespace repl { save(); Client::Context ctx(ns); nClonedThisPass++; - resync(&txn, ctx.db()->name()); + resync(txn, ctx.db()->name()); addDbNextPass.erase(clientName); incompleteCloneDbs.erase( clientName ); } save(); } else { - applyOperation( &txn, ctx.db(), op ); + applyOperation(txn, ctx.db(), op); addDbNextPass.erase( clientName ); } } @@ -723,7 +721,7 @@ namespace repl { 0 ok, don't sleep 1 ok, sleep */ - int ReplSource::sync_pullOpLog(int& nApplied) { + int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { int okResultCode = 1; string ns = string("local.oplog.$") + sourceName(); LOG(2) << "repl: sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n'; @@ -757,7 +755,7 @@ namespace repl { } // obviously global isn't ideal, but non-repl set is old so // keeping it simple - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); save(); } @@ -794,7 +792,7 @@ namespace repl { b.append("ns", *i + '.'); b.append("op", "db"); BSONObj op = b.done(); - sync_pullOpLog_applyOperation(op, false); + _sync_pullOpLog_applyOperation(txn, op, false); } } @@ -809,7 +807,7 @@ namespace repl { log() << "repl: " << ns << " oplog is empty" << endl; } { - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); save(); } return okResultCode; @@ -880,11 +878,11 @@ namespace repl { bool moreInitialSyncsPending = !addDbNextPass.empty() && n; // we need "&& n" to assure we actually process at least one op to get a sync point recorded in the first place. if ( moreInitialSyncsPending || !oplogReader.more() ) { - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); // NOTE aaron 2011-03-29 This block may be unnecessary, but I'm leaving it in place to avoid changing timing behavior. { - dbtemprelease t; + dbtemprelease t(txn->lockState()); if ( !moreInitialSyncsPending && oplogReader.more() ) { continue; } @@ -905,7 +903,7 @@ namespace repl { OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) { // periodically note our progress, in case we are doing a lot of work and crash - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); syncedTo = nextOpTime; // can't update local log ts since there are pending operations from our peer save(); @@ -919,7 +917,7 @@ namespace repl { int b = replApplyBatchSize.get(); bool justOne = b == 1; - scoped_ptr<Lock::GlobalWrite> lk( justOne ? 0 : new Lock::GlobalWrite() ); + scoped_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(txn->lockState())); while( 1 ) { BSONElement ts = op.getField("ts"); @@ -944,7 +942,7 @@ namespace repl { verify( justOne ); oplogReader.putBack( op ); _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1; - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); if ( n > 0 ) { syncedTo = last; save(); @@ -955,7 +953,7 @@ namespace repl { return okResultCode; } - sync_pullOpLog_applyOperation(op, !justOne); + _sync_pullOpLog_applyOperation(txn, op, !justOne); n++; if( --b == 0 ) @@ -1006,7 +1004,8 @@ namespace repl { return -1; } - return sync_pullOpLog(nApplied); + OperationContextImpl txn; // XXX? + return _sync_pullOpLog(&txn, nApplied); } /* --------------------------------------------------------------*/ @@ -1025,8 +1024,9 @@ namespace repl { OperationContextImpl txn; { ReplInfo r("replMain load sources"); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn.lockState()); ReplSource::loadAll(&txn, sources); + replSettings.fastsync = false; // only need this param for initial reset } @@ -1089,13 +1089,13 @@ namespace repl { return sleepAdvice; } - void replMain() { + static void replMain() { ReplSource::SourceVector sources; while ( 1 ) { int s = 0; { - Lock::GlobalWrite lk; OperationContextImpl txn; + Lock::GlobalWrite lk(txn.lockState()); if ( replAllDead ) { // throttledForceResyncDead can throw if ( !replSettings.autoresync || !ReplSource::throttledForceResyncDead( &txn, "auto" ) ) { @@ -1106,6 +1106,7 @@ namespace repl { verify( syncing == 0 ); // i.e., there is only one sync thread running. we will want to change/fix this. syncing++; } + try { int nApplied = 0; s = _replMain(sources, nApplied); @@ -1122,8 +1123,10 @@ namespace repl { out() << "caught exception in _replMain" << endl; s = 4; } + { - Lock::GlobalWrite lk; + LockState lockState; + Lock::GlobalWrite lk(&lockState); verify( syncing == 1 ); syncing--; } @@ -1157,14 +1160,15 @@ namespace repl { even when things are idle. */ { - writelocktry lk(1); + OperationContextImpl txn; + writelocktry lk(txn.lockState(), 1); if ( lk.got() ) { toSleep = 10; replLocalAuth(); try { - logKeepalive(); + logKeepalive(&txn); } catch(...) { log() << "caught exception in replMasterThread()" << endl; @@ -1178,12 +1182,13 @@ namespace repl { } } - void replSlaveThread() { + static void replSlaveThread() { sleepsecs(1); Client::initThread("replslave"); { - Lock::GlobalWrite lk; + LockState lockState; + Lock::GlobalWrite lk(&lockState); replLocalAuth(); } @@ -1217,7 +1222,8 @@ namespace repl { return; { - Lock::GlobalWrite lk; + LockState lockState; + Lock::GlobalWrite lk(&lockState); replLocalAuth(); } @@ -1249,7 +1255,8 @@ namespace repl { } OperationContextImpl txn; // XXX - Lock::GlobalRead lk; + Lock::GlobalRead lk(txn.lockState()); + for( unsigned i = a; i <= b; i++ ) { const BSONObj& op = v[i]; const char *which = "o"; diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h index 15445f68ede..f30f9f2bdd5 100644 --- a/src/mongo/db/repl/master_slave.h +++ b/src/mongo/db/repl/master_slave.h @@ -83,12 +83,12 @@ namespace repl { void resync(OperationContext* txn, const std::string& dbName); /** @param alreadyLocked caller already put us in write lock if true */ - void sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked); + void _sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked); /* pull some operations from the master's oplog, and apply them. calls sync_pullOpLog_applyOperation */ - int sync_pullOpLog(int& nApplied); + int _sync_pullOpLog(OperationContext* txn, int& nApplied); /* we only clone one database per pass, even if a lot need done. This helps us avoid overflowing the master's transaction log by doing too much work before going diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 1c40bbdfa6f..bf5cf5348b9 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -392,13 +392,11 @@ namespace repl { } void oldRepl() { _logOp = _logOpOld; } - void logKeepalive() { - OperationContextImpl txn; - _logOp(&txn, "n", "", 0, BSONObj(), 0, 0, false); + void logKeepalive(OperationContext* txn) { + _logOp(txn, "n", "", 0, BSONObj(), 0, 0, false); } - void logOpComment(const BSONObj& obj) { - OperationContextImpl txn; - _logOp(&txn, "n", "", 0, obj, 0, 0, false); + void logOpComment(OperationContext* txn, const BSONObj& obj) { + _logOp(txn, "n", "", 0, obj, 0, 0, false); } void logOpInitiate(OperationContext* txn, const BSONObj& obj) { _logOpRS(txn, "n", "", 0, obj, 0, 0, false); @@ -433,7 +431,8 @@ namespace repl { } void createOplog() { - Lock::GlobalWrite lk; + OperationContextImpl txn; + Lock::GlobalWrite lk(txn.lockState()); const char * ns = "local.oplog.$main"; @@ -442,7 +441,6 @@ namespace repl { ns = rsoplog; Client::Context ctx(ns); - OperationContextImpl txn; Collection* collection = ctx.db()->getCollection( &txn, ns ); if ( collection ) { @@ -460,7 +458,7 @@ namespace repl { if( rs ) return; - initOpTimeFromOplog(ns); + initOpTimeFromOplog(&txn, ns); return; } @@ -711,12 +709,12 @@ namespace repl { Status status = Command::getStatusFromCommandResult(ob.done()); switch (status.code()) { case ErrorCodes::BackgroundOperationInProgressForDatabase: { - dbtemprelease release; + dbtemprelease release(txn->lockState()); BackgroundOperation::awaitNoBgOpInProgForDb(nsToDatabaseSubstring(ns)); break; } case ErrorCodes::BackgroundOperationInProgressForNamespace: { - dbtemprelease release; + dbtemprelease release(txn->lockState());; Command* cmd = Command::findCommand(o.firstElement().fieldName()); invariant(cmd); BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nsToDatabase(ns), o)); @@ -761,8 +759,8 @@ namespace repl { } - void initOpTimeFromOplog(const std::string& oplogNS) { - DBDirectClient c; + void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS) { + DBDirectClient c(txn); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 40803c0a007..6728811667e 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -78,13 +78,13 @@ namespace repl { bool fromMigrate = false); // Log an empty no-op operation to the local oplog - void logKeepalive(); + void logKeepalive(OperationContext* txn); /** puts obj in the oplog as a comment (a no-op). Just for diags. convention is { msg : "text", ... } */ - void logOpComment(const BSONObj& obj); + void logOpComment(OperationContext* txn, const BSONObj& obj); // Flush out the cached pointers to the local database and oplog. // Used by the closeDatabase command to ensure we don't cache closed things. @@ -112,6 +112,6 @@ namespace repl { /** * Initializes the global OpTime with the value from the timestamp of the last oplog entry. */ - void initOpTimeFromOplog(const std::string& oplogNS); + void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp index 5d7a37d8806..c5b00f59c20 100644 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ b/src/mongo/db/repl/repl_set_impl.cpp @@ -109,7 +109,7 @@ namespace { } } - void ReplSetImpl::assumePrimary() { + void ReplSetImpl::_assumePrimary() { LOG(1) << "replSet assuming primary" << endl; verify(iAmPotentiallyHot()); @@ -119,9 +119,11 @@ namespace { // Lock here to prevent stepping down & becoming primary from getting interleaved LOG(1) << "replSet waiting for global write lock"; - Lock::GlobalWrite lk; - initOpTimeFromOplog("local.oplog.rs"); + OperationContextImpl txn; // XXX? + Lock::GlobalWrite lk(txn.lockState()); + + initOpTimeFromOplog(&txn, "local.oplog.rs"); // Generate new election unique id elect.setElectionId(OID::gen()); @@ -138,8 +140,10 @@ namespace { bool ReplSetImpl::setMaintenanceMode(const bool inc) { lock replLock(this); + // Lock here to prevent state from changing between checking the state and changing it - Lock::GlobalWrite writeLock; + LockState lockState; + Lock::GlobalWrite writeLock(&lockState); if (box.getState().primary()) { return false; @@ -191,7 +195,8 @@ namespace { void ReplSetImpl::relinquish() { { - Lock::GlobalWrite lk; // so we are synchronized with _logOp() + LockState lockState; + Lock::GlobalWrite writeLock(&lockState); // so we are synchronized with _logOp() LOG(2) << "replSet attempting to relinquish" << endl; if (box.getState().primary()) { diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h index b5b6254826d..da6d2efdf3d 100644 --- a/src/mongo/db/repl/repl_set_impl.h +++ b/src/mongo/db/repl/repl_set_impl.h @@ -41,7 +41,9 @@ #include "mongo/util/concurrency/value.h" namespace mongo { + class Cloner; + class OperationContext; namespace repl { @@ -118,7 +120,7 @@ namespace repl { bool _stepDown(int secs); bool _freeze(int secs); private: - void assumePrimary(); + void _assumePrimary(); void loadLastOpTimeWritten(bool quiet=false); void changeState(MemberState s); @@ -288,7 +290,7 @@ namespace repl { void syncDoInitialSync(); void _syncThread(); void syncTail(); - unsigned _syncRollback(OplogReader& r); + unsigned _syncRollback(OperationContext* txn, OplogReader& r); void syncFixUp(FixUpInfo& h, OplogReader& r); // keep a list of hosts that we've tried recently that didn't work @@ -317,7 +319,7 @@ namespace repl { threadpool::ThreadPool& getWriterPool() { return _writerPool; } const ReplSetConfig::MemberCfg& myConfig() const { return _config; } - bool tryToGoLiveAsASecondary(OpTime&); // readlocks + bool tryToGoLiveAsASecondary(OperationContext* txn, OpTime&); // readlocks void syncRollback(OplogReader& r); void syncThread(); const OpTime lastOtherOpTime() const; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 5605d962987..b7df8ebe227 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -186,14 +186,14 @@ namespace repl { virtual bool run(OperationContext* txn, const string& a, BSONObj& b, int e, string& errmsg, BSONObjBuilder& c, bool d) { try { rwlock_try_write lk(mutex); - return _run(a,b,e,errmsg,c,d); + return _run(txn, a,b,e,errmsg,c,d); } catch(rwlock_try_write::exception&) { } errmsg = "a replSetReconfig is already in progress"; return false; } private: - bool _run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + bool _run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( cmdObj["replSetReconfig"].type() != Object ) { errmsg = "no configuration specified"; return false; @@ -220,7 +220,7 @@ namespace repl { // later. of course it could be stuck then, but this check lowers the risk if weird things // are up - we probably don't want a change to apply 30 minutes after the initial attempt. time_t t = time(0); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); if( time(0)-t > 20 ) { errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?"; return false; diff --git a/src/mongo/db/repl/replset_web_handler.cpp b/src/mongo/db/repl/replset_web_handler.cpp index 8c71fa2748b..21f013318ea 100644 --- a/src/mongo/db/repl/replset_web_handler.cpp +++ b/src/mongo/db/repl/replset_web_handler.cpp @@ -51,9 +51,13 @@ namespace repl { } virtual void handle( OperationContext* txn, - const char *rq, const std::string& url, BSONObj params, - string& responseMsg, int& responseCode, - vector<string>& headers, const SockAddr &from ) { + const char *rq, + const std::string& url, + BSONObj params, + string& responseMsg, + int& responseCode, + vector<string>& headers, + const SockAddr &from ) { if( url == "/_replSetOplog" ) { responseMsg = _replSetOplog(params); diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp index 3bd6b268cdf..d1c693d4b92 100644 --- a/src/mongo/db/repl/resync.cpp +++ b/src/mongo/db/repl/resync.cpp @@ -67,7 +67,7 @@ namespace repl { bool fromRepl) { const std::string ns = parseNs(dbname, cmdObj); - Lock::GlobalWrite globalWriteLock; + Lock::GlobalWrite globalWriteLock(txn->lockState()); Client::Context ctx(ns); if (replSettings.usingReplSets()) { if (!theReplSet) { @@ -83,7 +83,7 @@ namespace repl { // below this comment pertains only to master/slave replication if ( cmdObj.getBoolField( "force" ) ) { - if ( !waitForSyncToFinish( errmsg ) ) + if ( !waitForSyncToFinish(txn, errmsg ) ) return false; replAllDead = "resync forced"; } @@ -91,14 +91,15 @@ namespace repl { errmsg = "not dead, no need to resync"; return false; } - if ( !waitForSyncToFinish( errmsg ) ) + if ( !waitForSyncToFinish(txn, errmsg ) ) return false; ReplSource::forceResyncDead( txn, "client" ); result.append( "info", "triggered resync for all sources" ); return true; } - bool waitForSyncToFinish( string &errmsg ) const { + + bool waitForSyncToFinish(OperationContext* txn, string &errmsg) const { // Wait for slave thread to finish syncing, so sources will be be // reloaded with new saved state on next pass. Timer t; @@ -106,7 +107,7 @@ namespace repl { if ( syncing == 0 || t.millis() > 30000 ) break; { - Lock::TempRelease t; + Lock::TempRelease t(txn->lockState()); relinquishSyncingSome = 1; sleepmillis(1); } diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index be6c8aaf75a..5faea85e949 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -393,6 +393,8 @@ namespace repl { return; } + OperationContextImpl txn; + // written by applyToHead calls BSONObj minValid; @@ -408,7 +410,7 @@ namespace repl { theReplSet->setInitialSyncFlag(); sethbmsg("initial sync drop all databases", 0); - dropAllDatabasesExceptLocal(); + dropAllDatabasesExceptLocal(&txn); sethbmsg("initial sync clone all databases", 0); @@ -467,7 +469,6 @@ namespace repl { verify( !box.getState().primary() ); // wouldn't make sense if we were. { - OperationContextImpl txn; Client::WriteContext cx(&txn, "local."); cx.ctx().db()->flushFiles(true); diff --git a/src/mongo/db/repl/rs_initiate.cpp b/src/mongo/db/repl/rs_initiate.cpp index e50d2a80568..c0cd5042abd 100644 --- a/src/mongo/db/repl/rs_initiate.cpp +++ b/src/mongo/db/repl/rs_initiate.cpp @@ -197,7 +197,7 @@ namespace repl { // later. of course it could be stuck then, but this check lowers the risk if weird things // are up. time_t t = time(0); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); if( time(0)-t > 10 ) { errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?"; return false; @@ -270,7 +270,7 @@ namespace repl { createOplog(); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); bo comment = BSON( "msg" << "initiating set"); newConfig->saveConfigLocally(comment); log() << "replSet replSetInitiate config now saved locally. " diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 9d07024e16a..4ecf5407c24 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -428,7 +428,7 @@ namespace repl { ctx.db()->dropCollection(&txn, ns); { string errmsg; - dbtemprelease release; + dbtemprelease release(txn.lockState()); bool ok = Cloner::copyCollectionFromRemote(&txn, them->getServerAddress(), ns, errmsg); uassert(15909, str::stream() << "replSet rollback error resyncing collection " @@ -662,8 +662,9 @@ namespace repl { void ReplSetImpl::syncRollback(OplogReader& oplogreader) { // check that we are at minvalid, otherwise we cannot rollback as we may be in an // inconsistent state + OperationContextImpl txn; + { - OperationContextImpl txn; Lock::DBRead lk(txn.lockState(), "local.replset.minvalid"); BSONObj mv; if (Helpers::getSingleton(&txn, "local.replset.minvalid", mv)) { @@ -678,18 +679,18 @@ namespace repl { } } - unsigned s = _syncRollback(oplogreader); + unsigned s = _syncRollback(&txn, oplogreader); if (s) sleepsecs(s); } - unsigned ReplSetImpl::_syncRollback(OplogReader& oplogreader) { + unsigned ReplSetImpl::_syncRollback(OperationContext* txn, OplogReader& oplogreader) { verify(!lockedByMe()); verify(!Lock::isLocked()); sethbmsg("rollback 0"); - writelocktry lk(20000); + writelocktry lk(txn->lockState(), 20000); if (!lk.got()) { sethbmsg("rollback couldn't get write lock in a reasonable time"); return 2; @@ -721,7 +722,7 @@ namespace repl { } catch (DBException& e) { sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min"); - dbtemprelease release; + dbtemprelease release(txn->lockState()); sleepsecs(60); throw; } diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 2fd1aa5b0f4..ad9c8a5ebe7 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -72,7 +72,7 @@ namespace repl { readlocks @return true if transitioned to SECONDARY */ - bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { + bool ReplSetImpl::tryToGoLiveAsASecondary(OperationContext* txn, OpTime& /*out*/ minvalid) { bool golive = false; lock rsLock( this ); @@ -87,7 +87,7 @@ namespace repl { return false; } - Lock::GlobalWrite writeLock; + Lock::GlobalWrite writeLock(txn->lockState()); // make sure we're not primary, secondary, rollback, or fatal already if (box.getState().primary() || box.getState().secondary() || diff --git a/src/mongo/db/repl/sync.cpp b/src/mongo/db/repl/sync.cpp index 8ca15ed9386..16e6225a1fb 100644 --- a/src/mongo/db/repl/sync.cpp +++ b/src/mongo/db/repl/sync.cpp @@ -36,7 +36,6 @@ #include "mongo/db/diskloc.h" #include "mongo/db/pdfile.h" #include "mongo/db/repl/oplogreader.h" -#include "mongo/db/operation_context_impl.h" #include "mongo/db/catalog/collection.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -106,16 +105,17 @@ namespace repl { str::stream() << "Can no longer connect to initial sync source: " << hn); } - bool Sync::shouldRetry(const BSONObj& o) { + bool Sync::shouldRetry(OperationContext* txn, const BSONObj& o) { + invariant(txn->lockState()->hasAnyWriteLock()); + // should already have write lock const char *ns = o.getStringField("ns"); Client::Context ctx(ns); - OperationContextImpl txn; // we don't have the object yet, which is possible on initial sync. get it. log() << "replication info adding missing object" << endl; // rare enough we can log - BSONObj missingObj = getMissingDoc(&txn, ctx.db(), o); + BSONObj missingObj = getMissingDoc(txn, ctx.db(), o); if( missingObj.isEmpty() ) { log() << "replication missing object not found on source. presumably deleted later in oplog" << endl; @@ -125,9 +125,10 @@ namespace repl { return false; } else { - Collection* collection = ctx.db()->getOrCreateCollection( &txn, ns ); - verify( collection ); // should never happen - StatusWith<DiskLoc> result = collection->insertDocument( &txn, missingObj, true ); + Collection* collection = ctx.db()->getOrCreateCollection(txn, ns); + invariant(collection != NULL); // should never happen + + StatusWith<DiskLoc> result = collection->insertDocument(txn, missingObj, true); uassert(15917, str::stream() << "failed to insert missing doc: " << result.toString(), result.isOK() ); diff --git a/src/mongo/db/repl/sync.h b/src/mongo/db/repl/sync.h index 67cb5e63a60..cdda55f4f13 100644 --- a/src/mongo/db/repl/sync.h +++ b/src/mongo/db/repl/sync.h @@ -49,7 +49,7 @@ namespace repl { /** * If applyOperation_inlock should be called again after an update fails. */ - virtual bool shouldRetry(const BSONObj& o); + virtual bool shouldRetry(OperationContext* txn, const BSONObj& o); void setHostname(const std::string& hostname); }; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 92eea631595..07d6eab4243 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -78,7 +78,8 @@ namespace repl { /* apply the log op that is in param o @return bool success (true) or failure (false) */ - bool SyncTail::syncApply(const BSONObj &op, bool convertUpdateToUpsert) { + bool SyncTail::syncApply( + OperationContext* txn, const BSONObj &op, bool convertUpdateToUpsert) { const char *ns = op.getStringField("ns"); verify(ns); @@ -94,25 +95,24 @@ namespace repl { bool isCommand(op["op"].valuestrsafe()[0] == 'c'); - OperationContextImpl txn; boost::scoped_ptr<Lock::ScopedLock> lk; if(isCommand) { // a command may need a global write lock. so we will conservatively go // ahead and grab one here. suboptimal. :-( - lk.reset(new Lock::GlobalWrite()); + lk.reset(new Lock::GlobalWrite(txn->lockState())); } else { // DB level lock for this operation - lk.reset(new Lock::DBWrite(txn.lockState(), ns)); + lk.reset(new Lock::DBWrite(txn->lockState(), ns)); } - Client::Context ctx(ns, storageGlobalParams.dbpath); + Client::Context ctx(ns); ctx.getClient()->curop()->reset(); // For non-initial-sync, we convert updates to upserts // to suppress errors when replaying oplog entries. - bool ok = !applyOperation_inlock(&txn, ctx.db(), op, true, convertUpdateToUpsert); + bool ok = !applyOperation_inlock(txn, ctx.db(), op, true, convertUpdateToUpsert); opsAppliedStats.increment(); - txn.recoveryUnit()->commitIfNeeded(); + txn->recoveryUnit()->commitIfNeeded(); return ok; } @@ -325,7 +325,9 @@ namespace repl { // become primary if (!theReplSet->isSecondary()) { OpTime minvalid; - theReplSet->tryToGoLiveAsASecondary(minvalid); + + OperationContextImpl txn; + theReplSet->tryToGoLiveAsASecondary(&txn, minvalid); } // normally msgCheckNewState gets called periodically, but in a single node @@ -555,7 +557,8 @@ namespace repl { it != ops.end(); ++it) { try { - if (!st->syncApply(*it, convertUpdatesToUpserts)) { + OperationContextImpl txn; + if (!st->syncApply(&txn, *it, convertUpdatesToUpserts)) { fassertFailedNoTrace(16359); } } catch (const DBException& e) { @@ -573,15 +576,18 @@ namespace repl { it != ops.end(); ++it) { try { - if (!st->syncApply(*it)) { + OperationContextImpl txn; + + if (!st->syncApply(&txn, *it)) { bool status; { - Lock::GlobalWrite lk; - status = st->shouldRetry(*it); + Lock::GlobalWrite lk(txn.lockState()); + status = st->shouldRetry(&txn, *it); } + if (status) { // retry - if (!st->syncApply(*it)) { + if (!st->syncApply(&txn, *it)) { fassertFailedNoTrace(15915); } } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 581a37498ae..ea9995aaed2 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -34,6 +34,9 @@ #include "mongo/db/repl/sync.h" namespace mongo { + + class OperationContext; + namespace repl { class BackgroundSyncInterface; @@ -46,7 +49,9 @@ namespace repl { public: SyncTail(BackgroundSyncInterface *q); virtual ~SyncTail(); - virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert = false); + virtual bool syncApply(OperationContext* txn, + const BSONObj &o, + bool convertUpdateToUpsert = false); /** * Apply ops from applyGTEObj's ts to at least minValidObj's ts. Note that, due to diff --git a/src/mongo/db/restapi.cpp b/src/mongo/db/restapi.cpp index 657dc627d16..1653bdd4822 100644 --- a/src/mongo/db/restapi.cpp +++ b/src/mongo/db/restapi.cpp @@ -297,7 +297,8 @@ namespace mongo { virtual void run( stringstream& ss ) { Timer t; - readlocktry lk( 300 ); + LockState lockState; + readlocktry lk(&lockState, 300); if ( lk.got() ) { _gotLock( t.millis() , ss ); } diff --git a/src/mongo/db/storage/mmap_v1/dur.cpp b/src/mongo/db/storage/mmap_v1/dur.cpp index 0bfab752496..377d608f3d7 100644 --- a/src/mongo/db/storage/mmap_v1/dur.cpp +++ b/src/mongo/db/storage/mmap_v1/dur.cpp @@ -76,6 +76,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status.h" +#include "mongo/db/operation_context_impl.h" #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/db/storage/mmap_v1/dur_commitjob.h" #include "mongo/db/storage/mmap_v1/dur_journal.h" @@ -282,7 +283,7 @@ namespace mongo { case '\0': { // lock_w() can call in this state at times if a commit is needed before attempting // its lock. - Lock::GlobalRead r; + Lock::GlobalRead r(&cc().lockState()); if( commitJob.bytes() < UncommittedBytesLimit ) { // someone else beat us to it // @@ -307,7 +308,7 @@ namespace mongo { LOG(1) << "commitIfNeeded upgrading from shared write to exclusive write state" << endl; - Lock::UpgradeGlobalLockToExclusive ex; + Lock::UpgradeGlobalLockToExclusive ex(&cc().lockState()); if (ex.gotUpgrade()) { commitNow(); } @@ -575,7 +576,7 @@ namespace mongo { // probably: as this is a read lock, it wouldn't change anything if only reads anyway. // also needs to stop greed. our time to work before clearing lk1 is not too bad, so // not super critical, but likely 'correct'. todo. - scoped_ptr<Lock::GlobalRead> lk1( new Lock::GlobalRead() ); + scoped_ptr<Lock::GlobalRead> lk1(new Lock::GlobalRead(&cc().lockState())); SimpleMutex::scoped_lock lk2(commitJob.groupCommitMutex); @@ -774,7 +775,7 @@ namespace mongo { // getting a write lock is helpful also as we need to be greedy and not be starved here // note our "stopgreed" parm -- to stop greed by others while we are working. you can't write // anytime soon anyway if we are journaling for a while, that was the idea. - Lock::GlobalWrite w(/*stopgreed:*/true); + Lock::GlobalWrite w(&cc().lockState()); w.downgrade(); groupCommit(&w); } @@ -848,7 +849,7 @@ namespace mongo { cc().shutdown(); } - void recover(); + void recover(OperationContext* txn); void preallocateFiles(); /** at startup, recover, and then start the journal threads */ @@ -872,8 +873,10 @@ namespace mongo { DurableInterface::enableDurability(); journalMakeDir(); + + OperationContextImpl txn; try { - recover(); + recover(&txn); } catch(DBException& e) { log() << "dbexception during recovery: " << e.toString() << endl; diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.cpp b/src/mongo/db/storage/mmap_v1/dur_recover.cpp index f912375af51..c6bba357c7a 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recover.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_recover.cpp @@ -578,10 +578,10 @@ namespace mongo { called during startup throws on error */ - void recover() { + void recover(OperationContext* txn) { // we use a lock so that exitCleanly will wait for us // to finish (or at least to notice what is up and stop) - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); // this is so the mutexdebugger doesn't get confused. we are actually single threaded // at this point in the program so it wouldn't have been a true problem (I think) diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp b/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp index a4c4fe12b23..b2124d86259 100644 --- a/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp +++ b/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp @@ -134,7 +134,7 @@ namespace mongo { if ( !preallocateOnly ) { while ( n >= (int) _files.size() ) { verify(this); - if( !Lock::isWriteLocked(_dbname) ) { + if (!txn->lockState()->isWriteLocked(_dbname)) { log() << "error: getFile() called in a read lock, yet file to return is not yet open"; log() << " getFile(" << n << ") _files.size:" <<_files.size() << ' ' << fileName(n).string(); invariant(false); |