-rw-r--r--  src/mongo/SConscript                                       |    2
-rw-r--r--  src/mongo/db/catalog/collection.cpp                        |    3
-rw-r--r--  src/mongo/db/catalog/collection_cursor_cache.cpp           |   14
-rw-r--r--  src/mongo/db/commands/write_commands/batch_executor.cpp    |    5
-rw-r--r--  src/mongo/db/concurrency/SConscript                        |   15
-rw-r--r--  src/mongo/db/concurrency/lock_mgr.cpp                      | 1044
-rw-r--r--  src/mongo/db/concurrency/lock_mgr.h                        |  621
-rw-r--r--  src/mongo/db/concurrency/lock_mgr_test.cpp                 |  896
-rw-r--r--  src/mongo/db/d_concurrency.cpp                             |   71
-rw-r--r--  src/mongo/db/d_concurrency.h                               |    9
-rw-r--r--  src/mongo/db/instance.cpp                                  |    2
-rw-r--r--  src/mongo/db/ops/update.cpp                                |    8
-rw-r--r--  src/mongo/db/startup_warnings.cpp                          |   14
-rw-r--r--  src/mongo/db/structure/record_store_v1_base.cpp            |    4
14 files changed, 2687 insertions, 21 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 7ed2211bf51..84e32ad556b 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -20,6 +20,7 @@ env.SConscript(['base/SConscript',
'db/auth/SConscript',
'db/catalog/SConscript',
'db/commands/SConscript',
+ 'db/concurrency/SConscript',
'db/exec/SConscript',
'db/fts/SConscript',
'db/index/SConscript',
@@ -902,6 +903,7 @@ env.Library("serveronly", serverOnlyFiles,
"db/auth/authmongod",
"db/fts/ftsmongod",
"db/common",
+ "db/concurrency/lock_mgr",
"db/ops/update_driver",
"defaultversion",
"geoparser",
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 1c137515d65..b1f705f1411 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/database.h"
+#include "mongo/db/concurrency/lock_mgr.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/operation_context.h"
@@ -406,6 +407,8 @@ namespace mongo {
// Broadcast the mutation so that query results stay correct.
_cursorCache.invalidateDocument(loc, INVALIDATION_MUTATION);
+
+ ExclusiveResourceLock lk((size_t)txn->getCurOp()->opNum(), *(size_t*)&loc);
return _recordStore->updateWithDamages( txn, loc, damangeSource, damages );
}
diff --git a/src/mongo/db/catalog/collection_cursor_cache.cpp b/src/mongo/db/catalog/collection_cursor_cache.cpp
index e9962c9441f..2e941dd1f40 100644
--- a/src/mongo/db/catalog/collection_cursor_cache.cpp
+++ b/src/mongo/db/catalog/collection_cursor_cache.cpp
@@ -388,14 +388,18 @@ namespace mongo {
}
void CollectionCursorCache::registerRunner( Runner* runner ) {
- SimpleMutex::scoped_lock lk( _mutex );
- const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner);
- invariant(result.second); // make sure this was inserted
+ if (!useExperimentalDocLocking) {
+ SimpleMutex::scoped_lock lk(_mutex);
+ const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner);
+ invariant(result.second); // make sure this was inserted
+ }
}
void CollectionCursorCache::deregisterRunner( Runner* runner ) {
- SimpleMutex::scoped_lock lk( _mutex );
- _nonCachedRunners.erase( runner );
+ if (!useExperimentalDocLocking) {
+ SimpleMutex::scoped_lock lk(_mutex);
+ _nonCachedRunners.erase(runner);
+ }
}
ClientCursor* CollectionCursorCache::find( CursorId id, bool pin ) {
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index 1711348a35b..52acb7aae6f 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -1092,14 +1092,13 @@ namespace mongo {
}
///////////////////////////////////////////
- Lock::DBWrite writeLock(txn->lockState(), nsString.ns());
+ Lock::DBWrite writeLock(txn->lockState(), nsString.ns(), useExperimentalDocLocking);
///////////////////////////////////////////
if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result))
return;
- Client::Context ctx( nsString.ns(),
- false /* don't check version */ );
+ Client::Context ctx(nsString.ns(), false /* don't check version */);
try {
UpdateResult res = executor.execute(txn, ctx.db());
diff --git a/src/mongo/db/concurrency/SConscript b/src/mongo/db/concurrency/SConscript
new file mode 100644
index 00000000000..53be5435c91
--- /dev/null
+++ b/src/mongo/db/concurrency/SConscript
@@ -0,0 +1,15 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env.Library(
+ target='lock_mgr',
+ source=[
+ 'lock_mgr.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base/base',
+ '$BUILD_DIR/third_party/shim_boost',
+ '$BUILD_DIR/mongo/foundation'
+ ],
+)
diff --git a/src/mongo/db/concurrency/lock_mgr.cpp b/src/mongo/db/concurrency/lock_mgr.cpp
new file mode 100644
index 00000000000..d0cc06ead8d
--- /dev/null
+++ b/src/mongo/db/concurrency/lock_mgr.cpp
@@ -0,0 +1,1044 @@
+/**
+* Copyright (C) MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/concurrency/lock_mgr.h"
+
+#include <boost/thread/locks.hpp>
+#include <sstream>
+
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/timer.h"
+
+using boost::unique_lock;
+
+using std::endl;
+using std::exception;
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace mongo {
+
+ /*---------- Utility functions ----------*/
+
+ namespace {
+
+ /*
+ * Given a 'hierarchical' resource (e.g. document in collection in database)
+ * return true if the resource is requested for exclusive access at a given level.
+ * In the example above, level 0 for document, level 1 for collection, level 2 for DB
+ * and level 3 for the system as a whole.
+ *
+ * mode is a bit vector, level is the index.
+ */
+ bool isExclusive(unsigned mode, const unsigned level=0) {
+ return 0 != (mode & (0x1 << level));
+ }
+
+ bool isShared(unsigned mode, const unsigned level=0) {
+ return 0 == (mode & (0x1 << level));
+ }
+
+ bool isCompatible(unsigned mode1, unsigned mode2) {
+ return mode1==mode2 && (isShared(mode1) || isShared(mode2));
+ }
+
+ bool hasConflict(const LockManager::ResourceStatus& status) {
+ return LockManager::kResourceConflict == status ||
+ LockManager::kResourceUpgradeConflict == status ||
+ LockManager::kResourcePolicyConflict == status;
+ }
+
+ // unique id for each LockRequest. zero is reserved
+ static LockManager::LockId nextLid = 1;
+ } // namespace
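To make the bit-vector convention concrete: with kShared == 0x0 and kExclusive == 0x1, bit 0 describes the document level and bit 1 the containing collection. A minimal standalone sketch of how the helpers above read those bits (variable names are illustrative, not part of the patch):

    #include <cassert>

    static bool isExclusive(unsigned mode, unsigned level = 0) { return 0 != (mode & (0x1 << level)); }
    static bool isShared(unsigned mode, unsigned level = 0)    { return 0 == (mode & (0x1 << level)); }

    int main() {
        unsigned docExclusive  = 0x1;  // bit 0 set: exclusive on the document, shared above it
        unsigned collExclusive = 0x2;  // bit 1 set: shared on the document, exclusive on the collection
        assert(isExclusive(docExclusive)  && isShared(docExclusive, 1));
        assert(isShared(collExclusive)    && isExclusive(collExclusive, 1));
        return 0;
    }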
+
+ /*---------- AbortException functions ----------*/
+
+ const char* LockManager::AbortException::what() const throw() { return "AbortException"; }
+
+ /*---------- LockStats functions ----------*/
+ string LockManager::LockStats::toString() const {
+ stringstream result;
+ result << "----- LockManager Stats -----" << endl
+ << "\ttotal requests: " << getNumRequests() << endl
+ << "\t# pre-existing: " << getNumPreexistingRequests() << endl
+ << "\t# same: " << getNumSameRequests() << endl
+ << "\t# times blocked: " << getNumBlocks() << endl
+ << "\t# ms blocked: " << getNumMillisBlocked() << endl
+ << "\t# deadlocks: " << getNumDeadlocks() << endl
+ << "\t# downgrades: " << getNumDowngrades() << endl
+ << "\t# upgrades: " << getNumUpgrades() << endl
+ ;
+ return result.str();
+ }
+
+ /*---------- LockRequest functions ----------*/
+
+
+ LockManager::LockRequest::LockRequest(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId)
+ : lid(nextLid++)
+ , xid(xid)
+ , mode(mode)
+ , resId(resId)
+ , count(1)
+ , sleepCount(0) { }
+
+
+ LockManager::LockRequest::~LockRequest() { }
+
+ bool LockManager::LockRequest::matches(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId) const {
+ return
+ this->xid == xid &&
+ this->mode == mode &&
+ this->resId == resId;
+ }
+
+ string LockManager::LockRequest::toString() const {
+ stringstream result;
+ result << "<lid:" << lid
+ << ",xid:" << xid
+ << ",mode:" << mode
+ << ",resId:" << resId
+ << ",count:" << count
+ << ",sleepCount:" << sleepCount
+ << ">";
+ return result.str();
+ }
+
+ bool LockManager::LockRequest::isBlocked() const {
+ return sleepCount > 0;
+ }
+
+ bool LockManager::LockRequest::shouldAwake() {
+ return 0 == --sleepCount;
+ }
+
+ /*---------- LockManager public functions (mutex guarded) ---------*/
+
+ LockManager* LockManager::_singleton = NULL;
+ boost::mutex LockManager::_getSingletonMutex;
+ LockManager& LockManager::getSingleton() {
+ unique_lock<boost::mutex> lk(_getSingletonMutex);
+ if (NULL == _singleton) {
+ _singleton = new LockManager();
+ }
+ return *_singleton;
+ }
+
+ LockManager::LockManager(const Policy& policy)
+ : _policy(policy),
+ _mutex(),
+ _shuttingDown(false),
+ _millisToQuiesce(-1) { }
+
+ LockManager::~LockManager() {
+ unique_lock<boost::mutex> lk(_mutex);
+ for (map<LockId, LockRequest*>::iterator locks = _locks.begin();
+ locks != _locks.end(); ++locks) {
+ delete locks->second;
+ }
+ }
+
+ void LockManager::shutdown(const unsigned& millisToQuiesce) {
+ unique_lock<boost::mutex> lk(_mutex);
+
+#ifdef DONT_ALLOW_CHANGE_TO_QUIESCE_PERIOD
+ // XXX not sure whether we want to allow multiple shutdowns
+ // in order to change quiesce period?
+ if (_shuttingDown) {
+ return; // already in shutdown, don't extend quiescence(?)
+ }
+#endif
+
+ _shuttingDown = true;
+ _millisToQuiesce = millisToQuiesce;
+ _timer.millisReset();
+ }
+
+ LockManager::Policy LockManager::getPolicy() const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown();
+ return _policy;
+ }
+
+ TxId LockManager::getPolicySetter() const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown();
+ return _policySetter;
+ }
+
+ void LockManager::setTransactionPriority(const TxId& xid, int priority) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(xid);
+ _txPriorities[xid] = priority;
+ }
+
+ int LockManager::getTransactionPriority(const TxId& xid) const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(xid);
+ return _getTransactionPriorityInternal(xid);
+ }
+
+ LockManager::LockId LockManager::acquire(const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ Notifier* notifier) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(requestor);
+
+ // don't accept requests from aborted transactions
+ if (_abortedTxIds.find(requestor) != _abortedTxIds.end()) {
+ throw AbortException();
+ }
+
+ _stats.incRequests();
+ _stats.incStatsForMode(mode);
+
+ return _acquireInternal(requestor, mode, resId, notifier, lk);
+ }
+
+ int LockManager::acquireOne(const TxId& requestor,
+ const uint32_t& mode,
+ const vector<ResourceId>& resources,
+ Notifier* notifier) {
+
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(requestor);
+
+ if (resources.empty()) { return -1; }
+
+ // don't accept requests from aborted transactions
+ if (_abortedTxIds.find(requestor) != _abortedTxIds.end()) {
+ throw AbortException();
+ }
+
+ _stats.incRequests();
+
+ // acquire the first available recordId
+ for (unsigned ix=0; ix < resources.size(); ix++) {
+ if (_isAvailable(requestor, mode, resources[ix])) {
+ _acquireInternal(requestor, mode, resources[ix], notifier, lk);
+ _stats.incStatsForMode(mode);
+ return ix;
+ }
+ }
+
+ // sigh. none of the records are currently available. wait on the first.
+ _stats.incStatsForMode(mode);
+ _acquireInternal(requestor, mode, resources[0], notifier, lk);
+ return 0;
+ }
+
+ LockManager::LockStatus LockManager::releaseLock(const LockId& lid) {
+ unique_lock<boost::mutex> lk(_mutex);
+
+ LockMap::iterator it = _locks.find(lid);
+ if (it != _locks.end()) {
+ LockRequest* theLock = it->second;
+ _throwIfShuttingDown(theLock->xid);
+ _stats.decStatsForMode(theLock->mode);
+ if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) ||
+ (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) {
+ _policyLock.notify_one();
+ }
+ }
+ return _releaseInternal(lid);
+ }
+
+ LockManager::LockStatus LockManager::release(const TxId& holder,
+ const uint32_t& mode,
+ const ResourceId& resId) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(holder);
+
+ LockId lid;
+ LockStatus status = _findLock(holder, mode, resId, &lid);
+ if (kLockFound != status) {
+ return status; // error, resource wasn't acquired in this mode by holder
+ }
+ _stats.decStatsForMode(_locks[lid]->mode);
+ if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) ||
+ (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) {
+ _policyLock.notify_one();
+ }
+ return _releaseInternal(lid);
+ }
+
+ /*
+ * release all resource acquired by a transaction, returning the count
+ */
+ size_t LockManager::release(const TxId& holder) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(holder);
+
+ TxLockMap::iterator lockIdsHeld = _xaLocks.find(holder);
+ if (lockIdsHeld == _xaLocks.end()) { return 0; }
+ size_t numLocksReleased = 0;
+
+ // copy the LockIds: _releaseInternal erases entries from _xaLocks[holder]
+ // as it goes, and the stats must be read before the LockRequest is deleted
+ set<LockId> copyOfLockIds = lockIdsHeld->second;
+ for (set<LockId>::iterator nextLockId = copyOfLockIds.begin();
+ nextLockId != copyOfLockIds.end(); ++nextLockId) {
+ _stats.decStatsForMode(_locks[*nextLockId]->mode);
+ _releaseInternal(*nextLockId);
+
+ if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) ||
+ (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) {
+ _policyLock.notify_one();
+ }
+ numLocksReleased++;
+ }
+ return numLocksReleased;
+ }
+
+ void LockManager::abort(const TxId& goner) {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(goner);
+ _abortInternal(goner);
+ }
+
+ LockManager::LockStats LockManager::getStats() const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown();
+ return _stats;
+ }
+
+ string LockManager::toString() const {
+// unique_lock<boost::mutex> lk(_mutex);
+#ifdef DONT_CARE_ABOUT_DEBUG_EVEN_WHEN_SHUTTING_DOWN
+ // seems like we might want to allow toString for debug during shutdown?
+ _throwIfShuttingDown();
+#endif
+ stringstream result;
+ result << "Policy: ";
+ switch(_policy) {
+ case kPolicyFirstCome:
+ result << "FirstCome";
+ break;
+ case kPolicyReadersFirst:
+ result << "ReadersFirst";
+ break;
+ case kPolicyOldestTxFirst:
+ result << "OldestFirst";
+ break;
+ case kPolicyReadersOnly:
+ result << "ReadersOnly";
+ break;
+ case kPolicyWritersOnly:
+ result << "WritersOnly";
+ break;
+ }
+ result << endl;
+
+ if (_shuttingDown)
+ result << " shutting down in " << _millisToQuiesce - _timer.millis();
+
+ result << "\t_locks:" << endl;
+ for (map<LockId,LockRequest*>::const_iterator locks = _locks.begin();
+ locks != _locks.end(); ++locks) {
+ result << "\t\t" << locks->first << locks->second->toString() << endl;
+ }
+
+ result << "\t_resourceLocks:" << endl;
+ bool firstResource=true;
+ result << "resources=" << ": {";
+ for (map<ResourceId,list<LockId> >::const_iterator nextResource = _resourceLocks.begin();
+ nextResource != _resourceLocks.end(); ++nextResource) {
+ if (firstResource) firstResource=false;
+ else result << ", ";
+ result << nextResource->first << ": {";
+ bool firstLock=true;
+ for (list<LockId>::const_iterator nextLockId = nextResource->second.begin();
+ nextLockId != nextResource->second.end(); ++nextLockId) {
+ if (firstLock) firstLock=false;
+ else result << ", ";
+ result << *nextLockId;
+ }
+ result << "}";
+ }
+ result << "}" << endl;
+
+ result << "\t_waiters:" << endl;
+ for (map<TxId, set<TxId> >::const_iterator txWaiters = _waiters.begin();
+ txWaiters != _waiters.end(); ++txWaiters) {
+ bool firstTime=true;
+ result << "\t\t" << txWaiters->first << ": {";
+ for (set<TxId>::const_iterator nextWaiter = txWaiters->second.begin();
+ nextWaiter != txWaiters->second.end(); ++nextWaiter) {
+ if (firstTime) firstTime=false;
+ else result << ", ";
+ result << *nextWaiter;
+ }
+ result << "}" << endl;
+ }
+
+ bool firstGoner = true;
+ result << "\t_aborted: {" << endl;
+ for (set<TxId>::iterator goners = _abortedTxIds.begin();
+ goners != _abortedTxIds.end(); ++goners) {
+ if (firstGoner) firstGoner = false;
+ else result << ",";
+ result << "t" << *goners;
+ }
+ result << "}";
+
+ return result.str();
+ }
+
+ bool LockManager::isLocked(const TxId& holder,
+ const uint32_t& mode,
+ const ResourceId& resId) const {
+ unique_lock<boost::mutex> lk(_mutex);
+ _throwIfShuttingDown(holder);
+
+ LockId unused;
+ return kLockFound == _findLock(holder, mode, resId, &unused);
+ }
+
+ /*---------- LockManager private functions (alphabetical) ----------*/
+
+ /*
+ * release resources acquired by a transaction about to abort, notifying
+ * any waiters that they can retry their resource acquisition. cleanup
+ * and throw an AbortException.
+ */
+ void LockManager::_abortInternal(const TxId& goner) {
+ TxLockMap::iterator locks = _xaLocks.find(goner);
+
+ if (locks == _xaLocks.end()) {
+ // unusual, but possible to abort a transaction with no locks
+ throw AbortException();
+ }
+
+ // make a copy of the TxId's locks, because releasing
+ // would otherwise affect the iterator. XXX find a better way?
+ //
+ set<LockId> copyOfLocks = locks->second;
+
+ // release all resources acquired by this transaction
+ // notifying any waiters that they can continue
+ //
+ for (set<LockId>::iterator nextLockId = copyOfLocks.begin();
+ nextLockId != copyOfLocks.end(); ++nextLockId) {
+ _releaseInternal(*nextLockId);
+ }
+
+ // add to set of aborted transactions
+ _abortedTxIds.insert(goner);
+
+ throw AbortException();
+ }
+
+ LockManager::LockId LockManager::_acquireInternal(const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ Notifier* sleepNotifier,
+ unique_lock<boost::mutex>& guard) {
+#if 0
+ static unsigned long funCount = 0;
+ if (0 == ++funCount % 10000) {
+ log() << _stats.toString();
+ }
+#endif
+ list<LockId>& queue = _resourceLocks[resId];
+ if (!queue.empty()) { _stats.incPreexisting(); }
+ list<LockId>::iterator conflictPosition = queue.begin();
+ ResourceStatus resourceStatus = _conflictExists(requestor, mode, resId,
+ queue, conflictPosition);
+ if (kResourceAcquired == resourceStatus) {
+ _stats.incSame();
+ ++_locks[*conflictPosition]->count;
+ return *conflictPosition;
+ }
+
+ // create the lock request and add to TxId's set of lock requests
+
+ // XXX should probably use placement operator new and manage LockRequest memory
+ LockRequest* lr = new LockRequest(requestor, mode, resId);
+ _locks[lr->lid] = lr;
+
+ // add lock request to set of requests of requesting TxId
+ _xaLocks[requestor].insert(lr->lid);
+
+ if (kResourceAvailable == resourceStatus) {
+ queue.insert(conflictPosition, lr->lid);
+ _addWaiters(lr, conflictPosition, queue.end());
+ return lr->lid;
+ }
+
+ // some type of conflict, insert after conflictPosition
+
+ verify(conflictPosition != queue.end());
+ ++conflictPosition;
+
+ if (kResourceUpgradeConflict == resourceStatus) {
+ queue.insert(conflictPosition, lr->lid);
+ }
+ else {
+ _addLockToQueueUsingPolicy(lr, queue, conflictPosition);
+ }
+#if 0
+ if (isExclusive(mode)) {
+ for (list<LockId>::iterator followers = conflictPosition;
+ followers != queue.end(); ++followers) {
+ LockRequest* nextLockRequest = _locks[*followers];
+ if (nextLockRequest->xid == requestor) continue;
+ verify(nextLockRequest->isBlocked());
+ }
+ }
+#endif
+ // set remaining incompatible requests as lr's waiters
+ _addWaiters(lr, conflictPosition, queue.end());
+
+
+ // call the sleep notification function once
+ if (NULL != sleepNotifier) {
+ // XXX should arg be xid of blocker?
+ (*sleepNotifier)(lr->xid);
+ }
+
+ _stats.incBlocks();
+
+ // this loop typically executes once
+ do {
+ // set up for future deadlock detection: add requestor to blockers' waiters
+ //
+ for (list<LockId>::iterator nextBlocker = queue.begin();
+ nextBlocker != conflictPosition; ++nextBlocker) {
+ LockRequest* nextBlockingRequest = _locks[*nextBlocker];
+ if (nextBlockingRequest->lid == lr->lid) {break;}
+ if (nextBlockingRequest->xid == requestor) {continue;}
+ if (isCompatible(_locks[*nextBlocker]->mode, lr->mode)) {continue;}
+ _addWaiter(_locks[*nextBlocker]->xid, requestor);
+ ++lr->sleepCount;
+ }
+ if (kResourcePolicyConflict == resourceStatus) {
+ // to facilitate waking once the policy reverts, add requestor to system's waiters
+ _addWaiter(kReservedTxId, requestor);
+ ++lr->sleepCount;
+ }
+
+ // wait for blocker to release
+ while (lr->isBlocked()) {
+ Timer timer;
+ lr->lock.wait(guard);
+ _stats.incTimeBlocked(timer.millis());
+ }
+
+ conflictPosition = queue.begin();
+ resourceStatus = _conflictExists(lr->xid, lr->mode, lr->resId, queue, conflictPosition);
+ } while (hasConflict(resourceStatus));
+
+ return lr->lid;
+ }
+
+ /*
+ * called only when there are conflicting LockRequests
+ * positions a lock request (lr) in a queue at or after position
+ * also adds remaining requests in queue as lr's waiters
+ * for subsequent deadlock detection
+ */
+ void LockManager::_addLockToQueueUsingPolicy(LockRequest* lr,
+ list<LockId>& queue,
+ list<LockId>::iterator& position) {
+
+ if (position == queue.end()) {
+ queue.insert(position, lr->lid);
+ return;
+ }
+
+ // use lock request's transaction's priority if specified
+ int txPriority = _getTransactionPriorityInternal(lr->xid);
+ if (txPriority > 0) {
+ for (; position != queue.end(); ++position) {
+ LockRequest* nextRequest = _locks[*position];
+ if (txPriority > _getTransactionPriorityInternal(nextRequest->xid)) {
+ // add in front of request with lower priority that is either
+ // compatible, or blocked
+ //
+ queue.insert(position, lr->lid);
+ return;
+ }
+ }
+ queue.push_back(lr->lid);
+ return;
+ }
+ else if (txPriority < 0) {
+ // for now, just push to end
+ // TODO: honor position of low priority requests
+ queue.push_back(lr->lid);
+ return;
+ }
+
+ // use LockManager's default policy
+ switch (_policy) {
+ case kPolicyFirstCome:
+ queue.push_back(lr->lid);
+ position = queue.end();
+ return;
+ case kPolicyReadersFirst:
+ if (isExclusive(lr->mode)) {
+ queue.push_back(lr->lid);
+ position = queue.end();
+ return;
+ }
+ for (; position != queue.end(); ++position) {
+ LockRequest* nextRequest = _locks[*position];
+ if (isExclusive(nextRequest->mode) && nextRequest->isBlocked()) {
+ // insert shared lock before first sleeping exclusive lock
+ queue.insert(position, lr->lid);
+ return;
+ }
+ }
+ break;
+ case kPolicyOldestTxFirst:
+ for (; position != queue.end(); ++position) {
+ LockRequest* nextRequest = _locks[*position];
+ if (lr->xid < nextRequest->xid &&
+ (isCompatible(lr->mode, nextRequest->mode) || nextRequest->isBlocked())) {
+ // smaller xid is older, so queue it before
+ queue.insert(position, lr->lid);
+ return;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+
+ queue.push_back(lr->lid);
+ position = queue.end();
+ }
+
+ void LockManager::_addWaiter(const TxId& blocker, const TxId& requestor) {
+ if (blocker == requestor) {
+ // can't wait on self
+ return;
+ }
+
+ // add requestor to blocker's waiters
+ _waiters[blocker].insert(requestor);
+
+ // add all of requestor's waiters to blocker's waiters
+ _waiters[blocker].insert(_waiters[requestor].begin(), _waiters[requestor].end());
+ }
+
+ void LockManager::_addWaiters(LockRequest* blocker,
+ list<LockId>::iterator nextLockId,
+ list<LockId>::iterator lastLockId) {
+ for (; nextLockId != lastLockId; ++nextLockId) {
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+ if (! isCompatible(blocker->mode, nextLockRequest->mode)) {
+ if (nextLockRequest->sleepCount > 0) {
+ _addWaiter(blocker->xid, nextLockRequest->xid);
+ ++nextLockRequest->sleepCount;
+ }
+ }
+ }
+ }
+
+ bool LockManager::_comesBeforeUsingPolicy(const TxId& requestor,
+ const unsigned& mode,
+ const LockRequest* oldRequest) const {
+
+ // handle special policies
+ if (kPolicyReadersOnly == _policy && kShared == mode && oldRequest->isBlocked())
+ return true;
+ if (kPolicyWritersOnly == _policy && kExclusive == mode && oldRequest->isBlocked())
+ return true;
+
+ if (_getTransactionPriorityInternal(requestor) >
+ _getTransactionPriorityInternal(oldRequest->xid)) {
+ return true;
+ }
+
+ switch (_policy) {
+ case kPolicyFirstCome:
+ return false;
+ case kPolicyReadersFirst:
+ return isShared(mode);
+ case kPolicyOldestTxFirst:
+ return requestor < oldRequest->xid;
+ default:
+ return false;
+ }
+ }
+
+ LockManager::ResourceStatus LockManager::_conflictExists(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ list<LockId>& queue,
+ list<LockId>::iterator& nextLockId) {
+
+ // handle kPolicyReadersOnly/kPolicyWritersOnly policy conflicts
+ if ((kPolicyReadersOnly == _policy && isExclusive(mode)) ||
+ (kPolicyWritersOnly == _policy && isShared(mode))) {
+
+ if (nextLockId == queue.end()) { return kResourcePolicyConflict; }
+
+ // position past the last active lock request on the queue
+ list<LockId>::iterator lastActivePosition = queue.end();
+ for (; nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+ if (requestor == nextLockRequest->xid && mode == nextLockRequest->mode) {
+ return kResourceAcquired; // already have the lock
+ }
+ if (! nextLockRequest->isBlocked()) {
+ lastActivePosition = nextLockId;
+ }
+ }
+ if (lastActivePosition != queue.end()) {
+ nextLockId = lastActivePosition;
+ }
+ return kResourcePolicyConflict;
+ }
+
+ // loop over the lock requests in the queue, looking for the 1st conflict
+ // normally, we'll leave the nextLockId iterator positioned at the 1st conflict
+ // if there is one, or the position (often the end) where we know there is no conflict.
+ //
+ // upgrades complicate this picture, because we want to position the iterator
+ // after all initial share locks. but we may not know whether an exclusive request
+ // is an upgrade until we look at all the initial share locks.
+ //
+ // so we record the position of the 1st conflict, but continue advancing the
+ // nextLockId iterator until we've seen all initial share locks. If none have
+ // the same TxId as the exclusive request, we restore the position to 1st conflict
+ //
+ list<LockId>::iterator firstConflict = queue.end(); // invalid
+ set<TxId> sharedOwners; // all initial share lock owners
+ bool alreadyHadLock = false; // true if we see a lock with the same TxId
+
+ for (; nextLockId != queue.end(); ++nextLockId) {
+
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+
+ if (nextLockRequest->matches(requestor, mode, resId)) {
+ // if we're already on the queue, there's no conflict
+ return kResourceAcquired;
+ }
+
+ if (requestor == nextLockRequest->xid) {
+ // an upgrade or downgrade request, can't conflict with ourselves
+ if (isShared(mode)) {
+ // downgrade
+ _stats.incDowngrades();
+ ++nextLockId;
+ return kResourceAvailable;
+ }
+
+ // upgrade
+ alreadyHadLock = true;
+ _stats.incUpgrades();
+ // position after initial readers
+ continue;
+ }
+
+ if (isShared(nextLockRequest->mode)) {
+ invariant(!nextLockRequest->isBlocked() || kPolicyWritersOnly == _policy);
+
+ sharedOwners.insert(nextLockRequest->xid);
+
+ if (isExclusive(mode) && firstConflict == queue.end()) {
+ // if "lr" proves not to be an upgrade, restore this position later
+ firstConflict = nextLockId;
+ }
+ // either there's no conflict yet, or we're not done checking for an upgrade
+ continue;
+ }
+
+ // the next lock on the queue is an exclusive request
+ invariant(isExclusive(nextLockRequest->mode));
+
+ if (alreadyHadLock) {
+ // bumped into something incompatible while up/down grading
+ if (isExclusive(mode)) {
+ // upgrading: bumped into another exclusive lock
+ if (sharedOwners.find(nextLockRequest->xid) != sharedOwners.end()) {
+ // the exclusive lock is also an upgrade, and it must
+ // be blocked, waiting for our original share lock to be released
+ // if we wait for its shared lock, we would deadlock
+ //
+ invariant(nextLockRequest->isBlocked());
+ _abortInternal(requestor);
+ }
+
+ if (sharedOwners.empty()) {
+ // simple upgrade, queue in front of nextLockRequest, no conflict
+ return kResourceAvailable;
+ }
+ else {
+ // we have to wait for another shared lock before upgrading
+ return kResourceUpgradeConflict;
+ }
+ }
+
+ // downgrading, bumped into an exclusive lock, blocked on our original
+ invariant (isShared(mode));
+ invariant(nextLockRequest->isBlocked());
+ // lr will be inserted before nextLockRequest
+ return kResourceAvailable;
+ }
+ else if (firstConflict != queue.end()) {
+ // restore first conflict position
+ nextLockId = firstConflict;
+ nextLockRequest = _locks[*nextLockId];
+ }
+
+ // no conflict if nextLock is blocked and we come before
+ if (nextLockRequest->isBlocked() &&
+ _comesBeforeUsingPolicy(requestor, mode, nextLockRequest)) {
+ return kResourceAvailable;
+ }
+
+ return kResourceConflict;
+ }
+
+ // positioned to the end of the queue
+ if (alreadyHadLock && isExclusive(mode) && !sharedOwners.empty()) {
+ // upgrading, queue consists of requestor's earlier share lock
+ // plus other share locks. Must wait for the others to release
+ return kResourceUpgradeConflict;
+ }
+ else if (firstConflict != queue.end()) {
+ nextLockId = firstConflict;
+ LockRequest* nextLockRequest = _locks[*nextLockId];
+
+ if (_comesBeforeUsingPolicy(requestor, mode, nextLockRequest)) {
+ return kResourceAvailable;
+ }
+
+ return kResourceConflict;
+ }
+ return kResourceAvailable;
+ }
+
+ LockManager::LockStatus LockManager::_findLock(const TxId& holder,
+ const unsigned& mode,
+ const ResourceId& resId,
+ LockId* outLockId) const {
+
+ *outLockId = kReservedLockId; // set invalid;
+
+ // get iterator for resId's locks
+ ResourceLocks::const_iterator resourceLocks = _resourceLocks.find(resId);
+ if (resourceLocks == _resourceLocks.end()) { return kLockResourceNotFound; }
+
+ // look for an existing lock request from holder in mode
+ for (list<LockId>::const_iterator nextLockId = resourceLocks->second.begin();
+ nextLockId != resourceLocks->second.end(); ++nextLockId) {
+ LockRequest* nextLockRequest = _locks.at(*nextLockId);
+ if (nextLockRequest->xid == holder && nextLockRequest->mode == mode) {
+ *outLockId = nextLockRequest->lid;
+ return kLockFound;
+ }
+ }
+ return kLockModeNotFound;
+ }
+
+ int LockManager::_getTransactionPriorityInternal(const TxId& xid) const {
+ map<TxId, int>::const_iterator txPriority = _txPriorities.find(xid);
+ if (txPriority == _txPriorities.end()) {
+ return 0;
+ }
+ return txPriority->second;
+ }
+
+ /*
+ * Used by acquireOne
+ * XXX: there's overlap between this, _conflictExists and _findLock
+ */
+ bool LockManager::_isAvailable(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId) const {
+
+ // check for exceptional policies
+ if (kPolicyReadersOnly == _policy && isExclusive(mode))
+ return false;
+ else if (kPolicyWritersOnly == _policy && isShared(mode))
+ return false;
+
+ ResourceLocks::const_iterator resLocks = _resourceLocks.find(resId);
+ if (resLocks == _resourceLocks.end()) {
+ return true; // no lock requests against this ResourceId, so must be available
+ }
+
+ // walk over the queue of previous requests for this ResourceId
+ const list<LockId>& queue = resLocks->second;
+ for (list<LockId>::const_iterator nextLockId = queue.begin();
+ nextLockId != queue.end(); ++nextLockId) {
+
+ LockRequest* nextLockRequest = _locks.at(*nextLockId);
+
+ if (nextLockRequest->matches(requestor, mode, resId)) {
+ // we already have this lock; if we're asking, we can't be asleep
+ invariant(! nextLockRequest->isBlocked());
+ return true;
+ }
+
+ // no conflict if we're compatible
+ if (isCompatible(mode, nextLockRequest->mode)) continue;
+
+ // no conflict if nextLock is blocked and we come before
+ if (nextLockRequest->isBlocked() && _comesBeforeUsingPolicy(requestor, mode, nextLockRequest))
+ return true;
+
+ return false; // we're incompatible and would block
+ }
+
+ // everything on the queue (if anything is on the queue) is compatible
+ return true;
+ }
+
+ LockManager::LockStatus LockManager::_releaseInternal(const LockId& lid) {
+
+ if (kReservedLockId == lid) { return kLockIdNotFound; }
+
+ LockRequest* lr = _locks[lid];
+ if (NULL == lr) { return kLockIdNotFound; }
+
+ const TxId holder = lr->xid;
+ const unsigned mode = lr->mode;
+ const ResourceId resId = lr->resId;
+
+ ResourceLocks::iterator recordLocks = _resourceLocks.find(resId);
+ if (recordLocks == _resourceLocks.end()) {
+ return kLockResourceNotFound;
+ }
+
+ bool foundLock = false;
+ bool foundResource = false;
+
+ list<LockId>& queue = recordLocks->second;
+ list<LockId>::iterator nextLockId = queue.begin();
+
+ // find the position of the lock to release in the queue
+ for(; !foundLock && nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextLock = _locks[*nextLockId];
+ if (lid != *nextLockId) {
+ if (nextLock->xid == holder) {
+ foundResource = true;
+ }
+ }
+ else {
+ // this is our lock.
+ if (0 < --nextLock->count) { return kLockCountDecremented; }
+
+ // release the lock
+ _xaLocks[holder].erase(*nextLockId);
+ _locks.erase(*nextLockId);
+ queue.erase(nextLockId++);
+ delete nextLock;
+
+ foundLock = true;
+ break; // don't increment nextLockId again
+ }
+ }
+
+ if (! foundLock) {
+ // can't release a lock that hasn't been acquired in the specified mode
+ return foundResource ? kLockModeNotFound : kLockResourceNotFound;
+ }
+
+ if (isShared(mode)) {
+ // skip over any remaining shared requests. they can't be waiting for us.
+ for (; nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextLock = _locks[*nextLockId];
+ if (isExclusive(nextLock->mode)) {
+ break;
+ }
+ }
+ }
+
+ // everything left on the queue potentially conflicts with the lock just
+ // released, unless it's an up/down-grade of that lock. So iterate, and
+ // when TxIds differ, decrement the sleeper's sleepCount, wake any sleepers
+ // whose count reaches zero, and clean up the state used for deadlock detection
+
+ for (; nextLockId != queue.end(); ++nextLockId) {
+ LockRequest* nextSleeper = _locks[*nextLockId];
+ if (nextSleeper->xid == holder) continue;
+
+ invariant(nextSleeper->isBlocked());
+
+ // wake up sleepy heads
+ if (nextSleeper->shouldAwake()) {
+ nextSleeper->lock.notify_one();
+ }
+ }
+#if 0
+ // verify stuff
+ if (_xaLocks[holder].empty()) {
+ for (WaitersMap::iterator dependencies = _waiters.begin();
+ dependencies != _waiters.end(); ++dependencies) {
+ verify( dependencies->second.find(holder) == dependencies->second.end());
+ }
+ }
+
+ if (!queue.empty()) {
+ verify(! _locks[queue.front()]->isBlocked());
+ }
+#endif
+ return kLockReleased;
+ }
+
+ void LockManager::_throwIfShuttingDown(const TxId& xid) const {
+ if (_shuttingDown && (_timer.millis() >= _millisToQuiesce ||
+ _xaLocks.find(xid) == _xaLocks.end())) {
+
+ throw AbortException(); // XXX should this be something else? ShutdownException?
+ }
+ }
+
+ /*---------- ResourceLock functions ----------*/
+
+ ResourceLock::ResourceLock(LockManager& lm,
+ const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ LockManager::Notifier* notifier)
+ : _lm(lm),
+ _lid(LockManager::kReservedLockId) // if acquire throws, we want this initialized
+ {
+ _lid = lm.acquire(requestor, mode, resId, notifier);
+ }
+
+ ResourceLock::~ResourceLock() {
+ _lm.releaseLock(_lid);
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/concurrency/lock_mgr.h b/src/mongo/db/concurrency/lock_mgr.h
new file mode 100644
index 00000000000..451e88daabe
--- /dev/null
+++ b/src/mongo/db/concurrency/lock_mgr.h
@@ -0,0 +1,621 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <iterator>
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "mongo/platform/compiler.h"
+#include "mongo/platform/cstdint.h"
+#include "mongo/util/timer.h"
+
+/*
+ * LockManager controls access to resources through two functions: acquire and release
+ *
+ * Resources are either RecordStores, or Records within a RecordStore, identified by a ResourceId.
+ * Resources are acquired for either shared or exclusive use, by transactions identified by a TxId.
+ * Acquiring Records in any mode implies acquisition of the Record's RecordStore
+ *
+ * Contention for a resource is resolved by a LockingPolicy, which determines which blocked
+ * resource requests to awaken when the blocker releases the resource.
+ *
+ */
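As a concrete illustration of the acquire/release protocol described above, a minimal sketch of one call sequence against the API declared below (the TxId and ResourceId values are made up, and AbortException handling is omitted):

    #include "mongo/db/concurrency/lock_mgr.h"

    // Sketch only: one transaction taking and dropping an exclusive document lock.
    void exampleAcquireRelease() {
        mongo::LockManager& lm = mongo::LockManager::getSingleton();
        mongo::TxId tx(42);                // e.g. the current operation's opNum
        mongo::ResourceId doc(0xCAFE);     // e.g. a record location cast to size_t
        mongo::LockManager::LockId lid =
            lm.acquire(tx, mongo::LockManager::kExclusive, doc);  // may block; may throw AbortException
        // ... read or mutate the record while the lock is held ...
        lm.releaseLock(lid);               // or lm.release(tx) to drop everything tx holds
    }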
+
+namespace mongo {
+#if 1
+ class TxId {
+ public:
+ TxId() : _xid(0) { }
+ TxId(size_t xid) : _xid(xid) { }
+ bool operator<(const TxId& other) const { return _xid < other._xid; }
+ bool operator==(const TxId& other) const { return _xid == other._xid; }
+ operator size_t() const { return _xid; }
+
+ private:
+ size_t _xid;
+ };
+
+ class ResourceId {
+ public:
+ ResourceId() : _rid(0) { }
+ ResourceId(size_t rid) : _rid(rid) { }
+ bool operator<(const ResourceId& other) const { return _rid < other._rid; }
+ bool operator==(const ResourceId& other) const { return _rid == other._rid; }
+ operator size_t() const { return _rid; }
+
+ private:
+ size_t _rid;
+ };
+#else
+ typedef size_t TxId; // identifies requesting transaction. 0 is reserved
+ typedef size_t ResourceId; // identifies requested resource. 0 is reserved
+#endif
+ static const TxId kReservedTxId = 0;
+ static const ResourceId kReservedResourceId = 0;
+
+ /**
+ * LockManager is used to control access to resources. Usually a singleton. For deadlock detection,
+ * all resources used by a set of transactions that could deadlock should be managed by one LockManager.
+ *
+ * Primary functions are:
+ * acquire - acquire a resource for shared or exclusive use; may throw Abort.
+ * acquireOne - acquire one of a vector of resources, hopefully without blocking
+ * release - release a resource previously acquired for shared/exclusive use
+ */
+ class LockManager {
+ public:
+
+ /**
+ * thrown primarily when deadlocks are detected, or when LockManager::abort is called.
+ * also thrown when LockManager requests are made during shutdown.
+ */
+ class AbortException : public std::exception {
+ public:
+ const char* what() const throw ();
+ };
+
+ /**
+ * LockModes: shared and exclusive
+ */
+ enum LockModes {
+ kShared = 0x0,
+ kExclusive = 0x1
+ };
+
+ /**
+ * Used to decide which blocked requests to honor first when a resource becomes available
+ */
+ enum Policy {
+ kPolicyFirstCome, // wake the first blocked request in arrival order
+ kPolicyReadersFirst, // wake the first blocked read request(s)
+ kPolicyOldestTxFirst, // wake the blocked request with the lowest TxId
+ kPolicyReadersOnly, // block write requests (used during fsync)
+ kPolicyWritersOnly // block read requests
+ };
+
+ /**
+ * returned by ::conflictExists, called from acquire
+ */
+ enum ResourceStatus {
+ kResourceAcquired, // requested resource was already acquired, increment count
+ kResourceAvailable, // requested resource is available. no waiting
+ kResourceConflict, // requested resource is in use by another transaction
+ kResourcePolicyConflict, // requested mode blocked by kPolicyReadersOnly/kPolicyWritersOnly policy
+ kResourceUpgradeConflict // requested resource was previously acquired for shared use
+ // now requested for exclusive use, but there are other shared
+ // users, so this request must wait.
+ };
+
+ /**
+ * returned by ::_findLock and ::release, mostly for testing;
+ * explains why a lock wasn't found or released
+ */
+ enum LockStatus {
+ kLockFound, // found a matching lock request
+ kLockReleased, // released requested lock
+ kLockCountDecremented, // decremented lock count, but didn't release
+ kLockIdNotFound, // no locks with this id
+ kLockResourceNotFound, // no locks on the resource
+ kLockModeNotFound // locks on the resource, but not of the specified mode
+ };
+
+ typedef size_t LockId; // valid LockIds are > 0
+ static const LockId kReservedLockId = 0;
+
+ /**
+ * Used to do something just before an acquisition request blocks.
+ *
+ * XXX: should perhaps define Notifier as a functor so C++11 lambdas match.
+ * For the test.cpp, it was convenient for the call operator to access private
+ * state, which is why we're using the class formulation for now
+ */
+ class Notifier {
+ public:
+ virtual ~Notifier() { }
+ virtual void operator()(const TxId& blocker) = 0;
+ };
+
+ /**
+ * Tracks locking statistics. For now, just aggregated across all resources/TxIds
+ * Eventually might keep per TxId and/or per Resource, to facilitate identifying
+ * hotspots and problem transactions.
+ */
+ class LockStats {
+ public:
+ LockStats()
+ : _numRequests(0)
+ , _numPreexistingRequests(0)
+ , _numSameRequests(0)
+ , _numBlocks(0)
+ , _numDeadlocks(0)
+ , _numDowngrades(0)
+ , _numUpgrades(0)
+ , _numMillisBlocked(0)
+ , _numCurrentActiveReadRequests(0)
+ , _numCurrentActiveWriteRequests(0) { }
+
+ void incRequests() { _numRequests++; }
+ void incPreexisting() { _numPreexistingRequests++; }
+ void incSame() { _numSameRequests++; }
+ void incBlocks() { _numBlocks++; }
+ void incDeadlocks() { _numDeadlocks++; }
+ void incDowngrades() { _numDowngrades++; }
+ void incUpgrades() { _numUpgrades++; }
+ void incTimeBlocked(size_t numMillis ) { _numMillisBlocked += numMillis; }
+
+ void incStatsForMode(const unsigned mode) {
+ 0==mode ? incActiveReads() : incActiveWrites();
+ }
+ void decStatsForMode(const unsigned mode) {
+ 0==mode ? decActiveReads() : decActiveWrites();
+ }
+
+ void incActiveReads() { _numCurrentActiveReadRequests++; }
+ void decActiveReads() { _numCurrentActiveReadRequests--; }
+ void incActiveWrites() { _numCurrentActiveWriteRequests++; }
+ void decActiveWrites() { _numCurrentActiveWriteRequests--; }
+
+ unsigned numActiveReads() const { return _numCurrentActiveReadRequests; }
+ unsigned numActiveWrites() const { return _numCurrentActiveWriteRequests; }
+
+ size_t getNumRequests() const { return _numRequests; }
+ size_t getNumPreexistingRequests() const { return _numPreexistingRequests; }
+ size_t getNumSameRequests() const { return _numSameRequests; }
+ size_t getNumBlocks() const { return _numBlocks; }
+ size_t getNumDeadlocks() const { return _numDeadlocks; }
+ size_t getNumDowngrades() const { return _numDowngrades; }
+ size_t getNumUpgrades() const { return _numUpgrades; }
+ size_t getNumMillisBlocked() const { return _numMillisBlocked; }
+
+ std::string toString() const;
+
+ private:
+ size_t _numRequests;
+ size_t _numPreexistingRequests;
+ size_t _numSameRequests;
+ size_t _numBlocks;
+ size_t _numDeadlocks;
+ size_t _numDowngrades;
+ size_t _numUpgrades;
+ size_t _numMillisBlocked;
+ unsigned long _numCurrentActiveReadRequests;
+ unsigned long _numCurrentActiveWriteRequests;
+ };
+
+
+ public:
+
+ /**
+ * Singleton factory - retrieves a common instance of LockManager
+ */
+ static LockManager& getSingleton();
+
+ /**
+ * It's possibly useful to allow multiple LockManagers for non-overlapping sets
+ * of resources, so the constructor is left public. Eventually we may want
+ * to enforce a singleton pattern.
+ */
+ explicit LockManager(const Policy& policy=kPolicyFirstCome);
+ ~LockManager();
+
+ /**
+ * Get the current policy
+ */
+ Policy getPolicy() const;
+
+ /**
+ * Who set the current policy. Of use when the Policy is ReadersOnly
+ * and we want to find out who is blocking a writer.
+ */
+ TxId getPolicySetter() const;
+
+ /**
+ * Initiate a shutdown, specifying a period of time to quiesce.
+ *
+ * During this period, existing transactions can continue to acquire resources,
+ * but new transaction requests will throw AbortException.
+ *
+ * After quiescing, any new requests will throw AbortException
+ */
+ void shutdown(const unsigned& millisToQuiesce = 1000);
+
+
+ /**
+ * override the LockManager's default Policy for a transaction.
+ *
+ * positive priority moves transaction's resource requests toward the front
+ * of the queue, behind only those requests with higher priority.
+ *
+ * negative priority moves transaction's resource requests toward the back
+ * of the queue, ahead of only those requests with lower priority.
+ *
+ * zero priority uses the LockManager's default Policy
+ */
+ void setTransactionPriority(const TxId& xid, int priority);
+ int getTransactionPriority(const TxId& xid) const;
+
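A short sketch of how a caller might use the priority hooks above (the TxId values are illustrative, not part of the patch):

    #include "mongo/db/concurrency/lock_mgr.h"

    // Sketch only: biasing two transactions' queue positions.
    void examplePriorities(mongo::LockManager& lm) {
        lm.setTransactionPriority(mongo::TxId(7), 10);   // queue tx 7 ahead of default-priority requests
        lm.setTransactionPriority(mongo::TxId(8), -5);   // queue tx 8 (e.g. a background scan) toward the back
        // a priority of 0 (the default) falls back to the LockManager's Policy
    }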
+
+ /**
+ * acquire a resource in a mode.
+ * can throw AbortException
+ */
+ LockId acquire(const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ Notifier* notifier = NULL);
+
+ /**
+ * for bulk operations:
+ * acquire one of a vector of ResourceIds in a mode,
+ * hopefully without blocking, return index of
+ * acquired ResourceId, or -1 if vector was empty
+ */
+ int acquireOne(const TxId& requestor,
+ const uint32_t& mode,
+ const std::vector<ResourceId>& records,
+ Notifier* notifier = NULL);
+
+ /**
+ * release a ResourceId.
+ * The mode here is just the mode that applies to the resId
+ */
+ LockStatus release(const TxId& holder,
+ const uint32_t& mode,
+ const ResourceId& resId);
+
+ /**
+ * releases the lock returned by acquire. should perhaps replace above?
+ */
+ LockStatus releaseLock(const LockId& lid);
+
+ /**
+ * release all resources acquired by a transaction
+ * returns number of locks released
+ */
+ size_t release(const TxId& holder);
+
+ /**
+ * called internally for deadlock
+ * possibly called publicly to stop a long transaction
+ * also used for testing
+ */
+ MONGO_COMPILER_NORETURN void abort(const TxId& goner);
+
+ /**
+ * returns a copy of the stats that exist at the time of the call
+ */
+ LockStats getStats() const;
+
+
+
+ // --- for testing and logging
+
+ std::string toString() const;
+
+ /**
+ * test whether a TxId has locked a ResourceId in a mode
+ */
+ bool isLocked(const TxId& holder,
+ const unsigned& mode,
+ const ResourceId& resId) const;
+
+ protected:
+
+ /**
+ * Data structure used to record a resource acquisition request
+ */
+ class LockRequest {
+ public:
+ LockRequest(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId);
+
+ ~LockRequest();
+
+ bool matches(const TxId& xid,
+ const unsigned& mode,
+ const ResourceId& resId) const;
+
+ bool isBlocked() const;
+ bool shouldAwake();
+
+ std::string toString() const;
+
+ // uniquely identifies a LockRequest
+ const LockId lid;
+
+ // transaction that made this request
+ const TxId xid;
+
+ // shared or exclusive use
+ const unsigned mode;
+
+ // resource requested
+ const ResourceId resId;
+
+ // number of times xid requested this resource in this mode
+ // request will be deleted when count goes to 0
+ size_t count;
+
+ // number of existing things blocking this request
+ // usually preceding requests on the queue, but also policy
+ size_t sleepCount;
+
+ // used for waiting and waking
+ boost::condition_variable lock;
+ };
+
+ typedef std::map<TxId, std::set<LockId> > TxLocks;
+ typedef std::map<ResourceId, std::list<LockId> > ResourceLocks;
+ typedef std::map<LockId, LockRequest*> LockMap;
+ typedef std::map<TxId, std::set<LockId> > TxLockMap;
+
+ private: // alphabetical
+
+ /**
+ * called by public ::abort and internally upon deadlock
+ * releases all locks acquired by goner, notify's any
+ * transactions that were waiting, then throws AbortException
+ */
+ MONGO_COMPILER_NORETURN void _abortInternal(const TxId& goner);
+
+ /**
+ * main workhorse for acquiring locks on resources, blocking
+ * on conflict or aborting on deadlock
+ *
+ * returns a non-zero LockId, or throws AbortException on deadlock
+ *
+ */
+ LockId _acquireInternal(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ Notifier* notifier,
+ boost::unique_lock<boost::mutex>& guard);
+
+ /**
+ * adds a conflicting lock request to the list of requests for a resource
+ * using the Policy. Called by acquireInternal
+ */
+ void _addLockToQueueUsingPolicy(LockRequest* lr,
+ std::list<LockId>& queue,
+ std::list<LockId>::iterator& position);
+
+ /**
+ * set up for future deadlock detection, called from acquire
+ */
+ void _addWaiter(const TxId& blocker, const TxId& waiter);
+
+ /**
+ * when inserting a new lock request into the middle of a queue,
+ * add any remaining incompatible requests in the queue to the
+ * new lock request's set of waiters... for future deadlock detection
+ */
+ void _addWaiters(LockRequest* blocker,
+ std::list<LockId>::iterator nextLockId,
+ std::list<LockId>::iterator lastLockId);
+
+ /**
+ * returns true if a newRequest should be honored before an oldRequest according
+ * to the lockManager's policy. Used by acquire to decide whether a new share request
+ * conflicts with a previous upgrade-to-exclusive request that is blocked.
+ */
+ bool _comesBeforeUsingPolicy(const TxId& newReqXid,
+ const unsigned& newReqMode,
+ const LockRequest* oldReq) const;
+
+ /**
+ * determine whether a resource request would conflict with an existing lock
+ * set the position to the first possible insertion point, which is usually
+ * the position of the first conflict, or the end of the queue, or to an existing lock
+ */
+ ResourceStatus _conflictExists(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ std::list<LockId>& queue,
+ std::list<LockId>::iterator& position /* in/out */);
+
+ /**
+ * looks for an existing LockRequest that matches the first three input params
+ * if not found, sets outLid to zero and returns a reason, otherwise
+ * sets outLid to the LockId that matches and returns kLockFound
+ */
+ LockStatus _findLock(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId,
+ LockId* outLid) const;
+
+ /**
+ * called externally by getTransactionPriority
+ * and internally by addLockToQueueUsingPolicy
+ */
+ int _getTransactionPriorityInternal(const TxId& xid) const;
+
+ /**
+ * returns true if acquire would return without waiting
+ * used by acquireOne
+ */
+ bool _isAvailable(const TxId& requestor,
+ const unsigned& mode,
+ const ResourceId& resId) const;
+
+ /**
+ * called by public ::release and internally by abort.
+ * assumes the caller has acquired the mutex.
+ */
+ LockStatus _releaseInternal(const LockId& lid);
+
+ /**
+ * called at start of public APIs, throws exception
+ * if quiescing period has expired, or if xid is new
+ */
+ void _throwIfShuttingDown(const TxId& xid = 0 ) const;
+
+
+ private:
+
+ // Singleton instance
+ static boost::mutex _getSingletonMutex;
+ static LockManager* _singleton;
+
+ // The Policy controls which requests should be honored first. This is
+ // used to guide the position of a request in a list of requests waiting for
+ // a resource.
+ //
+ // XXX At some point, we may want this to also guide the decision of which
+ // transaction to abort in case of deadlock. For now, the transaction whose
+ // request would lead to a deadlock is aborted. Since deadlocks are rare,
+ // careful choices may not matter much.
+ //
+ Policy _policy;
+ TxId _policySetter;
+
+ // synchronizes access to the lock manager, which is shared across threads
+ mutable boost::mutex _mutex;
+
+ // for blocking when setting kPolicyReadersOnly or kPolicyWritersOnly policy
+ boost::condition_variable _policyLock;
+
+ // _millisToQuiesce and _timer are only meaningful while _shuttingDown is true
+ bool _shuttingDown;
+ int _millisToQuiesce;
+ Timer _timer;
+
+ // owns the LockRequest*
+ std::map<LockId, LockRequest*> _locks;
+
+ // Lists of lock requests associated with a resource,
+ //
+ // The lock-request lists have two sections. Some number (at least one) of requests
+ // at the front of a list are "active". All remaining lock requests are blocked by
+ // some earlier (not necessarily active) lock request, and are waiting. The order
+ // of lock requests in the waiting section is determined by the LockManager's Policy.
+ // The order of lock requests in the active/front portion of the list is irrelevant.
+ //
+ std::map<ResourceId, std::list<LockId> > _resourceLocks;
+
+ // For cleanup and abort processing, references all LockRequests made by a transaction
+ std::map<TxId, std::set<LockId> > _xaLocks;
+
+ // For deadlock detection: the set of transactions blocked by another transaction
+ // NB: a transaction can only be directly waiting for a single resource/transaction
+ // but to facilitate deadlock detection, if T1 is waiting for T2 and T2 is waiting
+ // for T3, then both T1 and T2 are listed as T3's waiters.
+ std::map<TxId, std::set<TxId> > _waiters;
+
+ // track transactions that have aborted, and don't accept further
+ // lock requests from them (which shouldn't happen anyway).
+ //
+ // XXX: this set can grow without bound in the pathological case. One way to deal
+ // with it is to track the oldest active transaction (which may or may not be the
+ // smallest TxId value), and flush older values from this set and ignore older values
+ // in new requests without consulting this set.
+ std::set<TxId> _abortedTxIds;
+
+ // transaction priorities:
+ // 0 => neutral, use LockManager's default _policy
+ // + => high, queue forward
+ // - => low, queue back
+ //
+ std::map<TxId, int> _txPriorities;
+
+ // stats, but also used internally
+ LockStats _stats;
+ };
+
+ /**
+ * RAII wrapper around LockManager, for scoped locking
+ */
+ class ResourceLock {
+ public:
+ ResourceLock(LockManager& lm,
+ const TxId& requestor,
+ const uint32_t& mode,
+ const ResourceId& resId,
+ LockManager::Notifier* notifier = NULL);
+
+ ~ResourceLock();
+ private:
+ LockManager& _lm;
+ LockManager::LockId _lid;
+ };
+
+ class SharedResourceLock : public ResourceLock {
+ public:
+ SharedResourceLock(const TxId& requestor, void* resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kShared,
+ (size_t)resource) { }
+ SharedResourceLock(const TxId& requestor, size_t resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kShared,
+ resource) { }
+ };
+
+ class ExclusiveResourceLock : public ResourceLock {
+ public:
+ ExclusiveResourceLock(const TxId& requestor, void* resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kExclusive,
+ (size_t)resource) { }
+ ExclusiveResourceLock(const TxId& requestor, size_t resource)
+ : ResourceLock(LockManager::getSingleton(),
+ requestor,
+ LockManager::kExclusive,
+ resource) { }
+ };
+} // namespace mongo
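The RAII wrappers above are what the collection.cpp hunk earlier in this patch uses. A minimal sketch of the scoped-locking pattern (the function and parameter names are illustrative):

    #include "mongo/db/concurrency/lock_mgr.h"

    // Sketch only: exclusive document lock held for the duration of an update.
    void exampleScopedUpdate(size_t opNum, size_t recordAddress) {
        mongo::ExclusiveResourceLock lk(opNum, recordAddress);  // acquire() in the constructor; may block
        // ... apply the damages while the exclusive lock is held ...
    }                                                           // releaseLock() in the destructor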
diff --git a/src/mongo/db/concurrency/lock_mgr_test.cpp b/src/mongo/db/concurrency/lock_mgr_test.cpp
new file mode 100644
index 00000000000..06840a41cb3
--- /dev/null
+++ b/src/mongo/db/concurrency/lock_mgr_test.cpp
@@ -0,0 +1,896 @@
+/**
+* Copyright (C) MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+/**
+ * tests for db/concurrency/lock_mgr.* capabilities
+ *
+ * Testing concurrency requires multiple threads. In this test, these are packaged as
+ * instances of a ClientTransaction class. The test's main thread creates instances
+ * of ClientTransactions and communicates with them through a pair of producer/consumer
+ * buffers. The driver thread sends requests to ClientTransaction threads using a
+ * TxCommandBuffer, and waits for responses in a TxResponseBuffer. This protocol
+ * allows precise control over timing.
+ *
+ * The producer/consumer buffer is possibly overkill. At present there is only
+ * one producer and one consumer using any one buffer. Also, the current set of
+ * tests is 'lock-step', with the driver never issuing more than one command
+ * before waiting for a response.
+ */
+
+#include <boost/thread/thread.hpp>
+#include "mongo/unittest/unittest.h"
+#include "mongo/db/concurrency/lock_mgr.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+ enum TxCmd {
+ ACQUIRE,
+ RELEASE,
+ ABORT,
+ POLICY,
+ QUIT,
+ INVALID
+ };
+
+ enum TxRsp {
+ ACQUIRED,
+ RELEASED,
+ BLOCKED,
+ AWAKENED,
+ ABORTED,
+ INVALID_RESPONSE
+ };
+
+ class TxResponse {
+ public:
+ TxRsp rspCode;
+ TxId xid;
+ unsigned mode;
+ ResourceId resId;
+ };
+
+ class TxRequest {
+ public:
+ TxCmd cmd;
+ TxId xid;
+ unsigned mode;
+ ResourceId resId;
+ LockManager::Policy policy;
+ };
+
+ class TxResponseBuffer {
+ public:
+ TxResponseBuffer() : _count(0), _readPos(0), _writePos(0) { }
+
+ void post(const TxRsp& rspCode) {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 10)
+ _full.wait(guard);
+ buffer[_writePos++].rspCode = rspCode;
+ _writePos %= 10;
+ _count++;
+ _empty.notify_one();
+ }
+
+ TxResponse* consume() {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 0)
+ _empty.wait(guard);
+ TxResponse* result = &buffer[_readPos++];
+ _readPos %= 10;
+ _count--;
+ _full.notify_one();
+ return result;
+ }
+
+ boost::mutex _guard;
+ boost::condition_variable _full;
+ boost::condition_variable _empty;
+
+ size_t _count;
+ size_t _readPos;
+ size_t _writePos;
+
+ TxResponse buffer[10];
+ };
+
+class TxCommandBuffer {
+public:
+ TxCommandBuffer() : _count(0), _readPos(0), _writePos(0) { }
+
+ void post(const TxCmd& cmd,
+ const TxId& xid = 0,
+ const unsigned& mode = 0,
+ const ResourceId& resId = 0,
+ const LockManager::Policy& policy = LockManager::kPolicyFirstCome) {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 10)
+ _full.wait(guard);
+ buffer[_writePos].cmd = cmd;
+ buffer[_writePos].xid = xid;
+ buffer[_writePos].mode = mode;
+ buffer[_writePos].resId = resId;
+ buffer[_writePos].policy = policy;
+ _writePos++;
+ _writePos %= 10;
+ _count++;
+ _empty.notify_one();
+ }
+
+ TxRequest* consume() {
+ boost::unique_lock<boost::mutex> guard(_guard);
+ while (_count == 0)
+ _empty.wait(guard);
+ TxRequest* result = &buffer[_readPos++];
+ _readPos %= 10;
+ _count--;
+ _full.notify_one();
+ return result;
+ }
+
+ boost::mutex _guard;
+ boost::condition_variable _full;
+ boost::condition_variable _empty;
+
+ size_t _count;
+ size_t _readPos;
+ size_t _writePos;
+
+ TxRequest buffer[10];
+};
+
+class ClientTransaction : public LockManager::Notifier {
+public:
+ // these are called in the main driver program
+
+ ClientTransaction(LockManager* lm, const TxId& xid) : _lm(lm), _xid(xid), _thr(&ClientTransaction::processCmd, this) { }
+ virtual ~ClientTransaction() { _thr.join(); }
+
+ void acquire(const unsigned& mode, const ResourceId resId, const TxRsp& rspCode) {
+        _cmd.post(ACQUIRE, _xid, mode, resId);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(rspCode == rsp->rspCode);
+ }
+
+ void release(const unsigned& mode, const ResourceId resId) {
+        _cmd.post(RELEASE, _xid, mode, resId);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(RELEASED == rsp->rspCode);
+ }
+
+ void abort() {
+ _cmd.post(ABORT, _xid);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(ABORTED == rsp->rspCode);
+ }
+
+ void wakened() {
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(ACQUIRED == rsp->rspCode);
+ }
+
+ void setPolicy(const LockManager::Policy& policy, const TxRsp& rspCode) {
+        _cmd.post(POLICY, _xid, 0, 0, policy);
+ TxResponse* rsp = _rsp.consume();
+ ASSERT(rspCode == rsp->rspCode);
+ }
+
+ void quit() {
+ _cmd.post(QUIT);
+ }
+
+ // these are run within the client threads
+ void processCmd() {
+ bool more = true;
+ while (more) {
+ TxRequest* req = _cmd.consume();
+ switch (req->cmd) {
+ case ACQUIRE:
+ try {
+ _lm->acquire(_xid, req->mode, req->resId, this);
+ _rsp.post(ACQUIRED);
+ } catch (const LockManager::AbortException& err) {
+ _rsp.post(ABORTED);
+// log() << "t" << _xid << ": aborted, ending" << endl;
+ return;
+ }
+ break;
+ case RELEASE:
+ _lm->release(_xid, req->mode, req->resId);
+ _rsp.post(RELEASED);
+ break;
+ case ABORT:
+ try {
+ _lm->abort(_xid);
+ } catch (const LockManager::AbortException& err) {
+ _rsp.post(ABORTED);
+ }
+ break;
+ case POLICY:
+ try {
+ _lm->setPolicy(_xid, req->policy, this);
+ _rsp.post(ACQUIRED);
+ } catch( const LockManager::AbortException& err) {
+ _rsp.post(ABORTED);
+ }
+ break;
+ case QUIT:
+ default:
+ more = false;
+ break;
+ }
+ }
+ }
+
+ // inherited from Notifier, used by LockManager::acquire
+ virtual void operator()(const TxId& blocker) {
+ _rsp.post(BLOCKED);
+ }
+
+private:
+ TxCommandBuffer _cmd;
+ TxResponseBuffer _rsp;
+ LockManager* _lm;
+ TxId _xid;
+ boost::thread _thr;
+
+};
+
+TEST(LockManagerTest, TxError) {
+ LockManager lm;
+ LockManager::LockStatus status;
+
+    // release a lock on a container we haven't locked
+    lm.release(1, LockManager::kShared, 0, 1);
+
+    // release again, this time checking that the status reports the missing container
+    status = lm.release(1, LockManager::kShared, 0, 1);
+    ASSERT(LockManager::kLockContainerNotFound == status);
+
+
+ // release a lock on a record we haven't locked in a store we have locked
+ lm.acquire(1, LockManager::kShared, 0, 2);
+ status = lm.release(1, LockManager::kShared, 0, 1); // this is in error
+ ASSERT(LockManager::kLockResourceNotFound == status);
+ status = lm.release(1, LockManager::kShared, 0, 2);
+ ASSERT(LockManager::kLockReleased == status);
+
+ // release a record we've locked in a different mode
+ lm.acquire(1, LockManager::kShared, 0, 1);
+ status = lm.release(1, LockManager::kExclusive, 0, 1); // this is in error
+ ASSERT(LockManager::kLockModeNotFound == status);
+ status = lm.release(1, LockManager::kShared, 0, 1);
+ ASSERT(LockManager::kLockReleased == status);
+
+ lm.acquire(1, LockManager::kExclusive, 0, 1);
+ status = lm.release(1, LockManager::kShared, 0, 1); // this is in error
+ ASSERT(LockManager::kLockModeNotFound == status);
+ status = lm.release(1, LockManager::kExclusive, 0, 1);
+ ASSERT(LockManager::kLockReleased == status);
+
+ // attempt to acquire on a transaction that aborted
+ try {
+ lm.abort(1);
+ } catch (const LockManager::AbortException& err) { }
+ try {
+ lm.acquire(1, LockManager::kShared, 0, 1); // error
+ ASSERT(false);
+ } catch (const LockManager::AbortException& error) {
+ }
+}
+
+TEST(LockManagerTest, SingleTx) {
+ LockManager lm;
+ ResourceId store = 1;
+ TxId t1 = 1;
+ ResourceId r1 = 1;
+ LockManager::LockStatus status;
+
+ // acquire a shared record lock
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release a shared record lock
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // acquire a shared record lock twice, on same ResourceId
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release the twice-acquired lock, once. Still locked
+ status = lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(LockManager::kLockCountDecremented == status);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // after 2nd release, it's not locked
+ status = lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(LockManager::kLockReleased == status);
+ ASSERT(!lm.isLocked(t1, LockManager::kShared, store, r1));
+
+
+
+ // --- test downgrade and release ---
+
+ // acquire an exclusive then a shared lock, on the same ResourceId
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release shared first, then exclusive
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+
+ // release exclusive first, then shared
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+
+
+
+ // --- test upgrade and release ---
+
+ // acquire a shared, then an exclusive lock on the same ResourceId
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+
+ // release exclusive first, then shared
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1));
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+
+ // release shared first, then exclusive
+ lm.acquire(t1, LockManager::kShared, store, r1);
+ lm.acquire(t1, LockManager::kExclusive, store, r1);
+ lm.release(t1, LockManager::kShared, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1));
+ ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1));
+ lm.release(t1, LockManager::kExclusive, store, r1);
+ ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1));
+}
+
+TEST(LockManagerTest, TxConflict) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ // no conflicts with shared locks on same/different objects
+
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED);
+ t1.acquire(LockManager::kShared, 2, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+
+ t1.release(LockManager::kShared, 1);
+ t1.release(LockManager::kShared, 2);
+ t2.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 2);
+
+
+ // no conflicts with exclusive locks on different objects
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 2, ACQUIRED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.release(LockManager::kExclusive, 2);
+
+
+ // shared then exclusive conflict
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ // t2's request is incompatible with t1's lock, so it should block
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened(); // with t1's lock released, t2 should wake
+ t2.release(LockManager::kExclusive, 1);
+
+ // exclusive then shared conflict
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ // exclusive then exclusive conflict
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+}
+
+TEST(LockManagerTest, TxDeadlock) {
+ LockManager lm(LockManager::kPolicyReadersFirst);
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ ClientTransaction a1(&lm, 4);
+ ClientTransaction a2(&lm, 5);
+ ClientTransaction a3(&lm, 6);
+ ClientTransaction a4(&lm, 7);
+ ClientTransaction a5(&lm, 8);
+
+ // simple deadlock test 1
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ a1.acquire(LockManager::kShared, 2, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 2, BLOCKED);
+ // a1's request would form a dependency cycle, so it should abort
+ a1.acquire(LockManager::kExclusive, 1, ABORTED);
+    t1.wakened(); // with a1 aborted and its lock released, t1 should wake
+ t1.release(LockManager::kExclusive, 2);
+ t1.release(LockManager::kShared, 1);
+
+ // simple deadlock test 2
+ a2.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ // a2's request would form a dependency cycle, so it should abort
+ a2.acquire(LockManager::kExclusive, 2, ABORTED);
+ t2.wakened(); // with a2's locks released, t2 should wake
+ t2.release(LockManager::kExclusive, 1);
+ t2.release(LockManager::kShared, 2);
+
+ // three way deadlock
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED);
+ a3.acquire(LockManager::kShared, 3, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 2, BLOCKED);
+ t2.acquire(LockManager::kExclusive, 3, BLOCKED);
+ // a3's request would form a dependency cycle, so it should abort
+ a3.acquire(LockManager::kExclusive, 1, ABORTED);
+    t2.wakened(); // with a3's lock released, t2 should wake
+ t2.release(LockManager::kShared, 2);
+ t1.wakened(); // with t2's locks released, t1 should wake
+ t2.release(LockManager::kExclusive, 3);
+ t1.release(LockManager::kShared, 1);
+ t1.release(LockManager::kExclusive, 2);
+
+ // test for phantom deadlocks
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened();
+ // at this point, t2 should no longer be waiting for t1
+ // so it should be OK for t1 to wait for t2
+ t1.acquire(LockManager::kShared, 1, BLOCKED);
+ t2.release(LockManager::kExclusive, 1);
+ t1.wakened();
+ t1.release(LockManager::kShared, 1);
+
+ // test for missing deadlocks
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED); // setup for deadlock with a4
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED); // block on t1
+    // after this, because of the readers-first policy, t2 should
+    // also be waiting on a4.
+ a4.acquire(LockManager::kShared, 1, ACQUIRED);
+ // after this, t2 should be waiting ONLY on a4
+ t1.release(LockManager::kShared, 1);
+ // So a4 should not be allowed to wait on t2's resource.
+ a4.acquire(LockManager::kExclusive, 2, ABORTED);
+ t2.wakened();
+ t2.release(LockManager::kShared, 2);
+ t2.release(LockManager::kExclusive, 1);
+
+
+ // test for missing deadlocks: due to downgrades
+ a5.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ a5.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 2, ACQUIRED); // setup for deadlock with a5
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED); // block on a5
+ a5.release(LockManager::kExclusive, 1);
+ // at this point, t2 should still be blocked on a5's downgraded lock
+ // So a5 should not be allowed to wait on t2's resource.
+ a5.acquire(LockManager::kExclusive, 2, ABORTED);
+ t2.wakened();
+ t2.release(LockManager::kShared, 2);
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+}
+
+TEST(LockManagerTest, TxDowngrade) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED); // downgrade
+ // t1 still has exclusive on resource 1, so t2 must wait
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened(); // with the exclusive lock released, t2 wakes
+ t1.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 1);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED); // downgrade
+ // t1 still has exclusive on resource 1, so t2 must wait
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kShared, 1);
+ // with t1 still holding exclusive on resource 1, t2 still blocked
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened(); // with the exclusive lock released, t2 wakes
+ t2.release(LockManager::kShared, 1);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ // t1 has exclusive on resource 1, so t2 must wait
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ // even though t2 is waiting for resource 1, t1 can still use it shared,
+ // because it already owns exclusive lock and can't block on itself
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened(); // with the exclusive lock released, t2 wakes
+ t1.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 1);
+
+ // t2 acquires exclusive during t1's downgrade
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+}
+
+TEST(LockManagerTest, TxUpgrade) {
+ LockManager lm(LockManager::kPolicyReadersFirst);
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+ ClientTransaction t3(&lm, 3);
+
+ ClientTransaction a2(&lm, 4);
+ ClientTransaction a3(&lm, 5);
+
+ // test upgrade succeeds, blocks subsequent reads
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED); // upgrade
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t1.release(LockManager::kShared, 1);
+ t2.release(LockManager::kShared, 1);
+
+ // test upgrade blocks, then wakes
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+ // t1 can't use resource 1 exclusively yet, because t2 is using it
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.release(LockManager::kShared, 1);
+ t1.wakened(); // with t2's shared lock released, t1 wakes
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+
+ // test upgrade blocks on several, then wakes
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+ // t1 can't use resource 1 exclusively yet, because t2 is using it
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t3.acquire(LockManager::kShared, 1, ACQUIRED); // additional blocker
+ t2.release(LockManager::kShared, 1); // t1 still blocked
+ t3.release(LockManager::kShared, 1);
+ t1.wakened(); // with t3's shared lock released, t1 wakes
+ t1.release(LockManager::kExclusive, 1);
+ t1.release(LockManager::kShared, 1);
+
+ // failure to upgrade
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ a2.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ a2.acquire(LockManager::kExclusive, 1, ABORTED);
+ // with a2's abort, t1 can wake
+ t1.wakened();
+ t1.release(LockManager::kShared, 1);
+ t1.release(LockManager::kExclusive, 1);
+
+ // failure to upgrade
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, ACQUIRED);
+ t1.acquire(LockManager::kExclusive, 1, BLOCKED);
+ a3.acquire(LockManager::kShared, 1, ACQUIRED);
+ t2.release(LockManager::kShared, 1); // t1 still blocked on a3
+ a3.acquire(LockManager::kExclusive, 1, ABORTED);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+}
+
+TEST(LockManagerTest, TxPolicy) {
+
+ {
+        // Test the default kPolicyFirstCome (first-come, first-served) policy
+ LockManager lm_first;
+ ClientTransaction t1(&lm_first, 1);
+ ClientTransaction t2(&lm_first, 2);
+ ClientTransaction t3(&lm_first, 3);
+ // test1
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t2 should wake first, because its request came before t3's
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ // test2
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t3 should wake first, because its request came before t2's
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ }
+
+ {
+ // Test kPolicyReadersFirst
+        // shared requests are considered read requests
+
+ LockManager lm_readers(LockManager::kPolicyReadersFirst);
+ ClientTransaction t1(&lm_readers, 1);
+ ClientTransaction t2(&lm_readers, 2);
+ ClientTransaction t3(&lm_readers, 3);
+
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+
+ // t2 should wake first, even though t3 came first in time
+ // because t2 is a reader and t3 is a writer
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ }
+
+ {
+        // Test the kPolicyOldestTxFirst policy
+ // for now, smaller TxIds are considered older
+
+ LockManager lm_oldest(LockManager::kPolicyOldestTxFirst);
+ ClientTransaction t1(&lm_oldest, 1);
+ ClientTransaction t2(&lm_oldest, 2);
+ ClientTransaction t3(&lm_oldest, 3);
+
+ // test 1
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+
+ // t2 should wake first, even though t3 came first in time
+ // because t2 is older than t3
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ // test 2
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+
+ // t2 should wake first, because it's older than t3
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ }
+
+ {
+ LockManager lm_blockers(LockManager::kPolicyBlockersFirst);
+ ClientTransaction t1(&lm_blockers, 1);
+ ClientTransaction t2(&lm_blockers, 2);
+ ClientTransaction t3(&lm_blockers, 3);
+ ClientTransaction t4(&lm_blockers, 4);
+
+        // Test the kPolicyBlockersFirst (biggest blocker first) policy
+
+ // set up t3 as the biggest blocker
+ t3.acquire(LockManager::kExclusive, 2, ACQUIRED);
+ t4.acquire(LockManager::kExclusive, 2, BLOCKED);
+
+ // test 1
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t3 should wake first, because it's a bigger blocker than t2
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ // test 2
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ t2.acquire(LockManager::kShared, 1, BLOCKED);
+ t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+ t1.release(LockManager::kExclusive, 1);
+ // t3 should wake first, even though t2 came first,
+ // because it's a bigger blocker than t2
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 1);
+ t2.wakened();
+ t2.release(LockManager::kShared, 1);
+
+ t3.release(LockManager::kExclusive, 2);
+ t4.wakened();
+ t4.release(LockManager::kExclusive, 2);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ t4.quit();
+ }
+}
+
+/*
+ * test kPolicyReadersOnly and kPolicyWritersOnly
+ */
+TEST(LockManagerTest, TxOnlyPolicies) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+ ClientTransaction t3(&lm, 3);
+ ClientTransaction t4(&lm, 4);
+ ClientTransaction t5(&lm, 5);
+ ClientTransaction tp(&lm, 6);
+
+ // show kPolicyReadersOnly blocking writers, which
+ // awake when policy reverts
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyReadersOnly, ACQUIRED);
+ t3.acquire(LockManager::kExclusive, 2, BLOCKED); // just policy conflict
+ t4.acquire(LockManager::kExclusive, 1, BLOCKED); // both policy & t1
+    t5.acquire(LockManager::kShared, 1, ACQUIRED);   // even though t4 is waiting
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t3.wakened();
+ t3.release(LockManager::kExclusive, 2);
+ t1.release(LockManager::kShared, 1);
+ t5.release(LockManager::kShared, 1);
+ t4.wakened();
+ t4.release(LockManager::kExclusive, 1);
+
+    // show kPolicyWritersOnly blocking readers, which
+    // awake when policy reverts
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyWritersOnly, ACQUIRED);
+ t3.acquire(LockManager::kShared, 2, BLOCKED); // just policy conflict
+ t4.acquire(LockManager::kShared, 1, BLOCKED); // both policy & t1
+ t1.release(LockManager::kExclusive, 1);
+    t5.acquire(LockManager::kExclusive, 2, ACQUIRED); // even though t3 is waiting
+ t5.release(LockManager::kExclusive, 2);
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t3.wakened();
+ t3.release(LockManager::kShared, 2);
+ t4.wakened();
+ t4.release(LockManager::kShared, 1);
+
+    // show kPolicyReadersOnly blocked by an existing writer
+    // but still blocking new writers
+ t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyReadersOnly, BLOCKED); // blocked by t1
+ t2.acquire(LockManager::kExclusive, 2, BLOCKED); // just policy conflict
+ t3.acquire(LockManager::kShared, 2, ACQUIRED); // even tho t2
+ t3.release(LockManager::kShared, 2);
+ t1.release(LockManager::kExclusive, 1);
+ tp.wakened();
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t2.wakened();
+ t2.release(LockManager::kExclusive, 2);
+
+    // show kPolicyWritersOnly blocked by an existing reader
+    // but still blocking new readers
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ tp.setPolicy(LockManager::kPolicyWritersOnly, BLOCKED); // blocked by t1
+ t2.acquire(LockManager::kShared, 2, BLOCKED); // just policy conflict
+ t1.release(LockManager::kShared, 1);
+ tp.wakened();
+ tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+ t2.wakened();
+ t2.release(LockManager::kShared, 2);
+
+ t1.quit();
+ t2.quit();
+ t3.quit();
+ t4.quit();
+ t5.quit();
+ tp.quit();
+}
+
+TEST(LockManagerTest, TxShutdown) {
+ LockManager lm;
+ ClientTransaction t1(&lm, 1);
+ ClientTransaction t2(&lm, 2);
+
+ t1.acquire(LockManager::kShared, 1, ACQUIRED);
+ lm.shutdown(3000);
+
+ // t1 can still do work while quiescing
+ t1.release(LockManager::kShared, 1);
+ t1.acquire(LockManager::kShared, 2, ACQUIRED);
+
+ // t2 is new and should be refused
+ t2.acquire(LockManager::kShared, 3, ABORTED);
+
+ // after the quiescing period, t1's request should be refused
+ sleep(3);
+ t1.acquire(LockManager::kShared, 4, ABORTED);
+}
+}
diff --git a/src/mongo/db/d_concurrency.cpp b/src/mongo/db/d_concurrency.cpp
index c13ea3e9866..a8651645303 100644
--- a/src/mongo/db/d_concurrency.cpp
+++ b/src/mongo/db/d_concurrency.cpp
@@ -28,6 +28,8 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/db/d_concurrency.h"
#include "mongo/db/client.h"
@@ -36,6 +38,7 @@
#include "mongo/db/d_globals.h"
#include "mongo/db/lockstat.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/server.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/mapsf.h"
@@ -402,7 +405,46 @@ namespace mongo {
}
}
- void Lock::DBWrite::lockOther(const StringData& db) {
+ void Lock::DBWrite::lockOtherRead(const StringData& db) {
+ fassert(18517, !db.empty());
+
+ // we do checks first, as on assert destructor won't be called so don't want to be half finished with our work.
+ if( _lockState->otherCount() ) {
+ // nested. prev could be read or write. if/when we do temprelease with DBRead/DBWrite we will need to increment/decrement here
+ // (so we can not release or assert if nested). temprelease we should avoid if we can though, it's a bit of an anti-pattern.
+ massert(18513,
+ str::stream() << "internal error tried to lock two databases at the same time. old:"
+ << _lockState->otherName() << " new:" << db,
+ db == _lockState->otherName());
+ return;
+ }
+
+ // first lock for this db. check consistent order with local db lock so we never deadlock. local always comes last
+ massert(18514,
+ str::stream() << "can't dblock:" << db
+ << " when local or admin is already locked",
+ _lockState->nestableCount() == 0);
+
+ if (db != _lockState->otherName()) {
+ DBLocksMap::ref r(dblocks);
+ WrapperForRWLock*& lock = r[db];
+ if (lock == NULL) {
+ lock = new WrapperForRWLock(db);
+ }
+
+ _lockState->lockedOther(db, -1, lock);
+ }
+ else {
+ DEV OCCASIONALLY{ dassert(dblocks.get(db) == _lockState->otherLock()); }
+ _lockState->lockedOther(-1);
+ }
+
+ fassert(18515, _weLocked == 0);
+ _lockState->otherLock()->lock_shared();
+ _weLocked = _lockState->otherLock();
+ }
+
+ void Lock::DBWrite::lockOtherWrite(const StringData& db) {
fassert(16252, !db.empty());
// we do checks first, as on assert destructor won't be called so don't want to be half finished with our work.
@@ -470,8 +512,16 @@ namespace mongo {
_locked_W = true;
return;
}
- if( !nested )
- lockOther(db);
+
+ if (!nested) {
+ if (_isIntentWrite) {
+ lockOtherRead(db);
+ }
+ else {
+ lockOtherWrite(db);
+ }
+ }
+
lockTop();
if( nested )
lockNestable(nested);
@@ -496,9 +546,12 @@ namespace mongo {
lockNestable(nested);
}
- Lock::DBWrite::DBWrite(LockState* lockState, const StringData& ns)
- : ScopedLock(lockState, 'w' ), _what(ns.toString()), _nested(false) {
- lockDB( _what );
+ Lock::DBWrite::DBWrite(LockState* lockState, const StringData& ns, bool intentWrite)
+ : ScopedLock(lockState, 'w'),
+ _isIntentWrite(intentWrite),
+ _what(ns.toString()),
+ _nested(false) {
+ lockDB(_what);
}
Lock::DBRead::DBRead(LockState* lockState, const StringData& ns)
@@ -753,4 +806,10 @@ namespace mongo {
} lockStatsServerStatusSection;
+
+ // This startup parameter enables experimental document-level locking features, which work
+ // for update-in-place changes only (i.e., no index updates and no document growth or
+ // movement). It should be removed once full document-level locking is checked-in.
+ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(useExperimentalDocLocking, bool, false);
+
}
diff --git a/src/mongo/db/d_concurrency.h b/src/mongo/db/d_concurrency.h
index 4696626b704..a11351b5a84 100644
--- a/src/mongo/db/d_concurrency.h
+++ b/src/mongo/db/d_concurrency.h
@@ -170,7 +170,8 @@ namespace mongo {
void lockTop();
void lockNestable(Nestable db);
- void lockOther(const StringData& db);
+ void lockOtherWrite(const StringData& db);
+ void lockOtherRead(const StringData& db);
void lockDB(const std::string& ns);
void unlockDB();
@@ -179,12 +180,13 @@ namespace mongo {
void _relock();
public:
- DBWrite(LockState* lockState, const StringData& dbOrNs);
+ DBWrite(LockState* lockState, const StringData& dbOrNs, bool intentWrite = false);
virtual ~DBWrite();
private:
bool _locked_w;
bool _locked_W;
+ bool _isIntentWrite;
WrapperForRWLock *_weLocked;
const std::string _what;
bool _nested;
@@ -248,4 +250,7 @@ namespace mongo {
~writelocktry();
bool got() const { return _got; }
};
+
+    // Defined in d_concurrency.cpp
+ extern bool useExperimentalDocLocking;
}
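Annotation: with the declaration above in place, a write path opts into the weaker database lock roughly as sketched below; the function and its arguments are placeholders (the real call site is the instance.cpp hunk that follows), and enabling the flag itself presumably goes through the usual startup mechanism, e.g. mongod --setParameter useExperimentalDocLocking=true.

    // Sketch, assuming the DBWrite signature declared above (requires d_concurrency.h).
    // When the third argument is true, lockDB() takes the per-database lock in shared
    // ("intent write") mode via lockOtherRead(); document-level exclusivity is then
    // acquired separately with ExclusiveResourceLock in the storage layer.
    void lockForInPlaceUpdate(LockState* lockState, const StringData& ns) {
        Lock::DBWrite lk(lockState, ns, useExperimentalDocLocking);
        // ... perform the damage-style update under document-level locks ...
    }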
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index b70210c4ef2..17111a87b73 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -605,7 +605,7 @@ namespace mongo {
UpdateExecutor executor(&request, &op.debug());
uassertStatusOK(executor.prepare());
- Lock::DBWrite lk(txn->lockState(), ns.ns());
+ Lock::DBWrite lk(txn->lockState(), ns.ns(), useExperimentalDocLocking);
// if this ever moves to outside of lock, need to adjust check
// Client::Context::_finishInit
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index f3d21129fb3..963109b2537 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -56,6 +56,7 @@
namespace mongo {
namespace mb = mutablebson;
+
namespace {
const char idFieldName[] = "_id";
@@ -624,7 +625,6 @@ namespace mongo {
runner->saveState();
if (inPlace && !driver->modsAffectIndices()) {
-
// If a set of modifiers were all no-ops, we are still 'in place', but there is
// no work to do, in which case we want to consider the object unchanged.
if (!damages.empty() ) {
@@ -636,8 +636,12 @@ namespace mongo {
newObj = oldObj;
}
else {
-
// The updates were not in place. Apply them through the file manager.
+
+            // XXX: With experimental document-level locking, we do not hold sufficient
+            // locks here, so applying a non-in-place update would cause corruption.
+ fassert(18516, !useExperimentalDocLocking);
+
newObj = doc.getObject();
uassert(17419,
str::stream() << "Resulting document after update is larger than "
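Annotation: the fassert added above encodes the safety rule for the experimental mode: only damage-style, in-place updates that leave indexes untouched may run under document-level locks; anything else still requires the full database write lock. A condensed sketch of that rule as a predicate; the names are placeholders, and the real enforcement is the fassert in the hunk above.

    // Sketch only: 'inPlace' and 'affectsIndexes' stand for the conditions the update
    // path computes; 'docLockingEnabled' stands for useExperimentalDocLocking.
    bool updateAllowedUnderDocLocking(bool inPlace, bool affectsIndexes,
                                      bool docLockingEnabled) {
        if (!docLockingEnabled) {
            return true;                     // full DB write lock held; anything goes
        }
        return inPlace && !affectsIndexes;   // only byte-for-byte in-place changes
    }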
diff --git a/src/mongo/db/startup_warnings.cpp b/src/mongo/db/startup_warnings.cpp
index d7e187cd5b0..15570c973c6 100644
--- a/src/mongo/db/startup_warnings.cpp
+++ b/src/mongo/db/startup_warnings.cpp
@@ -40,6 +40,8 @@
namespace mongo {
+ extern bool useExperimentalDocLocking;
+
//
// system warnings
//
@@ -72,6 +74,18 @@ namespace mongo {
warned = true;
}
+ if (useExperimentalDocLocking) {
+ log() << "** WARNING: Experimental (and untested) document-level locking for in-place"
+ << startupWarningsLog;
+ log() << " updates, which do not modify indexed values, has been enabled."
+ << startupWarningsLog;
+ log() << " This should absolutely not be used on production systems and is"
+ << startupWarningsLog;
+ log() << " for demonstration purposes only."
+ << startupWarningsLog;
+ warned = true;
+ }
+
#if defined(_WIN32) && !defined(_WIN64)
// Warn user that they are running a 32-bit app on 64-bit Windows
BOOL wow64Process;
diff --git a/src/mongo/db/structure/record_store_v1_base.cpp b/src/mongo/db/structure/record_store_v1_base.cpp
index 9a80cd62ceb..e2fc7ff8312 100644
--- a/src/mongo/db/structure/record_store_v1_base.cpp
+++ b/src/mongo/db/structure/record_store_v1_base.cpp
@@ -325,7 +325,7 @@ namespace mongo {
Status RecordStoreV1Base::updateWithDamages( OperationContext* txn,
const DiskLoc& loc,
- const char* damangeSource,
+ const char* damageSource,
const mutablebson::DamageVector& damages ) {
_paddingFits( txn );
@@ -336,7 +336,7 @@ namespace mongo {
mutablebson::DamageVector::const_iterator where = damages.begin();
const mutablebson::DamageVector::const_iterator end = damages.end();
for( ; where != end; ++where ) {
- const char* sourcePtr = damangeSource + where->sourceOffset;
+ const char* sourcePtr = damageSource + where->sourceOffset;
void* targetPtr = txn->recoveryUnit()->writingPtr(root + where->targetOffset, where->size);
std::memcpy(targetPtr, sourcePtr, where->size);
}
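Annotation: for context on the loop above, each damage event names a byte range to copy from damageSource into the record, so an in-place update is just a sequence of memcpys. A self-contained sketch over plain buffers, using the field names visible in the hunk; the struct here is an illustrative stand-in for mutablebson::DamageEvent.

    #include <cstddef>
    #include <cstring>
    #include <vector>

    // Illustrative stand-in; only the fields read by updateWithDamages are shown.
    struct DamageEvent {
        size_t sourceOffset;   // where the new bytes start in the source buffer
        size_t targetOffset;   // where they land inside the record
        size_t size;           // number of bytes to copy
    };

    // Apply every damage in order: one memcpy per byte range.
    void applyDamages(char* record,
                      const char* damageSource,
                      const std::vector<DamageEvent>& damages) {
        for (std::vector<DamageEvent>::const_iterator it = damages.begin();
             it != damages.end(); ++it) {
            std::memcpy(record + it->targetOffset,
                        damageSource + it->sourceOffset,
                        it->size);
        }
    }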