diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2019-04-24 20:09:52 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2019-05-03 20:10:27 -0400 |
commit | 4fb71c5a1c79b745ef56d53a8264ef5fdd202dda (patch) | |
tree | af44a7d5320b7ef6b2f7778ad52defbfc38defdb /src/mongo | |
parent | 51c1c3495e5583e3e570313eb0e0f68d304241e9 (diff) | |
download | mongo-4fb71c5a1c79b745ef56d53a8264ef5fdd202dda.tar.gz |
SERVER-40498 Side transaction keeps the same locker
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency.h | 6 | ||||
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency_test.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.h | 9 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state_test.cpp | 128 | ||||
-rw-r--r-- | src/mongo/db/concurrency/locker.h | 33 | ||||
-rw-r--r-- | src/mongo/db/concurrency/locker_noop.h | 14 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 13 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 12 |
10 files changed, 298 insertions, 74 deletions
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h index 5e9bd6a94a2..8d731a1dfaf 100644 --- a/src/mongo/db/concurrency/d_concurrency.h +++ b/src/mongo/db/concurrency/d_concurrency.h @@ -220,6 +220,8 @@ public: EnqueueOnly enqueueOnly); ~GlobalLock() { + // Preserve the original lock result which will be overridden by unlock(). + auto lockResult = _result; if (isLocked()) { // Abandon our snapshot if destruction of the GlobalLock object results in actually // unlocking the global lock. Recursive locking and the two-phase locking protocol @@ -231,7 +233,9 @@ public: } _unlock(); } - _opCtx->lockState()->unlock(resourceIdReplicationStateTransitionLock); + if (lockResult == LOCK_OK || lockResult == LOCK_WAITING) { + _opCtx->lockState()->unlock(resourceIdReplicationStateTransitionLock); + } } /** diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp index cbe7d62bf35..3d400a9bceb 100644 --- a/src/mongo/db/concurrency/d_concurrency_test.cpp +++ b/src/mongo/db/concurrency/d_concurrency_test.cpp @@ -983,6 +983,36 @@ TEST_F(DConcurrencyTestFixture, MODE_NONE); } +TEST_F(DConcurrencyTestFixture, FailedGlobalLockShouldUnlockRSTLOnlyOnce) { + auto clients = makeKClientsWithLockers(2); + auto opCtx1 = clients[0].second.get(); + auto opCtx2 = clients[1].second.get(); + + auto resourceRSTL = resourceIdReplicationStateTransitionLock; + + // Take the exclusive lock with the first caller. + Lock::GlobalLock globalLock(opCtx1, MODE_X); + + opCtx2->lockState()->beginWriteUnitOfWork(); + // Set a max timeout on the second caller that will override provided lock request + // deadlines. + // Then requesting a lock with Date_t::max() should cause a LockTimeout error to be thrown. + opCtx2->lockState()->setMaxLockTimeout(Milliseconds(100)); + + ASSERT_THROWS_CODE( + Lock::GlobalLock(opCtx2, MODE_IX, Date_t::max(), Lock::InterruptBehavior::kThrow), + DBException, + ErrorCodes::LockTimeout); + auto opCtx2Locker = static_cast<LockerImpl*>(opCtx2->lockState()); + // GlobalLock failed, but the RSTL should be successfully acquired and pending unlocked. + ASSERT(opCtx2Locker->getRequestsForTest().find(resourceIdGlobal).finished()); + ASSERT_EQ(opCtx2Locker->getRequestsForTest().find(resourceRSTL).objAddr()->unlockPending, 1U); + ASSERT_EQ(opCtx2Locker->getRequestsForTest().find(resourceRSTL).objAddr()->recursiveCount, 1U); + opCtx2->lockState()->endWriteUnitOfWork(); + ASSERT_EQ(opCtx1->lockState()->getLockMode(resourceRSTL), MODE_IX); + ASSERT_EQ(opCtx2->lockState()->getLockMode(resourceRSTL), MODE_NONE); +} + TEST_F(DConcurrencyTestFixture, DBLockWaitIsInterruptible) { auto clients = makeKClientsWithLockers(2); auto opCtx1 = clients[0].second.get(); diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 8f1ec5c6c21..4fef4eadb9e 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -433,8 +433,43 @@ void LockerImpl::endWriteUnitOfWork() { } } -bool LockerImpl::releaseWriteUnitOfWork(LockSnapshot* stateOut) { - // Only the global WUOW can be released. +void LockerImpl::releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) { + stateOut->wuowNestingLevel = _wuowNestingLevel; + _wuowNestingLevel = 0; + + for (auto it = _requests.begin(); _numResourcesToUnlockAtEndUnitOfWork > 0; it.next()) { + if (it->unlockPending) { + while (it->unlockPending) { + it->unlockPending--; + stateOut->unlockPendingLocks.push_back({it.key(), it->mode}); + } + _numResourcesToUnlockAtEndUnitOfWork--; + } + } +} + +void LockerImpl::restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) { + invariant(_numResourcesToUnlockAtEndUnitOfWork == 0); + invariant(!inAWriteUnitOfWork()); + + for (auto& lock : stateToRestore.unlockPendingLocks) { + auto it = _requests.begin(); + while (it && !(it.key() == lock.resourceId && it->mode == lock.mode)) { + it.next(); + } + invariant(!it.finished()); + if (!it->unlockPending) { + _numResourcesToUnlockAtEndUnitOfWork++; + } + it->unlockPending++; + } + // Equivalent to call beginWriteUnitOfWork() multiple times. + _wuowNestingLevel = stateToRestore.wuowNestingLevel; +} + +bool LockerImpl::releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) { + // Only the global WUOW can be released, since we never need to release and restore + // nested WUOW's. Thus we don't have to remember the nesting level. invariant(_wuowNestingLevel == 1); --_wuowNestingLevel; invariant(!isGlobalLockedRecursively()); @@ -444,14 +479,15 @@ bool LockerImpl::releaseWriteUnitOfWork(LockSnapshot* stateOut) { for (auto it = _requests.begin(); it; it.next()) { // No converted lock so we don't need to unlock more than once. invariant(it->unlockPending == 1); + it->unlockPending--; } _numResourcesToUnlockAtEndUnitOfWork = 0; return saveLockStateAndUnlock(stateOut); } -void LockerImpl::restoreWriteUnitOfWork(OperationContext* opCtx, - const LockSnapshot& stateToRestore) { +void LockerImpl::restoreWriteUnitOfWorkAndLock(OperationContext* opCtx, + const LockSnapshot& stateToRestore) { if (stateToRestore.globalMode != MODE_NONE) { restoreLockState(opCtx, stateToRestore); } @@ -498,10 +534,9 @@ bool LockerImpl::unlock(ResourceId resId) { _numResourcesToUnlockAtEndUnitOfWork++; } it->unlockPending++; - // unlockPending will only be incremented if a lock is converted and unlock() is called - // multiple times on one ResourceId. - invariant(it->unlockPending < LockModesCount); - + // unlockPending will be incremented if a lock is converted or acquired in the same mode + // recursively, and unlock() is called multiple times on one ResourceId. + invariant(it->unlockPending <= it->recursiveCount); return false; } @@ -518,6 +553,7 @@ bool LockerImpl::unlockRSTLforPrepare() { // If the RSTL is 'unlockPending' and we are fully unlocking it, then we do not want to // attempt to unlock the RSTL when the WUOW ends, since it will already be unlocked. if (rstlRequest->unlockPending) { + rstlRequest->unlockPending = 0; _numResourcesToUnlockAtEndUnitOfWork--; } diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h index 0b9741633d5..7a592d1ac11 100644 --- a/src/mongo/db/concurrency/lock_state.h +++ b/src/mongo/db/concurrency/lock_state.h @@ -204,9 +204,12 @@ public: restoreLockState(nullptr, stateToRestore); } - bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override; - void restoreWriteUnitOfWork(OperationContext* opCtx, - const LockSnapshot& stateToRestore) override; + bool releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) override; + void restoreWriteUnitOfWorkAndLock(OperationContext* opCtx, + const LockSnapshot& stateToRestore) override; + + void releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) override; + void restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) override; virtual void releaseTicket(); virtual void reacquireTicket(OperationContext* opCtx); diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp index 4f6a3628536..7c6cd0a7895 100644 --- a/src/mongo/db/concurrency/lock_state_test.cpp +++ b/src/mongo/db/concurrency/lock_state_test.cpp @@ -334,7 +334,7 @@ TEST_F(LockerImplTest, releaseWriteUnitOfWork) { ASSERT_FALSE(locker.unlock(resIdDatabase)); ASSERT_FALSE(locker.unlockGlobal()); - ASSERT(locker.releaseWriteUnitOfWork(&lockInfo)); + ASSERT(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo)); // Things shouldn't be locked anymore. ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase)); @@ -362,7 +362,7 @@ TEST_F(LockerImplTest, restoreWriteUnitOfWork) { ASSERT_FALSE(locker.unlock(resIdDatabase)); ASSERT_FALSE(locker.unlockGlobal()); - ASSERT(locker.releaseWriteUnitOfWork(&lockInfo)); + ASSERT(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo)); // Things shouldn't be locked anymore. ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase)); @@ -370,7 +370,7 @@ TEST_F(LockerImplTest, restoreWriteUnitOfWork) { ASSERT_FALSE(locker.isLocked()); // Restore lock state. - locker.restoreWriteUnitOfWork(nullptr, lockInfo); + locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo); // Make sure things were re-locked. ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); @@ -384,6 +384,120 @@ TEST_F(LockerImplTest, restoreWriteUnitOfWork) { ASSERT_FALSE(locker.isLocked()); } +TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) { + Locker::WUOWLockSnapshot lockInfo; + + LockerImpl locker; + + const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd); + const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd); + const ResourceId resIdCollection2(RESOURCE_COLLECTION, "TestDB.collection2"_sd); + + locker.beginWriteUnitOfWork(); + // Lock some stuff. + locker.lockGlobal(MODE_IX); + locker.lock(resIdDatabase, MODE_IX); + locker.lock(resIdCollection, MODE_X); + + // Recursive global lock. + locker.lockGlobal(MODE_IX); + ASSERT_FALSE(locker.unlockGlobal()); + + // Unlock them so that they will be pending to unlock. + ASSERT_FALSE(locker.unlock(resIdCollection)); + ASSERT_FALSE(locker.unlock(resIdDatabase)); + ASSERT_FALSE(locker.unlockGlobal()); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + + locker.releaseWriteUnitOfWork(&lockInfo); + ASSERT_EQ(lockInfo.unlockPendingLocks.size(), 4UL); + + // Things should still be locked. + ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection)); + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); + ASSERT(locker.isLocked()); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + + // The locker is no longer participating the two-phase locking. + ASSERT_FALSE(locker.inAWriteUnitOfWork()); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 0UL); + + // Start a new WUOW with the same locker. Any new locks acquired in the new WUOW + // should participate two-phase locking. + { + locker.beginWriteUnitOfWork(); + + // Grab new locks inside the new WUOW. + locker.lockGlobal(MODE_IX); + locker.lock(resIdDatabase, MODE_IX); + locker.lock(resIdCollection2, MODE_IX); + + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdCollection2)); + ASSERT(locker.isLocked()); + + locker.unlock(resIdCollection2); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 1UL); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U); + locker.unlock(resIdDatabase); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 2UL); + // The DB lock has been locked twice, but only once in this WUOW. + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U); + locker.unlockGlobal(); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL); + // The global lock has been locked 3 times, but only 1 of them is part of this WUOW. + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 3U); + locker.endWriteUnitOfWork(); + } + ASSERT_FALSE(locker.inAWriteUnitOfWork()); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 0UL); + + ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection)); + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); + ASSERT(locker.isLocked()); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + // The new locks has been released. + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection2)); + + // Restore lock state. + locker.restoreWriteUnitOfWork(lockInfo); + + ASSERT_TRUE(locker.inAWriteUnitOfWork()); + + // Make sure things are still locked. + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection)); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); + ASSERT(locker.isLocked()); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + + locker.endWriteUnitOfWork(); + + ASSERT_FALSE(locker.inAWriteUnitOfWork()); + + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection)); + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection2)); + ASSERT_FALSE(locker.isLocked()); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 0U); + ASSERT(locker.getRequestsForTest().find(resourceIdGlobal).finished()); +} + TEST_F(LockerImplTest, releaseAndRestoreReadOnlyWriteUnitOfWork) { Locker::LockSnapshot lockInfo; @@ -407,14 +521,14 @@ TEST_F(LockerImplTest, releaseAndRestoreReadOnlyWriteUnitOfWork) { ASSERT_EQ(3u, locker.numResourcesToUnlockAtEndUnitOfWorkForTest()); // Things shouldn't be locked anymore. - ASSERT_TRUE(locker.releaseWriteUnitOfWork(&lockInfo)); + ASSERT_TRUE(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo)); ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase)); ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection)); ASSERT_FALSE(locker.isLocked()); // Restore lock state. - locker.restoreWriteUnitOfWork(nullptr, lockInfo); + locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo); ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdDatabase)); ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdCollection)); @@ -437,11 +551,11 @@ TEST_F(LockerImplTest, releaseAndRestoreEmptyWriteUnitOfWork) { locker.beginWriteUnitOfWork(); // Nothing to yield. - ASSERT_FALSE(locker.releaseWriteUnitOfWork(&lockInfo)); + ASSERT_FALSE(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo)); ASSERT_FALSE(locker.isLocked()); // Restore lock state. - locker.restoreWriteUnitOfWork(nullptr, lockInfo); + locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo); ASSERT_FALSE(locker.isLocked()); locker.endWriteUnitOfWork(); diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index 8f3575057a3..166bf0d124f 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -391,6 +391,18 @@ public: }; /** + * WUOWLockSnapshot captures all resources that have pending unlocks when releasing the write + * unit of work. If a lock has more than one pending unlock, it appears more than once here. + */ + struct WUOWLockSnapshot { + // Nested WUOW can be released and restored all together. + int wuowNestingLevel = 0; + + // The order of locks doesn't matter in this vector. + std::vector<OneLock> unlockPendingLocks; + }; + + /** * Retrieves all locks held by this transaction, other than RESOURCE_MUTEX locks, and what mode * they're held in. * Stores these locks in 'stateOut', destroying any previous state. Unlocks all locks @@ -417,13 +429,22 @@ public: virtual void restoreLockState(const LockSnapshot& stateToRestore) = 0; /** - * releaseWriteUnitOfWork opts out two-phase locking and yield the locks after a WUOW - * has been released. restoreWriteUnitOfWork reaquires the locks and resume the two-phase - * locking behavior of WUOW. + * releaseWriteUnitOfWorkAndUnlock opts out of two-phase locking and yields the locks after a + * WUOW has been released. restoreWriteUnitOfWorkAndLock reacquires the locks and resumes the + * two-phase locking behavior of WUOW. + */ + virtual bool releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) = 0; + virtual void restoreWriteUnitOfWorkAndLock(OperationContext* opCtx, + const LockSnapshot& stateToRestore) = 0; + + + /** + * releaseWriteUnitOfWork opts out of two-phase locking of the current locks held but keeps + * holding these locks. + * restoreWriteUnitOfWork resumes the two-phase locking behavior of WUOW. */ - virtual bool releaseWriteUnitOfWork(LockSnapshot* stateOut) = 0; - virtual void restoreWriteUnitOfWork(OperationContext* opCtx, - const LockSnapshot& stateToRestore) = 0; + virtual void releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) = 0; + virtual void restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) = 0; /** * Releases the ticket associated with the Locker. This allows locks to be held without diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h index 886510cb16c..3780e4f5c0b 100644 --- a/src/mongo/db/concurrency/locker_noop.h +++ b/src/mongo/db/concurrency/locker_noop.h @@ -197,12 +197,20 @@ public: MONGO_UNREACHABLE; } - bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override { + bool releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) override { MONGO_UNREACHABLE; } - void restoreWriteUnitOfWork(OperationContext* opCtx, - const LockSnapshot& stateToRestore) override { + void restoreWriteUnitOfWorkAndLock(OperationContext* opCtx, + const LockSnapshot& stateToRestore) override { + MONGO_UNREACHABLE; + }; + + void releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) override { + MONGO_UNREACHABLE; + } + + void restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) override { MONGO_UNREACHABLE; }; diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 50202fe433d..6f22faf4cf1 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -635,32 +635,14 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o // We must lock the Client to change the Locker on the OperationContext. stdx::lock_guard<Client> lk(*opCtx->getClient()); - - // The new transaction should have an empty locker, and thus we do not need to save it. - invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive); - _locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>()); - // Inherit the locking setting from the original one. - opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication( - _locker->shouldConflictWithSecondaryBatchApplication()); - _locker->unsetThreadId(); - if (opCtx->getLogicalSessionId()) { - _locker->setDebugInfo("lsid: " + opCtx->getLogicalSessionId()->toBSON().toString()); - } - - // OplogSlotReserver is only used by primary, so always set max transaction lock timeout. - invariant(opCtx->writesAreReplicated()); - // This thread must still respect the transaction lock timeout, since it can prevent the - // transaction from making progress. - auto maxTransactionLockMillis = gMaxTransactionLockRequestTimeoutMillis.load(); - if (maxTransactionLockMillis >= 0) { - opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); - } - // Save the RecoveryUnit from the new transaction and replace it with an empty one. _recoveryUnit = opCtx->releaseRecoveryUnit(); opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>( opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); + + // End two-phase locking on locker manually since the WUOW has been released. + _opCtx->lockState()->endWriteUnitOfWork(); } TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() { @@ -676,8 +658,6 @@ TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() { // We should be at WUOW nesting level 1, only the top level WUOW for the oplog reservation // side transaction. _recoveryUnit->abortUnitOfWork(); - _locker->endWriteUnitOfWork(); - invariant(!_locker->inAWriteUnitOfWork()); } // After releasing the oplog hole, the "all committed timestamp" can advance past @@ -701,9 +681,7 @@ TransactionParticipant::TxnResources::TxnResources(WithLock wl, // Inherit the locking setting from the original one. opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication( _locker->shouldConflictWithSecondaryBatchApplication()); - if (stashStyle != StashStyle::kSideTransaction) { - _locker->releaseTicket(); - } + _locker->releaseTicket(); _locker->unsetThreadId(); if (opCtx->getLogicalSessionId()) { _locker->setDebugInfo("lsid: " + opCtx->getLogicalSessionId()->toBSON().toString()); @@ -712,13 +690,13 @@ TransactionParticipant::TxnResources::TxnResources(WithLock wl, // On secondaries, we yield the locks for transactions. if (stashStyle == StashStyle::kSecondary) { _lockSnapshot = std::make_unique<Locker::LockSnapshot>(); - _locker->releaseWriteUnitOfWork(_lockSnapshot.get()); + _locker->releaseWriteUnitOfWorkAndUnlock(_lockSnapshot.get()); } // This thread must still respect the transaction lock timeout, since it can prevent the // transaction from making progress. auto maxTransactionLockMillis = gMaxTransactionLockRequestTimeoutMillis.load(); - if (stashStyle != StashStyle::kSecondary && maxTransactionLockMillis >= 0) { + if (stashStyle == StashStyle::kPrimary && maxTransactionLockMillis >= 0) { opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); } @@ -754,7 +732,7 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) { if (_lockSnapshot) { invariant(!_locker->isLocked()); // opCtx is passed in to enable the restoration to be interrupted. - _locker->restoreWriteUnitOfWork(opCtx, *_lockSnapshot); + _locker->restoreWriteUnitOfWorkAndLock(opCtx, *_lockSnapshot); _lockSnapshot.reset(nullptr); } _locker->reacquireTicket(opCtx); @@ -784,18 +762,43 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) { TransactionParticipant::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx) : _opCtx(opCtx) { - if (_opCtx->getWriteUnitOfWork()) { - stdx::lock_guard<Client> lk(*_opCtx->getClient()); - _txnResources = TransactionParticipant::TxnResources( - lk, _opCtx, TxnResources::StashStyle::kSideTransaction); + if (!_opCtx->getWriteUnitOfWork()) { + return; } + + // Release WUOW. + _ruState = opCtx->getWriteUnitOfWork()->release(); + opCtx->setWriteUnitOfWork(nullptr); + + // Remember the locking state of WUOW, opt out two-phase locking, but don't release locks. + opCtx->lockState()->releaseWriteUnitOfWork(&_WUOWLockSnapshot); + + // Release recovery unit, saving the recovery unit off to the side, keeping open the storage + // transaction. + _recoveryUnit = opCtx->releaseRecoveryUnit(); + opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>( + opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()), + WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); } TransactionParticipant::SideTransactionBlock::~SideTransactionBlock() { - if (_txnResources) { - _txnResources->release(_opCtx); + if (!_recoveryUnit) { + return; } + + // Restore locker's state about WUOW. + _opCtx->lockState()->restoreWriteUnitOfWork(_WUOWLockSnapshot); + + // Restore recovery unit. + auto oldState = _opCtx->setRecoveryUnit(std::move(_recoveryUnit), + WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); + invariant(oldState == WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork, + str::stream() << "RecoveryUnit state was " << oldState); + + // Restore WUOW. + _opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(_opCtx, _ruState)); } + void TransactionParticipant::Participant::_stashActiveTransaction(OperationContext* opCtx) { if (p().inShutdown) { return; @@ -893,6 +896,8 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC // yield and restore all locks on state transition. Otherwise, we'd have to remember // which locks are managed by WUOW. invariant(!opCtx->lockState()->isLocked()); + invariant(!opCtx->lockState()->isRSTLLocked()); + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Stashed transaction resources do not exist for this in-progress multi-document // transaction. Set up the transaction resources on the opCtx. diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 505e9df3f42..3dd3e27c5b6 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -197,7 +197,7 @@ public: */ class TxnResources { public: - enum class StashStyle { kPrimary, kSecondary, kSideTransaction }; + enum class StashStyle { kPrimary, kSecondary }; /** * Stashes transaction state from 'opCtx' in the newly constructed TxnResources. @@ -241,8 +241,10 @@ public: }; /** - * An RAII object that stashes `TxnResouces` from the `opCtx` onto the stack. At destruction - * it unstashes the `TxnResources` back onto the `opCtx`. + * An RAII object that stashes the recovery unit from the `opCtx` onto the stack and keeps + * using the same locker of `opCtx`. The locker opts out of two-phase locking of the + * current WUOW. At destruction it unstashes the recovery unit back onto the `opCtx` and + * restores the locker state relevant to the original WUOW. */ class SideTransactionBlock { public: @@ -250,7 +252,9 @@ public: ~SideTransactionBlock(); private: - boost::optional<TxnResources> _txnResources; + Locker::WUOWLockSnapshot _WUOWLockSnapshot; + std::unique_ptr<RecoveryUnit> _recoveryUnit; + WriteUnitOfWork::RecoveryUnitState _ruState; OperationContext* _opCtx; }; @@ -850,7 +854,6 @@ private: private: OperationContext* _opCtx; - std::unique_ptr<Locker> _locker; std::unique_ptr<RecoveryUnit> _recoveryUnit; std::vector<OplogSlot> _oplogSlots; }; diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 00d04754702..ab26ab2f1a3 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -571,7 +571,7 @@ TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransac txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction machinery cannot store an empty locker. - Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); @@ -605,7 +605,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithCommitTimestampFailsOnUnprepared txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction machinery cannot store an empty locker. - Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } ASSERT_THROWS_CODE(txnParticipant.commitPreparedTransaction(opCtx(), commitTimestamp, {}), AssertionException, ErrorCodes::InvalidOptions); @@ -626,7 +626,7 @@ TEST_F(TxnParticipantTest, CommitTransactionDoesNotSetCommitTimestampOnUnprepare txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction machinery cannot store an empty locker. - Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } txnParticipant.commitUnpreparedTransaction(opCtx()); ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull()); @@ -643,7 +643,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithoutCommitTimestampFailsOnPrepare txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction machinery cannot store an empty locker. - Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } txnParticipant.prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE(txnParticipant.commitUnpreparedTransaction(opCtx()), AssertionException, @@ -657,7 +657,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithNullCommitTimestampFailsOnPrepar txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction machinery cannot store an empty locker. - Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } txnParticipant.prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE(txnParticipant.commitPreparedTransaction(opCtx(), Timestamp(), {}), AssertionException, @@ -672,7 +672,7 @@ TEST_F(TxnParticipantTest, txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction machinery cannot store an empty locker. - Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); ASSERT_THROWS_CODE(txnParticipant.commitPreparedTransaction( opCtx(), Timestamp(prepareTimestamp.getSecs() - 1, 1), {}), |