summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2019-05-07 10:10:02 -0400
committerLingzhi Deng <lingzhi.deng@mongodb.com>2019-05-08 18:08:16 -0400
commit44f49a9a97b382e07beca0d3fabf5991eb3a41fa (patch)
tree3b10ba122e620d256fd23a6d2239680101bc0c04
parentc24b5c1df7d946dd1c931f5c93c7098c9cf8545d (diff)
downloadmongo-44f49a9a97b382e07beca0d3fabf5991eb3a41fa.tar.gz
SERVER-40705: No need to delay unlocking if lock is recursively acquired
-rw-r--r--jstests/replsets/prepared_transaction_on_failover.js64
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp8
-rw-r--r--src/mongo/db/concurrency/lock_state_test.cpp123
-rw-r--r--src/mongo/db/transaction_participant.cpp3
4 files changed, 172 insertions, 26 deletions
diff --git a/jstests/replsets/prepared_transaction_on_failover.js b/jstests/replsets/prepared_transaction_on_failover.js
index a87abbfc512..48c75700a42 100644
--- a/jstests/replsets/prepared_transaction_on_failover.js
+++ b/jstests/replsets/prepared_transaction_on_failover.js
@@ -16,7 +16,7 @@
const collName = "coll";
const otherDbName = dbName + "_other";
- function testTransactionsWithFailover(stepDownFunction) {
+ function testTransactionsWithFailover(doWork, stepDown, postCommit) {
const primary = replTest.getPrimary();
const newPrimary = replTest.getSecondary();
const testDB = primary.getDB(dbName);
@@ -27,17 +27,15 @@
jsTestLog("Starting transaction");
const session = primary.startSession({causalConsistency: false});
- const sessionDB = session.getDatabase(dbName);
session.startTransaction({writeConcern: {w: "majority"}});
- const doc = {_id: "txn on primary " + primary};
- assert.commandWorked(sessionDB.getCollection(collName).insert(doc));
+ doWork(primary, session);
jsTestLog("Putting transaction into prepare");
const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
replTest.awaitReplication();
- stepDownFunction();
+ stepDown();
reconnect(primary);
jsTestLog("Waiting for the other node to run for election and become primary");
@@ -61,8 +59,7 @@
assert.commandWorked(PrepareHelpers.commitTransaction(newSession, prepareTimestamp));
replTest.awaitReplication();
- assert.docEq(doc, testDB.getCollection(collName).findOne());
- assert.docEq(doc, newPrimary.getDB(dbName).getCollection(collName).findOne());
+ postCommit(primary, newPrimary);
jsTestLog("Running another transaction on the new primary");
const secondSession = newPrimary.startSession({causalConsistency: false});
@@ -70,19 +67,68 @@
assert.commandWorked(
secondSession.getDatabase(dbName).getCollection(collName).insert({_id: "second-doc"}));
secondSession.commitTransaction();
+
+ // Unfreeze the original primary so that it can stand for election again for the next test.
+ assert.commandWorked(primary.adminCommand({replSetFreeze: 0}));
+ }
+
+ function doInsert(primary, session) {
+ const doc = {_id: "txn on primary " + primary};
+ jsTestLog("Inserting a document in a transaction.");
+ assert.commandWorked(session.getDatabase(dbName).getCollection(collName).insert(doc));
+ }
+ function postInsert(primary, newPrimary) {
+ const doc = {_id: "txn on primary " + primary};
+ assert.docEq(doc, primary.getDB(dbName).getCollection(collName).findOne());
+ assert.docEq(doc, newPrimary.getDB(dbName).getCollection(collName).findOne());
+ }
+
+ function doInsertTextSearch(primary, session) {
+ // Create an index outside of the transaction.
+ assert.commandWorked(
+ primary.getDB(dbName).getCollection(collName).createIndex({text: "text"}));
+
+ // Do the followings in a transaction.
+ jsTestLog("Inserting a document in a transaction.");
+ assert.commandWorked(
+ session.getDatabase(dbName).getCollection(collName).insert({text: "text"}));
+ // Text search will recursively acquire the global lock. This tests that yielding
+ // recursively held locks works on step down.
+ jsTestLog("Doing a text search in a transaction.");
+ assert.eq(1,
+ session.getDatabase(dbName)
+ .getCollection(collName)
+ .find({$text: {$search: "text"}})
+ .itcount());
+ }
+ function postInsertTextSearch(primary, newPrimary) {
+ assert.eq(1,
+ primary.getDB(dbName)
+ .getCollection(collName)
+ .find({$text: {$search: "text"}})
+ .itcount());
+ assert.eq(1,
+ newPrimary.getDB(dbName)
+ .getCollection(collName)
+ .find({$text: {$search: "text"}})
+ .itcount());
}
function stepDownViaHeartbeat() {
jsTestLog("Stepping down primary via heartbeat");
replTest.stepUp(replTest.getSecondary());
}
- testTransactionsWithFailover(stepDownViaHeartbeat);
function stepDownViaCommand() {
jsTestLog("Stepping down primary via command");
assert.commandWorked(replTest.getPrimary().adminCommand({replSetStepDown: 10}));
}
- testTransactionsWithFailover(stepDownViaCommand);
+
+ testTransactionsWithFailover(doInsert, stepDownViaHeartbeat, postInsert);
+ testTransactionsWithFailover(doInsert, stepDownViaCommand, postInsert);
+
+ testTransactionsWithFailover(doInsertTextSearch, stepDownViaHeartbeat, postInsertTextSearch);
+ testTransactionsWithFailover(doInsertTextSearch, stepDownViaCommand, postInsertTextSearch);
replTest.stopSet();
})();
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 4fef4eadb9e..75a6a771eea 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -530,6 +530,14 @@ bool LockerImpl::unlock(ResourceId resId) {
return false;
if (inAWriteUnitOfWork() && _shouldDelayUnlock(it.key(), (it->mode))) {
+ // Only delay unlocking if the lock is not acquired more than once. Otherwise, we can simply
+ // call _unlockImpl to decrement recursiveCount instead of incrementing unlockPending. This
+ // is safe because the lock is still being held in the strongest mode necessary.
+ if (it->recursiveCount > 1) {
+ // Invariant that the lock is still being held.
+ invariant(!_unlockImpl(&it));
+ return false;
+ }
if (!it->unlockPending) {
_numResourcesToUnlockAtEndUnitOfWork++;
}
diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp
index 7c6cd0a7895..7c7f25418ff 100644
--- a/src/mongo/db/concurrency/lock_state_test.cpp
+++ b/src/mongo/db/concurrency/lock_state_test.cpp
@@ -401,6 +401,8 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) {
// Recursive global lock.
locker.lockGlobal(MODE_IX);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+
ASSERT_FALSE(locker.unlockGlobal());
// Unlock them so that they will be pending to unlock.
@@ -410,11 +412,11 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) {
ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL);
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U);
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
locker.releaseWriteUnitOfWork(&lockInfo);
- ASSERT_EQ(lockInfo.unlockPendingLocks.size(), 4UL);
+ ASSERT_EQ(lockInfo.unlockPendingLocks.size(), 3UL);
// Things should still be locked.
ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
@@ -423,7 +425,7 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) {
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
ASSERT(locker.isLocked());
ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U);
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
// The locker is no longer participating the two-phase locking.
ASSERT_FALSE(locker.inAWriteUnitOfWork());
@@ -448,15 +450,13 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) {
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U);
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U);
locker.unlock(resIdDatabase);
- ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 2UL);
- // The DB lock has been locked twice, but only once in this WUOW.
- ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
- ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U);
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 1UL);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
locker.unlockGlobal();
- ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 3UL);
- // The global lock has been locked 3 times, but only 1 of them is part of this WUOW.
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U);
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 3U);
+ ASSERT_EQ(locker.numResourcesToUnlockAtEndUnitOfWorkForTest(), 1UL);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
locker.endWriteUnitOfWork();
}
ASSERT_FALSE(locker.inAWriteUnitOfWork());
@@ -468,7 +468,7 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) {
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
ASSERT(locker.isLocked());
ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U);
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
// The new locks has been released.
ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection2));
@@ -483,8 +483,8 @@ TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithoutUnlock) {
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
ASSERT(locker.isLocked());
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 2U);
- ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
locker.endWriteUnitOfWork();
@@ -562,6 +562,89 @@ TEST_F(LockerImplTest, releaseAndRestoreEmptyWriteUnitOfWork) {
ASSERT_FALSE(locker.isLocked());
}
+TEST_F(LockerImplTest, releaseAndRestoreWriteUnitOfWorkWithRecursiveLocks) {
+ Locker::LockSnapshot lockInfo;
+
+ LockerImpl locker;
+
+ const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+ const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+
+ locker.beginWriteUnitOfWork();
+ // Lock some stuff.
+ locker.lockGlobal(MODE_IX);
+ locker.lock(resIdDatabase, MODE_IX);
+ locker.lock(resIdCollection, MODE_X);
+ // Recursively lock them again with a weaker mode.
+ locker.lockGlobal(MODE_IS);
+ locker.lock(resIdDatabase, MODE_IS);
+ locker.lock(resIdCollection, MODE_S);
+
+ // Make sure locks are converted.
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+ ASSERT_TRUE(locker.isWriteLocked());
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 2U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 2U);
+
+ // Unlock them so that they will be pending to unlock.
+ ASSERT_FALSE(locker.unlock(resIdCollection));
+ ASSERT_FALSE(locker.unlock(resIdDatabase));
+ ASSERT_FALSE(locker.unlockGlobal());
+ // Make sure locks are still acquired in the correct mode.
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+ ASSERT_TRUE(locker.isWriteLocked());
+ // Make sure unlocking converted locks decrements the locks' recursiveCount instead of
+ // incrementing unlockPending.
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 0U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->unlockPending, 0U);
+
+ // Unlock again so unlockPending == recursiveCount.
+ ASSERT_FALSE(locker.unlock(resIdCollection));
+ ASSERT_FALSE(locker.unlock(resIdDatabase));
+ ASSERT_FALSE(locker.unlockGlobal());
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->unlockPending, 1U);
+
+ ASSERT(locker.releaseWriteUnitOfWorkAndUnlock(&lockInfo));
+
+ // Things shouldn't be locked anymore.
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_FALSE(locker.isLocked());
+
+ // Restore lock state.
+ locker.restoreWriteUnitOfWorkAndLock(nullptr, lockInfo);
+
+ // Make sure things were re-locked in the correct mode.
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+ ASSERT_TRUE(locker.isWriteLocked());
+ // Make sure locks were coalesced after restore and are pending to unlock as before.
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resourceIdGlobal).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdDatabase).objAddr()->unlockPending, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->recursiveCount, 1U);
+ ASSERT_EQ(locker.getRequestsForTest().find(resIdCollection).objAddr()->unlockPending, 1U);
+
+ locker.endWriteUnitOfWork();
+
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_FALSE(locker.isLocked());
+}
+
TEST_F(LockerImplTest, DefaultLocker) {
const ResourceId resId(RESOURCE_DATABASE, "TestDB"_sd);
@@ -1005,11 +1088,13 @@ TEST_F(LockerImplTest, ConvertLockPendingUnlock) {
ASSERT_TRUE(locker.isLockHeldForMode(resId, MODE_IX));
ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1);
ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1);
+ ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 1);
// Convert lock pending unlock.
locker.lock(resId, MODE_X);
ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1);
ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1);
+ ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 2);
locker.endWriteUnitOfWork();
@@ -1035,16 +1120,22 @@ TEST_F(LockerImplTest, ConvertLockPendingUnlockAndUnlock) {
ASSERT_TRUE(locker.isLockHeldForMode(resId, MODE_IX));
ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1);
ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1);
+ ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 1);
// Convert lock pending unlock.
locker.lock(resId, MODE_X);
ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1);
ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1);
+ ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 2);
// Unlock the lock conversion.
ASSERT_FALSE(locker.unlock(resId));
ASSERT(locker.numResourcesToUnlockAtEndUnitOfWorkForTest() == 1);
- ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 2);
+ ASSERT(locker.getRequestsForTest().find(resId).objAddr()->unlockPending == 1);
+ // Make sure we still hold X lock and unlock the weaker mode to decrement recursiveCount instead
+ // of incrementing unlockPending.
+ ASSERT_TRUE(locker.isLockHeldForMode(resId, MODE_X));
+ ASSERT(locker.getRequestsForTest().find(resId).objAddr()->recursiveCount == 1);
locker.endWriteUnitOfWork();
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 43e4dc37233..26b4e81bad0 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -690,7 +690,8 @@ TransactionParticipant::TxnResources::TxnResources(WithLock wl,
// On secondaries, we yield the locks for transactions.
if (stashStyle == StashStyle::kSecondary) {
_lockSnapshot = std::make_unique<Locker::LockSnapshot>();
- _locker->releaseWriteUnitOfWorkAndUnlock(_lockSnapshot.get());
+ // Transactions have at least a global IX lock. Invariant that we have something to release.
+ invariant(_locker->releaseWriteUnitOfWorkAndUnlock(_lockSnapshot.get()));
}
// This thread must still respect the transaction lock timeout, since it can prevent the