author     Scott Hernandez <scotthernandez@gmail.com>    2015-04-23 09:17:57 -0400
committer  Scott Hernandez <scotthernandez@gmail.com>    2015-07-06 17:20:53 -0400
commit     40410b820cd93e54fb7b42d37146b2d930334bbb (patch)
tree       65f4d3c42da2c19a5d0fc26419d9ed26664728a8
parent     f31038b98941bdc72c13449183854a690fd20653 (diff)
download   mongo-40410b820cd93e54fb7b42d37146b2d930334bbb.tar.gz
SERVER-17689: handle wce better in initial sync
(cherry picked from commit 57232d88b869b0741d95998f8ed09a96b2e7a1bc)
-rw-r--r--  src/mongo/db/catalog/database.cpp                                   16
-rw-r--r--  src/mongo/db/cloner.cpp                                             92
-rw-r--r--  src/mongo/db/concurrency/write_conflict_exception.h                  2
-rw-r--r--  src/mongo/db/ops/update.cpp                                         47
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                                        19
-rw-r--r--  src/mongo/db/repl/minvalid.cpp                                      62
-rw-r--r--  src/mongo/db/repl/oplog.cpp                                        101
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp   49
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp                                13
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                                   12
-rw-r--r--  src/mongo/db/repl/sync.cpp                                          70
11 files changed, 267 insertions(+), 216 deletions(-)
diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp
index 726032a46c0..e81a29826d3 100644
--- a/src/mongo/db/catalog/database.cpp
+++ b/src/mongo/db/catalog/database.cpp
@@ -559,10 +559,18 @@ namespace mongo {
for (vector<string>::iterator i = n.begin(); i != n.end(); i++) {
if (*i != "local") {
- Database* db = dbHolder().get(txn, *i);
- invariant(db);
-
- dropDatabase(txn, db);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ Database* db = dbHolder().get(txn, *i);
+ // This check is needed because dropDatabase cannot be rolled back.
+ // It can safely be replaced by "invariant(db); dropDatabase(txn, db);" once that is fixed.
+ if (db == nullptr) {
+ log() << "database disappeared after listDatabases but before drop: " << *i;
+ } else {
+ dropDatabase(txn, db);
+ }
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn,
+ "dropAllDatabasesExceptLocal",
+ *i);
}
}
}
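
Most of this commit follows the shape of the hunk above: the storage operation, together with any lookups its correctness depends on, moves inside a MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN/END pair, so a WriteConflictException from the storage engine re-runs the body instead of unwinding initial sync. The standalone sketch below models only that idea outside the server; WriteConflictException, retryOnWriteConflict, and the toy catalog are illustrative stand-ins, not MongoDB types. It demonstrates the point the hunk makes explicit: re-read the state you depend on at the top of every attempt, and tolerate it having changed between attempts.

// Standalone sketch of the retry pattern this patch applies throughout:
// re-run the body when the storage engine reports a write conflict, and
// re-read any state the body depends on at the top of each attempt.
// Every name below is illustrative; none of this is the server's actual code.
#include <exception>
#include <iostream>
#include <map>
#include <string>

struct WriteConflictException : std::exception {
    const char* what() const noexcept override { return "write conflict"; }
};

template <typename Body>
void retryOnWriteConflict(const std::string& opName, const std::string& ns, Body body) {
    for (int attempt = 0;; ++attempt) {
        try {
            body();
            return;
        } catch (const WriteConflictException&) {
            std::cerr << "write conflict in " << opName << " on " << ns
                      << " (attempt " << attempt << "), retrying" << std::endl;
        }
    }
}

std::map<std::string, int> g_catalog = {{"test", 1}, {"admin", 2}};  // toy "dbHolder"

int main() {
    bool injectedConflict = false;
    retryOnWriteConflict("dropAllDatabasesExceptLocal", "test", [&] {
        // Re-read state on every attempt; a prior attempt (or another thread)
        // may already have removed the database, so tolerate a missing entry.
        auto it = g_catalog.find("test");
        if (it == g_catalog.end()) {
            std::cerr << "database disappeared before drop: test" << std::endl;
            return;
        }
        if (!injectedConflict) {             // simulate one storage-engine conflict
            injectedConflict = true;
            throw WriteConflictException();
        }
        g_catalog.erase(it);                 // the "drop" finally succeeds
    });
    std::cout << "databases left: " << g_catalog.size() << std::endl;
}
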
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp
index baab89daf01..4ed449487fb 100644
--- a/src/mongo/db/cloner.cpp
+++ b/src/mongo/db/cloner.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/copydb.h"
#include "mongo/db/commands/rename_collection.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/index_builder.h"
@@ -145,17 +146,20 @@ namespace mongo {
<< "collection dropped during clone ["
<< to_collection.ns() << "]",
!createdCollection );
- WriteUnitOfWork wunit(txn);
- collection = db->createCollection(txn, to_collection.ns());
- verify(collection);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- if (logForRepl) {
- repl::logOp(txn,
- "c",
- (_dbName + ".$cmd").c_str(),
- BSON("create" << to_collection.coll()));
- }
- wunit.commit();
+ WriteUnitOfWork wunit(txn);
+ collection = db->createCollection(txn, to_collection.ns());
+ verify(collection);
+
+ if (logForRepl) {
+ repl::logOp(txn,
+ "c",
+ (_dbName + ".$cmd").c_str(),
+ BSON("create" << to_collection.coll()));
+ }
+ wunit.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns());
}
while( i.moreInCurrentBatch() ) {
@@ -221,21 +225,22 @@ namespace mongo {
}
++numSeen;
- WriteUnitOfWork wunit(txn);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
- BSONObj js = tmp;
+ BSONObj js = tmp;
- StatusWith<RecordId> loc = collection->insertDocument( txn, js, true );
- if ( !loc.isOK() ) {
- error() << "error: exception cloning object in " << from_collection
- << ' ' << loc.toString() << " obj:" << js;
- }
- uassertStatusOK( loc.getStatus() );
- if (logForRepl)
- repl::logOp(txn, "i", to_collection.ns().c_str(), js);
-
- wunit.commit();
+ StatusWith<RecordId> loc = collection->insertDocument( txn, js, true );
+ if ( !loc.isOK() ) {
+ error() << "error: exception cloning object in " << from_collection
+ << ' ' << loc.toString() << " obj:" << js;
+ }
+ uassertStatusOK( loc.getStatus() );
+ if (logForRepl)
+ repl::logOp(txn, "i", to_collection.ns().c_str(), js);
+ wunit.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "cloner insert", to_collection.ns());
RARELY if ( time( 0 ) - saveLast > 60 ) {
log() << numSeen << " objects cloned so far from collection " << from_collection;
saveLast = time( 0 );
@@ -335,16 +340,18 @@ namespace mongo {
Collection* collection = db->getCollection( to_collection );
if ( !collection ) {
- WriteUnitOfWork wunit(txn);
- collection = db->createCollection( txn, to_collection.ns() );
- invariant(collection);
- if (logForRepl) {
- repl::logOp(txn,
- "c",
- (toDBName + ".$cmd").c_str(),
- BSON("create" << to_collection.coll()));
- }
- wunit.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ collection = db->createCollection( txn, to_collection.ns() );
+ invariant(collection);
+ if (logForRepl) {
+ repl::logOp(txn,
+ "c",
+ (toDBName + ".$cmd").c_str(),
+ BSON("create" << to_collection.coll()));
+ }
+ wunit.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns());
}
// TODO pass the MultiIndexBlock when inserting into the collection rather than building the
@@ -406,13 +413,15 @@ namespace mongo {
invariant(collList.size() <= 1);
BSONObj col = collList.front();
if (col["options"].isABSONObj()) {
- WriteUnitOfWork wunit(txn);
- Status status = userCreateNS(txn, db, ns, col["options"].Obj(), logForRepl, 0);
- if ( !status.isOK() ) {
- errmsg = status.toString();
- return false;
- }
- wunit.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ Status status = userCreateNS(txn, db, ns, col["options"].Obj(), logForRepl, 0);
+ if ( !status.isOK() ) {
+ errmsg = status.toString();
+ return false;
+ }
+ wunit.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", ns);
}
}
@@ -583,7 +592,7 @@ namespace mongo {
Database* db = dbHolder().openDb(txn, toDBName);
- {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
// we defer building id index for performance - building it in batch is much
@@ -600,9 +609,8 @@ namespace mongo {
<< createStatus.reason();
return false;
}
-
wunit.commit();
- }
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_name.ns());
LOG(1) << "\t\t cloning " << from_name << " -> " << to_name << endl;
Query q;
diff --git a/src/mongo/db/concurrency/write_conflict_exception.h b/src/mongo/db/concurrency/write_conflict_exception.h
index 63a4645bffc..2f369beebea 100644
--- a/src/mongo/db/concurrency/write_conflict_exception.h
+++ b/src/mongo/db/concurrency/write_conflict_exception.h
@@ -45,7 +45,7 @@
continue; \
} \
break; \
- } while (true); } while (false)
+ } while (true); } while (false);
namespace mongo {
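
The hunk above touches only the tail of MONGO_WRITE_CONFLICT_RETRY_LOOP_END (the change is a trailing semicolon after "while (false)"), but that visible tail, catch then continue, then break, then "} while (true); } while (false);", implies the usual construction for a statement-like BEGIN/END macro pair. The sketch below rebuilds that construction with simplified, illustrative macros (no attempt counter, no backoff, and not the server's actual definition) purely to show how "BEGIN { body } END(...)" parses as one retryable statement.

// Illustrative reconstruction of a BEGIN/END retry macro pair with the same
// tail structure as the hunk above (catch -> continue, then break, then
// "} while (true); } while (false);").  Simplified and hypothetical; not the
// server's real macro.
#include <exception>
#include <iostream>

struct WriteConflictException : std::exception {};

#define RETRY_LOOP_BEGIN       \
    do {                       \
        do {                   \
            try

#define RETRY_LOOP_END(OPSTR, NSSTR)                                              \
            catch (const WriteConflictException&) {                               \
                std::cerr << "retrying " << (OPSTR) << " on " << (NSSTR) << "\n"; \
                continue;  /* go around the inner while (true) again */           \
            }                                                                     \
            break;             /* body finished without a conflict */             \
        } while (true);                                                           \
    } while (false);           /* trailing ';' matches the hunk above */

int main() {
    int attempts = 0;
    RETRY_LOOP_BEGIN {
        ++attempts;
        if (attempts < 3) throw WriteConflictException();  // simulate two conflicts
        std::cout << "succeeded on attempt " << attempts << "\n";
    } RETRY_LOOP_END("demo write", "demo.ns")
    return 0;
}
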
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index 9aeb9946a45..6b8e3cff2a4 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/exec/update.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/ops/update_driver.h"
@@ -74,28 +75,30 @@ namespace mongo {
locker->isLockHeldForMode(ResourceId(RESOURCE_DATABASE, nsString.db()),
MODE_X));
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), nsString.db(), MODE_X);
-
- if (!request.isFromReplication() &&
- !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- nsString.db())) {
- uassertStatusOK(Status(ErrorCodes::NotMaster, str::stream()
- << "Not primary while creating collection " << nsString.ns()
- << " during upsert"));
- }
-
- WriteUnitOfWork wuow(txn);
- collection = db->createCollection(txn, nsString.ns());
- invariant(collection);
-
- if (!request.isFromReplication()) {
- repl::logOp(txn,
- "c",
- (db->name() + ".$cmd").c_str(),
- BSON("create" << (nsString.coll())));
- }
- wuow.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), nsString.db(), MODE_X);
+
+ if (!request.isFromReplication() &&
+ !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
+ nsString.db())) {
+ uassertStatusOK(Status(ErrorCodes::NotMaster, str::stream()
+ << "Not primary while creating collection " << nsString.ns()
+ << " during upsert"));
+ }
+
+ WriteUnitOfWork wuow(txn);
+ collection = db->createCollection(txn, nsString.ns());
+ invariant(collection);
+
+ if (!request.isFromReplication()) {
+ repl::logOp(txn,
+ "c",
+ (db->name() + ".$cmd").c_str(),
+ BSON("create" << (nsString.coll())));
+ }
+ wuow.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nsString.ns());
}
// Parse the update, get an executor for it, run the executor, get stats out.
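
In the upsert path above, the ScopedTransaction, the DBLock, and the WriteUnitOfWork all moved inside the retry loop, so every attempt acquires them afresh and their destructors run before the next pass. The sketch below shows just that RAII-per-attempt behaviour with toy Lock and UnitOfWork types (hypothetical, not the server's locking or storage classes) and a plain hand-written loop in place of the macro.

// RAII guards declared inside a retried body are destroyed before every retry,
// so each attempt re-acquires its locks and starts a fresh unit of work.
// Lock and UnitOfWork here are toy types, not the server's.
#include <exception>
#include <iostream>

struct WriteConflictException : std::exception {};

struct Lock {
    Lock()  { std::cout << "  acquire lock\n"; }
    ~Lock() { std::cout << "  release lock\n"; }
};

struct UnitOfWork {
    bool committed = false;
    void commit() { committed = true; std::cout << "  commit\n"; }
    ~UnitOfWork() { if (!committed) std::cout << "  roll back\n"; }
};

int main() {
    int attempt = 0;
    while (true) {
        try {
            Lock lk;              // re-acquired on every attempt
            UnitOfWork wuow;      // rolled back automatically if we never commit
            if (++attempt == 1) throw WriteConflictException();  // first try conflicts
            wuow.commit();
            break;
        } catch (const WriteConflictException&) {
            std::cout << "write conflict, retrying\n";  // lock and unit already released
        }
    }
}
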
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 6e26475642f..6c8eba2199e 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/oplog.h"
@@ -499,14 +500,16 @@ namespace {
long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
BSONObj oplogEntry;
try {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
- bool success = Helpers::getLast(txn, rsoplog, oplogEntry);
- if (!success) {
- // This can happen when we are to do an initial sync. lastHash will be set
- // after the initial sync is complete.
- return 0;
- }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_X);
+ bool success = Helpers::getLast(txn, rsoplog, oplogEntry);
+ if (!success) {
+ // This can happen when we are to do an initial sync. lastHash will be set
+ // after the initial sync is complete.
+ return 0;
+ }
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsoplog);
}
catch (const DBException& ex) {
severe() << "Problem reading " << rsoplog << ": " << ex.toStatus();
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp
index 0005f364963..18235cc178c 100644
--- a/src/mongo/db/repl/minvalid.cpp
+++ b/src/mongo/db/repl/minvalid.cpp
@@ -34,6 +34,7 @@
#include "mongo/bson/optime.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
@@ -50,45 +51,54 @@ namespace {
} // namespace
void clearInitialSyncFlag(OperationContext* txn) {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
- Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag));
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_X);
+ Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag));
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS);
}
void setInitialSyncFlag(OperationContext* txn) {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
- Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag));
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_X);
+ Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag));
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS);
}
bool getInitialSyncFlag() {
OperationContextImpl txn;
- ScopedTransaction transaction(&txn, MODE_IX);
- Lock::DBLock lk(txn.lockState(), "local", MODE_X);
- BSONObj mv;
- bool found = Helpers::getSingleton( &txn, minvalidNS, mv);
-
- if (found) {
- return mv[initialSyncFlagString].trueValue();
- }
- return false;
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(&txn, MODE_IX);
+ Lock::DBLock lk(txn.lockState(), "local", MODE_X);
+ BSONObj mv;
+ bool found = Helpers::getSingleton( &txn, minvalidNS, mv);
+ if (found) {
+ return mv[initialSyncFlagString].trueValue();
+ }
+ return false;
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(&txn, "getInitialSyncFlags", minvalidNS);
}
void setMinValid(OperationContext* ctx, OpTime ts) {
- ScopedTransaction transaction(ctx, MODE_IX);
- Lock::DBLock lk(ctx->lockState(), "local", MODE_X);
- Helpers::putSingleton(ctx, minvalidNS, BSON("$set" << BSON("ts" << ts)));
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(ctx, MODE_IX);
+ Lock::DBLock lk(ctx->lockState(), "local", MODE_X);
+ Helpers::putSingleton(ctx, minvalidNS, BSON("$set" << BSON("ts" << ts)));
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS);
}
OpTime getMinValid(OperationContext* txn) {
- ScopedTransaction transaction(txn, MODE_IS);
- Lock::DBLock lk(txn->lockState(), "local", MODE_S);
- BSONObj mv;
- bool found = Helpers::getSingleton(txn, minvalidNS, mv);
- if (found) {
- return mv["ts"]._opTime();
- }
- return OpTime();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IS);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_S);
+ BSONObj mv;
+ bool found = Helpers::getSingleton(txn, minvalidNS, mv);
+ if (found) {
+ return mv["ts"]._opTime();
+ }
+ return OpTime();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS);
}
}
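
getInitialSyncFlag and getMinValid above return from inside the retry loop. Because the macro pair is structurally just nested do/while loops around a try block, a return in the body leaves the loops and the enclosing function in one step; only a WriteConflictException triggers another pass. Below is a minimal sketch of that control flow, again using simplified stand-in macros and a toy singleton lookup rather than the server's definitions.

// A `return` inside the retried body leaves both macro loops and the enclosing
// function; only a write conflict repeats the body.  Macros and globals here
// are illustrative, not the server's.
#include <exception>
#include <iostream>

struct WriteConflictException : std::exception {};

#define RETRY_LOOP_BEGIN do { do { try
#define RETRY_LOOP_END(OPSTR)                                \
            catch (const WriteConflictException&) {          \
                std::cerr << "retrying " << (OPSTR) << "\n"; \
                continue;                                    \
            }                                                \
            break;                                           \
        } while (true);                                      \
    } while (false);

bool g_flagDocumentPresent = true;   // stands in for the minValid singleton lookup
int g_conflictsToInject = 1;

bool getFlagSketch() {
    RETRY_LOOP_BEGIN {
        if (g_conflictsToInject-- > 0) throw WriteConflictException();
        if (g_flagDocumentPresent) {
            return true;             // exits getFlagSketch directly, no extra pass
        }
        return false;
    } RETRY_LOOP_END("getFlagSketch")
    return false;  // unreachable in practice: every path in the body returns
}

int main() {
    std::cout << std::boolalpha << getFlagSketch() << "\n";
}
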
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 04e1b10adc0..53757442e49 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -382,62 +382,57 @@ namespace {
OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- OpTime lastOptime = replCoord->getMyLastOptime();
- invariant(!ops.empty());
-
- while (1) {
- try {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
-
- if ( localOplogRSCollection == 0 ) {
- Client::Context ctx(txn, rsoplog);
-
- localDB = ctx.db();
- verify( localDB );
- localOplogRSCollection = localDB->getCollection(rsoplog);
- massert(13389,
- "local.oplog.rs missing. did you drop it? if so restart server",
- localOplogRSCollection);
- }
+ OpTime lastOptime;
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ lastOptime = replCoord->getMyLastOptime();
+ invariant(!ops.empty());
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_X);
+
+ if ( localOplogRSCollection == 0 ) {
+ Client::Context ctx(txn, rsoplog);
+
+ localDB = ctx.db();
+ verify( localDB );
+ localOplogRSCollection = localDB->getCollection(rsoplog);
+ massert(13389,
+ "local.oplog.rs missing. did you drop it? if so restart server",
+ localOplogRSCollection);
+ }
- Client::Context ctx(txn, rsoplog, localDB);
- WriteUnitOfWork wunit(txn);
+ Client::Context ctx(txn, rsoplog, localDB);
+ WriteUnitOfWork wunit(txn);
- for (std::deque<BSONObj>::const_iterator it = ops.begin();
- it != ops.end();
- ++it) {
- const BSONObj& op = *it;
- const OpTime ts = op["ts"]._opTime();
+ for (std::deque<BSONObj>::const_iterator it = ops.begin();
+ it != ops.end();
+ ++it) {
+ const BSONObj& op = *it;
+ const OpTime ts = op["ts"]._opTime();
- checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false));
+ checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false));
- if (!(lastOptime < ts)) {
- severe() << "replication oplog stream went back in time. "
- "previous timestamp: " << lastOptime << " newest timestamp: " << ts
- << ". Op being applied: " << op;
- fassertFailedNoTrace(18905);
- }
- lastOptime = ts;
+ if (!(lastOptime < ts)) {
+ severe() << "replication oplog stream went back in time. "
+ "previous timestamp: " << lastOptime << " newest timestamp: " << ts
+ << ". Op being applied: " << op;
+ fassertFailedNoTrace(18905);
}
- wunit.commit();
+ lastOptime = ts;
+ }
+ wunit.commit();
- BackgroundSync* bgsync = BackgroundSync::get();
- // Keep this up-to-date, in case we step up to primary.
- long long hash = ops.back()["h"].numberLong();
- bgsync->setLastAppliedHash(hash);
+ BackgroundSync* bgsync = BackgroundSync::get();
+ // Keep this up-to-date, in case we step up to primary.
+ long long hash = ops.back()["h"].numberLong();
+ bgsync->setLastAppliedHash(hash);
- ctx.getClient()->setLastOp(lastOptime);
+ ctx.getClient()->setLastOp(lastOptime);
- replCoord->setMyLastOptime(lastOptime);
- setNewOptime(lastOptime);
+ replCoord->setMyLastOptime(lastOptime);
+ setNewOptime(lastOptime);
- return lastOptime;
- }
- catch (const WriteConflictException& wce) {
- log() << "WriteConflictException while writing oplog, retrying.";
- }
- }
+ return lastOptime;
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "write oplog entry", rsoplog);
}
void createOplog(OperationContext* txn) {
@@ -510,11 +505,13 @@ namespace {
options.cappedSize = sz;
options.autoIndexId = CollectionOptions::NO;
- WriteUnitOfWork uow( txn );
- invariant(ctx.db()->createCollection(txn, ns, options));
- if( !rs )
- logOp(txn, "n", "", BSONObj() );
- uow.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork uow( txn );
+ invariant(ctx.db()->createCollection(txn, ns, options));
+ if( !rs )
+ logOp(txn, "n", "", BSONObj() );
+ uow.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", ns);
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine();
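
writeOpsToOplog above previously carried its own hand-written retry, the removed "while (1) { try { ... } catch (const WriteConflictException& wce) { ... } }", which the patch replaces with the shared macro while moving the declaration of lastOptime in front of MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN. The general scoping point is that the macro body is a fresh scope on each attempt, so anything that must remain visible after the loop is declared before it and re-initialized at the top of the body. The sketch below illustrates only that scoping rule with a hand-written loop and illustrative names; it is not the server's oplog code.

// Variables that must be visible after the retry loop are declared before it;
// the body is a fresh scope on every attempt, so per-attempt state is
// (re)initialized at the top of the body.  Names are illustrative only.
#include <exception>
#include <iostream>
#include <vector>

struct WriteConflictException : std::exception {};

int main() {
    const std::vector<int> ops = {10, 20, 30};  // stand-in for the batch of oplog entries
    int lastApplied = -1;                       // declared outside the loop, like lastOptime above
    bool injectConflict = true;

    while (true) {
        try {
            lastApplied = -1;                   // re-initialized each attempt, the way the
                                                // hunk re-reads getMyLastOptime()
            int written = 0;                    // per-attempt scratch state lives in the body
            for (int op : ops) {
                if (injectConflict && written == 1) {  // simulate a conflict mid-batch
                    injectConflict = false;
                    throw WriteConflictException();
                }
                ++written;
                lastApplied = op;
            }
            break;                              // whole batch applied without a conflict
        } catch (const WriteConflictException&) {
            std::cerr << "write conflict while writing oplog batch, retrying\n";
        }
    }
    std::cout << "last applied: " << lastApplied << "\n";  // still in scope here
    return 0;
}
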
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 6dbce6e6945..e2472bd3406 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/jsobj.h"
@@ -111,12 +112,13 @@ namespace {
void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) {
createOplog(txn);
- ScopedTransaction scopedXact(txn, MODE_X);
- Lock::GlobalWrite globalWrite(txn->lockState());
-
- WriteUnitOfWork wuow(txn);
- logOpInitiate(txn, BSON("msg" << "initiating set"));
- wuow.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction scopedXact(txn, MODE_X);
+ Lock::GlobalWrite globalWrite(txn->lockState());
+ WriteUnitOfWork wuow(txn);
+ logOpInitiate(txn, BSON("msg" << "initiating set"));
+ wuow.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs");
}
void ReplicationCoordinatorExternalStateImpl::forwardSlaveHandshake() {
@@ -136,6 +138,7 @@ namespace {
BSONObj me;
// local.me is an identifier for a server for getLastError w:2+
+ // TODO: handle WriteConflictExceptions below
if (!Helpers::getSingleton(txn, meCollectionName, me) ||
!me.hasField("host") ||
me["host"].String() != myname) {
@@ -160,14 +163,18 @@ namespace {
StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument(
OperationContext* txn) {
try {
- BSONObj config;
- if (!Helpers::getSingleton(txn, configCollectionName, config)) {
- return StatusWith<BSONObj>(
- ErrorCodes::NoMatchingDocument,
- str::stream() << "Did not find replica set configuration document in " <<
- configCollectionName);
- }
- return StatusWith<BSONObj>(config);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ BSONObj config;
+ if (!Helpers::getSingleton(txn, configCollectionName, config)) {
+ return StatusWith<BSONObj>(
+ ErrorCodes::NoMatchingDocument,
+ str::stream() << "Did not find replica set configuration document in "
+ << configCollectionName);
+ }
+ return StatusWith<BSONObj>(config);
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn,
+ "load replica set config",
+ configCollectionName);
}
catch (const DBException& ex) {
return StatusWith<BSONObj>(ex.toStatus());
@@ -178,14 +185,19 @@ namespace {
OperationContext* txn,
const BSONObj& config) {
try {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X);
- Helpers::putSingleton(txn, configCollectionName, config);
- return Status::OK();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X);
+ Helpers::putSingleton(txn, configCollectionName, config);
+ return Status::OK();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn,
+ "save replica set config",
+ configCollectionName);
}
catch (const DBException& ex) {
return ex.toStatus();
}
+
}
void ReplicationCoordinatorExternalStateImpl::setGlobalOpTime(const OpTime& newTime) {
@@ -195,6 +207,7 @@ namespace {
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
OperationContext* txn) {
+ // TODO: handle WriteConflictExceptions below
try {
BSONObj oplogEntry;
if (!Helpers::getLast(txn, rsoplog, oplogEntry)) {
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index f95a28d9a15..536b53b9f2b 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/client.h"
#include "mongo/db/cloner.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/operation_context_impl.h"
@@ -96,10 +97,12 @@ namespace {
// Truncate the oplog in case there was a prior initial sync that failed.
Collection* collection = autoDb.getDb()->getCollection(rsoplog);
fassert(28565, collection);
- WriteUnitOfWork wunit(txn);
- Status status = collection->truncate(txn);
- fassert(28564, status);
- wunit.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ Status status = collection->truncate(txn);
+ fassert(28564, status);
+ wunit.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "truncate", collection->ns().ns());
}
/**
@@ -478,13 +481,11 @@ namespace {
// Initial sync is now complete. Flag this by setting minValid to the last thing
// we synced.
- WriteUnitOfWork wunit(&txn);
setMinValid(&txn, lastOpTimeWritten);
// Clear the initial sync flag.
clearInitialSyncFlag(&txn);
BackgroundSync::get()->setInitialSyncRequestedFlag(false);
- wunit.commit();
}
// If we just cloned & there were no ops applied, we still want the primary to know where
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 3f523b03e19..93740b37827 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/client.h"
#include "mongo/db/cloner.h"
#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/operation_context_impl.h"
@@ -656,9 +657,14 @@ namespace {
catch (DBException& e) {
if (e.getCode() == 13415) {
// hack: need to just make cappedTruncate do this...
- WriteUnitOfWork wunit(txn);
- uassertStatusOK(collection->truncate(txn));
- wunit.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ uassertStatusOK(collection->truncate(txn));
+ wunit.commit();
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn,
+ "truncate",
+ collection->ns().ns());
}
else {
throw e;
diff --git a/src/mongo/db/repl/sync.cpp b/src/mongo/db/repl/sync.cpp
index bdfc676a63f..bc7d86c8d31 100644
--- a/src/mongo/db/repl/sync.cpp
+++ b/src/mongo/db/repl/sync.cpp
@@ -37,7 +37,10 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/util/assert_util.h"
@@ -114,40 +117,39 @@ namespace repl {
bool Sync::shouldRetry(OperationContext* txn, const BSONObj& o) {
const NamespaceString nss(o.getStringField("ns"));
-
- // Take an X lock on the database in order to preclude other modifications. Also, the
- // database might not exist yet, so create it.
- AutoGetOrCreateDb autoDb(txn, nss.db(), MODE_X);
- Database* const db = autoDb.getDb();
-
- // we don't have the object yet, which is possible on initial sync. get it.
- log() << "replication info adding missing object" << endl; // rare enough we can log
-
- BSONObj missingObj = getMissingDoc(txn, db, o);
-
- if( missingObj.isEmpty() ) {
- log() << "replication missing object not found on source. presumably deleted later in oplog" << endl;
- log() << "replication o2: " << o.getObjectField("o2").toString() << endl;
- log() << "replication o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl;
-
- return false;
- }
- else {
- WriteUnitOfWork wunit(txn);
-
- Collection* const collection = db->getOrCreateCollection(txn, nss.toString());
- invariant(collection);
-
- StatusWith<RecordId> result = collection->insertDocument(txn, missingObj, true);
- uassert(15917,
- str::stream() << "failed to insert missing doc: " << result.toString(),
- result.isOK() );
-
- LOG(1) << "replication inserted missing doc: " << missingObj.toString() << endl;
-
- wunit.commit();
- return true;
- }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ // Take an X lock on the database in order to preclude other modifications.
+ // Also, the database might not exist yet, so create it.
+ AutoGetOrCreateDb autoDb(txn, nss.db(), MODE_X);
+ Database* const db = autoDb.getDb();
+
+ // we don't have the object yet, which is possible on initial sync. get it.
+ log() << "adding missing object" << endl; // rare enough we can log
+ BSONObj missingObj = getMissingDoc(txn, db, o);
+
+ if( missingObj.isEmpty() ) {
+ log() << "missing object not found on source."
+ " presumably deleted later in oplog";
+ log() << "o2: " << o.getObjectField("o2").toString();
+ log() << "o firstfield: " << o.getObjectField("o").firstElementFieldName();
+ return false;
+ }
+ else {
+ WriteUnitOfWork wunit(txn);
+
+ Collection* const coll = db->getOrCreateCollection(txn, nss.toString());
+ invariant(coll);
+
+ StatusWith<RecordId> result = coll->insertDocument(txn, missingObj, true);
+ uassert(15917,
+ str::stream() << "failed to insert missing doc: "
+ << result.getStatus().toString(),
+ result.isOK() );
+ LOG(1) << "inserted missing doc: " << missingObj.toString() << endl;
+ wunit.commit();
+ return true;
+ }
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "InsertRetry", nss.ns());
}
} // namespace repl