diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 155 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 6 |
4 files changed, 115 insertions, 68 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 9289b22ed28..b363dc0ec40 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1196,8 +1196,9 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, StmtId stmtId = 0; writeConflictRetry( opCtx, "logOplogEntriesForTransaction", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. - Lock::GlobalLock globalLock(opCtx, MODE_IX); + // Writes to the oplog only require a Global intent lock. Guaranteed by + // OplogSlotReserver. + invariant(opCtx->lockState()->isWriteLocked()); WriteUnitOfWork wuow(opCtx); @@ -1289,8 +1290,9 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, writeConflictRetry( opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. - Lock::GlobalLock globalLock(opCtx, MODE_IX); + // Writes to the oplog only require a Global intent lock. Guaranteed by + // OplogSlotReserver. + invariant(opCtx->lockState()->isWriteLocked()); WriteUnitOfWork wuow(opCtx); const auto oplogOpTime = logOperation(opCtx, @@ -1513,8 +1515,9 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, writeConflictRetry( opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. - Lock::GlobalLock globalLock(opCtx, MODE_IX); + // Writes to the oplog only require a Global intent lock. Guaranteed by + // OplogSlotReserver. + invariant(opCtx->lockState()->isWriteLocked()); WriteUnitOfWork wuow(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); @@ -1544,8 +1547,9 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, writeConflictRetry( opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. - Lock::GlobalLock globalLock(opCtx, MODE_IX); + // Writes to the oplog only require a Global intent lock. Guaranteed by + // OplogSlotReserver. + invariant(opCtx->lockState()->isWriteLocked()); WriteUnitOfWork wuow(opCtx); // It is possible that the transaction resulted in no changes, In that case, we diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 5ef706c3f36..c660fcd30bf 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -758,6 +758,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none); { + Lock::GlobalLock lk(opCtx(), MODE_IX); WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), slot); @@ -848,12 +849,15 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { opCtx()->lockState()->unsetMaxLockTimeout(); txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); - opObserver().onPreparedTransactionCommit( - opCtx(), - commitSlot, - prepareTimestamp, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + opObserver().onPreparedTransactionCommit( + opCtx(), + commitSlot, + prepareTimestamp, + txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } repl::OplogInterfaceLocal oplogInterface(opCtx()); auto oplogIter = oplogInterface.makeIterator(); { @@ -916,7 +920,10 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { // Mimic aborting the transaction. opCtx()->setWriteUnitOfWork(nullptr); opCtx()->lockState()->unsetMaxLockTimeout(); - opObserver().onTransactionAbort(opCtx(), abortSlot); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + opObserver().onTransactionAbort(opCtx(), abortSlot); + } txnParticipant.transitionToAbortedWithPrepareforTest(opCtx()); repl::OplogInterfaceLocal oplogInterface(opCtx()); @@ -984,6 +991,7 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) { txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); { + Lock::GlobalLock lk(opCtx(), MODE_IX); WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), slot); @@ -1009,6 +1017,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable) repl::OpTime prepareOpTime; { + Lock::GlobalLock lk(opCtx(), MODE_IX); WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), slot); @@ -1043,6 +1052,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction OplogSlot abortSlot; { + Lock::GlobalLock lk(opCtx(), MODE_IX); WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp()); @@ -1055,9 +1065,11 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction // Mimic aborting the transaction. opCtx()->setWriteUnitOfWork(nullptr); opCtx()->lockState()->unsetMaxLockTimeout(); - opObserver().onTransactionAbort(opCtx(), abortSlot); - txnParticipant.transitionToAbortedWithPrepareforTest(opCtx()); - + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + opObserver().onTransactionAbort(opCtx(), abortSlot); + txnParticipant.transitionToAbortedWithPrepareforTest(opCtx()); + } txnParticipant.stashTransactionResources(opCtx()); // Abort the storage-transaction without calling the OpObserver. @@ -1111,6 +1123,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti repl::OpTime prepareOpTime; { + Lock::GlobalLock lk(opCtx(), MODE_IX); WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); prepareOpTime = slot; @@ -1129,12 +1142,14 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti opCtx()->lockState()->unsetMaxLockTimeout(); txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); - opObserver().onPreparedTransactionCommit( - opCtx(), - commitSlot, - prepareOpTime.getTimestamp(), - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + opObserver().onPreparedTransactionCommit( + opCtx(), + commitSlot, + prepareOpTime.getTimestamp(), + txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } assertTxnRecord(txnNum(), commitOpTime, DurableTxnStateEnum::kCommitted); } @@ -1631,12 +1646,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 1); - prepareOpTime = reservedSlots.back(); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 1); + prepareOpTime = reservedSlots.back(); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); auto prepareEntryObj = oplogEntryObjs.back(); const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj)); @@ -1676,12 +1693,15 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 5); - prepareOpTime = reservedSlots.back(); - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 5); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } auto oplogEntryObjs = getNOplogEntries(opCtx(), 5); StmtId expectedStmtId = 0; std::vector<OplogEntry> oplogEntries; @@ -1789,12 +1809,15 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { opObserver().onUpdate(opCtx(), update2); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); - prepareOpTime = reservedSlots.back(); - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); StmtId expectedStmtId = 0; @@ -1876,13 +1899,16 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); - prepareOpTime = reservedSlots.back(); - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - prepareOpTime = reservedSlots.back(); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + prepareOpTime = reservedSlots.back(); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); StmtId expectedStmtId = 0; @@ -1953,13 +1979,16 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); - prepareOpTime = reservedSlots.back(); - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); @@ -1996,12 +2025,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); - opObserver().onPreparedTransactionCommit( - opCtx(), - commitSlot, - commitTimestamp, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + opObserver().onPreparedTransactionCommit( + opCtx(), + commitSlot, + commitTimestamp, + txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } oplogEntryObjs = getNOplogEntries(opCtx(), 4); const auto commitOplogObj = oplogEntryObjs.back(); // Statement id's for the inserts and prepare should be [0,1] and 2 respectively. @@ -2036,13 +2067,16 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); - prepareOpTime = reservedSlots.back(); - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); @@ -2069,7 +2103,10 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { // Mimic aborting the transaction by resetting the WUOW. opCtx()->setWriteUnitOfWork(nullptr); opCtx()->lockState()->unsetMaxLockTimeout(); - opObserver().onTransactionAbort(opCtx(), abortSlot); + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + opObserver().onTransactionAbort(opCtx(), abortSlot); + } txnParticipant.transitionToAbortedWithPrepareforTest(opCtx()); oplogEntryObjs = getNOplogEntries(opCtx(), 3); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 26b4e81bad0..0593fee0897 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -620,7 +620,7 @@ void TransactionParticipant::Participant::_setSpeculativeTransactionReadTimestam TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx, int numSlotsToReserve) - : _opCtx(opCtx) { + : _opCtx(opCtx), _globalLock(opCtx, MODE_IX) { // Stash the transaction on the OperationContext on the stack. At the end of this function it // will be unstashed onto the OperationContext. TransactionParticipant::SideTransactionBlock sideTxn(opCtx); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 3dd3e27c5b6..9b466083ec8 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -34,6 +34,7 @@ #include <map> #include "mongo/db/commands/txn_cmds_gen.h" +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/multi_key_path_tracker.h" @@ -854,6 +855,11 @@ private: private: OperationContext* _opCtx; + // We must hold a global lock in IX mode for the lifetime of the recovery unit. + // The global lock is also used to protect oplog writes. The lock acquisition must be + // before reserving oplogSlots to avoid deadlocks involving the callers of + // waitForAllEarlierOplogWritesToBeVisible(). + Lock::GlobalLock _globalLock; std::unique_ptr<RecoveryUnit> _recoveryUnit; std::vector<OplogSlot> _oplogSlots; }; |