summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCraig Harris <craig.harris@10gen.com>2014-06-20 11:20:12 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2014-06-20 17:32:30 -0400
commit87739d45b9c974da88ca37d740c71b441249d45b (patch)
treec8aa6ea4b80e5e46526013d06af8f8eda6f9a840
parentd7b4f787b0c2885ba80f76a54a4961a32ee1debf (diff)
downloadmongo-87739d45b9c974da88ca37d740c71b441249d45b.tar.gz
SERVER-14006 Experimental document-level locking for in-place updates
This commit enables experimental (and untested) document-level locking for in-place updates, which do not modify indexed values. It should absolutely not be used on production systems and is for demonstration purposes only. It runs under server parameter called useExperimentalDocLocking. In order to enable it, run: ./mongod <other parameters> --setParameter useExperimentalDocLocking=true
-rw-r--r--src/mongo/SConscript2
-rw-r--r--src/mongo/db/catalog/collection.cpp3
-rw-r--r--src/mongo/db/catalog/collection_cursor_cache.cpp14
-rw-r--r--src/mongo/db/commands/write_commands/batch_executor.cpp5
-rw-r--r--src/mongo/db/concurrency/SConscript15
-rw-r--r--src/mongo/db/concurrency/lock_mgr.cpp1044
-rw-r--r--src/mongo/db/concurrency/lock_mgr.h621
-rw-r--r--src/mongo/db/concurrency/lock_mgr_test.cpp896
-rw-r--r--src/mongo/db/d_concurrency.cpp71
-rw-r--r--src/mongo/db/d_concurrency.h9
-rw-r--r--src/mongo/db/instance.cpp2
-rw-r--r--src/mongo/db/ops/update.cpp8
-rw-r--r--src/mongo/db/startup_warnings.cpp14
-rw-r--r--src/mongo/db/structure/record_store_v1_base.cpp4
14 files changed, 2687 insertions, 21 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 7ed2211bf51..84e32ad556b 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -20,6 +20,7 @@ env.SConscript(['base/SConscript',
'db/auth/SConscript',
'db/catalog/SConscript',
'db/commands/SConscript',
+ 'db/concurrency/SConscript',
'db/exec/SConscript',
'db/fts/SConscript',
'db/index/SConscript',
@@ -902,6 +903,7 @@ env.Library("serveronly", serverOnlyFiles,
"db/auth/authmongod",
"db/fts/ftsmongod",
"db/common",
+ "db/concurrency/lock_mgr",
"db/ops/update_driver",
"defaultversion",
"geoparser",
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 1c137515d65..b1f705f1411 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/database.h"
+#include "mongo/db/concurrency/lock_mgr.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/operation_context.h"
@@ -406,6 +407,8 @@ namespace mongo {
// Broadcast the mutation so that query results stay correct.
_cursorCache.invalidateDocument(loc, INVALIDATION_MUTATION);
+
+ ExclusiveResourceLock lk((size_t)txn->getCurOp()->opNum(), *(size_t*)&loc);
return _recordStore->updateWithDamages( txn, loc, damangeSource, damages );
}
diff --git a/src/mongo/db/catalog/collection_cursor_cache.cpp b/src/mongo/db/catalog/collection_cursor_cache.cpp
index e9962c9441f..2e941dd1f40 100644
--- a/src/mongo/db/catalog/collection_cursor_cache.cpp
+++ b/src/mongo/db/catalog/collection_cursor_cache.cpp
@@ -388,14 +388,18 @@ namespace mongo {
}
void CollectionCursorCache::registerRunner( Runner* runner ) {
- SimpleMutex::scoped_lock lk( _mutex );
- const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner);
- invariant(result.second); // make sure this was inserted
+ if (!useExperimentalDocLocking) {
+ SimpleMutex::scoped_lock lk(_mutex);
+ const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner);
+ invariant(result.second); // make sure this was inserted
+ }
}
void CollectionCursorCache::deregisterRunner( Runner* runner ) {
- SimpleMutex::scoped_lock lk( _mutex );
- _nonCachedRunners.erase( runner );
+ if (!useExperimentalDocLocking) {
+ SimpleMutex::scoped_lock lk(_mutex);
+ _nonCachedRunners.erase(runner);
+ }
}
ClientCursor* CollectionCursorCache::find( CursorId id, bool pin ) {
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index 1711348a35b..52acb7aae6f 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -1092,14 +1092,13 @@ namespace mongo {
}
///////////////////////////////////////////
- Lock::DBWrite writeLock(txn->lockState(), nsString.ns());
+ Lock::DBWrite writeLock(txn->lockState(), nsString.ns(), useExperimentalDocLocking);
///////////////////////////////////////////
if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result))
return;
- Client::Context ctx( nsString.ns(),
- false /* don't check version */ );
+ Client::Context ctx(nsString.ns(), false /* don't check version */);
try {
UpdateResult res = executor.execute(txn, ctx.db());
diff --git a/src/mongo/db/concurrency/SConscript b/src/mongo/db/concurrency/SConscript
new file mode 100644
index 00000000000..53be5435c91
--- /dev/null
+++ b/src/mongo/db/concurrency/SConscript
@@ -0,0 +1,15 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env.Library(
+ target='lock_mgr',
+ source=[
+ 'lock_mgr.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base/base',
+ '$BUILD_DIR/third_party/shim_boost',
+ '$BUILD_DIR/mongo/foundation'
+ ],
+)
diff --git a/src/mongo/db/concurrency/lock_mgr.cpp b/src/mongo/db/concurrency/lock_mgr.cpp
new file mode 100644
index 00000000000..d0cc06ead8d
--- /dev/null
+++ b/src/mongo/db/concurrency/lock_mgr.cpp
@@ -0,0 +1,1044 @@
+/**
+* Copyright (C) MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/concurrency/lock_mgr.h"
+
+#include <boost/thread/locks.hpp>
+#include <sstream>
+
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/timer.h"
+
+using boost::unique_lock;
+
+using std::endl;
+using std::exception;
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace mongo {
+
+ /*---------- Utility functions ----------*/
+
+ namespace {
+
+ /*
+ * Given a 'hierarchical' resource (e.g. document in collection in database)
+ * return true if the resource is reqeuested for exclusive access at a given level.
+ * In the example above, level 0 for document, level 1 for collection, level 2 for DB
+ * and level 3 for the system as a whole.
+ *
+ * mode is a bit vector, level is the index.
+ */
+ bool isExclusive(unsigned mode, const unsigned level=0) {
+ return 0 != (mode & (0x1 << level));
+ }
+
+ bool isShared(unsigned mode, const unsigned level=0) {
+ return 0 == (mode & (0x1 << level));
+ }
+
+ bool isCompatible(unsigned mode1, unsigned mode2) {
+ return mode1==mode2 && (isShared(mode1) || isShared(mode1));
+ }
+
+ bool hasConflict(const LockManager::ResourceStatus& status) {
+ return LockManager::kResourceConflict == status ||
+ LockManager::kResourceUpgradeConflict == status ||
+ LockManager::kResourcePolicyConflict == status;
+ }
+
+ // unique id for each LockRequest. zero is reserved
+ static LockManager::LockId nextLid = 1;
+ } // namespace
+
+ /*---------- AbortException functions ----------*/
+
+ const char* LockManager::AbortException::what() const throw() { return "AbortException"; }
+
+ /*---------- LockStats functions ----------*/
+ string LockManager::LockStats::toString() const {
+ stringstream result;
+ result << "----- LockManager Stats -----" << endl
+ << "\ttotal requests: " << getNumRequests() << endl
+ << "\t# pre-existing: " << getNumPreexistingRequests() << endl
+ << "\t# same: " << getNumSameRequests() << endl
+ << "\t# times blocked: " << getNumBlocks() << endl
+ << "\t# ms blocked: " << getNumMillisBlocked() << endl
+ << "\t# deadlocks: " << getNumDeadlocks() << endl
+ << "\t# downgrades: " << getNumDowngrades() << endl
+ << "\t# upgrades: " << getNumUpgrades() << endl
+ ;
+ return result.str();
+ }
+
+ /*---------- LockRequest functions ----------*/
+
+
+ LockManager::LockRequest::LockRequest(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId)
+ : lid(nextLid++)
+ , xid(xid)
+ , mode(mode)
+ , resId(resId)
+ , count(1)
+ , sleepCount(0) { }
+
+
+ LockManager::LockRequest::~LockRequest() { }
+
+ bool LockManager::LockRequest::matches(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId) const {
+ return
+ this->xid == xid &&
+ this->mode == mode &&
+ this->resId == resId;
+ }
+
+ string LockManager::LockRequest::toString() const {
+ stringstream result;
+ result << "<lid:" << lid
+ << ",xid:" << xid
+ << ",mode:" << mode
+ << ",resId:" << resId
+ << ",count:" << count
+ << ",sleepCount:" << sleepCount
+ << ">";
+ return result.str();
+ }
+
+ bool LockManager::LockRequest::isBlocked() const {
+ return sleepCount > 0;
+ }
+
+ bool LockManager::LockRequest::shouldAwake() {
+ return 0 == --sleepCount;
+ }
+
+ /*---------- LockManager public functions (mutex guarded) ---------*/
+
+ LockManager* LockManager::_singleton = NULL;
+ boost::mutex LockManager::_getSingletonMutex;
+ LockManager& LockManager::getSingleton() {
+ unique_lock<boost::mutex> lk(_getSingletonMutex);
+ if (NULL == _singleton) {
+ _singleton = new LockManager();
+ }
+ return *_singleton;
+ }
+
+ LockManager::LockManager(const Policy& policy)
+ : _policy(policy),
+ _mutex(),
+ _shuttingDown(false),
+ _millisToQuiesce(-1) { }
+
+ LockManager::~LockManager() {
+ unique_lock<boost::mutex> lk(_mutex);
+ for (map<LockId, LockRequest*>::iterator locks = _locks.begin();
+ locks != _locks.end(); ++locks) {
+ delete locks->second;
+ }
+ }
+
+ void LockManager::shutdown(const unsigned& millisToQuiesce) {
+ unique_lock<boost::mutex> lk(_mutex);
+
+#ifdef DONT_ALLOW_CHANGE_TO_QUIESCE_PERIOD
+ // XXX not sure whether we want to allow multiple shutdowns
+ // in order to change quiesce period?
+ if (_shuttingDown) {
+ return; // already in shutdown, don't extend quiescence(?)
+ }
+#endif
+
+ _shuttingDown = true;
+ _millisToQuiesce = millisToQuiesce;
+ _timer.millisReset();
+ }
+
+ LockManager::Policy LockManager::getPolicy() const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown();
+ return _policy;
+ }
+
+ TxId LockManager::getPolicySetter() const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown();
+ return _policySetter;
+ }
+
+ void LockManager::setTransactionPriority(const TxId& xid, int priority) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(xid);
+ _txPriorities[xid] = priority;
+ }
+
+ int LockManager::getTransactionPriority(const TxId& xid) const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(xid);
+ return _getTransactionPriorityInternal(xid);
+ }
+
+ LockManager::LockId LockManager::acquire(const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ Notifier* notifier) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(requestor);
+
+ // don't accept requests from aborted transactions
+ if (_abortedTxIds.find(requestor) != _abortedTxIds.end()) {
+ throw AbortException();
+ }
+
+ _stats.incRequests();
+ _stats.incStatsForMode(mode);
+
+ return _acquireInternal(requestor, mode, resId, notifier, lk);
+ }
+
+ int LockManager::acquireOne(const TxId& requestor,
+ const uint32_t& mode,
+ const vector<ResourceId>& resources,
+ Notifier* notifier) {
+
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(requestor);
+
+ if (resources.empty()) { return -1; }
+
+ // don't accept requests from aborted transactions
+ if (_abortedTxIds.find(requestor) != _abortedTxIds.end()) {
+ throw AbortException();
+ }
+
+ _stats.incRequests();
+
+ // acquire the first available recordId
+ for (unsigned ix=0; ix < resources.size(); ix++) {
+ if (_isAvailable(requestor, mode, resources[ix])) {
+ _acquireInternal(requestor, mode, resources[ix], notifier, lk);
+ _stats.incStatsForMode(mode);
+ return ix;
+ }
+ }
+
+ // sigh. none of the records are currently available. wait on the first.
+ _stats.incStatsForMode(mode);
+ _acquireInternal(requestor, mode, resources[0], notifier, lk);
+ return 0;
+ }
+
+ LockManager::LockStatus LockManager::releaseLock(const LockId& lid) {
+ unique_lock<boost::mutex> lk(_mutex);
+
+ LockMap::iterator it = _locks.find(lid);
+ if (it != _locks.end()) {
+ LockRequest* theLock = it->second;
+ _throwIfShuttingDown(theLock->xid);
+ _stats.decStatsForMode(theLock->mode);
+ if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) ||
+ (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) {
+ _policyLock.notify_one();
+ }
+ }
+ return _releaseInternal(lid);
+ }
+
+ LockManager::LockStatus LockManager::release(const TxId& holder,
+ const uint32_t& mode,
+ const ResourceId& resId) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(holder);
+
+ LockId lid;
+ LockStatus status = _findLock(holder, mode, resId, &lid);
+ if (kLockFound != status) {
+ return status; // error, resource wasn't acquired in this mode by holder
+ }
+ _stats.decStatsForMode(_locks[lid]->mode);
+ if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) ||
+ (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) {
+ _policyLock.notify_one();
+ }
+ return _releaseInternal(lid);
+ }
+
+ /*
+ * release all resource acquired by a transaction, returning the count
+ */
+ size_t LockManager::release(const TxId& holder) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(holder);
+
+ TxLockMap::iterator lockIdsHeld = _xaLocks.find(holder);
+ if (lockIdsHeld == _xaLocks.end()) { return 0; }
+ size_t numLocksReleased = 0;
+ for (set<LockId>::iterator nextLockId = lockIdsHeld->second.begin();
+ nextLockId != lockIdsHeld->second.end(); ++nextLockId) {
+ _releaseInternal(*nextLockId);
+
+ _stats.decStatsForMode(_locks[*nextLockId]->mode);
+
+ if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) ||
+ (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) {
+ _policyLock.notify_one();
+ }
+ numLocksReleased++;
+ }
+ return numLocksReleased;
+ }
+
+ void LockManager::abort(const TxId& goner) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(goner);
+ _abortInternal(goner);
+ }
+
+ LockManager::LockStats LockManager::getStats() const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown();
+ return _stats;
+ }
+
+ string LockManager::toString() const {
+// unique_lock<boost::mutex> lk(_mutex);
+#ifdef DONT_CARE_ABOUT_DEBUG_EVEN_WHEN_SHUTTING_DOWN
+ // seems like we might want to allow toString for debug during shutdown?
+ _throwIfShuttingDown();
+#endif
+ stringstream result;
+ result << "Policy: ";
+ switch(_policy) {
+ case kPolicyFirstCome:
+ result << "FirstCome";
+ break;
+ case kPolicyReadersFirst:
+ result << "ReadersFirst";
+ break;
+ case kPolicyOldestTxFirst:
+ result << "OldestFirst";
+ break;
+ case kPolicyReadersOnly:
+ result << "ReadersOnly";
+ break;
+ case kPolicyWritersOnly:
+ result << "WritersOnly";
+ break;
+ }
+ result << endl;
+
+ if (_shuttingDown)
+ result << " shutting down in " << _millisToQuiesce - _timer.millis();
+
+ result << "\t_locks:" << endl;
+ for (map<LockId,LockRequest*>::const_iterator locks = _locks.begin();
+ locks != _locks.end(); ++locks) {
+ result << "\t\t" << locks->first << locks->second->toString() << endl;
+ }
+
+ result << "\t_resourceLocks:" << endl;
+ bool firstResource=true;
+ result << "resources=" << ": {";
+ for (map<ResourceId,list<LockId> >::const_iterator nextResource = _resourceLocks.begin();
+ nextResource != _resourceLocks.end(); ++nextResource) {
+ if (firstResource) firstResource=false;
+ else result << ", ";
+ result << nextResource->first << ": {";
+ bool firstLock=true;
+ for (list<LockId>::const_iterator nextLockId = nextResource->second.begin();
+ nextLockId != nextResource->second.end(); ++nextLockId) {
+ if (firstLock) firstLock=false;
+ else result << ", ";
+ result << *nextLockId;
+ }
+ result << "}";
+ }
+ result << "}" << endl;
+
+ result << "\t_waiters:" << endl;
+ for (map<TxId, set<TxId> >::const_iterator txWaiters = _waiters.begin();
+ txWaiters != _waiters.end(); ++txWaiters) {
+ bool firstTime=true;
+ result << "\t\t" << txWaiters->first << ": {";
+ for (set<TxId>::const_iterator nextWaiter = txWaiters->second.begin();
+ nextWaiter != txWaiters->second.end(); ++nextWaiter) {
+ if (firstTime) firstTime=false;
+ else result << ", ";
+ result << *nextWaiter;
+ }
+ result << "}" << endl;
+ }
+
+ bool firstGoner = true;
+ result << "\t_aborted: {" << endl;
+ for (set<TxId>::iterator goners = _abortedTxIds.begin();
+ goners != _abortedTxIds.end(); ++goners) {
+ if (firstGoner) firstGoner = false;
+ else result << ",";
+ result << "t" << *goners;
+ }
+ result << "}";
+
+ return result.str();
+ }
+
+ bool LockManager::isLocked(const TxId& holder,
+ const uint32_t& mode,
+ const ResourceId& resId) const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(holder);
+
+ LockId unused;
+ return kLockFound == _findLock(holder, mode, resId, &unused);
+ }
+
+ /*---------- LockManager private functions (alphabetical) ----------*/
+
+ /*
+ * release resources acquired by a transaction about to abort, notifying
+ * any waiters that they can retry their resource acquisition. cleanup
+ * and throw an AbortException.
+ */
+ void LockManager::_abortInternal(const TxId& goner) {
+ TxLockMap::iterator locks = _xaLocks.find(goner);
+
+ if (locks == _xaLocks.end()) {
+ // unusual, but possible to abort a transaction with no locks
+ throw AbortException();
+ }
+
+ // make a copy of the TxId's locks, because releasing
+ // would otherwise affect the iterator. XXX find a better way?
+ //
+ set<LockId> copyOfLocks = locks->second;
+
+ // release all resources acquired by this transaction
+ // notifying any waiters that they can continue
+ //
+ for (set<LockId>::iterator nextLockId = copyOfLocks.begin();
+ nextLockId != copyOfLocks.end(); ++nextLockId) {
+ _releaseInternal(*nextLockId);
+ }
+
+ // add to set of aborted transactions
+ _abortedTxIds.insert(goner);
+
+ throw AbortException();
+ }
+
+ LockManager::LockId LockManager::_acquireInternal(const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ Notifier* sleepNotifier,
+ unique_lock<boost::mutex>& guard) {
+#if 0
+ static unsigned long funCount = 0;
+ if (0 == ++funCount % 10000) {
+ log() << _stats.toString();
+ }
+#endif
+ list<LockId>& queue = _resourceLocks[resId];
+ if (!queue.empty()) { _stats.incPreexisting(); }
+ list<LockId>::iterator conflictPosition = queue.begin();
+ ResourceStatus resourceStatus = _conflictExists(requestor, mode, resId,
+ queue, conflictPosition);
+ if (kResourceAcquired == resourceStatus) {
+ _stats.incSame();
+ ++_locks[*conflictPosition]->count;
+ return *conflictPosition;
+ }
+
+ // create the lock request and add to TxId's set of lock requests
+
+ // XXX should probably use placement operator new and manage LockRequest memory
+ LockRequest* lr = new LockRequest(requestor, mode, resId);
+ _locks[lr->lid] = lr;
+
+ // add lock request to set of requests of requesting TxId
+ _xaLocks[requestor].insert(lr->lid);
+
+ if (kResourceAvailable == resourceStatus) {
+ queue.insert(conflictPosition, lr->lid);
+ _addWaiters(lr, conflictPosition, queue.end());
+ return lr->lid;
+ }
+
+ // some type of conflict, insert after confictPosition
+
+ verify(conflictPosition != queue.end());
+ ++conflictPosition;
+
+ if (kResourceUpgradeConflict == resourceStatus) {
+ queue.insert(conflictPosition, lr->lid);
+ }
+ else {
+ _addLockToQueueUsingPolicy(lr, queue, conflictPosition);
+ }
+#if 0
+ if (isExclusive(mode)) {
+ for (list<LockId>::iterator followers = conflictPosition;
+ followers != queue.end(); ++followers) {
+ LockRequest* nextLockRequest = _locks[*followers];
+ if (nextLockRequest->xid == requestor) continue;
+ verify(nextLockRequest->isBlocked());
+ }
+ }
+#endif
+ // set remaining incompatible requests as lr's waiters
+ _addWaiters(lr, conflictPosition, queue.end());
+
+
+ // call the sleep notification function once
+ if (NULL != sleepNotifier) {
+ // XXX should arg be xid of blocker?
+ (*sleepNotifier)(lr->xid);
+ }
+
+ _stats.incBlocks();
+
+ // this loop typically executes once
+ do {
+ // set up for future deadlock detection add requestor to blockers' waiters
+ //
+ for (list<LockId>::iterator nextBlocker = queue.begin();
+ nextBlocker != conflictPosition; ++nextBlocker) {
+ LockRequest* nextBlockingRequest = _locks[*nextBlocker];
+ if (nextBlockingRequest->lid == lr->lid) {break;}
+ if (nextBlockingRequest->xid == requestor) {continue;}
+ if (isCompatible(_locks[*nextBlocker]->mode, lr->mode)) {continue;}
+ _addWaiter(_locks[*nextBlocker]->xid, requestor);
+ ++lr->sleepCount;
+ }
+ if (kResourcePolicyConflict == resourceStatus) {
+ // to facilitate waking once the policy reverts, add requestor to system's waiters
+ _addWaiter(kReservedTxId, requestor);
+ ++lr->sleepCount;
+ }
+
+ // wait for blocker to release
+ while (lr->isBlocked()) {
+ Timer timer;
+ lr->lock.wait(guard);
+ _stats.incTimeBlocked(timer.millis());
+ }
+
+ conflictPosition = queue.begin();
+ resourceStatus = _conflictExists(lr->xid, lr->mode, lr->resId, queue, conflictPosition);
+ } while (hasConflict(resourceStatus));
+
+ return lr->lid;
+ }
+
+ /*
+ * called only when there are conflicting LockRequests
+ * positions a lock request (lr) in a queue at or after position
+ * also adds remaining requests in queue as lr's waiters
+ * for subsequent deadlock detection
+ */
+ void LockManager::_addLockToQueueUsingPolicy(LockRequest* lr,
+ list<LockId>& queue,
+ list<LockId>::iterator& position) {
+
+ if (position == queue.end()) {
+ queue.insert(position, lr->lid);
+ return;
+ }
+
+ // use lock request's transaction's priority if specified
+ int txPriority = _getTransactionPriorityInternal(lr->xid);
+ if (txPriority > 0) {
+ for (; position != queue.end(); ++position) {
+ LockRequest* nextRequest = _locks[*position];
+ if (txPriority > _getTransactionPriorityInternal(nextRequest->xid)) {
+ // add in front of request with lower priority that is either
+ // compatible, or blocked
+ //
+ queue.insert(position, lr->lid);
+ return;
+ }
+ }
+ queue.push_back(lr->lid);
+ return;
+ }
+ else if (txPriority < 0) {
+ // for now, just push to end
+ // TODO: honor position of low priority requests
+ queue.push_back(lr->lid);
+ }
+
+ // use LockManager's default policy
+ switch (_policy) {
+ case kPolicyFirstCome:
+ queue.push_back(lr->lid);
+ position = queue.end();
+ return;
+ case kPolicyReadersFirst:
+ if (isExclusive(lr->mode)) {
+ queue.push_back(lr->lid);
+ position = queue.end();
+ return;
+ }
+ for (; position != queue.end(); ++position) {
+ LockRequest* nextRequest = _locks[*position];
+ if (isExclusive(nextRequest->mode) && nextRequest->isBlocked()) {
+ // insert shared lock before first sleeping exclusive lock
+ queue.insert(position, lr->lid);
+ return;
+ }
+ }
+ break;
+ case kPolicyOldestTxFirst:
+ for (; position != queue.end(); ++position) {
+ LockRequest* nextRequest = _locks[*position];
+ if (lr->xid < nextRequest->xid &&
+ (isCompatible(lr->mode, nextRequest->mode) || nextRequest->isBlocked())) {
+ // smaller xid is older, so queue it before
+ queue.insert(position, lr->lid);
+ return;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+
+ queue.push_back(lr->lid);
+ position = queue.end();
+ }
+
+ void LockManager::_addWaiter(const TxId& blocker, const TxId& requestor) {
+ if (blocker == requestor) {
+ // can't wait on self
+ return;
+ }
+
+ // add requestor to blocker's waiters
+ _waiters[blocker].insert(requestor);
+
+ // add all of requestor's waiters to blocker's waiters
+ _waiters[blocker].insert(_waiters[requestor].begin(), _waiters[requestor].end());
+ }
+
+ void LockManager::_addWaiters(LockRequest* blocker,
+ list<LockId>::iterator nextLockId,
+ list<LockId>::iterator lastLockId) {
+ for (; nextLockId != lastLockId; ++nextLockId) {
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+ if (! isCompatible(blocker->mode, nextLockRequest->mode)) {
+ if (nextLockRequest->sleepCount > 0) {
+ _addWaiter(blocker->xid, nextLockRequest->xid);
+ ++nextLockRequest->sleepCount;
+ }
+ }
+ }
+ }
+
+ bool LockManager::_comesBeforeUsingPolicy(const TxId& requestor,
+ const unsigned& mode,
+ const LockRequest* oldRequest) const {
+
+ // handle special policies
+ if (kPolicyReadersOnly == _policy && kShared == mode && oldRequest->isBlocked())
+ return true;
+ if (kPolicyWritersOnly == _policy && kExclusive == mode && oldRequest->isBlocked())
+ return true;
+
+ if (_getTransactionPriorityInternal(requestor) >
+ _getTransactionPriorityInternal(oldRequest->xid)) {
+ return true;
+ }
+
+ switch (_policy) {
+ case kPolicyFirstCome:
+ return false;
+ case kPolicyReadersFirst:
+ return isShared(mode);
+ case kPolicyOldestTxFirst:
+ return requestor < oldRequest->xid;
+ default:
+ return false;
+ }
+ }
+
+ LockManager::ResourceStatus LockManager::_conflictExists(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ list<LockId>& queue,
+ list<LockId>::iterator& nextLockId) {
+
+ // handle READERS/kPolicyWritersOnly policy conflicts
+ if ((kPolicyReadersOnly == _policy && isExclusive(mode)) ||
+ (kPolicyWritersOnly == _policy && isShared(mode))) {
+
+ if (nextLockId == queue.end()) { return kResourcePolicyConflict; }
+
+ // position past the last active lock request on the queue
+ list<LockId>::iterator lastActivePosition = queue.end();
+ for (; nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+ if (requestor == nextLockRequest->xid && mode == nextLockRequest->mode) {
+ return kResourceAcquired; // already have the lock
+ }
+ if (! nextLockRequest->isBlocked()) {
+ lastActivePosition = nextLockId;
+ }
+ }
+ if (lastActivePosition != queue.end()) {
+ nextLockId = lastActivePosition;
+ }
+ return kResourcePolicyConflict;
+ }
+
+ // loop over the lock requests in the queue, looking for the 1st conflict
+ // normally, we'll leave the nextLockId iterator positioned at the 1st conflict
+ // if there is one, or the position (often the end) where we know there is no conflict.
+ //
+ // upgrades complicate this picture, because we want to position the iterator
+ // after all initial share locks. but we may not know whether an exclusived request
+ // is an upgrade until we look at all the initial share locks.
+ //
+ // so we record the position of the 1st conflict, but continue advancing the
+ // nextLockId iterator until we've seen all initial share locks. If none have
+ // the same TxId as the exclusive request, we restore the position to 1st conflict
+ //
+ list<LockId>::iterator firstConflict = queue.end(); // invalid
+ set<TxId> sharedOwners; // all initial share lock owners
+ bool alreadyHadLock = false; // true if we see a lock with the same Txid
+
+ for (; nextLockId != queue.end(); ++nextLockId) {
+
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+
+ if (nextLockRequest->matches(requestor, mode, resId)) {
+ // if we're already on the queue, there's no conflict
+ return kResourceAcquired;
+ }
+
+ if (requestor == nextLockRequest->xid) {
+ // an upgrade or downgrade request, can't conflict with ourselves
+ if (isShared(mode)) {
+ // downgrade
+ _stats.incDowngrades();
+ ++nextLockId;
+ return kResourceAvailable;
+ }
+
+ // upgrade
+ alreadyHadLock = true;
+ _stats.incUpgrades();
+ // position after initial readers
+ continue;
+ }
+
+ if (isShared(nextLockRequest->mode)) {
+ invariant(!nextLockRequest->isBlocked() || kPolicyWritersOnly == _policy);
+
+ sharedOwners.insert(nextLockRequest->xid);
+
+ if (isExclusive(mode) && firstConflict == queue.end()) {
+ // if "lr" proves not to be an upgrade, restore this position later
+ firstConflict = nextLockId;
+ }
+ // either there's no conflict yet, or we're not done checking for an upgrade
+ continue;
+ }
+
+ // the next lock on the queue is an exclusive request
+ invariant(isExclusive(nextLockRequest->mode));
+
+ if (alreadyHadLock) {
+ // bumped into something incompatible while up/down grading
+ if (isExclusive(mode)) {
+ // upgrading: bumped into another exclusive lock
+ if (sharedOwners.find(nextLockRequest->xid) != sharedOwners.end()) {
+ // the exclusive lock is also an upgrade, and it must
+ // be blocked, waiting for our original share lock to be released
+ // if we wait for its shared lock, we would deadlock
+ //
+ invariant(nextLockRequest->isBlocked());
+ _abortInternal(requestor);
+ }
+
+ if (sharedOwners.empty()) {
+ // simple upgrade, queue in front of nextLockRequest, no conflict
+ return kResourceAvailable;
+ }
+ else {
+ // we have to wait for another shared lock before upgrading
+ return kResourceUpgradeConflict;
+ }
+ }
+
+ // downgrading, bumped into an exclusive lock, blocked on our original
+ invariant (isShared(mode));
+ invariant(nextLockRequest->isBlocked());
+ // lr will be inserted before nextLockRequest
+ return kResourceAvailable;
+ }
+ else if (firstConflict != queue.end()) {
+ // restore first conflict position
+ nextLockId = firstConflict;
+ nextLockRequest = _locks[*nextLockId];
+ }
+
+ // no conflict if nextLock is blocked and we come before
+ if (nextLockRequest->isBlocked() &&
+ _comesBeforeUsingPolicy(requestor, mode, nextLockRequest)) {
+ return kResourceAvailable;
+ }
+
+ return kResourceConflict;
+ }
+
+ // positioned to the end of the queue
+ if (alreadyHadLock && isExclusive(mode) && !sharedOwners.empty()) {
+ // upgrading, queue consists of requestor's earlier share lock
+ // plus other share lock. Must wait for the others to release
+ return kResourceUpgradeConflict;
+ }
+ else if (firstConflict != queue.end()) {
+ nextLockId = firstConflict;
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+
+ if (_comesBeforeUsingPolicy(requestor, mode, nextLockRequest)) {
+ return kResourceAvailable;
+ }
+
+ return kResourceConflict;
+ }
+ return kResourceAvailable;
+ }
+
+ LockManager::LockStatus LockManager::_findLock(const TxId& holder,
+ const unsigned& mode,
+ const ResourceId& resId,
+ LockId* outLockId) const {
+
+ *outLockId = kReservedLockId; // set invalid;
+
+ // get iterator for resId's locks
+ ResourceLocks::const_iterator resourceLocks = _resourceLocks.find(resId);
+ if (resourceLocks == _resourceLocks.end()) { return kLockResourceNotFound; }
+
+ // look for an existing lock request from holder in mode
+ for (list<LockId>::const_iterator nextLockId = resourceLocks->second.begin();
+ nextLockId != resourceLocks->second.end(); ++nextLockId) {
+ LockRequest* nextLockRequest = _locks.at(*nextLockId);
+ if (nextLockRequest->xid == holder && nextLockRequest->mode == mode) {
+ *outLockId = nextLockRequest->lid;
+ return kLockFound;
+ }
+ }
+ return kLockModeNotFound;
+ }
+
+ int LockManager::_getTransactionPriorityInternal(const TxId& xid) const {
+ map<TxId, int>::const_iterator txPriority = _txPriorities.find(xid);
+ if (txPriority == _txPriorities.end()) {
+ return 0;
+ }
+ return txPriority->second;
+ }
+
+ /*
+ * Used by acquireOne
+ * XXX: there's overlap between this, _conflictExists and _findLock
+ */
+ bool LockManager::_isAvailable(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId) const {
+
+ // check for exceptional policies
+ if (kPolicyReadersOnly == _policy && isExclusive(mode))
+ return false;
+ else if (kPolicyWritersOnly == _policy && isShared(mode))
+ return false;
+
+ ResourceLocks::const_iterator resLocks = _resourceLocks.find(resId);
+ if (resLocks == _resourceLocks.end()) {
+ return true; // no lock requests against this ResourceId, so must be available
+ }
+
+ // walk over the queue of previous requests for this ResourceId
+ const list<LockId>& queue = resLocks->second;
+ for (list<LockId>::const_iterator nextLockId = queue.begin();
+ nextLockId != queue.end(); ++nextLockId) {
+
+ LockRequest* nextLockRequest = _locks.at(*nextLockId);
+
+ if (nextLockRequest->matches(requestor, mode, resId)) {
+ // we're already have this lock, if we're asking, we can't be asleep
+ invariant(! nextLockRequest->isBlocked());
+ return true;
+ }
+
+ // no conflict if we're compatible
+ if (isCompatible(mode, nextLockRequest->mode)) continue;
+
+ // no conflict if nextLock is blocked and we come before
+ if (nextLockRequest->isBlocked() && _comesBeforeUsingPolicy(requestor, mode, nextLockRequest))
+ return true;
+
+ return false; // we're incompatible and would block
+ }
+
+ // everything on the queue (if anything is on the queue) is compatible
+ return true;
+ }
+
+ LockManager::LockStatus LockManager::_releaseInternal(const LockId& lid) {
+
+ if (kReservedLockId == lid) { return kLockIdNotFound; }
+
+ LockRequest* lr = _locks[lid];
+ if (NULL == lr) { return kLockIdNotFound; }
+
+ const TxId holder = lr->xid;
+ const unsigned mode = lr->mode;
+ const ResourceId resId = lr->resId;
+
+ ResourceLocks::iterator recordLocks = _resourceLocks.find(resId);
+ if (recordLocks == _resourceLocks.end()) {
+ return kLockResourceNotFound;
+ }
+
+ bool foundLock = false;
+ bool foundResource = false;
+
+ list<LockId>& queue = recordLocks->second;
+ list<LockId>::iterator nextLockId = queue.begin();
+
+ // find the position of the lock to release in the queue
+ for(; !foundLock && nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextLock = _locks[*nextLockId];
+ if (lid != *nextLockId) {
+ if (nextLock->xid == holder) {
+ foundResource = true;
+ }
+ }
+ else {
+ // this is our lock.
+ if (0 < --nextLock->count) { return kLockCountDecremented; }
+
+ // release the lock
+ _xaLocks[holder].erase(*nextLockId);
+ _locks.erase(*nextLockId);
+ queue.erase(nextLockId++);
+ delete nextLock;
+
+ foundLock = true;
+ break; // don't increment nextLockId again
+ }
+ }
+
+ if (! foundLock) {
+ // can't release a lock that hasn't been acquired in the specified mode
+ return foundResource ? kLockModeNotFound : kLockResourceNotFound;
+ }
+
+ if (isShared(mode)) {
+ // skip over any remaining shared requests. they can't be waiting for us.
+ for (; nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextLock = _locks[*nextLockId];
+ if (isExclusive(nextLock->mode)) {
+ break;
+ }
+ }
+ }
+
+ // everything left on the queue potentially conflicts with the lock just
+ // released, unless it's an up/down-grade of that lock. So iterate, and
+ // when TxIds differ, decrement sleepCount, wake those with zero counts, and
+ // decrement their sleep counts, waking sleepers with zero counts, and
+ // cleanup state used for deadlock detection
+
+ for (; nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextSleeper = _locks[*nextLockId];
+ if (nextSleeper->xid == holder) continue;
+
+ invariant(nextSleeper->isBlocked());
+
+ // wake up sleepy heads
+ if (nextSleeper->shouldAwake()) {
+ nextSleeper->lock.notify_one();
+ }
+ }
+#if 0
+ // verify stuff
+ if (_xaLocks[holder].empty()) {
+ for (WaitersMap::iterator dependencies = _waiters.begin();
+ dependencies != _waiters.end(); ++dependencies) {
+ verify( dependencies->second.find(holder) == dependencies->second.end());
+ }
+ }
+
+ if (!queue.empty()) {
+ verify(! _locks[queue.front()]->isBlocked());
+ }
+#endif
+ return kLockReleased;
+ }
+
+ void LockManager::_throwIfShuttingDown(const TxId& xid) const {
+ if (_shuttingDown && (_timer.millis() >= _millisToQuiesce ||
+ _xaLocks.find(xid) == _xaLocks.end())) {
+
+ throw AbortException(); // XXX should this be something else? ShutdownException?
+ }
+ }
+
+ /*---------- ResourceLock functions ----------*/
+
+ ResourceLock::ResourceLock(LockManager& lm,
+ const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ LockManager::Notifier* notifier)
+ : _lm(lm),
+ _lid(LockManager::kReservedLockId) // if acquire throws, we want this initialized
+ {
+ _lid = lm.acquire(requestor, mode, resId, notifier);
+ }
+
+ ResourceLock::~ResourceLock() {
+ _lm.releaseLock(_lid);
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/concurrency/lock_mgr.h b/src/mongo/db/concurrency/lock_mgr.h
new file mode 100644
index 00000000000..451e88daabe
--- /dev/null
+++ b/src/mongo/db/concurrency/lock_mgr.h
@@ -0,0 +1,621 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <iterator>
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "mongo/platform/compiler.h"
+#include "mongo/platform/cstdint.h"
+#include "mongo/util/timer.h"
+
+/*
+ * LockManager controls access to resources through two functions: acquire and release
+ *
+ * Resources are either RecordStores, or Records within an RecordStore, identified by a ResourceId.
+ * Resources are acquired for either shared or exclusive use, by transactions identified by a TxId.
+ * Acquiring Records in any mode implies acquisition of the Record's RecordStore
+ *
+ * Contention for a resource is resolved by a LockingPolicy, which determines which blocked
+ * resource requests to awaken when the blocker releases the resource.
+ *
+ */
+
+namespace mongo {
+#if 1
+ class TxId {
+ public:
+ TxId() : _xid(0) { }
+ TxId(size_t xid) : _xid(xid) { }
+ bool operator<(const TxId& other) const { return _xid < other._xid; }
+ bool operator==(const TxId& other) const { return _xid == other._xid; }
+ operator size_t() const { return _xid; }
+
+ private:
+ size_t _xid;
+ };
+
+ class ResourceId {
+ public:
+ ResourceId() : _rid(0) { }
+ ResourceId(size_t rid) : _rid(rid) { }
+ bool operator<(const ResourceId& other) const { return _rid < other._rid; }
+ bool operator==(const ResourceId& other) const { return _rid == other._rid; }
+ operator size_t() const { return _rid; }
+
+ private:
+ size_t _rid;
+ };
+#else
+ typedef size_t TxId; // identifies requesting transaction. 0 is reserved
+ typedef size_t ResourceId; // identifies requested resource. 0 is reserved
+#endif
+ static const TxId kReservedTxId = 0;
+ static const ResourceId kReservedResourceId = 0;
+
+ /**
+ * LockManager is used to control access to resources. Usually a singleton. For deadlock detection
+ * all resources used by a set of transactions that could deadlock, should use one LockManager.
+ *
+ * Primary functions are:
+ * acquire - acquire a resource for shared or exclusive use; may throw Abort.
+ * acquireOne - acquire one of a vector of resources, hopefully without blocking
+ * release - release a resource previously acquired for shared/exclusive use
+ */
+ class LockManager {
+ public:
+
+ /**
+ * thrown primarily when deadlocks are detected, or when LockManager::abort is called.
+ * also thrown when LockManager requests are made during shutdown.
+ */
+ class AbortException : public std::exception {
+ public:
+ const char* what() const throw ();
+ };
+
+ /**
+ * LockModes: shared and exclusive
+ */
+ enum LockModes {
+ kShared = 0x0,
+ kExclusive = 0x1
+ };
+
+ /**
+ * Used to decide which blocked requests to honor first when resource becomes available
+ */
+ enum Policy {
+ kPolicyFirstCome, // wake the first blocked request in arrival order
+ kPolicyReadersFirst, // wake the first blocked read request(s)
+ kPolicyOldestTxFirst, // wake the blocked request with the lowest TxId
+ kPolicyReadersOnly, // block write requests (used during fsync)
+ kPolicyWritersOnly // block read requests
+ };
+
+ /**
+ * returned by ::conflictExists, called from acquire
+ */
+ enum ResourceStatus {
+ kResourceAcquired, // requested resource was already acquired, increment count
+ kResourceAvailable, // requested resource is available. no waiting
+ kResourceConflict, // requested resource is in use by another transaction
+ kResourcePolicyConflict, // requested mode blocked by READER/kPolicyWritersOnly policy
+ kResourceUpgradeConflict // requested resource was previously acquired for shared use
+ // now requested for exclusive use, but there are other shared
+ // users, so this request must wait.
+ };
+
+ /**
+ * returned by ::find_lock, and ::release and, mostly for testing
+ * explains why a lock wasn't found or released
+ */
+ enum LockStatus {
+ kLockFound, // found a matching lock request
+ kLockReleased, // released requested lock
+ kLockCountDecremented, // decremented lock count, but didn't release
+ kLockIdNotFound, // no locks with this id
+ kLockResourceNotFound, // no locks on the resource
+ kLockModeNotFound // locks on the resource, but not of the specified mode
+ };
+
+ typedef size_t LockId; // valid LockIds are > 0
+ static const LockId kReservedLockId = 0;
+
+ /**
+ * Used to do something just before an acquisition request blocks.
+ *
+ * XXX: should perhaps define Notifier as a functor so C++11 lambda's match
+ * for the test.cpp, it was convenient for the call operator to access private
+ * state, which is why we're using the class formulation for now
+ */
+ class Notifier {
+ public:
+ virtual ~Notifier() { }
+ virtual void operator()(const TxId& blocker) = 0;
+ };
+
+ /**
+ * Tracks locking statistics. For now, just aggregated across all resources/TxIds
+ * Eventually might keep per TxId and/or per Resource, to facilitate identifying
+ * hotspots and problem transactions.
+ */
+ class LockStats {
+ public:
+ LockStats()
+ : _numRequests(0)
+ , _numPreexistingRequests(0)
+ , _numSameRequests(0)
+ , _numBlocks(0)
+ , _numDeadlocks(0)
+ , _numDowngrades(0)
+ , _numUpgrades(0)
+ , _numMillisBlocked(0)
+ , _numCurrentActiveReadRequests(0)
+ , _numCurrentActiveWriteRequests(0) { }
+
+ void incRequests() { _numRequests++; }
+ void incPreexisting() { _numPreexistingRequests++; }
+ void incSame() { _numSameRequests++; }
+ void incBlocks() { _numBlocks++; }
+ void incDeadlocks() { _numDeadlocks++; }
+ void incDowngrades() { _numDowngrades++; }
+ void incUpgrades() { _numUpgrades++; }
+ void incTimeBlocked(size_t numMillis ) { _numMillisBlocked += numMillis; }
+
+ void incStatsForMode(const unsigned mode) {
+ 0==mode ? incActiveReads() : incActiveWrites();
+ }
+ void decStatsForMode(const unsigned mode) {
+ 0==mode ? decActiveReads() : decActiveWrites();
+ }
+
+ void incActiveReads() { _numCurrentActiveReadRequests++; }
+ void decActiveReads() { _numCurrentActiveReadRequests--; }
+ void incActiveWrites() { _numCurrentActiveWriteRequests++; }
+ void decActiveWrites() { _numCurrentActiveWriteRequests--; }
+
+ unsigned numActiveReads() const { return _numCurrentActiveReadRequests; }
+ unsigned numActiveWrites() const { return _numCurrentActiveWriteRequests; }
+
+ size_t getNumRequests() const { return _numRequests; }
+ size_t getNumPreexistingRequests() const { return _numPreexistingRequests; }
+ size_t getNumSameRequests() const { return _numSameRequests; }
+ size_t getNumBlocks() const { return _numBlocks; }
+ size_t getNumDeadlocks() const { return _numDeadlocks; }
+ size_t getNumDowngrades() const { return _numDowngrades; }
+ size_t getNumUpgrades() const { return _numUpgrades; }
+ size_t getNumMillisBlocked() const { return _numMillisBlocked; }
+
+ std::string toString() const;
+
+ private:
+ size_t _numRequests;
+ size_t _numPreexistingRequests;
+ size_t _numSameRequests;
+ size_t _numBlocks;
+ size_t _numDeadlocks;
+ size_t _numDowngrades;
+ size_t _numUpgrades;
+ size_t _numMillisBlocked;
+ unsigned long _numCurrentActiveReadRequests;
+ unsigned long _numCurrentActiveWriteRequests;
+ };
+
+
+ public:
+
+ /**
+ * Singleton factory - retrieves a common instance of LockManager
+ */
+ static LockManager& getSingleton();
+
+ /**
+ * It's possibly useful to allow multiple LockManagers for non-overlapping sets
+ * of resources, so the constructor is left public. Eventually we may want
+ * to enforce a singleton pattern.
+ */
+ explicit LockManager(const Policy& policy=kPolicyFirstCome);
+ ~LockManager();
+
+ /**
+ * Get the current policy
+ */
+ Policy getPolicy() const;
+
+ /**
+ * Who set the current policy. Of use when the Policy is ReadersOnly
+ * and we want to find out who is blocking a writer.
+ */
+ TxId getPolicySetter() const;
+
+ /**
+ * Initiate a shutdown, specifying a period of time to quiesce.
+ *
+ * During this period, existing transactions can continue to acquire resources,
+ * but new transaction requests will throw AbortException.
+ *
+ * After quiescing, any new requests will throw AbortException
+ */
+ void shutdown(const unsigned& millisToQuiesce = 1000);
+
+
+ /**
+ * override default LockManager's default Policy for a transaction.
+ *
+ * positive priority moves transaction's resource requests toward the front
+ * of the queue, behind only those requests with higher priority.
+ *
+ * negative priority moves transaction's resource requests toward the back
+ * of the queue, ahead of only those requests with lower priority.
+ *
+ * zero priority uses the LockManager's default Policy
+ */
+ void setTransactionPriority(const TxId& xid, int priority);
+ int getTransactionPriority(const TxId& xid) const;
+
+
+ /**
+ * acquire a resource in a mode.
+ * can throw AbortException
+ */
+ LockId acquire(const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ Notifier* notifier = NULL);
+
+ /**
+ * for bulk operations:
+ * acquire one of a vector of ResourceIds in a mode,
+ * hopefully without blocking, return index of
+ * acquired ResourceId, or -1 if vector was empty
+ */
+ int acquireOne(const TxId& requestor,
+ const uint32_t& mode,
+ const std::vector<ResourceId>& records,
+ Notifier* notifier = NULL);
+
+ /**
+ * release a ResourceId.
+ * The mode here is just the mode that applies to the resId
+ */
+ LockStatus release(const TxId& holder,
+ const uint32_t& mode,
+ const ResourceId& resId);
+
+ /**
+ * releases the lock returned by acquire. should perhaps replace above?
+ */
+ LockStatus releaseLock(const LockId& lid);
+
+ /**
+ * release all resources acquired by a transaction
+ * returns number of locks released
+ */
+ size_t release(const TxId& holder);
+
+ /**
+ * called internally for deadlock
+ * possibly called publicly to stop a long transaction
+ * also used for testing
+ */
+ MONGO_COMPILER_NORETURN void abort(const TxId& goner);
+
+ /**
+ * returns a copy of the stats that exist at the time of the call
+ */
+ LockStats getStats() const;
+
+
+
+ // --- for testing and logging
+
+ std::string toString() const;
+
+ /**
+ * test whether a TxId has locked a ResourceId in a mode
+ */
+ bool isLocked(const TxId& holder,
+ const unsigned& mode,
+ const ResourceId& resId) const;
+
+ protected:
+
+ /**
+ * Data structure used to record a resource acquisition request
+ */
+ class LockRequest {
+ public:
+ LockRequest(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId);
+
+ ~LockRequest();
+
+ bool matches(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId) const;
+
+ bool isBlocked() const;
+ bool shouldAwake();
+
+ std::string toString() const;
+
+ // uniquely identifies a LockRequest
+ const LockId lid;
+
+ // transaction that made this request
+ const TxId xid;
+
+ // shared or exclusive use
+ const unsigned mode;
+
+ // resource requested
+ const ResourceId resId;
+
+ // number of times xid requested this resource in this mode
+ // request will be deleted when count goes to 0
+ size_t count;
+
+ // number of existing things blocking this request
+ // usually preceding requests on the queue, but also policy
+ size_t sleepCount;
+
+ // used for waiting and waking
+ boost::condition_variable lock;
+ };
+
+ typedef std::map<TxId, std::set<LockId> > TxLocks;
+ typedef std::map<ResourceId, std::list<LockId> > ResourceLocks;
+ typedef std::map<LockId, LockRequest*> LockMap;
+ typedef std::map<TxId, std::set<LockId> > TxLockMap;
+
+ private: // alphabetical
+
+ /**
+ * called by public ::abort and internally upon deadlock
+ * releases all locks acquired by goner, notify's any
+ * transactions that were waiting, then throws AbortException
+ */
+ MONGO_COMPILER_NORETURN void _abortInternal(const TxId& goner);
+
+ /**
+ * main workhorse for acquiring locks on resources, blocking
+ * or aborting on conflict
+ *
+ * returns a non-zero LockId, or throws AbortException on deadlock
+ *
+ */
+ LockId _acquireInternal(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ Notifier* notifier,
+ boost::unique_lock<boost::mutex>& guard);
+
+ /**
+ * adds a conflicting lock request to the list of requests for a resource
+ * using the Policy. Called by acquireInternal
+ */
+ void _addLockToQueueUsingPolicy(LockRequest* lr,
+ std::list<LockId>& queue,
+ std::list<LockId>::iterator& position);
+
+ /**
+ * set up for future deadlock detection, called from acquire
+ */
+ void _addWaiter(const TxId& blocker, const TxId& waiter);
+
+ /**
+ * when inserting a new lock request into the middle of a queue,
+ * add any remaining incompatible requests in the queue to the
+ * new lock request's set of waiters... for future deadlock detection
+ */
+ void _addWaiters(LockRequest* blocker,
+ std::list<LockId>::iterator nextLockId,
+ std::list<LockId>::iterator lastLockId);
+
+ /**
+ * returns true if a newRequest should be honored before an oldRequest according
+ * to the lockManager's policy. Used by acquire to decide whether a new share request
+ * conflicts with a previous upgrade-to-exclusive request that is blocked.
+ */
+ bool _comesBeforeUsingPolicy(const TxId& newReqXid,
+ const unsigned& newReqMode,
+ const LockRequest* oldReq) const;
+
+ /**
+ * determine whether a resource request would conflict with an existing lock
+ * set the position to the first possible insertion point, which is usually
+ * the position of the first conflict, or the end of the queue, or to an existing lock
+ */
+ ResourceStatus _conflictExists(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ std::list<LockId>& queue,
+ std::list<LockId>::iterator& position /* in/out */);
+
+ /**
+ * looks for an existing LockRequest that matches the four input params
+ * if not found, sets outLid to zero and returns a reason, otherwise
+ * sets outLid to the LockId that matches and returns kLockFound
+ */
+ LockStatus _findLock(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ LockId* outLid) const;
+
+ /**
+ * called externally by getTransactionPriority
+ * and internally by addLockToQueueUsingPolicy
+ */
+ int _getTransactionPriorityInternal(const TxId& xid) const;
+
+ /**
+ * returns true if acquire would return without waiting
+ * used by acquireOne
+ */
+ bool _isAvailable(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId) const;
+
+ /**
+ * called by public ::release and internally by abort.
+ * assumes caller as acquired a mutex.
+ */
+ LockStatus _releaseInternal(const LockId& lid);
+
+ /**
+ * called at start of public APIs, throws exception
+ * if quiescing period has expired, or if xid is new
+ */
+ void _throwIfShuttingDown(const TxId& xid = 0 ) const;
+
+
+ private:
+
+ // Singleton instance
+ static boost::mutex _getSingletonMutex;
+ static LockManager* _singleton;
+
+ // The Policy controls which requests should be honored first. This is
+ // used to guide the position of a request in a list of requests waiting for
+ // a resource.
+ //
+ // XXX At some point, we may want this to also guide the decision of which
+ // transaction to abort in case of deadlock. For now, the transaction whose
+ // request would lead to a deadlock is aborted. Since deadlocks are rare,
+ // careful choices may not matter much.
+ //
+ Policy _policy;
+ TxId _policySetter;
+
+ // synchronizes access to the lock manager, which is shared across threads
+ mutable boost::mutex _mutex;
+
+ // for blocking when setting kPolicyReadersOnly or kPolicyWritersOnly policy
+ boost::condition_variable _policyLock;
+
+ // only meaningful when _policy == SHUTDOWN
+ bool _shuttingDown;
+ int _millisToQuiesce;
+ Timer _timer;
+
+ // owns the LockRequest*
+ std::map<LockId, LockRequest*> _locks;
+
+ // Lists of lock requests associated with a resource,
+ //
+ // The lock-request lists have two sections. Some number (at least one) of requests
+ // at the front of a list are "active". All remaining lock requests are blocked by
+ // some earlier (not necessarily active) lock request, and are waiting. The order
+ // of lock request in the waiting section is determined by the LockPolicty.
+ // The order of lock request in the active/front portion of the list is irrelevant.
+ //
+ std::map<ResourceId, std::list<LockId> > _resourceLocks;
+
+ // For cleanup and abort processing, references all LockRequests made by a transaction
+ std::map<TxId, std::set<LockId> > _xaLocks;
+
+ // For deadlock detection: the set of transactions blocked by another transaction
+ // NB: a transaction can only be directly waiting for a single resource/transaction
+ // but to facilitate deadlock detection, if T1 is waiting for T2 and T2 is waiting
+ // for T3, then both T1 and T2 are listed as T3's waiters.
+ std::map<TxId, std::set<TxId> > _waiters;
+
+ // track transactions that have aborted, and don't accept further
+ // lock requests from them (which shouldn't happen anyway).
+ //
+ // XXX: this set can grow without bound in the pathological case. One way to deal
+ // with it is to track the oldest active transaction (which may or may not be the
+ // smallest TxId value), and flush older values from this set and ignore older values
+ // in new requests without consulting this set.
+ std::set<TxId> _abortedTxIds;
+
+ // transaction priorities:
+ // 0 => neutral, use LockManager's default _policy
+ // + => high, queue forward
+ // - => low, queue back
+ //
+ std::map<TxId, int> _txPriorities;
+
+ // stats, but also used internally
+ LockStats _stats;
+ };
+
+ /**
+ * RAII wrapper around LockManager, for scoped locking
+ */
+ class ResourceLock {
+ public:
+ ResourceLock(LockManager& lm,
+ const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ LockManager::Notifier* notifier = NULL);
+
+ ~ResourceLock();
+ private:
+ LockManager& _lm;
+ LockManager::LockId _lid;
+ };
+
+ class SharedResourceLock : public ResourceLock {
+ public:
+ SharedResourceLock(const TxId& requestor, void* resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kShared,
+ (size_t)resource) { }
+ SharedResourceLock(const TxId& requestor, size_t resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kShared,
+ resource) { }
+ };
+
+ class ExclusiveResourceLock : public ResourceLock {
+ public:
+ ExclusiveResourceLock(const TxId& requestor, void* resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kExclusive,
+ (size_t)resource) { }
+ ExclusiveResourceLock(const TxId& requestor, size_t resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kExclusive,
+ resource) { }
+ };
+} // namespace mongo
diff --git a/src/mongo/db/concurrency/lock_mgr_test.cpp b/src/mongo/db/concurrency/lock_mgr_test.cpp
new file mode 100644
index 00000000000..06840a41cb3
--- /dev/null
+++ b/src/mongo/db/concurrency/lock_mgr_test.cpp
@@ -0,0 +1,896 @@
+/**
+* Copyright (C) MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+/**
+ * tests for util/concurrency/lock_mgr.* capabilities
+ *
+ * Testing concurrency requires multiple threads. In this test, these are packaged as
+ * instances of a ClientTransaction class. The test's main thread creates instances
+ * of ClientTransactions and communicates with them through a pair of producer/consumer
+ * buffers. The driver thread sends requests to ClientTransaction threads using a
+ * TxCommandBuffer, and waits for responses in a TxResponseBuffer. This protocol
+ * allows precise control over timing.
+ *
+ * The producer/consumer buffer is possibly overkill. At present there is only
+ * one producer and one consumer using any one buffer. Also, the current set of
+ * tests are 'lock-step', with the driver never issuing more than one command
+ * before waiting for a response.
+ */
+
+#include <boost/thread/thread.hpp>
+#include "mongo/unittest/unittest.h"
+#include "mongo/db/concurrency/lock_mgr.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+ enum TxCmd {
+ ACQUIRE,
+ RELEASE,
+ ABORT,
+ POLICY,
+ QUIT,
+ INVALID
+ };
+
+ enum TxRsp {
+ ACQUIRED,
+ RELEASED,
+ BLOCKED,
+ AWAKENED,
+ ABORTED,
+ INVALID_RESPONSE
+ };
+
+ class TxResponse {
+ public:
+ TxRsp rspCode;
+ TxId xid;
+ unsigned mode;
+ ResourceId resId;
+ };
+
+ class TxRequest {
+ public:
+ TxCmd cmd;
+ TxId xid;
+ unsigned mode;
+ ResourceId resId;
+ LockManager::Policy policy;
+ };
+
+ class TxResponseBuffer {
+ public:
+ TxResponseBuffer() : _count(0), _readPos(0), _writePos(0) { }
+
+ void post(const TxRsp& rspCode) {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 10)
+ _full.wait(guard);
+ buffer[_writePos++].rspCode = rspCode;
+ _writePos %= 10;
+ _count++;
+ _empty.notify_one();
+ }
+
+ TxResponse* consume() {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 0)
+ _empty.wait(guard);
+ TxResponse* result = &buffer[_readPos++];
+ _readPos %= 10;
+ _count--;
+ _full.notify_one();
+ return result;
+ }
+
+ boost::mutex _guard;
+ boost::condition_variable _full;
+ boost::condition_variable _empty;
+
+ size_t _count;
+ size_t _readPos;
+ size_t _writePos;
+
+ TxResponse buffer[10];
+ };
+
+class TxCommandBuffer {
+public:
+ TxCommandBuffer() : _count(0), _readPos(0), _writePos(0) { }
+
+ void post(const TxCmd& cmd,
+ const TxId& xid = 0,
+ const unsigned& mode = 0,
+ const ResourceId& resId = 0,
+ const LockManager::Policy& policy = LockManager::kPolicyFirstCome) {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 10)
+ _full.wait(guard);
+ buffer[_writePos].cmd = cmd;
+ buffer[_writePos].xid = xid;
+ buffer[_writePos].mode = mode;
+ buffer[_writePos].resId = resId;
+ buffer[_writePos].policy = policy;
+ _writePos++;
+ _writePos %= 10;
+ _count++;
+ _empty.notify_one();
+ }
+
+ TxRequest* consume() {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 0)
+ _empty.wait(guard);
+ TxRequest* result = &buffer[_readPos++];
+ _readPos %= 10;
+ _count--;
+ _full.notify_one();
+ return result;
+ }
+
+ boost::mutex _guard;
+ boost::condition_variable _full;
+ boost::condition_variable _empty;
+
+ size_t _count;
+ size_t _readPos;
+ size_t _writePos;
+
+ TxRequest buffer[10];
+};
+
+class ClientTransaction : public LockManager::Notifier {
+public:
+ // these are called in the main driver program
+
+ ClientTransaction(LockManager* lm, const TxId& xid) : _lm(lm), _xid(xid), _thr(&ClientTransaction::processCmd, this) { }
+ virtual ~ClientTransaction() { _thr.join(); }
+
+ void acquire(const unsigned& mode, const ResourceId resId, const TxRsp& rspCode) {
+ _cmd.post(ACQUIRE, _xid, mode, 1, resId);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(rspCode == rsp->rspCode);
+ }
+
+ void release(const unsigned& mode, const ResourceId resId) {
+ _cmd.post(RELEASE, _xid, mode, 1, resId);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(RELEASED == rsp->rspCode);
+ }
+
+ void abort() {
+ _cmd.post(ABORT, _xid);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(ABORTED == rsp->rspCode);
+ }
+
+ void wakened() {
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(ACQUIRED == rsp->rspCode);
+ }
+
+ void setPolicy(const LockManager::Policy& policy, const TxRsp& rspCode) {
+ _cmd.post(POLICY, _xid, 0, 0, 0, policy);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(rspCode == rsp->rspCode);
+ }
+
+ void quit() {
+ _cmd.post(QUIT);
+ }
+
+ // these are run within the client threads
+ void processCmd() {
+ bool more = true;
+ while (more) {
+ TxRequest* req = _cmd.consume();
+ switch (req->cmd) {
+ case ACQUIRE:
+ try {
+ _lm->acquire(_xid, req->mode, req->resId, this);
+ _rsp.post(ACQUIRED);
+ } catch (const LockManager::AbortException& err) {
+ _rsp.post(ABORTED);
+// log() << "t" << _xid << ": aborted, ending" << endl;
+ return;
+ }
+ break;
+ case RELEASE:
+ _lm->release(_xid, req->mode, req->resId);
+ _rsp.post(RELEASED);
+ break;
+ case ABORT:
+ try {
+ _lm->abort(_xid);
+ } catch (const LockManager::AbortException& err) {
+ _rsp.post(ABORTED);
+ }
+ break;
+ case POLICY:
+ try {
+ _lm->setPolicy(_xid, req->policy, this);
+ _rsp.post(ACQUIRED);
+ } catch( const LockManager::AbortException& err) {
+ _rsp.post(ABORTED);
+ }
+ break;
+ case QUIT:
+ default:
+ more = false;
+ break;
+ }
+ }
+ }
+
+ // inherited from Notifier, used by LockManager::acquire
+ virtual void operator()(const TxId& blocker) {
+ _rsp.post(BLOCKED);
+ }
+
+private:
+ TxCommandBuffer _cmd;
+ TxResponseBuffer _rsp;
+ LockManager* _lm;
+ TxId _xid;
+ boost::thread _thr;
+
+};
+
+TEST(LockManagerTest, TxError) {
+ LockManager lm;
+ LockManager::LockStatus status;
+
+ // release a lock on a container we haven't locked
+ lm.release(1, LockManager::kShared, 0, 1);
+
+
+ // release a lock on a record we haven't locked
+ status = lm.release(1, LockManager::kShared, 0, 1);
+ ASSERT(LockManager::kLockContainerNotFound == status);
+
+
+ // release a lock on a record we haven't locked in a store we have locked
+ lm.acquire(1, LockManager::kShared, 0, 2);
+ status = lm.release(1, LockManager::kShared, 0, 1); // this is in error
+ ASSERT(LockManager::kLockResourceNotFound == status);
+ status = lm.release(1, LockManager::kShared, 0, 2);
+ ASSERT(LockManager::kLockReleased == status);
+
+ // release a record we've locked in a different mode
+ lm.acquire(1, LockManager::kShared, 0, 1);
+ status = lm.release(1, LockManager::kExclusive, 0, 1); // this is in error
+ ASSERT(LockManager::kLockModeNotFound == status);
+ status = lm.release(1, LockManager::kShared, 0, 1);
+ ASSERT(LockManager::kLockReleased == status);
+
+ lm.acquire(1, LockManager::kExclusive, 0, 1);
+ status = lm.release(1, LockManager::kShared, 0, 1); // this is in error
+ ASSERT(LockManager::kLockModeNotFound == status);
+ status = lm.release(1, LockManager::kExclusive, 0, 1);
+ ASSERT(LockManager::kLockReleased == status);
+
+ // attempt to acquire on a transaction that aborted
+ try {
+ lm.abort(1);
+ } catch (const LockManager::AbortException& err) { }
+ try {
+ lm.acquire(1, LockManager::kShared, 0, 1); // error
+ ASSERT(false);
+ } catch (const LockManager::AbortException& error) {
+ }
+}
+
+TEST(LockManagerTest, SingleTx) {
+ LockManager lm;
+ ResourceId store = 1;
+ TxId t1 = 1;
+ ResourceId r1 = 1;
+ LockManager::LockStatus status;
+
+ // acquire a shared record lock
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release a shared record lock
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // acquire a shared record lock twice, on same ResourceId
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release the twice-acquired lock, once. Still locked
+ status = lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(LockManager::kLockCountDecremented == status);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // after 2nd release, it's not locked
+ status = lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(LockManager::kLockReleased == status);
+ ASSERT(!lm.isLocked(t1, LockManager::kShared, store, r1));
+
+
+
+ // --- test downgrade and release ---
+
+ // acquire an exclusive then a shared lock, on the same ResourceId
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release shared first, then exclusive
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+
+ // release exclusive first, then shared
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+
+
+
+ // --- test upgrade and release ---
+
+ // acquire a shared, then an exclusive lock on the same ResourceId
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+
+ // release exclusive first, then shared
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release shared first, then exclusive
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+}
+
+TEST(LockManagerTest, TxConflict) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ // no conflicts with shared locks on same/different objects
+
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED);
+ t1.acquire(LockManager::kShared, 2, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+
+ t1.release(LockManager::kShared, 1);
+ t1.release(LockManager::kShared, 2);
+ t2.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 2);
+
+
+ // no conflicts with exclusive locks on different objects
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 2, ACQUIRED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.release(LockManager::kExclusive, 2);
+
+
+ // shared then exclusive conflict
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ // t2's request is incompatible with t1's lock, so it should block
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened(); // with t1's lock released, t2 should wake
+ t2.release(LockManager::kExclusive, 1);
+
+ // exclusive then shared conflict
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ // exclusive then exclusive conflict
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+}
+
+TEST(LockManagerTest, TxDeadlock) {
+ LockManager lm(LockManager::kPolicyReadersFirst);
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ ClientTransaction a1(&lm, 4);
+ ClientTransaction a2(&lm, 5);
+ ClientTransaction a3(&lm, 6);
+ ClientTransaction a4(&lm, 7);
+ ClientTransaction a5(&lm, 8);
+
+ // simple deadlock test 1
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ a1.acquire(LockManager::kShared, 2, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 2, BLOCKED);
+ // a1's request would form a dependency cycle, so it should abort
+ a1.acquire(LockManager::kExclusive, 1, ABORTED);
+ t1.wakened(); // with t2's locks released, t1 should wake
+ t1.release(LockManager::kExclusive, 2);
+ t1.release(LockManager::kShared, 1);
+
+ // simple deadlock test 2
+ a2.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ // a2's request would form a dependency cycle, so it should abort
+ a2.acquire(LockManager::kExclusive, 2, ABORTED);
+ t2.wakened(); // with a2's locks released, t2 should wake
+ t2.release(LockManager::kExclusive, 1);
+ t2.release(LockManager::kShared, 2);
+
+ // three way deadlock
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED);
+ a3.acquire(LockManager::kShared, 3, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 2, BLOCKED);
+ t2.acquire(LockManager::kExclusive, 3, BLOCKED);
+ // a3's request would form a dependency cycle, so it should abort
+ a3.acquire(LockManager::kExclusive, 1, ABORTED);
+ t2.wakened(); // with a3's lock release, t2 should wake
+ t2.release(LockManager::kShared, 2);
+ t1.wakened(); // with t2's locks released, t1 should wake
+ t2.release(LockManager::kExclusive, 3);
+ t1.release(LockManager::kShared, 1);
+ t1.release(LockManager::kExclusive, 2);
+
+ // test for phantom deadlocks
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened();
+ // at this point, t2 should no longer be waiting for t1
+ // so it should be OK for t1 to wait for t2
+ t1.acquire(LockManager::kShared, 1, BLOCKED);
+ t2.release(LockManager::kExclusive, 1);
+ t1.wakened();
+ t1.release(LockManager::kShared, 1);
+
+ // test for missing deadlocks
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED); // setup for deadlock with a4
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED); // block on t1
+ // after this, because readers first policy, t2 should
+ // also be waiting on a4.
+ a4.acquire(LockManager::kShared, 1, ACQUIRED);
+ // after this, t2 should be waiting ONLY on a4
+ t1.release(LockManager::kShared, 1);
+ // So a4 should not be allowed to wait on t2's resource.
+ a4.acquire(LockManager::kExclusive, 2, ABORTED);
+ t2.wakened();
+ t2.release(LockManager::kShared, 2);
+ t2.release(LockManager::kExclusive, 1);
+
+
+ // test for missing deadlocks: due to downgrades
+ a5.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ a5.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED); // setup for deadlock with a5
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED); // block on a5
+ a5.release(LockManager::kExclusive, 1);
+ // at this point, t2 should still be blocked on a5's downgraded lock
+ // So a5 should not be allowed to wait on t2's resource.
+ a5.acquire(LockManager::kExclusive, 2, ABORTED);
+ t2.wakened();
+ t2.release(LockManager::kShared, 2);
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+}
+
+TEST(LockManagerTest, TxDowngrade) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED); // downgrade
+ // t1 still has exclusive on resource 1, so t2 must wait
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened(); // with the exclusive lock released, t2 wakes
+ t1.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 1);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED); // downgrade
+ // t1 still has exclusive on resource 1, so t2 must wait
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kShared, 1);
+ // with t1 still holding exclusive on resource 1, t2 still blocked
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened(); // with the exclusive lock released, t2 wakes
+ t2.release(LockManager::kShared, 1);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ // t1 has exclusive on resource 1, so t2 must wait
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ // even though t2 is waiting for resource 1, t1 can still use it shared,
+ // because it already owns exclusive lock and can't block on itself
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened(); // with the exclusive lock released, t2 wakes
+ t1.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 1);
+
+ // t2 acquires exclusive during t1's downgrade
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+}
+
+TEST(LockManagerTest, TxUpgrade) {
+ LockManager lm(LockManager::kPolicyReadersFirst);
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+ ClientTransaction t3(&lm, 3);
+
+ ClientTransaction a2(&lm, 4);
+ ClientTransaction a3(&lm, 5);
+
+ // test upgrade succeeds, blocks subsequent reads
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED); // upgrade
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t1.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 1);
+
+ // test upgrade blocks, then wakes
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+ // t1 can't use resource 1 exclusively yet, because t2 is using it
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.release(LockManager::kShared, 1);
+ t1.wakened(); // with t2's shared lock released, t1 wakes
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+
+ // test upgrade blocks on several, then wakes
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+ // t1 can't use resource 1 exclusively yet, because t2 is using it
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t3.acquire(LockManager::kShared, 1, ACQUIRED); // additional blocker
+ t2.release(LockManager::kShared, 1); // t1 still blocked
+ t3.release(LockManager::kShared, 1);
+ t1.wakened(); // with t3's shared lock released, t1 wakes
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+
+ // failure to upgrade
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ a2.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ a2.acquire(LockManager::kExclusive, 1, ABORTED);
+ // with a2's abort, t1 can wake
+ t1.wakened();
+ t1.release(LockManager::kShared, 1);
+ t1.release(LockManager::kExclusive, 1);
+
+ // failure to upgrade
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ a3.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.release(LockManager::kShared, 1); // t1 still blocked on a3
+ a3.acquire(LockManager::kExclusive, 1, ABORTED);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+}
+
+TEST(LockManagerTest, TxPolicy) {
+
+ {
+ // Test FirstComeFirstServe policy
+ LockManager lm_first;
+ ClientTransaction t1(&lm_first, 1);
+ ClientTransaction t2(&lm_first, 2);
+ ClientTransaction t3(&lm_first, 3);
+ // test1
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t2 should wake first, because its request came before t3's
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ // test2
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t3 should wake first, because its request came before t2's
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ }
+
+ {
+ // Test kPolicyReadersFirst
+ // shared request are considered read requests
+
+ LockManager lm_readers(LockManager::kPolicyReadersFirst);
+ ClientTransaction t1(&lm_readers, 1);
+ ClientTransaction t2(&lm_readers, 2);
+ ClientTransaction t3(&lm_readers, 3);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+
+ // t2 should wake first, even though t3 came first in time
+ // because t2 is a reader and t3 is a writer
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ }
+
+ {
+ // Test OLDEST_TX_FIRST policy
+ // for now, smaller TxIds are considered older
+
+ LockManager lm_oldest(LockManager::kPolicyOldestTxFirst);
+ ClientTransaction t1(&lm_oldest, 1);
+ ClientTransaction t2(&lm_oldest, 2);
+ ClientTransaction t3(&lm_oldest, 3);
+
+ // test 1
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+
+ // t2 should wake first, even though t3 came first in time
+ // because t2 is older than t3
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ // test 2
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+
+ // t2 should wake first, because it's older than t3
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ }
+
+ {
+ LockManager lm_blockers(LockManager::kPolicyBlockersFirst);
+ ClientTransaction t1(&lm_blockers, 1);
+ ClientTransaction t2(&lm_blockers, 2);
+ ClientTransaction t3(&lm_blockers, 3);
+ ClientTransaction t4(&lm_blockers, 4);
+
+ // BIGGEST_BLOCKER_FIRST policy
+
+ // set up t3 as the biggest blocker
+ t3.acquire(LockManager::kExclusive, 2, ACQUIRED);
+ t4.acquire(LockManager::kExclusive, 2, BLOCKED);
+
+ // test 1
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t3 should wake first, because it's a bigger blocker than t2
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ // test 2
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t3 should wake first, even though t2 came first,
+ // because it's a bigger blocker than t2
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ t3.release(LockManager::kExclusive, 2);
+ t4.wakened();
+ t4.release(LockManager::kExclusive, 2);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ t4.quit();
+ }
+}
+
+/*
+ * test kPolicyReadersOnly and kPolicyWritersOnly
+ */
+TEST(LockManagerTest, TxOnlyPolicies) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+ ClientTransaction t3(&lm, 3);
+ ClientTransaction t4(&lm, 4);
+ ClientTransaction t5(&lm, 5);
+ ClientTransaction tp(&lm, 6);
+
+ // show kPolicyReadersOnly blocking writers, which
+ // awake when policy reverts
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyReadersOnly, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 2, BLOCKED); // just policy conflict
+ t4.acquire(LockManager::kExclusive, 1, BLOCKED); // both policy & t1
+ t5.acquire(LockManager::kShared, 1, ACQUIRED); // even tho t4
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 2);
+ t1.release(LockManager::kShared, 1);
+ t5.release(LockManager::kShared, 1);
+ t4.wakened();
+ t4.release(LockManager::kExclusive, 1);
+
+ // show WRITERS_ONLY blocking readers, which
+ // awake when policy reverts
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyWritersOnly, ACQUIRED);
+ t3.acquire(LockManager::kShared, 2, BLOCKED); // just policy conflict
+ t4.acquire(LockManager::kShared, 1, BLOCKED); // both policy & t1
+ t1.release(LockManager::kExclusive, 1);
+ t5.acquire(0x3/*LockManager::kExclusive*/, 2, ACQUIRED); // even tho t3
+ t5.release(LockManager::kExclusive, 2);
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t3.wakened();
+ t3.release(LockManager::kShared, 2);
+ t4.wakened();
+ t4.release(LockManager::kShared, 1);
+
+ // show READERS_ONLY blocked by existing writer
+ // but still blocking new writers
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyReadersOnly, BLOCKED); // blocked by t1
+ t2.acquire(LockManager::kExclusive, 2, BLOCKED); // just policy conflict
+ t3.acquire(LockManager::kShared, 2, ACQUIRED); // even tho t2
+ t3.release(LockManager::kShared, 2);
+ t1.release(LockManager::kExclusive, 1);
+ tp.wakened();
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 2);
+
+ // show WRITERS_ONLY blocked by existing reader
+ // but still blocking new readers
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyWritersOnly, BLOCKED); // blocked by t1
+ t2.acquire(LockManager::kShared, 2, BLOCKED); // just policy conflict
+ t1.release(LockManager::kShared, 1);
+ tp.wakened();
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t2.wakened();
+ t2.release(LockManager::kShared, 2);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ t4.quit();
+ t5.quit();
+ tp.quit();
+}
+
+TEST(LockManagerTest, TxShutdown) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ lm.shutdown(3000);
+
+ // t1 can still do work while quiescing
+ t1.release(LockManager::kShared, 1);
+ t1.acquire(LockManager::kShared, 2, ACQUIRED);
+
+ // t2 is new and should be refused
+ t2.acquire(LockManager::kShared, 3, ABORTED);
+
+ // after the quiescing period, t1's request should be refused
+ sleep(3);
+ t1.acquire(LockManager::kShared, 4, ABORTED);
+}
+}
diff --git a/src/mongo/db/d_concurrency.cpp b/src/mongo/db/d_concurrency.cpp
index c13ea3e9866..a8651645303 100644
--- a/src/mongo/db/d_concurrency.cpp
+++ b/src/mongo/db/d_concurrency.cpp
@@ -28,6 +28,8 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/db/d_concurrency.h"
#include "mongo/db/client.h"
@@ -36,6 +38,7 @@
#include "mongo/db/d_globals.h"
#include "mongo/db/lockstat.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/server.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/mapsf.h"
@@ -402,7 +405,46 @@ namespace mongo {
}
}
- void Lock::DBWrite::lockOther(const StringData& db) {
+ void Lock::DBWrite::lockOtherRead(const StringData& db) {
+ fassert(18517, !db.empty());
+
+ // we do checks first, as on assert destructor won't be called so don't want to be half finished with our work.
+ if( _lockState->otherCount() ) {
+ // nested. prev could be read or write. if/when we do temprelease with DBRead/DBWrite we will need to increment/decrement here
+ // (so we can not release or assert if nested). temprelease we should avoid if we can though, it's a bit of an anti-pattern.
+ massert(18513,
+ str::stream() << "internal error tried to lock two databases at the same time. old:"
+ << _lockState->otherName() << " new:" << db,
+ db == _lockState->otherName());
+ return;
+ }
+
+ // first lock for this db. check consistent order with local db lock so we never deadlock. local always comes last
+ massert(18514,
+ str::stream() << "can't dblock:" << db
+ << " when local or admin is already locked",
+ _lockState->nestableCount() == 0);
+
+ if (db != _lockState->otherName()) {
+ DBLocksMap::ref r(dblocks);
+ WrapperForRWLock*& lock = r[db];
+ if (lock == NULL) {
+ lock = new WrapperForRWLock(db);
+ }
+
+ _lockState->lockedOther(db, -1, lock);
+ }
+ else {
+ DEV OCCASIONALLY{ dassert(dblocks.get(db) == _lockState->otherLock()); }
+ _lockState->lockedOther(-1);
+ }
+
+ fassert(18515, _weLocked == 0);
+ _lockState->otherLock()->lock_shared();
+ _weLocked = _lockState->otherLock();
+ }
+
+ void Lock::DBWrite::lockOtherWrite(const StringData& db) {
fassert(16252, !db.empty());
// we do checks first, as on assert destructor won't be called so don't want to be half finished with our work.
@@ -470,8 +512,16 @@ namespace mongo {
_locked_W = true;
return;
}
- if( !nested )
- lockOther(db);
+
+ if (!nested) {
+ if (_isIntentWrite) {
+ lockOtherRead(db);
+ }
+ else {
+ lockOtherWrite(db);
+ }
+ }
+
lockTop();
if( nested )
lockNestable(nested);
@@ -496,9 +546,12 @@ namespace mongo {
lockNestable(nested);
}
- Lock::DBWrite::DBWrite(LockState* lockState, const StringData& ns)
- : ScopedLock(lockState, 'w' ), _what(ns.toString()), _nested(false) {
- lockDB( _what );
+ Lock::DBWrite::DBWrite(LockState* lockState, const StringData& ns, bool intentWrite)
+ : ScopedLock(lockState, 'w'),
+ _isIntentWrite(intentWrite),
+ _what(ns.toString()),
+ _nested(false) {
+ lockDB(_what);
}
Lock::DBRead::DBRead(LockState* lockState, const StringData& ns)
@@ -753,4 +806,10 @@ namespace mongo {
} lockStatsServerStatusSection;
+
+ // This startup parameter enables experimental document-level locking features, which work
+ // for update-in-place changes only (i.e., no index updates and no document growth or
+ // movement). It should be removed once full document-level locking is checked-in.
+ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(useExperimentalDocLocking, bool, false);
+
}
diff --git a/src/mongo/db/d_concurrency.h b/src/mongo/db/d_concurrency.h
index 4696626b704..a11351b5a84 100644
--- a/src/mongo/db/d_concurrency.h
+++ b/src/mongo/db/d_concurrency.h
@@ -170,7 +170,8 @@ namespace mongo {
void lockTop();
void lockNestable(Nestable db);
- void lockOther(const StringData& db);
+ void lockOtherWrite(const StringData& db);
+ void lockOtherRead(const StringData& db);
void lockDB(const std::string& ns);
void unlockDB();
@@ -179,12 +180,13 @@ namespace mongo {
void _relock();
public:
- DBWrite(LockState* lockState, const StringData& dbOrNs);
+ DBWrite(LockState* lockState, const StringData& dbOrNs, bool intentWrite = false);
virtual ~DBWrite();
private:
bool _locked_w;
bool _locked_W;
+ bool _isIntentWrite;
WrapperForRWLock *_weLocked;
const std::string _what;
bool _nested;
@@ -248,4 +250,7 @@ namespace mongo {
~writelocktry();
bool got() const { return _got; }
};
+
+ // Defined in instance.cpp
+ extern bool useExperimentalDocLocking;
}
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index b70210c4ef2..17111a87b73 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -605,7 +605,7 @@ namespace mongo {
UpdateExecutor executor(&request, &op.debug());
uassertStatusOK(executor.prepare());
- Lock::DBWrite lk(txn->lockState(), ns.ns());
+ Lock::DBWrite lk(txn->lockState(), ns.ns(), useExperimentalDocLocking);
// if this ever moves to outside of lock, need to adjust check
// Client::Context::_finishInit
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index f3d21129fb3..963109b2537 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -56,6 +56,7 @@
namespace mongo {
namespace mb = mutablebson;
+
namespace {
const char idFieldName[] = "_id";
@@ -624,7 +625,6 @@ namespace mongo {
runner->saveState();
if (inPlace && !driver->modsAffectIndices()) {
-
// If a set of modifiers were all no-ops, we are still 'in place', but there is
// no work to do, in which case we want to consider the object unchanged.
if (!damages.empty() ) {
@@ -636,8 +636,12 @@ namespace mongo {
newObj = oldObj;
}
else {
-
// The updates were not in place. Apply them through the file manager.
+
+ // XXX: With experimental document-level locking, we do not hold the sufficient
+ // locks, so this would cause corruption.
+ fassert(18516, !useExperimentalDocLocking);
+
newObj = doc.getObject();
uassert(17419,
str::stream() << "Resulting document after update is larger than "
diff --git a/src/mongo/db/startup_warnings.cpp b/src/mongo/db/startup_warnings.cpp
index d7e187cd5b0..15570c973c6 100644
--- a/src/mongo/db/startup_warnings.cpp
+++ b/src/mongo/db/startup_warnings.cpp
@@ -40,6 +40,8 @@
namespace mongo {
+ extern bool useExperimentalDocLocking;
+
//
// system warnings
//
@@ -72,6 +74,18 @@ namespace mongo {
warned = true;
}
+ if (useExperimentalDocLocking) {
+ log() << "** WARNING: Experimental (and untested) document-level locking for in-place"
+ << startupWarningsLog;
+ log() << " updates, which do not modify indexed values, has been enabled."
+ << startupWarningsLog;
+ log() << " This should absolutely not be used on production systems and is"
+ << startupWarningsLog;
+ log() << " for demonstration purposes only."
+ << startupWarningsLog;
+ warned = true;
+ }
+
#if defined(_WIN32) && !defined(_WIN64)
// Warn user that they are running a 32-bit app on 64-bit Windows
BOOL wow64Process;
diff --git a/src/mongo/db/structure/record_store_v1_base.cpp b/src/mongo/db/structure/record_store_v1_base.cpp
index 9a80cd62ceb..e2fc7ff8312 100644
--- a/src/mongo/db/structure/record_store_v1_base.cpp
+++ b/src/mongo/db/structure/record_store_v1_base.cpp
@@ -325,7 +325,7 @@ namespace mongo {
Status RecordStoreV1Base::updateWithDamages( OperationContext* txn,
const DiskLoc& loc,
- const char* damangeSource,
+ const char* damageSource,
const mutablebson::DamageVector& damages ) {
_paddingFits( txn );
@@ -336,7 +336,7 @@ namespace mongo {
mutablebson::DamageVector::const_iterator where = damages.begin();
const mutablebson::DamageVector::const_iterator end = damages.end();
for( ; where != end; ++where ) {
- const char* sourcePtr = damangeSource + where->sourceOffset;
+ const char* sourcePtr = damageSource + where->sourceOffset;
void* targetPtr = txn->recoveryUnit()->writingPtr(root + where->targetOffset, where->size);
std::memcpy(targetPtr, sourcePtr, where->size);
}