-rw-r--r--  src/mongo/SConscript                                     |    2
-rw-r--r--  src/mongo/db/catalog/collection.cpp                      |    3
-rw-r--r--  src/mongo/db/catalog/collection_cursor_cache.cpp         |   14
-rw-r--r--  src/mongo/db/commands/write_commands/batch_executor.cpp  |    5
-rw-r--r--  src/mongo/db/concurrency/SConscript                      |   15
-rw-r--r--  src/mongo/db/concurrency/lock_mgr.cpp                    | 1044
-rw-r--r--  src/mongo/db/concurrency/lock_mgr.h                      |  621
-rw-r--r--  src/mongo/db/concurrency/lock_mgr_test.cpp               |  896
-rw-r--r--  src/mongo/db/d_concurrency.cpp                           |   71
-rw-r--r--  src/mongo/db/d_concurrency.h                             |    9
-rw-r--r--  src/mongo/db/instance.cpp                                |    2
-rw-r--r--  src/mongo/db/ops/update.cpp                              |    8
-rw-r--r--  src/mongo/db/startup_warnings.cpp                        |   14
-rw-r--r--  src/mongo/db/structure/record_store_v1_base.cpp          |    4
14 files changed, 2687 insertions(+), 21 deletions(-)
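
The bulk of the change is the new LockManager (src/mongo/db/concurrency/lock_mgr.h and lock_mgr.cpp) plus the RAII wrappers around it. Before reading the diff, a minimal sketch of how the pieces are meant to be used may help. The function names below are illustrative, not part of the patch; the types and calls (ExclusiveResourceLock, LockManager::acquire/releaseLock, AbortException) all appear in the new header:

    #include "mongo/db/concurrency/lock_mgr.h"

    namespace mongo {

        // Scoped use, as in the collection.cpp hunk below: lock a record
        // exclusively on behalf of a transaction (here identified by the
        // operation number); the destructor releases the lock.
        void scopedUpdateExample(size_t opNum, size_t recordId) {
            ExclusiveResourceLock lk(opNum, recordId);
            // ... mutate the record while the lock is held ...
        }

        // Direct use of the singleton: acquire() blocks on conflict and
        // throws AbortException if this transaction is picked as a deadlock
        // victim (or once the manager is shutting down). On abort, the
        // LockManager has already released all of the transaction's locks.
        void directUpdateExample(const TxId& txn, const ResourceId& resId) {
            LockManager& lm = LockManager::getSingleton();
            try {
                LockManager::LockId lid = lm.acquire(txn, LockManager::kExclusive, resId);
                // ... mutate the record ...
                lm.releaseLock(lid);
            }
            catch (const LockManager::AbortException&) {
                // roll back; aborted TxIds are rejected on later requests
            }
        }

    } // namespace mongo
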
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 7ed2211bf51..84e32ad556b 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -20,6 +20,7 @@ env.SConscript(['base/SConscript', 'db/auth/SConscript', 'db/catalog/SConscript', 'db/commands/SConscript', + 'db/concurrency/SConscript', 'db/exec/SConscript', 'db/fts/SConscript', 'db/index/SConscript', @@ -902,6 +903,7 @@ env.Library("serveronly", serverOnlyFiles, "db/auth/authmongod", "db/fts/ftsmongod", "db/common", + "db/concurrency/lock_mgr", "db/ops/update_driver", "defaultversion", "geoparser", diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 1c137515d65..b1f705f1411 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -37,6 +37,7 @@ #include "mongo/db/curop.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/database.h" +#include "mongo/db/concurrency/lock_mgr.h" #include "mongo/db/catalog/index_create.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/operation_context.h" @@ -406,6 +407,8 @@ namespace mongo { // Broadcast the mutation so that query results stay correct. _cursorCache.invalidateDocument(loc, INVALIDATION_MUTATION); + + ExclusiveResourceLock lk((size_t)txn->getCurOp()->opNum(), *(size_t*)&loc); return _recordStore->updateWithDamages( txn, loc, damangeSource, damages ); } diff --git a/src/mongo/db/catalog/collection_cursor_cache.cpp b/src/mongo/db/catalog/collection_cursor_cache.cpp index e9962c9441f..2e941dd1f40 100644 --- a/src/mongo/db/catalog/collection_cursor_cache.cpp +++ b/src/mongo/db/catalog/collection_cursor_cache.cpp @@ -388,14 +388,18 @@ namespace mongo { } void CollectionCursorCache::registerRunner( Runner* runner ) { - SimpleMutex::scoped_lock lk( _mutex ); - const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner); - invariant(result.second); // make sure this was inserted + if (!useExperimentalDocLocking) { + SimpleMutex::scoped_lock lk(_mutex); + const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner); + invariant(result.second); // make sure this was inserted + } } void CollectionCursorCache::deregisterRunner( Runner* runner ) { - SimpleMutex::scoped_lock lk( _mutex ); - _nonCachedRunners.erase( runner ); + if (!useExperimentalDocLocking) { + SimpleMutex::scoped_lock lk(_mutex); + _nonCachedRunners.erase(runner); + } } ClientCursor* CollectionCursorCache::find( CursorId id, bool pin ) { diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 1711348a35b..52acb7aae6f 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -1092,14 +1092,13 @@ namespace mongo { } /////////////////////////////////////////// - Lock::DBWrite writeLock(txn->lockState(), nsString.ns()); + Lock::DBWrite writeLock(txn->lockState(), nsString.ns(), useExperimentalDocLocking); /////////////////////////////////////////// if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result)) return; - Client::Context ctx( nsString.ns(), - false /* don't check version */ ); + Client::Context ctx(nsString.ns(), false /* don't check version */); try { UpdateResult res = executor.execute(txn, ctx.db()); diff --git a/src/mongo/db/concurrency/SConscript b/src/mongo/db/concurrency/SConscript new file mode 100644 index 00000000000..53be5435c91 --- /dev/null +++ 
b/src/mongo/db/concurrency/SConscript
@@ -0,0 +1,15 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env.Library(
+    target='lock_mgr',
+    source=[
+        'lock_mgr.cpp',
+        ],
+    LIBDEPS=[
+        '$BUILD_DIR/mongo/base/base',
+        '$BUILD_DIR/third_party/shim_boost',
+        '$BUILD_DIR/mongo/foundation'
+        ],
+    )
diff --git a/src/mongo/db/concurrency/lock_mgr.cpp b/src/mongo/db/concurrency/lock_mgr.cpp
new file mode 100644
index 00000000000..d0cc06ead8d
--- /dev/null
+++ b/src/mongo/db/concurrency/lock_mgr.cpp
@@ -0,0 +1,1044 @@
+/**
+* Copyright (C) MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/concurrency/lock_mgr.h"
+
+#include <boost/thread/locks.hpp>
+#include <sstream>
+
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/timer.h"
+
+using boost::unique_lock;
+
+using std::endl;
+using std::exception;
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace mongo {
+
+    /*---------- Utility functions ----------*/
+
+    namespace {
+
+        /*
+         * Given a 'hierarchical' resource (e.g. document in collection in database)
+         * return true if the resource is requested for exclusive access at a given level.
+         * In the example above, level 0 for document, level 1 for collection, level 2 for DB
+         * and level 3 for the system as a whole.
+         *
+         * mode is a bit vector, level is the index.
+         */
+        bool isExclusive(unsigned mode, const unsigned level=0) {
+            return 0 != (mode & (0x1 << level));
+        }
+
+        bool isShared(unsigned mode, const unsigned level=0) {
+            return 0 == (mode & (0x1 << level));
+        }
+
+        bool isCompatible(unsigned mode1, unsigned mode2) {
+            return mode1==mode2 && (isShared(mode1) || isShared(mode2));
+        }
+
+        bool hasConflict(const LockManager::ResourceStatus& status) {
+            return LockManager::kResourceConflict == status ||
+                   LockManager::kResourceUpgradeConflict == status ||
+                   LockManager::kResourcePolicyConflict == status;
+        }
+
+        // unique id for each LockRequest.
zero is reserved + static LockManager::LockId nextLid = 1; + } // namespace + + /*---------- AbortException functions ----------*/ + + const char* LockManager::AbortException::what() const throw() { return "AbortException"; } + + /*---------- LockStats functions ----------*/ + string LockManager::LockStats::toString() const { + stringstream result; + result << "----- LockManager Stats -----" << endl + << "\ttotal requests: " << getNumRequests() << endl + << "\t# pre-existing: " << getNumPreexistingRequests() << endl + << "\t# same: " << getNumSameRequests() << endl + << "\t# times blocked: " << getNumBlocks() << endl + << "\t# ms blocked: " << getNumMillisBlocked() << endl + << "\t# deadlocks: " << getNumDeadlocks() << endl + << "\t# downgrades: " << getNumDowngrades() << endl + << "\t# upgrades: " << getNumUpgrades() << endl + ; + return result.str(); + } + + /*---------- LockRequest functions ----------*/ + + + LockManager::LockRequest::LockRequest(const TxId& xid, + const unsigned& mode, + const ResourceId& resId) + : lid(nextLid++) + , xid(xid) + , mode(mode) + , resId(resId) + , count(1) + , sleepCount(0) { } + + + LockManager::LockRequest::~LockRequest() { } + + bool LockManager::LockRequest::matches(const TxId& xid, + const unsigned& mode, + const ResourceId& resId) const { + return + this->xid == xid && + this->mode == mode && + this->resId == resId; + } + + string LockManager::LockRequest::toString() const { + stringstream result; + result << "<lid:" << lid + << ",xid:" << xid + << ",mode:" << mode + << ",resId:" << resId + << ",count:" << count + << ",sleepCount:" << sleepCount + << ">"; + return result.str(); + } + + bool LockManager::LockRequest::isBlocked() const { + return sleepCount > 0; + } + + bool LockManager::LockRequest::shouldAwake() { + return 0 == --sleepCount; + } + + /*---------- LockManager public functions (mutex guarded) ---------*/ + + LockManager* LockManager::_singleton = NULL; + boost::mutex LockManager::_getSingletonMutex; + LockManager& LockManager::getSingleton() { + unique_lock<boost::mutex> lk(_getSingletonMutex); + if (NULL == _singleton) { + _singleton = new LockManager(); + } + return *_singleton; + } + + LockManager::LockManager(const Policy& policy) + : _policy(policy), + _mutex(), + _shuttingDown(false), + _millisToQuiesce(-1) { } + + LockManager::~LockManager() { + unique_lock<boost::mutex> lk(_mutex); + for (map<LockId, LockRequest*>::iterator locks = _locks.begin(); + locks != _locks.end(); ++locks) { + delete locks->second; + } + } + + void LockManager::shutdown(const unsigned& millisToQuiesce) { + unique_lock<boost::mutex> lk(_mutex); + +#ifdef DONT_ALLOW_CHANGE_TO_QUIESCE_PERIOD + // XXX not sure whether we want to allow multiple shutdowns + // in order to change quiesce period? + if (_shuttingDown) { + return; // already in shutdown, don't extend quiescence(?) 
+ } +#endif + + _shuttingDown = true; + _millisToQuiesce = millisToQuiesce; + _timer.millisReset(); + } + + LockManager::Policy LockManager::getPolicy() const { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(); + return _policy; + } + + TxId LockManager::getPolicySetter() const { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(); + return _policySetter; + } + + void LockManager::setTransactionPriority(const TxId& xid, int priority) { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(xid); + _txPriorities[xid] = priority; + } + + int LockManager::getTransactionPriority(const TxId& xid) const { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(xid); + return _getTransactionPriorityInternal(xid); + } + + LockManager::LockId LockManager::acquire(const TxId& requestor, + const uint32_t& mode, + const ResourceId& resId, + Notifier* notifier) { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(requestor); + + // don't accept requests from aborted transactions + if (_abortedTxIds.find(requestor) != _abortedTxIds.end()) { + throw AbortException(); + } + + _stats.incRequests(); + _stats.incStatsForMode(mode); + + return _acquireInternal(requestor, mode, resId, notifier, lk); + } + + int LockManager::acquireOne(const TxId& requestor, + const uint32_t& mode, + const vector<ResourceId>& resources, + Notifier* notifier) { + + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(requestor); + + if (resources.empty()) { return -1; } + + // don't accept requests from aborted transactions + if (_abortedTxIds.find(requestor) != _abortedTxIds.end()) { + throw AbortException(); + } + + _stats.incRequests(); + + // acquire the first available recordId + for (unsigned ix=0; ix < resources.size(); ix++) { + if (_isAvailable(requestor, mode, resources[ix])) { + _acquireInternal(requestor, mode, resources[ix], notifier, lk); + _stats.incStatsForMode(mode); + return ix; + } + } + + // sigh. none of the records are currently available. wait on the first. 
+ _stats.incStatsForMode(mode); + _acquireInternal(requestor, mode, resources[0], notifier, lk); + return 0; + } + + LockManager::LockStatus LockManager::releaseLock(const LockId& lid) { + unique_lock<boost::mutex> lk(_mutex); + + LockMap::iterator it = _locks.find(lid); + if (it != _locks.end()) { + LockRequest* theLock = it->second; + _throwIfShuttingDown(theLock->xid); + _stats.decStatsForMode(theLock->mode); + if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) || + (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) { + _policyLock.notify_one(); + } + } + return _releaseInternal(lid); + } + + LockManager::LockStatus LockManager::release(const TxId& holder, + const uint32_t& mode, + const ResourceId& resId) { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(holder); + + LockId lid; + LockStatus status = _findLock(holder, mode, resId, &lid); + if (kLockFound != status) { + return status; // error, resource wasn't acquired in this mode by holder + } + _stats.decStatsForMode(_locks[lid]->mode); + if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) || + (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) { + _policyLock.notify_one(); + } + return _releaseInternal(lid); + } + + /* + * release all resource acquired by a transaction, returning the count + */ + size_t LockManager::release(const TxId& holder) { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(holder); + + TxLockMap::iterator lockIdsHeld = _xaLocks.find(holder); + if (lockIdsHeld == _xaLocks.end()) { return 0; } + size_t numLocksReleased = 0; + for (set<LockId>::iterator nextLockId = lockIdsHeld->second.begin(); + nextLockId != lockIdsHeld->second.end(); ++nextLockId) { + _releaseInternal(*nextLockId); + + _stats.decStatsForMode(_locks[*nextLockId]->mode); + + if ((kPolicyWritersOnly == _policy && 0 == _stats.numActiveReads()) || + (kPolicyReadersOnly == _policy && 0 == _stats.numActiveWrites())) { + _policyLock.notify_one(); + } + numLocksReleased++; + } + return numLocksReleased; + } + + void LockManager::abort(const TxId& goner) { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(goner); + _abortInternal(goner); + } + + LockManager::LockStats LockManager::getStats() const { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(); + return _stats; + } + + string LockManager::toString() const { +// unique_lock<boost::mutex> lk(_mutex); +#ifdef DONT_CARE_ABOUT_DEBUG_EVEN_WHEN_SHUTTING_DOWN + // seems like we might want to allow toString for debug during shutdown? 
+ _throwIfShuttingDown(); +#endif + stringstream result; + result << "Policy: "; + switch(_policy) { + case kPolicyFirstCome: + result << "FirstCome"; + break; + case kPolicyReadersFirst: + result << "ReadersFirst"; + break; + case kPolicyOldestTxFirst: + result << "OldestFirst"; + break; + case kPolicyReadersOnly: + result << "ReadersOnly"; + break; + case kPolicyWritersOnly: + result << "WritersOnly"; + break; + } + result << endl; + + if (_shuttingDown) + result << " shutting down in " << _millisToQuiesce - _timer.millis(); + + result << "\t_locks:" << endl; + for (map<LockId,LockRequest*>::const_iterator locks = _locks.begin(); + locks != _locks.end(); ++locks) { + result << "\t\t" << locks->first << locks->second->toString() << endl; + } + + result << "\t_resourceLocks:" << endl; + bool firstResource=true; + result << "resources=" << ": {"; + for (map<ResourceId,list<LockId> >::const_iterator nextResource = _resourceLocks.begin(); + nextResource != _resourceLocks.end(); ++nextResource) { + if (firstResource) firstResource=false; + else result << ", "; + result << nextResource->first << ": {"; + bool firstLock=true; + for (list<LockId>::const_iterator nextLockId = nextResource->second.begin(); + nextLockId != nextResource->second.end(); ++nextLockId) { + if (firstLock) firstLock=false; + else result << ", "; + result << *nextLockId; + } + result << "}"; + } + result << "}" << endl; + + result << "\t_waiters:" << endl; + for (map<TxId, set<TxId> >::const_iterator txWaiters = _waiters.begin(); + txWaiters != _waiters.end(); ++txWaiters) { + bool firstTime=true; + result << "\t\t" << txWaiters->first << ": {"; + for (set<TxId>::const_iterator nextWaiter = txWaiters->second.begin(); + nextWaiter != txWaiters->second.end(); ++nextWaiter) { + if (firstTime) firstTime=false; + else result << ", "; + result << *nextWaiter; + } + result << "}" << endl; + } + + bool firstGoner = true; + result << "\t_aborted: {" << endl; + for (set<TxId>::iterator goners = _abortedTxIds.begin(); + goners != _abortedTxIds.end(); ++goners) { + if (firstGoner) firstGoner = false; + else result << ","; + result << "t" << *goners; + } + result << "}"; + + return result.str(); + } + + bool LockManager::isLocked(const TxId& holder, + const uint32_t& mode, + const ResourceId& resId) const { + unique_lock<boost::mutex> lk(_mutex); + _throwIfShuttingDown(holder); + + LockId unused; + return kLockFound == _findLock(holder, mode, resId, &unused); + } + + /*---------- LockManager private functions (alphabetical) ----------*/ + + /* + * release resources acquired by a transaction about to abort, notifying + * any waiters that they can retry their resource acquisition. cleanup + * and throw an AbortException. + */ + void LockManager::_abortInternal(const TxId& goner) { + TxLockMap::iterator locks = _xaLocks.find(goner); + + if (locks == _xaLocks.end()) { + // unusual, but possible to abort a transaction with no locks + throw AbortException(); + } + + // make a copy of the TxId's locks, because releasing + // would otherwise affect the iterator. XXX find a better way? 
+ // + set<LockId> copyOfLocks = locks->second; + + // release all resources acquired by this transaction + // notifying any waiters that they can continue + // + for (set<LockId>::iterator nextLockId = copyOfLocks.begin(); + nextLockId != copyOfLocks.end(); ++nextLockId) { + _releaseInternal(*nextLockId); + } + + // add to set of aborted transactions + _abortedTxIds.insert(goner); + + throw AbortException(); + } + + LockManager::LockId LockManager::_acquireInternal(const TxId& requestor, + const uint32_t& mode, + const ResourceId& resId, + Notifier* sleepNotifier, + unique_lock<boost::mutex>& guard) { +#if 0 + static unsigned long funCount = 0; + if (0 == ++funCount % 10000) { + log() << _stats.toString(); + } +#endif + list<LockId>& queue = _resourceLocks[resId]; + if (!queue.empty()) { _stats.incPreexisting(); } + list<LockId>::iterator conflictPosition = queue.begin(); + ResourceStatus resourceStatus = _conflictExists(requestor, mode, resId, + queue, conflictPosition); + if (kResourceAcquired == resourceStatus) { + _stats.incSame(); + ++_locks[*conflictPosition]->count; + return *conflictPosition; + } + + // create the lock request and add to TxId's set of lock requests + + // XXX should probably use placement operator new and manage LockRequest memory + LockRequest* lr = new LockRequest(requestor, mode, resId); + _locks[lr->lid] = lr; + + // add lock request to set of requests of requesting TxId + _xaLocks[requestor].insert(lr->lid); + + if (kResourceAvailable == resourceStatus) { + queue.insert(conflictPosition, lr->lid); + _addWaiters(lr, conflictPosition, queue.end()); + return lr->lid; + } + + // some type of conflict, insert after confictPosition + + verify(conflictPosition != queue.end()); + ++conflictPosition; + + if (kResourceUpgradeConflict == resourceStatus) { + queue.insert(conflictPosition, lr->lid); + } + else { + _addLockToQueueUsingPolicy(lr, queue, conflictPosition); + } +#if 0 + if (isExclusive(mode)) { + for (list<LockId>::iterator followers = conflictPosition; + followers != queue.end(); ++followers) { + LockRequest* nextLockRequest = _locks[*followers]; + if (nextLockRequest->xid == requestor) continue; + verify(nextLockRequest->isBlocked()); + } + } +#endif + // set remaining incompatible requests as lr's waiters + _addWaiters(lr, conflictPosition, queue.end()); + + + // call the sleep notification function once + if (NULL != sleepNotifier) { + // XXX should arg be xid of blocker? 
+ (*sleepNotifier)(lr->xid); + } + + _stats.incBlocks(); + + // this loop typically executes once + do { + // set up for future deadlock detection add requestor to blockers' waiters + // + for (list<LockId>::iterator nextBlocker = queue.begin(); + nextBlocker != conflictPosition; ++nextBlocker) { + LockRequest* nextBlockingRequest = _locks[*nextBlocker]; + if (nextBlockingRequest->lid == lr->lid) {break;} + if (nextBlockingRequest->xid == requestor) {continue;} + if (isCompatible(_locks[*nextBlocker]->mode, lr->mode)) {continue;} + _addWaiter(_locks[*nextBlocker]->xid, requestor); + ++lr->sleepCount; + } + if (kResourcePolicyConflict == resourceStatus) { + // to facilitate waking once the policy reverts, add requestor to system's waiters + _addWaiter(kReservedTxId, requestor); + ++lr->sleepCount; + } + + // wait for blocker to release + while (lr->isBlocked()) { + Timer timer; + lr->lock.wait(guard); + _stats.incTimeBlocked(timer.millis()); + } + + conflictPosition = queue.begin(); + resourceStatus = _conflictExists(lr->xid, lr->mode, lr->resId, queue, conflictPosition); + } while (hasConflict(resourceStatus)); + + return lr->lid; + } + + /* + * called only when there are conflicting LockRequests + * positions a lock request (lr) in a queue at or after position + * also adds remaining requests in queue as lr's waiters + * for subsequent deadlock detection + */ + void LockManager::_addLockToQueueUsingPolicy(LockRequest* lr, + list<LockId>& queue, + list<LockId>::iterator& position) { + + if (position == queue.end()) { + queue.insert(position, lr->lid); + return; + } + + // use lock request's transaction's priority if specified + int txPriority = _getTransactionPriorityInternal(lr->xid); + if (txPriority > 0) { + for (; position != queue.end(); ++position) { + LockRequest* nextRequest = _locks[*position]; + if (txPriority > _getTransactionPriorityInternal(nextRequest->xid)) { + // add in front of request with lower priority that is either + // compatible, or blocked + // + queue.insert(position, lr->lid); + return; + } + } + queue.push_back(lr->lid); + return; + } + else if (txPriority < 0) { + // for now, just push to end + // TODO: honor position of low priority requests + queue.push_back(lr->lid); + } + + // use LockManager's default policy + switch (_policy) { + case kPolicyFirstCome: + queue.push_back(lr->lid); + position = queue.end(); + return; + case kPolicyReadersFirst: + if (isExclusive(lr->mode)) { + queue.push_back(lr->lid); + position = queue.end(); + return; + } + for (; position != queue.end(); ++position) { + LockRequest* nextRequest = _locks[*position]; + if (isExclusive(nextRequest->mode) && nextRequest->isBlocked()) { + // insert shared lock before first sleeping exclusive lock + queue.insert(position, lr->lid); + return; + } + } + break; + case kPolicyOldestTxFirst: + for (; position != queue.end(); ++position) { + LockRequest* nextRequest = _locks[*position]; + if (lr->xid < nextRequest->xid && + (isCompatible(lr->mode, nextRequest->mode) || nextRequest->isBlocked())) { + // smaller xid is older, so queue it before + queue.insert(position, lr->lid); + return; + } + } + break; + default: + break; + } + + queue.push_back(lr->lid); + position = queue.end(); + } + + void LockManager::_addWaiter(const TxId& blocker, const TxId& requestor) { + if (blocker == requestor) { + // can't wait on self + return; + } + + // add requestor to blocker's waiters + _waiters[blocker].insert(requestor); + + // add all of requestor's waiters to blocker's waiters + 
_waiters[blocker].insert(_waiters[requestor].begin(), _waiters[requestor].end());
+    }
+
+    void LockManager::_addWaiters(LockRequest* blocker,
+                                  list<LockId>::iterator nextLockId,
+                                  list<LockId>::iterator lastLockId) {
+        for (; nextLockId != lastLockId; ++nextLockId) {
+            LockRequest* nextLockRequest = _locks[*nextLockId];
+            if (! isCompatible(blocker->mode, nextLockRequest->mode)) {
+                if (nextLockRequest->sleepCount > 0) {
+                    _addWaiter(blocker->xid, nextLockRequest->xid);
+                    ++nextLockRequest->sleepCount;
+                }
+            }
+        }
+    }
+
+    bool LockManager::_comesBeforeUsingPolicy(const TxId& requestor,
+                                              const unsigned& mode,
+                                              const LockRequest* oldRequest) const {
+
+        // handle special policies
+        if (kPolicyReadersOnly == _policy && kShared == mode && oldRequest->isBlocked())
+            return true;
+        if (kPolicyWritersOnly == _policy && kExclusive == mode && oldRequest->isBlocked())
+            return true;
+
+        if (_getTransactionPriorityInternal(requestor) >
+            _getTransactionPriorityInternal(oldRequest->xid)) {
+            return true;
+        }
+
+        switch (_policy) {
+        case kPolicyFirstCome:
+            return false;
+        case kPolicyReadersFirst:
+            return isShared(mode);
+        case kPolicyOldestTxFirst:
+            return requestor < oldRequest->xid;
+        default:
+            return false;
+        }
+    }
+
+    LockManager::ResourceStatus LockManager::_conflictExists(const TxId& requestor,
+                                                             const unsigned& mode,
+                                                             const ResourceId& resId,
+                                                             list<LockId>& queue,
+                                                             list<LockId>::iterator& nextLockId) {
+
+        // handle kPolicyReadersOnly/kPolicyWritersOnly policy conflicts
+        if ((kPolicyReadersOnly == _policy && isExclusive(mode)) ||
+            (kPolicyWritersOnly == _policy && isShared(mode))) {
+
+            if (nextLockId == queue.end()) { return kResourcePolicyConflict; }
+
+            // position past the last active lock request on the queue
+            list<LockId>::iterator lastActivePosition = queue.end();
+            for (; nextLockId != queue.end(); ++nextLockId) {
+                LockRequest* nextLockRequest = _locks[*nextLockId];
+                if (requestor == nextLockRequest->xid && mode == nextLockRequest->mode) {
+                    return kResourceAcquired; // already have the lock
+                }
+                if (! nextLockRequest->isBlocked()) {
+                    lastActivePosition = nextLockId;
+                }
+            }
+            if (lastActivePosition != queue.end()) {
+                nextLockId = lastActivePosition;
+            }
+            return kResourcePolicyConflict;
+        }
+
+        // loop over the lock requests in the queue, looking for the 1st conflict
+        // normally, we'll leave the nextLockId iterator positioned at the 1st conflict
+        // if there is one, or the position (often the end) where we know there is no conflict.
+        //
+        // upgrades complicate this picture, because we want to position the iterator
+        // after all initial share locks, but we may not know whether an exclusive request
+        // is an upgrade until we look at all the initial share locks.
+        //
+        // so we record the position of the 1st conflict, but continue advancing the
+        // nextLockId iterator until we've seen all initial share locks.
If none have + // the same TxId as the exclusive request, we restore the position to 1st conflict + // + list<LockId>::iterator firstConflict = queue.end(); // invalid + set<TxId> sharedOwners; // all initial share lock owners + bool alreadyHadLock = false; // true if we see a lock with the same Txid + + for (; nextLockId != queue.end(); ++nextLockId) { + + LockRequest* nextLockRequest = _locks[*nextLockId]; + + if (nextLockRequest->matches(requestor, mode, resId)) { + // if we're already on the queue, there's no conflict + return kResourceAcquired; + } + + if (requestor == nextLockRequest->xid) { + // an upgrade or downgrade request, can't conflict with ourselves + if (isShared(mode)) { + // downgrade + _stats.incDowngrades(); + ++nextLockId; + return kResourceAvailable; + } + + // upgrade + alreadyHadLock = true; + _stats.incUpgrades(); + // position after initial readers + continue; + } + + if (isShared(nextLockRequest->mode)) { + invariant(!nextLockRequest->isBlocked() || kPolicyWritersOnly == _policy); + + sharedOwners.insert(nextLockRequest->xid); + + if (isExclusive(mode) && firstConflict == queue.end()) { + // if "lr" proves not to be an upgrade, restore this position later + firstConflict = nextLockId; + } + // either there's no conflict yet, or we're not done checking for an upgrade + continue; + } + + // the next lock on the queue is an exclusive request + invariant(isExclusive(nextLockRequest->mode)); + + if (alreadyHadLock) { + // bumped into something incompatible while up/down grading + if (isExclusive(mode)) { + // upgrading: bumped into another exclusive lock + if (sharedOwners.find(nextLockRequest->xid) != sharedOwners.end()) { + // the exclusive lock is also an upgrade, and it must + // be blocked, waiting for our original share lock to be released + // if we wait for its shared lock, we would deadlock + // + invariant(nextLockRequest->isBlocked()); + _abortInternal(requestor); + } + + if (sharedOwners.empty()) { + // simple upgrade, queue in front of nextLockRequest, no conflict + return kResourceAvailable; + } + else { + // we have to wait for another shared lock before upgrading + return kResourceUpgradeConflict; + } + } + + // downgrading, bumped into an exclusive lock, blocked on our original + invariant (isShared(mode)); + invariant(nextLockRequest->isBlocked()); + // lr will be inserted before nextLockRequest + return kResourceAvailable; + } + else if (firstConflict != queue.end()) { + // restore first conflict position + nextLockId = firstConflict; + nextLockRequest = _locks[*nextLockId]; + } + + // no conflict if nextLock is blocked and we come before + if (nextLockRequest->isBlocked() && + _comesBeforeUsingPolicy(requestor, mode, nextLockRequest)) { + return kResourceAvailable; + } + + return kResourceConflict; + } + + // positioned to the end of the queue + if (alreadyHadLock && isExclusive(mode) && !sharedOwners.empty()) { + // upgrading, queue consists of requestor's earlier share lock + // plus other share lock. 
Must wait for the others to release
+            return kResourceUpgradeConflict;
+        }
+        else if (firstConflict != queue.end()) {
+            nextLockId = firstConflict;
+            LockRequest* nextLockRequest = _locks[*nextLockId];
+
+            if (_comesBeforeUsingPolicy(requestor, mode, nextLockRequest)) {
+                return kResourceAvailable;
+            }
+
+            return kResourceConflict;
+        }
+        return kResourceAvailable;
+    }
+
+    LockManager::LockStatus LockManager::_findLock(const TxId& holder,
+                                                   const unsigned& mode,
+                                                   const ResourceId& resId,
+                                                   LockId* outLockId) const {
+
+        *outLockId = kReservedLockId; // set invalid
+
+        // get iterator for resId's locks
+        ResourceLocks::const_iterator resourceLocks = _resourceLocks.find(resId);
+        if (resourceLocks == _resourceLocks.end()) { return kLockResourceNotFound; }
+
+        // look for an existing lock request from holder in mode
+        for (list<LockId>::const_iterator nextLockId = resourceLocks->second.begin();
+             nextLockId != resourceLocks->second.end(); ++nextLockId) {
+            LockRequest* nextLockRequest = _locks.at(*nextLockId);
+            if (nextLockRequest->xid == holder && nextLockRequest->mode == mode) {
+                *outLockId = nextLockRequest->lid;
+                return kLockFound;
+            }
+        }
+        return kLockModeNotFound;
+    }
+
+    int LockManager::_getTransactionPriorityInternal(const TxId& xid) const {
+        map<TxId, int>::const_iterator txPriority = _txPriorities.find(xid);
+        if (txPriority == _txPriorities.end()) {
+            return 0;
+        }
+        return txPriority->second;
+    }
+
+    /*
+     * Used by acquireOne
+     * XXX: there's overlap between this, _conflictExists and _findLock
+     */
+    bool LockManager::_isAvailable(const TxId& requestor,
+                                   const unsigned& mode,
+                                   const ResourceId& resId) const {
+
+        // check for exceptional policies
+        if (kPolicyReadersOnly == _policy && isExclusive(mode))
+            return false;
+        else if (kPolicyWritersOnly == _policy && isShared(mode))
+            return false;
+
+        ResourceLocks::const_iterator resLocks = _resourceLocks.find(resId);
+        if (resLocks == _resourceLocks.end()) {
+            return true; // no lock requests against this ResourceId, so must be available
+        }
+
+        // walk over the queue of previous requests for this ResourceId
+        const list<LockId>& queue = resLocks->second;
+        for (list<LockId>::const_iterator nextLockId = queue.begin();
+             nextLockId != queue.end(); ++nextLockId) {
+
+            LockRequest* nextLockRequest = _locks.at(*nextLockId);
+
+            if (nextLockRequest->matches(requestor, mode, resId)) {
+                // we already have this lock; if we're asking, we can't be asleep
+                invariant(!
nextLockRequest->isBlocked()); + return true; + } + + // no conflict if we're compatible + if (isCompatible(mode, nextLockRequest->mode)) continue; + + // no conflict if nextLock is blocked and we come before + if (nextLockRequest->isBlocked() && _comesBeforeUsingPolicy(requestor, mode, nextLockRequest)) + return true; + + return false; // we're incompatible and would block + } + + // everything on the queue (if anything is on the queue) is compatible + return true; + } + + LockManager::LockStatus LockManager::_releaseInternal(const LockId& lid) { + + if (kReservedLockId == lid) { return kLockIdNotFound; } + + LockRequest* lr = _locks[lid]; + if (NULL == lr) { return kLockIdNotFound; } + + const TxId holder = lr->xid; + const unsigned mode = lr->mode; + const ResourceId resId = lr->resId; + + ResourceLocks::iterator recordLocks = _resourceLocks.find(resId); + if (recordLocks == _resourceLocks.end()) { + return kLockResourceNotFound; + } + + bool foundLock = false; + bool foundResource = false; + + list<LockId>& queue = recordLocks->second; + list<LockId>::iterator nextLockId = queue.begin(); + + // find the position of the lock to release in the queue + for(; !foundLock && nextLockId != queue.end(); ++nextLockId) { + LockRequest* nextLock = _locks[*nextLockId]; + if (lid != *nextLockId) { + if (nextLock->xid == holder) { + foundResource = true; + } + } + else { + // this is our lock. + if (0 < --nextLock->count) { return kLockCountDecremented; } + + // release the lock + _xaLocks[holder].erase(*nextLockId); + _locks.erase(*nextLockId); + queue.erase(nextLockId++); + delete nextLock; + + foundLock = true; + break; // don't increment nextLockId again + } + } + + if (! foundLock) { + // can't release a lock that hasn't been acquired in the specified mode + return foundResource ? kLockModeNotFound : kLockResourceNotFound; + } + + if (isShared(mode)) { + // skip over any remaining shared requests. they can't be waiting for us. + for (; nextLockId != queue.end(); ++nextLockId) { + LockRequest* nextLock = _locks[*nextLockId]; + if (isExclusive(nextLock->mode)) { + break; + } + } + } + + // everything left on the queue potentially conflicts with the lock just + // released, unless it's an up/down-grade of that lock. So iterate, and + // when TxIds differ, decrement sleepCount, wake those with zero counts, and + // decrement their sleep counts, waking sleepers with zero counts, and + // cleanup state used for deadlock detection + + for (; nextLockId != queue.end(); ++nextLockId) { + LockRequest* nextSleeper = _locks[*nextLockId]; + if (nextSleeper->xid == holder) continue; + + invariant(nextSleeper->isBlocked()); + + // wake up sleepy heads + if (nextSleeper->shouldAwake()) { + nextSleeper->lock.notify_one(); + } + } +#if 0 + // verify stuff + if (_xaLocks[holder].empty()) { + for (WaitersMap::iterator dependencies = _waiters.begin(); + dependencies != _waiters.end(); ++dependencies) { + verify( dependencies->second.find(holder) == dependencies->second.end()); + } + } + + if (!queue.empty()) { + verify(! _locks[queue.front()]->isBlocked()); + } +#endif + return kLockReleased; + } + + void LockManager::_throwIfShuttingDown(const TxId& xid) const { + if (_shuttingDown && (_timer.millis() >= _millisToQuiesce || + _xaLocks.find(xid) == _xaLocks.end())) { + + throw AbortException(); // XXX should this be something else? ShutdownException? 
+ } + } + + /*---------- ResourceLock functions ----------*/ + + ResourceLock::ResourceLock(LockManager& lm, + const TxId& requestor, + const uint32_t& mode, + const ResourceId& resId, + LockManager::Notifier* notifier) + : _lm(lm), + _lid(LockManager::kReservedLockId) // if acquire throws, we want this initialized + { + _lid = lm.acquire(requestor, mode, resId, notifier); + } + + ResourceLock::~ResourceLock() { + _lm.releaseLock(_lid); + } + +} // namespace mongo diff --git a/src/mongo/db/concurrency/lock_mgr.h b/src/mongo/db/concurrency/lock_mgr.h new file mode 100644 index 00000000000..451e88daabe --- /dev/null +++ b/src/mongo/db/concurrency/lock_mgr.h @@ -0,0 +1,621 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <iterator> +#include <list> +#include <map> +#include <set> +#include <string> +#include <vector> + +#include "mongo/platform/compiler.h" +#include "mongo/platform/cstdint.h" +#include "mongo/util/timer.h" + +/* + * LockManager controls access to resources through two functions: acquire and release + * + * Resources are either RecordStores, or Records within an RecordStore, identified by a ResourceId. + * Resources are acquired for either shared or exclusive use, by transactions identified by a TxId. + * Acquiring Records in any mode implies acquisition of the Record's RecordStore + * + * Contention for a resource is resolved by a LockingPolicy, which determines which blocked + * resource requests to awaken when the blocker releases the resource. 
+ * + */ + +namespace mongo { +#if 1 + class TxId { + public: + TxId() : _xid(0) { } + TxId(size_t xid) : _xid(xid) { } + bool operator<(const TxId& other) const { return _xid < other._xid; } + bool operator==(const TxId& other) const { return _xid == other._xid; } + operator size_t() const { return _xid; } + + private: + size_t _xid; + }; + + class ResourceId { + public: + ResourceId() : _rid(0) { } + ResourceId(size_t rid) : _rid(rid) { } + bool operator<(const ResourceId& other) const { return _rid < other._rid; } + bool operator==(const ResourceId& other) const { return _rid == other._rid; } + operator size_t() const { return _rid; } + + private: + size_t _rid; + }; +#else + typedef size_t TxId; // identifies requesting transaction. 0 is reserved + typedef size_t ResourceId; // identifies requested resource. 0 is reserved +#endif + static const TxId kReservedTxId = 0; + static const ResourceId kReservedResourceId = 0; + + /** + * LockManager is used to control access to resources. Usually a singleton. For deadlock detection + * all resources used by a set of transactions that could deadlock, should use one LockManager. + * + * Primary functions are: + * acquire - acquire a resource for shared or exclusive use; may throw Abort. + * acquireOne - acquire one of a vector of resources, hopefully without blocking + * release - release a resource previously acquired for shared/exclusive use + */ + class LockManager { + public: + + /** + * thrown primarily when deadlocks are detected, or when LockManager::abort is called. + * also thrown when LockManager requests are made during shutdown. + */ + class AbortException : public std::exception { + public: + const char* what() const throw (); + }; + + /** + * LockModes: shared and exclusive + */ + enum LockModes { + kShared = 0x0, + kExclusive = 0x1 + }; + + /** + * Used to decide which blocked requests to honor first when resource becomes available + */ + enum Policy { + kPolicyFirstCome, // wake the first blocked request in arrival order + kPolicyReadersFirst, // wake the first blocked read request(s) + kPolicyOldestTxFirst, // wake the blocked request with the lowest TxId + kPolicyReadersOnly, // block write requests (used during fsync) + kPolicyWritersOnly // block read requests + }; + + /** + * returned by ::conflictExists, called from acquire + */ + enum ResourceStatus { + kResourceAcquired, // requested resource was already acquired, increment count + kResourceAvailable, // requested resource is available. no waiting + kResourceConflict, // requested resource is in use by another transaction + kResourcePolicyConflict, // requested mode blocked by READER/kPolicyWritersOnly policy + kResourceUpgradeConflict // requested resource was previously acquired for shared use + // now requested for exclusive use, but there are other shared + // users, so this request must wait. + }; + + /** + * returned by ::find_lock, and ::release and, mostly for testing + * explains why a lock wasn't found or released + */ + enum LockStatus { + kLockFound, // found a matching lock request + kLockReleased, // released requested lock + kLockCountDecremented, // decremented lock count, but didn't release + kLockIdNotFound, // no locks with this id + kLockResourceNotFound, // no locks on the resource + kLockModeNotFound // locks on the resource, but not of the specified mode + }; + + typedef size_t LockId; // valid LockIds are > 0 + static const LockId kReservedLockId = 0; + + /** + * Used to do something just before an acquisition request blocks. 
+ * + * XXX: should perhaps define Notifier as a functor so C++11 lambda's match + * for the test.cpp, it was convenient for the call operator to access private + * state, which is why we're using the class formulation for now + */ + class Notifier { + public: + virtual ~Notifier() { } + virtual void operator()(const TxId& blocker) = 0; + }; + + /** + * Tracks locking statistics. For now, just aggregated across all resources/TxIds + * Eventually might keep per TxId and/or per Resource, to facilitate identifying + * hotspots and problem transactions. + */ + class LockStats { + public: + LockStats() + : _numRequests(0) + , _numPreexistingRequests(0) + , _numSameRequests(0) + , _numBlocks(0) + , _numDeadlocks(0) + , _numDowngrades(0) + , _numUpgrades(0) + , _numMillisBlocked(0) + , _numCurrentActiveReadRequests(0) + , _numCurrentActiveWriteRequests(0) { } + + void incRequests() { _numRequests++; } + void incPreexisting() { _numPreexistingRequests++; } + void incSame() { _numSameRequests++; } + void incBlocks() { _numBlocks++; } + void incDeadlocks() { _numDeadlocks++; } + void incDowngrades() { _numDowngrades++; } + void incUpgrades() { _numUpgrades++; } + void incTimeBlocked(size_t numMillis ) { _numMillisBlocked += numMillis; } + + void incStatsForMode(const unsigned mode) { + 0==mode ? incActiveReads() : incActiveWrites(); + } + void decStatsForMode(const unsigned mode) { + 0==mode ? decActiveReads() : decActiveWrites(); + } + + void incActiveReads() { _numCurrentActiveReadRequests++; } + void decActiveReads() { _numCurrentActiveReadRequests--; } + void incActiveWrites() { _numCurrentActiveWriteRequests++; } + void decActiveWrites() { _numCurrentActiveWriteRequests--; } + + unsigned numActiveReads() const { return _numCurrentActiveReadRequests; } + unsigned numActiveWrites() const { return _numCurrentActiveWriteRequests; } + + size_t getNumRequests() const { return _numRequests; } + size_t getNumPreexistingRequests() const { return _numPreexistingRequests; } + size_t getNumSameRequests() const { return _numSameRequests; } + size_t getNumBlocks() const { return _numBlocks; } + size_t getNumDeadlocks() const { return _numDeadlocks; } + size_t getNumDowngrades() const { return _numDowngrades; } + size_t getNumUpgrades() const { return _numUpgrades; } + size_t getNumMillisBlocked() const { return _numMillisBlocked; } + + std::string toString() const; + + private: + size_t _numRequests; + size_t _numPreexistingRequests; + size_t _numSameRequests; + size_t _numBlocks; + size_t _numDeadlocks; + size_t _numDowngrades; + size_t _numUpgrades; + size_t _numMillisBlocked; + unsigned long _numCurrentActiveReadRequests; + unsigned long _numCurrentActiveWriteRequests; + }; + + + public: + + /** + * Singleton factory - retrieves a common instance of LockManager + */ + static LockManager& getSingleton(); + + /** + * It's possibly useful to allow multiple LockManagers for non-overlapping sets + * of resources, so the constructor is left public. Eventually we may want + * to enforce a singleton pattern. + */ + explicit LockManager(const Policy& policy=kPolicyFirstCome); + ~LockManager(); + + /** + * Get the current policy + */ + Policy getPolicy() const; + + /** + * Who set the current policy. Of use when the Policy is ReadersOnly + * and we want to find out who is blocking a writer. + */ + TxId getPolicySetter() const; + + /** + * Initiate a shutdown, specifying a period of time to quiesce. 
+ * + * During this period, existing transactions can continue to acquire resources, + * but new transaction requests will throw AbortException. + * + * After quiescing, any new requests will throw AbortException + */ + void shutdown(const unsigned& millisToQuiesce = 1000); + + + /** + * override default LockManager's default Policy for a transaction. + * + * positive priority moves transaction's resource requests toward the front + * of the queue, behind only those requests with higher priority. + * + * negative priority moves transaction's resource requests toward the back + * of the queue, ahead of only those requests with lower priority. + * + * zero priority uses the LockManager's default Policy + */ + void setTransactionPriority(const TxId& xid, int priority); + int getTransactionPriority(const TxId& xid) const; + + + /** + * acquire a resource in a mode. + * can throw AbortException + */ + LockId acquire(const TxId& requestor, + const uint32_t& mode, + const ResourceId& resId, + Notifier* notifier = NULL); + + /** + * for bulk operations: + * acquire one of a vector of ResourceIds in a mode, + * hopefully without blocking, return index of + * acquired ResourceId, or -1 if vector was empty + */ + int acquireOne(const TxId& requestor, + const uint32_t& mode, + const std::vector<ResourceId>& records, + Notifier* notifier = NULL); + + /** + * release a ResourceId. + * The mode here is just the mode that applies to the resId + */ + LockStatus release(const TxId& holder, + const uint32_t& mode, + const ResourceId& resId); + + /** + * releases the lock returned by acquire. should perhaps replace above? + */ + LockStatus releaseLock(const LockId& lid); + + /** + * release all resources acquired by a transaction + * returns number of locks released + */ + size_t release(const TxId& holder); + + /** + * called internally for deadlock + * possibly called publicly to stop a long transaction + * also used for testing + */ + MONGO_COMPILER_NORETURN void abort(const TxId& goner); + + /** + * returns a copy of the stats that exist at the time of the call + */ + LockStats getStats() const; + + + + // --- for testing and logging + + std::string toString() const; + + /** + * test whether a TxId has locked a ResourceId in a mode + */ + bool isLocked(const TxId& holder, + const unsigned& mode, + const ResourceId& resId) const; + + protected: + + /** + * Data structure used to record a resource acquisition request + */ + class LockRequest { + public: + LockRequest(const TxId& xid, + const unsigned& mode, + const ResourceId& resId); + + ~LockRequest(); + + bool matches(const TxId& xid, + const unsigned& mode, + const ResourceId& resId) const; + + bool isBlocked() const; + bool shouldAwake(); + + std::string toString() const; + + // uniquely identifies a LockRequest + const LockId lid; + + // transaction that made this request + const TxId xid; + + // shared or exclusive use + const unsigned mode; + + // resource requested + const ResourceId resId; + + // number of times xid requested this resource in this mode + // request will be deleted when count goes to 0 + size_t count; + + // number of existing things blocking this request + // usually preceding requests on the queue, but also policy + size_t sleepCount; + + // used for waiting and waking + boost::condition_variable lock; + }; + + typedef std::map<TxId, std::set<LockId> > TxLocks; + typedef std::map<ResourceId, std::list<LockId> > ResourceLocks; + typedef std::map<LockId, LockRequest*> LockMap; + typedef std::map<TxId, std::set<LockId> > TxLockMap; 
+
+    private: // alphabetical
+
+        /**
+         * called by public ::abort and internally upon deadlock
+         * releases all locks acquired by goner, notifies any
+         * transactions that were waiting, then throws AbortException
+         */
+        MONGO_COMPILER_NORETURN void _abortInternal(const TxId& goner);
+
+        /**
+         * main workhorse for acquiring locks on resources, blocking
+         * or aborting on conflict
+         *
+         * returns a non-zero LockId, or throws AbortException on deadlock
+         *
+         */
+        LockId _acquireInternal(const TxId& requestor,
+                                const unsigned& mode,
+                                const ResourceId& resId,
+                                Notifier* notifier,
+                                boost::unique_lock<boost::mutex>& guard);
+
+        /**
+         * adds a conflicting lock request to the list of requests for a resource
+         * using the Policy. Called by acquireInternal
+         */
+        void _addLockToQueueUsingPolicy(LockRequest* lr,
+                                        std::list<LockId>& queue,
+                                        std::list<LockId>::iterator& position);
+
+        /**
+         * set up for future deadlock detection, called from acquire
+         */
+        void _addWaiter(const TxId& blocker, const TxId& waiter);
+
+        /**
+         * when inserting a new lock request into the middle of a queue,
+         * add any remaining incompatible requests in the queue to the
+         * new lock request's set of waiters... for future deadlock detection
+         */
+        void _addWaiters(LockRequest* blocker,
+                         std::list<LockId>::iterator nextLockId,
+                         std::list<LockId>::iterator lastLockId);
+
+        /**
+         * returns true if a newRequest should be honored before an oldRequest according
+         * to the lockManager's policy. Used by acquire to decide whether a new share request
+         * conflicts with a previous upgrade-to-exclusive request that is blocked.
+         */
+        bool _comesBeforeUsingPolicy(const TxId& newReqXid,
+                                     const unsigned& newReqMode,
+                                     const LockRequest* oldReq) const;
+
+        /**
+         * determine whether a resource request would conflict with an existing lock
+         * set the position to the first possible insertion point, which is usually
+         * the position of the first conflict, or the end of the queue, or to an existing lock
+         */
+        ResourceStatus _conflictExists(const TxId& requestor,
+                                       const unsigned& mode,
+                                       const ResourceId& resId,
+                                       std::list<LockId>& queue,
+                                       std::list<LockId>::iterator& position /* in/out */);
+
+        /**
+         * looks for an existing LockRequest matching the given requestor, mode and resId
+         * if not found, sets outLid to zero and returns a reason, otherwise
+         * sets outLid to the LockId that matches and returns kLockFound
+         */
+        LockStatus _findLock(const TxId& requestor,
+                             const unsigned& mode,
+                             const ResourceId& resId,
+                             LockId* outLid) const;
+
+        /**
+         * called externally by getTransactionPriority
+         * and internally by addLockToQueueUsingPolicy
+         */
+        int _getTransactionPriorityInternal(const TxId& xid) const;
+
+        /**
+         * returns true if acquire would return without waiting
+         * used by acquireOne
+         */
+        bool _isAvailable(const TxId& requestor,
+                          const unsigned& mode,
+                          const ResourceId& resId) const;
+
+        /**
+         * called by public ::release and internally by abort.
+         * assumes the caller has acquired the mutex.
+         */
+        LockStatus _releaseInternal(const LockId& lid);
+
+        /**
+         * called at start of public APIs, throws exception
+         * if quiescing period has expired, or if xid is new
+         */
+        void _throwIfShuttingDown(const TxId& xid = 0) const;
+
+
+    private:
+
+        // Singleton instance
+        static boost::mutex _getSingletonMutex;
+        static LockManager* _singleton;
+
+        // The Policy controls which requests should be honored first. This is
+        // used to guide the position of a request in a list of requests waiting for
+        // a resource.
+        //
+        // XXX At some point, we may want this to also guide the decision of which
+        // transaction to abort in case of deadlock. For now, the transaction whose
+        // request would lead to a deadlock is aborted. Since deadlocks are rare,
+        // careful choices may not matter much.
+        //
+        Policy _policy;
+        TxId _policySetter;
+
+        // synchronizes access to the lock manager, which is shared across threads
+        mutable boost::mutex _mutex;
+
+        // for blocking when setting kPolicyReadersOnly or kPolicyWritersOnly policy
+        boost::condition_variable _policyLock;
+
+        // only meaningful once shutdown() has been called
+        bool _shuttingDown;
+        int _millisToQuiesce;
+        Timer _timer;
+
+        // owns the LockRequest*
+        std::map<LockId, LockRequest*> _locks;
+
+        // Lists of lock requests associated with a resource.
+        //
+        // The lock-request lists have two sections. Some number (at least one) of requests
+        // at the front of a list are "active". All remaining lock requests are blocked by
+        // some earlier (not necessarily active) lock request, and are waiting. The order
+        // of lock requests in the waiting section is determined by the LockPolicy.
+        // The order of lock requests in the active/front portion of the list is irrelevant.
+        //
+        std::map<ResourceId, std::list<LockId> > _resourceLocks;
+
+        // For cleanup and abort processing, references all LockRequests made by a transaction
+        std::map<TxId, std::set<LockId> > _xaLocks;
+
+        // For deadlock detection: the set of transactions blocked by another transaction
+        // NB: a transaction can only be directly waiting for a single resource/transaction
+        // but to facilitate deadlock detection, if T1 is waiting for T2 and T2 is waiting
+        // for T3, then both T1 and T2 are listed as T3's waiters.
+        std::map<TxId, std::set<TxId> > _waiters;
+
+        // track transactions that have aborted, and don't accept further
+        // lock requests from them (which shouldn't happen anyway).
+        //
+        // XXX: this set can grow without bound in the pathological case. One way to deal
+        // with it is to track the oldest active transaction (which may or may not be the
+        // smallest TxId value), and flush older values from this set and ignore older values
+        // in new requests without consulting this set.
+ std::set<TxId> _abortedTxIds; + + // transaction priorities: + // 0 => neutral, use LockManager's default _policy + // + => high, queue forward + // - => low, queue back + // + std::map<TxId, int> _txPriorities; + + // stats, but also used internally + LockStats _stats; + }; + + /** + * RAII wrapper around LockManager, for scoped locking + */ + class ResourceLock { + public: + ResourceLock(LockManager& lm, + const TxId& requestor, + const uint32_t& mode, + const ResourceId& resId, + LockManager::Notifier* notifier = NULL); + + ~ResourceLock(); + private: + LockManager& _lm; + LockManager::LockId _lid; + }; + + class SharedResourceLock : public ResourceLock { + public: + SharedResourceLock(const TxId& requestor, void* resource) + : ResourceLock(LockManager::getSingleton(), + requestor, + LockManager::kShared, + (size_t)resource) { } + SharedResourceLock(const TxId& requestor, size_t resource) + : ResourceLock(LockManager::getSingleton(), + requestor, + LockManager::kShared, + resource) { } + }; + + class ExclusiveResourceLock : public ResourceLock { + public: + ExclusiveResourceLock(const TxId& requestor, void* resource) + : ResourceLock(LockManager::getSingleton(), + requestor, + LockManager::kExclusive, + (size_t)resource) { } + ExclusiveResourceLock(const TxId& requestor, size_t resource) + : ResourceLock(LockManager::getSingleton(), + requestor, + LockManager::kExclusive, + resource) { } + }; +} // namespace mongo diff --git a/src/mongo/db/concurrency/lock_mgr_test.cpp b/src/mongo/db/concurrency/lock_mgr_test.cpp new file mode 100644 index 00000000000..06840a41cb3 --- /dev/null +++ b/src/mongo/db/concurrency/lock_mgr_test.cpp @@ -0,0 +1,896 @@ +/** +* Copyright (C) MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +/** + * tests for util/concurrency/lock_mgr.* capabilities + * + * Testing concurrency requires multiple threads. In this test, these are packaged as + * instances of a ClientTransaction class. The test's main thread creates instances + * of ClientTransactions and communicates with them through a pair of producer/consumer + * buffers. 
The driver thread sends requests to ClientTransaction threads using a
+ * TxCommandBuffer, and waits for responses in a TxResponseBuffer. This protocol
+ * allows precise control over timing.
+ *
+ * The producer/consumer buffer is possibly overkill. At present there is only
+ * one producer and one consumer using any one buffer. Also, the current set of
+ * tests is 'lock-step', with the driver never issuing more than one command
+ * before waiting for a response.
+ */
+
+#include <boost/thread/thread.hpp>
+#include "mongo/unittest/unittest.h"
+#include "mongo/db/concurrency/lock_mgr.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+    enum TxCmd {
+        ACQUIRE,
+        RELEASE,
+        ABORT,
+        POLICY,
+        QUIT,
+        INVALID
+    };
+
+    enum TxRsp {
+        ACQUIRED,
+        RELEASED,
+        BLOCKED,
+        AWAKENED,
+        ABORTED,
+        INVALID_RESPONSE
+    };
+
+    class TxResponse {
+    public:
+        TxRsp rspCode;
+        TxId xid;
+        unsigned mode;
+        ResourceId resId;
+    };
+
+    class TxRequest {
+    public:
+        TxCmd cmd;
+        TxId xid;
+        unsigned mode;
+        ResourceId resId;
+        LockManager::Policy policy;
+    };
+
+    class TxResponseBuffer {
+    public:
+        TxResponseBuffer() : _count(0), _readPos(0), _writePos(0) { }
+
+        void post(const TxRsp& rspCode) {
+            boost::unique_lock<boost::mutex> guard(_guard);
+            while (_count == 10)
+                _full.wait(guard);
+            buffer[_writePos++].rspCode = rspCode;
+            _writePos %= 10;
+            _count++;
+            _empty.notify_one();
+        }
+
+        TxResponse* consume() {
+            boost::unique_lock<boost::mutex> guard(_guard);
+            while (_count == 0)
+                _empty.wait(guard);
+            TxResponse* result = &buffer[_readPos++];
+            _readPos %= 10;
+            _count--;
+            _full.notify_one();
+            return result;
+        }
+
+        boost::mutex _guard;
+        boost::condition_variable _full;
+        boost::condition_variable _empty;
+
+        size_t _count;
+        size_t _readPos;
+        size_t _writePos;
+
+        TxResponse buffer[10];
+    };
+
+class TxCommandBuffer {
+public:
+    TxCommandBuffer() : _count(0), _readPos(0), _writePos(0) { }
+
+    void post(const TxCmd& cmd,
+              const TxId& xid = 0,
+              const unsigned& mode = 0,
+              const ResourceId& resId = 0,
+              const LockManager::Policy& policy = LockManager::kPolicyFirstCome) {
+        boost::unique_lock<boost::mutex> guard(_guard);
+        while (_count == 10)
+            _full.wait(guard);
+        buffer[_writePos].cmd = cmd;
+        buffer[_writePos].xid = xid;
+        buffer[_writePos].mode = mode;
+        buffer[_writePos].resId = resId;
+        buffer[_writePos].policy = policy;
+        _writePos++;
+        _writePos %= 10;
+        _count++;
+        _empty.notify_one();
+    }
+
+    TxRequest* consume() {
+        boost::unique_lock<boost::mutex> guard(_guard);
+        while (_count == 0)
+            _empty.wait(guard);
+        TxRequest* result = &buffer[_readPos++];
+        _readPos %= 10;
+        _count--;
+        _full.notify_one();
+        return result;
+    }
+
+    boost::mutex _guard;
+    boost::condition_variable _full;
+    boost::condition_variable _empty;
+
+    size_t _count;
+    size_t _readPos;
+    size_t _writePos;
+
+    TxRequest buffer[10];
+};
+
+class ClientTransaction : public LockManager::Notifier {
+public:
+    // these are called in the main driver program
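+    // (most helpers post a single command and then block on the response
+    // buffer until the worker thread reports how the LockManager reacted;
+    // quit() only posts, and wakened() only consumes)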
+
+    ClientTransaction(LockManager* lm, const TxId& xid)
+        : _lm(lm), _xid(xid), _thr(&ClientTransaction::processCmd, this) { }
+
+    virtual ~ClientTransaction() { _thr.join(); }
+
+    void acquire(const unsigned& mode, const ResourceId resId, const TxRsp& rspCode) {
+        _cmd.post(ACQUIRE, _xid, mode, resId);
+        TxResponse* rsp = _rsp.consume();
+        ASSERT(rspCode == rsp->rspCode);
+    }
+
+    void release(const unsigned& mode, const ResourceId resId) {
+        _cmd.post(RELEASE, _xid, mode, resId);
+        TxResponse* rsp = _rsp.consume();
+        ASSERT(RELEASED == rsp->rspCode);
+    }
+
+    void abort() {
+        _cmd.post(ABORT, _xid);
+        TxResponse* rsp = _rsp.consume();
+        ASSERT(ABORTED == rsp->rspCode);
+    }
+
+    void wakened() {
+        TxResponse* rsp = _rsp.consume();
+        ASSERT(ACQUIRED == rsp->rspCode);
+    }
+
+    void setPolicy(const LockManager::Policy& policy, const TxRsp& rspCode) {
+        _cmd.post(POLICY, _xid, 0, 0, policy);
+        TxResponse* rsp = _rsp.consume();
+        ASSERT(rspCode == rsp->rspCode);
+    }
+
+    void quit() {
+        _cmd.post(QUIT);
+    }
+
+    // these are run within the client threads
+    void processCmd() {
+        bool more = true;
+        while (more) {
+            TxRequest* req = _cmd.consume();
+            switch (req->cmd) {
+            case ACQUIRE:
+                try {
+                    _lm->acquire(_xid, req->mode, req->resId, this);
+                    _rsp.post(ACQUIRED);
+                } catch (const LockManager::AbortException& err) {
+                    _rsp.post(ABORTED);
+                    // log() << "t" << _xid << ": aborted, ending" << endl;
+                    return;
+                }
+                break;
+            case RELEASE:
+                _lm->release(_xid, req->mode, req->resId);
+                _rsp.post(RELEASED);
+                break;
+            case ABORT:
+                try {
+                    _lm->abort(_xid);
+                } catch (const LockManager::AbortException& err) {
+                    _rsp.post(ABORTED);
+                }
+                break;
+            case POLICY:
+                try {
+                    _lm->setPolicy(_xid, req->policy, this);
+                    _rsp.post(ACQUIRED);
+                } catch (const LockManager::AbortException& err) {
+                    _rsp.post(ABORTED);
+                }
+                break;
+            case QUIT:
+            default:
+                more = false;
+                break;
+            }
+        }
+    }
+
+    // inherited from Notifier, used by LockManager::acquire
+    virtual void operator()(const TxId& blocker) {
+        _rsp.post(BLOCKED);
+    }
+
+private:
+    TxCommandBuffer _cmd;
+    TxResponseBuffer _rsp;
+    LockManager* _lm;
+    TxId _xid;
+    boost::thread _thr;
+};
+
+TEST(LockManagerTest, TxError) {
+    LockManager lm;
+    LockManager::LockStatus status;
+
+    // release a lock in a container we haven't locked; ignore the status
+    lm.release(1, LockManager::kShared, 0, 1);
+
+    // same call again, this time checking the status
+    status = lm.release(1, LockManager::kShared, 0, 1);
+    ASSERT(LockManager::kLockContainerNotFound == status);
+
+    // release a lock on a record we haven't locked in a store we have locked
+    lm.acquire(1, LockManager::kShared, 0, 2);
+    status = lm.release(1, LockManager::kShared, 0, 1); // this is in error
+    ASSERT(LockManager::kLockResourceNotFound == status);
+    status = lm.release(1, LockManager::kShared, 0, 2);
+    ASSERT(LockManager::kLockReleased == status);
+
+    // release a record we've locked in a different mode
+    lm.acquire(1, LockManager::kShared, 0, 1);
+    status = lm.release(1, LockManager::kExclusive, 0, 1); // this is in error
+    ASSERT(LockManager::kLockModeNotFound == status);
+    status = lm.release(1, LockManager::kShared, 0, 1);
+    ASSERT(LockManager::kLockReleased == status);
+
+    lm.acquire(1, LockManager::kExclusive, 0, 1);
+    status = lm.release(1, LockManager::kShared, 0, 1); // this is in error
+    ASSERT(LockManager::kLockModeNotFound == status);
+    status = lm.release(1, LockManager::kExclusive, 0, 1);
+    ASSERT(LockManager::kLockReleased == status);
+
+    // attempt to acquire as a transaction that has aborted
+    try {
+        lm.abort(1);
+    } catch (const LockManager::AbortException& err) { }
+    try {
+        lm.acquire(1, LockManager::kShared, 0, 1); // error
+        ASSERT(false);
+    } catch (const LockManager::AbortException& error) {
+    }
+}
+
+TEST(LockManagerTest, SingleTx) {
+    LockManager lm;
+    ResourceId store = 1;
+    TxId t1 = 1;
+    ResourceId r1 = 1;
+    LockManager::LockStatus status;
+
+    // acquire a shared record lock
+    ASSERT(!
lm.isLocked(t1, LockManager::kShared, store, r1)); + lm.acquire(t1, LockManager::kShared, store, r1); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + + // release a shared record lock + lm.release(t1, LockManager::kShared, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1)); + + // acquire a shared record lock twice, on same ResourceId + lm.acquire(t1, LockManager::kShared, store, r1); + lm.acquire(t1, LockManager::kShared, store, r1); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + + // release the twice-acquired lock, once. Still locked + status = lm.release(t1, LockManager::kShared, store, r1); + ASSERT(LockManager::kLockCountDecremented == status); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + + // after 2nd release, it's not locked + status = lm.release(t1, LockManager::kShared, store, r1); + ASSERT(LockManager::kLockReleased == status); + ASSERT(!lm.isLocked(t1, LockManager::kShared, store, r1)); + + + + // --- test downgrade and release --- + + // acquire an exclusive then a shared lock, on the same ResourceId + lm.acquire(t1, LockManager::kExclusive, store, r1); + ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1)); + lm.acquire(t1, LockManager::kShared, store, r1); + ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1)); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + + // release shared first, then exclusive + lm.release(t1, LockManager::kShared, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1)); + ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1)); + lm.release(t1, LockManager::kExclusive, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1)); + + // release exclusive first, then shared + lm.acquire(t1, LockManager::kExclusive, store, r1); + lm.acquire(t1, LockManager::kShared, store, r1); + lm.release(t1, LockManager::kExclusive, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1)); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + lm.release(t1, LockManager::kShared, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1)); + + + + // --- test upgrade and release --- + + // acquire a shared, then an exclusive lock on the same ResourceId + lm.acquire(t1, LockManager::kShared, store, r1); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + lm.acquire(t1, LockManager::kExclusive, store, r1); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1)); + + // release exclusive first, then shared + lm.release(t1, LockManager::kExclusive, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kExclusive, store, r1)); + ASSERT(lm.isLocked(t1, LockManager::kShared, store, r1)); + lm.release(t1, LockManager::kShared, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1)); + + // release shared first, then exclusive + lm.acquire(t1, LockManager::kShared, store, r1); + lm.acquire(t1, LockManager::kExclusive, store, r1); + lm.release(t1, LockManager::kShared, store, r1); + ASSERT(! lm.isLocked(t1, LockManager::kShared, store, r1)); + ASSERT(lm.isLocked(t1, LockManager::kExclusive, store, r1)); + lm.release(t1, LockManager::kExclusive, store, r1); + ASSERT(! 
lm.isLocked(t1, LockManager::kExclusive, store, r1));
+}
+
+TEST(LockManagerTest, TxConflict) {
+    LockManager lm;
+    ClientTransaction t1(&lm, 1);
+    ClientTransaction t2(&lm, 2);
+
+    // no conflicts with shared locks on same/different objects
+
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    t2.acquire(LockManager::kShared, 2, ACQUIRED);
+    t1.acquire(LockManager::kShared, 2, ACQUIRED);
+    t2.acquire(LockManager::kShared, 1, ACQUIRED);
+
+    t1.release(LockManager::kShared, 1);
+    t1.release(LockManager::kShared, 2);
+    t2.release(LockManager::kShared, 1);
+    t2.release(LockManager::kShared, 2);
+
+    // no conflicts with exclusive locks on different objects
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    t2.acquire(LockManager::kExclusive, 2, ACQUIRED);
+    t1.release(LockManager::kExclusive, 1);
+    t2.release(LockManager::kExclusive, 2);
+
+    // shared then exclusive conflict
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    // t2's request is incompatible with t1's lock, so it should block
+    t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+    t1.release(LockManager::kShared, 1);
+    t2.wakened(); // with t1's lock released, t2 should wake
+    t2.release(LockManager::kExclusive, 1);
+
+    // exclusive then shared conflict
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    t2.acquire(LockManager::kShared, 1, BLOCKED);
+    t1.release(LockManager::kExclusive, 1);
+    t2.wakened();
+    t2.release(LockManager::kShared, 1);
+
+    // exclusive then exclusive conflict
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+    t1.release(LockManager::kExclusive, 1);
+    t2.wakened();
+    t2.release(LockManager::kExclusive, 1);
+
+    t1.quit();
+    t2.quit();
+}
+
+TEST(LockManagerTest, TxDeadlock) {
+    LockManager lm(LockManager::kPolicyReadersFirst);
+    ClientTransaction t1(&lm, 1);
+    ClientTransaction t2(&lm, 2);
+
+    ClientTransaction a1(&lm, 4);
+    ClientTransaction a2(&lm, 5);
+    ClientTransaction a3(&lm, 6);
+    ClientTransaction a4(&lm, 7);
+    ClientTransaction a5(&lm, 8);
+
+    // simple deadlock test 1
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    a1.acquire(LockManager::kShared, 2, ACQUIRED);
+    t1.acquire(LockManager::kExclusive, 2, BLOCKED);
+    // a1's request would form a dependency cycle, so it should abort
+    a1.acquire(LockManager::kExclusive, 1, ABORTED);
+    t1.wakened(); // with a1's locks released, t1 should wake
+    t1.release(LockManager::kExclusive, 2);
+    t1.release(LockManager::kShared, 1);
+
+    // simple deadlock test 2
+    a2.acquire(LockManager::kShared, 1, ACQUIRED);
+    t2.acquire(LockManager::kShared, 2, ACQUIRED);
+    t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+    // a2's request would form a dependency cycle, so it should abort
+    a2.acquire(LockManager::kExclusive, 2, ABORTED);
+    t2.wakened(); // with a2's locks released, t2 should wake
+    t2.release(LockManager::kExclusive, 1);
+    t2.release(LockManager::kShared, 2);
+
+    // three way deadlock
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    t2.acquire(LockManager::kShared, 2, ACQUIRED);
+    a3.acquire(LockManager::kShared, 3, ACQUIRED);
+    t1.acquire(LockManager::kExclusive, 2, BLOCKED);
+    t2.acquire(LockManager::kExclusive, 3, BLOCKED);
+    // a3's request would form a dependency cycle, so it should abort
+    a3.acquire(LockManager::kExclusive, 1, ABORTED);
+    t2.wakened(); // with a3's locks released, t2 should wake
+    t2.release(LockManager::kShared, 2);
+    t1.wakened(); // with t2's shared lock released, t1 should wake
+    t2.release(LockManager::kExclusive, 3);
+    t1.release(LockManager::kShared, 1);
+    t1.release(LockManager::kExclusive, 2);
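+
+    // (a "phantom" deadlock: one declared from a stale wait-for edge left
+    // over after the blocking transaction has already released its lock)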
+
+    // test for phantom deadlocks
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    t2.acquire(LockManager::kExclusive, 1, BLOCKED);
+    t1.release(LockManager::kShared, 1);
+    t2.wakened();
+    // at this point, t2 should no longer be waiting for t1,
+    // so it should be OK for t1 to wait for t2
+    t1.acquire(LockManager::kShared, 1, BLOCKED);
+    t2.release(LockManager::kExclusive, 1);
+    t1.wakened();
+    t1.release(LockManager::kShared, 1);
+
+    // test for missing deadlocks
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    t2.acquire(LockManager::kShared, 2, ACQUIRED);    // setup for deadlock with a4
+    t2.acquire(LockManager::kExclusive, 1, BLOCKED);  // block on t1
+    // after this, because of the readers-first policy, t2 should
+    // also be waiting on a4.
+    a4.acquire(LockManager::kShared, 1, ACQUIRED);
+    // after this, t2 should be waiting ONLY on a4
+    t1.release(LockManager::kShared, 1);
+    // so a4 should not be allowed to wait on t2's resource.
+    a4.acquire(LockManager::kExclusive, 2, ABORTED);
+    t2.wakened();
+    t2.release(LockManager::kShared, 2);
+    t2.release(LockManager::kExclusive, 1);
+
+    // test for missing deadlocks due to downgrades
+    a5.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    a5.acquire(LockManager::kShared, 1, ACQUIRED);
+    t2.acquire(LockManager::kShared, 2, ACQUIRED);    // setup for deadlock with a5
+    t2.acquire(LockManager::kExclusive, 1, BLOCKED);  // block on a5
+    a5.release(LockManager::kExclusive, 1);
+    // at this point, t2 should still be blocked on a5's downgraded lock,
+    // so a5 should not be allowed to wait on t2's resource.
+    a5.acquire(LockManager::kExclusive, 2, ABORTED);
+    t2.wakened();
+    t2.release(LockManager::kShared, 2);
+    t2.release(LockManager::kExclusive, 1);
+
+    t1.quit();
+    t2.quit();
+}
+
+TEST(LockManagerTest, TxDowngrade) {
+    LockManager lm;
+    ClientTransaction t1(&lm, 1);
+    ClientTransaction t2(&lm, 2);
+
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    t1.acquire(LockManager::kShared, 1, ACQUIRED); // downgrade
+    // t1 still has exclusive on resource 1, so t2 must wait
+    t2.acquire(LockManager::kShared, 1, BLOCKED);
+    t1.release(LockManager::kExclusive, 1);
+    t2.wakened(); // with the exclusive lock released, t2 wakes
+    t1.release(LockManager::kShared, 1);
+    t2.release(LockManager::kShared, 1);
+
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    t1.acquire(LockManager::kShared, 1, ACQUIRED); // downgrade
+    // t1 still has exclusive on resource 1, so t2 must wait
+    t2.acquire(LockManager::kShared, 1, BLOCKED);
+    t1.release(LockManager::kShared, 1);
+    // with t1 still holding exclusive on resource 1, t2 stays blocked
+    t1.release(LockManager::kExclusive, 1);
+    t2.wakened(); // with the exclusive lock released, t2 wakes
+    t2.release(LockManager::kShared, 1);
+
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    // t1 has exclusive on resource 1, so t2 must wait
+    t2.acquire(LockManager::kShared, 1, BLOCKED);
+    // even though t2 is waiting for resource 1, t1 can still use it shared,
+    // because it already owns the exclusive lock and can't block on itself
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    t1.release(LockManager::kExclusive, 1);
+    t2.wakened(); // with the exclusive lock released, t2 wakes
+    t1.release(LockManager::kShared, 1);
+    t2.release(LockManager::kShared, 1);
+
+    // t2 acquires exclusive during t1's downgrade
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    t2.acquire(LockManager::kExclusive, 1, BLOCKED);
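+    // t2 must wait for t1 to release both its exclusive and its shared holds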
t1.release(LockManager::kExclusive, 1); + t1.release(LockManager::kShared, 1); + t2.wakened(); + t2.release(LockManager::kExclusive, 1); + + t1.acquire(LockManager::kExclusive, 1, ACQUIRED); + t2.acquire(LockManager::kExclusive, 1, BLOCKED); + t1.acquire(LockManager::kShared, 1, ACQUIRED); + t1.release(LockManager::kExclusive, 1); + t1.release(LockManager::kShared, 1); + t2.wakened(); + t2.release(LockManager::kExclusive, 1); + + t1.quit(); + t2.quit(); +} + +TEST(LockManagerTest, TxUpgrade) { + LockManager lm(LockManager::kPolicyReadersFirst); + ClientTransaction t1(&lm, 1); + ClientTransaction t2(&lm, 2); + ClientTransaction t3(&lm, 3); + + ClientTransaction a2(&lm, 4); + ClientTransaction a3(&lm, 5); + + // test upgrade succeeds, blocks subsequent reads + t1.acquire(LockManager::kShared, 1, ACQUIRED); + t1.acquire(LockManager::kExclusive, 1, ACQUIRED); // upgrade + t2.acquire(LockManager::kShared, 1, BLOCKED); + t1.release(LockManager::kExclusive, 1); + t2.wakened(); + t1.release(LockManager::kShared, 1); + t2.release(LockManager::kShared, 1); + + // test upgrade blocks, then wakes + t1.acquire(LockManager::kShared, 1, ACQUIRED); + t2.acquire(LockManager::kShared, 1, ACQUIRED); + // t1 can't use resource 1 exclusively yet, because t2 is using it + t1.acquire(LockManager::kExclusive, 1, BLOCKED); + t2.release(LockManager::kShared, 1); + t1.wakened(); // with t2's shared lock released, t1 wakes + t1.release(LockManager::kExclusive, 1); + t1.release(LockManager::kShared, 1); + + // test upgrade blocks on several, then wakes + t1.acquire(LockManager::kShared, 1, ACQUIRED); + t2.acquire(LockManager::kShared, 1, ACQUIRED); + // t1 can't use resource 1 exclusively yet, because t2 is using it + t1.acquire(LockManager::kExclusive, 1, BLOCKED); + t3.acquire(LockManager::kShared, 1, ACQUIRED); // additional blocker + t2.release(LockManager::kShared, 1); // t1 still blocked + t3.release(LockManager::kShared, 1); + t1.wakened(); // with t3's shared lock released, t1 wakes + t1.release(LockManager::kExclusive, 1); + t1.release(LockManager::kShared, 1); + + // failure to upgrade + t1.acquire(LockManager::kShared, 1, ACQUIRED); + a2.acquire(LockManager::kShared, 1, ACQUIRED); + t1.acquire(LockManager::kExclusive, 1, BLOCKED); + a2.acquire(LockManager::kExclusive, 1, ABORTED); + // with a2's abort, t1 can wake + t1.wakened(); + t1.release(LockManager::kShared, 1); + t1.release(LockManager::kExclusive, 1); + + // failure to upgrade + t1.acquire(LockManager::kShared, 1, ACQUIRED); + t2.acquire(LockManager::kShared, 1, ACQUIRED); + t1.acquire(LockManager::kExclusive, 1, BLOCKED); + a3.acquire(LockManager::kShared, 1, ACQUIRED); + t2.release(LockManager::kShared, 1); // t1 still blocked on a3 + a3.acquire(LockManager::kExclusive, 1, ABORTED); + + t1.quit(); + t2.quit(); + t3.quit(); +} + +TEST(LockManagerTest, TxPolicy) { + + { + // Test FirstComeFirstServe policy + LockManager lm_first; + ClientTransaction t1(&lm_first, 1); + ClientTransaction t2(&lm_first, 2); + ClientTransaction t3(&lm_first, 3); + // test1 + t1.acquire(LockManager::kExclusive, 1, ACQUIRED); + t2.acquire(LockManager::kShared, 1, BLOCKED); + t3.acquire(LockManager::kExclusive, 1, BLOCKED); + t1.release(LockManager::kExclusive, 1); + // t2 should wake first, because its request came before t3's + t2.wakened(); + t2.release(LockManager::kShared, 1); + t3.wakened(); + t3.release(LockManager::kExclusive, 1); + + // test2 + t1.acquire(LockManager::kExclusive, 1, ACQUIRED); + t3.acquire(LockManager::kExclusive, 1, BLOCKED); + 
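+        // this time t2's request arrives after t3's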
+        t2.acquire(LockManager::kShared, 1, BLOCKED);
+        t1.release(LockManager::kExclusive, 1);
+        // t3 should wake first, because its request came before t2's
+        t3.wakened();
+        t3.release(LockManager::kExclusive, 1);
+        t2.wakened();
+        t2.release(LockManager::kShared, 1);
+
+        t1.quit();
+        t2.quit();
+        t3.quit();
+    }
+
+    {
+        // Test kPolicyReadersFirst
+        // shared requests are considered read requests
+
+        LockManager lm_readers(LockManager::kPolicyReadersFirst);
+        ClientTransaction t1(&lm_readers, 1);
+        ClientTransaction t2(&lm_readers, 2);
+        ClientTransaction t3(&lm_readers, 3);
+
+        t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+        t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+        t2.acquire(LockManager::kShared, 1, BLOCKED);
+        t1.release(LockManager::kExclusive, 1);
+
+        // t2 should wake first, even though t3 came first in time,
+        // because t2 is a reader and t3 is a writer
+        t2.wakened();
+        t2.release(LockManager::kShared, 1);
+        t3.wakened();
+        t3.release(LockManager::kExclusive, 1);
+
+        t1.quit();
+        t2.quit();
+        t3.quit();
+    }
+
+    {
+        // Test kPolicyOldestTxFirst
+        // for now, smaller TxIds are considered older
+
+        LockManager lm_oldest(LockManager::kPolicyOldestTxFirst);
+        ClientTransaction t1(&lm_oldest, 1);
+        ClientTransaction t2(&lm_oldest, 2);
+        ClientTransaction t3(&lm_oldest, 3);
+
+        // test 1
+        t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+        t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+        t2.acquire(LockManager::kShared, 1, BLOCKED);
+        t1.release(LockManager::kExclusive, 1);
+
+        // t2 should wake first, even though t3 came first in time,
+        // because t2 is older than t3
+        t2.wakened();
+        t2.release(LockManager::kShared, 1);
+        t3.wakened();
+        t3.release(LockManager::kExclusive, 1);
+
+        // test 2
+        t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+        t2.acquire(LockManager::kShared, 1, BLOCKED);
+        t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+        t1.release(LockManager::kExclusive, 1);
+
+        // t2 should wake first, because it's older than t3
+        t2.wakened();
+        t2.release(LockManager::kShared, 1);
+        t3.wakened();
+        t3.release(LockManager::kExclusive, 1);
+
+        t1.quit();
+        t2.quit();
+        t3.quit();
+    }
+
+    {
+        // Test kPolicyBlockersFirst: the biggest blocker should wake first
+
+        LockManager lm_blockers(LockManager::kPolicyBlockersFirst);
+        ClientTransaction t1(&lm_blockers, 1);
+        ClientTransaction t2(&lm_blockers, 2);
+        ClientTransaction t3(&lm_blockers, 3);
+        ClientTransaction t4(&lm_blockers, 4);
+
+        // set up t3 as the biggest blocker
+        t3.acquire(LockManager::kExclusive, 2, ACQUIRED);
+        t4.acquire(LockManager::kExclusive, 2, BLOCKED);
+
+        // test 1
+        t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+        t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+        t2.acquire(LockManager::kShared, 1, BLOCKED);
+        t1.release(LockManager::kExclusive, 1);
+        // t3 should wake first, because it's a bigger blocker than t2
+        t3.wakened();
+        t3.release(LockManager::kExclusive, 1);
+        t2.wakened();
+        t2.release(LockManager::kShared, 1);
+
+        // test 2
+        t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+        t2.acquire(LockManager::kShared, 1, BLOCKED);
+        t3.acquire(LockManager::kExclusive, 1, BLOCKED);
+        t1.release(LockManager::kExclusive, 1);
+        // t3 should wake first, even though t2 came first,
+        // because it's a bigger blocker than t2
+        t3.wakened();
+        t3.release(LockManager::kExclusive, 1);
+        t2.wakened();
+        t2.release(LockManager::kShared, 1);
+
+        t3.release(LockManager::kExclusive, 2);
+        t4.wakened();
+        t4.release(LockManager::kExclusive, 2);
+
+        t1.quit();
+        t2.quit();
+        t3.quit();
+        t4.quit();
+    }
+}
+
+/*
+ * test kPolicyReadersOnly and kPolicyWritersOnly
+ */
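+/*
+ * Under kPolicyReadersOnly, new shared requests can still be granted while all
+ * new exclusive requests wait; kPolicyWritersOnly is the mirror image. Locks
+ * granted before the policy change are unaffected, which is why the policy
+ * setter itself can block on an existing conflicting holder (exercised below).
+ */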
+TEST(LockManagerTest, TxOnlyPolicies) {
+    LockManager lm;
+    ClientTransaction t1(&lm, 1);
+    ClientTransaction t2(&lm, 2);
+    ClientTransaction t3(&lm, 3);
+    ClientTransaction t4(&lm, 4);
+    ClientTransaction t5(&lm, 5);
+    ClientTransaction tp(&lm, 6);
+
+    // show kPolicyReadersOnly blocking writers, which
+    // awake when the policy reverts
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    tp.setPolicy(LockManager::kPolicyReadersOnly, ACQUIRED);
+    t3.acquire(LockManager::kExclusive, 2, BLOCKED); // just policy conflict
+    t4.acquire(LockManager::kExclusive, 1, BLOCKED); // both policy & t1
+    t5.acquire(LockManager::kShared, 1, ACQUIRED);   // even though t4 is waiting
+    tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+    t3.wakened();
+    t3.release(LockManager::kExclusive, 2);
+    t1.release(LockManager::kShared, 1);
+    t5.release(LockManager::kShared, 1);
+    t4.wakened();
+    t4.release(LockManager::kExclusive, 1);
+
+    // show kPolicyWritersOnly blocking readers, which
+    // awake when the policy reverts
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    tp.setPolicy(LockManager::kPolicyWritersOnly, ACQUIRED);
+    t3.acquire(LockManager::kShared, 2, BLOCKED);    // just policy conflict
+    t4.acquire(LockManager::kShared, 1, BLOCKED);    // both policy & t1
+    t1.release(LockManager::kExclusive, 1);
+    t5.acquire(0x3/*LockManager::kExclusive*/, 2, ACQUIRED); // even though t3 is waiting
+    t5.release(LockManager::kExclusive, 2);
+    tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+    t3.wakened();
+    t3.release(LockManager::kShared, 2);
+    t4.wakened();
+    t4.release(LockManager::kShared, 1);
+
+    // show kPolicyReadersOnly blocked by an existing writer
+    // but still blocking new writers
+    t1.acquire(LockManager::kExclusive, 1, ACQUIRED);
+    tp.setPolicy(LockManager::kPolicyReadersOnly, BLOCKED); // blocked by t1
+    t2.acquire(LockManager::kExclusive, 2, BLOCKED);        // just policy conflict
+    t3.acquire(LockManager::kShared, 2, ACQUIRED);          // even though t2 is waiting
+    t3.release(LockManager::kShared, 2);
+    t1.release(LockManager::kExclusive, 1);
+    tp.wakened();
+    tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+    t2.wakened();
+    t2.release(LockManager::kExclusive, 2);
+
+    // show kPolicyWritersOnly blocked by an existing reader
+    // but still blocking new readers
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    tp.setPolicy(LockManager::kPolicyWritersOnly, BLOCKED); // blocked by t1
+    t2.acquire(LockManager::kShared, 2, BLOCKED);           // just policy conflict
+    t1.release(LockManager::kShared, 1);
+    tp.wakened();
+    tp.setPolicy(LockManager::kPolicyReadersFirst, ACQUIRED);
+    t2.wakened();
+    t2.release(LockManager::kShared, 2);
+
+    t1.quit();
+    t2.quit();
+    t3.quit();
+    t4.quit();
+    t5.quit();
+    tp.quit();
+}
+
+TEST(LockManagerTest, TxShutdown) {
+    LockManager lm;
+    ClientTransaction t1(&lm, 1);
+    ClientTransaction t2(&lm, 2);
+
+    t1.acquire(LockManager::kShared, 1, ACQUIRED);
+    lm.shutdown(3000);
+
+    // t1 can still do work while quiescing
+    t1.release(LockManager::kShared, 1);
+    t1.acquire(LockManager::kShared, 2, ACQUIRED);
+
+    // t2 is new and should be refused
+    t2.acquire(LockManager::kShared, 3, ABORTED);
+
+    // after the quiescing period, t1's request should be refused
+    sleep(3);
+    t1.acquire(LockManager::kShared, 4, ABORTED);
+}
+} // namespace mongo
diff --git a/src/mongo/db/d_concurrency.cpp b/src/mongo/db/d_concurrency.cpp
index c13ea3e9866..a8651645303 100644
--- a/src/mongo/db/d_concurrency.cpp
+++ b/src/mongo/db/d_concurrency.cpp
@@ -28,6 +28,8 @@
  * it in the license file.
 */
 
+#include "mongo/platform/basic.h"
+
 #include "mongo/db/d_concurrency.h"
 
 #include "mongo/db/client.h"
@@ -36,6 +38,7 @@
 #include "mongo/db/d_globals.h"
 #include "mongo/db/lockstat.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/server_parameters.h"
 #include "mongo/server.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/concurrency/mapsf.h"
@@ -402,7 +405,46 @@ namespace mongo {
         }
     }
 
-    void Lock::DBWrite::lockOther(const StringData& db) {
+    void Lock::DBWrite::lockOtherRead(const StringData& db) {
+        fassert(18517, !db.empty());
+
+        // we do checks first, because if an assert fires the destructor won't be
+        // called, and we don't want to be left half finished
+        if( _lockState->otherCount() ) {
+            // nested. prev could be read or write. if/when we do temprelease with
+            // DBRead/DBWrite we will need to increment/decrement here (so we can not
+            // release or assert if nested). temprelease we should avoid if we can
+            // though; it's a bit of an anti-pattern.
+            massert(18513,
+                    str::stream() << "internal error tried to lock two databases at the same time. old:"
+                                  << _lockState->otherName() << " new:" << db,
+                    db == _lockState->otherName());
+            return;
+        }
+
+        // first lock for this db. check consistent order with local db lock so we
+        // never deadlock. local always comes last
+        massert(18514,
+                str::stream() << "can't dblock:" << db
+                              << " when local or admin is already locked",
+                _lockState->nestableCount() == 0);
+
+        if (db != _lockState->otherName()) {
+            DBLocksMap::ref r(dblocks);
+            WrapperForRWLock*& lock = r[db];
+            if (lock == NULL) {
+                lock = new WrapperForRWLock(db);
+            }
+
+            _lockState->lockedOther(db, -1, lock);
+        }
+        else {
+            DEV OCCASIONALLY { dassert(dblocks.get(db) == _lockState->otherLock()); }
+            _lockState->lockedOther(-1);
+        }
+
+        fassert(18515, _weLocked == 0);
+        _lockState->otherLock()->lock_shared();
+        _weLocked = _lockState->otherLock();
+    }
+
+    void Lock::DBWrite::lockOtherWrite(const StringData& db) {
         fassert(16252, !db.empty());
 
         // we do checks first, as on assert destructor won't be called so don't want to be half finished with our work.
@@ -470,8 +512,16 @@
             _locked_W = true;
             return;
         }
-        if( !nested )
-            lockOther(db);
+
+        if (!nested) {
+            if (_isIntentWrite) {
+                lockOtherRead(db);
+            }
+            else {
+                lockOtherWrite(db);
+            }
+        }
+
         lockTop();
         if( nested )
             lockNestable(nested);
@@ -496,9 +546,12 @@
             lockNestable(nested);
         }
 
-    Lock::DBWrite::DBWrite(LockState* lockState, const StringData& ns)
-        : ScopedLock(lockState, 'w' ), _what(ns.toString()), _nested(false) {
-        lockDB( _what );
+    Lock::DBWrite::DBWrite(LockState* lockState, const StringData& ns, bool intentWrite)
+        : ScopedLock(lockState, 'w'),
+          _isIntentWrite(intentWrite),
+          _what(ns.toString()),
+          _nested(false) {
+        lockDB(_what);
     }
 
     Lock::DBRead::DBRead(LockState* lockState, const StringData& ns)
@@ -753,4 +806,10 @@
         }
 
     } lockStatsServerStatusSection;
 
+
+    // This startup parameter enables experimental document-level locking features, which work
+    // for update-in-place changes only (i.e., no index updates and no document growth or
+    // movement). It should be removed once full document-level locking is checked in.
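+    // (Assuming the usual server-parameter mechanics, this would be enabled at
+    // startup with "mongod --setParameter useExperimentalDocLocking=true".)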
+    MONGO_EXPORT_STARTUP_SERVER_PARAMETER(useExperimentalDocLocking, bool, false);
+
 }
diff --git a/src/mongo/db/d_concurrency.h b/src/mongo/db/d_concurrency.h
index 4696626b704..a11351b5a84 100644
--- a/src/mongo/db/d_concurrency.h
+++ b/src/mongo/db/d_concurrency.h
@@ -170,7 +170,8 @@ namespace mongo {
 
             void lockTop();
             void lockNestable(Nestable db);
-            void lockOther(const StringData& db);
+            void lockOtherWrite(const StringData& db);
+            void lockOtherRead(const StringData& db);
             void lockDB(const std::string& ns);
             void unlockDB();
 
@@ -179,12 +180,13 @@
             void _relock();
 
         public:
-            DBWrite(LockState* lockState, const StringData& dbOrNs);
+            DBWrite(LockState* lockState, const StringData& dbOrNs, bool intentWrite = false);
             virtual ~DBWrite();
 
         private:
             bool _locked_w;
            bool _locked_W;
+            bool _isIntentWrite;
             WrapperForRWLock *_weLocked;
             const std::string _what;
             bool _nested;
@@ -248,4 +250,7 @@ namespace mongo {
         ~writelocktry();
         bool got() const { return _got; }
     };
+
+    // Defined in d_concurrency.cpp
+    extern bool useExperimentalDocLocking;
 }
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index b70210c4ef2..17111a87b73 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -605,7 +605,7 @@ namespace mongo {
         UpdateExecutor executor(&request, &op.debug());
         uassertStatusOK(executor.prepare());
 
-        Lock::DBWrite lk(txn->lockState(), ns.ns());
+        Lock::DBWrite lk(txn->lockState(), ns.ns(), useExperimentalDocLocking);
 
         // if this ever moves to outside of lock, need to adjust check
         // Client::Context::_finishInit
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index f3d21129fb3..963109b2537 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -56,6 +56,7 @@ namespace mongo {
 
     namespace mb = mutablebson;
 
+
     namespace {
 
         const char idFieldName[] = "_id";
@@ -624,7 +625,6 @@ namespace mongo {
         runner->saveState();
 
         if (inPlace && !driver->modsAffectIndices()) {
-
             // If a set of modifiers were all no-ops, we are still 'in place', but there is
             // no work to do, in which case we want to consider the object unchanged.
             if (!damages.empty() ) {
@@ -636,8 +636,12 @@ namespace mongo {
             newObj = oldObj;
         }
         else {
-            // The updates were not in place. Apply them through the file manager.
+
+            // XXX: With experimental document-level locking, we do not hold sufficient
+            // locks, so this would cause corruption.
+            fassert(18516, !useExperimentalDocLocking);
+
             newObj = doc.getObject();
             uassert(17419,
                     str::stream() << "Resulting document after update is larger than "
diff --git a/src/mongo/db/startup_warnings.cpp b/src/mongo/db/startup_warnings.cpp
index d7e187cd5b0..15570c973c6 100644
--- a/src/mongo/db/startup_warnings.cpp
+++ b/src/mongo/db/startup_warnings.cpp
@@ -40,6 +40,8 @@
 
 namespace mongo {
 
+    extern bool useExperimentalDocLocking;
+
     //
     // system warnings
     //
@@ -72,6 +74,18 @@ namespace mongo {
             warned = true;
         }
 
+        if (useExperimentalDocLocking) {
+            log() << "** WARNING: Experimental (and untested) document-level locking for in-place"
+                  << startupWarningsLog;
+            log() << "            updates, which do not modify indexed values, has been enabled."
+                  << startupWarningsLog;
+            log() << "            This should absolutely not be used on production systems and is"
+                  << startupWarningsLog;
+            log() << "            for demonstration purposes only."
+                  << startupWarningsLog;
+            warned = true;
+        }
+
 #if defined(_WIN32) && !defined(_WIN64)
         // Warn user that they are running a 32-bit app on 64-bit Windows
         BOOL wow64Process;
diff --git a/src/mongo/db/structure/record_store_v1_base.cpp b/src/mongo/db/structure/record_store_v1_base.cpp
index 9a80cd62ceb..e2fc7ff8312 100644
--- a/src/mongo/db/structure/record_store_v1_base.cpp
+++ b/src/mongo/db/structure/record_store_v1_base.cpp
@@ -325,7 +325,7 @@ namespace mongo {
 
     Status RecordStoreV1Base::updateWithDamages( OperationContext* txn,
                                                  const DiskLoc& loc,
-                                                 const char* damangeSource,
+                                                 const char* damageSource,
                                                  const mutablebson::DamageVector& damages ) {
         _paddingFits( txn );
 
@@ -336,7 +336,7 @@ namespace mongo {
         mutablebson::DamageVector::const_iterator where = damages.begin();
         const mutablebson::DamageVector::const_iterator end = damages.end();
         for( ; where != end; ++where ) {
-            const char* sourcePtr = damangeSource + where->sourceOffset;
+            const char* sourcePtr = damageSource + where->sourceOffset;
             void* targetPtr = txn->recoveryUnit()->writingPtr(root + where->targetOffset, where->size);
             std::memcpy(targetPtr, sourcePtr, where->size);
         }
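
A minimal sketch of how the RAII wrappers added in lock_mgr.h are meant to be used, assuming the singleton LockManager and a caller-supplied TxId; updateRecordInPlace and its arguments are hypothetical illustration, not code from this patch:

    #include "mongo/db/concurrency/lock_mgr.h"

    namespace mongo {
        // Hypothetical caller: hold an exclusive document-level lock for the
        // duration of an in-place update. The ExclusiveResourceLock constructor
        // acquires the lock through LockManager::getSingleton(), using the
        // record's address as its ResourceId; the destructor releases the lock
        // when 'lk' goes out of scope, including during exception unwinding.
        void updateRecordInPlace(size_t txId, void* record) {
            ExclusiveResourceLock lk(txId, record);
            // ... apply in-place (no-growth, no-index) changes to 'record' ...
        }
    }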