/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/platform/basic.h" #include "mongo/db/concurrency/lock_state.h" #include #include "mongo/db/namespace_string.h" #include "mongo/db/service_context.h" #include "mongo/platform/compiler.h" #include "mongo/stdx/new.h" #include "mongo/util/background.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/debug_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace { /** * Partitioned global lock statistics, so we don't hit the same bucket. */ class PartitionedInstanceWideLockStats { MONGO_DISALLOW_COPYING(PartitionedInstanceWideLockStats); public: PartitionedInstanceWideLockStats() {} void recordAcquisition(LockerId id, ResourceId resId, LockMode mode) { _get(id).recordAcquisition(resId, mode); } void recordWait(LockerId id, ResourceId resId, LockMode mode) { _get(id).recordWait(resId, mode); } void recordWaitTime(LockerId id, ResourceId resId, LockMode mode, uint64_t waitMicros) { _get(id).recordWaitTime(resId, mode, waitMicros); } void recordDeadlock(ResourceId resId, LockMode mode) { _get(resId).recordDeadlock(resId, mode); } void report(SingleThreadedLockStats* outStats) const { for (int i = 0; i < NumPartitions; i++) { outStats->append(_partitions[i].stats); } } void reset() { for (int i = 0; i < NumPartitions; i++) { _partitions[i].stats.reset(); } } private: // This alignment is a best effort approach to ensure that each partition falls on a // separate page/cache line in order to avoid false sharing. struct alignas(stdx::hardware_destructive_interference_size) AlignedLockStats { AtomicLockStats stats; }; enum { NumPartitions = 8 }; AtomicLockStats& _get(LockerId id) { return _partitions[id % NumPartitions].stats; } AlignedLockStats _partitions[NumPartitions]; }; // Global lock manager instance. LockManager globalLockManager; // Global lock. Every server operation, which uses the Locker must acquire this lock at least // once. See comments in the header file (begin/endTransaction) for more information. const ResourceId resourceIdGlobal = ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_GLOBAL); // Flush lock. This is only used for the MMAP V1 storage engine and synchronizes journal writes // to the shared view and remaps. See the comments in the header for information on how MMAP V1 // concurrency control works. const ResourceId resourceIdMMAPV1Flush = ResourceId(RESOURCE_MMAPV1_FLUSH, ResourceId::SINGLETON_MMAPV1_FLUSH); // How often (in millis) to check for deadlock if a lock has not been granted for some time const Milliseconds DeadlockTimeout = Milliseconds(500); // Dispenses unique LockerId identifiers AtomicUInt64 idCounter(0); // Partitioned global lock statistics, so we don't hit the same bucket PartitionedInstanceWideLockStats globalStats; } // namespace template bool LockerImpl::_shouldDelayUnlock(ResourceId resId, LockMode mode) const { switch (resId.getType()) { // The flush lock must not participate in two-phase locking because we need to temporarily // yield it while blocked waiting to acquire other locks. case RESOURCE_MMAPV1_FLUSH: case RESOURCE_MUTEX: return false; case RESOURCE_GLOBAL: case RESOURCE_DATABASE: case RESOURCE_COLLECTION: case RESOURCE_METADATA: break; default: MONGO_UNREACHABLE; } switch (mode) { case MODE_X: case MODE_IX: return true; case MODE_IS: case MODE_S: return _sharedLocksShouldTwoPhaseLock; default: MONGO_UNREACHABLE; } } template bool LockerImpl::isW() const { return getLockMode(resourceIdGlobal) == MODE_X; } template bool LockerImpl::isR() const { return getLockMode(resourceIdGlobal) == MODE_S; } template bool LockerImpl::isLocked() const { return getLockMode(resourceIdGlobal) != MODE_NONE; } template bool LockerImpl::isWriteLocked() const { return isLockHeldForMode(resourceIdGlobal, MODE_IX); } template bool LockerImpl::isReadLocked() const { return isLockHeldForMode(resourceIdGlobal, MODE_IS); } template void LockerImpl::dump() const { StringBuilder ss; ss << "Locker id " << _id << " status: "; _lock.lock(); LockRequestsMap::ConstIterator it = _requests.begin(); while (!it.finished()) { ss << it.key().toString() << " " << lockRequestStatusName(it->status) << " in " << modeName(it->mode) << "; "; it.next(); } _lock.unlock(); log() << ss.str(); } // // CondVarLockGrantNotification // CondVarLockGrantNotification::CondVarLockGrantNotification() { clear(); } void CondVarLockGrantNotification::clear() { _result = LOCK_INVALID; } LockResult CondVarLockGrantNotification::wait(Milliseconds timeout) { stdx::unique_lock lock(_mutex); return _cond.wait_for( lock, timeout.toSystemDuration(), [this] { return _result != LOCK_INVALID; }) ? _result : LOCK_TIMEOUT; } LockResult CondVarLockGrantNotification::wait(OperationContext* opCtx, Milliseconds timeout) { invariant(opCtx); stdx::unique_lock lock(_mutex); return opCtx->waitForConditionOrInterruptFor( _cond, lock, timeout, [this] { return _result != LOCK_INVALID; }) ? _result : LOCK_TIMEOUT; } void CondVarLockGrantNotification::notify(ResourceId resId, LockResult result) { stdx::unique_lock lock(_mutex); invariant(_result == LOCK_INVALID); _result = result; _cond.notify_all(); } namespace { TicketHolder* ticketHolders[LockModesCount] = {}; } // namespace // // Locker // /* static */ void Locker::setGlobalThrottling(class TicketHolder* reading, class TicketHolder* writing) { ticketHolders[MODE_S] = reading; ticketHolders[MODE_IS] = reading; ticketHolders[MODE_IX] = writing; } template LockerImpl::LockerImpl() : _id(idCounter.addAndFetch(1)), _wuowNestingLevel(0), _threadId(stdx::this_thread::get_id()) {} template stdx::thread::id LockerImpl::getThreadId() const { return _threadId; } template LockerImpl::~LockerImpl() { // Cannot delete the Locker while there are still outstanding requests, because the // LockManager may attempt to access deleted memory. Besides it is probably incorrect // to delete with unaccounted locks anyways. invariant(!inAWriteUnitOfWork()); invariant(_resourcesToUnlockAtEndOfUnitOfWork.empty()); invariant(_requests.empty()); invariant(_modeForTicket == MODE_NONE); // Reset the locking statistics so the object can be reused _stats.reset(); } template Locker::ClientState LockerImpl::getClientState() const { auto state = _clientState.load(); if (state == kActiveReader && hasLockPending()) state = kQueuedReader; if (state == kActiveWriter && hasLockPending()) state = kQueuedWriter; return state; } template LockResult LockerImpl::lockGlobal(OperationContext* opCtx, LockMode mode) { LockResult result = _lockGlobalBegin(opCtx, mode, Date_t::max()); if (result == LOCK_WAITING) { result = lockGlobalComplete(opCtx, Date_t::max()); } if (result == LOCK_OK) { lockMMAPV1Flush(); } return result; } template void LockerImpl::reacquireTicket(OperationContext* opCtx) { invariant(_modeForTicket != MODE_NONE); auto acquireTicketResult = _acquireTicket(opCtx, _modeForTicket, Date_t::max()); invariant(acquireTicketResult == LOCK_OK); } template LockResult LockerImpl::_acquireTicket(OperationContext* opCtx, LockMode mode, Date_t deadline) { const bool reader = isSharedLockMode(mode); auto holder = shouldAcquireTicket() ? ticketHolders[mode] : nullptr; if (holder) { _clientState.store(reader ? kQueuedReader : kQueuedWriter); // If the ticket wait is interrupted, restore the state of the client. auto restoreStateOnErrorGuard = MakeGuard([&] { _clientState.store(kInactive); }); if (deadline == Date_t::max()) { holder->waitForTicket(opCtx); } else if (!holder->waitForTicketUntil(opCtx, deadline)) { return LOCK_TIMEOUT; } restoreStateOnErrorGuard.Dismiss(); } _clientState.store(reader ? kActiveReader : kActiveWriter); return LOCK_OK; } template LockResult LockerImpl::_lockGlobalBegin(OperationContext* opCtx, LockMode mode, Date_t deadline) { dassert(isLocked() == (_modeForTicket != MODE_NONE)); if (_modeForTicket == MODE_NONE) { auto acquireTicketResult = _acquireTicket(opCtx, mode, deadline); if (acquireTicketResult != LOCK_OK) { return acquireTicketResult; } _modeForTicket = mode; } const LockResult result = lockBegin(resourceIdGlobal, mode); if (result == LOCK_OK) return LOCK_OK; // Currently, deadlock detection does not happen inline with lock acquisition so the only // unsuccessful result that the lock manager would return is LOCK_WAITING. invariant(result == LOCK_WAITING); return result; } template LockResult LockerImpl::lockGlobalComplete(OperationContext* opCtx, Date_t deadline) { return lockComplete(opCtx, resourceIdGlobal, getLockMode(resourceIdGlobal), deadline, false); } template void LockerImpl::lockMMAPV1Flush() { if (!IsForMMAPV1) return; // The flush lock always has a reference count of 1, because it is dropped at the end of // each write unit of work in order to allow the flush thread to run. See the comments in // the header for information on how the MMAP V1 journaling system works. LockRequest* globalLockRequest = _requests.find(resourceIdGlobal).objAddr(); if (globalLockRequest->recursiveCount == 1) { invariant(LOCK_OK == lock(resourceIdMMAPV1Flush, _getModeForMMAPV1FlushLock())); } dassert(getLockMode(resourceIdMMAPV1Flush) == _getModeForMMAPV1FlushLock()); } template void LockerImpl::downgradeGlobalXtoSForMMAPV1() { invariant(!inAWriteUnitOfWork()); LockRequest* globalLockRequest = _requests.find(resourceIdGlobal).objAddr(); invariant(globalLockRequest->mode == MODE_X); invariant(globalLockRequest->recursiveCount == 1); invariant(_modeForTicket == MODE_X); // Note that this locker will not actually have a ticket (as MODE_X has no TicketHolder) or // acquire one now, but at most a single thread can be in this downgraded MODE_S situation, // so it's OK. // Making this call here will record lock downgrades as acquisitions, which is acceptable globalStats.recordAcquisition(_id, resourceIdGlobal, MODE_S); _stats.recordAcquisition(resourceIdGlobal, MODE_S); globalLockManager.downgrade(globalLockRequest, MODE_S); if (IsForMMAPV1) { invariant(unlock(resourceIdMMAPV1Flush)); } } template bool LockerImpl::unlockGlobal() { if (!unlock(resourceIdGlobal)) { return false; } invariant(!inAWriteUnitOfWork()); LockRequestsMap::Iterator it = _requests.begin(); while (!it.finished()) { // If we're here we should only have one reference to any lock. It is a programming // error for any lock used with multi-granularity locking to have more references than // the global lock, because every scope starts by calling lockGlobal. if (it.key().getType() == RESOURCE_GLOBAL || it.key().getType() == RESOURCE_MUTEX) { it.next(); } else { invariant(_unlockImpl(&it)); } } return true; } template void LockerImpl::beginWriteUnitOfWork() { // Sanity check that write transactions under MMAP V1 have acquired the flush lock, so we // don't allow partial changes to be written. dassert(!IsForMMAPV1 || isLockHeldForMode(resourceIdMMAPV1Flush, MODE_IX)); _wuowNestingLevel++; } template void LockerImpl::endWriteUnitOfWork() { invariant(_wuowNestingLevel > 0); if (--_wuowNestingLevel > 0) { // Don't do anything unless leaving outermost WUOW. return; } while (!_resourcesToUnlockAtEndOfUnitOfWork.empty()) { unlock(_resourcesToUnlockAtEndOfUnitOfWork.front()); _resourcesToUnlockAtEndOfUnitOfWork.pop(); } // For MMAP V1, we need to yield the flush lock so that the flush thread can run if (IsForMMAPV1) { invariant(unlock(resourceIdMMAPV1Flush)); invariant(LOCK_OK == lock(resourceIdMMAPV1Flush, _getModeForMMAPV1FlushLock())); } } template LockResult LockerImpl::lock( OperationContext* opCtx, ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) { const LockResult result = lockBegin(resId, mode); // Fast, uncontended path if (result == LOCK_OK) return LOCK_OK; // Currently, deadlock detection does not happen inline with lock acquisition so the only // unsuccessful result that the lock manager would return is LOCK_WAITING. invariant(result == LOCK_WAITING); return lockComplete(opCtx, resId, mode, deadline, checkDeadlock); } template void LockerImpl::downgrade(ResourceId resId, LockMode newMode) { LockRequestsMap::Iterator it = _requests.find(resId); globalLockManager.downgrade(it.objAddr(), newMode); } template bool LockerImpl::unlock(ResourceId resId) { LockRequestsMap::Iterator it = _requests.find(resId); if (inAWriteUnitOfWork() && _shouldDelayUnlock(it.key(), (it->mode))) { _resourcesToUnlockAtEndOfUnitOfWork.push(it.key()); return false; } // Don't attempt to unlock twice. This can happen when an interrupted global lock is destructed. if (it.finished()) return false; return _unlockImpl(&it); } template LockMode LockerImpl::getLockMode(ResourceId resId) const { scoped_spinlock scopedLock(_lock); const LockRequestsMap::ConstIterator it = _requests.find(resId); if (!it) return MODE_NONE; return it->mode; } template bool LockerImpl::isLockHeldForMode(ResourceId resId, LockMode mode) const { return isModeCovered(mode, getLockMode(resId)); } template bool LockerImpl::isDbLockedForMode(StringData dbName, LockMode mode) const { invariant(nsIsDbOnly(dbName)); if (isW()) return true; if (isR() && isSharedLockMode(mode)) return true; const ResourceId resIdDb(RESOURCE_DATABASE, dbName); return isLockHeldForMode(resIdDb, mode); } template bool LockerImpl::isCollectionLockedForMode(StringData ns, LockMode mode) const { invariant(nsIsFull(ns)); if (isW()) return true; if (isR() && isSharedLockMode(mode)) return true; const NamespaceString nss(ns); const ResourceId resIdDb(RESOURCE_DATABASE, nss.db()); LockMode dbMode = getLockMode(resIdDb); if (!shouldConflictWithSecondaryBatchApplication()) return true; switch (dbMode) { case MODE_NONE: return false; case MODE_X: return true; case MODE_S: return isSharedLockMode(mode); case MODE_IX: case MODE_IS: { const ResourceId resIdColl(RESOURCE_COLLECTION, ns); return isLockHeldForMode(resIdColl, mode); } break; case LockModesCount: break; } invariant(false); return false; } template ResourceId LockerImpl::getWaitingResource() const { scoped_spinlock scopedLock(_lock); LockRequestsMap::ConstIterator it = _requests.begin(); while (!it.finished()) { if (it->status == LockRequest::STATUS_WAITING || it->status == LockRequest::STATUS_CONVERTING) { return it.key(); } it.next(); } return ResourceId(); } template void LockerImpl::getLockerInfo(LockerInfo* lockerInfo) const { invariant(lockerInfo); // Zero-out the contents lockerInfo->locks.clear(); lockerInfo->waitingResource = ResourceId(); lockerInfo->stats.reset(); _lock.lock(); LockRequestsMap::ConstIterator it = _requests.begin(); while (!it.finished()) { OneLock info; info.resourceId = it.key(); info.mode = it->mode; lockerInfo->locks.push_back(info); it.next(); } _lock.unlock(); std::sort(lockerInfo->locks.begin(), lockerInfo->locks.end()); lockerInfo->waitingResource = getWaitingResource(); lockerInfo->stats.append(_stats); } template boost::optional LockerImpl::getLockerInfo() const { Locker::LockerInfo lockerInfo; getLockerInfo(&lockerInfo); return std::move(lockerInfo); } template bool LockerImpl::saveLockStateAndUnlock(Locker::LockSnapshot* stateOut) { // We shouldn't be saving and restoring lock state from inside a WriteUnitOfWork. invariant(!inAWriteUnitOfWork()); // Clear out whatever is in stateOut. stateOut->locks.clear(); stateOut->globalMode = MODE_NONE; // First, we look at the global lock. There is special handling for this (as the flush // lock goes along with it) so we store it separately from the more pedestrian locks. LockRequestsMap::Iterator globalRequest = _requests.find(resourceIdGlobal); if (!globalRequest) { // If there's no global lock there isn't really anything to do. Check that. for (auto it = _requests.begin(); !it.finished(); it.next()) { invariant(it.key().getType() == RESOURCE_MUTEX); } return false; } // If the global lock has been acquired more than once, we're probably somewhere in a // DBDirectClient call. It's not safe to release and reacquire locks -- the context using // the DBDirectClient is probably not prepared for lock release. if (globalRequest->recursiveCount > 1) { return false; } // The global lock must have been acquired just once stateOut->globalMode = globalRequest->mode; invariant(unlock(resourceIdGlobal)); // Next, the non-global locks. for (LockRequestsMap::Iterator it = _requests.begin(); !it.finished(); it.next()) { const ResourceId resId = it.key(); const ResourceType resType = resId.getType(); if (resType == RESOURCE_MUTEX) continue; // We should never have to save and restore metadata locks. invariant((IsForMMAPV1 && (resourceIdMMAPV1Flush == resId)) || RESOURCE_DATABASE == resId.getType() || RESOURCE_COLLECTION == resId.getType() || (RESOURCE_GLOBAL == resId.getType() && isSharedLockMode(it->mode))); // And, stuff the info into the out parameter. OneLock info; info.resourceId = resId; info.mode = it->mode; stateOut->locks.push_back(info); invariant(unlock(resId)); } invariant(!isLocked()); // Sort locks by ResourceId. They'll later be acquired in this canonical locking order. std::sort(stateOut->locks.begin(), stateOut->locks.end()); return true; } template void LockerImpl::restoreLockState(OperationContext* opCtx, const Locker::LockSnapshot& state) { // We shouldn't be saving and restoring lock state from inside a WriteUnitOfWork. invariant(!inAWriteUnitOfWork()); invariant(_modeForTicket == MODE_NONE); std::vector::const_iterator it = state.locks.begin(); // If we locked the PBWM, it must be locked before the resourceIdGlobal resource. if (it != state.locks.end() && it->resourceId == resourceIdParallelBatchWriterMode) { invariant(LOCK_OK == lock(opCtx, it->resourceId, it->mode)); it++; } invariant(LOCK_OK == lockGlobal(opCtx, state.globalMode)); for (; it != state.locks.end(); it++) { // This is a sanity check that lockGlobal restored the MMAP V1 flush lock in the // expected mode. if (IsForMMAPV1 && (it->resourceId == resourceIdMMAPV1Flush)) { invariant(it->mode == _getModeForMMAPV1FlushLock()); } else { invariant(LOCK_OK == lock(it->resourceId, it->mode)); } } invariant(_modeForTicket != MODE_NONE); } template LockResult LockerImpl::lockBegin(ResourceId resId, LockMode mode) { dassert(!getWaitingResource().isValid()); LockRequest* request; bool isNew = true; LockRequestsMap::Iterator it = _requests.find(resId); if (!it) { scoped_spinlock scopedLock(_lock); LockRequestsMap::Iterator itNew = _requests.insert(resId); itNew->initNew(this, &_notify); request = itNew.objAddr(); } else { request = it.objAddr(); isNew = false; } // Making this call here will record lock re-acquisitions and conversions as well. globalStats.recordAcquisition(_id, resId, mode); _stats.recordAcquisition(resId, mode); // Give priority to the full modes for global, parallel batch writer mode, // and flush lock so we don't stall global operations such as shutdown or flush. const ResourceType resType = resId.getType(); if (resType == RESOURCE_GLOBAL || (IsForMMAPV1 && resId == resourceIdMMAPV1Flush)) { if (mode == MODE_S || mode == MODE_X) { request->enqueueAtFront = true; request->compatibleFirst = true; } } else if (resType != RESOURCE_MUTEX) { // This is all sanity checks that the global and flush locks are always be acquired // before any other lock has been acquired and they must be in sync with the nesting. DEV { const LockRequestsMap::Iterator itGlobal = _requests.find(resourceIdGlobal); invariant(itGlobal->recursiveCount > 0); invariant(itGlobal->mode != MODE_NONE); // Check the MMAP V1 flush lock is held in the appropriate mode invariant(!IsForMMAPV1 || isLockHeldForMode(resourceIdMMAPV1Flush, _getModeForMMAPV1FlushLock())); }; } // The notification object must be cleared before we invoke the lock manager, because // otherwise we might reset state if the lock becomes granted very fast. _notify.clear(); LockResult result = isNew ? globalLockManager.lock(resId, request, mode) : globalLockManager.convert(resId, request, mode); if (result == LOCK_WAITING) { globalStats.recordWait(_id, resId, mode); _stats.recordWait(resId, mode); } return result; } template LockResult LockerImpl::lockComplete( OperationContext* opCtx, ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) { // Under MMAP V1 engine a deadlock can occur if a thread goes to sleep waiting on // DB lock, while holding the flush lock, so it has to be released. This is only // correct to do if not in a write unit of work. const bool yieldFlushLock = IsForMMAPV1 && !inAWriteUnitOfWork() && resId.getType() != RESOURCE_GLOBAL && resId.getType() != RESOURCE_MUTEX && resId != resourceIdMMAPV1Flush; if (yieldFlushLock) { invariant(unlock(resourceIdMMAPV1Flush)); } auto relockFlushLockGuard = MakeGuard([&] { if (yieldFlushLock) { // We cannot obey the timeout here, because it is not correct to return from the lock // request with the flush lock released. invariant(LOCK_OK == lock(resourceIdMMAPV1Flush, _getModeForMMAPV1FlushLock())); } }); LockResult result; Milliseconds timeout; if (deadline == Date_t::max()) { timeout = Milliseconds::max(); } else if (deadline == Date_t::min()) { timeout = Milliseconds(0); } else { timeout = deadline - Date_t::now(); } // Don't go sleeping without bound in order to be able to report long waits or wake up for // deadlock detection. Milliseconds waitTime = std::min(timeout, DeadlockTimeout); const uint64_t startOfTotalWaitTime = curTimeMicros64(); uint64_t startOfCurrentWaitTime = startOfTotalWaitTime; // Clean up the state on any failed lock attempts. auto unlockOnErrorGuard = MakeGuard([&] { LockRequestsMap::Iterator it = _requests.find(resId); _unlockImpl(&it); }); while (true) { // It is OK if this call wakes up spuriously, because we re-evaluate the remaining // wait time anyways. // If we have an operation context, we want to use its interruptible wait so that // pending lock acquisitions can be cancelled, so long as no callers have requested an // uninterruptible lock. if (opCtx && _uninterruptibleLocksRequested == 0) { result = _notify.wait(opCtx, waitTime); } else { result = _notify.wait(waitTime); } // Account for the time spent waiting on the notification object const uint64_t curTimeMicros = curTimeMicros64(); const uint64_t elapsedTimeMicros = curTimeMicros - startOfCurrentWaitTime; startOfCurrentWaitTime = curTimeMicros; globalStats.recordWaitTime(_id, resId, mode, elapsedTimeMicros); _stats.recordWaitTime(resId, mode, elapsedTimeMicros); if (result == LOCK_OK) break; if (checkDeadlock) { DeadlockDetector wfg(globalLockManager, this); if (wfg.check().hasCycle()) { warning() << "Deadlock found: " << wfg.toString(); globalStats.recordDeadlock(resId, mode); _stats.recordDeadlock(resId, mode); result = LOCK_DEADLOCK; break; } } // If infinite timeout was requested, just keep waiting if (timeout == Milliseconds::max()) { continue; } const auto totalBlockTime = duration_cast( Microseconds(int64_t(curTimeMicros - startOfTotalWaitTime))); waitTime = (totalBlockTime < timeout) ? std::min(timeout - totalBlockTime, DeadlockTimeout) : Milliseconds(0); if (waitTime == Milliseconds(0)) { break; } } // Note: in case of the _notify object returning LOCK_TIMEOUT, it is possible to find that the // lock was still granted after all, but we don't try to take advantage of that and will return // a timeout. if (result == LOCK_OK) { unlockOnErrorGuard.Dismiss(); } return result; } template void LockerImpl::releaseTicket() { invariant(_modeForTicket != MODE_NONE); _releaseTicket(); } template void LockerImpl::_releaseTicket() { auto holder = shouldAcquireTicket() ? ticketHolders[_modeForTicket] : nullptr; if (holder) { holder->release(); } _clientState.store(kInactive); } template bool LockerImpl::_unlockImpl(LockRequestsMap::Iterator* it) { if (globalLockManager.unlock(it->objAddr())) { if (it->key() == resourceIdGlobal) { invariant(_modeForTicket != MODE_NONE); // We may have already released our ticket through a call to releaseTicket(). if (_clientState.load() != kInactive) { _releaseTicket(); } _modeForTicket = MODE_NONE; } scoped_spinlock scopedLock(_lock); it->remove(); return true; } return false; } template LockMode LockerImpl::_getModeForMMAPV1FlushLock() const { invariant(IsForMMAPV1); LockMode mode = getLockMode(resourceIdGlobal); switch (mode) { case MODE_X: case MODE_IX: return MODE_IX; case MODE_S: case MODE_IS: return MODE_IS; default: invariant(false); return MODE_NONE; } } template bool LockerImpl::isGlobalLockedRecursively() { auto globalLockRequest = _requests.find(resourceIdGlobal); return !globalLockRequest.finished() && globalLockRequest->recursiveCount > 1; } // // Auto classes // AutoYieldFlushLockForMMAPV1Commit::AutoYieldFlushLockForMMAPV1Commit(Locker* locker) : _locker(static_cast(locker)) { // Explicit yielding of the flush lock should happen only at global synchronization points // such as database drop. There should not be any active writes at these points. invariant(!_locker->inAWriteUnitOfWork()); if (isMMAPV1()) { invariant(_locker->unlock(resourceIdMMAPV1Flush)); } } AutoYieldFlushLockForMMAPV1Commit::~AutoYieldFlushLockForMMAPV1Commit() { if (isMMAPV1()) { invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, _locker->_getModeForMMAPV1FlushLock())); } } AutoAcquireFlushLockForMMAPV1Commit::AutoAcquireFlushLockForMMAPV1Commit(Locker* locker) : _locker(locker), _released(false) { // The journal thread acquiring the journal lock in S-mode opens opportunity for deadlock // involving operations which do not acquire and release the Oplog collection's X lock // inside a WUOW (see SERVER-17416 for the sequence of events), therefore acquire it with // check for deadlock and back-off if one is encountered. // // This exposes theoretical chance that we might starve the journaling system, but given // that these deadlocks happen extremely rarely and are usually due to incorrect locking // policy, and we have the deadlock counters as part of the locking statistics, this is a // reasonable handling. // // In the worst case, if we are to starve the journaling system, the server will shut down // due to too much uncommitted in-memory journal, but won't have corruption. while (true) { LockResult result = _locker->lock(resourceIdMMAPV1Flush, MODE_S, Date_t::max(), true); if (result == LOCK_OK) { break; } invariant(result == LOCK_DEADLOCK); warning() << "Delayed journaling in order to avoid deadlock during MMAP V1 journal " << "lock acquisition. See the previous messages for information on the " << "involved threads."; } } void AutoAcquireFlushLockForMMAPV1Commit::upgradeFlushLockToExclusive() { // This should not be able to deadlock, since we already hold the S journal lock, which // means all writers are kicked out. Readers always yield the journal lock if they block // waiting on any other lock. invariant(LOCK_OK == _locker->lock(resourceIdMMAPV1Flush, MODE_X, Date_t::max(), false)); // Lock bumps the recursive count. Drop it back down so that the destructor doesn't // complain. invariant(!_locker->unlock(resourceIdMMAPV1Flush)); } void AutoAcquireFlushLockForMMAPV1Commit::release() { if (!_released) { invariant(_locker->unlock(resourceIdMMAPV1Flush)); _released = true; } } AutoAcquireFlushLockForMMAPV1Commit::~AutoAcquireFlushLockForMMAPV1Commit() { release(); } namespace { /** * Periodically purges unused lock buckets. The first time the lock is used again after * cleanup it needs to be allocated, and similarly, every first use by a client for an intent * mode may need to create a partitioned lock head. Cleanup is done roughly once a minute. */ class UnusedLockCleaner : PeriodicTask { public: std::string taskName() const { return "UnusedLockCleaner"; } void taskDoWork() { LOG(2) << "cleaning up unused lock buckets of the global lock manager"; getGlobalLockManager()->cleanupUnusedLocks(); } } unusedLockCleaner; } // namespace // // Standalone functions // LockManager* getGlobalLockManager() { return &globalLockManager; } void reportGlobalLockingStats(SingleThreadedLockStats* outStats) { globalStats.report(outStats); } void resetGlobalLockStats() { globalStats.reset(); } // Ensures that there are two instances compiled for LockerImpl for the two values of the // template argument. template class LockerImpl; template class LockerImpl; // Definition for the hardcoded localdb and oplog collection info const ResourceId resourceIdLocalDB = ResourceId(RESOURCE_DATABASE, StringData("local")); const ResourceId resourceIdOplog = ResourceId(RESOURCE_COLLECTION, StringData("local.oplog.rs")); const ResourceId resourceIdAdminDB = ResourceId(RESOURCE_DATABASE, StringData("admin")); const ResourceId resourceIdParallelBatchWriterMode = ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_PARALLEL_BATCH_WRITER_MODE); } // namespace mongo