summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/op_observer_impl.cpp20
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp155
-rw-r--r--src/mongo/db/transaction_participant.cpp2
-rw-r--r--src/mongo/db/transaction_participant.h6
4 files changed, 115 insertions, 68 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 9289b22ed28..b363dc0ec40 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1196,8 +1196,9 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
StmtId stmtId = 0;
writeConflictRetry(
opCtx, "logOplogEntriesForTransaction", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock.
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
WriteUnitOfWork wuow(opCtx);
@@ -1289,8 +1290,9 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
writeConflictRetry(
opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock.
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
WriteUnitOfWork wuow(opCtx);
const auto oplogOpTime = logOperation(opCtx,
@@ -1513,8 +1515,9 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
writeConflictRetry(
opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock.
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
WriteUnitOfWork wuow(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -1544,8 +1547,9 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
writeConflictRetry(
opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock.
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
WriteUnitOfWork wuow(opCtx);
// It is possible that the transaction resulted in no changes, In that case, we
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 5ef706c3f36..c660fcd30bf 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -758,6 +758,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, boost::none);
{
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), slot);
@@ -848,12 +849,15 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
opCtx()->lockState()->unsetMaxLockTimeout();
txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
- opObserver().onPreparedTransactionCommit(
- opCtx(),
- commitSlot,
- prepareTimestamp,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ prepareTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
repl::OplogInterfaceLocal oplogInterface(opCtx());
auto oplogIter = oplogInterface.makeIterator();
{
@@ -916,7 +920,10 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) {
// Mimic aborting the transaction.
opCtx()->setWriteUnitOfWork(nullptr);
opCtx()->lockState()->unsetMaxLockTimeout();
- opObserver().onTransactionAbort(opCtx(), abortSlot);
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ }
txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
repl::OplogInterfaceLocal oplogInterface(opCtx());
@@ -984,6 +991,7 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) {
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
{
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), slot);
@@ -1009,6 +1017,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable)
repl::OpTime prepareOpTime;
{
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), slot);
@@ -1043,6 +1052,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
OplogSlot abortSlot;
{
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp());
@@ -1055,9 +1065,11 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
// Mimic aborting the transaction.
opCtx()->setWriteUnitOfWork(nullptr);
opCtx()->lockState()->unsetMaxLockTimeout();
- opObserver().onTransactionAbort(opCtx(), abortSlot);
- txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
-
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
+ }
txnParticipant.stashTransactionResources(opCtx());
// Abort the storage-transaction without calling the OpObserver.
@@ -1111,6 +1123,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
repl::OpTime prepareOpTime;
{
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
prepareOpTime = slot;
@@ -1129,12 +1142,14 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
opCtx()->lockState()->unsetMaxLockTimeout();
txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
- opObserver().onPreparedTransactionCommit(
- opCtx(),
- commitSlot,
- prepareOpTime.getTimestamp(),
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
-
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ prepareOpTime.getTimestamp(),
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
assertTxnRecord(txnNum(), commitOpTime, DurableTxnStateEnum::kCommitted);
}
@@ -1631,12 +1646,14 @@ TEST_F(OpObserverMultiEntryTransactionTest,
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 1);
- prepareOpTime = reservedSlots.back();
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
-
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 1);
+ prepareOpTime = reservedSlots.back();
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
auto prepareEntryObj = oplogEntryObjs.back();
const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj));
@@ -1676,12 +1693,15 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 5);
- prepareOpTime = reservedSlots.back();
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 5);
+ prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
auto oplogEntryObjs = getNOplogEntries(opCtx(), 5);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
@@ -1789,12 +1809,15 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
opObserver().onUpdate(opCtx(), update2);
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
- prepareOpTime = reservedSlots.back();
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
+ prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
StmtId expectedStmtId = 0;
@@ -1876,13 +1899,16 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none);
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
- prepareOpTime = reservedSlots.back();
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- prepareOpTime = reservedSlots.back();
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
+ prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareOpTime = reservedSlots.back();
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
StmtId expectedStmtId = 0;
@@ -1953,13 +1979,16 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
- prepareOpTime = reservedSlots.back();
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
+ prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
@@ -1996,12 +2025,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
- opObserver().onPreparedTransactionCommit(
- opCtx(),
- commitSlot,
- commitTimestamp,
- txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
-
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ commitTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
oplogEntryObjs = getNOplogEntries(opCtx(), 4);
const auto commitOplogObj = oplogEntryObjs.back();
// Statement id's for the inserts and prepare should be [0,1] and 2 respectively.
@@ -2036,13 +2067,16 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
- prepareOpTime = reservedSlots.back();
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
+ prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
@@ -2069,7 +2103,10 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
// Mimic aborting the transaction by resetting the WUOW.
opCtx()->setWriteUnitOfWork(nullptr);
opCtx()->lockState()->unsetMaxLockTimeout();
- opObserver().onTransactionAbort(opCtx(), abortSlot);
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ }
txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
oplogEntryObjs = getNOplogEntries(opCtx(), 3);
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 26b4e81bad0..0593fee0897 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -620,7 +620,7 @@ void TransactionParticipant::Participant::_setSpeculativeTransactionReadTimestam
TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx,
int numSlotsToReserve)
- : _opCtx(opCtx) {
+ : _opCtx(opCtx), _globalLock(opCtx, MODE_IX) {
// Stash the transaction on the OperationContext on the stack. At the end of this function it
// will be unstashed onto the OperationContext.
TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 3dd3e27c5b6..9b466083ec8 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -34,6 +34,7 @@
#include <map>
#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/multi_key_path_tracker.h"
@@ -854,6 +855,11 @@ private:
private:
OperationContext* _opCtx;
+ // We must hold a global lock in IX mode for the lifetime of the recovery unit.
+ // The global lock is also used to protect oplog writes. The lock acquisition must be
+ // before reserving oplogSlots to avoid deadlocks involving the callers of
+ // waitForAllEarlierOplogWritesToBeVisible().
+ Lock::GlobalLock _globalLock;
std::unique_ptr<RecoveryUnit> _recoveryUnit;
std::vector<OplogSlot> _oplogSlots;
};