summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Marko Vojvodic <marko.vojvodic@mongodb.com> 2016-11-08 15:20:39 -0500
committer: Marko Vojvodic <marko.vojvodic@mongodb.com> 2016-11-14 09:43:29 -0500
commit: 493b1f4bb89d721e630bcd99b1aac2808c8a5161 (patch)
tree: 9c50b7964fc87499e2983beff2de3bc121753e61
parent: dfe9b688956e19095e2d37e21bb40c7b7ad17900 (diff)
download: mongo-493b1f4bb89d721e630bcd99b1aac2808c8a5161.tar.gz
SERVER-24596 Replace DBDirectClient::count call with Collection::numRecords to avoid deadlock in State::postProcessCollection
-rw-r--r-- src/mongo/db/commands/mr.cpp 93
-rw-r--r-- src/mongo/db/commands/mr.h 5
2 files changed, 50 insertions, 48 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 3f2f13d2db0..c113a8afe63 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -588,52 +588,56 @@ long long State::postProcessCollection(OperationContext* txn,
if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY)
return numInMemKeys();
+ bool holdingGlobalLock = false;
if (_config.outputOptions.outNonAtomic)
- return postProcessCollectionNonAtomic(txn, curOp, pm);
+ return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock);
invariant(!txn->lockState()->isLocked());
ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lock(
- txn->lockState()); // TODO(erh): this is how it was, but seems it doesn't need to be global
- return postProcessCollectionNonAtomic(txn, curOp, pm);
+ // This must be global because we may write across different databases.
+ Lock::GlobalWrite lock(txn->lockState());
+ holdingGlobalLock = true;
+ return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock);
}
-//
-// For SERVER-6116 - can't handle version errors in count currently
-//
+namespace {
+
+// Returns the number of records in the collection named by 'ns', or 0 if the database or the
+// collection cannot be found. If the caller holds the global write lock, no locks are acquired.
+unsigned long long _collectionCount(OperationContext* txn,
+ const string& ns,
+ bool callerHoldsGlobalLock) {
+ Collection* coll = nullptr;
+ boost::optional<AutoGetCollectionForRead> ctx;
+
+ // If the global write lock is held, we must avoid using AutoGetCollectionForRead as it may lead
+ // to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596.
+ if (callerHoldsGlobalLock) {
+ Database* db = dbHolder().get(txn, ns);
+ coll = db ? db->getCollection(ns) : nullptr;
+ } else {
+ ctx.emplace(txn, NamespaceString(ns));
+ coll = ctx->getCollection();
+ }
-/**
- * Runs count and disables version errors.
- *
- * TODO: make count work with versioning
- */
-unsigned long long _safeCount(OperationContext* txn,
- // Can't be const b/c count isn't
- /* const */ DBDirectClient& db,
- const string& ns,
- const BSONObj& query = BSONObj(),
- int options = 0,
- int limit = 0,
- int skip = 0) {
- OperationShardingState::IgnoreVersioningBlock ignoreVersion(txn, NamespaceString(ns));
- return db.count(ns, query, options, limit, skip);
+ return coll ? coll->numRecords(txn) : 0;
}
-//
-// End SERVER-6116
-//
+} // namespace
long long State::postProcessCollectionNonAtomic(OperationContext* txn,
CurOp* curOp,
- ProgressMeterHolder& pm) {
+ ProgressMeterHolder& pm,
+ bool callerHoldsGlobalLock) {
if (_config.outputOptions.finalNamespace == _config.tempNamespace)
- return _safeCount(txn, _db, _config.outputOptions.finalNamespace);
+ return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
if (_config.outputOptions.outType == Config::REPLACE ||
- _safeCount(txn, _db, _config.outputOptions.finalNamespace) == 0) {
+ _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) {
ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lock(txn->lockState()); // TODO(erh): why global???
+ // This must be global because we may write across different databases.
+ Lock::GlobalWrite lock(txn->lockState());
// replace: just rename from temp to final collection name, dropping previous collection
_db.dropCollection(_config.outputOptions.finalNamespace);
BSONObj info;
@@ -651,19 +655,19 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
} else if (_config.outputOptions.outType == Config::MERGE) {
// merge: upsert new docs into old collection
{
- const auto count = _safeCount(txn, _db, _config.tempNamespace, BSONObj());
+ const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock);
stdx::lock_guard<Client> lk(*txn->getClient());
curOp->setMessage_inlock(
"m/r: merge post processing", "M/R Merge Post Processing Progress", count);
}
unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj());
while (cursor->more()) {
- ScopedTransaction scopedXact(_txn, MODE_IX);
- Lock::DBLock lock(_txn->lockState(),
+ ScopedTransaction scopedXact(txn, MODE_X);
+ Lock::DBLock lock(txn->lockState(),
nsToDatabaseSubstring(_config.outputOptions.finalNamespace),
MODE_X);
BSONObj o = cursor->nextSafe();
- Helpers::upsert(_txn, _config.outputOptions.finalNamespace, o);
+ Helpers::upsert(txn, _config.outputOptions.finalNamespace, o);
pm.hit();
}
_db.dropCollection(_config.tempNamespace);
@@ -673,7 +677,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
BSONList values;
{
- const auto count = _safeCount(txn, _db, _config.tempNamespace, BSONObj());
+ const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock);
stdx::lock_guard<Client> lk(*txn->getClient());
curOp->setMessage_inlock(
"m/r: reduce post processing", "M/R Reduce Post Processing Progress", count);
@@ -681,7 +685,8 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj());
while (cursor->more()) {
ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lock(txn->lockState()); // TODO(erh) why global?
+ // This must be global because we may write across different databases.
+ Lock::GlobalWrite lock(txn->lockState());
BSONObj temp = cursor->nextSafe();
BSONObj old;
@@ -690,7 +695,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
const std::string& finalNamespace = _config.outputOptions.finalNamespace;
OldClientContext tx(txn, finalNamespace);
Collection* coll = getCollectionOrUassert(tx.db(), finalNamespace);
- found = Helpers::findOne(_txn, coll, temp["_id"].wrap(), old, true);
+ found = Helpers::findOne(txn, coll, temp["_id"].wrap(), old, true);
}
if (found) {
@@ -698,18 +703,18 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn,
values.clear();
values.push_back(temp);
values.push_back(old);
- Helpers::upsert(_txn,
+ Helpers::upsert(txn,
_config.outputOptions.finalNamespace,
_config.reducer->finalReduce(values, _config.finalizer.get()));
} else {
- Helpers::upsert(_txn, _config.outputOptions.finalNamespace, temp);
+ Helpers::upsert(txn, _config.outputOptions.finalNamespace, temp);
}
pm.hit();
}
pm.finished();
}
- return _safeCount(txn, _db, _config.outputOptions.finalNamespace);
+ return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
}
/**
@@ -793,11 +798,6 @@ bool State::sourceExists() {
return _db.exists(_config.ns);
}
-long long State::incomingDocuments() {
- return _safeCount(
- _txn, _db, _config.ns, _config.filter, QueryOption_SlaveOk, (unsigned)_config.limit);
-}
-
State::~State() {
if (_onDisk) {
try {
@@ -1434,7 +1434,10 @@ public:
int progressTotal = 0;
bool showTotal = true;
if (state.config().filter.isEmpty()) {
- progressTotal = state.incomingDocuments();
+ const bool holdingGlobalLock = false;
+ const auto count = _collectionCount(txn, config.ns, holdingGlobalLock);
+ progressTotal =
+ (config.limit && (unsigned)config.limit < count) ? config.limit : count;
} else {
showTotal = false;
// Set an arbitrary total > 0 so the meter will be activated.
diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h
index 0fdec1129a9..89738ea1259 100644
--- a/src/mongo/db/commands/mr.h
+++ b/src/mongo/db/commands/mr.h
@@ -269,8 +269,6 @@ public:
// ---- prep -----
bool sourceExists();
- long long incomingDocuments();
-
// ---- map stage ----
/**
@@ -321,7 +319,8 @@ public:
long long postProcessCollection(OperationContext* txn, CurOp* op, ProgressMeterHolder& pm);
long long postProcessCollectionNonAtomic(OperationContext* txn,
CurOp* op,
- ProgressMeterHolder& pm);
+ ProgressMeterHolder& pm,
+ bool callerHoldsGlobalLock);
/**
* if INMEMORY will append