author    | Louis Williams <louis.williams@mongodb.com> | 2018-02-02 17:35:30 -0500
committer | Louis Williams <louis.williams@mongodb.com> | 2018-03-01 11:17:53 -0500
commit    | 7d7969eb7439b08207e29437e94fc1db5459c205 (patch)
tree      | f33b98b63c5f3b38fc51f0eed5695267660c3033 /src
parent    | db6e903221712585f830cacead3269a0fbf44f1b (diff)
download  | mongo-7d7969eb7439b08207e29437e94fc1db5459c205.tar.gz
SERVER-32638: Allow lock acquisition to be interruptable
Diffstat (limited to 'src')
30 files changed, 553 insertions, 147 deletions
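In summary, this patch threads an OperationContext down into the Locker's condition-variable wait so a killed operation aborts its pending lock request with ErrorCodes::Interrupted, unless an UninterruptableLockGuard is in scope. A minimal standalone sketch of the interruptible-wait idea in plain std:: types (hypothetical names; the real code uses OperationContext::waitForConditionOrInterruptFor, visible in lock_state.cpp below):

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Stand-in for the OperationContext kill flag (an assumption, not the real class).
struct KillFlag {
    std::atomic<bool> killed{false};
};

enum LockResult { LOCK_OK, LOCK_TIMEOUT };

// Wait until `granted` becomes true, but give up early if the operation is killed
// or the deadline passes. `granted` is protected by `m` and signaled through `cv`.
LockResult interruptibleWait(KillFlag& op,
                             std::mutex& m,
                             std::condition_variable& cv,
                             bool& granted,
                             std::chrono::milliseconds timeout) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    std::unique_lock<std::mutex> lk(m);
    while (!granted) {
        if (op.killed.load())
            throw std::runtime_error("Interrupted");  // MongoDB throws ErrorCodes::Interrupted
        if (std::chrono::steady_clock::now() >= deadline)
            return LOCK_TIMEOUT;
        // Wake periodically to re-check the kill flag; the real implementation is
        // notified directly when the operation is killed instead of polling.
        cv.wait_for(lk, std::chrono::milliseconds(10));
    }
    return LOCK_OK;
}
```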
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index cc732182989..ed133252c5b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -473,7 +473,7 @@ env.Library(
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/db/logical_session_id',
-        '$BUILD_DIR/mongo/db/storage/storage_options',
+        '$BUILD_DIR/mongo/db/storage/write_unit_of_work',
         '$BUILD_DIR/mongo/transport/transport_layer_common',
         '$BUILD_DIR/mongo/util/clock_sources',
         '$BUILD_DIR/mongo/util/concurrency/spin_lock',
diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp
index 06ced3cd0be..b4637e0da34 100644
--- a/src/mongo/db/catalog/drop_database.cpp
+++ b/src/mongo/db/catalog/drop_database.cpp
@@ -117,6 +117,7 @@ Status dropDatabase(OperationContext* opCtx, const std::string& dbName) {
     using Result = boost::optional<Status>;
     // Get an optional result--if it's there, early return; otherwise, wait for collections to
    // drop.
     auto result = writeConflictRetry(opCtx, "dropDatabase_collection", dbName, [&] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         Lock::GlobalWrite lk(opCtx);
         AutoGetDb autoDB(opCtx, dbName, MODE_X);
         Database* const db = autoDB.getDb();
@@ -197,6 +198,7 @@ Status dropDatabase(OperationContext* opCtx, const std::string& dbName) {
     // If waitForWriteConcern() returns an error or throws an exception, we should reset the
     // drop-pending state on Database.
     auto dropPendingGuardWhileAwaitingReplication = MakeGuard([dbName, opCtx] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         Lock::GlobalWrite lk(opCtx);
         AutoGetDb autoDB(opCtx, dbName, MODE_X);
         if (auto db = autoDB.getDb()) {
@@ -247,6 +249,7 @@ Status dropDatabase(OperationContext* opCtx, const std::string& dbName) {
     dropPendingGuardWhileAwaitingReplication.Dismiss();
 
     return writeConflictRetry(opCtx, "dropDatabase_database", dbName, [&] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         Lock::GlobalWrite lk(opCtx);
         AutoGetDb autoDB(opCtx, dbName, MODE_X);
         auto db = autoDB.getDb();
diff --git a/src/mongo/db/catalog/index_create_impl.cpp b/src/mongo/db/catalog/index_create_impl.cpp
index 15210df625c..a783c41e97e 100644
--- a/src/mongo/db/catalog/index_create_impl.cpp
+++ b/src/mongo/db/catalog/index_create_impl.cpp
@@ -460,7 +460,7 @@ Status MultiIndexBlockImpl::insertAllDocumentsInCollection(std::set<RecordId>* d
     }
 
     if (_buildInBackground) {
-        _opCtx->lockState()->restoreLockState(lockInfo);
+        _opCtx->lockState()->restoreLockState(_opCtx, lockInfo);
         _opCtx->recoveryUnit()->abandonSnapshot();
         return Status(ErrorCodes::OperationFailed,
                       "background index build aborted due to failpoint");
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index a687ec89a29..80f31bc4021 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -364,6 +364,9 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) {
  * Clean up the temporary and incremental collections
  */
 void State::dropTempCollections() {
+    // The cleanup handler should not be interruptable.
+    UninterruptableLockGuard noInterrupt(_opCtx->lockState());
+
     if (!_config.tempNamespace.isEmpty()) {
         writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.tempNamespace.ns(), [this] {
             AutoGetDb autoDb(_opCtx, _config.tempNamespace.db(), MODE_X);
@@ -1011,6 +1014,7 @@ void State::bailFromJS() {
 Collection* State::getCollectionOrUassert(OperationContext* opCtx,
                                           Database* db,
                                           const NamespaceString& nss) {
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
     Collection* out = db ? db->getCollection(opCtx, nss) : NULL;
     uassert(18697, "Collection unexpectedly disappeared: " + nss.ns(), out);
     return out;
@@ -1397,6 +1401,8 @@ public:
                      string& errmsg,
                      BSONObjBuilder& result) {
         Timer t;
+        // Don't let a lock acquisition in map-reduce get interrupted.
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
 
         boost::optional<DisableDocumentValidation> maybeDisableValidation;
         if (shouldBypassDocumentValidationForCommand(cmd))
@@ -1729,6 +1735,9 @@ public:
                               << " which lives on config servers"));
         }
 
+        // Don't let any lock acquisitions get interrupted.
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
+
         boost::optional<DisableDocumentValidation> maybeDisableValidation;
         if (shouldBypassDocumentValidationForCommand(cmdObj))
             maybeDisableValidation.emplace(opCtx);
diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp
index bda7c274100..7981cfaa2a7 100644
--- a/src/mongo/db/concurrency/d_concurrency.cpp
+++ b/src/mongo/db/concurrency/d_concurrency.cpp
@@ -171,7 +171,7 @@ void Lock::GlobalLock::_enqueue(LockMode lockMode, Date_t deadline) {
 
 void Lock::GlobalLock::waitForLockUntil(Date_t deadline) {
     if (_result == LOCK_WAITING) {
-        _result = _opCtx->lockState()->lockGlobalComplete(deadline);
+        _result = _opCtx->lockState()->lockGlobalComplete(_opCtx, deadline);
     }
 
     if (_result != LOCK_OK && _opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()) {
@@ -208,7 +208,7 @@ Lock::DBLock::DBLock(OperationContext* opCtx, StringData db, LockMode mode, Date
         _mode = MODE_X;
     }
 
-    _result = _opCtx->lockState()->lock(_id, _mode, deadline);
+    _result = _opCtx->lockState()->lock(_opCtx, _id, _mode, deadline);
 
     invariant(_result == LOCK_OK || deadline != Date_t::max());
 }
@@ -238,7 +238,7 @@ void Lock::DBLock::relockWithMode(LockMode newMode) {
     _opCtx->lockState()->unlock(_id);
 
     _mode = newMode;
-    invariant(LOCK_OK == _opCtx->lockState()->lock(_id, _mode));
+    invariant(LOCK_OK == _opCtx->lockState()->lock(_opCtx, _id, _mode));
 }
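The pattern these call sites follow, as a sketch (hypothetical cleanup function; UninterruptableLockGuard and Lock::GlobalWrite are the real types from this patch):

```cpp
// Hypothetical call site showing the guard pattern used above: cleanup paths that
// have no exception handler scope the guard first, so lock waits in that scope
// ignore a kill of the parent operation.
void cleanupThatMustRun(OperationContext* opCtx) {
    UninterruptableLockGuard noInterrupt(opCtx->lockState());  // waits ignore markKilled()
    Lock::GlobalWrite lk(opCtx);  // blocks until granted or deadline, never throws Interrupted
    // ... drop temp collections, reset drop-pending state, and so on ...
}  // guard destructor re-enables interruptible waits on this Locker
```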
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index 95177c687bc..1f4fb976f06 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -39,6 +39,7 @@
 #include "mongo/db/concurrency/write_conflict_exception.h"
 #include "mongo/db/storage/recovery_unit_noop.h"
 #include "mongo/stdx/functional.h"
+#include "mongo/stdx/future.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/stdx/thread.h"
 #include "mongo/unittest/unittest.h"
@@ -46,6 +47,7 @@
 #include "mongo/util/debug_util.h"
 #include "mongo/util/log.h"
 #include "mongo/util/progress_meter.h"
+#include "mongo/util/scopeguard.h"
 #include "mongo/util/time_support.h"
 
 namespace mongo {
@@ -110,6 +112,26 @@ public:
         return clients;
     }
 
+    stdx::future<void> runTaskAndKill(OperationContext* opCtx,
+                                      stdx::function<void()> fn,
+                                      stdx::function<void()> postKill = nullptr) {
+        auto task = stdx::packaged_task<void()>(fn);
+        auto result = task.get_future();
+        stdx::thread taskThread{std::move(task)};
+
+        auto taskThreadJoiner = MakeGuard([&] { taskThread.join(); });
+
+        {
+            stdx::lock_guard<Client> clientLock(*opCtx->getClient());
+            opCtx->markKilled();
+        }
+
+        if (postKill)
+            postKill();
+
+        return result;
+    }
+
     /**
      * Calls fn the given number of iterations, spread out over up to maxThreads threads.
      * The threadNr passed is an integer between 0 and maxThreads exclusive. Logs timing
@@ -634,6 +656,101 @@ TEST_F(DConcurrencyTestFixture, TempReleaseRecursive) {
     ASSERT(lockState->isW());
 }
 
+TEST_F(DConcurrencyTestFixture, GlobalLockWaitIsInterruptable) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(2);
+    auto opCtx1 = clients[0].second.get();
+    auto opCtx2 = clients[1].second.get();
+
+    // The main thread takes an exclusive lock, causing the spawned thread to wait when it attempts
+    // to acquire a conflicting lock.
+    Lock::GlobalLock GlobalLock(opCtx1, MODE_X, Date_t::max());
+
+    auto result = runTaskAndKill(opCtx2, [&]() {
+        // Killing the lock wait should throw an exception.
+        Lock::GlobalLock g(opCtx2, MODE_S, Date_t::max());
+    });
+
+    ASSERT_THROWS_CODE(result.get(), AssertionException, ErrorCodes::Interrupted);
+}
+
+TEST_F(DConcurrencyTestFixture, GlobalLockWaitIsInterruptableMMAP) {
+    auto clients = makeKClientsWithLockers<MMAPV1LockerImpl>(2);
+
+    auto opCtx1 = clients[0].second.get();
+    auto opCtx2 = clients[1].second.get();
+
+    // The main thread takes an exclusive lock, causing the spawned thread to wait when it attempts
+    // to acquire a conflicting lock.
+    Lock::GlobalLock GlobalLock(opCtx1, MODE_X, Date_t::max());
+
+    // This thread attempts to acquire a conflicting lock, which will block until the first
+    // unlocks.
+    auto result = runTaskAndKill(opCtx2, [&]() {
+        // Killing the lock wait should throw an exception.
+        Lock::GlobalLock g(opCtx2, MODE_S, Date_t::max());
+    });
+
+    ASSERT_THROWS_CODE(result.get(), AssertionException, ErrorCodes::Interrupted);
+}
+
+TEST_F(DConcurrencyTestFixture, DBLockWaitIsInterruptable) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(2);
+    auto opCtx1 = clients[0].second.get();
+    auto opCtx2 = clients[1].second.get();
+
+    // The main thread takes an exclusive lock, causing the spawned thread to wait when it attempts
+    // to acquire a conflicting lock.
+    Lock::DBLock dbLock(opCtx1, "db", MODE_X);
+
+    auto result = runTaskAndKill(opCtx2, [&]() {
+        // This lock conflicts with the other DBLock.
+        Lock::DBLock d(opCtx2, "db", MODE_S);
+    });
+
+    ASSERT_THROWS_CODE(result.get(), AssertionException, ErrorCodes::Interrupted);
+}
+
+TEST_F(DConcurrencyTestFixture, GlobalLockWaitIsNotInterruptableWithLockGuard) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(2);
+
+    auto opCtx1 = clients[0].second.get();
+    auto opCtx2 = clients[1].second.get();
+    // The main thread takes an exclusive lock, causing the spawned thread to wait when it attempts
+    // to acquire a conflicting lock.
+    boost::optional<Lock::GlobalLock> globalLock = Lock::GlobalLock(opCtx1, MODE_X, Date_t::max());
+
+    // Killing the lock wait should not interrupt it.
+    auto result = runTaskAndKill(opCtx2,
+                                 [&]() {
+                                     UninterruptableLockGuard noInterrupt(opCtx2->lockState());
+                                     Lock::GlobalLock g(opCtx2, MODE_S, Date_t::max());
+                                 },
+                                 [&]() { globalLock.reset(); });
+    // Should not throw an exception.
+    result.get();
+}
+
+TEST_F(DConcurrencyTestFixture, DBLockWaitIsNotInterruptableWithLockGuard) {
+    auto clients = makeKClientsWithLockers<DefaultLockerImpl>(2);
+    auto opCtx1 = clients[0].second.get();
+    auto opCtx2 = clients[1].second.get();
+
+    // The main thread takes an exclusive lock, causing the spawned thread to wait when it attempts
+    // to acquire a conflicting lock.
+    boost::optional<Lock::DBLock> dbLock = Lock::DBLock(opCtx1, "db", MODE_X);
+
+    // Killing the lock wait should not interrupt it.
+    auto result = runTaskAndKill(opCtx2,
+                                 [&]() {
+                                     UninterruptableLockGuard noInterrupt(opCtx2->lockState());
+                                     Lock::DBLock d(opCtx2, "db", MODE_S);
+                                 },
+                                 [&] { dbLock.reset(); });
+    // Should not throw an exception.
+    result.get();
+}
+
 
 TEST_F(DConcurrencyTestFixture, DBLockTakesS) {
     auto opCtx = makeOpCtx();
     opCtx->swapLockState(stdx::make_unique<MMAPV1LockerImpl>());
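The runTaskAndKill helper relies on packaged_task/future plumbing: an exception thrown on the spawned thread is captured and rethrown by future::get(), which is what ASSERT_THROWS_CODE(result.get(), ...) checks. The same shape in plain std:: types (a hypothetical free function; the fixture's version also joins the thread via a scope guard and fires a post-kill callback):

```cpp
#include <functional>
#include <future>
#include <thread>

// Run fn on another thread; the returned future rethrows anything fn threw when
// get() is called on it, so the caller can assert on the exception.
std::future<void> runOnThread(std::function<void()> fn) {
    std::packaged_task<void()> task(std::move(fn));
    std::future<void> result = task.get_future();
    std::thread(std::move(task)).detach();  // the fixture joins instead of detaching
    return result;
}
```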
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index a6dac2d5d5f..42d9d553c80 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -43,6 +43,7 @@
 #include "mongo/util/debug_util.h"
 #include "mongo/util/log.h"
 #include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
 
 namespace mongo {
 namespace {
@@ -223,6 +224,15 @@ LockResult CondVarLockGrantNotification::wait(Milliseconds timeout) {
                : LOCK_TIMEOUT;
 }
 
+LockResult CondVarLockGrantNotification::wait(OperationContext* opCtx, Milliseconds timeout) {
+    invariant(opCtx);
+    stdx::unique_lock<stdx::mutex> lock(_mutex);
+    return opCtx->waitForConditionOrInterruptFor(
+               _cond, lock, timeout, [this] { return _result != LOCK_INVALID; })
+        ? _result
+        : LOCK_TIMEOUT;
+}
+
 void CondVarLockGrantNotification::notify(ResourceId resId, LockResult result) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     invariant(_result == LOCK_INVALID);
@@ -282,11 +292,11 @@ Locker::ClientState LockerImpl<IsForMMAPV1>::getClientState() const {
 }
 
 template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobal(LockMode mode) {
+LockResult LockerImpl<IsForMMAPV1>::lockGlobal(OperationContext* opCtx, LockMode mode) {
     LockResult result = _lockGlobalBegin(mode, Date_t::max());
 
     if (result == LOCK_WAITING) {
-        result = lockGlobalComplete(Date_t::max());
+        result = lockGlobalComplete(opCtx, Date_t::max());
     }
 
     if (result == LOCK_OK) {
@@ -342,8 +352,8 @@ LockResult LockerImpl<IsForMMAPV1>::_lockGlobalBegin(LockMode mode, Date_t deadl
 }
 
 template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(Date_t deadline) {
-    return lockComplete(resourceIdGlobal, getLockMode(resourceIdGlobal), deadline, false);
+LockResult LockerImpl<IsForMMAPV1>::lockGlobalComplete(OperationContext* opCtx, Date_t deadline) {
+    return lockComplete(opCtx, resourceIdGlobal, getLockMode(resourceIdGlobal), deadline, false);
 }
 
 template <bool IsForMMAPV1>
@@ -439,10 +449,9 @@ void LockerImpl<IsForMMAPV1>::endWriteUnitOfWork() {
 }
 
 template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId,
-                                         LockMode mode,
-                                         Date_t deadline,
-                                         bool checkDeadlock) {
+LockResult LockerImpl<IsForMMAPV1>::lock(
+    OperationContext* opCtx, ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) {
+
     const LockResult result = lockBegin(resId, mode);
 
     // Fast, uncontended path
@@ -453,7 +462,7 @@ LockResult LockerImpl<IsForMMAPV1>::lock(ResourceId resId,
     // unsuccessful result that the lock manager would return is LOCK_WAITING.
     invariant(result == LOCK_WAITING);
 
-    return lockComplete(resId, mode, deadline, checkDeadlock);
+    return lockComplete(opCtx, resId, mode, deadline, checkDeadlock);
 }
 
 template <bool IsForMMAPV1>
@@ -470,6 +479,9 @@ bool LockerImpl<IsForMMAPV1>::unlock(ResourceId resId) {
         return false;
     }
 
+    // Don't attempt to unlock twice. This can happen when an interrupted global lock is destructed.
+    if (it.finished())
+        return false;
     return _unlockImpl(&it);
 }
 
@@ -643,7 +655,8 @@ bool LockerImpl<IsForMMAPV1>::saveLockStateAndUnlock(Locker::LockSnapshot* state
 }
 
 template <bool IsForMMAPV1>
-void LockerImpl<IsForMMAPV1>::restoreLockState(const Locker::LockSnapshot& state) {
+void LockerImpl<IsForMMAPV1>::restoreLockState(OperationContext* opCtx,
+                                               const Locker::LockSnapshot& state) {
     // We shouldn't be saving and restoring lock state from inside a WriteUnitOfWork.
     invariant(!inAWriteUnitOfWork());
     invariant(_modeForTicket == MODE_NONE);
@@ -651,11 +664,11 @@ void LockerImpl<IsForMMAPV1>::restoreLockState(const Locker::LockSnapshot& state
     std::vector<OneLock>::const_iterator it = state.locks.begin();
     // If we locked the PBWM, it must be locked before the resourceIdGlobal resource.
     if (it != state.locks.end() && it->resourceId == resourceIdParallelBatchWriterMode) {
-        invariant(LOCK_OK == lock(it->resourceId, it->mode));
+        invariant(LOCK_OK == lock(opCtx, it->resourceId, it->mode));
         it++;
     }
 
-    invariant(LOCK_OK == lockGlobal(state.globalMode));
+    invariant(LOCK_OK == lockGlobal(opCtx, state.globalMode));
     for (; it != state.locks.end(); it++) {
         // This is a sanity check that lockGlobal restored the MMAP V1 flush lock in the
         // expected mode.
@@ -729,10 +742,8 @@ LockResult LockerImpl<IsForMMAPV1>::lockBegin(ResourceId resId, LockMode mode) {
 }
 
 template <bool IsForMMAPV1>
-LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
-                                                 LockMode mode,
-                                                 Date_t deadline,
-                                                 bool checkDeadlock) {
+LockResult LockerImpl<IsForMMAPV1>::lockComplete(
+    OperationContext* opCtx, ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) {
     // Under MMAP V1 engine a deadlock can occur if a thread goes to sleep waiting on
     // DB lock, while holding the flush lock, so it has to be released. This is only
     // correct to do if not in a write unit of work.
@@ -742,6 +753,13 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
     if (yieldFlushLock) {
         invariant(unlock(resourceIdMMAPV1Flush));
     }
+    auto relockFlushLockGuard = MakeGuard([&] {
+        if (yieldFlushLock) {
+            // We cannot obey the timeout here, because it is not correct to return from the lock
+            // request with the flush lock released.
+            invariant(LOCK_OK == lock(resourceIdMMAPV1Flush, _getModeForMMAPV1FlushLock()));
+        }
+    });
 
     LockResult result;
     Milliseconds timeout;
@@ -759,10 +777,23 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
     const uint64_t startOfTotalWaitTime = curTimeMicros64();
     uint64_t startOfCurrentWaitTime = startOfTotalWaitTime;
 
+    // Clean up the state on any failed lock attempts.
+    auto unlockOnErrorGuard = MakeGuard([&] {
+        LockRequestsMap::Iterator it = _requests.find(resId);
+        _unlockImpl(&it);
+    });
+
     while (true) {
         // It is OK if this call wakes up spuriously, because we re-evaluate the remaining
         // wait time anyways.
-        result = _notify.wait(waitTime);
+        // If we have an operation context, we want to use its interruptable wait so that
+        // pending lock acquisitions can be cancelled, so long as no callers have requested an
+        // uninterruptable lock.
+        if (opCtx && _uninterruptableLocksRequested == 0) {
+            result = _notify.wait(opCtx, waitTime);
+        } else {
+            result = _notify.wait(waitTime);
+        }
 
         // Account for the time spent waiting on the notification object
        const uint64_t curTimeMicros = curTimeMicros64();
@@ -803,21 +834,12 @@ LockResult LockerImpl<IsForMMAPV1>::lockComplete(ResourceId resId,
         }
     }
 
-    // Cleanup the state, since this is an unused lock now.
     // Note: in case of the _notify object returning LOCK_TIMEOUT, it is possible to find that the
     // lock was still granted after all, but we don't try to take advantage of that and will return
     // a timeout.
-    if (result != LOCK_OK) {
-        LockRequestsMap::Iterator it = _requests.find(resId);
-        _unlockImpl(&it);
-    }
-
-    if (yieldFlushLock) {
-        // We cannot obey the timeout here, because it is not correct to return from the lock
-        // request with the flush lock released.
-        invariant(LOCK_OK == lock(resourceIdMMAPV1Flush, _getModeForMMAPV1FlushLock()));
+    if (result == LOCK_OK) {
+        unlockOnErrorGuard.Dismiss();
     }
-
     return result;
 }
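lockComplete now routes every failure path through a dismissable scope guard instead of duplicating cleanup before each return. The pattern in isolation, as a minimal sketch with hypothetical names modeled on mongo's MakeGuard/Dismiss:

```cpp
#include <utility>

// Minimal dismissable scope guard: runs fn at scope exit unless dismiss() was
// called, so every early return automatically gets the cleanup.
template <typename F>
class ScopeGuard {
public:
    explicit ScopeGuard(F fn) : _fn(std::move(fn)) {}
    ScopeGuard(ScopeGuard&& other) : _fn(std::move(other._fn)), _active(other._active) {
        other._active = false;  // the moved-from guard must not fire
    }
    ~ScopeGuard() {
        if (_active)
            _fn();
    }
    void dismiss() {
        _active = false;
    }

private:
    F _fn;
    bool _active = true;
};

template <typename F>
ScopeGuard<F> makeGuard(F fn) {
    return ScopeGuard<F>(std::move(fn));
}

// Usage mirroring unlockOnErrorGuard above: cleanup runs on timeout, deadlock, or
// interruption, and is skipped only on success.
bool acquireSomething(bool (*waitForGrant)(), void (*removeRequest)()) {
    auto unlockOnError = makeGuard([&] { removeRequest(); });
    const bool ok = waitForGrant();
    if (ok)
        unlockOnError.dismiss();
    return ok;
}
```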
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index 645a9c094ae..71448987d92 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -32,6 +32,7 @@
 
 #include "mongo/db/concurrency/fast_map_noalloc.h"
 #include "mongo/db/concurrency/locker.h"
+#include "mongo/db/operation_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/util/concurrency/spin_lock.h"
 
@@ -59,6 +60,15 @@ public:
      */
     LockResult wait(Milliseconds timeout);
 
+    /**
+     * Interruptible blocking method, which waits for the notification to fire or an interrupt from
+     * the operation context.
+     *
+     * @param opCtx OperationContext to wait on for an interrupt.
+     * @param timeout How many milliseconds to wait before returning LOCK_TIMEOUT.
+     */
+    LockResult wait(OperationContext* opCtx, Milliseconds timeout);
+
 private:
     virtual void notify(ResourceId resId, LockResult result);
 
@@ -105,11 +115,17 @@ public:
         _sharedLocksShouldTwoPhaseLock = sharedLocksShouldTwoPhaseLock;
     }
 
-    virtual LockResult lockGlobal(LockMode mode);
+    virtual LockResult lockGlobal(OperationContext* opCtx, LockMode mode);
+    virtual LockResult lockGlobal(LockMode mode) {
+        return lockGlobal(nullptr, mode);
+    }
     virtual LockResult lockGlobalBegin(LockMode mode, Date_t deadline) {
         return _lockGlobalBegin(mode, deadline);
     }
-    virtual LockResult lockGlobalComplete(Date_t deadline);
+    virtual LockResult lockGlobalComplete(OperationContext* opCtx, Date_t deadline);
+    virtual LockResult lockGlobalComplete(Date_t deadline) {
+        return lockGlobalComplete(nullptr, deadline);
+    }
     virtual void lockMMAPV1Flush();
 
     virtual void downgradeGlobalXtoSForMMAPV1();
@@ -122,11 +138,25 @@ public:
         return _wuowNestingLevel > 0;
     }
 
-    virtual LockResult lock(ResourceId resId,
+    /**
+     * Requests a lock for resource 'resId' with mode 'mode'. An OperationContext 'opCtx' must be
+     * provided to interrupt waiting on the locker condition variable that indicates status of
+     * the lock acquisition. A lock operation would otherwise wait until a timeout or the lock is
+     * granted.
+     */
+    virtual LockResult lock(OperationContext* opCtx,
+                            ResourceId resId,
                             LockMode mode,
                             Date_t deadline = Date_t::max(),
                             bool checkDeadlock = false);
 
+    virtual LockResult lock(ResourceId resId,
+                            LockMode mode,
+                            Date_t deadline = Date_t::max(),
+                            bool checkDeadlock = false) {
+        return lock(nullptr, resId, mode, deadline, checkDeadlock);
+    }
+
     virtual void downgrade(ResourceId resId, LockMode newMode);
 
     virtual bool unlock(ResourceId resId);
@@ -142,7 +172,10 @@ public:
 
     virtual bool saveLockStateAndUnlock(LockSnapshot* stateOut);
 
-    virtual void restoreLockState(const LockSnapshot& stateToRestore);
+    virtual void restoreLockState(OperationContext* opCtx, const LockSnapshot& stateToRestore);
+    virtual void restoreLockState(const LockSnapshot& stateToRestore) {
+        restoreLockState(nullptr, stateToRestore);
+    }
 
     virtual void releaseTicket();
 
@@ -175,13 +208,23 @@ public:
     * Waits for the completion of a lock, previously requested through lockBegin or
     * lockGlobalBegin. Must only be called, if lockBegin returned LOCK_WAITING.
     *
+     * @param opCtx Operation context that, if not null, will be used to allow interruptable lock
+     *              acquisition.
     * @param resId Resource id which was passed to an earlier lockBegin call. Must match.
     * @param mode Mode which was passed to an earlier lockBegin call. Must match.
     * @param deadline The absolute time point when this lock acquisition will time out, if not yet
     *                 granted.
     * @param checkDeadlock whether to perform deadlock detection while waiting.
     */
-    LockResult lockComplete(ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock);
+    LockResult lockComplete(OperationContext* opCtx,
+                            ResourceId resId,
+                            LockMode mode,
+                            Date_t deadline,
+                            bool checkDeadlock);
+
+    LockResult lockComplete(ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) {
+        return lockComplete(nullptr, resId, mode, deadline, checkDeadlock);
+    }
 
 private:
     friend class AutoYieldFlushLockForMMAPV1Commit;
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index 700f2b6856b..913d47eac9e 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -33,6 +33,7 @@
 
 #include "mongo/db/concurrency/lock_manager.h"
 #include "mongo/db/concurrency/lock_stats.h"
+#include "mongo/db/operation_context.h"
 #include "mongo/stdx/thread.h"
 
 namespace mongo {
@@ -46,6 +47,8 @@ namespace mongo {
 class Locker {
     MONGO_DISALLOW_COPYING(Locker);
 
+    friend class UninterruptableLockGuard;
+
 public:
     virtual ~Locker() {}
 
@@ -108,6 +111,7 @@ public:
     * This method can be called recursively, but each call to lockGlobal must be accompanied
     * by a call to unlockGlobal.
     *
+     * @param opCtx OperationContext used to interrupt the lock waiting, if provided.
     * @param mode Mode in which the global lock should be acquired. Also indicates the intent
     *             of the operation.
     *
@@ -115,6 +119,7 @@ public:
     *          acquired within the specified time bound. Otherwise, the respective failure
     *          code and neither lock will be acquired.
     */
+    virtual LockResult lockGlobal(OperationContext* opCtx, LockMode mode) = 0;
     virtual LockResult lockGlobal(LockMode mode) = 0;
 
     /**
@@ -126,6 +131,12 @@ public:
     * method has a deadline for use with the TicketHolder, if there is one.
     */
     virtual LockResult lockGlobalBegin(LockMode mode, Date_t deadline) = 0;
+
+    /**
+     * Calling lockGlobalComplete without an OperationContext does not allow the lock acquisition
+     * to be interrupted.
+     */
+    virtual LockResult lockGlobalComplete(OperationContext* opCtx, Date_t deadline) = 0;
     virtual LockResult lockGlobalComplete(Date_t deadline) = 0;
 
     /**
@@ -175,6 +186,7 @@ public:
     * of the lock. Therefore, each call, which returns LOCK_OK must be matched with a
     * corresponding call to unlock.
     *
+     * @param opCtx If provided, will be used to interrupt a LOCK_WAITING state.
     * @param resId Id of the resource to be locked.
     * @param mode Mode in which the resource should be locked. Lock upgrades are allowed.
     * @param deadline How long to wait for the lock to be granted, before
@@ -187,6 +199,16 @@ public:
     *
     * @return All LockResults except for LOCK_WAITING, because it blocks.
     */
+    virtual LockResult lock(OperationContext* opCtx,
+                            ResourceId resId,
+                            LockMode mode,
+                            Date_t deadline = Date_t::max(),
+                            bool checkDeadlock = false) = 0;
+
+    /**
+     * Calling lock without an OperationContext does not allow LOCK_WAITING states to be
+     * interrupted.
+     */
     virtual LockResult lock(ResourceId resId,
                             LockMode mode,
                             Date_t deadline = Date_t::max(),
@@ -297,7 +319,9 @@ public:
 
     /**
     * Re-locks all locks whose state was stored in 'stateToRestore'.
+     * @param opCtx An operation context that enables the restoration to be interrupted.
     */
+    virtual void restoreLockState(OperationContext* opCtx, const LockSnapshot& stateToRestore) = 0;
     virtual void restoreLockState(const LockSnapshot& stateToRestore) = 0;
 
     /**
@@ -362,9 +386,50 @@ public:
 protected:
     Locker() {}
 
+    /**
+     * The number of callers that are guarding from lock interruptions.
+     * When 0, all lock acquisitions are interruptable. When positive, no lock acquisitions
+     * are interruptable. This is only true for database and global locks. Collection locks are
+     * never interruptable.
+     */
+    int _uninterruptableLocksRequested = 0;
+
 private:
     bool _shouldConflictWithSecondaryBatchApplication = true;
     bool _shouldAcquireTicket = true;
 };
 
+/**
+ * This class prevents lock acquisitions from being interrupted when it is in scope.
+ * The default behavior of acquisitions depends on the type of lock that is being requested.
+ * Use this in the unlikely case that waiting for a lock can't be interrupted.
+ *
+ * Lock acquisitions can still return LOCK_TIMEOUT, just not if the parent operation
+ * context is killed first.
+ *
+ * It is possible that multiple callers are requesting uninterruptable behavior, so the guard
+ * increments a counter on the Locker class to indicate how many guards are active.
+ */
+class UninterruptableLockGuard {
+public:
+    /*
+     * Accepts a Locker, and increments the _uninterruptableLocksRequested. Decrements the
+     * counter when destroyed.
+     */
+    explicit UninterruptableLockGuard(Locker* locker) : _locker(locker) {
+        invariant(_locker);
+        invariant(_locker->_uninterruptableLocksRequested >= 0);
+        invariant(_locker->_uninterruptableLocksRequested < std::numeric_limits<int>::max());
+        _locker->_uninterruptableLocksRequested += 1;
+    }
+
+    ~UninterruptableLockGuard() {
+        invariant(_locker->_uninterruptableLocksRequested > 0);
+        _locker->_uninterruptableLocksRequested -= 1;
+    }
+
+private:
+    Locker* const _locker;
+};
+
 }  // namespace mongo
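Because the guard stores a count rather than a flag, nested scopes compose. A small standalone stub (an assumed simplification, not the real classes) demonstrating the counter semantics:

```cpp
// Stand-alone stub of the counting behavior: waits are interruptible only while
// no guard is alive on the Locker.
struct CountingLocker {
    int uninterruptableLocksRequested = 0;
    bool waitsAreInterruptible() const {
        return uninterruptableLocksRequested == 0;
    }
};

struct CountingGuard {
    explicit CountingGuard(CountingLocker* l) : _l(l) {
        ++_l->uninterruptableLocksRequested;
    }
    ~CountingGuard() {
        --_l->uninterruptableLocksRequested;
    }
    CountingLocker* _l;
};

void nestedScopes(CountingLocker& locker) {
    CountingGuard outer(&locker);      // counter: 1, waits uninterruptible
    {
        CountingGuard inner(&locker);  // counter: 2, nesting is safe
    }                                  // counter: 1, still uninterruptible
}                                      // counter: 0, waits interruptible again
```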
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index ac1419c0597..c29a66d1bc3 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -46,43 +46,50 @@ public:
     }
 
     virtual ClientState getClientState() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual LockerId getId() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     stdx::thread::id getThreadId() const override {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     void setSharedLocksShouldTwoPhaseLock(bool sharedLocksShouldTwoPhaseLock) override {
         invariant(false);
     }
 
+    virtual LockResult lockGlobal(OperationContext* opCtx, LockMode mode) {
+        MONGO_UNREACHABLE;
+    }
+
     virtual LockResult lockGlobal(LockMode mode) {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual LockResult lockGlobalBegin(LockMode mode, Date_t deadline) {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
+    virtual LockResult lockGlobalComplete(OperationContext* opCtx, Date_t deadline) {
+        MONGO_UNREACHABLE;
+    }
+
     virtual LockResult lockGlobalComplete(Date_t deadline) {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual void lockMMAPV1Flush() {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool unlockGlobal() {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual void downgradeGlobalXtoSForMMAPV1() {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual void beginWriteUnitOfWork() {}
@@ -90,7 +97,15 @@ public:
     virtual void endWriteUnitOfWork() {}
 
     virtual bool inAWriteUnitOfWork() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
+    }
+
+    virtual LockResult lock(OperationContext* opCtx,
+                            ResourceId resId,
+                            LockMode mode,
+                            Date_t deadline,
+                            bool checkDeadlock) {
+        return LockResult::LOCK_OK;
     }
 
     virtual LockResult lock(ResourceId resId, LockMode mode, Date_t deadline, bool checkDeadlock) {
@@ -98,7 +113,7 @@ public:
     }
 
     virtual void downgrade(ResourceId resId, LockMode newMode) {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool unlock(ResourceId resId) {
@@ -106,7 +121,7 @@ public:
     }
 
     virtual LockMode getLockMode(ResourceId resId) const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool isLockHeldForMode(ResourceId resId, LockMode mode) const {
@@ -122,19 +137,22 @@ public:
     }
 
     virtual ResourceId getWaitingResource() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual void getLockerInfo(LockerInfo* lockerInfo) const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool saveLockStateAndUnlock(LockSnapshot* stateOut) {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
+    virtual void restoreLockState(OperationContext* opCtx, const LockSnapshot& stateToRestore) {
+        MONGO_UNREACHABLE;
+    }
+
     virtual void restoreLockState(const LockSnapshot& stateToRestore) {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual void releaseTicket() {
@@ -146,15 +164,15 @@ public:
     }
 
     virtual void dump() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool isW() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool isR() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool isLocked() const {
@@ -166,11 +184,11 @@ public:
     }
 
     virtual bool isReadLocked() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     virtual bool hasLockPending() const {
-        invariant(false);
+        MONGO_UNREACHABLE;
     }
 
     bool isGlobalLockedRecursively() override {
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 52efb4948f8..b6fa5158000 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -167,9 +167,13 @@ OldClientContext::OldClientContext(
 }
 
 OldClientContext::~OldClientContext() {
-    // Lock must still be held
-    invariant(_opCtx->lockState()->isLocked());
+    // If in an interrupt, don't record any stats.
+    // It is possible to have no lock after saving the lock state and being interrupted while
+    // waiting to restore.
+    if (_opCtx->getKillStatus() != ErrorCodes::OK)
+        return;
 
+    invariant(_opCtx->lockState()->isLocked());
     auto currentOp = CurOp::get(_opCtx);
     Top::get(_opCtx->getClient()->getServiceContext())
         .record(_opCtx,
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index cb39e4cbec6..7c7862c6629 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -37,8 +37,8 @@
 #include "mongo/db/concurrency/locker.h"
 #include "mongo/db/logical_session_id.h"
 #include "mongo/db/storage/recovery_unit.h"
-#include "mongo/db/storage/recovery_unit.h"
 #include "mongo/db/storage/storage_options.h"
+#include "mongo/db/storage/write_unit_of_work.h"
 #include "mongo/db/write_concern_options.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/stdx/condition_variable.h"
@@ -531,84 +531,6 @@ private:
     bool _hasStashedCursor = false;
 };
 
-class WriteUnitOfWork {
-    MONGO_DISALLOW_COPYING(WriteUnitOfWork);
-
-public:
-    WriteUnitOfWork() = default;
-
-    WriteUnitOfWork(OperationContext* opCtx)
-        : _opCtx(opCtx), _toplevel(opCtx->_ruState == OperationContext::kNotInUnitOfWork) {
-        uassert(ErrorCodes::IllegalOperation,
-                "Cannot execute a write operation in read-only mode",
-                !storageGlobalParams.readOnly);
-        _opCtx->lockState()->beginWriteUnitOfWork();
-        if (_toplevel) {
-            _opCtx->recoveryUnit()->beginUnitOfWork(_opCtx);
-            _opCtx->_ruState = OperationContext::kActiveUnitOfWork;
-        }
-    }
-
-    ~WriteUnitOfWork() {
-        dassert(!storageGlobalParams.readOnly);
-        if (!_released && !_committed) {
-            invariant(_opCtx->_ruState != OperationContext::kNotInUnitOfWork);
-            if (_toplevel) {
-                _opCtx->recoveryUnit()->abortUnitOfWork();
-                _opCtx->_ruState = OperationContext::kNotInUnitOfWork;
-            } else {
-                _opCtx->_ruState = OperationContext::kFailedUnitOfWork;
-            }
-            _opCtx->lockState()->endWriteUnitOfWork();
-        }
-    }
-
-    /**
-     * Creates a top-level WriteUnitOfWork without changing RecoveryUnit or Locker state. For use
-     * when the RecoveryUnit and Locker are already in an active state.
-     */
-    static std::unique_ptr<WriteUnitOfWork> createForSnapshotResume(OperationContext* opCtx) {
-        auto wuow = stdx::make_unique<WriteUnitOfWork>();
-        wuow->_opCtx = opCtx;
-        wuow->_toplevel = true;
-        wuow->_opCtx->_ruState = OperationContext::kActiveUnitOfWork;
-        return wuow;
-    }
-
-    /**
-     * Releases the OperationContext RecoveryUnit and Locker objects from management without
-     * changing state. Allows for use of these objects beyond the WriteUnitOfWork lifespan.
-     */
-    void release() {
-        invariant(_opCtx->_ruState == OperationContext::kActiveUnitOfWork);
-        invariant(!_committed);
-        invariant(_toplevel);
-
-        _released = true;
-        _opCtx->_ruState = OperationContext::kNotInUnitOfWork;
-    }
-
-    void commit() {
-        invariant(!_committed);
-        invariant(!_released);
-        invariant(_opCtx->_ruState == OperationContext::kActiveUnitOfWork);
-        if (_toplevel) {
-            _opCtx->recoveryUnit()->commitUnitOfWork();
-            _opCtx->_ruState = OperationContext::kNotInUnitOfWork;
-        }
-        _opCtx->lockState()->endWriteUnitOfWork();
-        _committed = true;
-    }
-
-private:
-    OperationContext* _opCtx;
-
-    bool _toplevel;
-
-    bool _committed = false;
-    bool _released = false;
-};
-
 namespace repl {
 /**
 * RAII-style class to turn off replicated writes. Writes do not create oplog entries while the
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 99d2a39740f..9e6f2f0bd50 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -265,6 +265,7 @@ void DocumentSourceCursor::cleanupExecutor() {
     // return nullptr if the collection has since turned into a view. In this case, '_exec' will
     // already have been marked as killed when the collection was dropped, and we won't need to
     // access the CursorManager to properly dispose of it.
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
     AutoGetDb dbLock(opCtx, _exec->nss().db(), MODE_IS);
     Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), MODE_IS);
     auto collection = dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr;
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 9bd3656b958..8c7856907fd 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -262,6 +262,7 @@ Message getMore(OperationContext* opCtx,
     // Note that we acquire our locks before our ClientCursorPin, in order to ensure that the pin's
     // destructor is called before the lock's destructor (if there is one) so that the cursor
     // cleanup can occur under the lock.
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
     boost::optional<AutoGetCollectionForRead> readLock;
     boost::optional<AutoStatsTracker> statsTracker;
     CursorManager* cursorManager;
diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp
index 68df4a0cb52..cd5d513517c 100644
--- a/src/mongo/db/query/query_yield.cpp
+++ b/src/mongo/db/query/query_yield.cpp
@@ -83,7 +83,8 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx,
         whileYieldingFn();
     }
 
-    locker->restoreLockState(snapshot);
+    UninterruptableLockGuard noInterrupt(locker);
+    locker->restoreLockState(opCtx, snapshot);
 }
 
 }  // namespace mongo
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index c5086061412..8437283e60c 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -882,6 +882,9 @@ int ReplSource::_sync_pullOpLog(OperationContext* opCtx, int& nApplied) {
     bool tailing = true;
     oplogReader.tailCheck();
 
+    // Due to the lack of exception handlers, don't allow lock interrupts.
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
+
     bool initial = syncedTo.isNull();
 
     if (!oplogReader.haveCursor() || initial) {
@@ -1301,6 +1304,7 @@ static void replMasterThread() {
         OperationContext& opCtx = *opCtxPtr;
         AuthorizationSession::get(opCtx.getClient())->grantInternalAuthorization();
 
+        UninterruptableLockGuard noInterrupt(opCtx.lockState());
         Lock::GlobalWrite globalWrite(&opCtx, Date_t::now() + Milliseconds(1));
         if (globalWrite.isLocked()) {
             toSleep = 10;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index b2a5947e34e..ea7cf1d03e0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1585,6 +1585,7 @@ Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
     const Date_t stepDownUntil = startTime + stepdownTime;
     const Date_t waitUntil = startTime + waitTime;
 
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
     if (!getMemberState().primary()) {
         // Note this check is inherently racy - it's always possible for the node to
         // stepdown from some other path before we acquire the global exclusive lock. This check
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index f6cba98804b..89276f8b31d 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -398,6 +398,7 @@ Status StorageInterfaceImpl::createCollection(OperationContext* opCtx,
                                               const NamespaceString& nss,
                                               const CollectionOptions& options) {
     return writeConflictRetry(opCtx, "StorageInterfaceImpl::createCollection", nss.ns(), [&] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetOrCreateDb databaseWriteGuard(opCtx, nss.db(), MODE_X);
         auto db = databaseWriteGuard.getDb();
         invariant(db);
@@ -420,6 +421,7 @@ Status StorageInterfaceImpl::createCollection(OperationContext* opCtx,
 Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const NamespaceString& nss) {
     return writeConflictRetry(opCtx, "StorageInterfaceImpl::dropCollection", nss.ns(), [&] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetDb autoDB(opCtx, nss.db(), MODE_X);
         if (!autoDB.getDb()) {
             // Database does not exist - nothing to do.
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp
index 8de246d264a..24e4d784085 100644
--- a/src/mongo/db/s/collection_range_deleter.cpp
+++ b/src/mongo/db/s/collection_range_deleter.cpp
@@ -102,6 +102,7 @@ boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange(
     auto notification = DeleteNotification();
 
     {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, nss, MODE_IX);
         auto* const collection = autoColl.getCollection();
 
@@ -240,6 +241,8 @@ boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange(
         LOG(0) << "Error when waiting for write concern after removing " << nss << " range "
                << redact(range->toString()) << " : " << redact(status.reason());
 
+        // Don't allow lock interrupts while cleaning up.
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, nss, MODE_IX);
         auto* const css = CollectionShardingState::get(opCtx, nss);
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index c4a8d99f24e..a1d8568c2c4 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -1103,6 +1103,7 @@ void MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
         return;  // no documents can have been moved in, so there is nothing to clean up.
     }
 
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
     AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
     auto css = CollectionShardingState::get(opCtx, nss);
     auto metadata = css->getMetadata();
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 55b503948a8..6a5d2f35462 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -151,6 +151,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
 
     // Snapshot the committed metadata from the time the migration starts
     const auto collectionMetadataAndUUID = [&] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
         uassert(ErrorCodes::InvalidOptions,
                 "cannot move chunks for a collection that doesn't exist",
@@ -232,6 +233,7 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
 
     {
         // Register for notifications from the replication subsystem
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
         auto css = CollectionShardingState::get(opCtx, getNss());
 
@@ -288,6 +290,7 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
 
     {
         const auto metadata = [&] {
+            UninterruptableLockGuard noInterrupt(opCtx->lockState());
             AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
             return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata();
         }();
@@ -313,6 +316,7 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
         // The critical section must be entered with collection X lock in order to ensure there are
         // no writes which could have entered and passed the version check just before we entered
         // the critical section, but managed to complete after we left it.
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
 
         // IMPORTANT: After this line, the critical section is in place and needs to be signaled
@@ -382,6 +386,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
 
     {
         const auto metadata = [&] {
+            UninterruptableLockGuard noInterrupt(opCtx->lockState());
             AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
             return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata();
         }();
@@ -418,6 +423,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
     // Read operations must begin to wait on the critical section just before we send the commit
     // operation to the config server
     {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
         _readsShouldWaitOnCritSec = true;
     }
@@ -478,6 +484,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
     // metadata for this collection, forcing subsequent callers to do a full refresh. Check if
     // this node can accept writes for this collection as a proxy for it being primary.
     if (!status.isOK()) {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
         if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, getNss())) {
             CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr);
@@ -514,6 +521,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
     }();
 
     if (!refreshStatus.isOK()) {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
 
         CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr);
@@ -534,6 +542,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
     }
 
     auto refreshedMetadata = [&] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
         return CollectionShardingState::get(opCtx, getNss())->getMetadata();
     }();
@@ -583,6 +592,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
         auto notification = [&] {
             auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingState::kNow
                                                               : CollectionShardingState::kDelayed;
+            UninterruptableLockGuard noInterrupt(opCtx->lockState());
             AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
             return CollectionShardingState::get(opCtx, getNss())->cleanUpRange(range, whenToClean);
         }();
@@ -660,6 +670,7 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
 
     auto const serviceContext = opCtx->getClient()->getServiceContext();
 
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
     AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
     writeConflictRetry(
         opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
@@ -675,6 +686,7 @@ void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
 
     auto cloneDriver = [&]() {
         // Unregister from the collection's sharding state
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
 
         auto css = CollectionShardingState::get(opCtx, getNss());
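Several of the restoreLockState call sites above follow the same yield-and-restore shape. A sketch of that pattern (hypothetical wrapper function; the types and calls are the ones changed in this patch):

```cpp
// Release all top-level locks around a blocking call, then re-acquire them.
// The restore is wrapped in an UninterruptableLockGuard, as in query_yield.cpp,
// because a half-restored lock state cannot be recovered from.
void yieldAroundBlockingCall(OperationContext* opCtx, stdx::function<void()> blockingFn) {
    Locker* locker = opCtx->lockState();

    Locker::LockSnapshot snapshot;
    invariant(locker->saveLockStateAndUnlock(&snapshot));

    blockingFn();  // e.g. waiting for excess oplog stones or for replication

    UninterruptableLockGuard noInterrupt(locker);
    locker->restoreLockState(opCtx, snapshot);
}
```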
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 170cf8b7594..2e8492bf1a8 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -187,6 +187,17 @@ env.Library(
     ],
 )
 
+env.Library(
+    target="write_unit_of_work",
+    source=[
+        "write_unit_of_work.cpp",
+    ],
+    LIBDEPS_PRIVATE=[
+        "$BUILD_DIR/mongo/base",
+        "$BUILD_DIR/mongo/db/storage/storage_options",
+    ],
+)
+
 env.CppUnitTest(
     target='storage_engine_lock_file_test',
     source='storage_engine_lock_file_test.cpp',
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 646a6784a08..28b69441724 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1042,7 +1042,7 @@ bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext*
     oplogStones->awaitHasExcessStonesOrDead();
 
     // Reacquire the locks that were released.
-    locker->restoreLockState(snapshot);
+    locker->restoreLockState(opCtx, snapshot);
 
     return !oplogStones->isDead();
 }
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
index 5778754e815..03934922aef 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_mongod.cpp
@@ -103,6 +103,8 @@ public:
                 return false;  // Oplog went away.
             }
             rs->reclaimOplog(&opCtx);
+        } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
+            return false;
         } catch (const std::exception& e) {
             severe() << "error in WiredTigerRecordStoreThread: " << e.what();
             fassertFailedNoTrace(!"error in WiredTigerRecordStoreThread");
diff --git a/src/mongo/db/storage/write_unit_of_work.cpp b/src/mongo/db/storage/write_unit_of_work.cpp
new file mode 100644
index 00000000000..eb5226821e5
--- /dev/null
+++ b/src/mongo/db/storage/write_unit_of_work.cpp
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/storage/write_unit_of_work.h"
+
+namespace mongo {
+
+WriteUnitOfWork::WriteUnitOfWork(OperationContext* opCtx)
+    : _opCtx(opCtx), _toplevel(opCtx->_ruState == OperationContext::kNotInUnitOfWork) {
+    uassert(ErrorCodes::IllegalOperation,
+            "Cannot execute a write operation in read-only mode",
+            !storageGlobalParams.readOnly);
+    _opCtx->lockState()->beginWriteUnitOfWork();
+    if (_toplevel) {
+        _opCtx->recoveryUnit()->beginUnitOfWork(_opCtx);
+        _opCtx->_ruState = OperationContext::kActiveUnitOfWork;
+    }
+}
+
+WriteUnitOfWork::~WriteUnitOfWork() {
+    dassert(!storageGlobalParams.readOnly);
+    if (!_released && !_committed) {
+        invariant(_opCtx->_ruState != OperationContext::kNotInUnitOfWork);
+        if (_toplevel) {
+            _opCtx->recoveryUnit()->abortUnitOfWork();
+            _opCtx->_ruState = OperationContext::kNotInUnitOfWork;
+        } else {
+            _opCtx->_ruState = OperationContext::kFailedUnitOfWork;
+        }
+        _opCtx->lockState()->endWriteUnitOfWork();
+    }
+}
+
+/**
+ * Creates a top-level WriteUnitOfWork without changing RecoveryUnit or Locker state. For use
+ * when the RecoveryUnit and Locker are already in an active state.
+ */
+std::unique_ptr<WriteUnitOfWork> WriteUnitOfWork::createForSnapshotResume(OperationContext* opCtx) {
+    auto wuow = stdx::make_unique<WriteUnitOfWork>();
+    wuow->_opCtx = opCtx;
+    wuow->_toplevel = true;
+    wuow->_opCtx->_ruState = OperationContext::kActiveUnitOfWork;
+    return wuow;
+}
+
+/**
+ * Releases the OperationContext RecoveryUnit and Locker objects from management without
+ * changing state. Allows for use of these objects beyond the WriteUnitOfWork lifespan.
+ */
+void WriteUnitOfWork::release() {
+    invariant(_opCtx->_ruState == OperationContext::kActiveUnitOfWork);
+    invariant(!_committed);
+    invariant(_toplevel);
+
+    _released = true;
+    _opCtx->_ruState = OperationContext::kNotInUnitOfWork;
+}
+
+void WriteUnitOfWork::commit() {
+    invariant(!_committed);
+    invariant(!_released);
+    invariant(_opCtx->_ruState == OperationContext::kActiveUnitOfWork);
+    if (_toplevel) {
+        _opCtx->recoveryUnit()->commitUnitOfWork();
+        _opCtx->_ruState = OperationContext::kNotInUnitOfWork;
+    }
+    _opCtx->lockState()->endWriteUnitOfWork();
+    _committed = true;
+}
+
+}  // namespace mongo
diff --git a/src/mongo/db/storage/write_unit_of_work.h b/src/mongo/db/storage/write_unit_of_work.h
new file mode 100644
index 00000000000..d7f53af6abd
--- /dev/null
+++ b/src/mongo/db/storage/write_unit_of_work.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+class WriteUnitOfWork {
+    MONGO_DISALLOW_COPYING(WriteUnitOfWork);
+
+public:
+    WriteUnitOfWork() = default;
+
+    WriteUnitOfWork(OperationContext* opCtx);
+
+    ~WriteUnitOfWork();
+
+    static std::unique_ptr<WriteUnitOfWork> createForSnapshotResume(OperationContext* opCtx);
+
+    void release();
+
+    void commit();
+
+private:
+    OperationContext* _opCtx;
+
+    bool _toplevel;
+
+    bool _committed = false;
+    bool _released = false;
+};
+
+}  // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index f6ad2104559..12516d034f1 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -154,7 +154,10 @@ int handleBatchHelper(SessionsCollection* sessionsCollection,
     Locker::LockSnapshot snapshot;
     invariant(locker->saveLockStateAndUnlock(&snapshot));
 
-    const auto guard = MakeGuard([&] { locker->restoreLockState(snapshot); });
+    const auto guard = MakeGuard([&] {
+        UninterruptableLockGuard noInterrupt(opCtx->lockState());
+        locker->restoreLockState(opCtx, snapshot);
+    });
 
     // Top-level locks are freed, release any potential low-level (storage engine-specific
     // locks). If we are yielding, we are at a safe place to do so.
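With WriteUnitOfWork now a standalone library, a typical write path looks like the following sketch (a hypothetical helper; the exact Collection::insertDocument signature is an assumption):

```cpp
// Hypothetical write path: the unit of work commits atomically, and the destructor
// aborts it if an exception (e.g. a WriteConflictException) unwinds before commit().
void insertOneDocument(OperationContext* opCtx, Collection* coll, const BSONObj& doc) {
    writeConflictRetry(opCtx, "insertOneDocument", coll->ns().ns(), [&] {
        WriteUnitOfWork wuow(opCtx);  // begins units of work on the Locker and RecoveryUnit
        uassertStatusOK(coll->insertDocument(opCtx, InsertStatement(doc), nullptr));
        wuow.commit();  // without this, ~WriteUnitOfWork aborts the work
    });
}
```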
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index 6f61e5f63dd..2bf3e717535 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -134,6 +134,7 @@ private:
 
         // Get all TTL indexes from every collection.
         for (const std::string& collectionNS : ttlCollections) {
+            UninterruptableLockGuard noInterrupt(opCtx.lockState());
             NamespaceString collectionNSS(collectionNS);
             AutoGetCollection autoGetCollection(&opCtx, collectionNSS, MODE_IS);
             Collection* coll = autoGetCollection.getCollection();
diff --git a/src/mongo/s/catalog/replset_dist_lock_manager.cpp b/src/mongo/s/catalog/replset_dist_lock_manager.cpp
index 4b8a3e1bca7..6e9871c40b4 100644
--- a/src/mongo/s/catalog/replset_dist_lock_manager.cpp
+++ b/src/mongo/s/catalog/replset_dist_lock_manager.cpp
@@ -104,6 +104,8 @@ void ReplSetDistLockManager::shutDown(OperationContext* opCtx) {
         _execThread.reset();
     }
 
+    // Don't allow interrupts while cleaning up.
+    UninterruptableLockGuard noInterrupt(opCtx->lockState());
     auto status = _catalog->stopPing(opCtx, _processID);
     if (!status.isOK()) {
         warning() << "error encountered while cleaning up distributed ping entry for " << _processID
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index b79770eb2a4..c11edd7c4c8 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -887,6 +887,7 @@ var ReplSetTest = function(opts) {
         // if a heartbeat times out during the quorum check.
         // They may also fail with NewReplicaSetConfigurationIncompatible on similar timeout
         // during the config validation stage while deducing isSelf().
+        // This can fail with an InterruptedDueToReplStateChange error when interrupted.
         // We retry three times to reduce the chance of failing this way.
         assert.retry(() => {
             var res;
@@ -905,10 +906,13 @@ var ReplSetTest = function(opts) {
                 throw e;
             }
 
-            assert.commandFailedWithCode(
-                res,
-                [ErrorCodes.NodeNotFound, ErrorCodes.NewReplicaSetConfigurationIncompatible],
-                "replSetReconfig during initiate failed");
+            assert.commandFailedWithCode(res,
+                                         [
+                                           ErrorCodes.NodeNotFound,
+                                           ErrorCodes.NewReplicaSetConfigurationIncompatible,
+                                           ErrorCodes.InterruptedDueToReplStateChange
+                                         ],
+                                         "replSetReconfig during initiate failed");
             return false;
         }, "replSetReconfig during initiate failed", 3, 5 * 1000);
     }