diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2015-04-23 09:17:57 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2015-07-06 17:20:53 -0400 |
commit | 40410b820cd93e54fb7b42d37146b2d930334bbb (patch) | |
tree | 65f4d3c42da2c19a5d0fc26419d9ed26664728a8 | |
parent | f31038b98941bdc72c13449183854a690fd20653 (diff) | |
download | mongo-40410b820cd93e54fb7b42d37146b2d930334bbb.tar.gz |
SERVER-17689: handle wce better in initial sync
(cherry picked from commit 57232d88b869b0741d95998f8ed09a96b2e7a1bc)
-rw-r--r-- | src/mongo/db/catalog/database.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/cloner.cpp | 92 | ||||
-rw-r--r-- | src/mongo/db/concurrency/write_conflict_exception.h | 2 | ||||
-rw-r--r-- | src/mongo/db/ops/update.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/minvalid.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 101 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/sync.cpp | 70 |
11 files changed, 267 insertions, 216 deletions
diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp index 726032a46c0..e81a29826d3 100644 --- a/src/mongo/db/catalog/database.cpp +++ b/src/mongo/db/catalog/database.cpp @@ -559,10 +559,18 @@ namespace mongo { for (vector<string>::iterator i = n.begin(); i != n.end(); i++) { if (*i != "local") { - Database* db = dbHolder().get(txn, *i); - invariant(db); - - dropDatabase(txn, db); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + Database* db = dbHolder().get(txn, *i); + // This is needed since dropDatabase can't be rolled back. + // This is safe be replaced by "invariant(db);dropDatabase(txn, db);" once fixed + if (db == nullptr) { + log() << "database disappeared after listDatabases but before drop: " << *i; + } else { + dropDatabase(txn, db); + } + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, + "dropAllDatabasesExceptLocal", + *i); } } } diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index baab89daf01..4ed449487fb 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -48,6 +48,7 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/copydb.h" #include "mongo/db/commands/rename_collection.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/index_builder.h" @@ -145,17 +146,20 @@ namespace mongo { << "collection dropped during clone [" << to_collection.ns() << "]", !createdCollection ); - WriteUnitOfWork wunit(txn); - collection = db->createCollection(txn, to_collection.ns()); - verify(collection); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - if (logForRepl) { - repl::logOp(txn, - "c", - (_dbName + ".$cmd").c_str(), - BSON("create" << to_collection.coll())); - } - wunit.commit(); + WriteUnitOfWork wunit(txn); + collection = db->createCollection(txn, to_collection.ns()); + verify(collection); + + if (logForRepl) { + repl::logOp(txn, + "c", + (_dbName + ".$cmd").c_str(), + BSON("create" << to_collection.coll())); + } + wunit.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns()); } while( i.moreInCurrentBatch() ) { @@ -221,21 +225,22 @@ namespace mongo { } ++numSeen; - WriteUnitOfWork wunit(txn); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); - BSONObj js = tmp; + BSONObj js = tmp; - StatusWith<RecordId> loc = collection->insertDocument( txn, js, true ); - if ( !loc.isOK() ) { - error() << "error: exception cloning object in " << from_collection - << ' ' << loc.toString() << " obj:" << js; - } - uassertStatusOK( loc.getStatus() ); - if (logForRepl) - repl::logOp(txn, "i", to_collection.ns().c_str(), js); - - wunit.commit(); + StatusWith<RecordId> loc = collection->insertDocument( txn, js, true ); + if ( !loc.isOK() ) { + error() << "error: exception cloning object in " << from_collection + << ' ' << loc.toString() << " obj:" << js; + } + uassertStatusOK( loc.getStatus() ); + if (logForRepl) + repl::logOp(txn, "i", to_collection.ns().c_str(), js); + wunit.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "cloner insert", to_collection.ns()); RARELY if ( time( 0 ) - saveLast > 60 ) { log() << numSeen << " objects cloned so far from collection " << from_collection; saveLast = time( 0 ); @@ -335,16 +340,18 @@ namespace mongo { Collection* collection = db->getCollection( to_collection ); if ( !collection ) { - WriteUnitOfWork wunit(txn); - collection = db->createCollection( txn, to_collection.ns() ); - invariant(collection); - if (logForRepl) { - repl::logOp(txn, - "c", - (toDBName + ".$cmd").c_str(), - BSON("create" << to_collection.coll())); - } - wunit.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + collection = db->createCollection( txn, to_collection.ns() ); + invariant(collection); + if (logForRepl) { + repl::logOp(txn, + "c", + (toDBName + ".$cmd").c_str(), + BSON("create" << to_collection.coll())); + } + wunit.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_collection.ns()); } // TODO pass the MultiIndexBlock when inserting into the collection rather than building the @@ -406,13 +413,15 @@ namespace mongo { invariant(collList.size() <= 1); BSONObj col = collList.front(); if (col["options"].isABSONObj()) { - WriteUnitOfWork wunit(txn); - Status status = userCreateNS(txn, db, ns, col["options"].Obj(), logForRepl, 0); - if ( !status.isOK() ) { - errmsg = status.toString(); - return false; - } - wunit.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + Status status = userCreateNS(txn, db, ns, col["options"].Obj(), logForRepl, 0); + if ( !status.isOK() ) { + errmsg = status.toString(); + return false; + } + wunit.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", ns); } } @@ -583,7 +592,7 @@ namespace mongo { Database* db = dbHolder().openDb(txn, toDBName); - { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { WriteUnitOfWork wunit(txn); // we defer building id index for performance - building it in batch is much @@ -600,9 +609,8 @@ namespace mongo { << createStatus.reason(); return false; } - wunit.commit(); - } + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", to_name.ns()); LOG(1) << "\t\t cloning " << from_name << " -> " << to_name << endl; Query q; diff --git a/src/mongo/db/concurrency/write_conflict_exception.h b/src/mongo/db/concurrency/write_conflict_exception.h index 63a4645bffc..2f369beebea 100644 --- a/src/mongo/db/concurrency/write_conflict_exception.h +++ b/src/mongo/db/concurrency/write_conflict_exception.h @@ -45,7 +45,7 @@ continue; \ } \ break; \ - } while (true); } while (false) + } while (true); } while (false); namespace mongo { diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index 9aeb9946a45..6b8e3cff2a4 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/clientcursor.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/exec/update.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/ops/update_driver.h" @@ -74,28 +75,30 @@ namespace mongo { locker->isLockHeldForMode(ResourceId(RESOURCE_DATABASE, nsString.db()), MODE_X)); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), nsString.db(), MODE_X); - - if (!request.isFromReplication() && - !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( - nsString.db())) { - uassertStatusOK(Status(ErrorCodes::NotMaster, str::stream() - << "Not primary while creating collection " << nsString.ns() - << " during upsert")); - } - - WriteUnitOfWork wuow(txn); - collection = db->createCollection(txn, nsString.ns()); - invariant(collection); - - if (!request.isFromReplication()) { - repl::logOp(txn, - "c", - (db->name() + ".$cmd").c_str(), - BSON("create" << (nsString.coll()))); - } - wuow.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), nsString.db(), MODE_X); + + if (!request.isFromReplication() && + !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( + nsString.db())) { + uassertStatusOK(Status(ErrorCodes::NotMaster, str::stream() + << "Not primary while creating collection " << nsString.ns() + << " during upsert")); + } + + WriteUnitOfWork wuow(txn); + collection = db->createCollection(txn, nsString.ns()); + invariant(collection); + + if (!request.isFromReplication()) { + repl::logOp(txn, + "c", + (db->name() + ".$cmd").c_str(), + BSON("create" << (nsString.coll()))); + } + wuow.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", nsString.ns()); } // Parse the update, get an executor for it, run the executor, get stats out. diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 6e26475642f..6c8eba2199e 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -37,6 +37,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/oplog.h" @@ -499,14 +500,16 @@ namespace { long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) { BSONObj oplogEntry; try { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - bool success = Helpers::getLast(txn, rsoplog, oplogEntry); - if (!success) { - // This can happen when we are to do an initial sync. lastHash will be set - // after the initial sync is complete. - return 0; - } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), "local", MODE_X); + bool success = Helpers::getLast(txn, rsoplog, oplogEntry); + if (!success) { + // This can happen when we are to do an initial sync. lastHash will be set + // after the initial sync is complete. + return 0; + } + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsoplog); } catch (const DBException& ex) { severe() << "Problem reading " << rsoplog << ": " << ex.toStatus(); diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp index 0005f364963..18235cc178c 100644 --- a/src/mongo/db/repl/minvalid.cpp +++ b/src/mongo/db/repl/minvalid.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/optime.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" @@ -50,45 +51,54 @@ namespace { } // namespace void clearInitialSyncFlag(OperationContext* txn) { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag)); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), "local", MODE_X); + Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag)); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS); } void setInitialSyncFlag(OperationContext* txn) { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag)); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), "local", MODE_X); + Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag)); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS); } bool getInitialSyncFlag() { OperationContextImpl txn; - ScopedTransaction transaction(&txn, MODE_IX); - Lock::DBLock lk(txn.lockState(), "local", MODE_X); - BSONObj mv; - bool found = Helpers::getSingleton( &txn, minvalidNS, mv); - - if (found) { - return mv[initialSyncFlagString].trueValue(); - } - return false; + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(&txn, MODE_IX); + Lock::DBLock lk(txn.lockState(), "local", MODE_X); + BSONObj mv; + bool found = Helpers::getSingleton( &txn, minvalidNS, mv); + if (found) { + return mv[initialSyncFlagString].trueValue(); + } + return false; + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(&txn, "getInitialSyncFlags", minvalidNS); } void setMinValid(OperationContext* ctx, OpTime ts) { - ScopedTransaction transaction(ctx, MODE_IX); - Lock::DBLock lk(ctx->lockState(), "local", MODE_X); - Helpers::putSingleton(ctx, minvalidNS, BSON("$set" << BSON("ts" << ts))); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(ctx, MODE_IX); + Lock::DBLock lk(ctx->lockState(), "local", MODE_X); + Helpers::putSingleton(ctx, minvalidNS, BSON("$set" << BSON("ts" << ts))); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS); } OpTime getMinValid(OperationContext* txn) { - ScopedTransaction transaction(txn, MODE_IS); - Lock::DBLock lk(txn->lockState(), "local", MODE_S); - BSONObj mv; - bool found = Helpers::getSingleton(txn, minvalidNS, mv); - if (found) { - return mv["ts"]._opTime(); - } - return OpTime(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IS); + Lock::DBLock lk(txn->lockState(), "local", MODE_S); + BSONObj mv; + bool found = Helpers::getSingleton(txn, minvalidNS, mv); + if (found) { + return mv["ts"]._opTime(); + } + return OpTime(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 04e1b10adc0..53757442e49 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -382,62 +382,57 @@ namespace { OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - OpTime lastOptime = replCoord->getMyLastOptime(); - invariant(!ops.empty()); - - while (1) { - try { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - - if ( localOplogRSCollection == 0 ) { - Client::Context ctx(txn, rsoplog); - - localDB = ctx.db(); - verify( localDB ); - localOplogRSCollection = localDB->getCollection(rsoplog); - massert(13389, - "local.oplog.rs missing. did you drop it? if so restart server", - localOplogRSCollection); - } + OpTime lastOptime; + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + lastOptime = replCoord->getMyLastOptime(); + invariant(!ops.empty()); + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), "local", MODE_X); + + if ( localOplogRSCollection == 0 ) { + Client::Context ctx(txn, rsoplog); + + localDB = ctx.db(); + verify( localDB ); + localOplogRSCollection = localDB->getCollection(rsoplog); + massert(13389, + "local.oplog.rs missing. did you drop it? if so restart server", + localOplogRSCollection); + } - Client::Context ctx(txn, rsoplog, localDB); - WriteUnitOfWork wunit(txn); + Client::Context ctx(txn, rsoplog, localDB); + WriteUnitOfWork wunit(txn); - for (std::deque<BSONObj>::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { - const BSONObj& op = *it; - const OpTime ts = op["ts"]._opTime(); + for (std::deque<BSONObj>::const_iterator it = ops.begin(); + it != ops.end(); + ++it) { + const BSONObj& op = *it; + const OpTime ts = op["ts"]._opTime(); - checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false)); + checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false)); - if (!(lastOptime < ts)) { - severe() << "replication oplog stream went back in time. " - "previous timestamp: " << lastOptime << " newest timestamp: " << ts - << ". Op being applied: " << op; - fassertFailedNoTrace(18905); - } - lastOptime = ts; + if (!(lastOptime < ts)) { + severe() << "replication oplog stream went back in time. " + "previous timestamp: " << lastOptime << " newest timestamp: " << ts + << ". Op being applied: " << op; + fassertFailedNoTrace(18905); } - wunit.commit(); + lastOptime = ts; + } + wunit.commit(); - BackgroundSync* bgsync = BackgroundSync::get(); - // Keep this up-to-date, in case we step up to primary. - long long hash = ops.back()["h"].numberLong(); - bgsync->setLastAppliedHash(hash); + BackgroundSync* bgsync = BackgroundSync::get(); + // Keep this up-to-date, in case we step up to primary. + long long hash = ops.back()["h"].numberLong(); + bgsync->setLastAppliedHash(hash); - ctx.getClient()->setLastOp(lastOptime); + ctx.getClient()->setLastOp(lastOptime); - replCoord->setMyLastOptime(lastOptime); - setNewOptime(lastOptime); + replCoord->setMyLastOptime(lastOptime); + setNewOptime(lastOptime); - return lastOptime; - } - catch (const WriteConflictException& wce) { - log() << "WriteConflictException while writing oplog, retrying."; - } - } + return lastOptime; + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "write oplog entry", rsoplog); } void createOplog(OperationContext* txn) { @@ -510,11 +505,13 @@ namespace { options.cappedSize = sz; options.autoIndexId = CollectionOptions::NO; - WriteUnitOfWork uow( txn ); - invariant(ctx.db()->createCollection(txn, ns, options)); - if( !rs ) - logOp(txn, "n", "", BSONObj() ); - uow.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork uow( txn ); + invariant(ctx.db()->createCollection(txn, ns, options)); + if( !rs ) + logOp(txn, "n", "", BSONObj() ); + uow.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", ns); /* sync here so we don't get any surprising lag later when we try to sync */ StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 6dbce6e6945..e2472bd3406 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/global_environment_experiment.h" #include "mongo/db/jsobj.h" @@ -111,12 +112,13 @@ namespace { void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) { createOplog(txn); - ScopedTransaction scopedXact(txn, MODE_X); - Lock::GlobalWrite globalWrite(txn->lockState()); - - WriteUnitOfWork wuow(txn); - logOpInitiate(txn, BSON("msg" << "initiating set")); - wuow.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction scopedXact(txn, MODE_X); + Lock::GlobalWrite globalWrite(txn->lockState()); + WriteUnitOfWork wuow(txn); + logOpInitiate(txn, BSON("msg" << "initiating set")); + wuow.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs"); } void ReplicationCoordinatorExternalStateImpl::forwardSlaveHandshake() { @@ -136,6 +138,7 @@ namespace { BSONObj me; // local.me is an identifier for a server for getLastError w:2+ + // TODO: handle WriteConflictExceptions below if (!Helpers::getSingleton(txn, meCollectionName, me) || !me.hasField("host") || me["host"].String() != myname) { @@ -160,14 +163,18 @@ namespace { StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument( OperationContext* txn) { try { - BSONObj config; - if (!Helpers::getSingleton(txn, configCollectionName, config)) { - return StatusWith<BSONObj>( - ErrorCodes::NoMatchingDocument, - str::stream() << "Did not find replica set configuration document in " << - configCollectionName); - } - return StatusWith<BSONObj>(config); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + BSONObj config; + if (!Helpers::getSingleton(txn, configCollectionName, config)) { + return StatusWith<BSONObj>( + ErrorCodes::NoMatchingDocument, + str::stream() << "Did not find replica set configuration document in " + << configCollectionName); + } + return StatusWith<BSONObj>(config); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, + "load replica set config", + configCollectionName); } catch (const DBException& ex) { return StatusWith<BSONObj>(ex.toStatus()); @@ -178,14 +185,19 @@ namespace { OperationContext* txn, const BSONObj& config) { try { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X); - Helpers::putSingleton(txn, configCollectionName, config); - return Status::OK(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X); + Helpers::putSingleton(txn, configCollectionName, config); + return Status::OK(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, + "save replica set config", + configCollectionName); } catch (const DBException& ex) { return ex.toStatus(); } + } void ReplicationCoordinatorExternalStateImpl::setGlobalOpTime(const OpTime& newTime) { @@ -195,6 +207,7 @@ namespace { StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime( OperationContext* txn) { + // TODO: handle WriteConflictExceptions below try { BSONObj oplogEntry; if (!Helpers::getLast(txn, rsoplog, oplogEntry)) { diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index f95a28d9a15..536b53b9f2b 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/client.h" #include "mongo/db/cloner.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/operation_context_impl.h" @@ -96,10 +97,12 @@ namespace { // Truncate the oplog in case there was a prior initial sync that failed. Collection* collection = autoDb.getDb()->getCollection(rsoplog); fassert(28565, collection); - WriteUnitOfWork wunit(txn); - Status status = collection->truncate(txn); - fassert(28564, status); - wunit.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + Status status = collection->truncate(txn); + fassert(28564, status); + wunit.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "truncate", collection->ns().ns()); } /** @@ -478,13 +481,11 @@ namespace { // Initial sync is now complete. Flag this by setting minValid to the last thing // we synced. - WriteUnitOfWork wunit(&txn); setMinValid(&txn, lastOpTimeWritten); // Clear the initial sync flag. clearInitialSyncFlag(&txn); BackgroundSync::get()->setInitialSyncRequestedFlag(false); - wunit.commit(); } // If we just cloned & there were no ops applied, we still want the primary to know where diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 3f523b03e19..93740b37827 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -40,6 +40,7 @@ #include "mongo/db/client.h" #include "mongo/db/cloner.h" #include "mongo/db/commands.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/operation_context_impl.h" @@ -656,9 +657,14 @@ namespace { catch (DBException& e) { if (e.getCode() == 13415) { // hack: need to just make cappedTruncate do this... - WriteUnitOfWork wunit(txn); - uassertStatusOK(collection->truncate(txn)); - wunit.commit(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + uassertStatusOK(collection->truncate(txn)); + wunit.commit(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, + "truncate", + collection->ns().ns()); } else { throw e; diff --git a/src/mongo/db/repl/sync.cpp b/src/mongo/db/repl/sync.cpp index bdfc676a63f..bc7d86c8d31 100644 --- a/src/mongo/db/repl/sync.cpp +++ b/src/mongo/db/repl/sync.cpp @@ -37,7 +37,10 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/client.h" +#include "mongo/db/curop.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/jsobj.h" +#include "mongo/db/operation_context.h" #include "mongo/db/record_id.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/util/assert_util.h" @@ -114,40 +117,39 @@ namespace repl { bool Sync::shouldRetry(OperationContext* txn, const BSONObj& o) { const NamespaceString nss(o.getStringField("ns")); - - // Take an X lock on the database in order to preclude other modifications. Also, the - // database might not exist yet, so create it. - AutoGetOrCreateDb autoDb(txn, nss.db(), MODE_X); - Database* const db = autoDb.getDb(); - - // we don't have the object yet, which is possible on initial sync. get it. - log() << "replication info adding missing object" << endl; // rare enough we can log - - BSONObj missingObj = getMissingDoc(txn, db, o); - - if( missingObj.isEmpty() ) { - log() << "replication missing object not found on source. presumably deleted later in oplog" << endl; - log() << "replication o2: " << o.getObjectField("o2").toString() << endl; - log() << "replication o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl; - - return false; - } - else { - WriteUnitOfWork wunit(txn); - - Collection* const collection = db->getOrCreateCollection(txn, nss.toString()); - invariant(collection); - - StatusWith<RecordId> result = collection->insertDocument(txn, missingObj, true); - uassert(15917, - str::stream() << "failed to insert missing doc: " << result.toString(), - result.isOK() ); - - LOG(1) << "replication inserted missing doc: " << missingObj.toString() << endl; - - wunit.commit(); - return true; - } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + // Take an X lock on the database in order to preclude other modifications. + // Also, the database might not exist yet, so create it. + AutoGetOrCreateDb autoDb(txn, nss.db(), MODE_X); + Database* const db = autoDb.getDb(); + + // we don't have the object yet, which is possible on initial sync. get it. + log() << "adding missing object" << endl; // rare enough we can log + BSONObj missingObj = getMissingDoc(txn, db, o); + + if( missingObj.isEmpty() ) { + log() << "missing object not found on source." + " presumably deleted later in oplog"; + log() << "o2: " << o.getObjectField("o2").toString(); + log() << "o firstfield: " << o.getObjectField("o").firstElementFieldName(); + return false; + } + else { + WriteUnitOfWork wunit(txn); + + Collection* const coll = db->getOrCreateCollection(txn, nss.toString()); + invariant(coll); + + StatusWith<RecordId> result = coll->insertDocument(txn, missingObj, true); + uassert(15917, + str::stream() << "failed to insert missing doc: " + << result.getStatus().toString(), + result.isOK() ); + LOG(1) << "inserted missing doc: " << missingObj.toString() << endl; + wunit.commit(); + return true; + } + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "InsertRetry", nss.ns()); } } // namespace repl |