summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/auth/auth_op_observer.h2
-rw-r--r--src/mongo/db/catalog/uuid_catalog.h2
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h2
-rw-r--r--src/mongo/db/op_observer.h5
-rw-r--r--src/mongo/db/op_observer_impl.cpp103
-rw-r--r--src/mongo/db/op_observer_impl.h4
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp177
-rw-r--r--src/mongo/db/op_observer_noop.h2
-rw-r--r--src/mongo/db/op_observer_registry.h4
-rw-r--r--src/mongo/db/repl/oplog.cpp15
-rw-r--r--src/mongo/db/repl/oplog.h17
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp2
-rw-r--r--src/mongo/db/repl/oplog_entry.h1
-rw-r--r--src/mongo/db/repl/oplog_shim.cpp2
-rw-r--r--src/mongo/db/s/config_server_op_observer.h2
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h2
-rw-r--r--src/mongo/db/transaction_participant.cpp27
-rw-r--r--src/mongo/db/transaction_participant.h19
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp4
-rw-r--r--src/mongo/db/transaction_participant_test.cpp6
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp91
21 files changed, 144 insertions, 345 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index a31e09cdfe9..a8afc4baefc 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -166,7 +166,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept final {}
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) final {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h
index d302e04354f..89dc3bac15a 100644
--- a/src/mongo/db/catalog/uuid_catalog.h
+++ b/src/mongo/db/catalog/uuid_catalog.h
@@ -160,7 +160,7 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 0a2ca1ca1e7..9aec975e8ec 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -167,7 +167,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept final {}
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) final {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index dadbe435cc5..a6664937df6 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -309,13 +309,12 @@ public:
* The onTransactionPrepare method is called when an atomic transaction is prepared. It must be
* called when a transaction is active.
*
- * 'reservedSlots' is a list of oplog slots reserved for the oplog entries in a transaction. The
- * last reserved slot represents the prepareOpTime used for the prepare oplog entry.
+ * The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry.
*
* The 'statements' are the list of CRUD operations to be applied in this transaction.
*/
virtual void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) = 0;
/**
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 88ce25e15fb..568d70a504e 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1087,15 +1087,11 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx,
return times;
}
-// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts'. If
-// there are more oplog slots than statements, then only the first n slots are used, where n is the
-// size of 'stmts'.
void logOplogEntriesForTransaction(OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const std::vector<OplogSlot>& oplogSlots) {
invariant(!stmts.empty());
- invariant(stmts.size() <= oplogSlots.size());
-
+ invariant(stmts.size() == oplogSlots.size());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
@@ -1250,7 +1246,6 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size() + 1);
auto commitSlot = oplogSlots.back();
oplogSlots.pop_back();
- invariant(oplogSlots.size() == statements.size());
logOplogEntriesForTransaction(opCtx, statements, oplogSlots);
commitOpTime = logCommitForUnpreparedTransaction(
opCtx, statements.size() /* stmtId */, oplogSlots.back().opTime, commitSlot);
@@ -1280,48 +1275,9 @@ void OpObserverImpl::onPreparedTransactionCommit(
shardObserveTransactionCommit(opCtx, statements, commitOplogEntryOpTime.opTime, true);
}
-repl::OpTime logPrepareTransaction(OperationContext* opCtx,
- StmtId stmtId,
- const repl::OpTime& prevOpTime,
- const OplogSlot oplogSlot) {
- const NamespaceString cmdNss{"admin", "$cmd"};
-
- const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
-
- OperationSessionInfo sessionInfo;
- repl::OplogLink oplogLink;
- PrepareTransactionOplogObject cmdObj;
- sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
- sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = prevOpTime;
-
- const auto oplogOpTime = logOperation(opCtx,
- "c",
- cmdNss,
- {} /* uuid */,
- cmdObj.toBSON(),
- nullptr /* o2 */,
- false /* fromMigrate */,
- wallClockTime,
- sessionInfo,
- stmtId,
- oplogLink,
- false /* prepare */,
- false /* inTxn */,
- oplogSlot);
- invariant(oplogSlot.opTime == oplogOpTime);
-
- onWriteOpCompleted(
- opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kPrepared);
-
- return oplogSlot.opTime;
-}
-
void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) {
- invariant(!reservedSlots.empty());
- const auto prepareOpTime = reservedSlots.back();
invariant(opCtx->getTxnNumber());
invariant(!prepareOpTime.opTime.isNull());
@@ -1335,45 +1291,30 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
// transaction.
// We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp
// and allow this transaction to be continued on failover.
- TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
+ {
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
- writeConflictRetry(
- opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ writeConflictRetry(
+ opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock.
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ // Writes to the oplog only require a Global intent lock.
+ Lock::GlobalLock globalLock(opCtx, MODE_IX);
- WriteUnitOfWork wuow(opCtx);
- logApplyOpsForTransaction(opCtx, statements, prepareOpTime);
- wuow.commit();
- });
+ WriteUnitOfWork wuow(opCtx);
+ logApplyOpsForTransaction(opCtx, statements, prepareOpTime);
+ wuow.commit();
+ });
+ }
} else {
- // We should have reserved an extra oplog slot for the prepare oplog entry.
- invariant(reservedSlots.size() == statements.size() + 1);
- TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
-
- // prevOpTime is a null OpTime if there were no operations written other than the prepare
- // oplog entry.
- repl::OpTime prevOpTime;
-
- writeConflictRetry(
- opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock.
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
-
- WriteUnitOfWork wuow(opCtx);
- // It is possible that the transaction resulted in no changes, In that case, we
- // should not write any operations other than the prepare oplog entry.
- if (!statements.empty()) {
- logOplogEntriesForTransaction(opCtx, statements, reservedSlots);
-
- // The prevOpTime is the OpTime of the second last entry in the reserved slots.
- prevOpTime = reservedSlots.rbegin()[1].opTime;
- }
- logPrepareTransaction(
- opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime);
- wuow.commit();
- });
+ // It is possible that the transaction resulted in no changes. In that case, we should
+ // not write any operations other than the prepare oplog entry.
+ if (!statements.empty()) {
+ // Reserve all the optimes in advance, so we only need to get the opTime mutex once.
+ // TODO (SERVER-39441): Reserve an extra slot here for the prepare oplog entry.
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
+ auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size());
+ logOplogEntriesForTransaction(opCtx, statements, oplogSlots);
+ }
}
}
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 2b0f55c9c58..86b47b05b31 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -145,8 +145,8 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final;
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>& statements) final;
+ const OplogSlot& prepareOpTime,
+ std::vector<repl::ReplOperation>& statments) final;
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final;
void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final;
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index ef26aa3d60f..549bceec71b 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -746,7 +746,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
}
auto oplogEntryObj = getSingleOplogEntry(opCtx());
@@ -821,7 +821,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime);
prepareTimestamp = prepareSlot.opTime.getTimestamp();
opObserver().onTransactionPrepare(
- opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
commitSlot = repl::getNextOpTime(opCtx());
}
@@ -892,7 +892,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) {
const auto prepareSlot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime);
opObserver().onTransactionPrepare(
- opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -972,7 +972,7 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) {
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
}
auto oplogEntryObj = getSingleOplogEntry(opCtx());
@@ -997,7 +997,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable)
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
prepareOpTime = slot.opTime;
opObserver().onTransactionPrepare(
- opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
}
@@ -1030,7 +1030,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
OplogSlot slot = repl::getNextOpTime(opCtx());
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -1099,7 +1099,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
prepareOpTime = slot.opTime;
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
}
@@ -1557,24 +1557,19 @@ TEST_F(OpObserverMultiEntryTransactionTest,
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 1);
- prepareOpTime = reservedSlots.back().opTime;
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
-
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
- auto prepareEntryObj = oplogEntryObjs.back();
- const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj));
- checkSessionAndTransactionFields(prepareEntryObj, 0);
-
- ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
- ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), prepareOplogEntry.getObject());
- txnParticipant.stashTransactionResources(opCtx());
- assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
- txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+ {
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = repl::getNextOpTime(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
+ prepareOpTime = slot.opTime;
+ opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
+ getNOplogEntries(opCtx(), 0);
- ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ // TODO (SERVER-39441): Make sure the prepare oplog entry is written and the durable transaction
+ // state is kPrepared.
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
@@ -1585,6 +1580,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
@@ -1598,31 +1594,26 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
- repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 5);
- prepareOpTime = reservedSlots.back().opTime;
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 5);
+ {
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = repl::getNextOpTime(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
+ // TODO (SERVER-39441): Account for prepare oplog entry.
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
for (const auto& oplogEntryObj : oplogEntryObjs) {
- checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 4) {
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_TRUE(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
- ASSERT_TRUE(oplogEntry.isCommand());
- ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
- oplogEntry.getCommandType());
- }
+ ASSERT_TRUE(oplogEntry.isCrudOpType());
+ ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_TRUE(oplogEntry.getInTxn());
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
@@ -1649,14 +1640,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
ASSERT_BSONOBJ_EQ(BSON("_id" << 3), oplogEntries[3].getObject());
ASSERT_FALSE(oplogEntries[3].getObject2());
- ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
- ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[4].getObject());
- ASSERT_FALSE(oplogEntries[4].getObject2());
-
- ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
- txnParticipant.stashTransactionResources(opCtx());
- assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
- txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+ // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and
+ // the durable transaction state is kPrepared.
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
@@ -1685,37 +1670,33 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
updateArgs2.criteria = BSON("_id" << 1);
OplogUpdateEntryArgs update2(std::move(updateArgs2), nss2, uuid2);
+ WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
- repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
- prepareOpTime = reservedSlots.back().opTime;
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ {
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = repl::getNextOpTime(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ // TODO (SERVER-39441): Account for prepare oplog entry.
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
for (const auto& oplogEntryObj : oplogEntryObjs) {
- checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 2) {
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate);
- ASSERT_TRUE(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
- ASSERT_TRUE(oplogEntry.isCommand());
- ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
- oplogEntry.getCommandType());
- }
+ ASSERT_TRUE(oplogEntry.isCrudOpType());
+ ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate);
+ ASSERT_TRUE(oplogEntry.getInTxn());
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
@@ -1738,15 +1719,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
ASSERT_TRUE(oplogEntries[1].getObject2());
ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1));
- ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
- ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject());
- ASSERT_FALSE(oplogEntries[2].getObject2());
-
- ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
-
- txnParticipant.stashTransactionResources(opCtx());
- assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
- txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+ // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and
+ // the durable transaction state is kPrepared.
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
@@ -1758,6 +1732,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "delete");
+ WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().aboutToDelete(opCtx(),
@@ -1771,33 +1746,27 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
<< "y"));
opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none);
- repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
- prepareOpTime = reservedSlots.back().opTime;
- txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- prepareOpTime = reservedSlots.back().opTime;
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ {
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = repl::getNextOpTime(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ // TODO (SERVER-39441): Account for prepare oplog entry.
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
for (const auto& oplogEntryObj : oplogEntryObjs) {
- checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 2) {
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete);
- ASSERT_TRUE(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
- ASSERT_TRUE(oplogEntry.isCommand());
- ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
- oplogEntry.getCommandType());
- }
+ ASSERT_TRUE(oplogEntry.isCrudOpType());
+ ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete);
+ ASSERT_TRUE(oplogEntry.getInTxn());
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
@@ -1814,14 +1783,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1));
ASSERT_FALSE(oplogEntries[1].getObject2());
- ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
- ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject());
- ASSERT_FALSE(oplogEntries[2].getObject2());
-
- ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
- txnParticipant.stashTransactionResources(opCtx());
- assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
- txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+ // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and
+ // the durable transaction state is kPrepared.
}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 6c72a04aa25..b4290305eca 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -144,7 +144,7 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override{};
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override{};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override{};
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index d1c0713ab7d..946ee8893e9 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -277,11 +277,11 @@ public:
}
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override {
ReservedTimes times{opCtx};
for (auto& observer : _observers) {
- observer->onTransactionPrepare(opCtx, reservedSlots, statements);
+ observer->onTransactionPrepare(opCtx, prepareOpTime, statements);
}
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index b588d7f0adc..b383b1b0565 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -819,6 +819,17 @@ void createOplog(OperationContext* opCtx) {
createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName, isReplSet);
}
+MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTime)(OperationContext* opCtx)->OplogSlot {
+ // The local oplog collection pointer must already be established by this point.
+ // We can't establish it here because that would require locking the local database, which would
+ // be a lock order violation.
+ auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog;
+ invariant(oplog);
+ OplogSlot os;
+ _getNextOpTimes(opCtx, oplog, 1, &os);
+ return os;
+}
+
OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog;
invariant(oplog);
@@ -828,8 +839,7 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
return os;
}
-MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes)
-(OperationContext* opCtx, std::size_t count)->std::vector<OplogSlot> {
+std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count) {
// The local oplog collection pointer must already be established by this point.
// We can't establish it here because that would require locking the local database, which would
// be a lock order violation.
@@ -841,6 +851,7 @@ MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes)
return oplogSlots;
}
+
// -------------------------------------
namespace {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 86be44cc30d..d04dab93a78 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -267,21 +267,15 @@ void createIndexForApplyOps(OperationContext* opCtx,
// workaround.
struct GetNextOpTimeClass {
/**
- * Allocates optimes for new entries in the oplog. Returns a vector of OplogSlots, which
- * contain the new optimes along with their terms and newly calculated hash fields.
+ * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of
+ * OplogSlots, which contain the new optimes along with their terms and newly calculated hash
+ * fields.
*/
- static MONGO_DECLARE_SHIM((OperationContext * opCtx, std::size_t count)->std::vector<OplogSlot>)
- getNextOpTimes;
-};
-
-inline std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count) {
- return GetNextOpTimeClass::getNextOpTimes(opCtx, count);
+ static MONGO_DECLARE_SHIM((OperationContext * opCtx)->OplogSlot) getNextOpTime;
};
inline OplogSlot getNextOpTime(OperationContext* opCtx) {
- auto slots = getNextOpTimes(opCtx, 1);
- invariant(slots.size() == 1);
- return slots.back();
+ return GetNextOpTimeClass::getNextOpTime(opCtx);
}
/**
@@ -296,6 +290,7 @@ inline OplogSlot getNextOpTime(OperationContext* opCtx) {
* prepare generates an oplog entry in a separate unit of work.
*/
OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx);
+std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 9b97b178976..41c6df4eee3 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -77,8 +77,6 @@ OplogEntry::CommandType parseCommandType(const BSONObj& objectField) {
return OplogEntry::CommandType::kCommitTransaction;
} else if (commandString == "abortTransaction") {
return OplogEntry::CommandType::kAbortTransaction;
- } else if (commandString == "prepareTransaction") {
- return OplogEntry::CommandType::kPrepareTransaction;
} else {
uasserted(ErrorCodes::BadValue,
str::stream() << "Unknown oplog entry command type: " << commandString
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index de984061e6b..c385e813552 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -62,7 +62,6 @@ public:
kDropIndexes,
kCommitTransaction,
kAbortTransaction,
- kPrepareTransaction,
};
// Current oplog version, should be the value of the v field in all oplog entries.
diff --git a/src/mongo/db/repl/oplog_shim.cpp b/src/mongo/db/repl/oplog_shim.cpp
index f6efbe31683..24840a3aae9 100644
--- a/src/mongo/db/repl/oplog_shim.cpp
+++ b/src/mongo/db/repl/oplog_shim.cpp
@@ -31,6 +31,6 @@
namespace mongo {
namespace repl {
-MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTimes);
+MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTime);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 08e1dfd0cf4..2ab3ea5e89b 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -167,7 +167,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index e39089dbb32..8703db1dfce 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -168,7 +168,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 4cf82057024..1f2301e3765 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -551,8 +551,7 @@ void TransactionParticipant::Participant::_setSpeculativeTransactionReadTimestam
o(lk).transactionMetricsObserver.onChooseReadTimestamp(timestamp);
}
-TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx,
- int numSlotsToReserve)
+TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx)
: _opCtx(opCtx) {
// Stash the transaction on the OperationContext on the stack. At the end of this function it
// will be unstashed onto the OperationContext.
@@ -560,7 +559,7 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o
// Begin a new WUOW and reserve a slot in the oplog.
WriteUnitOfWork wuow(opCtx);
- _oplogSlots = repl::getNextOpTimes(opCtx, numSlotsToReserve);
+ _oplogSlot = repl::getNextOpTime(opCtx);
// Release the WUOW state since this WUOW is no longer in use.
wuow.release();
@@ -937,13 +936,12 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
opCtx->checkForInterrupt();
o(lk).txnState.transitionTo(TransactionState::kPrepared);
}
- std::vector<OplogSlot> reservedSlots;
+
if (prepareOptime) {
// On secondary, we just prepare the transaction and discard the buffered ops.
prepareOplogSlot = OplogSlot(*prepareOptime, 0);
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).prepareOpTime = *prepareOptime;
- reservedSlots.push_back(prepareOplogSlot);
} else {
// On primary, we reserve an optime, prepare the transaction and write the oplog entry.
//
@@ -952,16 +950,8 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
// being prepared. When the OplogSlotReserver goes out of scope and is destroyed, the
// storage-transaction it uses to keep the hole open will abort and the slot (and
// corresponding oplog hole) will vanish.
- if (!gUseMultipleOplogEntryFormatForTransactions) {
- oplogSlotReserver.emplace(opCtx);
- } else {
- const auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx).size();
- // Reserve an extra slot here for the prepare oplog entry.
- oplogSlotReserver.emplace(opCtx, numSlotsToReserve + 1);
- invariant(oplogSlotReserver->getSlots().size() >= 1);
- }
- prepareOplogSlot = oplogSlotReserver->getLastSlot();
- reservedSlots = oplogSlotReserver->getSlots();
+ oplogSlotReserver.emplace(opCtx);
+ prepareOplogSlot = oplogSlotReserver->getReservedOplogSlot();
invariant(o().prepareOpTime.isNull(),
str::stream() << "This transaction has already reserved a prepareOpTime at: "
<< o().prepareOpTime.toString());
@@ -981,8 +971,9 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
}
opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.opTime.getTimestamp());
opCtx->getWriteUnitOfWork()->prepare();
+
opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(
- opCtx, reservedSlots, retrieveCompletedTransactionOperations(opCtx));
+ opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx));
abortGuard.dismiss();
@@ -1158,7 +1149,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
if (opCtx->writesAreReplicated()) {
invariant(!commitOplogEntryOpTime);
oplogSlotReserver.emplace(opCtx);
- commitOplogSlot = oplogSlotReserver->getLastSlot();
+ commitOplogSlot = oplogSlotReserver->getReservedOplogSlot();
invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp,
str::stream() << "Commit oplog entry must be greater than or equal to commit "
"timestamp due to causal consistency. commit timestamp: "
@@ -1343,7 +1334,7 @@ void TransactionParticipant::Participant::_abortActiveTransaction(
boost::optional<OplogSlot> abortOplogSlot;
if (o().txnState.isPrepared() && opCtx->writesAreReplicated()) {
oplogSlotReserver.emplace(opCtx);
- abortOplogSlot = oplogSlotReserver->getLastSlot();
+ abortOplogSlot = oplogSlotReserver->getReservedOplogSlot();
}
// Clean up the transaction resources on the opCtx even if the transaction resources on the
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 6d05089b284..a3673ef544d 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -796,29 +796,22 @@ private:
*/
class OplogSlotReserver {
public:
- OplogSlotReserver(OperationContext* opCtx, int numSlotsToReserve = 1);
+ OplogSlotReserver(OperationContext* opCtx);
~OplogSlotReserver();
/**
- * Returns the latest oplog slot reserved at construction.
+ * Returns the oplog slot reserved at construction.
*/
- OplogSlot getLastSlot() {
- invariant(!_oplogSlots.empty());
- invariant(!_oplogSlots.back().opTime.isNull());
- return getSlots().back();
- }
-
- std::vector<OplogSlot>& getSlots() {
- invariant(!_oplogSlots.empty());
- invariant(!_oplogSlots.back().opTime.isNull());
- return _oplogSlots;
+ OplogSlot getReservedOplogSlot() const {
+ invariant(!_oplogSlot.opTime.isNull());
+ return _oplogSlot;
}
private:
OperationContext* _opCtx;
std::unique_ptr<Locker> _locker;
std::unique_ptr<RecoveryUnit> _recoveryUnit;
- std::vector<OplogSlot> _oplogSlots;
+ OplogSlot _oplogSlot;
};
friend std::ostream& operator<<(std::ostream& s, TransactionState txnState) {
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 6f51d4ddf7c..b78c62518a5 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -87,10 +87,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements);
+ OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index bd92dd240a9..02f9cd938d8 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -93,7 +93,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) override;
bool onTransactionPrepareThrowsException = false;
@@ -136,10 +136,10 @@ public:
};
void OpObserverMock::onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
+ const OplogSlot& prepareOpTime,
std::vector<repl::ReplOperation>& statements) {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements);
+ OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 9cdf8834649..d68a567f1e5 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -2902,96 +2902,6 @@ protected:
Timestamp firstOplogEntryTs, secondOplogEntryTs;
};
-class PreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest {
-public:
- PreparedMultiOplogEntryTransaction()
- : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") {
- gUseMultipleOplogEntryFormatForTransactions = true;
- const auto currentTime = _clock->getClusterTime();
- firstOplogEntryTs = currentTime.addTicks(1).asTimestamp();
- secondOplogEntryTs = currentTime.addTicks(2).asTimestamp();
- prepareEntryTs = currentTime.addTicks(3).asTimestamp();
- commitEntryTs = currentTime.addTicks(4).asTimestamp();
- }
-
- ~PreparedMultiOplogEntryTransaction() {
- gUseMultipleOplogEntryFormatForTransactions = false;
- }
-
- void run() {
- auto txnParticipant = TransactionParticipant::get(_opCtx);
- ASSERT(txnParticipant);
- unittest::log() << "PrepareTS: " << prepareEntryTs;
- logTimestamps();
-
- const auto prepareFilter = BSON("ts" << prepareEntryTs);
- const auto commitFilter = BSON("ts" << commitEntryTs);
- {
- AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS);
- auto coll = autoColl.getCollection();
- assertDocumentAtTimestamp(coll, presentTs, BSONObj());
- assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj());
- assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj());
- assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj());
- assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj());
- assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj());
-
- assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, firstOplogEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, secondOplogEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, false);
-
- assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false);
- }
- txnParticipant.unstashTransactionResources(_opCtx, "insert");
- const BSONObj doc2 = BSON("_id" << 2 << "TestValue" << 2);
- {
- AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX, LockMode::MODE_IX);
- insertDocument(autoColl.getCollection(), InsertStatement(doc2));
- }
- txnParticipant.prepareTransaction(_opCtx, {});
-
- txnParticipant.stashTransactionResources(_opCtx);
- {
- AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS);
- auto coll = autoColl.getCollection();
- const BSONObj query1 = BSON("_id" << 1);
- const BSONObj query2 = BSON("_id" << 2);
- assertDocumentAtTimestamp(coll, presentTs, BSONObj());
- assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj());
- assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj());
- assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj());
- assertDocumentAtTimestamp(coll, nullTs, BSONObj());
-
- assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, true);
- assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true);
-
- // We haven't committed the prepared transaction
- assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false);
- }
-
- // TODO (SERVER-39442): Commit the prepared transaction and assert existence of oplogs at
- // commitTimestamp.
- }
-
-protected:
- Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs;
-};
-
class PreparedMultiDocumentTransaction : public MultiDocumentTransactionTest {
public:
PreparedMultiDocumentTransaction()
@@ -3235,7 +3145,6 @@ public:
add<CreateCollectionWithSystemIndex>();
add<MultiDocumentTransaction>();
add<MultiOplogEntryTransaction>();
- add<PreparedMultiOplogEntryTransaction>();
add<PreparedMultiDocumentTransaction>();
add<AbortedPreparedMultiDocumentTransaction>();
}