diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2019-05-07 10:10:02 -0400 |
---|---|---|
committer | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2019-05-08 18:08:16 -0400 |
commit | 44f49a9a97b382e07beca0d3fabf5991eb3a41fa (patch) | |
tree | 3b10ba122e620d256fd23a6d2239680101bc0c04 | |
parent | c24b5c1df7d946dd1c931f5c93c7098c9cf8545d (diff) | |
download | mongo-44f49a9a97b382e07beca0d3fabf5991eb3a41fa.tar.gz |
SERVER-40705: No need to delay unlocking if lock is recursively acquired
-rw-r--r-- | jstests/replsets/prepared_transaction_on_failover.js | 64 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state_test.cpp | 123 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 3 |
4 files changed, 172 insertions, 26 deletions
diff --git a/jstests/replsets/prepared_transaction_on_failover.js b/jstests/replsets/prepared_transaction_on_failover.js index a87abbfc512..48c75700a42 100644 --- a/jstests/replsets/prepared_transaction_on_failover.js +++ b/jstests/replsets/prepared_transaction_on_failover.js @@ -16,7 +16,7 @@ const collName = "coll"; const otherDbName = dbName + "_other"; - function testTransactionsWithFailover(stepDownFunction) { + function testTransactionsWithFailover(doWork, stepDown, postCommit) { const primary = replTest.getPrimary(); const newPrimary = replTest.getSecondary(); const testDB = primary.getDB(dbName); @@ -27,17 +27,15 @@ jsTestLog("Starting transaction"); const session = primary.startSession({causalConsistency: false}); - const sessionDB = session.getDatabase(dbName); session.startTransaction({writeConcern: {w: "majority"}}); - const doc = {_id: "txn on primary " + primary}; - assert.commandWorked(sessionDB.getCollection(collName).insert(doc)); + doWork(primary, session); jsTestLog("Putting transaction into prepare"); const prepareTimestamp = PrepareHelpers.prepareTransaction(session); replTest.awaitReplication(); - stepDownFunction(); + stepDown(); reconnect(primary); jsTestLog("Waiting for the other node to run for election and become primary"); @@ -61,8 +59,7 @@ assert.commandWorked(PrepareHelpers.commitTransaction(newSession, prepareTimestamp)); replTest.awaitReplication(); - assert.docEq(doc, testDB.getCollection(collName).findOne()); - assert.docEq(doc, newPrimary.getDB(dbName).getCollection(collName).findOne()); + postCommit(primary, newPrimary); jsTestLog("Running another transaction on the new primary"); const secondSession = newPrimary.startSession({causalConsistency: false}); @@ -70,19 +67,68 @@ assert.commandWorked( secondSession.getDatabase(dbName).getCollection(collName).insert({_id: "second-doc"})); secondSession.commitTransaction(); + + // Unfreeze the original primary so that it can stand for election again for the next test. + assert.commandWorked(primary.adminCommand({replSetFreeze: 0})); + } + + function doInsert(primary, session) { + const doc = {_id: "txn on primary " + primary}; + jsTestLog("Inserting a document in a transaction."); + assert.commandWorked(session.getDatabase(dbName).getCollection(collName).insert(doc)); + } + function postInsert(primary, newPrimary) { + const doc = {_id: "txn on primary " + primary}; + assert.docEq(doc, primary.getDB(dbName).getCollection(collName).findOne()); + assert.docEq(doc, newPrimary.getDB(dbName).getCollection(collName).findOne()); + } + + function doInsertTextSearch(primary, session) { + // Create an index outside of the transaction. + assert.commandWorked( + primary.getDB(dbName).getCollection(collName).createIndex({text: "text"})); + + // Do the followings in a transaction. + jsTestLog("Inserting a document in a transaction."); + assert.commandWorked( + session.getDatabase(dbName).getCollection(collName).insert({text: "text"})); + // Text search will recursively acquire the global lock. This tests that yielding + // recursively held locks works on step down. + jsTestLog("Doing a text search in a transaction."); + assert.eq(1, + session.getDatabase(dbName) + .getCollection(collName) + .find({$text: {$search: "text"}}) + .itcount()); + } + function postInsertTextSearch(primary, newPrimary) { + assert.eq(1, + primary.getDB(dbName) + .getCollection(collName) + .find({$text: {$search: "text"}}) + .itcount()); + assert.eq(1, + newPrimary.getDB(dbName) + .getCollection(collName) + .find({$text: {$search: "text"}}) + .itcount()); } function stepDownViaHeartbeat() { jsTestLog("Stepping down primary via heartbeat"); replTest.stepUp(replTest.getSecondary()); } - testTransactionsWithFailover(stepDownViaHeartbeat); function stepDownViaCommand() { jsTestLog("Stepping down primary via command"); assert.commandWorked(replTest.getPrimary().adminCommand({replSetStepDown: 10})); } - testTransactionsWithFailover(stepDownViaCommand); + + testTransactionsWithFailover(doInsert, stepDownViaHeartbeat, postInsert); + testTransactionsWithFailover(doInsert, stepDownViaCommand, postInsert); + + testTransactionsWithFailover(doInsertTextSearch, stepDownViaHeartbeat, postInsertTextSearch); + testTransactionsWithFailover(doInsertTextSearch, stepDownViaCommand, postInsertTextSearch); replTest.stopSet(); })(); diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 4fef4eadb9e..75a6a771eea 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -530,6 +530,14 @@ bool LockerImpl::unlock(ResourceId resId) { return false; if (inAWriteUnitOfWork() && _shouldDelayUnlock(it.key(), (it->mode))) { + // Only delay unlocking if the lock is not acquired more than once. Otherwise, we can simply + // call _unlockImpl to decrement recursiveCount instead of incrementing unlockPending. This + // is safe because the lock is still being held in the strongest mode necessary. + if (it->recursiveCount > 1) { + // Invariant that the lock is still being held. + invariant(!_unlockImpl(&it)); + return false; + } if (!it->unlockPending) { _numResourcesToUnlockAtEndUnitOfWork++; } diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp index 7c6cd0a7895..7c7f25418ff 100644 --- a/src/mongo/db/concurrency/lock_state_test.cpp +++ b/src/mongo/db/concurrency/lock_state_test.cpp @@ -401,6 +401,8 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) { // Recursive global lock. locker.lockGlobal(MODE_IX); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + ASSERT_FALSE(locker.unlockGlobal()); // Unlock them so that they will be pending to unlock. @@ -410,11 +412,11 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) { ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL); ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U); - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); locker.releaseWriteUnitOfWork(&lockInfo); - ASSERT_EQ(lockInfo.unlockPendingLocks.size(), 4UL); + ASSERT_EQ(lockInfo.unlockPendingLocks.size(), 3UL); // Things should still be locked. ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection)); @@ -423,7 +425,7 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) { ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); ASSERT(locker.isLocked()); ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U); - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); // The locker is no longer participating the two-phase locking. ASSERT_FALSE(locker.inAWriteUnitOfWork()); @@ -448,15 +450,13 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) { ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U); ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U); locker.unlock(resIdDatabase); - ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 2UL); - // The DB lock has been locked twice, but only once in this WUOW. - ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); - ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 1UL); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); locker.unlockGlobal(); - ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL); - // The global lock has been locked 3 times, but only 1 of them is part of this WUOW. - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U); - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 3U); + ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 1UL); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); locker.endWriteUnitOfWork(); } ASSERT_FALSE(locker.inAWriteUnitOfWork()); @@ -468,7 +468,7 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) { ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); ASSERT(locker.isLocked()); ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U); - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); // The new locks has been released. ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection2)); @@ -483,8 +483,8 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) { ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); ASSERT(locker.isLocked()); - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U); - ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); locker.endWriteUnitOfWork(); @@ -562,6 +562,89 @@ TEST_F(LockerImplTest, releaseAndRestoreEmptyWriteUnitOfWork) { ASSERT_FALSE(locker.isLocked()); } +TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithRecursiveLocks) { + Locker::LockSnapshot lockInfo; + + LockerImpl locker; + + const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd); + const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd); + + locker.beginWriteUnitOfWork(); + // Lock some stuff. + locker.lockGlobal(MODE_IX); + locker.lock(resIdDatabase, MODE_IX); + locker.lock(resIdCollection, MODE_X); + // Recursively lock them again with a weaker mode. + locker.lockGlobal(MODE_IS); + locker.lock(resIdDatabase, MODE_IS); + locker.lock(resIdCollection, MODE_S); + + // Make sure locks are converted. + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection)); + ASSERT_TRUE(locker.isWriteLocked()); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 2U); + + // Unlock them so that they will be pending to unlock. + ASSERT_FALSE(locker.unlock(resIdCollection)); + ASSERT_FALSE(locker.unlock(resIdDatabase)); + ASSERT_FALSE(locker.unlockGlobal()); + // Make sure locks are still acquired in the correct mode. + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection)); + ASSERT_TRUE(locker.isWriteLocked()); + // Make sure unlocking converted locks decrements the locks' recursiveCount instead of + // incrementing unlockPending. + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->unlockPending, 0U); + + // Unlock again so unlockPending == recursiveCount. + ASSERT_FALSE(locker.unlock(resIdCollection)); + ASSERT_FALSE(locker.unlock(resIdDatabase)); + ASSERT_FALSE(locker.unlockGlobal()); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->unlockPending, 1U); + + ASSERT(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo)); + + // Things shouldn't be locked anymore. + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection)); + ASSERT_FALSE(locker.isLocked()); + + // Restore lock state. + locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo); + + // Make sure things were re-locked in the correct mode. + ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection)); + ASSERT_TRUE(locker.isWriteLocked()); + // Make sure locks were coalesced after restore and are pending to unlock as before. + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 1U); + ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->unlockPending, 1U); + + locker.endWriteUnitOfWork(); + + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase)); + ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection)); + ASSERT_FALSE(locker.isLocked()); +} + TEST_F(LockerImplTest, DefaultLocker) { const ResourceId resId(RESOURCE_DATABASE, "TestDB"_sd); @@ -1005,11 +1088,13 @@ TEST_F(LockerImplTest, ConvertLockPendingUnlock) { ASSERT_TRUE(locker.isLockHeldForMode(resId, MODE_IX)); ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1); ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1); + ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 1); // Convert lock pending unlock. locker.lock(resId, MODE_X); ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1); ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1); + ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 2); locker.endWriteUnitOfWork(); @@ -1035,16 +1120,22 @@ TEST_F(LockerImplTest, ConvertLockPendingUnlockAndUnlock) { ASSERT_TRUE(locker.isLockHeldForMode(resId, MODE_IX)); ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1); ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1); + ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 1); // Convert lock pending unlock. locker.lock(resId, MODE_X); ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1); ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1); + ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 2); // Unlock the lock conversion. ASSERT_FALSE(locker.unlock(resId)); ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1); - ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 2); + ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1); + // Make sure we still hold X lock and unlock the weaker mode to decrement recursiveCount instead + // of incrementing unlockPending. + ASSERT_TRUE(locker.isLockHeldForMode(resId, MODE_X)); + ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 1); locker.endWriteUnitOfWork(); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 43e4dc37233..26b4e81bad0 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -690,7 +690,8 @@ TransactionParticipant::TxnResources::TxnResources(WithLock wl, // On secondaries, we yield the locks for transactions. if (stashStyle == StashStyle::kSecondary) { _lockSnapshot = std::make_unique<Locker::LockSnapshot>(); - _locker->releaseWriteUnitOfWorkAndUnlock(_lockSnapshot.get()); + // Transactions have at least a global IX lock. Invariant that we have something to release. + invariant(_locker->releaseWriteUnitOfWorkAndUnlock(_lockSnapshot.get())); } // This thread must still respect the transaction lock timeout, since it can prevent the |