author     Siyuan Zhou <siyuan.zhou@mongodb.com>    2018-11-14 18:34:27 -0500
committer  Siyuan Zhou <siyuan.zhou@mongodb.com>    2018-12-03 18:21:37 -0500
commit     55e72b015e2aa7297c00db29e4d93451ea61a7ca (patch)
tree       8f91b68f97adc99332688bfcfaa04f9818679851 /src
parent     74921ac92c1330f754eed39c8e7148955aca2be9 (diff)
download   mongo-55e72b015e2aa7297c00db29e4d93451ea61a7ca.tar.gz
SERVER-37199 Yield locks of transactions in secondary application. (tag: r4.1.6)
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/concurrency/lock_manager.cpp     |   1
-rw-r--r--  src/mongo/db/concurrency/lock_state.cpp       |  34
-rw-r--r--  src/mongo/db/concurrency/lock_state.h         |   8
-rw-r--r--  src/mongo/db/concurrency/lock_state_test.cpp  | 132
-rw-r--r--  src/mongo/db/concurrency/locker.h             |  13
-rw-r--r--  src/mongo/db/concurrency/locker_noop.h        |  13
-rw-r--r--  src/mongo/db/transaction_participant.cpp      |  31
-rw-r--r--  src/mongo/db/transaction_participant.h        |   1
8 files changed, 222 insertions(+), 11 deletions(-)
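
Before the diff itself, it may help to see the mechanism in miniature. The following is a self-contained toy model (toy types and names only; the real logic lives in LockerImpl below) of how deferred two-phase unlocks, the per-request unlockPending counter, and the new yield/restore pair interact:

#include <cassert>
#include <map>
#include <string>
#include <utility>

enum LockMode { MODE_NONE, MODE_IS, MODE_IX, MODE_X };

struct ToyLocker {
    struct Request {
        LockMode mode;
        int unlockPending;  // deferred-unlock count, as in LockRequest
    };
    std::map<std::string, Request> requests;
    int wuowNestingLevel = 0;

    void beginWriteUnitOfWork() {
        ++wuowNestingLevel;
    }

    void lock(const std::string& res, LockMode mode) {
        requests[res] = Request{mode, 0};
    }

    // Inside a WUOW, unlock is deferred (two-phase locking): the request is
    // only marked pending and false is returned, as Locker::unlock() does.
    bool unlock(const std::string& res) {
        if (wuowNestingLevel > 0) {
            ++requests[res].unlockPending;
            return false;
        }
        requests.erase(res);
        return true;
    }

    // Yield: every held lock must already be pending unlock; move them all
    // into the snapshot and drop them. Returns false if nothing was held.
    bool releaseWriteUnitOfWork(std::map<std::string, Request>* stateOut) {
        assert(wuowNestingLevel == 1);
        --wuowNestingLevel;
        for (const auto& kv : requests)
            assert(kv.second.unlockPending == 1);
        *stateOut = std::move(requests);
        requests.clear();
        return !stateOut->empty();
    }

    // Restore: reacquire the yielded locks, mark them pending unlock again,
    // and re-enter the WUOW.
    void restoreWriteUnitOfWork(const std::map<std::string, Request>& state) {
        for (const auto& kv : state) {
            Request r = kv.second;
            r.unlockPending = 1;
            requests[kv.first] = r;
        }
        beginWriteUnitOfWork();
    }

    // Ending the WUOW performs the deferred unlocks for real.
    void endWriteUnitOfWork() {
        if (--wuowNestingLevel == 0)
            requests.clear();
    }
};

int main() {
    ToyLocker locker;
    std::map<std::string, ToyLocker::Request> snapshot;

    locker.beginWriteUnitOfWork();
    locker.lock("TestDB", MODE_IX);
    assert(!locker.unlock("TestDB"));  // deferred, not actually released

    assert(locker.releaseWriteUnitOfWork(&snapshot));  // yielded
    assert(locker.requests.empty());

    locker.restoreWriteUnitOfWork(snapshot);  // re-locked, back in the WUOW
    assert(locker.requests.count("TestDB") == 1);

    locker.endWriteUnitOfWork();  // deferred unlocks happen here
    assert(locker.requests.empty());
    return 0;
}

The patch below implements exactly this lifecycle on the real Locker, with LockSnapshot as the yielded state.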
diff --git a/src/mongo/db/concurrency/lock_manager.cpp b/src/mongo/db/concurrency/lock_manager.cpp
index 1b8f5f644a5..1a5520494bf 100644
--- a/src/mongo/db/concurrency/lock_manager.cpp
+++ b/src/mongo/db/concurrency/lock_manager.cpp
@@ -1033,6 +1033,7 @@ void LockRequest::initNew(Locker* locker, LockGrantNotification* notify) {
     partitioned = false;
     mode = MODE_NONE;
     convertMode = MODE_NONE;
+    unlockPending = 0;
 }
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index e47fd47990f..421b201af00 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -423,6 +423,40 @@ void LockerImpl::endWriteUnitOfWork() {
     }
 }
 
+bool LockerImpl::releaseWriteUnitOfWork(LockSnapshot* stateOut) {
+    // Only the global WUOW can be released.
+    invariant(_wuowNestingLevel == 1);
+    --_wuowNestingLevel;
+    invariant(!isGlobalLockedRecursively());
+
+    // All locks should be pending to unlock.
+    invariant(_requests.size() == _numResourcesToUnlockAtEndUnitOfWork);
+    for (auto it = _requests.begin(); it; it.next()) {
+        // No converted lock so we don't need to unlock more than once.
+        invariant(it->unlockPending == 1);
+    }
+    _numResourcesToUnlockAtEndUnitOfWork = 0;
+
+    return saveLockStateAndUnlock(stateOut);
+}
+
+void LockerImpl::restoreWriteUnitOfWork(OperationContext* opCtx,
+                                        const LockSnapshot& stateToRestore) {
+    if (stateToRestore.globalMode != MODE_NONE) {
+        restoreLockState(opCtx, stateToRestore);
+    }
+
+    invariant(_numResourcesToUnlockAtEndUnitOfWork == 0);
+    for (auto it = _requests.begin(); it; it.next()) {
+        invariant(_shouldDelayUnlock(it.key(), (it->mode)));
+        invariant(it->unlockPending == 0);
+        it->unlockPending++;
+    }
+    _numResourcesToUnlockAtEndUnitOfWork = static_cast<unsigned>(_requests.size());
+
+    beginWriteUnitOfWork();
+}
+
 LockResult LockerImpl::lock(OperationContext* opCtx,
                             ResourceId resId,
                             LockMode mode,
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index 599cb95bd2d..ab21b58bb3a 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -149,8 +149,8 @@ public:
     virtual LockResult lockRSTLBegin(OperationContext* opCtx);
     virtual LockResult lockRSTLComplete(OperationContext* opCtx, Date_t deadline);
 
-    virtual void beginWriteUnitOfWork();
-    virtual void endWriteUnitOfWork();
+    virtual void beginWriteUnitOfWork() override;
+    virtual void endWriteUnitOfWork() override;
 
     virtual bool inAWriteUnitOfWork() const {
         return _wuowNestingLevel > 0;
@@ -194,6 +194,10 @@ public:
         restoreLockState(nullptr, stateToRestore);
     }
 
+    bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override;
+    void restoreWriteUnitOfWork(OperationContext* opCtx,
+                                const LockSnapshot& stateToRestore) override;
+
     virtual void releaseTicket();
     virtual void reacquireTicket(OperationContext* opCtx);
 
diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp
index a0094d3d485..b731f556057 100644
--- a/src/mongo/db/concurrency/lock_state_test.cpp
+++ b/src/mongo/db/concurrency/lock_state_test.cpp
@@ -254,6 +254,138 @@ TEST(LockerImpl, saveAndRestoreDBAndCollection) {
     ASSERT(locker.unlockGlobal());
 }
 
+TEST(LockerImpl, releaseWriteUnitOfWork) {
+    Locker::LockSnapshot lockInfo;
+
+    LockerImpl locker;
+
+    const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+    const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+
+    locker.beginWriteUnitOfWork();
+    // Lock some stuff.
+    locker.lockGlobal(MODE_IX);
+    ASSERT_EQUALS(LOCK_OK, locker.lock(resIdDatabase, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, locker.lock(resIdCollection, MODE_X));
+    // Unlock them so that they will be pending to unlock.
+    ASSERT_FALSE(locker.unlock(resIdCollection));
+    ASSERT_FALSE(locker.unlock(resIdDatabase));
+    ASSERT_FALSE(locker.unlockGlobal());
+
+    ASSERT(locker.releaseWriteUnitOfWork(&lockInfo));
+
+    // Things shouldn't be locked anymore.
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+    ASSERT_FALSE(locker.isLocked());
+
+    // Destructor should succeed since the locker's state should be empty.
+}
+
+TEST(LockerImpl, restoreWriteUnitOfWork) {
+    Locker::LockSnapshot lockInfo;
+
+    LockerImpl locker;
+
+    const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+    const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+
+    locker.beginWriteUnitOfWork();
+    // Lock some stuff.
+    locker.lockGlobal(MODE_IX);
+    ASSERT_EQUALS(LOCK_OK, locker.lock(resIdDatabase, MODE_IX));
+    ASSERT_EQUALS(LOCK_OK, locker.lock(resIdCollection, MODE_X));
+    // Unlock them so that they will be pending to unlock.
+    ASSERT_FALSE(locker.unlock(resIdCollection));
+    ASSERT_FALSE(locker.unlock(resIdDatabase));
+    ASSERT_FALSE(locker.unlockGlobal());
+
+    ASSERT(locker.releaseWriteUnitOfWork(&lockInfo));
+
+    // Things shouldn't be locked anymore.
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+    ASSERT_FALSE(locker.isLocked());
+
+    // Restore lock state.
+    locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+
+    // Make sure things were re-locked.
+    ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+    ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+    ASSERT(locker.isLocked());
+
+    locker.endWriteUnitOfWork();
+
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+    ASSERT_FALSE(locker.isLocked());
+}
+
+TEST(LockerImpl, releaseAndRestoreReadOnlyWriteUnitOfWork) {
+    Locker::LockSnapshot lockInfo;
+
+    LockerImpl locker;
+
+    const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+    const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+
+    // Snapshot transactions delay shared locks as well.
+    locker.setSharedLocksShouldTwoPhaseLock(true);
+
+    locker.beginWriteUnitOfWork();
+    // Lock some stuff in IS mode.
+    locker.lockGlobal(MODE_IS);
+    ASSERT_EQUALS(LOCK_OK, locker.lock(resIdDatabase, MODE_IS));
+    ASSERT_EQUALS(LOCK_OK, locker.lock(resIdCollection, MODE_IS));
+    // Unlock them.
+    ASSERT_FALSE(locker.unlock(resIdCollection));
+    ASSERT_FALSE(locker.unlock(resIdDatabase));
+    ASSERT_FALSE(locker.unlockGlobal());
+    ASSERT_EQ(3u, locker.numResourcesToUnlockAtEndUnitOfWorkForTest());
+
+    ASSERT_TRUE(locker.releaseWriteUnitOfWork(&lockInfo));
+
+    // Things shouldn't be locked anymore.
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+    ASSERT_FALSE(locker.isLocked());
+
+    // Restore lock state.
+    locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+
+    ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdDatabase));
+    ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdCollection));
+    ASSERT_TRUE(locker.isLocked());
+
+    locker.endWriteUnitOfWork();
+
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+    ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+    ASSERT_FALSE(locker.isLocked());
+}
+
+TEST(LockerImpl, releaseAndRestoreEmptyWriteUnitOfWork) {
+    Locker::LockSnapshot lockInfo;
+    LockerImpl locker;
+
+    // Snapshot transactions delay shared locks as well.
+    locker.setSharedLocksShouldTwoPhaseLock(true);
+
+    locker.beginWriteUnitOfWork();
+
+    // Nothing to yield.
+    ASSERT_FALSE(locker.releaseWriteUnitOfWork(&lockInfo));
+    ASSERT_FALSE(locker.isLocked());
+
+    // Restore lock state.
+    locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+    ASSERT_FALSE(locker.isLocked());
+
+    locker.endWriteUnitOfWork();
+    ASSERT_FALSE(locker.isLocked());
+}
+
 TEST(LockerImpl, DefaultLocker) {
     const ResourceId resId(RESOURCE_DATABASE, "TestDB"_sd);
 
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index d1ea2830a91..ee4b7118b5d 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -206,8 +206,8 @@ public:
     /**
      * beginWriteUnitOfWork/endWriteUnitOfWork are called at the start and end of WriteUnitOfWorks.
-     * They can be used to implement two-phase locking. Each call to begin should be matched with an
-     * eventual call to end.
+     * They can be used to implement two-phase locking. Each call to begin or restore should be
+     * matched with an eventual call to end or release.
      *
      * endWriteUnitOfWork, if not called in a nested WUOW, will release all two-phase locking held
      * lock resources.
@@ -375,6 +375,15 @@ public:
     virtual void restoreLockState(const LockSnapshot& stateToRestore) = 0;
 
     /**
+     * releaseWriteUnitOfWork opts out of two-phase locking and yields the locks after a WUOW
+     * has been released. restoreWriteUnitOfWork reacquires the locks and resumes the two-phase
+     * locking behavior of WUOW.
+     */
+    virtual bool releaseWriteUnitOfWork(LockSnapshot* stateOut) = 0;
+    virtual void restoreWriteUnitOfWork(OperationContext* opCtx,
+                                        const LockSnapshot& stateToRestore) = 0;
+
+    /**
      * Releases the ticket associated with the Locker. This allows locks to be held without
      * contributing to reader/writer throttling.
      */
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index 37abca2c99e..332e410ff91 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -111,9 +111,9 @@ public:
        MONGO_UNREACHABLE;
     }
 
-    virtual void beginWriteUnitOfWork() {}
+    virtual void beginWriteUnitOfWork() override {}
 
-    virtual void endWriteUnitOfWork() {}
+    virtual void endWriteUnitOfWork() override {}
 
     virtual bool inAWriteUnitOfWork() const {
         return false;
@@ -187,6 +187,15 @@ public:
         MONGO_UNREACHABLE;
     }
 
+    bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override {
+        MONGO_UNREACHABLE;
+    }
+
+    void restoreWriteUnitOfWork(OperationContext* opCtx,
+                                const LockSnapshot& stateToRestore) override {
+        MONGO_UNREACHABLE;
+    }
+
     virtual void releaseTicket() {
         MONGO_UNREACHABLE;
     }
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 05947daa982..cc9726c29b5 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -529,6 +529,12 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool
     }
     _locker->unsetThreadId();
 
+    // On secondaries, we yield the locks for transactions.
+    if (!opCtx->writesAreReplicated()) {
+        _lockSnapshot = std::make_unique<Locker::LockSnapshot>();
+        _locker->releaseWriteUnitOfWork(_lockSnapshot.get());
+    }
+
    // This thread must still respect the transaction lock timeout, since it can prevent the
    // transaction from making progress.
     auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
@@ -553,25 +559,35 @@ TransactionParticipant::TxnResources::~TxnResources() {
         // when starting a new transaction before completing an old one. So we should
         // be at WUOW nesting level 1 (only the top level WriteUnitOfWork).
         _recoveryUnit->abortUnitOfWork();
-        _locker->endWriteUnitOfWork();
+        // If the locks have not been yielded, release them.
+        if (!_lockSnapshot) {
+            _locker->endWriteUnitOfWork();
+        }
         invariant(!_locker->inAWriteUnitOfWork());
     }
 }
 
 void TransactionParticipant::TxnResources::release(OperationContext* opCtx) {
     // Perform operations that can fail the release before marking the TxnResources as released.
+
+    // Restore the locks if they have been yielded.
+    if (_lockSnapshot) {
+        invariant(!_locker->isLocked());
+        // opCtx is passed in to enable the restoration to be interrupted.
+        _locker->restoreWriteUnitOfWork(opCtx, *_lockSnapshot);
+        _lockSnapshot.reset(nullptr);
+    }
     _locker->reacquireTicket(opCtx);
 
     invariant(!_released);
     _released = true;
 
-    // We intentionally do not capture the return value of swapLockState(), which is just an empty
-    // locker. At the end of the operation, if the transaction is not complete, we will stash the
-    // operation context's locker and replace it with a new empty locker.
-
     // It is necessary to lock the client to change the Locker on the OperationContext.
     stdx::lock_guard<Client> lk(*opCtx->getClient());
     invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive);
+    // We intentionally do not capture the return value of swapLockState(), which is just an empty
+    // locker. At the end of the operation, if the transaction is not complete, we will stash the
+    // operation context's locker and replace it with a new empty locker.
     opCtx->swapLockState(std::move(_locker));
     opCtx->lockState()->updateThreadIdToCurrentThread();
@@ -701,6 +717,11 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
                       : SpeculativeTransactionOpTime::kLastApplied);
     }
 
+    // All locks of transactions must be acquired inside the global WUOW so that we can
+    // yield and restore all locks on state transition. Otherwise, we'd have to remember
+    // which locks are managed by WUOW.
+    invariant(!opCtx->lockState()->isLocked());
+
     // Stashed transaction resources do not exist for this in-progress multi-document
     // transaction. Set up the transaction resources on the opCtx.
     opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index e7c69a69c76..85b0b0e3611 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -113,6 +113,7 @@ public:
 private:
     bool _released = false;
     std::unique_ptr<Locker> _locker;
+    std::unique_ptr<Locker::LockSnapshot> _lockSnapshot;
     std::unique_ptr<RecoveryUnit> _recoveryUnit;
     repl::ReadConcernArgs _readConcernArgs;
     WriteUnitOfWork::RecoveryUnitState _ruState;
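
At the call-site level, the new API is used just as the added unit tests exercise it. A minimal sketch, assuming the MongoDB source tree at this revision: the include, the _sd string literal suffix, and every Locker call below are taken from the patch and its test file, while the wrapper function itself is hypothetical.

#include "mongo/db/concurrency/lock_state.h"

namespace mongo {

// Hypothetical helper: yield an open WUOW's locks and reacquire them later,
// the way TxnResources now does during secondary transaction application.
void yieldAndRestoreLocks(OperationContext* opCtx) {
    LockerImpl locker;
    Locker::LockSnapshot snapshot;
    const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);

    locker.beginWriteUnitOfWork();  // enter two-phase locking
    locker.lockGlobal(MODE_IX);
    invariant(LOCK_OK == locker.lock(resIdDatabase, MODE_IX));

    // Deferred by two-phase locking: only marks the requests unlockPending.
    invariant(!locker.unlock(resIdDatabase));
    invariant(!locker.unlockGlobal());

    // Yield: saves all pending locks into the snapshot and drops them.
    // Returns false if the WUOW held nothing; the WUOW is exited either way.
    const bool yielded = locker.releaseWriteUnitOfWork(&snapshot);
    invariant(!locker.isLocked());

    // ... e.g. replication state transitions can happen here ...

    // Restore must pair with release: it re-enters the WUOW and reacquires
    // any yielded locks. Passing opCtx makes the reacquisition interruptible.
    locker.restoreWriteUnitOfWork(opCtx, snapshot);
    invariant(yielded == locker.isLocked());

    locker.endWriteUnitOfWork();  // the deferred unlocks finally happen
}

}  // namespace mongo

On secondaries this sequence is driven by TxnResources: the constructor yields when !opCtx->writesAreReplicated(), and release() restores the snapshot before reacquiring the ticket, which is why unstashTransactionResources can now assert that no locks are held outside the global WUOW.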