summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author: Jason Chan <jason.chan@10gen.com> 2019-03-07 14:55:20 -0500
committer: Jason Chan <jason.chan@10gen.com> 2019-03-07 14:58:33 -0500
commit: 86c1120002b6f28183f024f373ecc58123624a46 (patch)
tree: eadb58739759d50b000cbc2299851a2de63079c7 /src/mongo
parent: 9245a51842f1f8b9da42cadf27627642c4d94fd4 (diff)
download: mongo-86c1120002b6f28183f024f373ecc58123624a46.tar.gz
SERVER-39441 Write the new 'prepareTransaction' command on primary
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/auth/auth_op_observer.h | 2
-rw-r--r-- src/mongo/db/catalog/uuid_catalog.h | 2
-rw-r--r-- src/mongo/db/free_mon/free_mon_op_observer.h | 2
-rw-r--r-- src/mongo/db/op_observer.h | 5
-rw-r--r-- src/mongo/db/op_observer_impl.cpp | 103
-rw-r--r-- src/mongo/db/op_observer_impl.h | 4
-rw-r--r-- src/mongo/db/op_observer_impl_test.cpp | 177
-rw-r--r-- src/mongo/db/op_observer_noop.h | 2
-rw-r--r-- src/mongo/db/op_observer_registry.h | 4
-rw-r--r-- src/mongo/db/repl/oplog.cpp | 15
-rw-r--r-- src/mongo/db/repl/oplog.h | 17
-rw-r--r-- src/mongo/db/repl/oplog_entry.cpp | 2
-rw-r--r-- src/mongo/db/repl/oplog_entry.h | 1
-rw-r--r-- src/mongo/db/repl/oplog_shim.cpp | 2
-rw-r--r-- src/mongo/db/s/config_server_op_observer.h | 2
-rw-r--r-- src/mongo/db/s/shard_server_op_observer.h | 2
-rw-r--r-- src/mongo/db/transaction_participant.cpp | 27
-rw-r--r-- src/mongo/db/transaction_participant.h | 19
-rw-r--r-- src/mongo/db/transaction_participant_retryable_writes_test.cpp | 4
-rw-r--r-- src/mongo/db/transaction_participant_test.cpp | 6
-rw-r--r-- src/mongo/dbtests/storage_timestamp_tests.cpp | 91
21 files changed, 345 insertions, 144 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index a8afc4baefc..a31e09cdfe9 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -166,7 +166,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept final {}
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) final {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h
index 89dc3bac15a..d302e04354f 100644
--- a/src/mongo/db/catalog/uuid_catalog.h
+++ b/src/mongo/db/catalog/uuid_catalog.h
@@ -160,7 +160,7 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 9aec975e8ec..0a2ca1ca1e7 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -167,7 +167,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept final {}
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) final {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index a6664937df6..dadbe435cc5 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -309,12 +309,13 @@ public:
* The onTransactionPrepare method is called when an atomic transaction is prepared. It must be
* called when a transaction is active.
*
- * The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry.
+ * 'reservedSlots' is a list of oplog slots reserved for the oplog entries in a transaction. The
+ * last reserved slot represents the prepareOpTime used for the prepare oplog entry.
*
* The 'statements' are the list of CRUD operations to be applied in this transaction.
*/
virtual void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) = 0;
/**
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 568d70a504e..88ce25e15fb 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1087,11 +1087,15 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx,
return times;
}
+// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts'. If
+// there are more oplog slots than statements, then only the first n slots are used, where n is the
+// size of 'stmts'.
void logOplogEntriesForTransaction(OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const std::vector<OplogSlot>& oplogSlots) {
invariant(!stmts.empty());
- invariant(stmts.size() == oplogSlots.size());
+ invariant(stmts.size() <= oplogSlots.size());
+
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
@@ -1246,6 +1250,7 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size() + 1);
auto commitSlot = oplogSlots.back();
oplogSlots.pop_back();
+ invariant(oplogSlots.size() == statements.size());
logOplogEntriesForTransaction(opCtx, statements, oplogSlots);
commitOpTime = logCommitForUnpreparedTransaction(
opCtx, statements.size() /* stmtId */, oplogSlots.back().opTime, commitSlot);
@@ -1275,9 +1280,48 @@ void OpObserverImpl::onPreparedTransactionCommit(
shardObserveTransactionCommit(opCtx, statements, commitOplogEntryOpTime.opTime, true);
}
+repl::OpTime logPrepareTransaction(OperationContext* opCtx,
+ StmtId stmtId,
+ const repl::OpTime& prevOpTime,
+ const OplogSlot oplogSlot) {
+ const NamespaceString cmdNss{"admin", "$cmd"};
+
+ const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
+
+ OperationSessionInfo sessionInfo;
+ repl::OplogLink oplogLink;
+ PrepareTransactionOplogObject cmdObj;
+ sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
+ sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = prevOpTime;
+
+ const auto oplogOpTime = logOperation(opCtx,
+ "c",
+ cmdNss,
+ {} /* uuid */,
+ cmdObj.toBSON(),
+ nullptr /* o2 */,
+ false /* fromMigrate */,
+ wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink,
+ false /* prepare */,
+ false /* inTxn */,
+ oplogSlot);
+ invariant(oplogSlot.opTime == oplogOpTime);
+
+ onWriteOpCompleted(
+ opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kPrepared);
+
+ return oplogSlot.opTime;
+}
+
void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) {
+ invariant(!reservedSlots.empty());
+ const auto prepareOpTime = reservedSlots.back();
invariant(opCtx->getTxnNumber());
invariant(!prepareOpTime.opTime.isNull());
@@ -1291,30 +1335,45 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
// transaction.
// We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp
// and allow this transaction to be continued on failover.
- {
- TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
- writeConflictRetry(
- opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ writeConflictRetry(
+ opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock.
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ // Writes to the oplog only require a Global intent lock.
+ Lock::GlobalLock globalLock(opCtx, MODE_IX);
- WriteUnitOfWork wuow(opCtx);
- logApplyOpsForTransaction(opCtx, statements, prepareOpTime);
- wuow.commit();
- });
- }
+ WriteUnitOfWork wuow(opCtx);
+ logApplyOpsForTransaction(opCtx, statements, prepareOpTime);
+ wuow.commit();
+ });
} else {
- // It is possible that the transaction resulted in no changes. In that case, we should
- // not write any operations other than the prepare oplog entry.
- if (!statements.empty()) {
- // Reserve all the optimes in advance, so we only need to get the opTime mutex once.
- // TODO (SERVER-39441): Reserve an extra slot here for the prepare oplog entry.
- TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
- auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size());
- logOplogEntriesForTransaction(opCtx, statements, oplogSlots);
- }
+ // We should have reserved an extra oplog slot for the prepare oplog entry.
+ invariant(reservedSlots.size() == statements.size() + 1);
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
+
+ // prevOpTime is a null OpTime if there were no operations written other than the prepare
+ // oplog entry.
+ repl::OpTime prevOpTime;
+
+ writeConflictRetry(
+ opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ // Writes to the oplog only require a Global intent lock.
+ Lock::GlobalLock globalLock(opCtx, MODE_IX);
+
+ WriteUnitOfWork wuow(opCtx);
+ // It is possible that the transaction resulted in no changes. In that case, we
+ // should not write any operations other than the prepare oplog entry.
+ if (!statements.empty()) {
+ logOplogEntriesForTransaction(opCtx, statements, reservedSlots);
+
+ // The prevOpTime is the OpTime of the second last entry in the reserved slots.
+ prevOpTime = reservedSlots.rbegin()[1].opTime;
+ }
+ logPrepareTransaction(
+ opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime);
+ wuow.commit();
+ });
}
}
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 86b47b05b31..2b0f55c9c58 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -145,8 +145,8 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final;
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
- std::vector<repl::ReplOperation>& statments) final;
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>& statements) final;
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final;
void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final;
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 549bceec71b..ef26aa3d60f 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -746,7 +746,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
}
auto oplogEntryObj = getSingleOplogEntry(opCtx());
@@ -821,7 +821,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime);
prepareTimestamp = prepareSlot.opTime.getTimestamp();
opObserver().onTransactionPrepare(
- opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
commitSlot = repl::getNextOpTime(opCtx());
}
@@ -892,7 +892,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) {
const auto prepareSlot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime);
opObserver().onTransactionPrepare(
- opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -972,7 +972,7 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) {
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
}
auto oplogEntryObj = getSingleOplogEntry(opCtx());
@@ -997,7 +997,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable)
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
prepareOpTime = slot.opTime;
opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
}
@@ -1030,7 +1030,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
OplogSlot slot = repl::getNextOpTime(opCtx());
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -1099,7 +1099,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
prepareOpTime = slot.opTime;
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
}
@@ -1557,19 +1557,24 @@ TEST_F(OpObserverMultiEntryTransactionTest,
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
repl::OpTime prepareOpTime;
- {
- WriteUnitOfWork wuow(opCtx());
- OplogSlot slot = repl::getNextOpTime(opCtx());
- txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
- prepareOpTime = slot.opTime;
- opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- }
- getNOplogEntries(opCtx(), 0);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 1);
+ prepareOpTime = reservedSlots.back().opTime;
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
+ auto prepareEntryObj = oplogEntryObjs.back();
+ const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj));
+ checkSessionAndTransactionFields(prepareEntryObj, 0);
- // TODO (SERVER-39441): Make sure the prepare oplog entry is written and the durable transaction
- // state is kPrepared.
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), prepareOplogEntry.getObject());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
@@ -1580,7 +1585,6 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "insert");
- WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
@@ -1594,26 +1598,31 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
- {
- WriteUnitOfWork wuow(opCtx());
- OplogSlot slot = repl::getNextOpTime(opCtx());
- txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- }
- // TODO (SERVER-39441): Account for prepare oplog entry.
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 5);
+ prepareOpTime = reservedSlots.back().opTime;
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 5);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
for (const auto& oplogEntryObj : oplogEntryObjs) {
- checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++);
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_TRUE(oplogEntry.getInTxn());
+ if (expectedStmtId++ < 4) {
+ ASSERT_TRUE(oplogEntry.isCrudOpType());
+ ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_TRUE(oplogEntry.getInTxn());
+ } else {
+ ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
+ ASSERT_TRUE(oplogEntry.isCommand());
+ ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
+ oplogEntry.getCommandType());
+ }
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
@@ -1640,8 +1649,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
ASSERT_BSONOBJ_EQ(BSON("_id" << 3), oplogEntries[3].getObject());
ASSERT_FALSE(oplogEntries[3].getObject2());
- // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and
- // the durable transaction state is kPrepared.
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[4].getObject());
+ ASSERT_FALSE(oplogEntries[4].getObject2());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
@@ -1670,33 +1685,37 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
updateArgs2.criteria = BSON("_id" << 1);
OplogUpdateEntryArgs update2(std::move(updateArgs2), nss2, uuid2);
- WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
- {
- WriteUnitOfWork wuow(opCtx());
- OplogSlot slot = repl::getNextOpTime(opCtx());
- txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- }
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
+ prepareOpTime = reservedSlots.back().opTime;
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- // TODO (SERVER-39441): Account for prepare oplog entry.
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
for (const auto& oplogEntryObj : oplogEntryObjs) {
- checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++);
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate);
- ASSERT_TRUE(oplogEntry.getInTxn());
+ if (expectedStmtId++ < 2) {
+ ASSERT_TRUE(oplogEntry.isCrudOpType());
+ ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate);
+ ASSERT_TRUE(oplogEntry.getInTxn());
+ } else {
+ ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
+ ASSERT_TRUE(oplogEntry.isCommand());
+ ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
+ oplogEntry.getCommandType());
+ }
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
@@ -1719,8 +1738,15 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
ASSERT_TRUE(oplogEntries[1].getObject2());
ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1));
- // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and
- // the durable transaction state is kPrepared.
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject());
+ ASSERT_FALSE(oplogEntries[2].getObject2());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
@@ -1732,7 +1758,6 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "delete");
- WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().aboutToDelete(opCtx(),
@@ -1746,27 +1771,33 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
<< "y"));
opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none);
- {
- WriteUnitOfWork wuow(opCtx());
- OplogSlot slot = repl::getNextOpTime(opCtx());
- txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
- opObserver().onTransactionPrepare(
- opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- }
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
+ prepareOpTime = reservedSlots.back().opTime;
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareOpTime = reservedSlots.back().opTime;
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- // TODO (SERVER-39441): Account for prepare oplog entry.
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
for (const auto& oplogEntryObj : oplogEntryObjs) {
- checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++);
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete);
- ASSERT_TRUE(oplogEntry.getInTxn());
+ if (expectedStmtId++ < 2) {
+ ASSERT_TRUE(oplogEntry.isCrudOpType());
+ ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete);
+ ASSERT_TRUE(oplogEntry.getInTxn());
+ } else {
+ ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
+ ASSERT_TRUE(oplogEntry.isCommand());
+ ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
+ oplogEntry.getCommandType());
+ }
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
@@ -1783,8 +1814,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1));
ASSERT_FALSE(oplogEntries[1].getObject2());
- // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and
- // the durable transaction state is kPrepared.
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject());
+ ASSERT_FALSE(oplogEntries[2].getObject2());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index b4290305eca..6c72a04aa25 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -144,7 +144,7 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override{};
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) override{};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override{};
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 946ee8893e9..d1c0713ab7d 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -277,11 +277,11 @@ public:
}
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) override {
ReservedTimes times{opCtx};
for (auto& observer : _observers) {
- observer->onTransactionPrepare(opCtx, prepareOpTime, statements);
+ observer->onTransactionPrepare(opCtx, reservedSlots, statements);
}
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 06dbcdc93a0..6838330d601 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -818,17 +818,6 @@ void createOplog(OperationContext* opCtx) {
createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName, isReplSet);
}
-MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTime)(OperationContext* opCtx)->OplogSlot {
- // The local oplog collection pointer must already be established by this point.
- // We can't establish it here because that would require locking the local database, which would
- // be a lock order violation.
- auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog;
- invariant(oplog);
- OplogSlot os;
- _getNextOpTimes(opCtx, oplog, 1, &os);
- return os;
-}
-
OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog;
invariant(oplog);
@@ -838,7 +827,8 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) {
return os;
}
-std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count) {
+MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes)
+(OperationContext* opCtx, std::size_t count)->std::vector<OplogSlot> {
// The local oplog collection pointer must already be established by this point.
// We can't establish it here because that would require locking the local database, which would
// be a lock order violation.
@@ -850,7 +840,6 @@ std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count
return oplogSlots;
}
-
// -------------------------------------
namespace {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index d04dab93a78..86be44cc30d 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -267,15 +267,21 @@ void createIndexForApplyOps(OperationContext* opCtx,
// workaround.
struct GetNextOpTimeClass {
/**
- * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of
- * OplogSlots, which contain the new optimes along with their terms and newly calculated hash
- * fields.
+ * Allocates optimes for new entries in the oplog. Returns a vector of OplogSlots, which
+ * contain the new optimes along with their terms and newly calculated hash fields.
*/
- static MONGO_DECLARE_SHIM((OperationContext * opCtx)->OplogSlot) getNextOpTime;
+ static MONGO_DECLARE_SHIM((OperationContext * opCtx, std::size_t count)->std::vector<OplogSlot>)
+ getNextOpTimes;
+};
+
+inline std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count) {
+ return GetNextOpTimeClass::getNextOpTimes(opCtx, count);
};
inline OplogSlot getNextOpTime(OperationContext* opCtx) {
- return GetNextOpTimeClass::getNextOpTime(opCtx);
+ auto slots = getNextOpTimes(opCtx, 1);
+ invariant(slots.size() == 1);
+ return slots.back();
}
/**
@@ -290,7 +296,6 @@ inline OplogSlot getNextOpTime(OperationContext* opCtx) {
* prepare generates an oplog entry in a separate unit of work.
*/
OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx);
-std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 41c6df4eee3..9b97b178976 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -77,6 +77,8 @@ OplogEntry::CommandType parseCommandType(const BSONObj& objectField) {
return OplogEntry::CommandType::kCommitTransaction;
} else if (commandString == "abortTransaction") {
return OplogEntry::CommandType::kAbortTransaction;
+ } else if (commandString == "prepareTransaction") {
+ return OplogEntry::CommandType::kPrepareTransaction;
} else {
uasserted(ErrorCodes::BadValue,
str::stream() << "Unknown oplog entry command type: " << commandString
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index c385e813552..de984061e6b 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -62,6 +62,7 @@ public:
kDropIndexes,
kCommitTransaction,
kAbortTransaction,
+ kPrepareTransaction,
};
// Current oplog version, should be the value of the v field in all oplog entries.
diff --git a/src/mongo/db/repl/oplog_shim.cpp b/src/mongo/db/repl/oplog_shim.cpp
index 24840a3aae9..f6efbe31683 100644
--- a/src/mongo/db/repl/oplog_shim.cpp
+++ b/src/mongo/db/repl/oplog_shim.cpp
@@ -31,6 +31,6 @@
namespace mongo {
namespace repl {
-MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTime);
+MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTimes);  // Pairs with the MONGO_DECLARE_SHIM.
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 2ab3ea5e89b..08e1dfd0cf4 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -167,7 +167,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,  // Unused: this hook has an empty body.
std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 8703db1dfce..e39089dbb32 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -168,7 +168,7 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept override {}
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,  // Unused: this hook has an empty body.
std::vector<repl::ReplOperation>& statements) override {}
void onTransactionAbort(OperationContext* opCtx,
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 1f2301e3765..4cf82057024 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -551,7 +551,8 @@ void TransactionParticipant::Participant::_setSpeculativeTransactionReadTimestam
o(lk).transactionMetricsObserver.onChooseReadTimestamp(timestamp);
}
-TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx)
+TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx,
+ int numSlotsToReserve)
: _opCtx(opCtx) {
// Stash the transaction on the OperationContext on the stack. At the end of this function it
// will be unstashed onto the OperationContext.
@@ -559,7 +560,7 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o
// Begin a new WUOW and reserve a slot in the oplog.
WriteUnitOfWork wuow(opCtx);
- _oplogSlot = repl::getNextOpTime(opCtx);
+ _oplogSlots = repl::getNextOpTimes(opCtx, numSlotsToReserve);
// Release the WUOW state since this WUOW is no longer in use.
wuow.release();
@@ -936,12 +937,13 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
opCtx->checkForInterrupt();
o(lk).txnState.transitionTo(TransactionState::kPrepared);
}
-
+ std::vector<OplogSlot> reservedSlots;
if (prepareOptime) {
// On secondary, we just prepare the transaction and discard the buffered ops.
prepareOplogSlot = OplogSlot(*prepareOptime, 0);
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).prepareOpTime = *prepareOptime;
+ reservedSlots.push_back(prepareOplogSlot);
} else {
// On primary, we reserve an optime, prepare the transaction and write the oplog entry.
//
@@ -950,8 +952,16 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
// being prepared. When the OplogSlotReserver goes out of scope and is destroyed, the
// storage-transaction it uses to keep the hole open will abort and the slot (and
// corresponding oplog hole) will vanish.
- oplogSlotReserver.emplace(opCtx);
- prepareOplogSlot = oplogSlotReserver->getReservedOplogSlot();
+ if (!gUseMultipleOplogEntryFormatForTransactions) {
+ oplogSlotReserver.emplace(opCtx);
+ } else {
+ const auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx).size();
+ // Reserve an extra slot here for the prepare oplog entry.
+ oplogSlotReserver.emplace(opCtx, numSlotsToReserve + 1);
+ invariant(oplogSlotReserver->getSlots().size() >= 1);
+ }
+ prepareOplogSlot = oplogSlotReserver->getLastSlot();
+ reservedSlots = oplogSlotReserver->getSlots();
invariant(o().prepareOpTime.isNull(),
str::stream() << "This transaction has already reserved a prepareOpTime at: "
<< o().prepareOpTime.toString());
@@ -971,9 +981,8 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
}
opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.opTime.getTimestamp());
opCtx->getWriteUnitOfWork()->prepare();
-
opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(
- opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx));
+ opCtx, reservedSlots, retrieveCompletedTransactionOperations(opCtx));
abortGuard.dismiss();
@@ -1149,7 +1158,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
if (opCtx->writesAreReplicated()) {
invariant(!commitOplogEntryOpTime);
oplogSlotReserver.emplace(opCtx);
- commitOplogSlot = oplogSlotReserver->getReservedOplogSlot();
+ commitOplogSlot = oplogSlotReserver->getLastSlot();
invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp,
str::stream() << "Commit oplog entry must be greater than or equal to commit "
"timestamp due to causal consistency. commit timestamp: "
@@ -1334,7 +1343,7 @@ void TransactionParticipant::Participant::_abortActiveTransaction(
boost::optional<OplogSlot> abortOplogSlot;
if (o().txnState.isPrepared() && opCtx->writesAreReplicated()) {
oplogSlotReserver.emplace(opCtx);
- abortOplogSlot = oplogSlotReserver->getReservedOplogSlot();
+ abortOplogSlot = oplogSlotReserver->getLastSlot();
}
// Clean up the transaction resources on the opCtx even if the transaction resources on the
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index a3673ef544d..6d05089b284 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -796,22 +796,29 @@ private:
*/
class OplogSlotReserver {
public:
- OplogSlotReserver(OperationContext* opCtx);
+ OplogSlotReserver(OperationContext* opCtx, int numSlotsToReserve = 1);
~OplogSlotReserver();
/**
- * Returns the oplog slot reserved at construction.
+ * Returns the last of the oplog slots reserved at construction.
*/
- OplogSlot getReservedOplogSlot() const {
- invariant(!_oplogSlot.opTime.isNull());
- return _oplogSlot;
+ OplogSlot getLastSlot() const {  // const: only reads the reserved slots.
+ invariant(!_oplogSlots.empty());
+ invariant(!_oplogSlots.back().opTime.isNull());
+ return _oplogSlots.back();  // Read the member directly; getSlots() is non-const.
+ }
+
+ std::vector<OplogSlot>& getSlots() {  // Mutable access to every reserved slot.
+ invariant(!_oplogSlots.empty());
+ invariant(!_oplogSlots.back().opTime.isNull());
+ return _oplogSlots;
}
private:
OperationContext* _opCtx;
std::unique_ptr<Locker> _locker;
std::unique_ptr<RecoveryUnit> _recoveryUnit;
- OplogSlot _oplogSlot;
+ std::vector<OplogSlot> _oplogSlots;  // Filled by the constructor.
};
friend std::ostream& operator<<(std::ostream& s, TransactionState txnState) {
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index b78c62518a5..6f51d4ddf7c 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -87,10 +87,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) override {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements);
+ OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 02f9cd938d8..bd92dd240a9 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -93,7 +93,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
void onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) override;
bool onTransactionPrepareThrowsException = false;
@@ -136,10 +136,10 @@ public:
};
void OpObserverMock::onTransactionPrepare(OperationContext* opCtx,
- const OplogSlot& prepareOpTime,
+ const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>& statements) {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements);
+ OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index d68a567f1e5..9cdf8834649 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -2902,6 +2902,96 @@ protected:
Timestamp firstOplogEntryTs, secondOplogEntryTs;
};
+// Prepares a transaction in the multi-oplog-entry format and checks at which timestamps its
+// prepare oplog entry is visible; no commit entry may exist at any checked timestamp.
+class PreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest {
+public:
+ PreparedMultiOplogEntryTransaction()
+ : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") {
+ gUseMultipleOplogEntryFormatForTransactions = true;
+ const auto currentTime = _clock->getClusterTime();  // Base for the expected timestamps.
+ firstOplogEntryTs = currentTime.addTicks(1).asTimestamp();
+ secondOplogEntryTs = currentTime.addTicks(2).asTimestamp();
+ prepareEntryTs = currentTime.addTicks(3).asTimestamp();
+ commitEntryTs = currentTime.addTicks(4).asTimestamp();
+ }
+
+ ~PreparedMultiOplogEntryTransaction() {
+ gUseMultipleOplogEntryFormatForTransactions = false;  // Unconditional reset, not a restore.
+ }
+
+ void run() {
+ auto txnParticipant = TransactionParticipant::get(_opCtx);
+ ASSERT(txnParticipant);
+ unittest::log() << "PrepareTS: " << prepareEntryTs;
+ logTimestamps();
+
+ const auto prepareFilter = BSON("ts" << prepareEntryTs);
+ const auto commitFilter = BSON("ts" << commitEntryTs);
+ {  // Before prepare: nothing is visible at any checked timestamp.
+ AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS);
+ auto coll = autoColl.getCollection();
+ assertDocumentAtTimestamp(coll, presentTs, BSONObj());
+ assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj());
+ assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj());
+
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, firstOplogEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, secondOplogEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, false);
+
+ assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false);
+ }
+ txnParticipant.unstashTransactionResources(_opCtx, "insert");
+ const BSONObj doc2 = BSON("_id" << 2 << "TestValue" << 2);
+ {
+ AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX, LockMode::MODE_IX);
+ insertDocument(autoColl.getCollection(), InsertStatement(doc2));
+ }
+ txnParticipant.prepareTransaction(_opCtx, {});
+
+ txnParticipant.stashTransactionResources(_opCtx);
+ {  // After prepare: only the prepare oplog entry is visible, from prepareEntryTs onward.
+ AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS);
+ auto coll = autoColl.getCollection();
+ assertDocumentAtTimestamp(coll, presentTs, BSONObj());
+ assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj());
+ assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, nullTs, BSONObj());
+
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true);
+
+ // The prepared transaction has not been committed, so no commit entry exists.
+ assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false);
+ }
+
+ // TODO (SERVER-39442): Commit the prepared transaction and assert existence of oplogs at
+ // commitTimestamp.
+ }
+
+protected:
+ Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs;
+};
+
class PreparedMultiDocumentTransaction : public MultiDocumentTransactionTest {
public:
PreparedMultiDocumentTransaction()
@@ -3145,6 +3235,7 @@ public:
add<CreateCollectionWithSystemIndex>();
add<MultiDocumentTransaction>();
add<MultiOplogEntryTransaction>();
+ add<PreparedMultiOplogEntryTransaction>();
add<PreparedMultiDocumentTransaction>();
add<AbortedPreparedMultiDocumentTransaction>();
}