diff options
author | Marko Vojvodic <marko.vojvodic@mongodb.com> | 2016-11-08 15:20:39 -0500 |
---|---|---|
committer | Marko Vojvodic <marko.vojvodic@mongodb.com> | 2016-11-14 09:43:29 -0500 |
commit | 493b1f4bb89d721e630bcd99b1aac2808c8a5161 (patch) | |
tree | 9c50b7964fc87499e2983beff2de3bc121753e61 | |
parent | dfe9b688956e19095e2d37e21bb40c7b7ad17900 (diff) | |
download | mongo-493b1f4bb89d721e630bcd99b1aac2808c8a5161.tar.gz |
SERVER-24596 Replace DBDirectClient::count call with Collection::numRecords to avoid deadlock in State::postProcessCollection
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 93 |
-rw-r--r-- | src/mongo/db/commands/mr.h | 5 |
2 files changed, 50 insertions, 48 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 3f2f13d2db0..c113a8afe63 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -588,52 +588,56 @@ long long State::postProcessCollection(OperationContext* txn, if (_onDisk == false || _config.outputOptions.outType == Config::INMEMORY) return numInMemKeys(); + bool holdingGlobalLock = false; if (_config.outputOptions.outNonAtomic) - return postProcessCollectionNonAtomic(txn, curOp, pm); + return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock); invariant(!txn->lockState()->isLocked()); ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lock( - txn->lockState()); // TODO(erh): this is how it was, but seems it doesn't need to be global - return postProcessCollectionNonAtomic(txn, curOp, pm); + // This must be global because we may write across different databases. + Lock::GlobalWrite lock(txn->lockState()); + holdingGlobalLock = true; + return postProcessCollectionNonAtomic(txn, curOp, pm, holdingGlobalLock); } -// -// For SERVER-6116 - can't handle version errors in count currently -// +namespace { + +// Runs a count against the namespace specified by 'ns'. If the caller holds the global write lock, +// then this function does not acquire any additional locks. +unsigned long long _collectionCount(OperationContext* txn, + const string& ns, + bool callerHoldsGlobalLock) { + Collection* coll; + boost::optional<AutoGetCollectionForRead> ctx; + + // If the global write lock is held, we must avoid using AutoGetCollectionForRead as it may lead + // to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596. + if (callerHoldsGlobalLock) { + Database* db = dbHolder().get(txn, ns); + coll = db->getCollection(ns); + } else { + ctx.emplace(txn, NamespaceString(ns)); + coll = ctx->getCollection(); + } -/** - * Runs count and disables version errors. 
- * - * TODO: make count work with versioning - */ -unsigned long long _safeCount(OperationContext* txn, - // Can't be const b/c count isn't - /* const */ DBDirectClient& db, - const string& ns, - const BSONObj& query = BSONObj(), - int options = 0, - int limit = 0, - int skip = 0) { - OperationShardingState::IgnoreVersioningBlock ignoreVersion(txn, NamespaceString(ns)); - return db.count(ns, query, options, limit, skip); + return coll ? coll->numRecords(txn) : 0; } -// -// End SERVER-6116 -// +} // namespace long long State::postProcessCollectionNonAtomic(OperationContext* txn, CurOp* curOp, - ProgressMeterHolder& pm) { + ProgressMeterHolder& pm, + bool callerHoldsGlobalLock) { if (_config.outputOptions.finalNamespace == _config.tempNamespace) - return _safeCount(txn, _db, _config.outputOptions.finalNamespace); + return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); if (_config.outputOptions.outType == Config::REPLACE || - _safeCount(txn, _db, _config.outputOptions.finalNamespace) == 0) { + _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) { ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lock(txn->lockState()); // TODO(erh): why global??? + // This must be global because we may write across different databases. 
+ Lock::GlobalWrite lock(txn->lockState()); // replace: just rename from temp to final collection name, dropping previous collection _db.dropCollection(_config.outputOptions.finalNamespace); BSONObj info; @@ -651,19 +655,19 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, } else if (_config.outputOptions.outType == Config::MERGE) { // merge: upsert new docs into old collection { - const auto count = _safeCount(txn, _db, _config.tempNamespace, BSONObj()); + const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock); stdx::lock_guard<Client> lk(*txn->getClient()); curOp->setMessage_inlock( "m/r: merge post processing", "M/R Merge Post Processing Progress", count); } unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj()); while (cursor->more()) { - ScopedTransaction scopedXact(_txn, MODE_IX); - Lock::DBLock lock(_txn->lockState(), + ScopedTransaction scopedXact(txn, MODE_X); + Lock::DBLock lock(txn->lockState(), nsToDatabaseSubstring(_config.outputOptions.finalNamespace), MODE_X); BSONObj o = cursor->nextSafe(); - Helpers::upsert(_txn, _config.outputOptions.finalNamespace, o); + Helpers::upsert(txn, _config.outputOptions.finalNamespace, o); pm.hit(); } _db.dropCollection(_config.tempNamespace); @@ -673,7 +677,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, BSONList values; { - const auto count = _safeCount(txn, _db, _config.tempNamespace, BSONObj()); + const auto count = _collectionCount(txn, _config.tempNamespace, callerHoldsGlobalLock); stdx::lock_guard<Client> lk(*txn->getClient()); curOp->setMessage_inlock( "m/r: reduce post processing", "M/R Reduce Post Processing Progress", count); @@ -681,7 +685,8 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, unique_ptr<DBClientCursor> cursor = _db.query(_config.tempNamespace, BSONObj()); while (cursor->more()) { ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite 
lock(txn->lockState()); // TODO(erh) why global? + // This must be global because we may write across different databases. + Lock::GlobalWrite lock(txn->lockState()); BSONObj temp = cursor->nextSafe(); BSONObj old; @@ -690,7 +695,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, const std::string& finalNamespace = _config.outputOptions.finalNamespace; OldClientContext tx(txn, finalNamespace); Collection* coll = getCollectionOrUassert(tx.db(), finalNamespace); - found = Helpers::findOne(_txn, coll, temp["_id"].wrap(), old, true); + found = Helpers::findOne(txn, coll, temp["_id"].wrap(), old, true); } if (found) { @@ -698,18 +703,18 @@ long long State::postProcessCollectionNonAtomic(OperationContext* txn, values.clear(); values.push_back(temp); values.push_back(old); - Helpers::upsert(_txn, + Helpers::upsert(txn, _config.outputOptions.finalNamespace, _config.reducer->finalReduce(values, _config.finalizer.get())); } else { - Helpers::upsert(_txn, _config.outputOptions.finalNamespace, temp); + Helpers::upsert(txn, _config.outputOptions.finalNamespace, temp); } pm.hit(); } pm.finished(); } - return _safeCount(txn, _db, _config.outputOptions.finalNamespace); + return _collectionCount(txn, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); } /** @@ -793,11 +798,6 @@ bool State::sourceExists() { return _db.exists(_config.ns); } -long long State::incomingDocuments() { - return _safeCount( - _txn, _db, _config.ns, _config.filter, QueryOption_SlaveOk, (unsigned)_config.limit); -} - State::~State() { if (_onDisk) { try { @@ -1434,7 +1434,10 @@ public: int progressTotal = 0; bool showTotal = true; if (state.config().filter.isEmpty()) { - progressTotal = state.incomingDocuments(); + const bool holdingGlobalLock = false; + const auto count = _collectionCount(txn, config.ns, holdingGlobalLock); + progressTotal = + (config.limit && (unsigned)config.limit < count) ? 
config.limit : count; } else { showTotal = false; // Set an arbitrary total > 0 so the meter will be activated. diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h index 0fdec1129a9..89738ea1259 100644 --- a/src/mongo/db/commands/mr.h +++ b/src/mongo/db/commands/mr.h @@ -269,8 +269,6 @@ public: // ---- prep ----- bool sourceExists(); - long long incomingDocuments(); - // ---- map stage ---- /** @@ -321,7 +319,8 @@ public: long long postProcessCollection(OperationContext* txn, CurOp* op, ProgressMeterHolder& pm); long long postProcessCollectionNonAtomic(OperationContext* txn, CurOp* op, - ProgressMeterHolder& pm); + ProgressMeterHolder& pm, + bool callerHoldsGlobalLock); /** * if INMEMORY will append |