diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-07-17 17:03:15 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-07-18 15:16:14 -0400 |
commit | 3c3d656668e26645492ee3dafb241631352426d4 (patch) | |
tree | f69bcbf52ad1ef98aba01cdcb61b86f44e34d4ca /src/mongo/db/repl | |
parent | 848999f1527e8390d4d76f9fad0860218b73be4d (diff) | |
download | mongo-3c3d656668e26645492ee3dafb241631352426d4.tar.gz |
SERVER-13961 Cleanup some nested instantiations of OperationContextImpl in repl
These are causing deadlocks if LockState is removed from TLS because of
pseudo-conflicting locks on the same code path.
Diffstat (limited to 'src/mongo/db/repl')
23 files changed, 209 insertions, 198 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index beb4904428b..6eba92a1625 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -31,6 +31,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" @@ -153,6 +154,8 @@ namespace repl { } void BackgroundSync::_producerThread() { + OperationContextImpl txn; + MemberState state = theReplSet->state(); // we want to pause when the state changes to primary @@ -180,16 +183,16 @@ namespace repl { start(); } - produce(); + produce(&txn); } - void BackgroundSync::produce() { + void BackgroundSync::produce(OperationContext* txn) { // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced OplogReader r; OpTime lastOpTimeFetched; // find a target to sync from the last op time written - getOplogReader(r); + getOplogReader(txn, r); // no server found { @@ -365,7 +368,7 @@ namespace repl { return true; } - void BackgroundSync::getOplogReader(OplogReader& r) { + void BackgroundSync::getOplogReader(OperationContext* txn, OplogReader& r) { const Member *target = NULL, *stale = NULL; BSONObj oldest; @@ -419,7 +422,7 @@ namespace repl { // the only viable sync target was stale if (stale) { - theReplSet->goStale(stale, oldest); + theReplSet->goStale(txn, stale, oldest); sleepsecs(120); } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 9539d11aa0f..e6e701de237 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -98,10 +98,10 @@ namespace repl { // Production thread void _producerThread(); // Adds elements to the list, up to maxSize. - void produce(); + void produce(OperationContext* txn); // Check if rollback is necessary bool isRollbackRequired(OplogReader& r); - void getOplogReader(OplogReader& r); + void getOplogReader(OperationContext* txn, OplogReader& r); // Evaluate if the current sync target is still good bool shouldChangeSyncTarget(); // check lastOpTimeWritten against the remote's earliest op, filling in remoteOldestOp. diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index 794f010fea3..19822bbc75b 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -45,7 +45,9 @@ namespace repl { /* initial oplog application, during initial sync, after cloning. */ - BSONObj InitialSync::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) { + BSONObj InitialSync::oplogApplication(OperationContext* txn, + const BSONObj& applyGTEObj, + const BSONObj& minValidObj) { if (replSetForceInitialSyncFailure > 0) { log() << "replSet test code invoked, forced InitialSync failure: " << replSetForceInitialSyncFailure << rsLog; @@ -54,9 +56,8 @@ namespace repl { } // create the initial oplog entry - OperationContextImpl txn; - syncApply(&txn, applyGTEObj); - _logOpObjRS(applyGTEObj); + syncApply(txn, applyGTEObj); + _logOpObjRS(txn, applyGTEObj); return oplogApplySegment(applyGTEObj, minValidObj, multiInitialSyncApply); } diff --git a/src/mongo/db/repl/initial_sync.h b/src/mongo/db/repl/initial_sync.h index 3ce40226bab..90f4e9dacea 100644 --- a/src/mongo/db/repl/initial_sync.h +++ b/src/mongo/db/repl/initial_sync.h @@ -47,7 +47,9 @@ namespace repl { * Creates the initial oplog entry: applies applyGTEObj and writes it to the oplog. Then * this runs oplogApplySegment allowing recloning documents. */ - BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj); + BSONObj oplogApplication(OperationContext* txn, + const BSONObj& applyGTEObj, + const BSONObj& minValidObj); }; } // namespace repl diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index ca6e2cc2e1d..0e216cdd05b 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -93,12 +93,12 @@ namespace repl { }; - ReplSource::ReplSource() { + ReplSource::ReplSource(OperationContext* txn) { nClonedThisPass = 0; - ensureMe(); + ensureMe(txn); } - ReplSource::ReplSource(BSONObj o) : nClonedThisPass(0) { + ReplSource::ReplSource(OperationContext* txn, BSONObj o) : nClonedThisPass(0) { only = o.getStringField("only"); hostName = o.getStringField("host"); _sourceName = o.getStringField("source"); @@ -132,7 +132,7 @@ namespace repl { incompleteCloneDbs.insert( e.fieldName() ); } } - ensureMe(); + ensureMe(txn); } /* Turn our C++ Source object into a BSONObj */ @@ -166,32 +166,30 @@ namespace repl { return b.obj(); } - void ReplSource::ensureMe() { + void ReplSource::ensureMe(OperationContext* txn) { string myname = getHostName(); bool exists = false; { - OperationContextImpl txn; - Client::ReadContext ctx(&txn, "local"); + Client::ReadContext ctx(txn, "local"); // local.me is an identifier for a server for getLastError w:2+ - exists = Helpers::getSingleton(&txn, "local.me", _me); + exists = Helpers::getSingleton(txn, "local.me", _me); } if (!exists || !_me.hasField("host") || _me["host"].String() != myname) { - OperationContextImpl txn; - Client::WriteContext ctx(&txn, "local"); + Client::WriteContext ctx(txn, "local"); // clean out local.me - Helpers::emptyCollection(&txn, "local.me"); + Helpers::emptyCollection(txn, "local.me"); // repopulate BSONObjBuilder b; b.appendOID("_id", 0, true); b.append("host", myname); _me = b.obj(); - Helpers::putSingleton(&txn, "local.me", _me); + Helpers::putSingleton(txn, "local.me", _me); ctx.commit(); } } - void ReplSource::save() { + void ReplSource::save(OperationContext* txn) { BSONObjBuilder b; verify( !hostName.empty() ); b.append("host", hostName); @@ -204,10 +202,9 @@ namespace repl { LOG( 1 ) << "Saving repl source: " << o << endl; { - OperationContextImpl txn; OpDebug debug; - Client::Context ctx(&txn, "local.sources"); + Client::Context ctx(txn, "local.sources"); const NamespaceString requestNs("local.sources"); UpdateRequest request(requestNs); @@ -216,14 +213,17 @@ namespace repl { request.setUpdates(o); request.setUpsert(); - UpdateResult res = update(&txn, ctx.db(), request, &debug); + UpdateResult res = update(txn, ctx.db(), request, &debug); verify( ! res.modifiers ); verify( res.numMatched == 1 ); } } - static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, ReplSource::SourceVector &old) { + static void addSourceToList(OperationContext* txn, + ReplSource::SourceVector &v, + ReplSource& s, + ReplSource::SourceVector &old) { if ( !s.syncedTo.isNull() ) { // Don't reuse old ReplSource if there was a forced resync. for ( ReplSource::SourceVector::iterator i = old.begin(); i != old.end(); ) { if ( s == **i ) { @@ -261,7 +261,7 @@ namespace repl { Runner::RunnerState state; while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { n++; - ReplSource tmp(obj); + ReplSource tmp(txn, obj); if (tmp.hostName != replSettings.source) { log() << "repl: --source " << replSettings.source << " != " << tmp.hostName << " from local.sources collection" << endl; @@ -283,10 +283,10 @@ namespace repl { uassert( 10002 , "local.sources collection corrupt?", n<2 ); if ( n == 0 ) { // source missing. add. - ReplSource s; + ReplSource s(txn); s.hostName = replSettings.source; s.only = replSettings.only; - s.save(); + s.save(txn); } } else { @@ -305,7 +305,7 @@ namespace repl { BSONObj obj; Runner::RunnerState state; while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { - ReplSource tmp(obj); + ReplSource tmp(txn, obj); if ( tmp.syncedTo.isNull() ) { DBDirectClient c(txn); if ( c.exists( "local.oplog.$main" ) ) { @@ -315,7 +315,7 @@ namespace repl { } } } - addSourceToList(v, tmp, old); + addSourceToList(txn, v, tmp, old); } uassert(17066, "Internal error reading from local.sources", Runner::RUNNER_EOF == state); } @@ -371,7 +371,7 @@ namespace repl { } syncedTo = OpTime(); addDbNextPass.clear(); - save(); + save(txn); } void ReplSource::resyncDrop( OperationContext* txn, const string& db ) { @@ -674,14 +674,14 @@ namespace repl { if ( incompleteClone ) { log() << "An earlier initial clone of '" << clientName << "' did not complete, now resyncing." << endl; } - save(); + save(txn); Client::Context ctx(txn, ns); nClonedThisPass++; resync(txn, ctx.db()->name()); addDbNextPass.erase(clientName); incompleteCloneDbs.erase( clientName ); } - save(); + save(txn); } else { applyOperation(txn, ctx.db(), op); @@ -789,7 +789,7 @@ namespace repl { // obviously global isn't ideal, but non-repl set is old so // keeping it simple Lock::GlobalWrite lk(txn->lockState()); - save(); + save(txn); } BSONObjBuilder gte; @@ -841,7 +841,7 @@ namespace repl { } { Lock::GlobalWrite lk(txn->lockState()); - save(); + save(txn); } return okResultCode; } @@ -919,7 +919,7 @@ namespace repl { } syncedTo = nextOpTime; - save(); // note how far we are synced up to now + save(txn); // note how far we are synced up to now log() << "repl: applied " << n << " operations" << endl; nApplied = n; log() << "repl: end sync_pullOpLog syncedTo: " << syncedTo.toStringLong() << endl; @@ -931,7 +931,7 @@ namespace repl { Lock::GlobalWrite lk(txn->lockState()); syncedTo = nextOpTime; // can't update local log ts since there are pending operations from our peer - save(); + save(txn); log() << "repl: checkpoint applied " << n << " operations" << endl; log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl; saveLast = time(0); @@ -972,7 +972,7 @@ namespace repl { Lock::GlobalWrite lk(txn->lockState()); if ( n > 0 ) { syncedTo = last; - save(); + save(txn); } log() << "repl: applied " << n << " operations" << endl; log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl; @@ -1047,12 +1047,11 @@ namespace repl { 0 = no sleep recommended 1 = special sentinel indicating adaptive sleep recommended */ - int _replMain(ReplSource::SourceVector& sources, int& nApplied) { - OperationContextImpl txn; + int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nApplied) { { ReplInfo r("replMain load sources"); - Lock::GlobalWrite lk(txn.lockState()); - ReplSource::loadAll(&txn, sources); + Lock::GlobalWrite lk(txn->lockState()); + ReplSource::loadAll(txn, sources); // only need this param for initial reset getGlobalReplicationCoordinator()->getSettings().fastsync = false; @@ -1117,17 +1116,16 @@ namespace repl { return sleepAdvice; } - static void replMain() { + static void replMain(OperationContext* txn) { ReplSource::SourceVector sources; while ( 1 ) { int s = 0; { - OperationContextImpl txn; - Lock::GlobalWrite lk(txn.lockState()); + Lock::GlobalWrite lk(txn->lockState()); if ( replAllDead ) { // throttledForceResyncDead can throw if ( !getGlobalReplicationCoordinator()->getSettings().autoresync || - !ReplSource::throttledForceResyncDead( &txn, "auto" ) ) { + !ReplSource::throttledForceResyncDead( txn, "auto" ) ) { log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" << endl; break; } @@ -1138,7 +1136,7 @@ namespace repl { try { int nApplied = 0; - s = _replMain(sources, nApplied); + s = _replMain(txn, sources, nApplied); if( s == 1 ) { if( nApplied == 0 ) s = 2; else if( nApplied > 100 ) { @@ -1154,8 +1152,7 @@ namespace repl { } { - LockState lockState; - Lock::GlobalWrite lk(&lockState); + Lock::GlobalWrite lk(txn->lockState()); verify( syncing == 1 ); syncing--; } @@ -1215,15 +1212,16 @@ namespace repl { sleepsecs(1); Client::initThread("replslave"); + OperationContextImpl txn; + { - LockState lockState; - Lock::GlobalWrite lk(&lockState); + Lock::GlobalWrite lk(txn.lockState()); replLocalAuth(); } while ( 1 ) { try { - replMain(); + replMain(&txn); sleepsecs(5); } catch ( AssertionException& ) { @@ -1244,6 +1242,7 @@ namespace repl { } void startMasterSlave() { + OperationContextImpl txn; oldRepl(); @@ -1252,13 +1251,12 @@ namespace repl { return; { - LockState lockState; - Lock::GlobalWrite lk(&lockState); + Lock::GlobalWrite lk(txn.lockState()); replLocalAuth(); } { - ReplSource temp; // Ensures local.me is populated + ReplSource temp(&txn); // Ensures local.me is populated } if ( replSettings.slave ) { @@ -1270,7 +1268,7 @@ namespace repl { if ( replSettings.master ) { LOG(1) << "master=true" << endl; replSettings.master = true; - createOplog(); + createOplog(&txn); boost::thread t(replMasterThread); } diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h index 15fd74b2176..760f0d27c1d 100644 --- a/src/mongo/db/repl/master_slave.h +++ b/src/mongo/db/repl/master_slave.h @@ -119,7 +119,7 @@ namespace repl { const char* db ); // populates _me so that it can be passed to oplogreader for handshakes - void ensureMe(); + void ensureMe(OperationContext* txn); void forceResync(OperationContext* txn, const char *requester); @@ -142,16 +142,17 @@ namespace repl { typedef std::vector< shared_ptr< ReplSource > > SourceVector; static void loadAll(OperationContext* txn, SourceVector&); - explicit ReplSource(BSONObj); + + explicit ReplSource(OperationContext* txn, BSONObj); // This is not the constructor you are looking for. Always prefer the version that takes // a BSONObj. This is public only as a hack so that the ReplicationCoordinator can find // out the process's RID in master/slave setups. - ReplSource(); + ReplSource(OperationContext* txn); /* -1 = error */ int sync(int& nApplied); - void save(); // write ourself to local.sources + void save(OperationContext* txn); // write ourself to local.sources // make a jsobj from our member fields of the form // { host: ..., source: ..., syncedTo: ... } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 37c3057ec15..e911c19a852 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -118,29 +118,28 @@ namespace repl { /** write an op to the oplog that is already built. todo : make _logOpRS() call this so we don't repeat ourself? */ - void _logOpObjRS(const BSONObj& op) { - OperationContextImpl txn; - Lock::DBWrite lk(txn.lockState(), "local"); + void _logOpObjRS(OperationContext* txn, const BSONObj& op) { + Lock::DBWrite lk(txn->lockState(), "local"); // XXX soon this needs to be part of an outer WUOW not its own. // We can't do this yet due to locking limitations. - WriteUnitOfWork wunit(txn.recoveryUnit()); + WriteUnitOfWork wunit(txn->recoveryUnit()); const OpTime ts = op["ts"]._opTime(); long long h = op["h"].numberLong(); { if ( localOplogRSCollection == 0 ) { - Client::Context ctx(&txn, rsoplog); + Client::Context ctx(txn, rsoplog); localDB = ctx.db(); verify( localDB ); - localOplogRSCollection = localDB->getCollection( &txn, rsoplog ); + localOplogRSCollection = localDB->getCollection(txn, rsoplog); massert(13389, "local.oplog.rs missing. did you drop it? if so restart server", localOplogRSCollection); } - Client::Context ctx(&txn, rsoplog, localDB); - checkOplogInsert( localOplogRSCollection->insertDocument( &txn, op, false ) ); + Client::Context ctx(txn, rsoplog, localDB); + checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false)); /* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy. this code (or code in now() maybe) should be improved. @@ -459,9 +458,8 @@ namespace repl { } } - void createOplog() { - OperationContextImpl txn; - Lock::GlobalWrite lk(txn.lockState()); + void createOplog(OperationContext* txn) { + Lock::GlobalWrite lk(txn->lockState()); const char * ns = "local.oplog.$main"; @@ -470,13 +468,13 @@ namespace repl { if( rs ) ns = rsoplog; - Client::Context ctx(&txn, ns); - Collection* collection = ctx.db()->getCollection( &txn, ns ); + Client::Context ctx(txn, ns); + Collection* collection = ctx.db()->getCollection(txn, ns ); if ( collection ) { if (replSettings.oplogSize != 0) { - int o = (int)(collection->getRecordStore()->storageSize(&txn) / ( 1024 * 1024 ) ); + int o = (int)(collection->getRecordStore()->storageSize(txn) / ( 1024 * 1024 ) ); int n = (int)(replSettings.oplogSize / (1024 * 1024)); if ( n != o ) { stringstream ss; @@ -488,7 +486,7 @@ namespace repl { if( rs ) return; - initOpTimeFromOplog(&txn, ns); + initOpTimeFromOplog(txn, ns); return; } @@ -527,10 +525,10 @@ namespace repl { options.cappedSize = sz; options.autoIndexId = CollectionOptions::NO; - WriteUnitOfWork wunit(txn.recoveryUnit()); - invariant( ctx.db()->createCollection( &txn, ns, options ) ); + WriteUnitOfWork wunit(txn->recoveryUnit()); + invariant(ctx.db()->createCollection(txn, ns, options)); if( !rs ) - logOp( &txn, "n", "", BSONObj() ); + logOp(txn, "n", "", BSONObj() ); wunit.commit(); /* sync here so we don't get any surprising lag later when we try to sync */ diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 1c3067a4182..79574c8e6d3 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -48,11 +48,11 @@ namespace repl { // Create a new capped collection for the oplog if it doesn't yet exist. // This will be either local.oplog.rs (replica sets) or local.oplog.$main (master/slave) // If the collection already exists, set the 'last' OpTime if master/slave (side effect!) - void createOplog(); + void createOplog(OperationContext* txn); // This poorly-named function writes an op into the replica-set oplog; // used internally by replication secondaries after they have applied an op - void _logOpObjRS(const BSONObj& op); + void _logOpObjRS(OperationContext* txn, const BSONObj& op); const char rsoplog[] = "local.oplog.rs"; diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index 9f68a8a7116..c8bb9bf3a50 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -36,7 +36,7 @@ #include "mongo/bson/optime.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/instance.h" -#include "mongo/db/operation_context.h" +#include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/connections.h" #include "mongo/db/repl/master_slave.h" @@ -419,7 +419,8 @@ namespace { if (mode == modeReplSet) { return theReplSet->syncSourceFeedback.getMyRID(); } else if (mode == modeMasterSlave) { - ReplSource source; + OperationContextImpl txn; + ReplSource source(&txn); return source.getMyRID(); } invariant(false); // Don't have an RID if no replication is enabled @@ -614,7 +615,7 @@ namespace { log() << "replSet replSetReconfig [2]" << rsLog; - theReplSet->haveNewConfig(*newConfig, true); + theReplSet->haveNewConfig(txn, *newConfig, true); ReplSet::startupStatusMsg.set("replSetReconfig'd"); } catch(const DBException& e) { @@ -753,11 +754,11 @@ namespace { log() << "replSet replSetInitiate all members seem up" << rsLog; - createOplog(); + createOplog(txn); Lock::GlobalWrite lk(txn->lockState()); BSONObj comment = BSON( "msg" << "initiating set"); - newConfig->saveConfigLocally(comment); + newConfig->saveConfigLocally(txn, comment); log() << "replSet replSetInitiate config now saved locally. " "Should come online in about a minute." << rsLog; resultObj->append("info", diff --git a/src/mongo/db/repl/repl_set.h b/src/mongo/db/repl/repl_set.h index 8b2c9f83f12..57d2b715a0a 100644 --- a/src/mongo/db/repl/repl_set.h +++ b/src/mongo/db/repl/repl_set.h @@ -36,7 +36,7 @@ namespace repl { class ReplSet : public ReplSetImpl { public: - static ReplSet* make(ReplSetSeedList& replSetSeedList); + static ReplSet* make(OperationContext* txn, ReplSetSeedList& replSetSeedList); virtual ~ReplSet() {} // for the replSetStepDown command @@ -78,7 +78,7 @@ namespace repl { * The slaves are updated when they get a heartbeat indicating the new * config. The comment is a no-op. */ - void haveNewConfig(ReplSetConfig& c, bool comment); + void haveNewConfig(OperationContext* txn, ReplSetConfig& c, bool comment); /** * Pointer assignment isn't necessarily atomic, so this needs to assure diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp index 000d4065ed0..0de50e9ddf4 100644 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ b/src/mongo/db/repl/repl_set_impl.cpp @@ -84,7 +84,7 @@ namespace repl { } } - void ReplSetImpl::goStale(const Member* stale, const BSONObj& oldest) { + void ReplSetImpl::goStale(OperationContext* txn, const Member* stale, const BSONObj& oldest) { log() << "replSet error RS102 too stale to catch up, at least from " << stale->fullName() << rsLog; log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; @@ -94,26 +94,25 @@ namespace repl { << rsLog; // reset minvalid so that we can't become primary prematurely - setMinValid(oldest); + setMinValid(txn, oldest); sethbmsg("error RS102 too stale to catch up"); changeState(MemberState::RS_RECOVERING); } namespace { - void dropAllTempCollections() { + static void dropAllTempCollections(OperationContext* txn) { vector<string> dbNames; globalStorageEngine->listDatabases( &dbNames ); - OperationContextImpl txn; for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) { // The local db is special because it isn't replicated. It is cleared at startup even on // replica set members. if (*it == "local") continue; - Client::Context ctx(&txn, *it); - ctx.db()->clearTmpCollections(&txn); + Client::Context ctx(txn, *it); + ctx.db()->clearTmpCollections(txn); } } } @@ -142,7 +141,7 @@ namespace { // This must be done after becoming primary but before releasing the write lock. This adds // the dropCollection entries for every temp collection to the opLog since we want it to be // replicated to secondaries. - dropAllTempCollections(); + dropAllTempCollections(&txn); } void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } @@ -383,7 +382,7 @@ namespace { b.append("me", myConfig().h.toString()); } - void ReplSetImpl::init(ReplSetSeedList& replSetSeedList) { + void ReplSetImpl::init(OperationContext* txn, ReplSetSeedList& replSetSeedList) { mgr = new Manager(this); _cfg = 0; @@ -396,7 +395,7 @@ namespace { LOG(1) << "replSet beginning startup..." << rsLog; - loadConfig(); + loadConfig(txn); unsigned sss = replSetSeedList.seedSet.size(); for (Member *m = head(); m; m = m->next()) { @@ -448,11 +447,10 @@ namespace { _indexPrefetchConfig(PREFETCH_ALL) { } - void ReplSetImpl::loadLastOpTimeWritten(bool quiet) { - OperationContextImpl txn; // XXX? - Lock::DBRead lk(txn.lockState(), rsoplog); + void ReplSetImpl::loadLastOpTimeWritten(OperationContext* txn, bool quiet) { + Lock::DBRead lk(txn->lockState(), rsoplog); BSONObj o; - if (Helpers::getLast(&txn, rsoplog, o)) { + if (Helpers::getLast(txn, rsoplog, o)) { lastH = o["h"].numberLong(); lastOpTimeWritten = o["ts"]._opTime(); uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTimeWritten.isNull()); @@ -470,9 +468,11 @@ namespace { // call after constructing to start - returns fairly quickly after launching its threads void ReplSetImpl::_go() { + OperationContextImpl txn; + indexRebuilder.wait(); try { - loadLastOpTimeWritten(); + loadLastOpTimeWritten(&txn); } catch (std::exception& e) { log() << "replSet error fatal couldn't query the local " << rsoplog @@ -487,7 +487,7 @@ namespace { bool meEnsured = false; while (!inShutdown() && !meEnsured) { try { - theReplSet->syncSourceFeedback.ensureMe(); + theReplSet->syncSourceFeedback.ensureMe(&txn); meEnsured = true; } catch (const DBException& e) { @@ -513,7 +513,7 @@ namespace { // @param reconf true if this is a reconfiguration and not an initial load of the configuration. // @return true if ok; throws if config really bad; false if config doesn't include self - bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) { + bool ReplSetImpl::initFromConfig(OperationContext* txn, ReplSetConfig& c, bool reconf) { // NOTE: haveNewConfig() writes the new config to disk before we get here. So // we cannot error out at this point, except fatally. Check errors earlier. lock lk(this); @@ -580,7 +580,7 @@ namespace { log() << "replSet info self not present in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; - loadConfig(); // redo config from scratch + loadConfig(txn); // redo config from scratch return false; } uassert(13302, "replSet error self appears twice in the repl set configuration", me<=1); @@ -718,7 +718,7 @@ namespace { } // Our own config must be the first one. - bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig*>& cfgs) { + bool ReplSetImpl::_loadConfigFinish(OperationContext* txn, vector<ReplSetConfig*>& cfgs) { int v = -1; ReplSetConfig *highest = 0; int myVersion = -2000; @@ -734,18 +734,18 @@ namespace { } verify(highest); - if (!initFromConfig(*highest)) + if (!initFromConfig(txn, *highest)) return false; if (highest->version > myVersion && highest->version >= 0) { log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; - highest->saveConfigLocally(BSONObj()); + highest->saveConfigLocally(txn, BSONObj()); } return true; } - void ReplSetImpl::loadConfig() { + void ReplSetImpl::loadConfig(OperationContext* txn) { startupStatus = LOADINGCONFIG; startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)"); LOG(1) << "loadConfig() " << rsConfigNs << endl; @@ -841,7 +841,7 @@ namespace { continue; } - if (!_loadConfigFinish(configs.mutableVector())) { + if (!_loadConfigFinish(txn, configs.mutableVector())) { log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try " "again." << rsLog; sleepsecs(20); @@ -871,19 +871,17 @@ namespace { const char* ReplSetImpl::_initialSyncFlagString = "doingInitialSync"; const BSONObj ReplSetImpl::_initialSyncFlag(BSON(_initialSyncFlagString << true)); - void ReplSetImpl::clearInitialSyncFlag() { - OperationContextImpl txn; // XXX? - Lock::DBWrite lk(txn.lockState(), "local"); - WriteUnitOfWork wunit(txn.recoveryUnit()); - Helpers::putSingleton(&txn, "local.replset.minvalid", BSON("$unset" << _initialSyncFlag)); + void ReplSetImpl::clearInitialSyncFlag(OperationContext* txn) { + Lock::DBWrite lk(txn->lockState(), "local"); + WriteUnitOfWork wunit(txn->recoveryUnit()); + Helpers::putSingleton(txn, "local.replset.minvalid", BSON("$unset" << _initialSyncFlag)); wunit.commit(); } - void ReplSetImpl::setInitialSyncFlag() { - OperationContextImpl txn; // XXX? - Lock::DBWrite lk(txn.lockState(), "local"); - WriteUnitOfWork wunit(txn.recoveryUnit()); - Helpers::putSingleton(&txn, "local.replset.minvalid", BSON("$set" << _initialSyncFlag)); + void ReplSetImpl::setInitialSyncFlag(OperationContext* txn) { + Lock::DBWrite lk(txn->lockState(), "local"); + WriteUnitOfWork wunit(txn->recoveryUnit()); + Helpers::putSingleton(txn, "local.replset.minvalid", BSON("$set" << _initialSyncFlag)); wunit.commit(); } @@ -897,24 +895,22 @@ namespace { return false; } - void ReplSetImpl::setMinValid(BSONObj obj) { + void ReplSetImpl::setMinValid(OperationContext* txn, BSONObj obj) { BSONObjBuilder builder; BSONObjBuilder subobj(builder.subobjStart("$set")); subobj.appendTimestamp("ts", obj["ts"].date()); subobj.done(); - OperationContextImpl txn; // XXX? - Lock::DBWrite lk(txn.lockState(), "local"); - WriteUnitOfWork wunit(txn.recoveryUnit()); - Helpers::putSingleton(&txn, "local.replset.minvalid", builder.obj()); + Lock::DBWrite lk(txn->lockState(), "local"); + WriteUnitOfWork wunit(txn->recoveryUnit()); + Helpers::putSingleton(txn, "local.replset.minvalid", builder.obj()); wunit.commit(); } - OpTime ReplSetImpl::getMinValid() { - OperationContextImpl txn; // XXX? - Lock::DBRead lk(txn.lockState(), "local.replset.minvalid"); + OpTime ReplSetImpl::getMinValid(OperationContext* txn) { + Lock::DBRead lk(txn->lockState(), "local.replset.minvalid"); BSONObj mv; - if (Helpers::getSingleton(&txn, "local.replset.minvalid", mv)) { + if (Helpers::getSingleton(txn, "local.replset.minvalid", mv)) { return mv["ts"]._opTime(); } return OpTime(); diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h index cbeef0fe08e..5b010e65d50 100644 --- a/src/mongo/db/repl/repl_set_impl.h +++ b/src/mongo/db/repl/repl_set_impl.h @@ -103,7 +103,7 @@ namespace repl { const Member* getMemberToSyncTo(); void veto(const string& host, unsigned secs=10); bool gotForceSync(); - void goStale(const Member* m, const BSONObj& o); + void goStale(OperationContext* txn, const Member* m, const BSONObj& o); OID getElectionId() const { return elect.getElectionId(); } OpTime getElectionTime() const { return elect.getElectionTime(); } @@ -120,7 +120,7 @@ namespace repl { bool _freeze(int secs); private: void _assumePrimary(); - void loadLastOpTimeWritten(bool quiet=false); + void loadLastOpTimeWritten(OperationContext* txn, bool quiet = false); void changeState(MemberState s); Member* _forceSyncTarget; @@ -190,7 +190,7 @@ namespace repl { * - intentionally leaks the old _cfg and any old _members (if the * change isn't strictly additive) */ - bool initFromConfig(ReplSetConfig& c, bool reconf=false); + bool initFromConfig(OperationContext* txn, ReplSetConfig& c, bool reconf = false); void _fillIsMaster(BSONObjBuilder&); void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&); const ReplSetConfig& config() { return *_cfg; } @@ -214,13 +214,13 @@ namespace repl { * Finds the configuration with the highest version number and attempts * load it. */ - bool _loadConfigFinish(vector<ReplSetConfig*>& v); + bool _loadConfigFinish(OperationContext* txn, vector<ReplSetConfig*>& v); /** * Gather all possible configs (from command line seeds, our own config * doc, and any hosts listed therein) and try to initiate from the most * recent config we find. */ - void loadConfig(); + void loadConfig(OperationContext* txn); list<HostAndPort> memberHostnames() const; bool iAmArbiterOnly() const { return myConfig().arbiterOnly; } @@ -235,7 +235,7 @@ namespace repl { ReplSetImpl(); /* throws exception if a problem initializing. */ - void init(ReplSetSeedList&); + void init(OperationContext* txn, ReplSetSeedList&); void setSelfTo(Member *); // use this as it sets buildIndexes var private: @@ -268,7 +268,7 @@ namespace repl { /** * Cause the node to resync from scratch. */ - bool resync(std::string& errmsg); + bool resync(OperationContext* txn, std::string& errmsg); private: void _getTargets(list<Target>&, int &configVersion); void getTargets(list<Target>&, int &configVersion); @@ -281,7 +281,7 @@ namespace repl { private: bool _syncDoInitialSync_clone(OperationContext* txn, Cloner &cloner, const char *master, const list<string>& dbs, bool dataPass); - bool _syncDoInitialSync_applyToHead( SyncTail& syncer, OplogReader* r , + bool _syncDoInitialSync_applyToHead( OperationContext* txn, SyncTail& syncer, OplogReader* r , const Member* source, const BSONObj& lastOp, BSONObj& minValidOut); void _syncDoInitialSync(); @@ -335,11 +335,11 @@ namespace repl { * minValid, to indicate that we are in a consistent state when the batch has been fully * applied. */ - static void setMinValid(BSONObj obj); - static OpTime getMinValid(); - static void clearInitialSyncFlag(); + static void setMinValid(OperationContext* txn, BSONObj obj); + static OpTime getMinValid(OperationContext* txn); + static void clearInitialSyncFlag(OperationContext* txn); static bool getInitialSyncFlag(); - static void setInitialSyncFlag(); + static void setInitialSyncFlag(OperationContext* txn); int oplogVersion; diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp index 431050b3551..d3349f42eb9 100644 --- a/src/mongo/db/repl/resync.cpp +++ b/src/mongo/db/repl/resync.cpp @@ -79,7 +79,7 @@ namespace repl { errmsg = "primaries cannot resync"; return false; } - return theReplSet->resync(errmsg); + return theReplSet->resync(txn, errmsg); } // below this comment pertains only to master/slave replication diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp index a007b80f66d..ed2f98329c7 100644 --- a/src/mongo/db/repl/rs.cpp +++ b/src/mongo/db/repl/rs.cpp @@ -34,6 +34,7 @@ #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" +#include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/connections.h" #include "mongo/db/repl/repl_set_impl.h" @@ -57,21 +58,21 @@ namespace repl { ReplSet::ReplSet() { } - ReplSet* ReplSet::make(ReplSetSeedList& replSetSeedList) { + ReplSet* ReplSet::make(OperationContext* txn, ReplSetSeedList& replSetSeedList) { auto_ptr<ReplSet> ret(new ReplSet()); - ret->init(replSetSeedList); + ret->init(txn, replSetSeedList); return ret.release(); } ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; DiagStr ReplSetImpl::startupStatusMsg; - void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { + void ReplSet::haveNewConfig(OperationContext* txn, ReplSetConfig& newConfig, bool addComment) { bo comment; if( addComment ) comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); - newConfig.saveConfigLocally(comment); + newConfig.saveConfigLocally(txn, comment); try { BSONObj oldConfForAudit = config().asBson(); @@ -79,7 +80,7 @@ namespace repl { audit::logReplSetReconfig(ClientBasic::getCurrent(), &oldConfForAudit, &newConfForAudit); - if (initFromConfig(newConfig, true)) { + if (initFromConfig(txn, newConfig, true)) { log() << "replSet replSetReconfig new config saved locally" << rsLog; } } @@ -94,10 +95,12 @@ namespace repl { } void Manager::msgReceivedNewConfig(BSONObj o) { + OperationContextImpl txn; + log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog; scoped_ptr<ReplSetConfig> config(ReplSetConfig::make(o)); if( config->version > rs->config().version ) - theReplSet->haveNewConfig(*config, false); + theReplSet->haveNewConfig(&txn, *config, false); else { log() << "replSet info msgReceivedNewConfig but version isn't higher " << config->version << ' ' << rs->config().version << rsLog; @@ -111,13 +114,15 @@ namespace repl { */ void startReplSets(ReplSetSeedList *replSetSeedList) { Client::initThread("rsStart"); + OperationContextImpl txn; + try { verify( theReplSet == 0 ); if( replSetSeedList == 0 ) { return; } replLocalAuth(); - (theReplSet = ReplSet::make(*replSetSeedList))->go(); + (theReplSet = ReplSet::make(&txn, *replSetSeedList))->go(); } catch(std::exception& e) { log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog; diff --git a/src/mongo/db/repl/rs_config.cpp b/src/mongo/db/repl/rs_config.cpp index 0867776d71e..65b70dd9c01 100644 --- a/src/mongo/db/repl/rs_config.cpp +++ b/src/mongo/db/repl/rs_config.cpp @@ -79,7 +79,7 @@ namespace { } /* comment MUST only be set when initiating the set by the initiator */ - void ReplSetConfig::saveConfigLocally(bo comment) { + void ReplSetConfig::saveConfigLocally(OperationContext* txn, bo comment) { checkRsConfig(); BSONObj newConfigBSON = asBson(); @@ -87,17 +87,16 @@ namespace { log() << "replSet info saving a newer config version to local.system.replset: " << newConfigBSON << rsLog; { - OperationContextImpl txn; - Client::WriteContext cx(&txn, rsConfigNs); + Client::WriteContext cx(txn, rsConfigNs); //theReplSet->lastOpTimeWritten = ??; //rather than above, do a logOp()? probably - Helpers::putSingletonGod(&txn, + Helpers::putSingletonGod(txn, rsConfigNs.c_str(), newConfigBSON, false/*logOp=false; local db so would work regardless...*/); if( !comment.isEmpty() && (!theReplSet || theReplSet->isPrimary()) ) - logOpInitiate(&txn, comment); + logOpInitiate(txn, comment); cx.commit(); } log() << "replSet saveConfigLocally done" << rsLog; diff --git a/src/mongo/db/repl/rs_config.h b/src/mongo/db/repl/rs_config.h index ac2e8a5ac7e..f86dbb13000 100644 --- a/src/mongo/db/repl/rs_config.h +++ b/src/mongo/db/repl/rs_config.h @@ -38,6 +38,9 @@ #include "mongo/util/net/hostandport.h" namespace mongo { + + class OperationContext; + namespace repl { class Member; const std::string rsConfigNs = "local.system.replset"; @@ -157,7 +160,7 @@ namespace repl { * 3. If 'comment' isn't empty and we're a primary or not yet initiated, log an 'n' op * to the oplog. This is important because it establishes our lastOpWritten time. */ - void saveConfigLocally(BSONObj comment); // to local db + void saveConfigLocally(OperationContext* txn, BSONObj comment); // to local db /** * Update members' groups when the config changes but members stay the same. diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 0ca30eabd27..a9c03f72bb2 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -68,7 +68,10 @@ namespace repl { void ReplSetImpl::syncDoInitialSync() { static const int maxFailedAttempts = 10; - createOplog(); + + OperationContextImpl txn; + createOplog(&txn); + int failedAttempts = 0; while ( failedAttempts < maxFailedAttempts ) { try { @@ -133,20 +136,17 @@ namespace repl { return true; } - void _logOpObjRS(const BSONObj& op); - - static void emptyOplog() { - OperationContextImpl txn; - Client::WriteContext ctx(&txn, rsoplog); + static void emptyOplog(OperationContext* txn) { + Client::WriteContext ctx(txn, rsoplog); - Collection* collection = ctx.ctx().db()->getCollection(&txn, rsoplog); + Collection* collection = ctx.ctx().db()->getCollection(txn, rsoplog); // temp if( collection->numRecords() == 0 ) return; // already empty, ok. LOG(1) << "replSet empty oplog" << rsLog; - uassertStatusOK( collection->truncate(&txn) ); + uassertStatusOK( collection->truncate(txn) ); ctx.commit(); } @@ -283,7 +283,7 @@ namespace repl { * this function syncs to this value (inclusive) * @return if applying the oplog succeeded */ - bool ReplSetImpl::_syncDoInitialSync_applyToHead( SyncTail& syncer, OplogReader* r, + bool ReplSetImpl::_syncDoInitialSync_applyToHead( OperationContext* txn, SyncTail& syncer, OplogReader* r, const Member* source, const BSONObj& lastOp , BSONObj& minValid ) { /* our cloned copy will be strange until we apply oplog events that occurred @@ -317,12 +317,12 @@ namespace repl { // apply startingTS..mvoptime portion of the oplog { try { - minValid = syncer.oplogApplication(lastOp, minValid); + minValid = syncer.oplogApplication(txn, lastOp, minValid); } catch (const DBException&) { log() << "replSet initial sync failed during oplog application phase" << rsLog; - emptyOplog(); // otherwise we'll be up! + emptyOplog(txn); // otherwise we'll be up! lastOpTimeWritten = OpTime(); lastH = 0; @@ -403,12 +403,12 @@ namespace repl { log() << "fastsync: skipping database clone" << rsLog; // prime oplog - init.oplogApplication(lastOp, lastOp); + init.oplogApplication(&txn, lastOp, lastOp); return; } else { // Add field to minvalid document to tell us to restart initial sync if we crash - theReplSet->setInitialSyncFlag(); + theReplSet->setInitialSyncFlag(&txn); sethbmsg("initial sync drop all databases", 0); dropAllDatabasesExceptLocal(&txn); @@ -427,7 +427,7 @@ namespace repl { sethbmsg("initial sync data copy, starting syncup",0); log() << "oplog sync 1 of 3" << endl; - if ( ! _syncDoInitialSync_applyToHead( init, &r , source , lastOp , minValid ) ) { + if (!_syncDoInitialSync_applyToHead(&txn, init, &r, source, lastOp, minValid)) { return; } @@ -437,7 +437,7 @@ namespace repl { // that were "from the future" compared with minValid. During this second application, // nothing should need to be recloned. log() << "oplog sync 2 of 3" << endl; - if (!_syncDoInitialSync_applyToHead(tail, &r , source , lastOp , minValid)) { + if (!_syncDoInitialSync_applyToHead(&txn, tail, &r, source, lastOp, minValid)) { return; } // data should now be consistent @@ -453,7 +453,7 @@ namespace repl { } log() << "oplog sync 3 of 3" << endl; - if (!_syncDoInitialSync_applyToHead(tail, &r, source, lastOp, minValid)) { + if (!_syncDoInitialSync_applyToHead(&txn, tail, &r, source, lastOp, minValid)) { return; } @@ -479,10 +479,10 @@ namespace repl { // Initial sync is now complete. Flag this by setting minValid to the last thing // we synced. - theReplSet->setMinValid(minValid); + theReplSet->setMinValid(&txn, minValid); // Clear the initial sync flag. - theReplSet->clearInitialSyncFlag(); + theReplSet->clearInitialSyncFlag(&txn); cx.commit(); } { diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 4f7f31c81bf..445183fc5c9 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -427,7 +427,7 @@ namespace repl { // we have items we are writing that aren't from a point-in-time. thus best not to come // online until we get to that point in freshness. log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; - setMinValid(newMinValid); + setMinValid(txn, newMinValid); // any full collection resyncs required? if (!fixUpInfo.collectionsToResync.empty()) { @@ -473,7 +473,7 @@ namespace repl { else { log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; - setMinValid(newMinValid); + setMinValid(txn, newMinValid); } } catch (DBException& e) { @@ -669,7 +669,7 @@ namespace repl { } // reset cached lastoptimewritten and h value - loadLastOpTimeWritten(); + loadLastOpTimeWritten(txn); // done if (warn) diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index e6ef65ba88f..1b1860f6e72 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -93,7 +93,7 @@ namespace repl { return false; } - minvalid = getMinValid(); + minvalid = getMinValid(txn); if( minvalid <= lastOpTimeWritten ) { golive=true; } @@ -223,13 +223,12 @@ namespace repl { tail.oplogApplication(); } - bool ReplSetImpl::resync(string& errmsg) { + bool ReplSetImpl::resync(OperationContext* txn, string& errmsg) { changeState(MemberState::RS_RECOVERING); - OperationContextImpl txn; - Client::Context ctx(&txn, "local"); + Client::Context ctx(txn, "local"); - ctx.db()->dropCollection(&txn, "local.oplog.rs"); + ctx.db()->dropCollection(txn, "local.oplog.rs"); { boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex); theReplSet->initialSyncRequested = true; diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index 20df8d80397..9bcd8b6a412 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -40,7 +40,7 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/rs.h" // theReplSet -#include "mongo/db/operation_context_impl.h" +#include "mongo/db/operation_context.h" #include "mongo/util/log.h" namespace mongo { @@ -70,26 +70,25 @@ namespace repl { return authenticateInternalUser(_connection.get()); } - void SyncSourceFeedback::ensureMe() { + void SyncSourceFeedback::ensureMe(OperationContext* txn) { string myname = getHostName(); { - OperationContextImpl txn; - Client::WriteContext ctx(&txn, "local"); + Client::WriteContext ctx(txn, "local"); // local.me is an identifier for a server for getLastError w:2+ - if (!Helpers::getSingleton(&txn, "local.me", _me) || + if (!Helpers::getSingleton(txn, "local.me", _me) || !_me.hasField("host") || _me["host"].String() != myname) { // clean out local.me - Helpers::emptyCollection(&txn, "local.me"); + Helpers::emptyCollection(txn, "local.me"); // repopulate BSONObjBuilder b; b.appendOID("_id", 0, true); b.append("host", myname); _me = b.obj(); - Helpers::putSingleton(&txn, "local.me", _me); + Helpers::putSingleton(txn, "local.me", _me); } ctx.commit(); // _me is used outside of a read lock, so we must copy it out of the mmap diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index 3225620afa9..371912b26c8 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -36,6 +36,9 @@ #include "mongo/util/log.h" namespace mongo { + + class OperationContext; + namespace repl { class Member; @@ -53,7 +56,7 @@ namespace repl { void associateMember(const OID& rid, Member* member); /// Ensures local.me is populated and populates it if not. - void ensureMe(); + void ensureMe(OperationContext* txn); /// Notifies the SyncSourceFeedbackThread to wake up and send a handshake up the replication /// chain, upon receiving a handshake. diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 7665c995102..345a2c60d2e 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -274,7 +274,9 @@ namespace repl { return lastOp; } - BSONObj SyncTail::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) { + BSONObj SyncTail::oplogApplication(OperationContext* txn, + const BSONObj& applyGTEObj, + const BSONObj& minValidObj) { return oplogApplySegment(applyGTEObj, minValidObj, multiSyncApply); } @@ -296,6 +298,7 @@ namespace repl { void SyncTail::oplogApplication() { while( 1 ) { OpQueue ops; + OperationContextImpl txn; Timer batchTimer; int lastTimeChecked = 0; @@ -332,8 +335,6 @@ namespace repl { // become primary if (!theReplSet->isSecondary()) { OpTime minvalid; - - OperationContextImpl txn; theReplSet->tryToGoLiveAsASecondary(&txn, minvalid); } @@ -385,7 +386,7 @@ namespace repl { // Set minValid to the last op to be applied in this next batch. // This will cause this node to go into RECOVERING state // if we should crash and restart before updating the oplog - theReplSet->setMinValid(lastOp); + theReplSet->setMinValid(&txn, lastOp); if (BackgroundSync::get()->isAssumingPrimary()) { LOG(1) << "about to apply batch up to optime: " @@ -492,7 +493,7 @@ namespace repl { while (!ops->empty()) { const BSONObj& op = ops->front(); // this updates theReplSet->lastOpTimeWritten - _logOpObjRS(op); + _logOpObjRS(&txn, op); ops->pop_front(); } wunit.commit(); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index ea9995aaed2..c24ab958ac3 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -75,7 +75,9 @@ namespace repl { /** * Runs oplogApplySegment without allowing recloning documents. */ - virtual BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj); + virtual BSONObj oplogApplication(OperationContext* txn, + const BSONObj& applyGTEObj, + const BSONObj& minValidObj); void oplogApplication(); bool peek(BSONObj* obj); |