author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2014-07-17 17:03:15 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2014-07-18 15:16:14 -0400
commit     3c3d656668e26645492ee3dafb241631352426d4 (patch)
tree       f69bcbf52ad1ef98aba01cdcb61b86f44e34d4ca /src/mongo/db/repl
parent     848999f1527e8390d4d76f9fad0860218b73be4d (diff)
download   mongo-3c3d656668e26645492ee3dafb241631352426d4.tar.gz
SERVER-13961 Cleanup some nested instantiations of OperationContextImpl in repl
These are causing deadlocks if LockState is removed from TLS because of pseudo-conflicting locks on the same code path.
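The change follows one mechanical pattern: wherever a helper used to construct its own OperationContextImpl while its caller, on the same thread, already held locks through another context, the caller's OperationContext* is now threaded down as a parameter. The sketch below is illustrative only (simplified stand-in types, not the real mongo::OperationContext or Lock::GlobalWrite APIs) and shows why the nested form becomes a conflict once LockState is owned by the context instead of thread-local storage.

// Minimal standalone sketch, assuming mock types rather than the real MongoDB classes.
#include <cassert>
#include <iostream>

struct LockState {
    int globalWriteCount = 0;                 // lock bookkeeping kept per context, not per thread
};

struct OperationContext {
    LockState lock;
};

class GlobalWrite {                           // stand-in for a scoped global write lock
public:
    explicit GlobalWrite(LockState* ls) : _ls(ls) { ++_ls->globalWriteCount; }
    ~GlobalWrite() { --_ls->globalWriteCount; }
private:
    LockState* _ls;
};

// Before: the helper's fresh context cannot see the lock the caller already holds,
// so the acquisition looks like a second, conflicting lock on the same code path.
void saveNested() {
    OperationContext nestedTxn;               // nested instantiation (the problem)
    GlobalWrite lk(&nestedTxn.lock);
    std::cout << "nested context sees " << nestedTxn.lock.globalWriteCount << " lock(s)\n";
}

// After: the caller's context is threaded down, so the acquisition recurses on the
// same LockState instead of creating a pseudo-conflicting one.
void saveThreaded(OperationContext* txn) {
    GlobalWrite lk(&txn->lock);
    std::cout << "shared context sees " << txn->lock.globalWriteCount << " lock(s)\n";
}

int main() {
    OperationContext txn;
    GlobalWrite outer(&txn.lock);             // caller already owns the global lock
    saveNested();                             // prints 1: unaware of the outer lock
    saveThreaded(&txn);                       // prints 2: nesting is visible
    assert(txn.lock.globalWriteCount == 1);   // outer lock still held after both calls
    return 0;
}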
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                   | 13
-rw-r--r--  src/mongo/db/repl/bgsync.h                     |  4
-rw-r--r--  src/mongo/db/repl/initial_sync.cpp             |  9
-rw-r--r--  src/mongo/db/repl/initial_sync.h               |  4
-rw-r--r--  src/mongo/db/repl/master_slave.cpp             | 94
-rw-r--r--  src/mongo/db/repl/master_slave.h               |  9
-rw-r--r--  src/mongo/db/repl/oplog.cpp                    | 34
-rw-r--r--  src/mongo/db/repl/oplog.h                      |  4
-rw-r--r--  src/mongo/db/repl/repl_coordinator_legacy.cpp  | 11
-rw-r--r--  src/mongo/db/repl/repl_set.h                   |  4
-rw-r--r--  src/mongo/db/repl/repl_set_impl.cpp            | 78
-rw-r--r--  src/mongo/db/repl/repl_set_impl.h              | 24
-rw-r--r--  src/mongo/db/repl/resync.cpp                   |  2
-rw-r--r--  src/mongo/db/repl/rs.cpp                       | 19
-rw-r--r--  src/mongo/db/repl/rs_config.cpp                |  9
-rw-r--r--  src/mongo/db/repl/rs_config.h                  |  5
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp           | 36
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp              |  6
-rw-r--r--  src/mongo/db/repl/rs_sync.cpp                  |  9
-rw-r--r--  src/mongo/db/repl/sync_source_feedback.cpp     | 13
-rw-r--r--  src/mongo/db/repl/sync_source_feedback.h       |  5
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                | 11
-rw-r--r--  src/mongo/db/repl/sync_tail.h                  |  4
23 files changed, 209 insertions(+), 198 deletions(-)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index beb4904428b..6eba92a1625 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_coordinator_global.h"
@@ -153,6 +154,8 @@ namespace repl {
}
void BackgroundSync::_producerThread() {
+ OperationContextImpl txn;
+
MemberState state = theReplSet->state();
// we want to pause when the state changes to primary
@@ -180,16 +183,16 @@ namespace repl {
start();
}
- produce();
+ produce(&txn);
}
- void BackgroundSync::produce() {
+ void BackgroundSync::produce(OperationContext* txn) {
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
OplogReader r;
OpTime lastOpTimeFetched;
// find a target to sync from the last op time written
- getOplogReader(r);
+ getOplogReader(txn, r);
// no server found
{
@@ -365,7 +368,7 @@ namespace repl {
return true;
}
- void BackgroundSync::getOplogReader(OplogReader& r) {
+ void BackgroundSync::getOplogReader(OperationContext* txn, OplogReader& r) {
const Member *target = NULL, *stale = NULL;
BSONObj oldest;
@@ -419,7 +422,7 @@ namespace repl {
// the only viable sync target was stale
if (stale) {
- theReplSet->goStale(stale, oldest);
+ theReplSet->goStale(txn, stale, oldest);
sleepsecs(120);
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 9539d11aa0f..e6e701de237 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -98,10 +98,10 @@ namespace repl {
// Production thread
void _producerThread();
// Adds elements to the list, up to maxSize.
- void produce();
+ void produce(OperationContext* txn);
// Check if rollback is necessary
bool isRollbackRequired(OplogReader& r);
- void getOplogReader(OplogReader& r);
+ void getOplogReader(OperationContext* txn, OplogReader& r);
// Evaluate if the current sync target is still good
bool shouldChangeSyncTarget();
// check lastOpTimeWritten against the remote's earliest op, filling in remoteOldestOp.
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp
index 794f010fea3..19822bbc75b 100644
--- a/src/mongo/db/repl/initial_sync.cpp
+++ b/src/mongo/db/repl/initial_sync.cpp
@@ -45,7 +45,9 @@ namespace repl {
/* initial oplog application, during initial sync, after cloning.
*/
- BSONObj InitialSync::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) {
+ BSONObj InitialSync::oplogApplication(OperationContext* txn,
+ const BSONObj& applyGTEObj,
+ const BSONObj& minValidObj) {
if (replSetForceInitialSyncFailure > 0) {
log() << "replSet test code invoked, forced InitialSync failure: "
<< replSetForceInitialSyncFailure << rsLog;
@@ -54,9 +56,8 @@ namespace repl {
}
// create the initial oplog entry
- OperationContextImpl txn;
- syncApply(&txn, applyGTEObj);
- _logOpObjRS(applyGTEObj);
+ syncApply(txn, applyGTEObj);
+ _logOpObjRS(txn, applyGTEObj);
return oplogApplySegment(applyGTEObj, minValidObj, multiInitialSyncApply);
}
diff --git a/src/mongo/db/repl/initial_sync.h b/src/mongo/db/repl/initial_sync.h
index 3ce40226bab..90f4e9dacea 100644
--- a/src/mongo/db/repl/initial_sync.h
+++ b/src/mongo/db/repl/initial_sync.h
@@ -47,7 +47,9 @@ namespace repl {
* Creates the initial oplog entry: applies applyGTEObj and writes it to the oplog. Then
* this runs oplogApplySegment allowing recloning documents.
*/
- BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj);
+ BSONObj oplogApplication(OperationContext* txn,
+ const BSONObj& applyGTEObj,
+ const BSONObj& minValidObj);
};
} // namespace repl
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index ca6e2cc2e1d..0e216cdd05b 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -93,12 +93,12 @@ namespace repl {
};
- ReplSource::ReplSource() {
+ ReplSource::ReplSource(OperationContext* txn) {
nClonedThisPass = 0;
- ensureMe();
+ ensureMe(txn);
}
- ReplSource::ReplSource(BSONObj o) : nClonedThisPass(0) {
+ ReplSource::ReplSource(OperationContext* txn, BSONObj o) : nClonedThisPass(0) {
only = o.getStringField("only");
hostName = o.getStringField("host");
_sourceName = o.getStringField("source");
@@ -132,7 +132,7 @@ namespace repl {
incompleteCloneDbs.insert( e.fieldName() );
}
}
- ensureMe();
+ ensureMe(txn);
}
/* Turn our C++ Source object into a BSONObj */
@@ -166,32 +166,30 @@ namespace repl {
return b.obj();
}
- void ReplSource::ensureMe() {
+ void ReplSource::ensureMe(OperationContext* txn) {
string myname = getHostName();
bool exists = false;
{
- OperationContextImpl txn;
- Client::ReadContext ctx(&txn, "local");
+ Client::ReadContext ctx(txn, "local");
// local.me is an identifier for a server for getLastError w:2+
- exists = Helpers::getSingleton(&txn, "local.me", _me);
+ exists = Helpers::getSingleton(txn, "local.me", _me);
}
if (!exists || !_me.hasField("host") || _me["host"].String() != myname) {
- OperationContextImpl txn;
- Client::WriteContext ctx(&txn, "local");
+ Client::WriteContext ctx(txn, "local");
// clean out local.me
- Helpers::emptyCollection(&txn, "local.me");
+ Helpers::emptyCollection(txn, "local.me");
// repopulate
BSONObjBuilder b;
b.appendOID("_id", 0, true);
b.append("host", myname);
_me = b.obj();
- Helpers::putSingleton(&txn, "local.me", _me);
+ Helpers::putSingleton(txn, "local.me", _me);
ctx.commit();
}
}
- void ReplSource::save() {
+ void ReplSource::save(OperationContext* txn) {
BSONObjBuilder b;
verify( !hostName.empty() );
b.append("host", hostName);
@@ -204,10 +202,9 @@ namespace repl {
LOG( 1 ) << "Saving repl source: " << o << endl;
{
- OperationContextImpl txn;
OpDebug debug;
- Client::Context ctx(&txn, "local.sources");
+ Client::Context ctx(txn, "local.sources");
const NamespaceString requestNs("local.sources");
UpdateRequest request(requestNs);
@@ -216,14 +213,17 @@ namespace repl {
request.setUpdates(o);
request.setUpsert();
- UpdateResult res = update(&txn, ctx.db(), request, &debug);
+ UpdateResult res = update(txn, ctx.db(), request, &debug);
verify( ! res.modifiers );
verify( res.numMatched == 1 );
}
}
- static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, ReplSource::SourceVector &old) {
+ static void addSourceToList(OperationContext* txn,
+ ReplSource::SourceVector &v,
+ ReplSource& s,
+ ReplSource::SourceVector &old) {
if ( !s.syncedTo.isNull() ) { // Don't reuse old ReplSource if there was a forced resync.
for ( ReplSource::SourceVector::iterator i = old.begin(); i != old.end(); ) {
if ( s == **i ) {
@@ -261,7 +261,7 @@ namespace repl {
Runner::RunnerState state;
while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) {
n++;
- ReplSource tmp(obj);
+ ReplSource tmp(txn, obj);
if (tmp.hostName != replSettings.source) {
log() << "repl: --source " << replSettings.source << " != " << tmp.hostName
<< " from local.sources collection" << endl;
@@ -283,10 +283,10 @@ namespace repl {
uassert( 10002 , "local.sources collection corrupt?", n<2 );
if ( n == 0 ) {
// source missing. add.
- ReplSource s;
+ ReplSource s(txn);
s.hostName = replSettings.source;
s.only = replSettings.only;
- s.save();
+ s.save(txn);
}
}
else {
@@ -305,7 +305,7 @@ namespace repl {
BSONObj obj;
Runner::RunnerState state;
while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) {
- ReplSource tmp(obj);
+ ReplSource tmp(txn, obj);
if ( tmp.syncedTo.isNull() ) {
DBDirectClient c(txn);
if ( c.exists( "local.oplog.$main" ) ) {
@@ -315,7 +315,7 @@ namespace repl {
}
}
}
- addSourceToList(v, tmp, old);
+ addSourceToList(txn, v, tmp, old);
}
uassert(17066, "Internal error reading from local.sources", Runner::RUNNER_EOF == state);
}
@@ -371,7 +371,7 @@ namespace repl {
}
syncedTo = OpTime();
addDbNextPass.clear();
- save();
+ save(txn);
}
void ReplSource::resyncDrop( OperationContext* txn, const string& db ) {
@@ -674,14 +674,14 @@ namespace repl {
if ( incompleteClone ) {
log() << "An earlier initial clone of '" << clientName << "' did not complete, now resyncing." << endl;
}
- save();
+ save(txn);
Client::Context ctx(txn, ns);
nClonedThisPass++;
resync(txn, ctx.db()->name());
addDbNextPass.erase(clientName);
incompleteCloneDbs.erase( clientName );
}
- save();
+ save(txn);
}
else {
applyOperation(txn, ctx.db(), op);
@@ -789,7 +789,7 @@ namespace repl {
// obviously global isn't ideal, but non-repl set is old so
// keeping it simple
Lock::GlobalWrite lk(txn->lockState());
- save();
+ save(txn);
}
BSONObjBuilder gte;
@@ -841,7 +841,7 @@ namespace repl {
}
{
Lock::GlobalWrite lk(txn->lockState());
- save();
+ save(txn);
}
return okResultCode;
}
@@ -919,7 +919,7 @@ namespace repl {
}
syncedTo = nextOpTime;
- save(); // note how far we are synced up to now
+ save(txn); // note how far we are synced up to now
log() << "repl: applied " << n << " operations" << endl;
nApplied = n;
log() << "repl: end sync_pullOpLog syncedTo: " << syncedTo.toStringLong() << endl;
@@ -931,7 +931,7 @@ namespace repl {
Lock::GlobalWrite lk(txn->lockState());
syncedTo = nextOpTime;
// can't update local log ts since there are pending operations from our peer
- save();
+ save(txn);
log() << "repl: checkpoint applied " << n << " operations" << endl;
log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl;
saveLast = time(0);
@@ -972,7 +972,7 @@ namespace repl {
Lock::GlobalWrite lk(txn->lockState());
if ( n > 0 ) {
syncedTo = last;
- save();
+ save(txn);
}
log() << "repl: applied " << n << " operations" << endl;
log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl;
@@ -1047,12 +1047,11 @@ namespace repl {
0 = no sleep recommended
1 = special sentinel indicating adaptive sleep recommended
*/
- int _replMain(ReplSource::SourceVector& sources, int& nApplied) {
- OperationContextImpl txn;
+ int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nApplied) {
{
ReplInfo r("replMain load sources");
- Lock::GlobalWrite lk(txn.lockState());
- ReplSource::loadAll(&txn, sources);
+ Lock::GlobalWrite lk(txn->lockState());
+ ReplSource::loadAll(txn, sources);
// only need this param for initial reset
getGlobalReplicationCoordinator()->getSettings().fastsync = false;
@@ -1117,17 +1116,16 @@ namespace repl {
return sleepAdvice;
}
- static void replMain() {
+ static void replMain(OperationContext* txn) {
ReplSource::SourceVector sources;
while ( 1 ) {
int s = 0;
{
- OperationContextImpl txn;
- Lock::GlobalWrite lk(txn.lockState());
+ Lock::GlobalWrite lk(txn->lockState());
if ( replAllDead ) {
// throttledForceResyncDead can throw
if ( !getGlobalReplicationCoordinator()->getSettings().autoresync ||
- !ReplSource::throttledForceResyncDead( &txn, "auto" ) ) {
+ !ReplSource::throttledForceResyncDead( txn, "auto" ) ) {
log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" << endl;
break;
}
@@ -1138,7 +1136,7 @@ namespace repl {
try {
int nApplied = 0;
- s = _replMain(sources, nApplied);
+ s = _replMain(txn, sources, nApplied);
if( s == 1 ) {
if( nApplied == 0 ) s = 2;
else if( nApplied > 100 ) {
@@ -1154,8 +1152,7 @@ namespace repl {
}
{
- LockState lockState;
- Lock::GlobalWrite lk(&lockState);
+ Lock::GlobalWrite lk(txn->lockState());
verify( syncing == 1 );
syncing--;
}
@@ -1215,15 +1212,16 @@ namespace repl {
sleepsecs(1);
Client::initThread("replslave");
+ OperationContextImpl txn;
+
{
- LockState lockState;
- Lock::GlobalWrite lk(&lockState);
+ Lock::GlobalWrite lk(txn.lockState());
replLocalAuth();
}
while ( 1 ) {
try {
- replMain();
+ replMain(&txn);
sleepsecs(5);
}
catch ( AssertionException& ) {
@@ -1244,6 +1242,7 @@ namespace repl {
}
void startMasterSlave() {
+ OperationContextImpl txn;
oldRepl();
@@ -1252,13 +1251,12 @@ namespace repl {
return;
{
- LockState lockState;
- Lock::GlobalWrite lk(&lockState);
+ Lock::GlobalWrite lk(txn.lockState());
replLocalAuth();
}
{
- ReplSource temp; // Ensures local.me is populated
+ ReplSource temp(&txn); // Ensures local.me is populated
}
if ( replSettings.slave ) {
@@ -1270,7 +1268,7 @@ namespace repl {
if ( replSettings.master ) {
LOG(1) << "master=true" << endl;
replSettings.master = true;
- createOplog();
+ createOplog(&txn);
boost::thread t(replMasterThread);
}
diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h
index 15fd74b2176..760f0d27c1d 100644
--- a/src/mongo/db/repl/master_slave.h
+++ b/src/mongo/db/repl/master_slave.h
@@ -119,7 +119,7 @@ namespace repl {
const char* db );
// populates _me so that it can be passed to oplogreader for handshakes
- void ensureMe();
+ void ensureMe(OperationContext* txn);
void forceResync(OperationContext* txn, const char *requester);
@@ -142,16 +142,17 @@ namespace repl {
typedef std::vector< shared_ptr< ReplSource > > SourceVector;
static void loadAll(OperationContext* txn, SourceVector&);
- explicit ReplSource(BSONObj);
+
+ explicit ReplSource(OperationContext* txn, BSONObj);
// This is not the constructor you are looking for. Always prefer the version that takes
// a BSONObj. This is public only as a hack so that the ReplicationCoordinator can find
// out the process's RID in master/slave setups.
- ReplSource();
+ ReplSource(OperationContext* txn);
/* -1 = error */
int sync(int& nApplied);
- void save(); // write ourself to local.sources
+ void save(OperationContext* txn); // write ourself to local.sources
// make a jsobj from our member fields of the form
// { host: ..., source: ..., syncedTo: ... }
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 37c3057ec15..e911c19a852 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -118,29 +118,28 @@ namespace repl {
/** write an op to the oplog that is already built.
todo : make _logOpRS() call this so we don't repeat ourself?
*/
- void _logOpObjRS(const BSONObj& op) {
- OperationContextImpl txn;
- Lock::DBWrite lk(txn.lockState(), "local");
+ void _logOpObjRS(OperationContext* txn, const BSONObj& op) {
+ Lock::DBWrite lk(txn->lockState(), "local");
// XXX soon this needs to be part of an outer WUOW not its own.
// We can't do this yet due to locking limitations.
- WriteUnitOfWork wunit(txn.recoveryUnit());
+ WriteUnitOfWork wunit(txn->recoveryUnit());
const OpTime ts = op["ts"]._opTime();
long long h = op["h"].numberLong();
{
if ( localOplogRSCollection == 0 ) {
- Client::Context ctx(&txn, rsoplog);
+ Client::Context ctx(txn, rsoplog);
localDB = ctx.db();
verify( localDB );
- localOplogRSCollection = localDB->getCollection( &txn, rsoplog );
+ localOplogRSCollection = localDB->getCollection(txn, rsoplog);
massert(13389,
"local.oplog.rs missing. did you drop it? if so restart server",
localOplogRSCollection);
}
- Client::Context ctx(&txn, rsoplog, localDB);
- checkOplogInsert( localOplogRSCollection->insertDocument( &txn, op, false ) );
+ Client::Context ctx(txn, rsoplog, localDB);
+ checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false));
/* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy.
this code (or code in now() maybe) should be improved.
@@ -459,9 +458,8 @@ namespace repl {
}
}
- void createOplog() {
- OperationContextImpl txn;
- Lock::GlobalWrite lk(txn.lockState());
+ void createOplog(OperationContext* txn) {
+ Lock::GlobalWrite lk(txn->lockState());
const char * ns = "local.oplog.$main";
@@ -470,13 +468,13 @@ namespace repl {
if( rs )
ns = rsoplog;
- Client::Context ctx(&txn, ns);
- Collection* collection = ctx.db()->getCollection( &txn, ns );
+ Client::Context ctx(txn, ns);
+ Collection* collection = ctx.db()->getCollection(txn, ns );
if ( collection ) {
if (replSettings.oplogSize != 0) {
- int o = (int)(collection->getRecordStore()->storageSize(&txn) / ( 1024 * 1024 ) );
+ int o = (int)(collection->getRecordStore()->storageSize(txn) / ( 1024 * 1024 ) );
int n = (int)(replSettings.oplogSize / (1024 * 1024));
if ( n != o ) {
stringstream ss;
@@ -488,7 +486,7 @@ namespace repl {
if( rs ) return;
- initOpTimeFromOplog(&txn, ns);
+ initOpTimeFromOplog(txn, ns);
return;
}
@@ -527,10 +525,10 @@ namespace repl {
options.cappedSize = sz;
options.autoIndexId = CollectionOptions::NO;
- WriteUnitOfWork wunit(txn.recoveryUnit());
- invariant( ctx.db()->createCollection( &txn, ns, options ) );
+ WriteUnitOfWork wunit(txn->recoveryUnit());
+ invariant(ctx.db()->createCollection(txn, ns, options));
if( !rs )
- logOp( &txn, "n", "", BSONObj() );
+ logOp(txn, "n", "", BSONObj() );
wunit.commit();
/* sync here so we don't get any surprising lag later when we try to sync */
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 1c3067a4182..79574c8e6d3 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -48,11 +48,11 @@ namespace repl {
// Create a new capped collection for the oplog if it doesn't yet exist.
// This will be either local.oplog.rs (replica sets) or local.oplog.$main (master/slave)
// If the collection already exists, set the 'last' OpTime if master/slave (side effect!)
- void createOplog();
+ void createOplog(OperationContext* txn);
// This poorly-named function writes an op into the replica-set oplog;
// used internally by replication secondaries after they have applied an op
- void _logOpObjRS(const BSONObj& op);
+ void _logOpObjRS(OperationContext* txn, const BSONObj& op);
const char rsoplog[] = "local.oplog.rs";
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp
index 9f68a8a7116..c8bb9bf3a50 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.cpp
+++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp
@@ -36,7 +36,7 @@
#include "mongo/bson/optime.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/instance.h"
-#include "mongo/db/operation_context.h"
+#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/connections.h"
#include "mongo/db/repl/master_slave.h"
@@ -419,7 +419,8 @@ namespace {
if (mode == modeReplSet) {
return theReplSet->syncSourceFeedback.getMyRID();
} else if (mode == modeMasterSlave) {
- ReplSource source;
+ OperationContextImpl txn;
+ ReplSource source(&txn);
return source.getMyRID();
}
invariant(false); // Don't have an RID if no replication is enabled
@@ -614,7 +615,7 @@ namespace {
log() << "replSet replSetReconfig [2]" << rsLog;
- theReplSet->haveNewConfig(*newConfig, true);
+ theReplSet->haveNewConfig(txn, *newConfig, true);
ReplSet::startupStatusMsg.set("replSetReconfig'd");
}
catch(const DBException& e) {
@@ -753,11 +754,11 @@ namespace {
log() << "replSet replSetInitiate all members seem up" << rsLog;
- createOplog();
+ createOplog(txn);
Lock::GlobalWrite lk(txn->lockState());
BSONObj comment = BSON( "msg" << "initiating set");
- newConfig->saveConfigLocally(comment);
+ newConfig->saveConfigLocally(txn, comment);
log() << "replSet replSetInitiate config now saved locally. "
"Should come online in about a minute." << rsLog;
resultObj->append("info",
diff --git a/src/mongo/db/repl/repl_set.h b/src/mongo/db/repl/repl_set.h
index 8b2c9f83f12..57d2b715a0a 100644
--- a/src/mongo/db/repl/repl_set.h
+++ b/src/mongo/db/repl/repl_set.h
@@ -36,7 +36,7 @@ namespace repl {
class ReplSet : public ReplSetImpl {
public:
- static ReplSet* make(ReplSetSeedList& replSetSeedList);
+ static ReplSet* make(OperationContext* txn, ReplSetSeedList& replSetSeedList);
virtual ~ReplSet() {}
// for the replSetStepDown command
@@ -78,7 +78,7 @@ namespace repl {
* The slaves are updated when they get a heartbeat indicating the new
* config. The comment is a no-op.
*/
- void haveNewConfig(ReplSetConfig& c, bool comment);
+ void haveNewConfig(OperationContext* txn, ReplSetConfig& c, bool comment);
/**
* Pointer assignment isn't necessarily atomic, so this needs to assure
diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp
index 000d4065ed0..0de50e9ddf4 100644
--- a/src/mongo/db/repl/repl_set_impl.cpp
+++ b/src/mongo/db/repl/repl_set_impl.cpp
@@ -84,7 +84,7 @@ namespace repl {
}
}
- void ReplSetImpl::goStale(const Member* stale, const BSONObj& oldest) {
+ void ReplSetImpl::goStale(OperationContext* txn, const Member* stale, const BSONObj& oldest) {
log() << "replSet error RS102 too stale to catch up, at least from "
<< stale->fullName() << rsLog;
log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
@@ -94,26 +94,25 @@ namespace repl {
<< rsLog;
// reset minvalid so that we can't become primary prematurely
- setMinValid(oldest);
+ setMinValid(txn, oldest);
sethbmsg("error RS102 too stale to catch up");
changeState(MemberState::RS_RECOVERING);
}
namespace {
- void dropAllTempCollections() {
+ static void dropAllTempCollections(OperationContext* txn) {
vector<string> dbNames;
globalStorageEngine->listDatabases( &dbNames );
- OperationContextImpl txn;
for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
// The local db is special because it isn't replicated. It is cleared at startup even on
// replica set members.
if (*it == "local")
continue;
- Client::Context ctx(&txn, *it);
- ctx.db()->clearTmpCollections(&txn);
+ Client::Context ctx(txn, *it);
+ ctx.db()->clearTmpCollections(txn);
}
}
}
@@ -142,7 +141,7 @@ namespace {
// This must be done after becoming primary but before releasing the write lock. This adds
// the dropCollection entries for every temp collection to the opLog since we want it to be
// replicated to secondaries.
- dropAllTempCollections();
+ dropAllTempCollections(&txn);
}
void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); }
@@ -383,7 +382,7 @@ namespace {
b.append("me", myConfig().h.toString());
}
- void ReplSetImpl::init(ReplSetSeedList& replSetSeedList) {
+ void ReplSetImpl::init(OperationContext* txn, ReplSetSeedList& replSetSeedList) {
mgr = new Manager(this);
_cfg = 0;
@@ -396,7 +395,7 @@ namespace {
LOG(1) << "replSet beginning startup..." << rsLog;
- loadConfig();
+ loadConfig(txn);
unsigned sss = replSetSeedList.seedSet.size();
for (Member *m = head(); m; m = m->next()) {
@@ -448,11 +447,10 @@ namespace {
_indexPrefetchConfig(PREFETCH_ALL) {
}
- void ReplSetImpl::loadLastOpTimeWritten(bool quiet) {
- OperationContextImpl txn; // XXX?
- Lock::DBRead lk(txn.lockState(), rsoplog);
+ void ReplSetImpl::loadLastOpTimeWritten(OperationContext* txn, bool quiet) {
+ Lock::DBRead lk(txn->lockState(), rsoplog);
BSONObj o;
- if (Helpers::getLast(&txn, rsoplog, o)) {
+ if (Helpers::getLast(txn, rsoplog, o)) {
lastH = o["h"].numberLong();
lastOpTimeWritten = o["ts"]._opTime();
uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTimeWritten.isNull());
@@ -470,9 +468,11 @@ namespace {
// call after constructing to start - returns fairly quickly after launching its threads
void ReplSetImpl::_go() {
+ OperationContextImpl txn;
+
indexRebuilder.wait();
try {
- loadLastOpTimeWritten();
+ loadLastOpTimeWritten(&txn);
}
catch (std::exception& e) {
log() << "replSet error fatal couldn't query the local " << rsoplog
@@ -487,7 +487,7 @@ namespace {
bool meEnsured = false;
while (!inShutdown() && !meEnsured) {
try {
- theReplSet->syncSourceFeedback.ensureMe();
+ theReplSet->syncSourceFeedback.ensureMe(&txn);
meEnsured = true;
}
catch (const DBException& e) {
@@ -513,7 +513,7 @@ namespace {
// @param reconf true if this is a reconfiguration and not an initial load of the configuration.
// @return true if ok; throws if config really bad; false if config doesn't include self
- bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) {
+ bool ReplSetImpl::initFromConfig(OperationContext* txn, ReplSetConfig& c, bool reconf) {
// NOTE: haveNewConfig() writes the new config to disk before we get here. So
// we cannot error out at this point, except fatally. Check errors earlier.
lock lk(this);
@@ -580,7 +580,7 @@ namespace {
log() << "replSet info self not present in the repl set configuration:" << rsLog;
log() << c.toString() << rsLog;
- loadConfig(); // redo config from scratch
+ loadConfig(txn); // redo config from scratch
return false;
}
uassert(13302, "replSet error self appears twice in the repl set configuration", me<=1);
@@ -718,7 +718,7 @@ namespace {
}
// Our own config must be the first one.
- bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig*>& cfgs) {
+ bool ReplSetImpl::_loadConfigFinish(OperationContext* txn, vector<ReplSetConfig*>& cfgs) {
int v = -1;
ReplSetConfig *highest = 0;
int myVersion = -2000;
@@ -734,18 +734,18 @@ namespace {
}
verify(highest);
- if (!initFromConfig(*highest))
+ if (!initFromConfig(txn, *highest))
return false;
if (highest->version > myVersion && highest->version >= 0) {
log() << "replSet got config version " << highest->version
<< " from a remote, saving locally" << rsLog;
- highest->saveConfigLocally(BSONObj());
+ highest->saveConfigLocally(txn, BSONObj());
}
return true;
}
- void ReplSetImpl::loadConfig() {
+ void ReplSetImpl::loadConfig(OperationContext* txn) {
startupStatus = LOADINGCONFIG;
startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)");
LOG(1) << "loadConfig() " << rsConfigNs << endl;
@@ -841,7 +841,7 @@ namespace {
continue;
}
- if (!_loadConfigFinish(configs.mutableVector())) {
+ if (!_loadConfigFinish(txn, configs.mutableVector())) {
log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try "
"again." << rsLog;
sleepsecs(20);
@@ -871,19 +871,17 @@ namespace {
const char* ReplSetImpl::_initialSyncFlagString = "doingInitialSync";
const BSONObj ReplSetImpl::_initialSyncFlag(BSON(_initialSyncFlagString << true));
- void ReplSetImpl::clearInitialSyncFlag() {
- OperationContextImpl txn; // XXX?
- Lock::DBWrite lk(txn.lockState(), "local");
- WriteUnitOfWork wunit(txn.recoveryUnit());
- Helpers::putSingleton(&txn, "local.replset.minvalid", BSON("$unset" << _initialSyncFlag));
+ void ReplSetImpl::clearInitialSyncFlag(OperationContext* txn) {
+ Lock::DBWrite lk(txn->lockState(), "local");
+ WriteUnitOfWork wunit(txn->recoveryUnit());
+ Helpers::putSingleton(txn, "local.replset.minvalid", BSON("$unset" << _initialSyncFlag));
wunit.commit();
}
- void ReplSetImpl::setInitialSyncFlag() {
- OperationContextImpl txn; // XXX?
- Lock::DBWrite lk(txn.lockState(), "local");
- WriteUnitOfWork wunit(txn.recoveryUnit());
- Helpers::putSingleton(&txn, "local.replset.minvalid", BSON("$set" << _initialSyncFlag));
+ void ReplSetImpl::setInitialSyncFlag(OperationContext* txn) {
+ Lock::DBWrite lk(txn->lockState(), "local");
+ WriteUnitOfWork wunit(txn->recoveryUnit());
+ Helpers::putSingleton(txn, "local.replset.minvalid", BSON("$set" << _initialSyncFlag));
wunit.commit();
}
@@ -897,24 +895,22 @@ namespace {
return false;
}
- void ReplSetImpl::setMinValid(BSONObj obj) {
+ void ReplSetImpl::setMinValid(OperationContext* txn, BSONObj obj) {
BSONObjBuilder builder;
BSONObjBuilder subobj(builder.subobjStart("$set"));
subobj.appendTimestamp("ts", obj["ts"].date());
subobj.done();
- OperationContextImpl txn; // XXX?
- Lock::DBWrite lk(txn.lockState(), "local");
- WriteUnitOfWork wunit(txn.recoveryUnit());
- Helpers::putSingleton(&txn, "local.replset.minvalid", builder.obj());
+ Lock::DBWrite lk(txn->lockState(), "local");
+ WriteUnitOfWork wunit(txn->recoveryUnit());
+ Helpers::putSingleton(txn, "local.replset.minvalid", builder.obj());
wunit.commit();
}
- OpTime ReplSetImpl::getMinValid() {
- OperationContextImpl txn; // XXX?
- Lock::DBRead lk(txn.lockState(), "local.replset.minvalid");
+ OpTime ReplSetImpl::getMinValid(OperationContext* txn) {
+ Lock::DBRead lk(txn->lockState(), "local.replset.minvalid");
BSONObj mv;
- if (Helpers::getSingleton(&txn, "local.replset.minvalid", mv)) {
+ if (Helpers::getSingleton(txn, "local.replset.minvalid", mv)) {
return mv["ts"]._opTime();
}
return OpTime();
diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h
index cbeef0fe08e..5b010e65d50 100644
--- a/src/mongo/db/repl/repl_set_impl.h
+++ b/src/mongo/db/repl/repl_set_impl.h
@@ -103,7 +103,7 @@ namespace repl {
const Member* getMemberToSyncTo();
void veto(const string& host, unsigned secs=10);
bool gotForceSync();
- void goStale(const Member* m, const BSONObj& o);
+ void goStale(OperationContext* txn, const Member* m, const BSONObj& o);
OID getElectionId() const { return elect.getElectionId(); }
OpTime getElectionTime() const { return elect.getElectionTime(); }
@@ -120,7 +120,7 @@ namespace repl {
bool _freeze(int secs);
private:
void _assumePrimary();
- void loadLastOpTimeWritten(bool quiet=false);
+ void loadLastOpTimeWritten(OperationContext* txn, bool quiet = false);
void changeState(MemberState s);
Member* _forceSyncTarget;
@@ -190,7 +190,7 @@ namespace repl {
* - intentionally leaks the old _cfg and any old _members (if the
* change isn't strictly additive)
*/
- bool initFromConfig(ReplSetConfig& c, bool reconf=false);
+ bool initFromConfig(OperationContext* txn, ReplSetConfig& c, bool reconf = false);
void _fillIsMaster(BSONObjBuilder&);
void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
const ReplSetConfig& config() { return *_cfg; }
@@ -214,13 +214,13 @@ namespace repl {
* Finds the configuration with the highest version number and attempts
* load it.
*/
- bool _loadConfigFinish(vector<ReplSetConfig*>& v);
+ bool _loadConfigFinish(OperationContext* txn, vector<ReplSetConfig*>& v);
/**
* Gather all possible configs (from command line seeds, our own config
* doc, and any hosts listed therein) and try to initiate from the most
* recent config we find.
*/
- void loadConfig();
+ void loadConfig(OperationContext* txn);
list<HostAndPort> memberHostnames() const;
bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
@@ -235,7 +235,7 @@ namespace repl {
ReplSetImpl();
/* throws exception if a problem initializing. */
- void init(ReplSetSeedList&);
+ void init(OperationContext* txn, ReplSetSeedList&);
void setSelfTo(Member *); // use this as it sets buildIndexes var
private:
@@ -268,7 +268,7 @@ namespace repl {
/**
* Cause the node to resync from scratch.
*/
- bool resync(std::string& errmsg);
+ bool resync(OperationContext* txn, std::string& errmsg);
private:
void _getTargets(list<Target>&, int &configVersion);
void getTargets(list<Target>&, int &configVersion);
@@ -281,7 +281,7 @@ namespace repl {
private:
bool _syncDoInitialSync_clone(OperationContext* txn, Cloner &cloner, const char *master,
const list<string>& dbs, bool dataPass);
- bool _syncDoInitialSync_applyToHead( SyncTail& syncer, OplogReader* r ,
+ bool _syncDoInitialSync_applyToHead( OperationContext* txn, SyncTail& syncer, OplogReader* r ,
const Member* source, const BSONObj& lastOp,
BSONObj& minValidOut);
void _syncDoInitialSync();
@@ -335,11 +335,11 @@ namespace repl {
* minValid, to indicate that we are in a consistent state when the batch has been fully
* applied.
*/
- static void setMinValid(BSONObj obj);
- static OpTime getMinValid();
- static void clearInitialSyncFlag();
+ static void setMinValid(OperationContext* txn, BSONObj obj);
+ static OpTime getMinValid(OperationContext* txn);
+ static void clearInitialSyncFlag(OperationContext* txn);
static bool getInitialSyncFlag();
- static void setInitialSyncFlag();
+ static void setInitialSyncFlag(OperationContext* txn);
int oplogVersion;
diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp
index 431050b3551..d3349f42eb9 100644
--- a/src/mongo/db/repl/resync.cpp
+++ b/src/mongo/db/repl/resync.cpp
@@ -79,7 +79,7 @@ namespace repl {
errmsg = "primaries cannot resync";
return false;
}
- return theReplSet->resync(errmsg);
+ return theReplSet->resync(txn, errmsg);
}
// below this comment pertains only to master/slave replication
diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp
index a007b80f66d..ed2f98329c7 100644
--- a/src/mongo/db/repl/rs.cpp
+++ b/src/mongo/db/repl/rs.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
+#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/connections.h"
#include "mongo/db/repl/repl_set_impl.h"
@@ -57,21 +58,21 @@ namespace repl {
ReplSet::ReplSet() {
}
- ReplSet* ReplSet::make(ReplSetSeedList& replSetSeedList) {
+ ReplSet* ReplSet::make(OperationContext* txn, ReplSetSeedList& replSetSeedList) {
auto_ptr<ReplSet> ret(new ReplSet());
- ret->init(replSetSeedList);
+ ret->init(txn, replSetSeedList);
return ret.release();
}
ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART;
DiagStr ReplSetImpl::startupStatusMsg;
- void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
+ void ReplSet::haveNewConfig(OperationContext* txn, ReplSetConfig& newConfig, bool addComment) {
bo comment;
if( addComment )
comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version );
- newConfig.saveConfigLocally(comment);
+ newConfig.saveConfigLocally(txn, comment);
try {
BSONObj oldConfForAudit = config().asBson();
@@ -79,7 +80,7 @@ namespace repl {
audit::logReplSetReconfig(ClientBasic::getCurrent(),
&oldConfForAudit,
&newConfForAudit);
- if (initFromConfig(newConfig, true)) {
+ if (initFromConfig(txn, newConfig, true)) {
log() << "replSet replSetReconfig new config saved locally" << rsLog;
}
}
@@ -94,10 +95,12 @@ namespace repl {
}
void Manager::msgReceivedNewConfig(BSONObj o) {
+ OperationContextImpl txn;
+
log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog;
scoped_ptr<ReplSetConfig> config(ReplSetConfig::make(o));
if( config->version > rs->config().version )
- theReplSet->haveNewConfig(*config, false);
+ theReplSet->haveNewConfig(&txn, *config, false);
else {
log() << "replSet info msgReceivedNewConfig but version isn't higher " <<
config->version << ' ' << rs->config().version << rsLog;
@@ -111,13 +114,15 @@ namespace repl {
*/
void startReplSets(ReplSetSeedList *replSetSeedList) {
Client::initThread("rsStart");
+ OperationContextImpl txn;
+
try {
verify( theReplSet == 0 );
if( replSetSeedList == 0 ) {
return;
}
replLocalAuth();
- (theReplSet = ReplSet::make(*replSetSeedList))->go();
+ (theReplSet = ReplSet::make(&txn, *replSetSeedList))->go();
}
catch(std::exception& e) {
log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
diff --git a/src/mongo/db/repl/rs_config.cpp b/src/mongo/db/repl/rs_config.cpp
index 0867776d71e..65b70dd9c01 100644
--- a/src/mongo/db/repl/rs_config.cpp
+++ b/src/mongo/db/repl/rs_config.cpp
@@ -79,7 +79,7 @@ namespace {
}
/* comment MUST only be set when initiating the set by the initiator */
- void ReplSetConfig::saveConfigLocally(bo comment) {
+ void ReplSetConfig::saveConfigLocally(OperationContext* txn, bo comment) {
checkRsConfig();
BSONObj newConfigBSON = asBson();
@@ -87,17 +87,16 @@ namespace {
log() << "replSet info saving a newer config version to local.system.replset: "
<< newConfigBSON << rsLog;
{
- OperationContextImpl txn;
- Client::WriteContext cx(&txn, rsConfigNs);
+ Client::WriteContext cx(txn, rsConfigNs);
//theReplSet->lastOpTimeWritten = ??;
//rather than above, do a logOp()? probably
- Helpers::putSingletonGod(&txn,
+ Helpers::putSingletonGod(txn,
rsConfigNs.c_str(),
newConfigBSON,
false/*logOp=false; local db so would work regardless...*/);
if( !comment.isEmpty() && (!theReplSet || theReplSet->isPrimary()) )
- logOpInitiate(&txn, comment);
+ logOpInitiate(txn, comment);
cx.commit();
}
log() << "replSet saveConfigLocally done" << rsLog;
diff --git a/src/mongo/db/repl/rs_config.h b/src/mongo/db/repl/rs_config.h
index ac2e8a5ac7e..f86dbb13000 100644
--- a/src/mongo/db/repl/rs_config.h
+++ b/src/mongo/db/repl/rs_config.h
@@ -38,6 +38,9 @@
#include "mongo/util/net/hostandport.h"
namespace mongo {
+
+ class OperationContext;
+
namespace repl {
class Member;
const std::string rsConfigNs = "local.system.replset";
@@ -157,7 +160,7 @@ namespace repl {
* 3. If 'comment' isn't empty and we're a primary or not yet initiated, log an 'n' op
* to the oplog. This is important because it establishes our lastOpWritten time.
*/
- void saveConfigLocally(BSONObj comment); // to local db
+ void saveConfigLocally(OperationContext* txn, BSONObj comment); // to local db
/**
* Update members' groups when the config changes but members stay the same.
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 0ca30eabd27..a9c03f72bb2 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -68,7 +68,10 @@ namespace repl {
void ReplSetImpl::syncDoInitialSync() {
static const int maxFailedAttempts = 10;
- createOplog();
+
+ OperationContextImpl txn;
+ createOplog(&txn);
+
int failedAttempts = 0;
while ( failedAttempts < maxFailedAttempts ) {
try {
@@ -133,20 +136,17 @@ namespace repl {
return true;
}
- void _logOpObjRS(const BSONObj& op);
-
- static void emptyOplog() {
- OperationContextImpl txn;
- Client::WriteContext ctx(&txn, rsoplog);
+ static void emptyOplog(OperationContext* txn) {
+ Client::WriteContext ctx(txn, rsoplog);
- Collection* collection = ctx.ctx().db()->getCollection(&txn, rsoplog);
+ Collection* collection = ctx.ctx().db()->getCollection(txn, rsoplog);
// temp
if( collection->numRecords() == 0 )
return; // already empty, ok.
LOG(1) << "replSet empty oplog" << rsLog;
- uassertStatusOK( collection->truncate(&txn) );
+ uassertStatusOK( collection->truncate(txn) );
ctx.commit();
}
@@ -283,7 +283,7 @@ namespace repl {
* this function syncs to this value (inclusive)
* @return if applying the oplog succeeded
*/
- bool ReplSetImpl::_syncDoInitialSync_applyToHead( SyncTail& syncer, OplogReader* r,
+ bool ReplSetImpl::_syncDoInitialSync_applyToHead( OperationContext* txn, SyncTail& syncer, OplogReader* r,
const Member* source, const BSONObj& lastOp ,
BSONObj& minValid ) {
/* our cloned copy will be strange until we apply oplog events that occurred
@@ -317,12 +317,12 @@ namespace repl {
// apply startingTS..mvoptime portion of the oplog
{
try {
- minValid = syncer.oplogApplication(lastOp, minValid);
+ minValid = syncer.oplogApplication(txn, lastOp, minValid);
}
catch (const DBException&) {
log() << "replSet initial sync failed during oplog application phase" << rsLog;
- emptyOplog(); // otherwise we'll be up!
+ emptyOplog(txn); // otherwise we'll be up!
lastOpTimeWritten = OpTime();
lastH = 0;
@@ -403,12 +403,12 @@ namespace repl {
log() << "fastsync: skipping database clone" << rsLog;
// prime oplog
- init.oplogApplication(lastOp, lastOp);
+ init.oplogApplication(&txn, lastOp, lastOp);
return;
}
else {
// Add field to minvalid document to tell us to restart initial sync if we crash
- theReplSet->setInitialSyncFlag();
+ theReplSet->setInitialSyncFlag(&txn);
sethbmsg("initial sync drop all databases", 0);
dropAllDatabasesExceptLocal(&txn);
@@ -427,7 +427,7 @@ namespace repl {
sethbmsg("initial sync data copy, starting syncup",0);
log() << "oplog sync 1 of 3" << endl;
- if ( ! _syncDoInitialSync_applyToHead( init, &r , source , lastOp , minValid ) ) {
+ if (!_syncDoInitialSync_applyToHead(&txn, init, &r, source, lastOp, minValid)) {
return;
}
@@ -437,7 +437,7 @@ namespace repl {
// that were "from the future" compared with minValid. During this second application,
// nothing should need to be recloned.
log() << "oplog sync 2 of 3" << endl;
- if (!_syncDoInitialSync_applyToHead(tail, &r , source , lastOp , minValid)) {
+ if (!_syncDoInitialSync_applyToHead(&txn, tail, &r, source, lastOp, minValid)) {
return;
}
// data should now be consistent
@@ -453,7 +453,7 @@ namespace repl {
}
log() << "oplog sync 3 of 3" << endl;
- if (!_syncDoInitialSync_applyToHead(tail, &r, source, lastOp, minValid)) {
+ if (!_syncDoInitialSync_applyToHead(&txn, tail, &r, source, lastOp, minValid)) {
return;
}
@@ -479,10 +479,10 @@ namespace repl {
// Initial sync is now complete. Flag this by setting minValid to the last thing
// we synced.
- theReplSet->setMinValid(minValid);
+ theReplSet->setMinValid(&txn, minValid);
// Clear the initial sync flag.
- theReplSet->clearInitialSyncFlag();
+ theReplSet->clearInitialSyncFlag(&txn);
cx.commit();
}
{
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 4f7f31c81bf..445183fc5c9 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -427,7 +427,7 @@ namespace repl {
// we have items we are writing that aren't from a point-in-time. thus best not to come
// online until we get to that point in freshness.
log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog;
- setMinValid(newMinValid);
+ setMinValid(txn, newMinValid);
// any full collection resyncs required?
if (!fixUpInfo.collectionsToResync.empty()) {
@@ -473,7 +473,7 @@ namespace repl {
else {
log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong()
<< rsLog;
- setMinValid(newMinValid);
+ setMinValid(txn, newMinValid);
}
}
catch (DBException& e) {
@@ -669,7 +669,7 @@ namespace repl {
}
// reset cached lastoptimewritten and h value
- loadLastOpTimeWritten();
+ loadLastOpTimeWritten(txn);
// done
if (warn)
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index e6ef65ba88f..1b1860f6e72 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -93,7 +93,7 @@ namespace repl {
return false;
}
- minvalid = getMinValid();
+ minvalid = getMinValid(txn);
if( minvalid <= lastOpTimeWritten ) {
golive=true;
}
@@ -223,13 +223,12 @@ namespace repl {
tail.oplogApplication();
}
- bool ReplSetImpl::resync(string& errmsg) {
+ bool ReplSetImpl::resync(OperationContext* txn, string& errmsg) {
changeState(MemberState::RS_RECOVERING);
- OperationContextImpl txn;
- Client::Context ctx(&txn, "local");
+ Client::Context ctx(txn, "local");
- ctx.db()->dropCollection(&txn, "local.oplog.rs");
+ ctx.db()->dropCollection(txn, "local.oplog.rs");
{
boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex);
theReplSet->initialSyncRequested = true;
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index 20df8d80397..9bcd8b6a412 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -40,7 +40,7 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/rs.h" // theReplSet
-#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/operation_context.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -70,26 +70,25 @@ namespace repl {
return authenticateInternalUser(_connection.get());
}
- void SyncSourceFeedback::ensureMe() {
+ void SyncSourceFeedback::ensureMe(OperationContext* txn) {
string myname = getHostName();
{
- OperationContextImpl txn;
- Client::WriteContext ctx(&txn, "local");
+ Client::WriteContext ctx(txn, "local");
// local.me is an identifier for a server for getLastError w:2+
- if (!Helpers::getSingleton(&txn, "local.me", _me) ||
+ if (!Helpers::getSingleton(txn, "local.me", _me) ||
!_me.hasField("host") ||
_me["host"].String() != myname) {
// clean out local.me
- Helpers::emptyCollection(&txn, "local.me");
+ Helpers::emptyCollection(txn, "local.me");
// repopulate
BSONObjBuilder b;
b.appendOID("_id", 0, true);
b.append("host", myname);
_me = b.obj();
- Helpers::putSingleton(&txn, "local.me", _me);
+ Helpers::putSingleton(txn, "local.me", _me);
}
ctx.commit();
// _me is used outside of a read lock, so we must copy it out of the mmap
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
index 3225620afa9..371912b26c8 100644
--- a/src/mongo/db/repl/sync_source_feedback.h
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -36,6 +36,9 @@
#include "mongo/util/log.h"
namespace mongo {
+
+ class OperationContext;
+
namespace repl {
class Member;
@@ -53,7 +56,7 @@ namespace repl {
void associateMember(const OID& rid, Member* member);
/// Ensures local.me is populated and populates it if not.
- void ensureMe();
+ void ensureMe(OperationContext* txn);
/// Notifies the SyncSourceFeedbackThread to wake up and send a handshake up the replication
/// chain, upon receiving a handshake.
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 7665c995102..345a2c60d2e 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -274,7 +274,9 @@ namespace repl {
return lastOp;
}
- BSONObj SyncTail::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) {
+ BSONObj SyncTail::oplogApplication(OperationContext* txn,
+ const BSONObj& applyGTEObj,
+ const BSONObj& minValidObj) {
return oplogApplySegment(applyGTEObj, minValidObj, multiSyncApply);
}
@@ -296,6 +298,7 @@ namespace repl {
void SyncTail::oplogApplication() {
while( 1 ) {
OpQueue ops;
+ OperationContextImpl txn;
Timer batchTimer;
int lastTimeChecked = 0;
@@ -332,8 +335,6 @@ namespace repl {
// become primary
if (!theReplSet->isSecondary()) {
OpTime minvalid;
-
- OperationContextImpl txn;
theReplSet->tryToGoLiveAsASecondary(&txn, minvalid);
}
@@ -385,7 +386,7 @@ namespace repl {
// Set minValid to the last op to be applied in this next batch.
// This will cause this node to go into RECOVERING state
// if we should crash and restart before updating the oplog
- theReplSet->setMinValid(lastOp);
+ theReplSet->setMinValid(&txn, lastOp);
if (BackgroundSync::get()->isAssumingPrimary()) {
LOG(1) << "about to apply batch up to optime: "
@@ -492,7 +493,7 @@ namespace repl {
while (!ops->empty()) {
const BSONObj& op = ops->front();
// this updates theReplSet->lastOpTimeWritten
- _logOpObjRS(op);
+ _logOpObjRS(&txn, op);
ops->pop_front();
}
wunit.commit();
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index ea9995aaed2..c24ab958ac3 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -75,7 +75,9 @@ namespace repl {
/**
* Runs oplogApplySegment without allowing recloning documents.
*/
- virtual BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj);
+ virtual BSONObj oplogApplication(OperationContext* txn,
+ const BSONObj& applyGTEObj,
+ const BSONObj& minValidObj);
void oplogApplication();
bool peek(BSONObj* obj);