Diffstat (limited to 'src/mongo/db/concurrency/locker_impl.cpp')
-rw-r--r--  src/mongo/db/concurrency/locker_impl.cpp  1241
1 files changed, 1241 insertions, 0 deletions
diff --git a/src/mongo/db/concurrency/locker_impl.cpp b/src/mongo/db/concurrency/locker_impl.cpp
new file mode 100644
index 00000000000..a45cb05679d
--- /dev/null
+++ b/src/mongo/db/concurrency/locker_impl.cpp
@@ -0,0 +1,1241 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/concurrency/locker_impl.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/json.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/flow_control.h"
+#include "mongo/db/storage/ticketholder_manager.h"
+#include "mongo/logv2/log.h"
+#include "mongo/platform/compiler.h"
+#include "mongo/stdx/new.h"
+#include "mongo/util/background.h"
+#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/debug_util.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/scopeguard.h"
+#include "mongo/util/str.h"
+#include "mongo/util/testing_proctor.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
+namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(failNonIntentLocksIfWaitNeeded);
+MONGO_FAIL_POINT_DEFINE(enableTestOnlyFlagforRSTL);
+
+namespace {
+
+// Ignore data races in certain functions when running with TSAN. For performance reasons,
+// diagnostic commands are expected to race with concurrent lock acquisitions while gathering
+// statistics.
+#if __has_feature(thread_sanitizer)
+#define MONGO_TSAN_IGNORE __attribute__((no_sanitize("thread")))
+#else
+#define MONGO_TSAN_IGNORE
+#endif
+
+/**
+ * Tracks global (across all clients) lock acquisition statistics, partitioned into multiple
+ * buckets to minimize concurrent access conflicts.
+ *
+ * Each client has a LockerId that monotonically increases across all client instances. The
+ * LockerId % 8 is used to index into one of 8 LockStats instances. These LockStats objects must be
+ * atomically accessed, so maintaining 8 that are indexed by LockerId reduces client conflicts and
+ * improves concurrent write access. A reader, to collect global lock statics for reporting, will
+ * sum the results of all 8 disjoint 'buckets' of stats.
+ */
+class PartitionedInstanceWideLockStats {
+ PartitionedInstanceWideLockStats(const PartitionedInstanceWideLockStats&) = delete;
+ PartitionedInstanceWideLockStats& operator=(const PartitionedInstanceWideLockStats&) = delete;
+
+public:
+ PartitionedInstanceWideLockStats() {}
+
+ void recordAcquisition(LockerId id, ResourceId resId, LockMode mode) {
+ _get(id).recordAcquisition(resId, mode);
+ }
+
+ void recordWait(LockerId id, ResourceId resId, LockMode mode) {
+ _get(id).recordWait(resId, mode);
+ }
+
+ void recordWaitTime(LockerId id, ResourceId resId, LockMode mode, uint64_t waitMicros) {
+ _get(id).recordWaitTime(resId, mode, waitMicros);
+ }
+
+ void report(SingleThreadedLockStats* outStats) const {
+ for (int i = 0; i < NumPartitions; i++) {
+ outStats->append(_partitions[i].stats);
+ }
+ }
+
+ void reset() {
+ for (int i = 0; i < NumPartitions; i++) {
+ _partitions[i].stats.reset();
+ }
+ }
+
+private:
+    // This alignment is a best-effort approach to ensure that each partition falls on a
+ // separate page/cache line in order to avoid false sharing.
+ struct alignas(stdx::hardware_destructive_interference_size) AlignedLockStats {
+ AtomicLockStats stats;
+ };
+
+ enum { NumPartitions = 8 };
+
+
+ AtomicLockStats& _get(LockerId id) {
+ return _partitions[id % NumPartitions].stats;
+ }
+
+
+ AlignedLockStats _partitions[NumPartitions];
+};
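+
+// Illustrative aggregation of the partitioned stats (a sketch of what
+// reportGlobalLockingStats() at the bottom of this file does):
+//
+//     SingleThreadedLockStats snapshot;
+//     globalStats.report(&snapshot);  // sums the 8 disjoint partitions into one view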
+
+// Upper bound on a single wait for a lock grant notification, so that a waiter wakes up
+// periodically to record wait statistics and re-check its deadline if a lock has not been
+// granted for some time
+const Milliseconds MaxWaitTime = Milliseconds(500);
+
+// Dispenses unique LockerId identifiers
+AtomicWord<unsigned long long> idCounter(0);
+
+// Tracks lock statistics across all Locker instances. Distributes stats across multiple buckets
+// indexed by LockerId in order to minimize concurrent access conflicts.
+PartitionedInstanceWideLockStats globalStats;
+
+} // namespace
+
+LockManager* getGlobalLockManager() {
+ auto serviceContext = getGlobalServiceContext();
+ invariant(serviceContext);
+ return LockManager::get(serviceContext);
+}
+
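+// Decides whether releasing 'resId' must be deferred until the end of the current
+// WriteUnitOfWork (two-phase locking). Exclusive and intent-exclusive modes are always held
+// until the WUOW completes; shared and intent-shared modes are held only when two-phase
+// locking of shared locks has been requested. RESOURCE_MUTEX acquisitions are never delayed.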
+bool LockerImpl::_shouldDelayUnlock(ResourceId resId, LockMode mode) const {
+ switch (resId.getType()) {
+ case RESOURCE_MUTEX:
+ return false;
+
+ case RESOURCE_GLOBAL:
+ case RESOURCE_TENANT:
+ case RESOURCE_DATABASE:
+ case RESOURCE_COLLECTION:
+ case RESOURCE_METADATA:
+ break;
+
+ default:
+ MONGO_UNREACHABLE;
+ }
+
+ switch (mode) {
+ case MODE_X:
+ case MODE_IX:
+ return true;
+
+ case MODE_IS:
+ case MODE_S:
+ return _sharedLocksShouldTwoPhaseLock;
+
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+bool LockerImpl::isW() const {
+ return getLockMode(resourceIdGlobal) == MODE_X;
+}
+
+bool LockerImpl::isR() const {
+ return getLockMode(resourceIdGlobal) == MODE_S;
+}
+
+bool LockerImpl::isLocked() const {
+ return getLockMode(resourceIdGlobal) != MODE_NONE;
+}
+
+bool LockerImpl::isWriteLocked() const {
+ return isLockHeldForMode(resourceIdGlobal, MODE_IX);
+}
+
+bool LockerImpl::isReadLocked() const {
+ return isLockHeldForMode(resourceIdGlobal, MODE_IS);
+}
+
+bool LockerImpl::isRSTLExclusive() const {
+ return getLockMode(resourceIdReplicationStateTransitionLock) == MODE_X;
+}
+
+bool LockerImpl::isRSTLLocked() const {
+ return getLockMode(resourceIdReplicationStateTransitionLock) != MODE_NONE;
+}
+
+void LockerImpl::dump() const {
+ struct Entry {
+ ResourceId key;
+ LockRequest::Status status;
+ LockMode mode;
+ unsigned int recursiveCount;
+ unsigned int unlockPending;
+
+ BSONObj toBSON() const {
+ BSONObjBuilder b;
+ b.append("key", key.toString());
+ b.append("status", lockRequestStatusName(status));
+ b.append("recursiveCount", static_cast<int>(recursiveCount));
+ b.append("unlockPending", static_cast<int>(unlockPending));
+ b.append("mode", modeName(mode));
+ return b.obj();
+ }
+ std::string toString() const {
+ return tojson(toBSON());
+ }
+ };
+ std::vector<Entry> entries;
+ {
+ auto lg = stdx::lock_guard(_lock);
+ for (auto it = _requests.begin(); !it.finished(); it.next())
+ entries.push_back(
+ {it.key(), it->status, it->mode, it->recursiveCount, it->unlockPending});
+ }
+ LOGV2(20523,
+ "Locker id {id} status: {requests}",
+ "Locker status",
+ "id"_attr = _id,
+ "requests"_attr = entries);
+}
+
+void LockerImpl::_dumpLockerAndLockManagerRequests() {
+ // Log the _requests that this locker holds. This will provide identifying information to cross
+ // reference with the LockManager dump below for extra information.
+ dump();
+
+ // Log the LockManager's lock information. Given the locker 'dump()' above, we should be able to
+ // easily cross reference to find the lock info matching this operation. The LockManager can
+ // safely access (under internal locks) the LockRequest data that the locker cannot.
+ BSONObjBuilder builder;
+ auto lockToClientMap = LockManager::getLockToClientMap(getGlobalServiceContext());
+ getGlobalLockManager()->getLockInfoBSON(lockToClientMap, &builder);
+ auto lockInfo = builder.done();
+ LOGV2_ERROR(5736000, "Operation ending while holding locks.", "LockInfo"_attr = lockInfo);
+}
+
+
+//
+// CondVarLockGrantNotification
+//
+
+CondVarLockGrantNotification::CondVarLockGrantNotification() {
+ clear();
+}
+
+void CondVarLockGrantNotification::clear() {
+ _result = LOCK_INVALID;
+}
+
+LockResult CondVarLockGrantNotification::wait(Milliseconds timeout) {
+ stdx::unique_lock<Latch> lock(_mutex);
+ return _cond.wait_for(
+ lock, timeout.toSystemDuration(), [this] { return _result != LOCK_INVALID; })
+ ? _result
+ : LOCK_TIMEOUT;
+}
+
+LockResult CondVarLockGrantNotification::wait(OperationContext* opCtx, Milliseconds timeout) {
+ invariant(opCtx);
+ stdx::unique_lock<Latch> lock(_mutex);
+ if (opCtx->waitForConditionOrInterruptFor(
+ _cond, lock, timeout, [this] { return _result != LOCK_INVALID; })) {
+ // Because waitForConditionOrInterruptFor evaluates the predicate before checking for
+ // interrupt, it is possible that a killed operation can acquire a lock if the request is
+ // granted quickly. For that reason, it is necessary to check if the operation has been
+ // killed at least once before accepting the lock grant.
+ opCtx->checkForInterrupt();
+ return _result;
+ }
+ return LOCK_TIMEOUT;
+}
+
+void CondVarLockGrantNotification::notify(ResourceId resId, LockResult result) {
+ stdx::unique_lock<Latch> lock(_mutex);
+ invariant(_result == LOCK_INVALID);
+ _result = result;
+
+ _cond.notify_all();
+}
+
+//
+// Locker
+//
+
+LockerImpl::LockerImpl(ServiceContext* serviceCtx)
+ : _id(idCounter.addAndFetch(1)),
+ _wuowNestingLevel(0),
+ _threadId(stdx::this_thread::get_id()),
+ _ticketHolderManager(TicketHolderManager::get(serviceCtx)) {}
+
+stdx::thread::id LockerImpl::getThreadId() const {
+ return _threadId;
+}
+
+void LockerImpl::updateThreadIdToCurrentThread() {
+ _threadId = stdx::this_thread::get_id();
+}
+
+void LockerImpl::unsetThreadId() {
+ _threadId = stdx::thread::id(); // Reset to represent a non-executing thread.
+}
+
+LockerImpl::~LockerImpl() {
+ // Cannot delete the Locker while there are still outstanding requests, because the
+    // LockManager may attempt to access deleted memory. Besides, it is probably incorrect
+    // to delete with unaccounted locks anyway.
+ invariant(!inAWriteUnitOfWork());
+ invariant(_numResourcesToUnlockAtEndUnitOfWork == 0);
+ invariant(!_ticket || !_ticket->valid());
+
+ if (!_requests.empty()) {
+ _dumpLockerAndLockManagerRequests();
+ }
+ invariant(_requests.empty());
+
+ invariant(_modeForTicket == MODE_NONE);
+}
+
+Locker::ClientState LockerImpl::getClientState() const {
+ auto state = _clientState.load();
+ if (state == kActiveReader && hasLockPending())
+ state = kQueuedReader;
+ if (state == kActiveWriter && hasLockPending())
+ state = kQueuedWriter;
+
+ return state;
+}
+
+void LockerImpl::reacquireTicket(OperationContext* opCtx) {
+ invariant(_modeForTicket != MODE_NONE);
+ auto clientState = _clientState.load();
+ const bool reader = isSharedLockMode(_modeForTicket);
+
+ // Ensure that either we don't have a ticket, or the current ticket mode matches the lock mode.
+ invariant(clientState == kInactive || (clientState == kActiveReader && reader) ||
+ (clientState == kActiveWriter && !reader));
+
+ // If we already have a ticket, there's nothing to do.
+ if (clientState != kInactive)
+ return;
+
+ if (_acquireTicket(opCtx, _modeForTicket, Date_t::now())) {
+ return;
+ }
+
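+    // Retry ticket acquisition in short (100ms) intervals, re-checking for interrupt and for
+    // conflicting queued requests on each pass, so that we do not wait behind a request that
+    // cannot be granted while tickets are exhausted.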
+ do {
+ for (auto it = _requests.begin(); it; it.next()) {
+ invariant(it->mode == LockMode::MODE_IS || it->mode == LockMode::MODE_IX);
+ opCtx->checkForInterrupt();
+
+ // If we've reached this point then that means we tried to acquire a ticket but were
+ // unsuccessful, implying that tickets are currently exhausted. Additionally, since
+ // we're holding an IS or IX lock for this resource, any pending requests for the same
+ // resource must be S or X and will not be able to be granted. Thus, since such a
+ // pending lock request may also be holding a ticket, if there are any present we fail
+ // this ticket reacquisition in order to avoid a deadlock.
+ uassert(ErrorCodes::LockTimeout,
+ fmt::format("Unable to acquire ticket with mode '{}' due to detected lock "
+ "conflict for resource {}",
+ _modeForTicket,
+ it.key().toString()),
+ !getGlobalLockManager()->hasConflictingRequests(it.key(), it.objAddr()));
+ }
+ } while (!_acquireTicket(opCtx, _modeForTicket, Date_t::now() + Milliseconds{100}));
+}
+
+bool LockerImpl::_acquireTicket(OperationContext* opCtx, LockMode mode, Date_t deadline) {
+ // Upon startup, the holder is not guaranteed to be initialized.
+ auto holder = _ticketHolderManager ? _ticketHolderManager->getTicketHolder(mode) : nullptr;
+ const bool reader = isSharedLockMode(mode);
+
+ if (!shouldWaitForTicket() && holder) {
+ holder->reportImmediatePriorityAdmission();
+ } else if (mode != MODE_X && mode != MODE_NONE && holder) {
+ // MODE_X is exclusive of all other locks, thus acquiring a ticket is unnecessary.
+ _clientState.store(reader ? kQueuedReader : kQueuedWriter);
+ // If the ticket wait is interrupted, restore the state of the client.
+ ScopeGuard restoreStateOnErrorGuard([&] { _clientState.store(kInactive); });
+
+ // Acquiring a ticket is a potentially blocking operation. This must not be called after a
+ // transaction timestamp has been set, indicating this transaction has created an oplog
+ // hole.
+ invariant(!opCtx->recoveryUnit()->isTimestamped());
+
+ if (auto ticket = holder->waitForTicketUntil(
+ _uninterruptibleLocksRequested ? nullptr : opCtx, &_admCtx, deadline)) {
+ _ticket = std::move(*ticket);
+ } else {
+ return false;
+ }
+ restoreStateOnErrorGuard.dismiss();
+ }
+
+ _clientState.store(reader ? kActiveReader : kActiveWriter);
+ return true;
+}
+
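+// Acquires a ticket for this operation (unless one is already held) and then the global lock
+// resource in 'mode'. The fast path returns as soon as the LockManager grants the request;
+// otherwise the acquisition blocks in _lockComplete() until it is granted or 'deadline' passes.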
+void LockerImpl::lockGlobal(OperationContext* opCtx, LockMode mode, Date_t deadline) {
+ dassert(isLocked() == (_modeForTicket != MODE_NONE));
+ if (_modeForTicket == MODE_NONE) {
+ if (_uninterruptibleLocksRequested) {
+ // Ignore deadline.
+ invariant(_acquireTicket(opCtx, mode, Date_t::max()));
+ } else {
+ auto beforeAcquire = Date_t::now();
+ uassert(ErrorCodes::LockTimeout,
+ str::stream() << "Unable to acquire ticket with mode '" << mode
+ << "' within a max lock request timeout of '"
+ << Date_t::now() - beforeAcquire << "' milliseconds.",
+ _acquireTicket(opCtx, mode, deadline));
+ }
+ _modeForTicket = mode;
+ } else if (TestingProctor::instance().isEnabled() && !isModeCovered(mode, _modeForTicket)) {
+ LOGV2_FATAL(
+ 6614500,
+ "Ticket held does not cover requested mode for global lock. Global lock upgrades are "
+ "not allowed",
+ "held"_attr = modeName(_modeForTicket),
+ "requested"_attr = modeName(mode));
+ }
+
+ const LockResult result = _lockBegin(opCtx, resourceIdGlobal, mode);
+ // Fast, uncontended path
+ if (result == LOCK_OK)
+ return;
+
+ invariant(result == LOCK_WAITING);
+ _lockComplete(opCtx, resourceIdGlobal, mode, deadline, nullptr);
+}
+
+bool LockerImpl::unlockGlobal() {
+ if (!unlock(resourceIdGlobal)) {
+ return false;
+ }
+
+ invariant(!inAWriteUnitOfWork());
+
+ LockRequestsMap::Iterator it = _requests.begin();
+ while (!it.finished()) {
+ // If we're here we should only have one reference to any lock. It is a programming
+ // error for any lock used with multi-granularity locking to have more references than
+ // the global lock, because every scope starts by calling lockGlobal.
+ const auto resType = it.key().getType();
+ if (resType == RESOURCE_GLOBAL || resType == RESOURCE_MUTEX) {
+ it.next();
+ } else {
+ invariant(_unlockImpl(&it));
+ }
+ }
+
+ return true;
+}
+
+void LockerImpl::beginWriteUnitOfWork() {
+ _wuowNestingLevel++;
+}
+
+void LockerImpl::endWriteUnitOfWork() {
+ invariant(_wuowNestingLevel > 0);
+
+ if (--_wuowNestingLevel > 0) {
+ // Don't do anything unless leaving outermost WUOW.
+ return;
+ }
+
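+    // We are leaving the outermost WUOW, so release every lock whose unlock was deferred by
+    // two-phase locking while the unit of work was active.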
+ LockRequestsMap::Iterator it = _requests.begin();
+ while (_numResourcesToUnlockAtEndUnitOfWork > 0) {
+ if (it->unlockPending) {
+ invariant(!it.finished());
+ _numResourcesToUnlockAtEndUnitOfWork--;
+ }
+ while (it->unlockPending > 0) {
+ // If a lock is converted, unlock() may be called multiple times on a resource within
+ // the same WriteUnitOfWork. All such unlock() requests must thus be fulfilled here.
+ it->unlockPending--;
+ unlock(it.key());
+ }
+ it.next();
+ }
+}
+
+void LockerImpl::releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) {
+ stateOut->wuowNestingLevel = _wuowNestingLevel;
+ _wuowNestingLevel = 0;
+
+ for (auto it = _requests.begin(); _numResourcesToUnlockAtEndUnitOfWork > 0; it.next()) {
+ if (it->unlockPending) {
+ while (it->unlockPending) {
+ it->unlockPending--;
+ stateOut->unlockPendingLocks.push_back({it.key(), it->mode});
+ }
+ _numResourcesToUnlockAtEndUnitOfWork--;
+ }
+ }
+}
+
+void LockerImpl::restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) {
+ invariant(_numResourcesToUnlockAtEndUnitOfWork == 0);
+ invariant(!inAWriteUnitOfWork());
+
+ for (auto& lock : stateToRestore.unlockPendingLocks) {
+ auto it = _requests.begin();
+ while (it && !(it.key() == lock.resourceId && it->mode == lock.mode)) {
+ it.next();
+ }
+ invariant(!it.finished());
+ if (!it->unlockPending) {
+ _numResourcesToUnlockAtEndUnitOfWork++;
+ }
+ it->unlockPending++;
+ }
+    // Equivalent to calling beginWriteUnitOfWork() multiple times.
+ _wuowNestingLevel = stateToRestore.wuowNestingLevel;
+}
+
+void LockerImpl::releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) {
+    // Only the global WUOW can be released, since we never need to release and restore
+    // nested WUOWs. Thus we don't have to remember the nesting level.
+ invariant(_wuowNestingLevel == 1);
+ --_wuowNestingLevel;
+ invariant(!isGlobalLockedRecursively());
+
+ // All locks should be pending to unlock.
+ invariant(_requests.size() == _numResourcesToUnlockAtEndUnitOfWork);
+ for (auto it = _requests.begin(); it; it.next()) {
+        // No lock was converted, so we never need to unlock more than once.
+ invariant(it->unlockPending == 1);
+ it->unlockPending--;
+ }
+ _numResourcesToUnlockAtEndUnitOfWork = 0;
+
+ saveLockStateAndUnlock(stateOut);
+}
+
+void LockerImpl::restoreWriteUnitOfWorkAndLock(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) {
+ if (stateToRestore.globalMode != MODE_NONE) {
+ restoreLockState(opCtx, stateToRestore);
+ }
+
+ invariant(_numResourcesToUnlockAtEndUnitOfWork == 0);
+ for (auto it = _requests.begin(); it; it.next()) {
+ invariant(_shouldDelayUnlock(it.key(), (it->mode)));
+ invariant(it->unlockPending == 0);
+ it->unlockPending++;
+ }
+ _numResourcesToUnlockAtEndUnitOfWork = static_cast<unsigned>(_requests.size());
+
+ beginWriteUnitOfWork();
+}
+
+void LockerImpl::lock(OperationContext* opCtx, ResourceId resId, LockMode mode, Date_t deadline) {
+ // `lockGlobal` must be called to lock `resourceIdGlobal`.
+ invariant(resId != resourceIdGlobal);
+
+ const LockResult result = _lockBegin(opCtx, resId, mode);
+
+ // Fast, uncontended path
+ if (result == LOCK_OK)
+ return;
+
+ invariant(result == LOCK_WAITING);
+ _lockComplete(opCtx, resId, mode, deadline, nullptr);
+}
+
+void LockerImpl::downgrade(ResourceId resId, LockMode newMode) {
+ LockRequestsMap::Iterator it = _requests.find(resId);
+ getGlobalLockManager()->downgrade(it.objAddr(), newMode);
+}
+
+bool LockerImpl::unlock(ResourceId resId) {
+ LockRequestsMap::Iterator it = _requests.find(resId);
+
+ // Don't attempt to unlock twice. This can happen when an interrupted global lock is destructed.
+ if (it.finished())
+ return false;
+
+ if (inAWriteUnitOfWork() && _shouldDelayUnlock(it.key(), (it->mode))) {
+ // Only delay unlocking if the lock is not acquired more than once. Otherwise, we can simply
+ // call _unlockImpl to decrement recursiveCount instead of incrementing unlockPending. This
+ // is safe because the lock is still being held in the strongest mode necessary.
+ if (it->recursiveCount > 1) {
+ // Invariant that the lock is still being held.
+ invariant(!_unlockImpl(&it));
+ return false;
+ }
+ if (!it->unlockPending) {
+ _numResourcesToUnlockAtEndUnitOfWork++;
+ }
+ it->unlockPending++;
+ // unlockPending will be incremented if a lock is converted or acquired in the same mode
+ // recursively, and unlock() is called multiple times on one ResourceId.
+ invariant(it->unlockPending <= it->recursiveCount);
+ return false;
+ }
+
+ return _unlockImpl(&it);
+}
+
+bool LockerImpl::unlockRSTLforPrepare() {
+ auto rstlRequest = _requests.find(resourceIdReplicationStateTransitionLock);
+
+ // Don't attempt to unlock twice. This can happen when an interrupted global lock is destructed.
+ if (!rstlRequest)
+ return false;
+
+ // If the RSTL is 'unlockPending' and we are fully unlocking it, then we do not want to
+ // attempt to unlock the RSTL when the WUOW ends, since it will already be unlocked.
+ if (rstlRequest->unlockPending) {
+ rstlRequest->unlockPending = 0;
+ _numResourcesToUnlockAtEndUnitOfWork--;
+ }
+
+    // Reset the recursiveCount to 1 so that we fully unlock the RSTL. Since it will be fully
+    // unlocked, any future unlocks will be no-ops anyway.
+ rstlRequest->recursiveCount = 1;
+
+ return _unlockImpl(&rstlRequest);
+}
+
+LockMode LockerImpl::getLockMode(ResourceId resId) const {
+ scoped_spinlock scopedLock(_lock);
+
+ const LockRequestsMap::ConstIterator it = _requests.find(resId);
+ if (!it)
+ return MODE_NONE;
+
+ return it->mode;
+}
+
+bool LockerImpl::isLockHeldForMode(ResourceId resId, LockMode mode) const {
+ return isModeCovered(mode, getLockMode(resId));
+}
+
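+// Returns true or false when the global lock mode (and the tenant lock mode, if a tenantId is
+// given) alone is enough to answer whether the database or collection is locked for 'lockMode'.
+// Returns boost::none when only intent locks are held at those levels, in which case the caller
+// must check the specific database or collection resource itself.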
+boost::optional<bool> LockerImpl::_globalAndTenantLocksImplyDBOrCollectionLockedForMode(
+ const boost::optional<TenantId>& tenantId, LockMode lockMode) const {
+ if (isW()) {
+ return true;
+ }
+ if (isR() && isSharedLockMode(lockMode)) {
+ return true;
+ }
+ if (tenantId) {
+ const ResourceId tenantResourceId{ResourceType::RESOURCE_TENANT, *tenantId};
+ switch (getLockMode(tenantResourceId)) {
+ case MODE_NONE:
+ return false;
+ case MODE_X:
+ return true;
+ case MODE_S:
+ return isSharedLockMode(lockMode);
+ case MODE_IX:
+ case MODE_IS:
+ break;
+ default:
+ MONGO_UNREACHABLE_TASSERT(6671502);
+ }
+ }
+ return boost::none;
+}
+
+bool LockerImpl::isDbLockedForMode(const DatabaseName& dbName, LockMode mode) const {
+ if (auto lockedForMode =
+ _globalAndTenantLocksImplyDBOrCollectionLockedForMode(dbName.tenantId(), mode);
+ lockedForMode) {
+ return *lockedForMode;
+ }
+
+ const ResourceId resIdDb(RESOURCE_DATABASE, dbName);
+ return isLockHeldForMode(resIdDb, mode);
+}
+
+bool LockerImpl::isCollectionLockedForMode(const NamespaceString& nss, LockMode mode) const {
+ invariant(nss.coll().size());
+
+ if (!shouldConflictWithSecondaryBatchApplication())
+ return true;
+
+ if (auto lockedForMode =
+ _globalAndTenantLocksImplyDBOrCollectionLockedForMode(nss.tenantId(), mode);
+ lockedForMode) {
+ return *lockedForMode;
+ }
+
+ const ResourceId resIdDb(RESOURCE_DATABASE, nss.dbName());
+ LockMode dbMode = getLockMode(resIdDb);
+
+ switch (dbMode) {
+ case MODE_NONE:
+ return false;
+ case MODE_X:
+ return true;
+ case MODE_S:
+ return isSharedLockMode(mode);
+ case MODE_IX:
+ case MODE_IS: {
+ const ResourceId resIdColl(RESOURCE_COLLECTION, nss);
+ return isLockHeldForMode(resIdColl, mode);
+ } break;
+ case LockModesCount:
+ break;
+ }
+
+ MONGO_UNREACHABLE;
+ return false;
+}
+
+bool LockerImpl::wasGlobalLockTakenForWrite() const {
+ return _globalLockMode & ((1 << MODE_IX) | (1 << MODE_X));
+}
+
+bool LockerImpl::wasGlobalLockTakenInModeConflictingWithWrites() const {
+ return _wasGlobalLockTakenInModeConflictingWithWrites.load();
+}
+
+bool LockerImpl::wasGlobalLockTaken() const {
+ return _globalLockMode != (1 << MODE_NONE);
+}
+
+void LockerImpl::setGlobalLockTakenInMode(LockMode mode) {
+ _globalLockMode |= (1 << mode);
+
+ if (mode == MODE_IX || mode == MODE_X || mode == MODE_S) {
+ _wasGlobalLockTakenInModeConflictingWithWrites.store(true);
+ }
+}
+
+ResourceId LockerImpl::getWaitingResource() const {
+ scoped_spinlock scopedLock(_lock);
+
+ return _waitingResource;
+}
+
+MONGO_TSAN_IGNORE
+void LockerImpl::getLockerInfo(LockerInfo* lockerInfo,
+ const boost::optional<SingleThreadedLockStats> lockStatsBase) const {
+ invariant(lockerInfo);
+
+ // Zero-out the contents
+ lockerInfo->locks.clear();
+ lockerInfo->waitingResource = ResourceId();
+ lockerInfo->stats.reset();
+
+ _lock.lock();
+ LockRequestsMap::ConstIterator it = _requests.begin();
+ while (!it.finished()) {
+ OneLock info;
+ info.resourceId = it.key();
+ info.mode = it->mode;
+
+ lockerInfo->locks.push_back(info);
+ it.next();
+ }
+ _lock.unlock();
+
+ std::sort(lockerInfo->locks.begin(), lockerInfo->locks.end());
+
+ lockerInfo->waitingResource = getWaitingResource();
+ lockerInfo->stats.append(_stats);
+
+ // lockStatsBase is a snapshot of lock stats taken when the sub-operation starts. Only
+ // sub-operations have lockStatsBase.
+ if (lockStatsBase)
+ // Adjust the lock stats by subtracting the lockStatsBase. No mutex is needed because
+ // lockStatsBase is immutable.
+ lockerInfo->stats.subtract(*lockStatsBase);
+}
+
+boost::optional<Locker::LockerInfo> LockerImpl::getLockerInfo(
+ const boost::optional<SingleThreadedLockStats> lockStatsBase) const {
+ Locker::LockerInfo lockerInfo;
+ getLockerInfo(&lockerInfo, lockStatsBase);
+ return std::move(lockerInfo);
+}
+
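+// A rough sketch of the yield/restore cycle that callers of the two methods below perform:
+//
+//     Locker::LockSnapshot snapshot;
+//     locker->saveLockStateAndUnlock(&snapshot);    // release everything currently held
+//     // ... wait or run without holding locks ...
+//     locker->restoreLockState(opCtx, snapshot);    // reacquire in canonical order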
+void LockerImpl::saveLockStateAndUnlock(Locker::LockSnapshot* stateOut) {
+ // We shouldn't be saving and restoring lock state from inside a WriteUnitOfWork.
+ invariant(!inAWriteUnitOfWork());
+
+ // Callers must guarantee that they can actually yield.
+ if (MONGO_unlikely(!canSaveLockState())) {
+ dump();
+ LOGV2_FATAL(7033800,
+ "Attempted to yield locks but we are either not holding locks, holding a "
+ "strong MODE_S/MODE_X lock, or holding one recursively");
+ }
+
+ // Clear out whatever is in stateOut.
+ stateOut->locks.clear();
+ stateOut->globalMode = MODE_NONE;
+
+ // First, we look at the global lock. There is special handling for this so we store it
+ // separately from the more pedestrian locks.
+ auto globalRequest = _requests.find(resourceIdGlobal);
+ invariant(globalRequest);
+
+ stateOut->globalMode = globalRequest->mode;
+ invariant(unlock(resourceIdGlobal));
+
+ // Next, the non-global locks.
+ for (LockRequestsMap::Iterator it = _requests.begin(); !it.finished(); it.next()) {
+ const ResourceId resId = it.key();
+ const ResourceType resType = resId.getType();
+ if (resType == RESOURCE_MUTEX)
+ continue;
+
+ // We should never have to save and restore metadata locks.
+ invariant(RESOURCE_DATABASE == resType || RESOURCE_COLLECTION == resType ||
+ resId == resourceIdParallelBatchWriterMode || RESOURCE_TENANT == resType ||
+ resId == resourceIdFeatureCompatibilityVersion ||
+ resId == resourceIdReplicationStateTransitionLock);
+
+ // And, stuff the info into the out parameter.
+ OneLock info;
+ info.resourceId = resId;
+ info.mode = it->mode;
+ stateOut->locks.push_back(info);
+ invariant(unlock(resId));
+ }
+ invariant(!isLocked());
+
+ // Sort locks by ResourceId. They'll later be acquired in this canonical locking order.
+ std::sort(stateOut->locks.begin(), stateOut->locks.end());
+}
+
+void LockerImpl::restoreLockState(OperationContext* opCtx, const Locker::LockSnapshot& state) {
+ // We shouldn't be restoring lock state from inside a WriteUnitOfWork.
+ invariant(!inAWriteUnitOfWork());
+ invariant(_modeForTicket == MODE_NONE);
+ invariant(_clientState.load() == kInactive);
+
+ getFlowControlTicket(opCtx, state.globalMode);
+
+ std::vector<OneLock>::const_iterator it = state.locks.begin();
+ // If we locked the PBWM, it must be locked before the
+ // resourceIdFeatureCompatibilityVersion, resourceIdReplicationStateTransitionLock, and
+ // resourceIdGlobal resources.
+ if (it != state.locks.end() && it->resourceId == resourceIdParallelBatchWriterMode) {
+ lock(opCtx, it->resourceId, it->mode);
+ it++;
+ }
+
+ // If we locked the FCV lock, it must be locked before the
+ // resourceIdReplicationStateTransitionLock and resourceIdGlobal resources.
+ if (it != state.locks.end() && it->resourceId == resourceIdFeatureCompatibilityVersion) {
+ lock(opCtx, it->resourceId, it->mode);
+ it++;
+ }
+
+ // If we locked the RSTL, it must be locked before the resourceIdGlobal resource.
+ if (it != state.locks.end() && it->resourceId == resourceIdReplicationStateTransitionLock) {
+ lock(opCtx, it->resourceId, it->mode);
+ it++;
+ }
+
+ lockGlobal(opCtx, state.globalMode);
+ for (; it != state.locks.end(); it++) {
+ // Ensures we don't acquire locks out of order which can lead to deadlock.
+ invariant(it->resourceId.getType() != ResourceType::RESOURCE_GLOBAL);
+ lock(opCtx, it->resourceId, it->mode);
+ }
+ invariant(_modeForTicket != MODE_NONE);
+}
+
+MONGO_TSAN_IGNORE
+FlowControlTicketholder::CurOp LockerImpl::getFlowControlStats() const {
+ return _flowControlStats;
+}
+
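+// Attempts to acquire (or convert) the lock without blocking. LOCK_OK means the fast path
+// succeeded; LOCK_WAITING means the request was queued and the caller must follow up with
+// _lockComplete() to block until the lock is granted, times out, or is interrupted.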
+MONGO_TSAN_IGNORE
+LockResult LockerImpl::_lockBegin(OperationContext* opCtx, ResourceId resId, LockMode mode) {
+ dassert(!getWaitingResource().isValid());
+
+ // Operations which are holding open an oplog hole cannot block when acquiring locks.
+ if (opCtx && !shouldAllowLockAcquisitionOnTimestampedUnitOfWork() &&
+ resId.getType() != RESOURCE_METADATA && resId.getType() != RESOURCE_MUTEX) {
+ invariant(!opCtx->recoveryUnit()->isTimestamped(),
+ str::stream()
+ << "Operation holding open an oplog hole tried to acquire locks. ResourceId: "
+ << resId << ", mode: " << modeName(mode));
+ }
+
+ LockRequest* request;
+ bool isNew = true;
+
+ LockRequestsMap::Iterator it = _requests.find(resId);
+ if (!it) {
+ scoped_spinlock scopedLock(_lock);
+ LockRequestsMap::Iterator itNew = _requests.insert(resId);
+ itNew->initNew(this, &_notify);
+
+ request = itNew.objAddr();
+ } else {
+ request = it.objAddr();
+ isNew = false;
+ }
+
+ // If unlockPending is nonzero, that means a LockRequest already exists for this resource but
+ // is planned to be released at the end of this WUOW due to two-phase locking. Rather than
+ // unlocking the existing request, we can reuse it if the existing mode matches the new mode.
+ if (request->unlockPending && isModeCovered(mode, request->mode)) {
+ request->unlockPending--;
+ if (!request->unlockPending) {
+ _numResourcesToUnlockAtEndUnitOfWork--;
+ }
+ return LOCK_OK;
+ }
+
+ // Making this call here will record lock re-acquisitions and conversions as well.
+ globalStats.recordAcquisition(_id, resId, mode);
+ _stats.recordAcquisition(resId, mode);
+
+ // Give priority to the full modes for Global, PBWM, and RSTL resources so we don't stall global
+ // operations such as shutdown or stepdown.
+ const ResourceType resType = resId.getType();
+ if (resType == RESOURCE_GLOBAL) {
+ if (mode == MODE_S || mode == MODE_X) {
+ request->enqueueAtFront = true;
+ request->compatibleFirst = true;
+ }
+ } else if (resType != RESOURCE_MUTEX) {
+        // These are sanity checks that the global lock is always acquired before any other
+        // lock and that the acquisitions stay in sync with the nesting.
+ if (kDebugBuild) {
+ const LockRequestsMap::Iterator itGlobal = _requests.find(resourceIdGlobal);
+ invariant(itGlobal->recursiveCount > 0);
+ invariant(itGlobal->mode != MODE_NONE);
+ };
+ }
+
+ // The notification object must be cleared before we invoke the lock manager, because
+ // otherwise we might reset state if the lock becomes granted very fast.
+ _notify.clear();
+
+ LockResult result = isNew ? getGlobalLockManager()->lock(resId, request, mode)
+ : getGlobalLockManager()->convert(resId, request, mode);
+
+ if (result == LOCK_WAITING) {
+ globalStats.recordWait(_id, resId, mode);
+ _stats.recordWait(resId, mode);
+ _setWaitingResource(resId);
+ } else if (result == LOCK_OK && opCtx && _uninterruptibleLocksRequested == 0) {
+ // Lock acquisitions are not allowed to succeed when opCtx is marked as interrupted, unless
+ // the caller requested an uninterruptible lock.
+ auto interruptStatus = opCtx->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ auto unlockIt = _requests.find(resId);
+ invariant(unlockIt);
+ _unlockImpl(&unlockIt);
+ uassertStatusOK(interruptStatus);
+ }
+ }
+
+ return result;
+}
+
+void LockerImpl::_lockComplete(OperationContext* opCtx,
+ ResourceId resId,
+ LockMode mode,
+ Date_t deadline,
+ const LockTimeoutCallback& onTimeout) {
+ // Operations which are holding open an oplog hole cannot block when acquiring locks. Lock
+ // requests entering this function have been queued up and will be granted the lock as soon as
+ // the lock is released, which is a blocking operation.
+ if (opCtx && !shouldAllowLockAcquisitionOnTimestampedUnitOfWork() &&
+ resId.getType() != RESOURCE_METADATA && resId.getType() != RESOURCE_MUTEX) {
+ invariant(!opCtx->recoveryUnit()->isTimestamped(),
+ str::stream()
+ << "Operation holding open an oplog hole tried to acquire locks. ResourceId: "
+ << resId << ", mode: " << modeName(mode));
+ }
+
+ // Clean up the state on any failed lock attempts.
+ ScopeGuard unlockOnErrorGuard([&] {
+ LockRequestsMap::Iterator it = _requests.find(resId);
+ invariant(it);
+ _unlockImpl(&it);
+ _setWaitingResource(ResourceId());
+ });
+
+ // This failpoint is used to time out non-intent locks if they cannot be granted immediately
+ // for user operations. Testing-only.
+ const bool isUserOperation = opCtx && opCtx->getClient()->isFromUserConnection();
+ if (!_uninterruptibleLocksRequested && isUserOperation &&
+ MONGO_unlikely(failNonIntentLocksIfWaitNeeded.shouldFail())) {
+ uassert(ErrorCodes::LockTimeout,
+ str::stream() << "Cannot immediately acquire lock '" << resId.toString()
+ << "'. Timing out due to failpoint.",
+ (mode == MODE_IS || mode == MODE_IX));
+ }
+
+ LockResult result;
+ Milliseconds timeout;
+ if (deadline == Date_t::max()) {
+ timeout = Milliseconds::max();
+ } else if (deadline <= Date_t()) {
+ timeout = Milliseconds(0);
+ } else {
+ timeout = deadline - Date_t::now();
+ }
+ timeout = std::min(timeout, _maxLockTimeout ? *_maxLockTimeout : Milliseconds::max());
+ if (_uninterruptibleLocksRequested) {
+ timeout = Milliseconds::max();
+ }
+
+ // Don't go sleeping without bound in order to be able to report long waits.
+ Milliseconds waitTime = std::min(timeout, MaxWaitTime);
+ const uint64_t startOfTotalWaitTime = curTimeMicros64();
+ uint64_t startOfCurrentWaitTime = startOfTotalWaitTime;
+
+ while (true) {
+ // It is OK if this call wakes up spuriously, because we re-evaluate the remaining
+ // wait time anyways.
+ // If we have an operation context, we want to use its interruptible wait so that
+ // pending lock acquisitions can be cancelled, so long as no callers have requested an
+ // uninterruptible lock.
+ if (opCtx && _uninterruptibleLocksRequested == 0) {
+ result = _notify.wait(opCtx, waitTime);
+ } else {
+ result = _notify.wait(waitTime);
+ }
+
+ // Account for the time spent waiting on the notification object
+ const uint64_t curTimeMicros = curTimeMicros64();
+ const uint64_t elapsedTimeMicros = curTimeMicros - startOfCurrentWaitTime;
+ startOfCurrentWaitTime = curTimeMicros;
+
+ globalStats.recordWaitTime(_id, resId, mode, elapsedTimeMicros);
+ _stats.recordWaitTime(resId, mode, elapsedTimeMicros);
+
+ if (result == LOCK_OK)
+ break;
+
+ // If infinite timeout was requested, just keep waiting
+ if (timeout == Milliseconds::max()) {
+ continue;
+ }
+
+ const auto totalBlockTime = duration_cast<Milliseconds>(
+ Microseconds(int64_t(curTimeMicros - startOfTotalWaitTime)));
+ waitTime = (totalBlockTime < timeout) ? std::min(timeout - totalBlockTime, MaxWaitTime)
+ : Milliseconds(0);
+
+        // Check if the lock acquisition has timed out. If we have an operation context and
+        // client, we can provide additional diagnostic data.
+ if (waitTime == Milliseconds(0)) {
+ if (onTimeout) {
+ onTimeout();
+ }
+ std::string timeoutMessage = str::stream()
+ << "Unable to acquire " << modeName(mode) << " lock on '" << resId.toString()
+ << "' within " << timeout << ".";
+ if (opCtx && opCtx->getClient()) {
+ timeoutMessage = str::stream()
+ << timeoutMessage << " opId: " << opCtx->getOpID()
+ << ", op: " << opCtx->getClient()->desc()
+ << ", connId: " << opCtx->getClient()->getConnectionId() << ".";
+ }
+ uasserted(ErrorCodes::LockTimeout, timeoutMessage);
+ }
+ }
+
+ invariant(result == LOCK_OK);
+ unlockOnErrorGuard.dismiss();
+ _setWaitingResource(ResourceId());
+}
+
+void LockerImpl::getFlowControlTicket(OperationContext* opCtx, LockMode lockMode) {
+ auto ticketholder = FlowControlTicketholder::get(opCtx);
+ if (ticketholder && lockMode == LockMode::MODE_IX && _clientState.load() == kInactive &&
+ _admCtx.getPriority() != AdmissionContext::Priority::kImmediate &&
+ !_uninterruptibleLocksRequested) {
+ // FlowControl only acts when a MODE_IX global lock is being taken. The clientState is only
+ // being modified here to change serverStatus' `globalLock.currentQueue` metrics. This
+ // method must not exit with a side-effect on the clientState. That value is also used for
+ // tracking whether other resources need to be released.
+ _clientState.store(kQueuedWriter);
+ ScopeGuard restoreState([&] { _clientState.store(kInactive); });
+ // Acquiring a ticket is a potentially blocking operation. This must not be called after a
+ // transaction timestamp has been set, indicating this transaction has created an oplog
+ // hole.
+ invariant(!opCtx->recoveryUnit()->isTimestamped());
+ ticketholder->getTicket(opCtx, &_flowControlStats);
+ }
+}
+
+LockResult LockerImpl::lockRSTLBegin(OperationContext* opCtx, LockMode mode) {
+ bool testOnly = false;
+
+ if (MONGO_unlikely(enableTestOnlyFlagforRSTL.shouldFail())) {
+ testOnly = true;
+ }
+
+ invariant(testOnly || mode == MODE_IX || mode == MODE_X);
+ return _lockBegin(opCtx, resourceIdReplicationStateTransitionLock, mode);
+}
+
+void LockerImpl::lockRSTLComplete(OperationContext* opCtx,
+ LockMode mode,
+ Date_t deadline,
+ const LockTimeoutCallback& onTimeout) {
+ _lockComplete(opCtx, resourceIdReplicationStateTransitionLock, mode, deadline, onTimeout);
+}
+
+void LockerImpl::releaseTicket() {
+ invariant(_modeForTicket != MODE_NONE);
+ _releaseTicket();
+}
+
+void LockerImpl::_releaseTicket() {
+ _ticket.reset();
+ _clientState.store(kInactive);
+}
+
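+// Returns true only when the LockManager actually released the lock (this was the last
+// outstanding reference to it); releasing the global lock also returns the concurrency
+// ticket, if one is still held.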
+bool LockerImpl::_unlockImpl(LockRequestsMap::Iterator* it) {
+ if (getGlobalLockManager()->unlock(it->objAddr())) {
+ if (it->key() == resourceIdGlobal) {
+ invariant(_modeForTicket != MODE_NONE);
+
+ // We may have already released our ticket through a call to releaseTicket().
+ if (_clientState.load() != kInactive) {
+ _releaseTicket();
+ }
+
+ _modeForTicket = MODE_NONE;
+ }
+
+ scoped_spinlock scopedLock(_lock);
+ it->remove();
+
+ return true;
+ }
+
+ return false;
+}
+
+bool LockerImpl::isGlobalLockedRecursively() {
+ auto globalLockRequest = _requests.find(resourceIdGlobal);
+ return !globalLockRequest.finished() && globalLockRequest->recursiveCount > 1;
+}
+
+bool LockerImpl::canSaveLockState() {
+ // We cannot yield strong global locks.
+ if (_modeForTicket == MODE_S || _modeForTicket == MODE_X) {
+ return false;
+ }
+
+ // If we don't have a global lock, we do not yield.
+ if (_modeForTicket == MODE_NONE) {
+ auto globalRequest = _requests.find(resourceIdGlobal);
+ invariant(!globalRequest);
+
+        // Without the global lock, only RESOURCE_MUTEX requests may remain; verify that.
+ for (auto it = _requests.begin(); !it.finished(); it.next()) {
+ invariant(it.key().getType() == RESOURCE_MUTEX);
+ }
+ return false;
+ }
+
+ for (auto it = _requests.begin(); !it.finished(); it.next()) {
+ const ResourceId resId = it.key();
+ const ResourceType resType = resId.getType();
+ if (resType == RESOURCE_MUTEX)
+ continue;
+
+ // If any lock has been acquired more than once, we're probably somewhere in a
+ // DBDirectClient call. It's not safe to release and reacquire locks -- the context using
+ // the DBDirectClient is probably not prepared for lock release. This logic applies to all
+ // locks in the hierarchy.
+ if (it->recursiveCount > 1) {
+ return false;
+ }
+
+ // We cannot yield any other lock in a strong lock mode.
+ if (it->mode == MODE_S || it->mode == MODE_X) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void LockerImpl::_setWaitingResource(ResourceId resId) {
+ scoped_spinlock scopedLock(_lock);
+
+ _waitingResource = resId;
+}
+
+//
+// Auto classes
+//
+
+namespace {
+/**
+ * Periodically purges unused lock buckets. The first time the lock is used again after
+ * cleanup it needs to be allocated, and similarly, every first use by a client for an intent
+ * mode may need to create a partitioned lock head. Cleanup is done roughly once a minute.
+ */
+class UnusedLockCleaner : PeriodicTask {
+public:
+ std::string taskName() const {
+ return "UnusedLockCleaner";
+ }
+
+ void taskDoWork() {
+ LOGV2_DEBUG(20524, 2, "cleaning up unused lock buckets of the global lock manager");
+ getGlobalLockManager()->cleanupUnusedLocks();
+ }
+} unusedLockCleaner;
+} // namespace
+
+//
+// Standalone functions
+//
+
+void reportGlobalLockingStats(SingleThreadedLockStats* outStats) {
+ globalStats.report(outStats);
+}
+
+void resetGlobalLockStats() {
+ globalStats.reset();
+}
+
+} // namespace mongo