summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2014-05-28 13:49:34 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2014-06-02 22:52:46 -0400
commit8c9fcc939f9f1a2b593e606bd790cc87efd4064f (patch)
treebeaa313f3e53cf72ca76aa5392946b97736ea6b3 /src/mongo/db/repl
parent4add46aa8dd05a5c6d8af2c798eef6e9b5e4164b (diff)
downloadmongo-8c9fcc939f9f1a2b593e606bd790cc87efd4064f.tar.gz
SERVER-13961 Start using LockState from the OperationContext
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/consensus.cpp2
-rw-r--r--src/mongo/db/repl/health.cpp2
-rw-r--r--src/mongo/db/repl/initial_sync.cpp5
-rw-r--r--src/mongo/db/repl/master_slave.cpp75
-rw-r--r--src/mongo/db/repl/master_slave.h4
-rw-r--r--src/mongo/db/repl/oplog.cpp24
-rw-r--r--src/mongo/db/repl/oplog.h6
-rw-r--r--src/mongo/db/repl/repl_set_impl.cpp15
-rw-r--r--src/mongo/db/repl/repl_set_impl.h8
-rw-r--r--src/mongo/db/repl/replset_commands.cpp6
-rw-r--r--src/mongo/db/repl/replset_web_handler.cpp10
-rw-r--r--src/mongo/db/repl/resync.cpp11
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp5
-rw-r--r--src/mongo/db/repl/rs_initiate.cpp4
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp13
-rw-r--r--src/mongo/db/repl/rs_sync.cpp4
-rw-r--r--src/mongo/db/repl/sync.cpp15
-rw-r--r--src/mongo/db/repl/sync.h2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp32
-rw-r--r--src/mongo/db/repl/sync_tail.h7
20 files changed, 142 insertions, 108 deletions
diff --git a/src/mongo/db/repl/consensus.cpp b/src/mongo/db/repl/consensus.cpp
index 34d295bfe52..9a23f9f559e 100644
--- a/src/mongo/db/repl/consensus.cpp
+++ b/src/mongo/db/repl/consensus.cpp
@@ -457,7 +457,7 @@ namespace repl {
setElectionTime(getNextGlobalOptime());
- rs.assumePrimary();
+ rs._assumePrimary();
}
}
}
diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp
index cfb1bd780de..aaf11da5759 100644
--- a/src/mongo/db/repl/health.cpp
+++ b/src/mongo/db/repl/health.cpp
@@ -249,7 +249,7 @@ namespace repl {
string myMinValid;
try {
- readlocktry lk(/*"local.replset.minvalid", */300);
+ readlocktry lk(txn->lockState(), /*"local.replset.minvalid", */300);
if( lk.got() ) {
BSONObj mv;
if( Helpers::getSingleton(txn, "local.replset.minvalid", mv) ) {
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp
index d03bf04f756..794f010fea3 100644
--- a/src/mongo/db/repl/initial_sync.cpp
+++ b/src/mongo/db/repl/initial_sync.cpp
@@ -30,10 +30,12 @@
#include "mongo/db/repl/initial_sync.h"
+#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replset_commands.h"
#include "mongo/db/repl/rs.h"
+
namespace mongo {
namespace repl {
InitialSync::InitialSync(BackgroundSyncInterface *q) :
@@ -52,7 +54,8 @@ namespace repl {
}
// create the initial oplog entry
- syncApply(applyGTEObj);
+ OperationContextImpl txn;
+ syncApply(&txn, applyGTEObj);
_logOpObjRS(applyGTEObj);
return oplogApplySegment(applyGTEObj, minValidObj, multiInitialSyncApply);
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index 93681b32d28..5405307dd5c 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -331,7 +331,7 @@ namespace repl {
void ReplSource::forceResync( OperationContext* txn, const char *requester ) {
BSONObj info;
{
- dbtemprelease t;
+ dbtemprelease t(txn->lockState());
if (!oplogReader.connect(hostName, _me)) {
msgassertedNoTrace( 14051 , "unable to connect to resync");
}
@@ -445,7 +445,7 @@ namespace repl {
OpTime lastTime;
bool dbOk = false;
{
- dbtemprelease release;
+ dbtemprelease release(txn->lockState());
// We always log an operation after executing it (never before), so
// a database list will always be valid as of an oplog entry generated
@@ -512,7 +512,7 @@ namespace repl {
bool failedUpdate = applyOperation_inlock( txn, db, op );
if (failedUpdate) {
Sync sync(hostName);
- if (sync.shouldRetry(op)) {
+ if (sync.shouldRetry(txn, op)) {
uassert(15914,
"Failure retrying initial sync update",
!applyOperation_inlock(txn, db, op));
@@ -535,7 +535,7 @@ namespace repl {
@param alreadyLocked caller already put us in write lock if true
*/
- void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked) {
+ void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked) {
LOG(6) << "processing op: " << op << endl;
if( op.getStringField("op")[0] == 'n' )
@@ -562,8 +562,6 @@ namespace repl {
if ( !only.empty() && only != clientName )
return;
- OperationContextImpl txn; // XXX?
-
if (replSettings.pretouch &&
!alreadyLocked/*doesn't make sense if in write lock already*/) {
if (replSettings.pretouch > 1) {
@@ -592,17 +590,17 @@ namespace repl {
a += m;
}
// we do one too...
- pretouchOperation(&txn, op);
+ pretouchOperation(txn, op);
tp->join();
countdown = v.size();
}
}
else {
- pretouchOperation(&txn, op);
+ pretouchOperation(txn, op);
}
}
- scoped_ptr<Lock::GlobalWrite> lk( alreadyLocked ? 0 : new Lock::GlobalWrite() );
+ scoped_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(txn->lockState()));
if ( replAllDead ) {
// hmmm why is this check here and not at top of this function? does it get set between top and here?
@@ -610,7 +608,7 @@ namespace repl {
throw SyncException();
}
- if ( !handleDuplicateDbName( &txn, op, ns, clientName ) ) {
+ if (!handleDuplicateDbName(txn, op, ns, clientName)) {
return;
}
@@ -625,7 +623,7 @@ namespace repl {
// always apply admin command command
// this is a bit hacky -- the semantics of replication/commands aren't well specified
if ( strcmp( clientName, "admin" ) == 0 && *op.getStringField( "op" ) == 'c' ) {
- applyOperation( &txn, ctx.db(), op );
+ applyOperation(txn, ctx.db(), op);
return;
}
@@ -647,14 +645,14 @@ namespace repl {
save();
Client::Context ctx(ns);
nClonedThisPass++;
- resync(&txn, ctx.db()->name());
+ resync(txn, ctx.db()->name());
addDbNextPass.erase(clientName);
incompleteCloneDbs.erase( clientName );
}
save();
}
else {
- applyOperation( &txn, ctx.db(), op );
+ applyOperation(txn, ctx.db(), op);
addDbNextPass.erase( clientName );
}
}
@@ -723,7 +721,7 @@ namespace repl {
0 ok, don't sleep
1 ok, sleep
*/
- int ReplSource::sync_pullOpLog(int& nApplied) {
+ int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) {
int okResultCode = 1;
string ns = string("local.oplog.$") + sourceName();
LOG(2) << "repl: sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n';
@@ -757,7 +755,7 @@ namespace repl {
}
// obviously global isn't ideal, but non-repl set is old so
// keeping it simple
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
save();
}
@@ -794,7 +792,7 @@ namespace repl {
b.append("ns", *i + '.');
b.append("op", "db");
BSONObj op = b.done();
- sync_pullOpLog_applyOperation(op, false);
+ _sync_pullOpLog_applyOperation(txn, op, false);
}
}
@@ -809,7 +807,7 @@ namespace repl {
log() << "repl: " << ns << " oplog is empty" << endl;
}
{
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
save();
}
return okResultCode;
@@ -880,11 +878,11 @@ namespace repl {
bool moreInitialSyncsPending = !addDbNextPass.empty() && n; // we need "&& n" to assure we actually process at least one op to get a sync point recorded in the first place.
if ( moreInitialSyncsPending || !oplogReader.more() ) {
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
// NOTE aaron 2011-03-29 This block may be unnecessary, but I'm leaving it in place to avoid changing timing behavior.
{
- dbtemprelease t;
+ dbtemprelease t(txn->lockState());
if ( !moreInitialSyncsPending && oplogReader.more() ) {
continue;
}
@@ -905,7 +903,7 @@ namespace repl {
OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) {
// periodically note our progress, in case we are doing a lot of work and crash
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
syncedTo = nextOpTime;
// can't update local log ts since there are pending operations from our peer
save();
@@ -919,7 +917,7 @@ namespace repl {
int b = replApplyBatchSize.get();
bool justOne = b == 1;
- scoped_ptr<Lock::GlobalWrite> lk( justOne ? 0 : new Lock::GlobalWrite() );
+ scoped_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(txn->lockState()));
while( 1 ) {
BSONElement ts = op.getField("ts");
@@ -944,7 +942,7 @@ namespace repl {
verify( justOne );
oplogReader.putBack( op );
_sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1;
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
if ( n > 0 ) {
syncedTo = last;
save();
@@ -955,7 +953,7 @@ namespace repl {
return okResultCode;
}
- sync_pullOpLog_applyOperation(op, !justOne);
+ _sync_pullOpLog_applyOperation(txn, op, !justOne);
n++;
if( --b == 0 )
@@ -1006,7 +1004,8 @@ namespace repl {
return -1;
}
- return sync_pullOpLog(nApplied);
+ OperationContextImpl txn; // XXX?
+ return _sync_pullOpLog(&txn, nApplied);
}
/* --------------------------------------------------------------*/
@@ -1025,8 +1024,9 @@ namespace repl {
OperationContextImpl txn;
{
ReplInfo r("replMain load sources");
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn.lockState());
ReplSource::loadAll(&txn, sources);
+
replSettings.fastsync = false; // only need this param for initial reset
}
@@ -1089,13 +1089,13 @@ namespace repl {
return sleepAdvice;
}
- void replMain() {
+ static void replMain() {
ReplSource::SourceVector sources;
while ( 1 ) {
int s = 0;
{
- Lock::GlobalWrite lk;
OperationContextImpl txn;
+ Lock::GlobalWrite lk(txn.lockState());
if ( replAllDead ) {
// throttledForceResyncDead can throw
if ( !replSettings.autoresync || !ReplSource::throttledForceResyncDead( &txn, "auto" ) ) {
@@ -1106,6 +1106,7 @@ namespace repl {
verify( syncing == 0 ); // i.e., there is only one sync thread running. we will want to change/fix this.
syncing++;
}
+
try {
int nApplied = 0;
s = _replMain(sources, nApplied);
@@ -1122,8 +1123,10 @@ namespace repl {
out() << "caught exception in _replMain" << endl;
s = 4;
}
+
{
- Lock::GlobalWrite lk;
+ LockState lockState;
+ Lock::GlobalWrite lk(&lockState);
verify( syncing == 1 );
syncing--;
}
@@ -1157,14 +1160,15 @@ namespace repl {
even when things are idle.
*/
{
- writelocktry lk(1);
+ OperationContextImpl txn;
+ writelocktry lk(txn.lockState(), 1);
if ( lk.got() ) {
toSleep = 10;
replLocalAuth();
try {
- logKeepalive();
+ logKeepalive(&txn);
}
catch(...) {
log() << "caught exception in replMasterThread()" << endl;
@@ -1178,12 +1182,13 @@ namespace repl {
}
}
- void replSlaveThread() {
+ static void replSlaveThread() {
sleepsecs(1);
Client::initThread("replslave");
{
- Lock::GlobalWrite lk;
+ LockState lockState;
+ Lock::GlobalWrite lk(&lockState);
replLocalAuth();
}
@@ -1217,7 +1222,8 @@ namespace repl {
return;
{
- Lock::GlobalWrite lk;
+ LockState lockState;
+ Lock::GlobalWrite lk(&lockState);
replLocalAuth();
}
@@ -1249,7 +1255,8 @@ namespace repl {
}
OperationContextImpl txn; // XXX
- Lock::GlobalRead lk;
+ Lock::GlobalRead lk(txn.lockState());
+
for( unsigned i = a; i <= b; i++ ) {
const BSONObj& op = v[i];
const char *which = "o";
diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h
index 15445f68ede..f30f9f2bdd5 100644
--- a/src/mongo/db/repl/master_slave.h
+++ b/src/mongo/db/repl/master_slave.h
@@ -83,12 +83,12 @@ namespace repl {
void resync(OperationContext* txn, const std::string& dbName);
/** @param alreadyLocked caller already put us in write lock if true */
- void sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked);
+ void _sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked);
/* pull some operations from the master's oplog, and apply them.
calls sync_pullOpLog_applyOperation
*/
- int sync_pullOpLog(int& nApplied);
+ int _sync_pullOpLog(OperationContext* txn, int& nApplied);
/* we only clone one database per pass, even if a lot need done. This helps us
avoid overflowing the master's transaction log by doing too much work before going
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 1c40bbdfa6f..bf5cf5348b9 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -392,13 +392,11 @@ namespace repl {
}
void oldRepl() { _logOp = _logOpOld; }
- void logKeepalive() {
- OperationContextImpl txn;
- _logOp(&txn, "n", "", 0, BSONObj(), 0, 0, false);
+ void logKeepalive(OperationContext* txn) {
+ _logOp(txn, "n", "", 0, BSONObj(), 0, 0, false);
}
- void logOpComment(const BSONObj& obj) {
- OperationContextImpl txn;
- _logOp(&txn, "n", "", 0, obj, 0, 0, false);
+ void logOpComment(OperationContext* txn, const BSONObj& obj) {
+ _logOp(txn, "n", "", 0, obj, 0, 0, false);
}
void logOpInitiate(OperationContext* txn, const BSONObj& obj) {
_logOpRS(txn, "n", "", 0, obj, 0, 0, false);
@@ -433,7 +431,8 @@ namespace repl {
}
void createOplog() {
- Lock::GlobalWrite lk;
+ OperationContextImpl txn;
+ Lock::GlobalWrite lk(txn.lockState());
const char * ns = "local.oplog.$main";
@@ -442,7 +441,6 @@ namespace repl {
ns = rsoplog;
Client::Context ctx(ns);
- OperationContextImpl txn;
Collection* collection = ctx.db()->getCollection( &txn, ns );
if ( collection ) {
@@ -460,7 +458,7 @@ namespace repl {
if( rs ) return;
- initOpTimeFromOplog(ns);
+ initOpTimeFromOplog(&txn, ns);
return;
}
@@ -711,12 +709,12 @@ namespace repl {
Status status = Command::getStatusFromCommandResult(ob.done());
switch (status.code()) {
case ErrorCodes::BackgroundOperationInProgressForDatabase: {
- dbtemprelease release;
+ dbtemprelease release(txn->lockState());
BackgroundOperation::awaitNoBgOpInProgForDb(nsToDatabaseSubstring(ns));
break;
}
case ErrorCodes::BackgroundOperationInProgressForNamespace: {
- dbtemprelease release;
+ dbtemprelease release(txn->lockState());;
Command* cmd = Command::findCommand(o.firstElement().fieldName());
invariant(cmd);
BackgroundOperation::awaitNoBgOpInProgForNs(cmd->parseNs(nsToDatabase(ns), o));
@@ -761,8 +759,8 @@ namespace repl {
}
- void initOpTimeFromOplog(const std::string& oplogNS) {
- DBDirectClient c;
+ void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS) {
+ DBDirectClient c(txn);
BSONObj lastOp = c.findOne(oplogNS,
Query().sort(reverseNaturalObj),
NULL,
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 40803c0a007..6728811667e 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -78,13 +78,13 @@ namespace repl {
bool fromMigrate = false);
// Log an empty no-op operation to the local oplog
- void logKeepalive();
+ void logKeepalive(OperationContext* txn);
/** puts obj in the oplog as a comment (a no-op). Just for diags.
convention is
{ msg : "text", ... }
*/
- void logOpComment(const BSONObj& obj);
+ void logOpComment(OperationContext* txn, const BSONObj& obj);
// Flush out the cached pointers to the local database and oplog.
// Used by the closeDatabase command to ensure we don't cache closed things.
@@ -112,6 +112,6 @@ namespace repl {
/**
* Initializes the global OpTime with the value from the timestamp of the last oplog entry.
*/
- void initOpTimeFromOplog(const std::string& oplogNS);
+ void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp
index 5d7a37d8806..c5b00f59c20 100644
--- a/src/mongo/db/repl/repl_set_impl.cpp
+++ b/src/mongo/db/repl/repl_set_impl.cpp
@@ -109,7 +109,7 @@ namespace {
}
}
- void ReplSetImpl::assumePrimary() {
+ void ReplSetImpl::_assumePrimary() {
LOG(1) << "replSet assuming primary" << endl;
verify(iAmPotentiallyHot());
@@ -119,9 +119,11 @@ namespace {
// Lock here to prevent stepping down & becoming primary from getting interleaved
LOG(1) << "replSet waiting for global write lock";
- Lock::GlobalWrite lk;
- initOpTimeFromOplog("local.oplog.rs");
+ OperationContextImpl txn; // XXX?
+ Lock::GlobalWrite lk(txn.lockState());
+
+ initOpTimeFromOplog(&txn, "local.oplog.rs");
// Generate new election unique id
elect.setElectionId(OID::gen());
@@ -138,8 +140,10 @@ namespace {
bool ReplSetImpl::setMaintenanceMode(const bool inc) {
lock replLock(this);
+
// Lock here to prevent state from changing between checking the state and changing it
- Lock::GlobalWrite writeLock;
+ LockState lockState;
+ Lock::GlobalWrite writeLock(&lockState);
if (box.getState().primary()) {
return false;
@@ -191,7 +195,8 @@ namespace {
void ReplSetImpl::relinquish() {
{
- Lock::GlobalWrite lk; // so we are synchronized with _logOp()
+ LockState lockState;
+ Lock::GlobalWrite writeLock(&lockState); // so we are synchronized with _logOp()
LOG(2) << "replSet attempting to relinquish" << endl;
if (box.getState().primary()) {
diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h
index b5b6254826d..da6d2efdf3d 100644
--- a/src/mongo/db/repl/repl_set_impl.h
+++ b/src/mongo/db/repl/repl_set_impl.h
@@ -41,7 +41,9 @@
#include "mongo/util/concurrency/value.h"
namespace mongo {
+
class Cloner;
+ class OperationContext;
namespace repl {
@@ -118,7 +120,7 @@ namespace repl {
bool _stepDown(int secs);
bool _freeze(int secs);
private:
- void assumePrimary();
+ void _assumePrimary();
void loadLastOpTimeWritten(bool quiet=false);
void changeState(MemberState s);
@@ -288,7 +290,7 @@ namespace repl {
void syncDoInitialSync();
void _syncThread();
void syncTail();
- unsigned _syncRollback(OplogReader& r);
+ unsigned _syncRollback(OperationContext* txn, OplogReader& r);
void syncFixUp(FixUpInfo& h, OplogReader& r);
// keep a list of hosts that we've tried recently that didn't work
@@ -317,7 +319,7 @@ namespace repl {
threadpool::ThreadPool& getWriterPool() { return _writerPool; }
const ReplSetConfig::MemberCfg& myConfig() const { return _config; }
- bool tryToGoLiveAsASecondary(OpTime&); // readlocks
+ bool tryToGoLiveAsASecondary(OperationContext* txn, OpTime&); // readlocks
void syncRollback(OplogReader& r);
void syncThread();
const OpTime lastOtherOpTime() const;
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index 5605d962987..b7df8ebe227 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -186,14 +186,14 @@ namespace repl {
virtual bool run(OperationContext* txn, const string& a, BSONObj& b, int e, string& errmsg, BSONObjBuilder& c, bool d) {
try {
rwlock_try_write lk(mutex);
- return _run(a,b,e,errmsg,c,d);
+ return _run(txn, a,b,e,errmsg,c,d);
}
catch(rwlock_try_write::exception&) { }
errmsg = "a replSetReconfig is already in progress";
return false;
}
private:
- bool _run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ bool _run(OperationContext* txn, const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( cmdObj["replSetReconfig"].type() != Object ) {
errmsg = "no configuration specified";
return false;
@@ -220,7 +220,7 @@ namespace repl {
// later. of course it could be stuck then, but this check lowers the risk if weird things
// are up - we probably don't want a change to apply 30 minutes after the initial attempt.
time_t t = time(0);
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
if( time(0)-t > 20 ) {
errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?";
return false;
diff --git a/src/mongo/db/repl/replset_web_handler.cpp b/src/mongo/db/repl/replset_web_handler.cpp
index 8c71fa2748b..21f013318ea 100644
--- a/src/mongo/db/repl/replset_web_handler.cpp
+++ b/src/mongo/db/repl/replset_web_handler.cpp
@@ -51,9 +51,13 @@ namespace repl {
}
virtual void handle( OperationContext* txn,
- const char *rq, const std::string& url, BSONObj params,
- string& responseMsg, int& responseCode,
- vector<string>& headers, const SockAddr &from ) {
+ const char *rq,
+ const std::string& url,
+ BSONObj params,
+ string& responseMsg,
+ int& responseCode,
+ vector<string>& headers,
+ const SockAddr &from ) {
if( url == "/_replSetOplog" ) {
responseMsg = _replSetOplog(params);
diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp
index 3bd6b268cdf..d1c693d4b92 100644
--- a/src/mongo/db/repl/resync.cpp
+++ b/src/mongo/db/repl/resync.cpp
@@ -67,7 +67,7 @@ namespace repl {
bool fromRepl) {
const std::string ns = parseNs(dbname, cmdObj);
- Lock::GlobalWrite globalWriteLock;
+ Lock::GlobalWrite globalWriteLock(txn->lockState());
Client::Context ctx(ns);
if (replSettings.usingReplSets()) {
if (!theReplSet) {
@@ -83,7 +83,7 @@ namespace repl {
// below this comment pertains only to master/slave replication
if ( cmdObj.getBoolField( "force" ) ) {
- if ( !waitForSyncToFinish( errmsg ) )
+ if ( !waitForSyncToFinish(txn, errmsg ) )
return false;
replAllDead = "resync forced";
}
@@ -91,14 +91,15 @@ namespace repl {
errmsg = "not dead, no need to resync";
return false;
}
- if ( !waitForSyncToFinish( errmsg ) )
+ if ( !waitForSyncToFinish(txn, errmsg ) )
return false;
ReplSource::forceResyncDead( txn, "client" );
result.append( "info", "triggered resync for all sources" );
return true;
}
- bool waitForSyncToFinish( string &errmsg ) const {
+
+ bool waitForSyncToFinish(OperationContext* txn, string &errmsg) const {
// Wait for slave thread to finish syncing, so sources will be be
// reloaded with new saved state on next pass.
Timer t;
@@ -106,7 +107,7 @@ namespace repl {
if ( syncing == 0 || t.millis() > 30000 )
break;
{
- Lock::TempRelease t;
+ Lock::TempRelease t(txn->lockState());
relinquishSyncingSome = 1;
sleepmillis(1);
}
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index be6c8aaf75a..5faea85e949 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -393,6 +393,8 @@ namespace repl {
return;
}
+ OperationContextImpl txn;
+
// written by applyToHead calls
BSONObj minValid;
@@ -408,7 +410,7 @@ namespace repl {
theReplSet->setInitialSyncFlag();
sethbmsg("initial sync drop all databases", 0);
- dropAllDatabasesExceptLocal();
+ dropAllDatabasesExceptLocal(&txn);
sethbmsg("initial sync clone all databases", 0);
@@ -467,7 +469,6 @@ namespace repl {
verify( !box.getState().primary() ); // wouldn't make sense if we were.
{
- OperationContextImpl txn;
Client::WriteContext cx(&txn, "local.");
cx.ctx().db()->flushFiles(true);
diff --git a/src/mongo/db/repl/rs_initiate.cpp b/src/mongo/db/repl/rs_initiate.cpp
index e50d2a80568..c0cd5042abd 100644
--- a/src/mongo/db/repl/rs_initiate.cpp
+++ b/src/mongo/db/repl/rs_initiate.cpp
@@ -197,7 +197,7 @@ namespace repl {
// later. of course it could be stuck then, but this check lowers the risk if weird things
// are up.
time_t t = time(0);
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
if( time(0)-t > 10 ) {
errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?";
return false;
@@ -270,7 +270,7 @@ namespace repl {
createOplog();
- Lock::GlobalWrite lk;
+ Lock::GlobalWrite lk(txn->lockState());
bo comment = BSON( "msg" << "initiating set");
newConfig->saveConfigLocally(comment);
log() << "replSet replSetInitiate config now saved locally. "
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 9d07024e16a..4ecf5407c24 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -428,7 +428,7 @@ namespace repl {
ctx.db()->dropCollection(&txn, ns);
{
string errmsg;
- dbtemprelease release;
+ dbtemprelease release(txn.lockState());
bool ok = Cloner::copyCollectionFromRemote(&txn, them->getServerAddress(),
ns, errmsg);
uassert(15909, str::stream() << "replSet rollback error resyncing collection "
@@ -662,8 +662,9 @@ namespace repl {
void ReplSetImpl::syncRollback(OplogReader& oplogreader) {
// check that we are at minvalid, otherwise we cannot rollback as we may be in an
// inconsistent state
+ OperationContextImpl txn;
+
{
- OperationContextImpl txn;
Lock::DBRead lk(txn.lockState(), "local.replset.minvalid");
BSONObj mv;
if (Helpers::getSingleton(&txn, "local.replset.minvalid", mv)) {
@@ -678,18 +679,18 @@ namespace repl {
}
}
- unsigned s = _syncRollback(oplogreader);
+ unsigned s = _syncRollback(&txn, oplogreader);
if (s)
sleepsecs(s);
}
- unsigned ReplSetImpl::_syncRollback(OplogReader& oplogreader) {
+ unsigned ReplSetImpl::_syncRollback(OperationContext* txn, OplogReader& oplogreader) {
verify(!lockedByMe());
verify(!Lock::isLocked());
sethbmsg("rollback 0");
- writelocktry lk(20000);
+ writelocktry lk(txn->lockState(), 20000);
if (!lk.got()) {
sethbmsg("rollback couldn't get write lock in a reasonable time");
return 2;
@@ -721,7 +722,7 @@ namespace repl {
}
catch (DBException& e) {
sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min");
- dbtemprelease release;
+ dbtemprelease release(txn->lockState());
sleepsecs(60);
throw;
}
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 2fd1aa5b0f4..ad9c8a5ebe7 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -72,7 +72,7 @@ namespace repl {
readlocks
@return true if transitioned to SECONDARY
*/
- bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
+ bool ReplSetImpl::tryToGoLiveAsASecondary(OperationContext* txn, OpTime& /*out*/ minvalid) {
bool golive = false;
lock rsLock( this );
@@ -87,7 +87,7 @@ namespace repl {
return false;
}
- Lock::GlobalWrite writeLock;
+ Lock::GlobalWrite writeLock(txn->lockState());
// make sure we're not primary, secondary, rollback, or fatal already
if (box.getState().primary() || box.getState().secondary() ||
diff --git a/src/mongo/db/repl/sync.cpp b/src/mongo/db/repl/sync.cpp
index 8ca15ed9386..16e6225a1fb 100644
--- a/src/mongo/db/repl/sync.cpp
+++ b/src/mongo/db/repl/sync.cpp
@@ -36,7 +36,6 @@
#include "mongo/db/diskloc.h"
#include "mongo/db/pdfile.h"
#include "mongo/db/repl/oplogreader.h"
-#include "mongo/db/operation_context_impl.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -106,16 +105,17 @@ namespace repl {
str::stream() << "Can no longer connect to initial sync source: " << hn);
}
- bool Sync::shouldRetry(const BSONObj& o) {
+ bool Sync::shouldRetry(OperationContext* txn, const BSONObj& o) {
+ invariant(txn->lockState()->hasAnyWriteLock());
+
// should already have write lock
const char *ns = o.getStringField("ns");
Client::Context ctx(ns);
- OperationContextImpl txn;
// we don't have the object yet, which is possible on initial sync. get it.
log() << "replication info adding missing object" << endl; // rare enough we can log
- BSONObj missingObj = getMissingDoc(&txn, ctx.db(), o);
+ BSONObj missingObj = getMissingDoc(txn, ctx.db(), o);
if( missingObj.isEmpty() ) {
log() << "replication missing object not found on source. presumably deleted later in oplog" << endl;
@@ -125,9 +125,10 @@ namespace repl {
return false;
}
else {
- Collection* collection = ctx.db()->getOrCreateCollection( &txn, ns );
- verify( collection ); // should never happen
- StatusWith<DiskLoc> result = collection->insertDocument( &txn, missingObj, true );
+ Collection* collection = ctx.db()->getOrCreateCollection(txn, ns);
+ invariant(collection != NULL); // should never happen
+
+ StatusWith<DiskLoc> result = collection->insertDocument(txn, missingObj, true);
uassert(15917,
str::stream() << "failed to insert missing doc: " << result.toString(),
result.isOK() );
diff --git a/src/mongo/db/repl/sync.h b/src/mongo/db/repl/sync.h
index 67cb5e63a60..cdda55f4f13 100644
--- a/src/mongo/db/repl/sync.h
+++ b/src/mongo/db/repl/sync.h
@@ -49,7 +49,7 @@ namespace repl {
/**
* If applyOperation_inlock should be called again after an update fails.
*/
- virtual bool shouldRetry(const BSONObj& o);
+ virtual bool shouldRetry(OperationContext* txn, const BSONObj& o);
void setHostname(const std::string& hostname);
};
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 92eea631595..07d6eab4243 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -78,7 +78,8 @@ namespace repl {
/* apply the log op that is in param o
@return bool success (true) or failure (false)
*/
- bool SyncTail::syncApply(const BSONObj &op, bool convertUpdateToUpsert) {
+ bool SyncTail::syncApply(
+ OperationContext* txn, const BSONObj &op, bool convertUpdateToUpsert) {
const char *ns = op.getStringField("ns");
verify(ns);
@@ -94,25 +95,24 @@ namespace repl {
bool isCommand(op["op"].valuestrsafe()[0] == 'c');
- OperationContextImpl txn;
boost::scoped_ptr<Lock::ScopedLock> lk;
if(isCommand) {
// a command may need a global write lock. so we will conservatively go
// ahead and grab one here. suboptimal. :-(
- lk.reset(new Lock::GlobalWrite());
+ lk.reset(new Lock::GlobalWrite(txn->lockState()));
} else {
// DB level lock for this operation
- lk.reset(new Lock::DBWrite(txn.lockState(), ns));
+ lk.reset(new Lock::DBWrite(txn->lockState(), ns));
}
- Client::Context ctx(ns, storageGlobalParams.dbpath);
+ Client::Context ctx(ns);
ctx.getClient()->curop()->reset();
// For non-initial-sync, we convert updates to upserts
// to suppress errors when replaying oplog entries.
- bool ok = !applyOperation_inlock(&txn, ctx.db(), op, true, convertUpdateToUpsert);
+ bool ok = !applyOperation_inlock(txn, ctx.db(), op, true, convertUpdateToUpsert);
opsAppliedStats.increment();
- txn.recoveryUnit()->commitIfNeeded();
+ txn->recoveryUnit()->commitIfNeeded();
return ok;
}
@@ -325,7 +325,9 @@ namespace repl {
// become primary
if (!theReplSet->isSecondary()) {
OpTime minvalid;
- theReplSet->tryToGoLiveAsASecondary(minvalid);
+
+ OperationContextImpl txn;
+ theReplSet->tryToGoLiveAsASecondary(&txn, minvalid);
}
// normally msgCheckNewState gets called periodically, but in a single node
@@ -555,7 +557,8 @@ namespace repl {
it != ops.end();
++it) {
try {
- if (!st->syncApply(*it, convertUpdatesToUpserts)) {
+ OperationContextImpl txn;
+ if (!st->syncApply(&txn, *it, convertUpdatesToUpserts)) {
fassertFailedNoTrace(16359);
}
} catch (const DBException& e) {
@@ -573,15 +576,18 @@ namespace repl {
it != ops.end();
++it) {
try {
- if (!st->syncApply(*it)) {
+ OperationContextImpl txn;
+
+ if (!st->syncApply(&txn, *it)) {
bool status;
{
- Lock::GlobalWrite lk;
- status = st->shouldRetry(*it);
+ Lock::GlobalWrite lk(txn.lockState());
+ status = st->shouldRetry(&txn, *it);
}
+
if (status) {
// retry
- if (!st->syncApply(*it)) {
+ if (!st->syncApply(&txn, *it)) {
fassertFailedNoTrace(15915);
}
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 581a37498ae..ea9995aaed2 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -34,6 +34,9 @@
#include "mongo/db/repl/sync.h"
namespace mongo {
+
+ class OperationContext;
+
namespace repl {
class BackgroundSyncInterface;
@@ -46,7 +49,9 @@ namespace repl {
public:
SyncTail(BackgroundSyncInterface *q);
virtual ~SyncTail();
- virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert = false);
+ virtual bool syncApply(OperationContext* txn,
+ const BSONObj &o,
+ bool convertUpdateToUpsert = false);
/**
* Apply ops from applyGTEObj's ts to at least minValidObj's ts. Note that, due to