summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/mr.cpp
diff options
context:
space:
mode:
authorGeert Bosch <geert@mongodb.com>2015-06-23 11:41:15 -0400
committerGeert Bosch <geert@mongodb.com>2015-07-31 18:20:05 -0400
commitba6cfc1c7790760e4f5a21da31cce74133023430 (patch)
tree8fdb39870602eed3a431f7b9c990e746cf9ea90d /src/mongo/db/commands/mr.cpp
parentce1019eabc51305ec7fabc1808605b9c9fb24523 (diff)
downloadmongo-ba6cfc1c7790760e4f5a21da31cce74133023430.tar.gz
SERVER-16322: Make sure that RecoveryUnit::commitUnitOfWork can throw WCE
Map reduce did not handle WriteConflictExceptions at commit time well.
Diffstat (limited to 'src/mongo/db/commands/mr.cpp')
-rw-r--r--src/mongo/db/commands/mr.cpp115
1 files changed, 65 insertions, 50 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 46f9af9f6ee..62b17b562f6 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
@@ -346,13 +347,16 @@ void State::dropTempCollections() {
_txn->setReplicatedWrites(false);
ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
- ScopedTransaction scopedXact(_txn, MODE_IX);
- Lock::DBLock lk(_txn->lockState(), nsToDatabaseSubstring(_config.incLong), MODE_X);
- if (Database* db = dbHolder().get(_txn, _config.incLong)) {
- WriteUnitOfWork wunit(_txn);
- db->dropCollection(_txn, _config.incLong);
- wunit.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction scopedXact(_txn, MODE_IX);
+ Lock::DBLock lk(_txn->lockState(), nsToDatabaseSubstring(_config.incLong), MODE_X);
+ if (Database* db = dbHolder().get(_txn, _config.incLong)) {
+ WriteUnitOfWork wunit(_txn);
+ db->dropCollection(_txn, _config.incLong);
+ wunit.commit();
+ }
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R dropTempCollections", _config.incLong)
ShardConnection::forgetNS(_config.incLong);
}
@@ -373,26 +377,30 @@ void State::prepTempCollection() {
_txn->setReplicatedWrites(false);
ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
- OldClientWriteContext incCtx(_txn, _config.incLong);
- WriteUnitOfWork wuow(_txn);
- Collection* incColl = incCtx.getCollection();
- invariant(!incColl);
-
- CollectionOptions options;
- options.setNoIdIndex();
- options.temp = true;
- incColl = incCtx.db()->createCollection(_txn, _config.incLong, options);
- invariant(incColl);
-
- BSONObj indexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong << "name"
- << "_temp_0");
- Status status = incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, indexSpec);
- if (!status.isOK()) {
- uasserted(17305,
- str::stream() << "createIndex failed for mr incLong ns: " << _config.incLong
- << " err: " << status.code());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ OldClientWriteContext incCtx(_txn, _config.incLong);
+ WriteUnitOfWork wuow(_txn);
+ Collection* incColl = incCtx.getCollection();
+ invariant(!incColl);
+
+ CollectionOptions options;
+ options.setNoIdIndex();
+ options.temp = true;
+ incColl = incCtx.db()->createCollection(_txn, _config.incLong, options);
+ invariant(incColl);
+
+ BSONObj indexSpec = BSON("key" << BSON("0" << 1) << "ns" << _config.incLong << "name"
+ << "_temp_0");
+ Status status =
+ incColl->getIndexCatalog()->createIndexOnEmptyCollection(_txn, indexSpec);
+ if (!status.isOK()) {
+ uasserted(17305,
+ str::stream() << "createIndex failed for mr incLong ns: "
+ << _config.incLong << " err: " << status.code());
+ }
+ wuow.commit();
}
- wuow.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.incLong);
}
CollectionOptions finalOptions;
@@ -426,7 +434,7 @@ void State::prepTempCollection() {
}
}
- {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
// create temp collection and insert the indexes from temporary storage
OldClientWriteContext tempCtx(_txn, _config.tempNamespace);
WriteUnitOfWork wuow(_txn);
@@ -456,6 +464,7 @@ void State::prepTempCollection() {
}
wuow.commit();
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R prepTempCollection", _config.tempNamespace)
}
/**
@@ -669,24 +678,26 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
void State::insert(const string& ns, const BSONObj& o) {
verify(_onDisk);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ OldClientWriteContext ctx(_txn, ns);
+ WriteUnitOfWork wuow(_txn);
+ NamespaceString nss(ns);
+ uassert(ErrorCodes::NotMaster,
+ "no longer master",
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss));
+ Collection* coll = getCollectionOrUassert(ctx.db(), ns);
- OldClientWriteContext ctx(_txn, ns);
- WriteUnitOfWork wuow(_txn);
- NamespaceString nss(ns);
- uassert(ErrorCodes::NotMaster,
- "no longer master",
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss));
- Collection* coll = getCollectionOrUassert(ctx.db(), ns);
+ BSONObjBuilder b;
+ if (!o.hasField("_id")) {
+ b.appendOID("_id", NULL, true);
+ }
+ b.appendElements(o);
+ BSONObj bo = b.obj();
- BSONObjBuilder b;
- if (!o.hasField("_id")) {
- b.appendOID("_id", NULL, true);
+ uassertStatusOK(coll->insertDocument(_txn, bo, true).getStatus());
+ wuow.commit();
}
- b.appendElements(o);
- BSONObj bo = b.obj();
-
- uassertStatusOK(coll->insertDocument(_txn, bo, true).getStatus());
- wuow.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insert", ns);
}
/**
@@ -695,14 +706,17 @@ void State::insert(const string& ns, const BSONObj& o) {
void State::_insertToInc(BSONObj& o) {
verify(_onDisk);
- OldClientWriteContext ctx(_txn, _config.incLong);
- WriteUnitOfWork wuow(_txn);
- Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
- bool shouldReplicateWrites = _txn->writesAreReplicated();
- _txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
- uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus());
- wuow.commit();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ OldClientWriteContext ctx(_txn, _config.incLong);
+ WriteUnitOfWork wuow(_txn);
+ Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
+ bool shouldReplicateWrites = _txn->writesAreReplicated();
+ _txn->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, _txn, shouldReplicateWrites);
+ uassertStatusOK(coll->insertDocument(_txn, o, true, false).getStatus());
+ wuow.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "M/R insertToInc", _config.incLong);
}
State::State(OperationContext* txn, const Config& c)
@@ -967,7 +981,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) {
verify(_temp->size() == 0);
BSONObj sortKey = BSON("0" << 1);
- {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
OldClientWriteContext incCtx(_txn, _config.incLong);
WriteUnitOfWork wuow(_txn);
Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong);
@@ -987,6 +1001,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) {
verify(foundIndex);
wuow.commit();
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(_txn, "finalReduce", _config.incLong);
unique_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong));