diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-05-28 13:49:34 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-06-02 22:52:46 -0400 |
commit | 8c9fcc939f9f1a2b593e606bd790cc87efd4064f (patch) | |
tree | beaa313f3e53cf72ca76aa5392946b97736ea6b3 /src/mongo/db/repl | |
parent | 4add46aa8dd05a5c6d8af2c798eef6e9b5e4164b (diff) | |
download | mongo-8c9fcc939f9f1a2b593e606bd790cc87efd4064f.tar.gz |
SERVER-13961 Start using LockState from the OperationContext
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/consensus.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/health.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_impl.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_impl.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_web_handler.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/resync.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initiate.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/sync.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 7 |
20 files changed, 142 insertions, 108 deletions
diff --git a/src/mongo/db/repl/consensus.cpp b/src/mongo/db/repl/consensus.cpp index 34d295bfe52..9a23f9f559e 100644 --- a/src/mongo/db/repl/consensus.cpp +++ b/src/mongo/db/repl/consensus.cpp @@ -457,7 +457,7 @@ namespace repl { setElectionTime(getNextGlobalOptime()); - rs.assumePrimary(); + rs._assumePrimary(); } } } diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp index cfb1bd780de..aaf11da5759 100644 --- a/src/mongo/db/repl/health.cpp +++ b/src/mongo/db/repl/health.cpp @@ -249,7 +249,7 @@ namespace repl { string myMinValid; try { - readlocktry lk(/*"local.replset.minvalid", */300); + readlocktry lk(txn->lockState(), /*"local.replset.minvalid", */300); if( lk.got() ) { BSONObj mv; if( Helpers::getSingleton(txn, "local.replset.minvalid", mv) ) { diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index d03bf04f756..794f010fea3 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -30,10 +30,12 @@ #include "mongo/db/repl/initial_sync.h" +#include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replset_commands.h" #include "mongo/db/repl/rs.h" + namespace mongo { namespace repl { InitialSync::InitialSync(BackgroundSyncInterface *q) : @@ -52,7 +54,8 @@ namespace repl { } // create the initial oplog entry - syncApply(applyGTEObj); + OperationContextImpl txn; + syncApply(&txn, applyGTEObj); _logOpObjRS(applyGTEObj); return oplogApplySegment(applyGTEObj, minValidObj, multiInitialSyncApply); diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 93681b32d28..5405307dd5c 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -331,7 +331,7 @@ namespace repl { void ReplSource::forceResync( OperationContext* txn, const char *requester ) { BSONObj info; { - dbtemprelease t; + dbtemprelease t(txn->lockState()); if (!oplogReader.connect(hostName, _me)) { msgassertedNoTrace( 14051 , "unable to connect to resync"); } @@ -445,7 +445,7 @@ namespace repl { OpTime lastTime; bool dbOk = false; { - dbtemprelease release; + dbtemprelease release(txn->lockState()); // We always log an operation after executing it (never before), so // a database list will always be valid as of an oplog entry generated @@ -512,7 +512,7 @@ namespace repl { bool failedUpdate = applyOperation_inlock( txn, db, op ); if (failedUpdate) { Sync sync(hostName); - if (sync.shouldRetry(op)) { + if (sync.shouldRetry(txn, op)) { uassert(15914, "Failure retrying initial sync update", !applyOperation_inlock(txn, db, op)); @@ -535,7 +535,7 @@ namespace repl { @param alreadyLocked caller already put us in write lock if true */ - void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked) { + void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked) { LOG(6) << "processing op: " << op << endl; if( op.getStringField("op")[0] == 'n' ) @@ -562,8 +562,6 @@ namespace repl { if ( !only.empty() && only != clientName ) return; - OperationContextImpl txn; // XXX? - if (replSettings.pretouch && !alreadyLocked/*doesn't make sense if in write lock already*/) { if (replSettings.pretouch > 1) { @@ -592,17 +590,17 @@ namespace repl { a += m; } // we do one too... - pretouchOperation(&txn, op); + pretouchOperation(txn, op); tp->join(); countdown = v.size(); } } else { - pretouchOperation(&txn, op); + pretouchOperation(txn, op); } } - scoped_ptr<Lock::GlobalWrite> lk( alreadyLocked ? 0 : new Lock::GlobalWrite() ); + scoped_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(txn->lockState())); if ( replAllDead ) { // hmmm why is this check here and not at top of this function? does it get set between top and here? @@ -610,7 +608,7 @@ namespace repl { throw SyncException(); } - if ( !handleDuplicateDbName( &txn, op, ns, clientName ) ) { + if (!handleDuplicateDbName(txn, op, ns, clientName)) { return; } @@ -625,7 +623,7 @@ namespace repl { // always apply admin command command // this is a bit hacky -- the semantics of replication/commands aren't well specified if ( strcmp( clientName, "admin" ) == 0 && *op.getStringField( "op" ) == 'c' ) { - applyOperation( &txn, ctx.db(), op ); + applyOperation(txn, ctx.db(), op); return; } @@ -647,14 +645,14 @@ namespace repl { save(); Client::Context ctx(ns); nClonedThisPass++; - resync(&txn, ctx.db()->name()); + resync(txn, ctx.db()->name()); addDbNextPass.erase(clientName); incompleteCloneDbs.erase( clientName ); } save(); } else { - applyOperation( &txn, ctx.db(), op ); + applyOperation(txn, ctx.db(), op); addDbNextPass.erase( clientName ); } } @@ -723,7 +721,7 @@ namespace repl { 0 ok, don't sleep 1 ok, sleep */ - int ReplSource::sync_pullOpLog(int& nApplied) { + int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { int okResultCode = 1; string ns = string("local.oplog.$") + sourceName(); LOG(2) << "repl: sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n'; @@ -757,7 +755,7 @@ namespace repl { } // obviously global isn't ideal, but non-repl set is old so // keeping it simple - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); save(); } @@ -794,7 +792,7 @@ namespace repl { b.append("ns", *i + '.'); b.append("op", "db"); BSONObj op = b.done(); - sync_pullOpLog_applyOperation(op, false); + _sync_pullOpLog_applyOperation(txn, op, false); } } @@ -809,7 +807,7 @@ namespace repl { log() << "repl: " << ns << " oplog is empty" << endl; } { - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); save(); } return okResultCode; @@ -880,11 +878,11 @@ namespace repl { bool moreInitialSyncsPending = !addDbNextPass.empty() && n; // we need "&& n" to assure we actually process at least one op to get a sync point recorded in the first place. if ( moreInitialSyncsPending || !oplogReader.more() ) { - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); // NOTE aaron 2011-03-29 This block may be unnecessary, but I'm leaving it in place to avoid changing timing behavior. { - dbtemprelease t; + dbtemprelease t(txn->lockState()); if ( !moreInitialSyncsPending && oplogReader.more() ) { continue; } @@ -905,7 +903,7 @@ namespace repl { OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) { // periodically note our progress, in case we are doing a lot of work and crash - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); syncedTo = nextOpTime; // can't update local log ts since there are pending operations from our peer save(); @@ -919,7 +917,7 @@ namespace repl { int b = replApplyBatchSize.get(); bool justOne = b == 1; - scoped_ptr<Lock::GlobalWrite> lk( justOne ? 0 : new Lock::GlobalWrite() ); + scoped_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(txn->lockState())); while( 1 ) { BSONElement ts = op.getField("ts"); @@ -944,7 +942,7 @@ namespace repl { verify( justOne ); oplogReader.putBack( op ); _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1; - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); if ( n > 0 ) { syncedTo = last; save(); @@ -955,7 +953,7 @@ namespace repl { return okResultCode; } - sync_pullOpLog_applyOperation(op, !justOne); + _sync_pullOpLog_applyOperation(txn, op, !justOne); n++; if( --b == 0 ) @@ -1006,7 +1004,8 @@ namespace repl { return -1; } - return sync_pullOpLog(nApplied); + OperationContextImpl txn; // XXX? + return _sync_pullOpLog(&txn, nApplied); } /* --------------------------------------------------------------*/ @@ -1025,8 +1024,9 @@ namespace repl { OperationContextImpl txn; { ReplInfo r("replMain load sources"); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn.lockState()); ReplSource::loadAll(&txn, sources); + replSettings.fastsync = false; // only need this param for initial reset } @@ -1089,13 +1089,13 @@ namespace repl { return sleepAdvice; } - void replMain() { + static void replMain() { ReplSource::SourceVector sources; while ( 1 ) { int s = 0; { - Lock::GlobalWrite lk; OperationContextImpl txn; + Lock::GlobalWrite lk(txn.lockState()); if ( replAllDead ) { // throttledForceResyncDead can throw if ( !replSettings.autoresync || !ReplSource::throttledForceResyncDead( &txn, "auto" ) ) { @@ -1106,6 +1106,7 @@ namespace repl { verify( syncing == 0 ); // i.e., there is only one sync thread running. we will want to change/fix this. syncing++; } + try { int nApplied = 0; s = _replMain(sources, nApplied); @@ -1122,8 +1123,10 @@ namespace repl { out() << "caught exception in _replMain" << endl; s = 4; } + { - Lock::GlobalWrite lk; + LockState lockState; + Lock::GlobalWrite lk(&lockState); verify( syncing == 1 ); syncing--; } @@ -1157,14 +1160,15 @@ namespace repl { even when things are idle. */ { - writelocktry lk(1); + OperationContextImpl txn; + writelocktry lk(txn.lockState(), 1); if ( lk.got() ) { toSleep = 10; replLocalAuth(); try { - logKeepalive(); + logKeepalive(&txn); } catch(...) { log() << "caught exception in replMasterThread()" << endl; @@ -1178,12 +1182,13 @@ namespace repl { } } - void replSlaveThread() { + static void replSlaveThread() { sleepsecs(1); Client::initThread("replslave"); { - Lock::GlobalWrite lk; + LockState lockState; + Lock::GlobalWrite lk(&lockState); replLocalAuth(); } @@ -1217,7 +1222,8 @@ namespace repl { return; { - Lock::GlobalWrite lk; + LockState lockState; + Lock::GlobalWrite lk(&lockState); replLocalAuth(); } @@ -1249,7 +1255,8 @@ namespace repl { } OperationContextImpl txn; // XXX - Lock::GlobalRead lk; + Lock::GlobalRead lk(txn.lockState()); + for( unsigned i = a; i <= b; i++ ) { const BSONObj& op = v[i]; const char *which = "o"; diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h index 15445f68ede..f30f9f2bdd5 100644 --- a/src/mongo/db/repl/master_slave.h +++ b/src/mongo/db/repl/master_slave.h @@ -83,12 +83,12 @@ namespace repl { void resync(OperationContext* txn, const std::string& dbName); /** @param alreadyLocked caller already put us in write lock if true */ - void sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked); + void _sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked); /* pull some operations from the master's oplog, and apply them. calls sync_pullOpLog_applyOperation */ - int sync_pullOpLog(int& nApplied); + int _sync_pullOpLog(OperationContext* txn, int& nApplied); /* we only clone one database per pass, even if a lot need done. This helps us avoid overflowing the master's transaction log by doing too much work before going diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 1c40bbdfa6f..bf5cf5348b9 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -392,13 +392,11 @@ namespace repl { } void oldRepl() { _logOp = _logOpOld; } - void logKeepalive() { - OperationContextImpl txn; - _logOp(&txn, "n", "", 0, BSONObj(), 0, 0, false); + void logKeepalive(OperationContext* txn) { + _logOp(txn, "n", "", 0, BSONObj(), 0, 0, false); } - void logOpComment(const BSONObj& obj) { - OperationContextImpl txn; - _logOp(&txn, "n", "", 0, obj, 0, 0, false); + void logOpComment(OperationContext* txn, const BSONObj& obj) { + _logOp(txn, "n", "", 0, obj, 0, 0, false); } void logOpInitiate(OperationContext* txn, const BSONObj& obj) { _logOpRS(txn, "n", "", 0, obj, 0, 0, false); @@ -433,7 +431,8 @@ namespace repl { } void createOplog() { - Lock::GlobalWrite lk; + OperationContextImpl txn; + Lock::GlobalWrite lk(txn.lockState()); const char * ns = "local.oplog.$main"; @@ -442,7 +441,6 @@ namespace repl { ns = rsoplog; Client::Context ctx(ns); - OperationContextImpl txn; Collection* collection = ctx.db()->getCollection( &txn, ns ); if ( collection ) { @@ -460,7 +458,7 @@ namespace repl { if( rs ) return; - initOpTimeFromOplog(ns); + initOpTimeFromOplog(&txn, ns); return; } @@ -711,12 +709,12 @@ namespace repl { Status status = Command::getStatusFromCommandResult(ob.done()); switch (status.code()) { case ErrorCodes::BackgroundOperationInProgressForDatabase: { - dbtemprelease release; + dbtemprelease release(txn->lockState()); BackgroundOperation::awaitNoBgOpInProgForDb(nsToDatabaseSubstring(ns)); break; } case ErrorCodes::BackgroundOperationInProgressForNamespace: { - dbtemprelease release; + dbtemprelease release(txn->lockState());; Command* cmd = Command::findCommand(o.firstElement().fieldName()); invariant(cmd); BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nsToDatabase(ns), o)); @@ -761,8 +759,8 @@ namespace repl { } - void initOpTimeFromOplog(const std::string& oplogNS) { - DBDirectClient c; + void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS) { + DBDirectClient c(txn); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 40803c0a007..6728811667e 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -78,13 +78,13 @@ namespace repl { bool fromMigrate = false); // Log an empty no-op operation to the local oplog - void logKeepalive(); + void logKeepalive(OperationContext* txn); /** puts obj in the oplog as a comment (a no-op). Just for diags. convention is { msg : "text", ... } */ - void logOpComment(const BSONObj& obj); + void logOpComment(OperationContext* txn, const BSONObj& obj); // Flush out the cached pointers to the local database and oplog. // Used by the closeDatabase command to ensure we don't cache closed things. @@ -112,6 +112,6 @@ namespace repl { /** * Initializes the global OpTime with the value from the timestamp of the last oplog entry. */ - void initOpTimeFromOplog(const std::string& oplogNS); + void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp index 5d7a37d8806..c5b00f59c20 100644 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ b/src/mongo/db/repl/repl_set_impl.cpp @@ -109,7 +109,7 @@ namespace { } } - void ReplSetImpl::assumePrimary() { + void ReplSetImpl::_assumePrimary() { LOG(1) << "replSet assuming primary" << endl; verify(iAmPotentiallyHot()); @@ -119,9 +119,11 @@ namespace { // Lock here to prevent stepping down & becoming primary from getting interleaved LOG(1) << "replSet waiting for global write lock"; - Lock::GlobalWrite lk; - initOpTimeFromOplog("local.oplog.rs"); + OperationContextImpl txn; // XXX? + Lock::GlobalWrite lk(txn.lockState()); + + initOpTimeFromOplog(&txn, "local.oplog.rs"); // Generate new election unique id elect.setElectionId(OID::gen()); @@ -138,8 +140,10 @@ namespace { bool ReplSetImpl::setMaintenanceMode(const bool inc) { lock replLock(this); + // Lock here to prevent state from changing between checking the state and changing it - Lock::GlobalWrite writeLock; + LockState lockState; + Lock::GlobalWrite writeLock(&lockState); if (box.getState().primary()) { return false; @@ -191,7 +195,8 @@ namespace { void ReplSetImpl::relinquish() { { - Lock::GlobalWrite lk; // so we are synchronized with _logOp() + LockState lockState; + Lock::GlobalWrite writeLock(&lockState); // so we are synchronized with _logOp() LOG(2) << "replSet attempting to relinquish" << endl; if (box.getState().primary()) { diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h index b5b6254826d..da6d2efdf3d 100644 --- a/src/mongo/db/repl/repl_set_impl.h +++ b/src/mongo/db/repl/repl_set_impl.h @@ -41,7 +41,9 @@ #include "mongo/util/concurrency/value.h" namespace mongo { + class Cloner; + class OperationContext; namespace repl { @@ -118,7 +120,7 @@ namespace repl { bool _stepDown(int secs); bool _freeze(int secs); private: - void assumePrimary(); + void _assumePrimary(); void loadLastOpTimeWritten(bool quiet=false); void changeState(MemberState s); @@ -288,7 +290,7 @@ namespace repl { void syncDoInitialSync(); void _syncThread(); void syncTail(); - unsigned _syncRollback(OplogReader& r); + unsigned _syncRollback(OperationContext* txn, OplogReader& r); void syncFixUp(FixUpInfo& h, OplogReader& r); // keep a list of hosts that we've tried recently that didn't work @@ -317,7 +319,7 @@ namespace repl { threadpool::ThreadPool& getWriterPool() { return _writerPool; } const ReplSetConfig::MemberCfg& myConfig() const { return _config; } - bool tryToGoLiveAsASecondary(OpTime&); // readlocks + bool tryToGoLiveAsASecondary(OperationContext* txn, OpTime&); // readlocks void syncRollback(OplogReader& r); void syncThread(); const OpTime lastOtherOpTime() const; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 5605d962987..b7df8ebe227 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -186,14 +186,14 @@ namespace repl { virtual bool run(OperationContext* txn, const string& a, BSONObj& b, int e, string& errmsg, BSONObjBuilder& c, bool d) { try { rwlock_try_write lk(mutex); - return _run(a,b,e,errmsg,c,d); + return _run(txn, a,b,e,errmsg,c,d); } catch(rwlock_try_write::exception&) { } errmsg = "a replSetReconfig is already in progress"; return false; } private: - bool _run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + bool _run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( cmdObj["replSetReconfig"].type() != Object ) { errmsg = "no configuration specified"; return false; @@ -220,7 +220,7 @@ namespace repl { // later. of course it could be stuck then, but this check lowers the risk if weird things // are up - we probably don't want a change to apply 30 minutes after the initial attempt. time_t t = time(0); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); if( time(0)-t > 20 ) { errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?"; return false; diff --git a/src/mongo/db/repl/replset_web_handler.cpp b/src/mongo/db/repl/replset_web_handler.cpp index 8c71fa2748b..21f013318ea 100644 --- a/src/mongo/db/repl/replset_web_handler.cpp +++ b/src/mongo/db/repl/replset_web_handler.cpp @@ -51,9 +51,13 @@ namespace repl { } virtual void handle( OperationContext* txn, - const char *rq, const std::string& url, BSONObj params, - string& responseMsg, int& responseCode, - vector<string>& headers, const SockAddr &from ) { + const char *rq, + const std::string& url, + BSONObj params, + string& responseMsg, + int& responseCode, + vector<string>& headers, + const SockAddr &from ) { if( url == "/_replSetOplog" ) { responseMsg = _replSetOplog(params); diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp index 3bd6b268cdf..d1c693d4b92 100644 --- a/src/mongo/db/repl/resync.cpp +++ b/src/mongo/db/repl/resync.cpp @@ -67,7 +67,7 @@ namespace repl { bool fromRepl) { const std::string ns = parseNs(dbname, cmdObj); - Lock::GlobalWrite globalWriteLock; + Lock::GlobalWrite globalWriteLock(txn->lockState()); Client::Context ctx(ns); if (replSettings.usingReplSets()) { if (!theReplSet) { @@ -83,7 +83,7 @@ namespace repl { // below this comment pertains only to master/slave replication if ( cmdObj.getBoolField( "force" ) ) { - if ( !waitForSyncToFinish( errmsg ) ) + if ( !waitForSyncToFinish(txn, errmsg ) ) return false; replAllDead = "resync forced"; } @@ -91,14 +91,15 @@ namespace repl { errmsg = "not dead, no need to resync"; return false; } - if ( !waitForSyncToFinish( errmsg ) ) + if ( !waitForSyncToFinish(txn, errmsg ) ) return false; ReplSource::forceResyncDead( txn, "client" ); result.append( "info", "triggered resync for all sources" ); return true; } - bool waitForSyncToFinish( string &errmsg ) const { + + bool waitForSyncToFinish(OperationContext* txn, string &errmsg) const { // Wait for slave thread to finish syncing, so sources will be be // reloaded with new saved state on next pass. Timer t; @@ -106,7 +107,7 @@ namespace repl { if ( syncing == 0 || t.millis() > 30000 ) break; { - Lock::TempRelease t; + Lock::TempRelease t(txn->lockState()); relinquishSyncingSome = 1; sleepmillis(1); } diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index be6c8aaf75a..5faea85e949 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -393,6 +393,8 @@ namespace repl { return; } + OperationContextImpl txn; + // written by applyToHead calls BSONObj minValid; @@ -408,7 +410,7 @@ namespace repl { theReplSet->setInitialSyncFlag(); sethbmsg("initial sync drop all databases", 0); - dropAllDatabasesExceptLocal(); + dropAllDatabasesExceptLocal(&txn); sethbmsg("initial sync clone all databases", 0); @@ -467,7 +469,6 @@ namespace repl { verify( !box.getState().primary() ); // wouldn't make sense if we were. { - OperationContextImpl txn; Client::WriteContext cx(&txn, "local."); cx.ctx().db()->flushFiles(true); diff --git a/src/mongo/db/repl/rs_initiate.cpp b/src/mongo/db/repl/rs_initiate.cpp index e50d2a80568..c0cd5042abd 100644 --- a/src/mongo/db/repl/rs_initiate.cpp +++ b/src/mongo/db/repl/rs_initiate.cpp @@ -197,7 +197,7 @@ namespace repl { // later. of course it could be stuck then, but this check lowers the risk if weird things // are up. time_t t = time(0); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); if( time(0)-t > 10 ) { errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?"; return false; @@ -270,7 +270,7 @@ namespace repl { createOplog(); - Lock::GlobalWrite lk; + Lock::GlobalWrite lk(txn->lockState()); bo comment = BSON( "msg" << "initiating set"); newConfig->saveConfigLocally(comment); log() << "replSet replSetInitiate config now saved locally. " diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 9d07024e16a..4ecf5407c24 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -428,7 +428,7 @@ namespace repl { ctx.db()->dropCollection(&txn, ns); { string errmsg; - dbtemprelease release; + dbtemprelease release(txn.lockState()); bool ok = Cloner::copyCollectionFromRemote(&txn, them->getServerAddress(), ns, errmsg); uassert(15909, str::stream() << "replSet rollback error resyncing collection " @@ -662,8 +662,9 @@ namespace repl { void ReplSetImpl::syncRollback(OplogReader& oplogreader) { // check that we are at minvalid, otherwise we cannot rollback as we may be in an // inconsistent state + OperationContextImpl txn; + { - OperationContextImpl txn; Lock::DBRead lk(txn.lockState(), "local.replset.minvalid"); BSONObj mv; if (Helpers::getSingleton(&txn, "local.replset.minvalid", mv)) { @@ -678,18 +679,18 @@ namespace repl { } } - unsigned s = _syncRollback(oplogreader); + unsigned s = _syncRollback(&txn, oplogreader); if (s) sleepsecs(s); } - unsigned ReplSetImpl::_syncRollback(OplogReader& oplogreader) { + unsigned ReplSetImpl::_syncRollback(OperationContext* txn, OplogReader& oplogreader) { verify(!lockedByMe()); verify(!Lock::isLocked()); sethbmsg("rollback 0"); - writelocktry lk(20000); + writelocktry lk(txn->lockState(), 20000); if (!lk.got()) { sethbmsg("rollback couldn't get write lock in a reasonable time"); return 2; @@ -721,7 +722,7 @@ namespace repl { } catch (DBException& e) { sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min"); - dbtemprelease release; + dbtemprelease release(txn->lockState()); sleepsecs(60); throw; } diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 2fd1aa5b0f4..ad9c8a5ebe7 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -72,7 +72,7 @@ namespace repl { readlocks @return true if transitioned to SECONDARY */ - bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { + bool ReplSetImpl::tryToGoLiveAsASecondary(OperationContext* txn, OpTime& /*out*/ minvalid) { bool golive = false; lock rsLock( this ); @@ -87,7 +87,7 @@ namespace repl { return false; } - Lock::GlobalWrite writeLock; + Lock::GlobalWrite writeLock(txn->lockState()); // make sure we're not primary, secondary, rollback, or fatal already if (box.getState().primary() || box.getState().secondary() || diff --git a/src/mongo/db/repl/sync.cpp b/src/mongo/db/repl/sync.cpp index 8ca15ed9386..16e6225a1fb 100644 --- a/src/mongo/db/repl/sync.cpp +++ b/src/mongo/db/repl/sync.cpp @@ -36,7 +36,6 @@ #include "mongo/db/diskloc.h" #include "mongo/db/pdfile.h" #include "mongo/db/repl/oplogreader.h" -#include "mongo/db/operation_context_impl.h" #include "mongo/db/catalog/collection.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -106,16 +105,17 @@ namespace repl { str::stream() << "Can no longer connect to initial sync source: " << hn); } - bool Sync::shouldRetry(const BSONObj& o) { + bool Sync::shouldRetry(OperationContext* txn, const BSONObj& o) { + invariant(txn->lockState()->hasAnyWriteLock()); + // should already have write lock const char *ns = o.getStringField("ns"); Client::Context ctx(ns); - OperationContextImpl txn; // we don't have the object yet, which is possible on initial sync. get it. log() << "replication info adding missing object" << endl; // rare enough we can log - BSONObj missingObj = getMissingDoc(&txn, ctx.db(), o); + BSONObj missingObj = getMissingDoc(txn, ctx.db(), o); if( missingObj.isEmpty() ) { log() << "replication missing object not found on source. presumably deleted later in oplog" << endl; @@ -125,9 +125,10 @@ namespace repl { return false; } else { - Collection* collection = ctx.db()->getOrCreateCollection( &txn, ns ); - verify( collection ); // should never happen - StatusWith<DiskLoc> result = collection->insertDocument( &txn, missingObj, true ); + Collection* collection = ctx.db()->getOrCreateCollection(txn, ns); + invariant(collection != NULL); // should never happen + + StatusWith<DiskLoc> result = collection->insertDocument(txn, missingObj, true); uassert(15917, str::stream() << "failed to insert missing doc: " << result.toString(), result.isOK() ); diff --git a/src/mongo/db/repl/sync.h b/src/mongo/db/repl/sync.h index 67cb5e63a60..cdda55f4f13 100644 --- a/src/mongo/db/repl/sync.h +++ b/src/mongo/db/repl/sync.h @@ -49,7 +49,7 @@ namespace repl { /** * If applyOperation_inlock should be called again after an update fails. */ - virtual bool shouldRetry(const BSONObj& o); + virtual bool shouldRetry(OperationContext* txn, const BSONObj& o); void setHostname(const std::string& hostname); }; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 92eea631595..07d6eab4243 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -78,7 +78,8 @@ namespace repl { /* apply the log op that is in param o @return bool success (true) or failure (false) */ - bool SyncTail::syncApply(const BSONObj &op, bool convertUpdateToUpsert) { + bool SyncTail::syncApply( + OperationContext* txn, const BSONObj &op, bool convertUpdateToUpsert) { const char *ns = op.getStringField("ns"); verify(ns); @@ -94,25 +95,24 @@ namespace repl { bool isCommand(op["op"].valuestrsafe()[0] == 'c'); - OperationContextImpl txn; boost::scoped_ptr<Lock::ScopedLock> lk; if(isCommand) { // a command may need a global write lock. so we will conservatively go // ahead and grab one here. suboptimal. :-( - lk.reset(new Lock::GlobalWrite()); + lk.reset(new Lock::GlobalWrite(txn->lockState())); } else { // DB level lock for this operation - lk.reset(new Lock::DBWrite(txn.lockState(), ns)); + lk.reset(new Lock::DBWrite(txn->lockState(), ns)); } - Client::Context ctx(ns, storageGlobalParams.dbpath); + Client::Context ctx(ns); ctx.getClient()->curop()->reset(); // For non-initial-sync, we convert updates to upserts // to suppress errors when replaying oplog entries. - bool ok = !applyOperation_inlock(&txn, ctx.db(), op, true, convertUpdateToUpsert); + bool ok = !applyOperation_inlock(txn, ctx.db(), op, true, convertUpdateToUpsert); opsAppliedStats.increment(); - txn.recoveryUnit()->commitIfNeeded(); + txn->recoveryUnit()->commitIfNeeded(); return ok; } @@ -325,7 +325,9 @@ namespace repl { // become primary if (!theReplSet->isSecondary()) { OpTime minvalid; - theReplSet->tryToGoLiveAsASecondary(minvalid); + + OperationContextImpl txn; + theReplSet->tryToGoLiveAsASecondary(&txn, minvalid); } // normally msgCheckNewState gets called periodically, but in a single node @@ -555,7 +557,8 @@ namespace repl { it != ops.end(); ++it) { try { - if (!st->syncApply(*it, convertUpdatesToUpserts)) { + OperationContextImpl txn; + if (!st->syncApply(&txn, *it, convertUpdatesToUpserts)) { fassertFailedNoTrace(16359); } } catch (const DBException& e) { @@ -573,15 +576,18 @@ namespace repl { it != ops.end(); ++it) { try { - if (!st->syncApply(*it)) { + OperationContextImpl txn; + + if (!st->syncApply(&txn, *it)) { bool status; { - Lock::GlobalWrite lk; - status = st->shouldRetry(*it); + Lock::GlobalWrite lk(txn.lockState()); + status = st->shouldRetry(&txn, *it); } + if (status) { // retry - if (!st->syncApply(*it)) { + if (!st->syncApply(&txn, *it)) { fassertFailedNoTrace(15915); } } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 581a37498ae..ea9995aaed2 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -34,6 +34,9 @@ #include "mongo/db/repl/sync.h" namespace mongo { + + class OperationContext; + namespace repl { class BackgroundSyncInterface; @@ -46,7 +49,9 @@ namespace repl { public: SyncTail(BackgroundSyncInterface *q); virtual ~SyncTail(); - virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert = false); + virtual bool syncApply(OperationContext* txn, + const BSONObj &o, + bool convertUpdateToUpsert = false); /** * Apply ops from applyGTEObj's ts to at least minValidObj's ts. Note that, due to |