author     Siyuan Zhou <siyuan.zhou@mongodb.com>  2018-11-14 18:34:27 -0500
committer  Siyuan Zhou <siyuan.zhou@mongodb.com>  2018-12-03 18:21:37 -0500
commit     55e72b015e2aa7297c00db29e4d93451ea61a7ca (patch)
tree       8f91b68f97adc99332688bfcfaa04f9818679851 /src
parent     74921ac92c1330f754eed39c8e7148955aca2be9 (diff)
download   mongo-55e72b015e2aa7297c00db29e4d93451ea61a7ca.tar.gz
SERVER-37199 Yield locks of transactions in secondary application. (tag: r4.1.6)
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/concurrency/lock_manager.cpp     |   1
-rw-r--r--  src/mongo/db/concurrency/lock_state.cpp       |  34
-rw-r--r--  src/mongo/db/concurrency/lock_state.h         |   8
-rw-r--r--  src/mongo/db/concurrency/lock_state_test.cpp  | 132
-rw-r--r--  src/mongo/db/concurrency/locker.h             |  13
-rw-r--r--  src/mongo/db/concurrency/locker_noop.h        |  13
-rw-r--r--  src/mongo/db/transaction_participant.cpp      |  31
-rw-r--r--  src/mongo/db/transaction_participant.h        |   1
8 files changed, 222 insertions(+), 11 deletions(-)
diff --git a/src/mongo/db/concurrency/lock_manager.cpp b/src/mongo/db/concurrency/lock_manager.cpp
index 1b8f5f644a5..1a5520494bf 100644
--- a/src/mongo/db/concurrency/lock_manager.cpp
+++ b/src/mongo/db/concurrency/lock_manager.cpp
@@ -1033,6 +1033,7 @@ void LockRequest::initNew(Locker* locker, LockGrantNotification* notify) {
partitioned = false;
mode = MODE_NONE;
convertMode = MODE_NONE;
+ unlockPending = 0;
}
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index e47fd47990f..421b201af00 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -423,6 +423,40 @@ void LockerImpl::endWriteUnitOfWork() {
}
}
+bool LockerImpl::releaseWriteUnitOfWork(LockSnapshot* stateOut) {
+ // Only the global WUOW can be released.
+ invariant(_wuowNestingLevel == 1);
+ --_wuowNestingLevel;
+ invariant(!isGlobalLockedRecursively());
+
+ // All locks should be pending to be unlocked.
+ invariant(_requests.size() == _numResourcesToUnlockAtEndUnitOfWork);
+ for (auto it = _requests.begin(); it; it.next()) {
+ // No lock has been converted, so no resource needs to be unlocked more than once.
+ invariant(it->unlockPending == 1);
+ }
+ _numResourcesToUnlockAtEndUnitOfWork = 0;
+
+ return saveLockStateAndUnlock(stateOut);
+}
+
+void LockerImpl::restoreWriteUnitOfWork(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) {
+ if (stateToRestore.globalMode != MODE_NONE) {
+ restoreLockState(opCtx, stateToRestore);
+ }
+
+ invariant(_numResourcesToUnlockAtEndUnitOfWork == 0);
+ for (auto it = _requests.begin(); it; it.next()) {
+ invariant(_shouldDelayUnlock(it.key(), (it->mode)));
+ invariant(it->unlockPending == 0);
+ it->unlockPending++;
+ }
+ _numResourcesToUnlockAtEndUnitOfWork = static_cast<unsigned>(_requests.size());
+
+ beginWriteUnitOfWork();
+}
+
LockResult LockerImpl::lock(OperationContext* opCtx,
ResourceId resId,
LockMode mode,
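
The mechanics behind the two new functions: inside a write unit of work, unlock() releases nothing; it only marks the request unlockPending, and releaseWriteUnitOfWork() asserts that every held lock is pending exactly once before yielding the whole set. A minimal self-contained sketch of that bookkeeping (ToyLocker and its members are illustrative, not MongoDB's classes):

#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

struct ToyLocker {
    struct Request {
        int unlockPending = 0;
    };

    std::map<std::string, Request> requests;
    int wuowNestingLevel = 0;
    std::size_t numToUnlockAtEnd = 0;

    void beginWriteUnitOfWork() {
        ++wuowNestingLevel;
    }

    void lock(const std::string& res) {
        requests[res];  // acquire; lock modes are elided in this model
    }

    // Returns false when the unlock is deferred, like Locker::unlock().
    bool unlock(const std::string& res) {
        if (wuowNestingLevel > 0) {
            ++requests[res].unlockPending;
            ++numToUnlockAtEnd;
            return false;  // still held; released at end of WUOW or on yield
        }
        requests.erase(res);
        return true;
    }

    // Mirrors releaseWriteUnitOfWork(): every request must be pending exactly
    // once; the whole set is dropped and handed back as the yielded snapshot.
    std::vector<std::string> releaseWriteUnitOfWork() {
        assert(wuowNestingLevel == 1);
        --wuowNestingLevel;
        assert(requests.size() == numToUnlockAtEnd);
        std::vector<std::string> snapshot;
        for (const auto& [res, req] : requests) {
            assert(req.unlockPending == 1);  // no lock conversions in this model
            snapshot.push_back(res);
        }
        requests.clear();
        numToUnlockAtEnd = 0;
        return snapshot;
    }
};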
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index 599cb95bd2d..ab21b58bb3a 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -149,8 +149,8 @@ public:
virtual LockResult lockRSTLBegin(OperationContext* opCtx);
virtual LockResult lockRSTLComplete(OperationContext* opCtx, Date_t deadline);
- virtual void beginWriteUnitOfWork();
- virtual void endWriteUnitOfWork();
+ virtual void beginWriteUnitOfWork() override;
+ virtual void endWriteUnitOfWork() override;
virtual bool inAWriteUnitOfWork() const {
return _wuowNestingLevel > 0;
@@ -194,6 +194,10 @@ public:
restoreLockState(nullptr, stateToRestore);
}
+ bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override;
+ void restoreWriteUnitOfWork(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) override;
+
virtual void releaseTicket();
virtual void reacquireTicket(OperationContext* opCtx);
diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp
index a0094d3d485..b731f556057 100644
--- a/src/mongo/db/concurrency/lock_state_test.cpp
+++ b/src/mongo/db/concurrency/lock_state_test.cpp
@@ -254,6 +254,138 @@ TEST(LockerImpl, saveAndRestoreDBAndCollection) {
ASSERT(locker.unlockGlobal());
}
+TEST(LockerImpl, releaseWriteUnitOfWork) {
+ Locker::LockSnapshot lockInfo;
+
+ LockerImpl locker;
+
+ const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+ const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+
+ locker.beginWriteUnitOfWork();
+ // Lock some stuff.
+ locker.lockGlobal(MODE_IX);
+ ASSERT_EQUALS(LOCK_OK, locker.lock(resIdDatabase, MODE_IX));
+ ASSERT_EQUALS(LOCK_OK, locker.lock(resIdCollection, MODE_X));
+ // Unlock them so that the unlocks are deferred (pending) by two-phase locking.
+ ASSERT_FALSE(locker.unlock(resIdCollection));
+ ASSERT_FALSE(locker.unlock(resIdDatabase));
+ ASSERT_FALSE(locker.unlockGlobal());
+
+ ASSERT(locker.releaseWriteUnitOfWork(&lockInfo));
+
+ // Things shouldn't be locked anymore.
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_FALSE(locker.isLocked());
+
+ // Destructor should succeed since the locker's state should be empty.
+}
+
+TEST(LockerImpl, restoreWriteUnitOfWork) {
+ Locker::LockSnapshot lockInfo;
+
+ LockerImpl locker;
+
+ const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+ const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+
+ locker.beginWriteUnitOfWork();
+ // Lock some stuff.
+ locker.lockGlobal(MODE_IX);
+ ASSERT_EQUALS(LOCK_OK, locker.lock(resIdDatabase, MODE_IX));
+ ASSERT_EQUALS(LOCK_OK, locker.lock(resIdCollection, MODE_X));
+ // Unlock them so that the unlocks are deferred (pending) by two-phase locking.
+ ASSERT_FALSE(locker.unlock(resIdCollection));
+ ASSERT_FALSE(locker.unlock(resIdDatabase));
+ ASSERT_FALSE(locker.unlockGlobal());
+
+ ASSERT(locker.releaseWriteUnitOfWork(&lockInfo));
+
+ // Things shouldn't be locked anymore.
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_FALSE(locker.isLocked());
+
+ // Restore lock state.
+ locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+
+ // Make sure things were re-locked.
+ ASSERT_EQUALS(MODE_IX, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_X, locker.getLockMode(resIdCollection));
+ ASSERT(locker.isLocked());
+
+ locker.endWriteUnitOfWork();
+
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_FALSE(locker.isLocked());
+}
+
+TEST(LockerImpl, releaseAndRestoreReadOnlyWriteUnitOfWork) {
+ Locker::LockSnapshot lockInfo;
+
+ LockerImpl locker;
+
+ const ResourceId resIdDatabase(RESOURCE_DATABASE, "TestDB"_sd);
+ const ResourceId resIdCollection(RESOURCE_COLLECTION, "TestDB.collection"_sd);
+
+ // Snapshot transactions delay shared locks as well.
+ locker.setSharedLocksShouldTwoPhaseLock(true);
+
+ locker.beginWriteUnitOfWork();
+ // Lock some stuff in IS mode.
+ locker.lockGlobal(MODE_IS);
+ ASSERT_EQUALS(LOCK_OK, locker.lock(resIdDatabase, MODE_IS));
+ ASSERT_EQUALS(LOCK_OK, locker.lock(resIdCollection, MODE_IS));
+ // Unlock them.
+ ASSERT_FALSE(locker.unlock(resIdCollection));
+ ASSERT_FALSE(locker.unlock(resIdDatabase));
+ ASSERT_FALSE(locker.unlockGlobal());
+ ASSERT_EQ(3u, locker.numResourcesToUnlockAtEndUnitOfWorkForTest());
+
+ // Things shouldn't be locked anymore.
+ ASSERT_TRUE(locker.releaseWriteUnitOfWork(&lockInfo));
+
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_FALSE(locker.isLocked());
+
+ // Restore lock state.
+ locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+
+ ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_IS, locker.getLockMode(resIdCollection));
+ ASSERT_TRUE(locker.isLocked());
+
+ locker.endWriteUnitOfWork();
+
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdDatabase));
+ ASSERT_EQUALS(MODE_NONE, locker.getLockMode(resIdCollection));
+ ASSERT_FALSE(locker.isLocked());
+}
+
+TEST(LockerImpl, releaseAndRestoreEmptyWriteUnitOfWork) {
+ Locker::LockSnapshot lockInfo;
+ LockerImpl locker;
+
+ // Snapshot transactions delay shared locks as well.
+ locker.setSharedLocksShouldTwoPhaseLock(true);
+
+ locker.beginWriteUnitOfWork();
+
+ // Nothing to yield.
+ ASSERT_FALSE(locker.releaseWriteUnitOfWork(&lockInfo));
+ ASSERT_FALSE(locker.isLocked());
+
+ // Restore lock state.
+ locker.restoreWriteUnitOfWork(nullptr, lockInfo);
+ ASSERT_FALSE(locker.isLocked());
+
+ locker.endWriteUnitOfWork();
+ ASSERT_FALSE(locker.isLocked());
+}
+
TEST(LockerImpl, DefaultLocker) {
const ResourceId resId(RESOURCE_DATABASE, "TestDB"_sd);
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index d1ea2830a91..ee4b7118b5d 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -206,8 +206,8 @@ public:
/**
* beginWriteUnitOfWork/endWriteUnitOfWork are called at the start and end of WriteUnitOfWorks.
- * They can be used to implement two-phase locking. Each call to begin should be matched with an
- * eventual call to end.
+ * They can be used to implement two-phase locking. Each call to begin or restore should be
+ * matched with an eventual call to end or release.
*
* endWriteUnitOfWork, if not called in a nested WUOW, will release all two-phase locking held
* lock resources.
@@ -375,6 +375,15 @@ public:
virtual void restoreLockState(const LockSnapshot& stateToRestore) = 0;
/**
+ * releaseWriteUnitOfWork opts out of two-phase locking and yields the locks after a WUOW
+ * has been released. restoreWriteUnitOfWork reacquires the locks and resumes the two-phase
+ * locking behavior of the WUOW.
+ */
+ virtual bool releaseWriteUnitOfWork(LockSnapshot* stateOut) = 0;
+ virtual void restoreWriteUnitOfWork(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) = 0;
+
+ /**
* Releases the ticket associated with the Locker. This allows locks to be held without
* contributing to reader/writer throttling.
*/
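
A usage sketch of that pairing rule, mirroring the unit tests in lock_state_test.cpp above (resId is illustrative; the tests pass nullptr for the OperationContext):

Locker::LockSnapshot snapshot;
LockerImpl locker;

locker.beginWriteUnitOfWork();                        // begin ...
locker.lockGlobal(MODE_IX);
invariant(LOCK_OK == locker.lock(resId, MODE_X));
invariant(!locker.unlock(resId));                     // deferred: two-phase locking
invariant(!locker.unlockGlobal());

invariant(locker.releaseWriteUnitOfWork(&snapshot));  // ... matched by release.
// The locks are free here; a state transition may take conflicting locks.

locker.restoreWriteUnitOfWork(nullptr, snapshot);     // restore ...
locker.endWriteUnitOfWork();                          // ... matched by end.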
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index 37abca2c99e..332e410ff91 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -111,9 +111,9 @@ public:
MONGO_UNREACHABLE;
}
- virtual void beginWriteUnitOfWork() {}
+ virtual void beginWriteUnitOfWork() override {}
- virtual void endWriteUnitOfWork() {}
+ virtual void endWriteUnitOfWork() override {}
virtual bool inAWriteUnitOfWork() const {
return false;
@@ -187,6 +187,15 @@ public:
MONGO_UNREACHABLE;
}
+ bool releaseWriteUnitOfWork(LockSnapshot* stateOut) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void restoreWriteUnitOfWork(OperationContext* opCtx,
+ const LockSnapshot& stateToRestore) override {
+ MONGO_UNREACHABLE;
+ }
+
virtual void releaseTicket() {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 05947daa982..cc9726c29b5 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -529,6 +529,12 @@ TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool
}
_locker->unsetThreadId();
+ // On secondaries, we yield the locks for transactions.
+ if (!opCtx->writesAreReplicated()) {
+ _lockSnapshot = std::make_unique<Locker::LockSnapshot>();
+ _locker->releaseWriteUnitOfWork(_lockSnapshot.get());
+ }
+
// This thread must still respect the transaction lock timeout, since it can prevent the
// transaction from making progress.
auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
@@ -553,25 +559,35 @@ TransactionParticipant::TxnResources::~TxnResources() {
// when starting a new transaction before completing an old one. So we should
// be at WUOW nesting level 1 (only the top level WriteUnitOfWork).
_recoveryUnit->abortUnitOfWork();
- _locker->endWriteUnitOfWork();
+ // If locks are not yielded, release them.
+ if (!_lockSnapshot) {
+ _locker->endWriteUnitOfWork();
+ }
invariant(!_locker->inAWriteUnitOfWork());
}
}
void TransactionParticipant::TxnResources::release(OperationContext* opCtx) {
// Perform operations that can fail the release before marking the TxnResources as released.
+
+ // Restore locks if they are yielded.
+ if (_lockSnapshot) {
+ invariant(!_locker->isLocked());
+ // opCtx is passed in to enable the restoration to be interrupted.
+ _locker->restoreWriteUnitOfWork(opCtx, *_lockSnapshot);
+ _lockSnapshot.reset(nullptr);
+ }
_locker->reacquireTicket(opCtx);
invariant(!_released);
_released = true;
- // We intentionally do not capture the return value of swapLockState(), which is just an empty
- // locker. At the end of the operation, if the transaction is not complete, we will stash the
- // operation context's locker and replace it with a new empty locker.
-
// It is necessary to lock the client to change the Locker on the OperationContext.
stdx::lock_guard<Client> lk(*opCtx->getClient());
invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive);
+ // We intentionally do not capture the return value of swapLockState(), which is just an empty
+ // locker. At the end of the operation, if the transaction is not complete, we will stash the
+ // operation context's locker and replace it with a new empty locker.
opCtx->swapLockState(std::move(_locker));
opCtx->lockState()->updateThreadIdToCurrentThread();
@@ -701,6 +717,11 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
: SpeculativeTransactionOpTime::kLastApplied);
}
+ // All locks of transactions must be acquired inside the global WUOW so that we can
+ // yield and restore all locks on state transitions. Otherwise, we'd have to remember
+ // which locks are managed by the WUOW.
+ invariant(!opCtx->lockState()->isLocked());
+
// Stashed transaction resources do not exist for this in-progress multi-document
// transaction. Set up the transaction resources on the opCtx.
opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
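Taken together, the TxnResources changes form a small state machine: stash yields the locks on secondaries, the abort path skips endWriteUnitOfWork() when the locks were yielded, and unstash restores them. A self-contained sketch of that flow (ToyLocker, ToyTxnResources, and Snapshot are illustrative stand-ins, not MongoDB's classes):

#include <cassert>
#include <memory>

struct Snapshot {};

struct ToyLocker {
    bool inWuow = false;
    bool locked = false;

    bool releaseWriteUnitOfWork(Snapshot*) {
        inWuow = false;  // yielding also closes the write unit of work
        bool hadLocks = locked;
        locked = false;
        return hadLocks;
    }
    void restoreWriteUnitOfWork(const Snapshot&) {
        locked = true;  // reacquire the yielded locks ...
        inWuow = true;  // ... and re-enter the write unit of work
    }
    void endWriteUnitOfWork() {
        inWuow = false;
        locked = false;
    }
};

struct ToyTxnResources {
    ToyTxnResources(ToyLocker* locker, bool writesAreReplicated) : _locker(locker) {
        if (!writesAreReplicated) {  // secondary application: yield the locks
            _lockSnapshot = std::make_unique<Snapshot>();
            _locker->releaseWriteUnitOfWork(_lockSnapshot.get());
        }
    }
    ~ToyTxnResources() {
        // Abort path: end the WUOW only if the locks were not yielded, since
        // releaseWriteUnitOfWork() already closed it.
        if (!_released && !_lockSnapshot)
            _locker->endWriteUnitOfWork();
    }
    void release() {
        if (_lockSnapshot) {  // unstash: restore the yielded locks first
            assert(!_locker->locked);
            _locker->restoreWriteUnitOfWork(*_lockSnapshot);
            _lockSnapshot.reset();
        }
        _released = true;
    }

    ToyLocker* _locker;
    std::unique_ptr<Snapshot> _lockSnapshot;
    bool _released = false;
};

int main() {
    ToyLocker locker;
    locker.inWuow = locker.locked = true;       // the transaction holds locks
    ToyTxnResources stash(&locker, /*writesAreReplicated=*/false);
    assert(!locker.locked);                     // yielded while stashed
    stash.release();
    assert(locker.locked && locker.inWuow);     // restored on unstash
    locker.endWriteUnitOfWork();
}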
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index e7c69a69c76..85b0b0e3611 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -113,6 +113,7 @@ public:
private:
bool _released = false;
std::unique_ptr<Locker> _locker;
+ std::unique_ptr<Locker::LockSnapshot> _lockSnapshot;
std::unique_ptr<RecoveryUnit> _recoveryUnit;
repl::ReadConcernArgs _readConcernArgs;
WriteUnitOfWork::RecoveryUnitState _ruState;