summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2019-04-24 20:09:52 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2019-05-03 20:10:27 -0400
commit4fb71c5a1c79b745ef56d53a8264ef5fdd202dda (patch)
treeaf44a7d5320b7ef6b2f7778ad52defbfc38defdb /src/mongo
parent51c1c3495e5583e3e570313eb0e0f68d304241e9 (diff)
downloadmongo-4fb71c5a1c79b745ef56d53a8264ef5fdd202dda.tar.gz
SERVER-40498 Side transaction keeps the same locker
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/concurrency/d_concurrency.h6
-rw-r--r--src/mongo/db/concurrency/d_concurrency_test.cpp30
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp52
-rw-r--r--src/mongo/db/concurrency/lock_state.h9
-rw-r--r--src/mongo/db/concurrency/lock_state_test.cpp128
-rw-r--r--src/mongo/db/concurrency/locker.h33
-rw-r--r--src/mongo/db/concurrency/locker_noop.h14
-rw-r--r--src/mongo/db/transaction_participant.cpp75
-rw-r--r--src/mongo/db/transaction_participant.h13
-rw-r--r--src/mongo/db/transaction_participant_test.cpp12
10 files changed, 298 insertions, 74 deletions
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h
index 5e9bd6a94a2..8d731a1dfaf 100644
--- a/src/mongo/db/concurrency/d_concurrency.h
+++ b/src/mongo/db/concurrency/d_concurrency.h
@@ -220,6 +220,8 @@ public:
EnqueueOnly enqueueOnly);
~GlobalLock() {
+ // Preserve the original lock result which will be overridden by unlock().
+ auto lockResult = _result;
if (isLocked()) {
// Abandon our snapshot if destruction of the GlobalLock object results in actually
// unlocking the global lock. Recursive locking and the two-phase locking protocol
@@ -231,7 +233,9 @@ public:
}
_unlock();
}
- _opCtx->lockState()->unlock(resourceIdReplicationStateTransitionLock);
+ if (lockResult == LOCK_OK || lockResult == LOCK_WAITING) {
+ _opCtx->lockState()->unlock(resourceIdReplicationStateTransitionLock);
+ }
}
/**
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index cbe7d62bf35..3d400a9bceb 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -983,6 +983,36 @@ TEST_F(DConcurrencyTestFixture,
MODE_NONE);
}
+TEST_F(DConcurrencyTestFixture, FailedGlobalLockShouldUnlockRSTLOnlyOnce) {
+ auto clients = makeKClientsWithLockers(2);
+ auto opCtx1 = clients[0].second.get();
+ auto opCtx2 = clients[1].second.get();
+
+ auto resourceRSTL = resourceIdReplicationStateTransitionLock;
+
+ // Take the exclusive lock with the first caller.
+ Lock::GlobalLock globalLock(opCtx1, MODE_X);
+
+ opCtx2->lockState()->beginWriteUnitOfWork();
+ // Set a max timeout on the second caller that will override provided lock request
+ // deadlines.
+ // Then requesting a lock with Date_t::max() should cause a LockTimeout error to be thrown.
+ opCtx2->lockState()->setMaxLockTimeout(Milliseconds(100));
+
+ ASSERT_THROWS_CODE(
+ Lock::GlobalLock(opCtx2, MODE_IX, Date_t::max(), Lock::InterruptBehavior::kThrow),
+ DBException,
+ ErrorCodes::LockTimeout);
+ auto opCtx2Locker = static_cast<LockerImpl*>(opCtx2->lockState());
+ // GlobalLock failed, but the RSTL should have been successfully acquired and be pending unlock.
+ ASSERT(opCtx2Locker->getRequestsForTest().find(resourceIdGlobal).finished());
+ ASSERT_EQ(opCtx2Locker->getRequestsForTest().find(resourceRSTL).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(opCtx2Locker->getRequestsForTest().find(resourceRSTL).objAddr()->recursiveCount, 1U);
+ opCtx2->lockState()->endWriteUnitOfWork();
+ ASSERT_EQ(opCtx1->lockState()->getLockMode(resourceRSTL), MODE_IX);
+ ASSERT_EQ(opCtx2->lockState()->getLockMode(resourceRSTL), MODE_NONE);
+}
+
TEST_F(DConcurrencyTestFixture, DBLockWaitIsInterruptible) {
auto clients = makeKClientsWithLockers(2);
auto opCtx1 = clients[0].second.get();
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 8f1ec5c6c21..4fef4eadb9e 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -433,8 +433,43 @@ void LockerImpl::endWriteUnitOfWork() {
}
}
-bool LockerImpl::releaseWriteUnitOfWork(LockSnapshot* stateOut) {
- // Only the global WUOW can be released.
+void LockerImpl::releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) {
+ stateOut->wuowNestingLevel = _wuowNestingLevel;
+ _wuowNestingLevel = 0;
+
+ for (auto it = _requests.begin(); _numResourcesToUnlockAtEndUnitOfWork > 0; it.next()) {
+ if (it->unlockPending) {
+ while (it->unlockPending) {
+ it->unlockPending--;
+ stateOut->unlockPendingLocks.push_back({it.key(), it->mode});
+ }
+ _numResourcesToUnlockAtEndUnitOfWork--;
+ }
+ }
+}
+
+void LockerImpl::restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) {
+ invariant(_numResourcesToUnlockAtEndUnitOfWork == 0);
+ invariant(!inAWriteUnitOfWork());
+
+ for (auto& lock : stateToRestore.unlockPendingLocks) {
+ auto it = _requests.begin();
+ while (it && !(it.key() == lock.resourceId && it->mode == lock.mode)) {
+ it.next();
+ }
+ invariant(!it.finished());
+ if (!it->unlockPending) {
+ _numResourcesToUnlockAtEndUnitOfWork++;
+ }
+ it->unlockPending++;
+ }
+ // Equivalent to calling beginWriteUnitOfWork() multiple times.
+ _wuowNestingLevel = stateToRestore.wuowNestingLevel;
+}
+
+bool LockerImpl::releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) {
+ // Only the global WUOW can be released, since we never need to release and restore
+ // nested WUOWs. Thus we don't have to remember the nesting level.
invariant(_wuowNestingLevel == 1);
--_wuowNestingLevel;
invariant(!isGlobalLockedRecursively());
@@ -444,14 +479,15 @@ bool LockerImpl::releaseWriteUnitOfWork(LockSnapshot* stateOut) {
for (auto it = _requests.begin(); it; it.next()) {
// No converted lock so we don't need to unlock more than once.
invariant(it->unlockPending == 1);
+ it->unlockPending--;
}
_numResourcesToUnlockAtEndUnitOfWork = 0;
return saveLockStateAndUnlock(stateOut);
}
-void LockerImpl::restoreWriteUnitOfWork(OperationContext* opCtx,
- const LockSnapshot& stateToRestore) {
+void LockerImpl::restoreWriteUnitOfWorkAndLock(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) {
if (stateToRestore.globalMode != MODE_NONE) {
restoreLockState(opCtx, stateToRestore);
}
@@ -498,10 +534,9 @@ bool LockerImpl::unlock(ResourceId resId) {
_numResourcesToUnlockAtEndUnitOfWork++;
}
it->unlockPending++;
- // unlockPending will only be incremented if a lock is converted and unlock() is called
- // multiple times on one ResourceId.
- invariant(it->unlockPending < LockModesCount);
-
+ // unlockPending will be incremented if a lock is converted or acquired in the same mode
+ // recursively, and unlock() is called multiple times on one ResourceId.
+ invariant(it->unlockPending <= it->recursiveCount);
return false;
}
@@ -518,6 +553,7 @@ bool LockerImpl::unlockRSTLforPrepare() {
// If the RSTL is 'unlockPending' and we are fully unlocking it, then we do not want to
// attempt to unlock the RSTL when the WUOW ends, since it will already be unlocked.
if (rstlRequest->unlockPending) {
+ rstlRequest->unlockPending = 0;
_numResourcesToUnlockAtEndUnitOfWork--;
}
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index 0b9741633d5..7a592d1ac11 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -204,9 +204,12 @@ public:
restoreLockState(nullptr, stateToRestore);
}
- bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override;
- void restoreWriteUnitOfWork(OperationContext* opCtx,
- const LockSnapshot& stateToRestore) override;
+ bool releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) override;
+ void restoreWriteUnitOfWorkAndLock(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) override;
+
+ void releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) override;
+ void restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) override;
virtual void releaseTicket();
virtual void reacquireTicket(OperationContext* opCtx);
diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp
index 4f6a3628536..7c6cd0a7895 100644
--- a/src/mongo/db/concurrency/lock_state_test.cpp
+++ b/src/mongo/db/concurrency/lock_state_test.cpp
@@ -334,7 +334,7 @@ TEST_F(LockerImplTest, releaseWriteUnitOfWork) {
ASSERT_FALSE(locker.unlock(resIdDatabase));
ASSERT_FALSE(locker.unlockGlobal());
- ASSERT(locker.releaseWriteUnitOfWork(&lockInfo));
+ ASSERT(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo));
// Things shouldn't be locked anymore.
ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
@@ -362,7 +362,7 @@ TEST_F(LockerImplTest, restoreWriteUnitOfWork) {
ASSERT_FALSE(locker.unlock(resIdDatabase));
ASSERT_FALSE(locker.unlockGlobal());
- ASSERT(locker.releaseWriteUnitOfWork(&lockInfo));
+ ASSERT(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo));
// Things shouldn't be locked anymore.
ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
@@ -370,7 +370,7 @@ TEST_F(LockerImplTest, restoreWriteUnitOfWork) {
ASSERT_FALSE(locker.isLocked());
// Restore lock state.
- locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+ locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo);
// Make sure things were re-locked.
ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
@@ -384,6 +384,120 @@ TEST_F(LockerImplTest, restoreWriteUnitOfWork) {
ASSERT_FALSE(locker.isLocked());
}
+TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) {
+ Locker::WUOWLockSnapshot lockInfo;
+
+ LockerImpl locker;
+
+ const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+ const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+ const ResourceId resIdCollection2(RESOURCE_COLLECTION, "TestDB.collection2"_sd);
+
+ locker.beginWriteUnitOfWork();
+ // Lock some stuff.
+ locker.lockGlobal(MODE_IX);
+ locker.lock(resIdDatabase, MODE_IX);
+ locker.lock(resIdCollection, MODE_X);
+
+ // Recursive global lock.
+ locker.lockGlobal(MODE_IX);
+ ASSERT_FALSE(locker.unlockGlobal());
+
+ // Unlock them so that they will be pending to unlock.
+ ASSERT_FALSE(locker.unlock(resIdCollection));
+ ASSERT_FALSE(locker.unlock(resIdDatabase));
+ ASSERT_FALSE(locker.unlockGlobal());
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+
+ locker.releaseWriteUnitOfWork(&lockInfo);
+ ASSERT_EQ(lockInfo.unlockPendingLocks.size(), 4UL);
+
+ // Things should still be locked.
+ ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
+ ASSERT(locker.isLocked());
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+
+ // The locker is no longer participating in two-phase locking.
+ ASSERT_FALSE(locker.inAWriteUnitOfWork());
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 0UL);
+
+ // Start a new WUOW with the same locker. Any new locks acquired in the new WUOW
+ // should participate in two-phase locking.
+ {
+ locker.beginWriteUnitOfWork();
+
+ // Grab new locks inside the new WUOW.
+ locker.lockGlobal(MODE_IX);
+ locker.lock(resIdDatabase, MODE_IX);
+ locker.lock(resIdCollection2, MODE_IX);
+
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdCollection2));
+ ASSERT(locker.isLocked());
+
+ locker.unlock(resIdCollection2);
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 1UL);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U);
+ locker.unlock(resIdDatabase);
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 2UL);
+ // The DB lock has been locked twice, but only once in this WUOW.
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U);
+ locker.unlockGlobal();
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL);
+ // The global lock has been locked 3 times, but only 1 of them is part of this WUOW.
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 3U);
+ locker.endWriteUnitOfWork();
+ }
+ ASSERT_FALSE(locker.inAWriteUnitOfWork());
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 0UL);
+
+ ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
+ ASSERT(locker.isLocked());
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+ // The new locks have been released.
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection2));
+
+ // Restore lock state.
+ locker.restoreWriteUnitOfWork(lockInfo);
+
+ ASSERT_TRUE(locker.inAWriteUnitOfWork());
+
+ // Make sure things are still locked.
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
+ ASSERT(locker.isLocked());
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+
+ locker.endWriteUnitOfWork();
+
+ ASSERT_FALSE(locker.inAWriteUnitOfWork());
+
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection2));
+ ASSERT_FALSE(locker.isLocked());
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 0U);
+ ASSERT(locker.getRequestsForTest().find(resourceIdGlobal).finished());
+}
+
TEST_F(LockerImplTest, releaseAndRestoreReadOnlyWriteUnitOfWork) {
Locker::LockSnapshot lockInfo;
@@ -407,14 +521,14 @@ TEST_F(LockerImplTest, releaseAndRestoreReadOnlyWriteUnitOfWork) {
ASSERT_EQ(3u, locker.numResourcesToUnlockAtEndUnitOfWorkForTest());
// Things shouldn't be locked anymore.
- ASSERT_TRUE(locker.releaseWriteUnitOfWork(&lockInfo));
+ ASSERT_TRUE(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo));
ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
ASSERT_FALSE(locker.isLocked());
// Restore lock state.
- locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+ locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo);
ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdDatabase));
ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdCollection));
@@ -437,11 +551,11 @@ TEST_F(LockerImplTest, releaseAndRestoreEmptyWriteUnitOfWork) {
locker.beginWriteUnitOfWork();
// Nothing to yield.
- ASSERT_FALSE(locker.releaseWriteUnitOfWork(&lockInfo));
+ ASSERT_FALSE(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo));
ASSERT_FALSE(locker.isLocked());
// Restore lock state.
- locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+ locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo);
ASSERT_FALSE(locker.isLocked());
locker.endWriteUnitOfWork();
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index 8f3575057a3..166bf0d124f 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -391,6 +391,18 @@ public:
};
/**
+ * WUOWLockSnapshot captures all resources that have pending unlocks when releasing the write
+ * unit of work. If a lock has more than one pending unlock, it appears more than once here.
+ */
+ struct WUOWLockSnapshot {
+ // Nested WUOW can be released and restored all together.
+ int wuowNestingLevel = 0;
+
+ // The order of locks doesn't matter in this vector.
+ std::vector<OneLock> unlockPendingLocks;
+ };
+
+ /**
* Retrieves all locks held by this transaction, other than RESOURCE_MUTEX locks, and what mode
* they're held in.
* Stores these locks in 'stateOut', destroying any previous state. Unlocks all locks
@@ -417,13 +429,22 @@ public:
virtual void restoreLockState(const LockSnapshot& stateToRestore) = 0;
/**
- * releaseWriteUnitOfWork opts out two-phase locking and yield the locks after a WUOW
- * has been released. restoreWriteUnitOfWork reaquires the locks and resume the two-phase
- * locking behavior of WUOW.
+ * releaseWriteUnitOfWorkAndUnlock opts out of two-phase locking and yields the locks after a
+ * WUOW has been released. restoreWriteUnitOfWorkAndLock reacquires the locks and resumes the
+ * two-phase locking behavior of WUOW.
+ */
+ virtual bool releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) = 0;
+ virtual void restoreWriteUnitOfWorkAndLock(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) = 0;
+
+
+ /**
+ * releaseWriteUnitOfWork opts out of two-phase locking of the current locks held but keeps
+ * holding these locks.
+ * restoreWriteUnitOfWork resumes the two-phase locking behavior of WUOW.
*/
- virtual bool releaseWriteUnitOfWork(LockSnapshot* stateOut) = 0;
- virtual void restoreWriteUnitOfWork(OperationContext* opCtx,
- const LockSnapshot& stateToRestore) = 0;
+ virtual void releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) = 0;
+ virtual void restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) = 0;
/**
* Releases the ticket associated with the Locker. This allows locks to be held without
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index 886510cb16c..3780e4f5c0b 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -197,12 +197,20 @@ public:
MONGO_UNREACHABLE;
}
- bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override {
+ bool releaseWriteUnitOfWorkAndUnlock(LockSnapshot* stateOut) override {
MONGO_UNREACHABLE;
}
- void restoreWriteUnitOfWork(OperationContext* opCtx,
- const LockSnapshot& stateToRestore) override {
+ void restoreWriteUnitOfWorkAndLock(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) override {
+ MONGO_UNREACHABLE;
+ };
+
+ void releaseWriteUnitOfWork(WUOWLockSnapshot* stateOut) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void restoreWriteUnitOfWork(const WUOWLockSnapshot& stateToRestore) override {
MONGO_UNREACHABLE;
};
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 50202fe433d..6f22faf4cf1 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -635,32 +635,14 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o
// We must lock the Client to change the Locker on the OperationContext.
stdx::lock_guard<Client> lk(*opCtx->getClient());
-
- // The new transaction should have an empty locker, and thus we do not need to save it.
- invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive);
- _locker = opCtx->swapLockState(stdx::make_unique<LockerImpl>());
- // Inherit the locking setting from the original one.
- opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(
- _locker->shouldConflictWithSecondaryBatchApplication());
- _locker->unsetThreadId();
- if (opCtx->getLogicalSessionId()) {
- _locker->setDebugInfo("lsid: " + opCtx->getLogicalSessionId()->toBSON().toString());
- }
-
- // OplogSlotReserver is only used by primary, so always set max transaction lock timeout.
- invariant(opCtx->writesAreReplicated());
- // This thread must still respect the transaction lock timeout, since it can prevent the
- // transaction from making progress.
- auto maxTransactionLockMillis = gMaxTransactionLockRequestTimeoutMillis.load();
- if (maxTransactionLockMillis >= 0) {
- opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
- }
-
// Save the RecoveryUnit from the new transaction and replace it with an empty one.
_recoveryUnit = opCtx->releaseRecoveryUnit();
opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>(
opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()),
WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
+
+ // End two-phase locking on locker manually since the WUOW has been released.
+ _opCtx->lockState()->endWriteUnitOfWork();
}
TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() {
@@ -676,8 +658,6 @@ TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() {
// We should be at WUOW nesting level 1, only the top level WUOW for the oplog reservation
// side transaction.
_recoveryUnit->abortUnitOfWork();
- _locker->endWriteUnitOfWork();
- invariant(!_locker->inAWriteUnitOfWork());
}
// After releasing the oplog hole, the "all committed timestamp" can advance past
@@ -701,9 +681,7 @@ TransactionParticipant::TxnResources::TxnResources(WithLock wl,
// Inherit the locking setting from the original one.
opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(
_locker->shouldConflictWithSecondaryBatchApplication());
- if (stashStyle != StashStyle::kSideTransaction) {
- _locker->releaseTicket();
- }
+ _locker->releaseTicket();
_locker->unsetThreadId();
if (opCtx->getLogicalSessionId()) {
_locker->setDebugInfo("lsid: " + opCtx->getLogicalSessionId()->toBSON().toString());
@@ -712,13 +690,13 @@ TransactionParticipant::TxnResources::TxnResources(WithLock wl,
// On secondaries, we yield the locks for transactions.
if (stashStyle == StashStyle::kSecondary) {
_lockSnapshot = std::make_unique<Locker::LockSnapshot>();
- _locker->releaseWriteUnitOfWork(_lockSnapshot.get());
+ _locker->releaseWriteUnitOfWorkAndUnlock(_lockSnapshot.get());
}
// This thread must still respect the transaction lock timeout, since it can prevent the
// transaction from making progress.
auto maxTransactionLockMillis = gMaxTransactionLockRequestTimeoutMillis.load();
- if (stashStyle != StashStyle::kSecondary && maxTransactionLockMillis >= 0) {
+ if (stashStyle == StashStyle::kPrimary && maxTransactionLockMillis >= 0) {
opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
}
@@ -754,7 +732,7 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) {
if (_lockSnapshot) {
invariant(!_locker->isLocked());
// opCtx is passed in to enable the restoration to be interrupted.
- _locker->restoreWriteUnitOfWork(opCtx, *_lockSnapshot);
+ _locker->restoreWriteUnitOfWorkAndLock(opCtx, *_lockSnapshot);
_lockSnapshot.reset(nullptr);
}
_locker->reacquireTicket(opCtx);
@@ -784,18 +762,43 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) {
TransactionParticipant::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx)
: _opCtx(opCtx) {
- if (_opCtx->getWriteUnitOfWork()) {
- stdx::lock_guard<Client> lk(*_opCtx->getClient());
- _txnResources = TransactionParticipant::TxnResources(
- lk, _opCtx, TxnResources::StashStyle::kSideTransaction);
+ if (!_opCtx->getWriteUnitOfWork()) {
+ return;
}
+
+ // Release WUOW.
+ _ruState = opCtx->getWriteUnitOfWork()->release();
+ opCtx->setWriteUnitOfWork(nullptr);
+
+ // Remember the locking state of the WUOW, opt out of two-phase locking, but don't release locks.
+ opCtx->lockState()->releaseWriteUnitOfWork(&_WUOWLockSnapshot);
+
+ // Release recovery unit, saving the recovery unit off to the side, keeping open the storage
+ // transaction.
+ _recoveryUnit = opCtx->releaseRecoveryUnit();
+ opCtx->setRecoveryUnit(std::unique_ptr<RecoveryUnit>(
+ opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()),
+ WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
}
TransactionParticipant::SideTransactionBlock::~SideTransactionBlock() {
- if (_txnResources) {
- _txnResources->release(_opCtx);
+ if (!_recoveryUnit) {
+ return;
}
+
+ // Restore locker's state about WUOW.
+ _opCtx->lockState()->restoreWriteUnitOfWork(_WUOWLockSnapshot);
+
+ // Restore recovery unit.
+ auto oldState = _opCtx->setRecoveryUnit(std::move(_recoveryUnit),
+ WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
+ invariant(oldState == WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork,
+ str::stream() << "RecoveryUnit state was " << oldState);
+
+ // Restore WUOW.
+ _opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(_opCtx, _ruState));
}
+
void TransactionParticipant::Participant::_stashActiveTransaction(OperationContext* opCtx) {
if (p().inShutdown) {
return;
@@ -893,6 +896,8 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC
// yield and restore all locks on state transition. Otherwise, we'd have to remember
// which locks are managed by WUOW.
invariant(!opCtx->lockState()->isLocked());
+ invariant(!opCtx->lockState()->isRSTLLocked());
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
// Stashed transaction resources do not exist for this in-progress multi-document
// transaction. Set up the transaction resources on the opCtx.
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 505e9df3f42..3dd3e27c5b6 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -197,7 +197,7 @@ public:
*/
class TxnResources {
public:
- enum class StashStyle { kPrimary, kSecondary, kSideTransaction };
+ enum class StashStyle { kPrimary, kSecondary };
/**
* Stashes transaction state from 'opCtx' in the newly constructed TxnResources.
@@ -241,8 +241,10 @@ public:
};
/**
- * An RAII object that stashes `TxnResouces` from the `opCtx` onto the stack. At destruction
- * it unstashes the `TxnResources` back onto the `opCtx`.
+ * An RAII object that stashes the recovery unit from the `opCtx` onto the stack and keeps
+ * using the same locker of `opCtx`. The locker opts out of two-phase locking of the
+ * current WUOW. At destruction it unstashes the recovery unit back onto the `opCtx` and
+ * restores the locker state relevant to the original WUOW.
*/
class SideTransactionBlock {
public:
@@ -250,7 +252,9 @@ public:
~SideTransactionBlock();
private:
- boost::optional<TxnResources> _txnResources;
+ Locker::WUOWLockSnapshot _WUOWLockSnapshot;
+ std::unique_ptr<RecoveryUnit> _recoveryUnit;
+ WriteUnitOfWork::RecoveryUnitState _ruState;
OperationContext* _opCtx;
};
@@ -850,7 +854,6 @@ private:
private:
OperationContext* _opCtx;
- std::unique_ptr<Locker> _locker;
std::unique_ptr<RecoveryUnit> _recoveryUnit;
std::vector<OplogSlot> _oplogSlots;
};
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 00d04754702..ab26ab2f1a3 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -571,7 +571,7 @@ TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransac
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
- Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {});
const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
@@ -605,7 +605,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithCommitTimestampFailsOnUnprepared
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
- Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
ASSERT_THROWS_CODE(txnParticipant.commitPreparedTransaction(opCtx(), commitTimestamp, {}),
AssertionException,
ErrorCodes::InvalidOptions);
@@ -626,7 +626,7 @@ TEST_F(TxnParticipantTest, CommitTransactionDoesNotSetCommitTimestampOnUnprepare
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
- Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant.commitUnpreparedTransaction(opCtx());
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
@@ -643,7 +643,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithoutCommitTimestampFailsOnPrepare
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
- Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant.prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(txnParticipant.commitUnpreparedTransaction(opCtx()),
AssertionException,
@@ -657,7 +657,7 @@ TEST_F(TxnParticipantTest, CommitTransactionWithNullCommitTimestampFailsOnPrepar
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
- Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant.prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(txnParticipant.commitPreparedTransaction(opCtx(), Timestamp(), {}),
AssertionException,
@@ -672,7 +672,7 @@ TEST_F(TxnParticipantTest,
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
- Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(txnParticipant.commitPreparedTransaction(
opCtx(), Timestamp(prepareTimestamp.getSecs() - 1, 1), {}),