From a76788e89bf54abacccefeba62d4e3775f20c555 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Thu, 7 Mar 2019 19:01:36 -0500 Subject: Revert "SERVER-39441 Write the new 'prepareTransaction' command on primary" This reverts commit 86c1120002b6f28183f024f373ecc58123624a46. --- src/mongo/db/auth/auth_op_observer.h | 2 +- src/mongo/db/catalog/uuid_catalog.h | 2 +- src/mongo/db/free_mon/free_mon_op_observer.h | 2 +- src/mongo/db/op_observer.h | 5 +- src/mongo/db/op_observer_impl.cpp | 103 +++--------- src/mongo/db/op_observer_impl.h | 4 +- src/mongo/db/op_observer_impl_test.cpp | 177 ++++++++------------- src/mongo/db/op_observer_noop.h | 2 +- src/mongo/db/op_observer_registry.h | 4 +- src/mongo/db/repl/oplog.cpp | 15 +- src/mongo/db/repl/oplog.h | 17 +- src/mongo/db/repl/oplog_entry.cpp | 2 - src/mongo/db/repl/oplog_entry.h | 1 - src/mongo/db/repl/oplog_shim.cpp | 2 +- src/mongo/db/s/config_server_op_observer.h | 2 +- src/mongo/db/s/shard_server_op_observer.h | 2 +- src/mongo/db/transaction_participant.cpp | 27 ++-- src/mongo/db/transaction_participant.h | 19 +-- ...ansaction_participant_retryable_writes_test.cpp | 4 +- src/mongo/db/transaction_participant_test.cpp | 6 +- src/mongo/dbtests/storage_timestamp_tests.cpp | 91 ----------- 21 files changed, 144 insertions(+), 345 deletions(-) (limited to 'src') diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index a31e09cdfe9..a8afc4baefc 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -166,7 +166,7 @@ public: const std::vector& statements) noexcept final {} void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) final {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h index d302e04354f..89dc3bac15a 100644 --- a/src/mongo/db/catalog/uuid_catalog.h +++ b/src/mongo/db/catalog/uuid_catalog.h @@ -160,7 +160,7 @@ public: Timestamp commitTimestamp, const std::vector& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 0a2ca1ca1e7..9aec975e8ec 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -167,7 +167,7 @@ public: const std::vector& statements) noexcept final {} void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) final {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index dadbe435cc5..a6664937df6 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -309,13 +309,12 @@ public: * The onTransactionPrepare method is called when an atomic transaction is prepared. It must be * called when a transaction is active. * - * 'reservedSlots' is a list of oplog slots reserved for the oplog entries in a transaction. The - * last reserved slot represents the prepareOpTime used for the prepare oplog entry. + * The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry. * * The 'statements' are the list of CRUD operations to be applied in this transaction. */ virtual void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) = 0; /** diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 88ce25e15fb..568d70a504e 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1087,15 +1087,11 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx, return times; } -// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts'. If -// there are more oplog slots than statements, then only the first n slots are used, where n is the -// size of 'stmts'. void logOplogEntriesForTransaction(OperationContext* opCtx, const std::vector& stmts, const std::vector& oplogSlots) { invariant(!stmts.empty()); - invariant(stmts.size() <= oplogSlots.size()); - + invariant(stmts.size() == oplogSlots.size()); OperationSessionInfo sessionInfo; sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); @@ -1250,7 +1246,6 @@ void OpObserverImpl::onUnpreparedTransactionCommit( auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size() + 1); auto commitSlot = oplogSlots.back(); oplogSlots.pop_back(); - invariant(oplogSlots.size() == statements.size()); logOplogEntriesForTransaction(opCtx, statements, oplogSlots); commitOpTime = logCommitForUnpreparedTransaction( opCtx, statements.size() /* stmtId */, oplogSlots.back().opTime, commitSlot); @@ -1280,48 +1275,9 @@ void OpObserverImpl::onPreparedTransactionCommit( shardObserveTransactionCommit(opCtx, statements, commitOplogEntryOpTime.opTime, true); } -repl::OpTime logPrepareTransaction(OperationContext* opCtx, - StmtId stmtId, - const repl::OpTime& prevOpTime, - const OplogSlot oplogSlot) { - const NamespaceString cmdNss{"admin", "$cmd"}; - - const auto wallClockTime = getWallClockTimeForOpLog(opCtx); - - OperationSessionInfo sessionInfo; - repl::OplogLink oplogLink; - PrepareTransactionOplogObject cmdObj; - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = prevOpTime; - - const auto oplogOpTime = logOperation(opCtx, - "c", - cmdNss, - {} /* uuid */, - cmdObj.toBSON(), - nullptr /* o2 */, - false /* fromMigrate */, - wallClockTime, - sessionInfo, - stmtId, - oplogLink, - false /* prepare */, - false /* inTxn */, - oplogSlot); - invariant(oplogSlot.opTime == oplogOpTime); - - onWriteOpCompleted( - opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kPrepared); - - return oplogSlot.opTime; -} - void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) { - invariant(!reservedSlots.empty()); - const auto prepareOpTime = reservedSlots.back(); invariant(opCtx->getTxnNumber()); invariant(!prepareOpTime.opTime.isNull()); @@ -1335,45 +1291,30 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, // transaction. // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp // and allow this transaction to be continued on failover. - TransactionParticipant::SideTransactionBlock sideTxn(opCtx); + { + TransactionParticipant::SideTransactionBlock sideTxn(opCtx); - writeConflictRetry( - opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { + writeConflictRetry( + opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. - Lock::GlobalLock globalLock(opCtx, MODE_IX); + // Writes to the oplog only require a Global intent lock. + Lock::GlobalLock globalLock(opCtx, MODE_IX); - WriteUnitOfWork wuow(opCtx); - logApplyOpsForTransaction(opCtx, statements, prepareOpTime); - wuow.commit(); - }); + WriteUnitOfWork wuow(opCtx); + logApplyOpsForTransaction(opCtx, statements, prepareOpTime); + wuow.commit(); + }); + } } else { - // We should have reserved an extra oplog slot for the prepare oplog entry. - invariant(reservedSlots.size() == statements.size() + 1); - TransactionParticipant::SideTransactionBlock sideTxn(opCtx); - - // prevOpTime is a null OpTime if there were no operations written other than the prepare - // oplog entry. - repl::OpTime prevOpTime; - - writeConflictRetry( - opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. - Lock::GlobalLock globalLock(opCtx, MODE_IX); - - WriteUnitOfWork wuow(opCtx); - // It is possible that the transaction resulted in no changes, In that case, we - // should not write any operations other than the prepare oplog entry. - if (!statements.empty()) { - logOplogEntriesForTransaction(opCtx, statements, reservedSlots); - - // The prevOpTime is the OpTime of the second last entry in the reserved slots. - prevOpTime = reservedSlots.rbegin()[1].opTime; - } - logPrepareTransaction( - opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime); - wuow.commit(); - }); + // It is possible that the transaction resulted in no changes. In that case, we should + // not write any operations other than the prepare oplog entry. + if (!statements.empty()) { + // Reserve all the optimes in advance, so we only need to get the opTime mutex once. + // TODO (SERVER-39441): Reserve an extra slot here for the prepare oplog entry. + TransactionParticipant::SideTransactionBlock sideTxn(opCtx); + auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); + logOplogEntriesForTransaction(opCtx, statements, oplogSlots); + } } } diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 2b0f55c9c58..86b47b05b31 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -145,8 +145,8 @@ public: Timestamp commitTimestamp, const std::vector& statements) noexcept final; void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, - std::vector& statements) final; + const OplogSlot& prepareOpTime, + std::vector& statments) final; void onTransactionAbort(OperationContext* opCtx, boost::optional abortOplogEntryOpTime) final; void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final; diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index ef26aa3d60f..549bceec71b 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -746,7 +746,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -821,7 +821,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime); prepareTimestamp = prepareSlot.opTime.getTimestamp(); opObserver().onTransactionPrepare( - opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); commitSlot = repl::getNextOpTime(opCtx()); } @@ -892,7 +892,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { const auto prepareSlot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime); opObserver().onTransactionPrepare( - opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); abortSlot = repl::getNextOpTime(opCtx()); } @@ -972,7 +972,7 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) { txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -997,7 +997,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable) txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); prepareOpTime = slot.opTime; opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); } @@ -1030,7 +1030,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction OplogSlot slot = repl::getNextOpTime(opCtx()); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); abortSlot = repl::getNextOpTime(opCtx()); } @@ -1099,7 +1099,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti prepareOpTime = slot.opTime; opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); } @@ -1557,24 +1557,19 @@ TEST_F(OpObserverMultiEntryTransactionTest, auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 1); - prepareOpTime = reservedSlots.back().opTime; - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - - auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); - auto prepareEntryObj = oplogEntryObjs.back(); - const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj)); - checkSessionAndTransactionFields(prepareEntryObj, 0); - - ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); - ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), prepareOplogEntry.getObject()); - txnParticipant.stashTransactionResources(opCtx()); - assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); - txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); + { + WriteUnitOfWork wuow(opCtx()); + OplogSlot slot = repl::getNextOpTime(opCtx()); + txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); + prepareOpTime = slot.opTime; + opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } + getNOplogEntries(opCtx(), 0); - ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + // TODO (SERVER-39441): Make sure the prepare oplog entry is written and the durable transaction + // state is kPrepared. } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { @@ -1585,6 +1580,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); + WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); @@ -1598,31 +1594,26 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); - repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 5); - prepareOpTime = reservedSlots.back().opTime; - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - auto oplogEntryObjs = getNOplogEntries(opCtx(), 5); + { + WriteUnitOfWork wuow(opCtx()); + OplogSlot slot = repl::getNextOpTime(opCtx()); + txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } + // TODO (SERVER-39441): Account for prepare oplog entry. + auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; for (const auto& oplogEntryObj : oplogEntryObjs) { - checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 4) { - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_TRUE(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); - ASSERT_TRUE(oplogEntry.isCommand()); - ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == - oplogEntry.getCommandType()); - } + ASSERT_TRUE(oplogEntry.isCrudOpType()); + ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_TRUE(oplogEntry.getInTxn()); ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); @@ -1649,14 +1640,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { ASSERT_BSONOBJ_EQ(BSON("_id" << 3), oplogEntries[3].getObject()); ASSERT_FALSE(oplogEntries[3].getObject2()); - ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); - ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[4].getObject()); - ASSERT_FALSE(oplogEntries[4].getObject2()); - - ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); - txnParticipant.stashTransactionResources(opCtx()); - assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); - txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); + // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and + // the durable transaction state is kPrepared. } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { @@ -1685,37 +1670,33 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { updateArgs2.criteria = BSON("_id" << 1); OplogUpdateEntryArgs update2(std::move(updateArgs2), nss2, uuid2); + WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); - repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); - prepareOpTime = reservedSlots.back().opTime; - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + { + WriteUnitOfWork wuow(opCtx()); + OplogSlot slot = repl::getNextOpTime(opCtx()); + txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } - auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); + // TODO (SERVER-39441): Account for prepare oplog entry. + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; for (const auto& oplogEntryObj : oplogEntryObjs) { - checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 2) { - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate); - ASSERT_TRUE(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); - ASSERT_TRUE(oplogEntry.isCommand()); - ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == - oplogEntry.getCommandType()); - } + ASSERT_TRUE(oplogEntry.isCrudOpType()); + ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate); + ASSERT_TRUE(oplogEntry.getInTxn()); ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); @@ -1738,15 +1719,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { ASSERT_TRUE(oplogEntries[1].getObject2()); ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1)); - ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); - ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject()); - ASSERT_FALSE(oplogEntries[2].getObject2()); - - ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); - - txnParticipant.stashTransactionResources(opCtx()); - assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); - txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); + // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and + // the durable transaction state is kPrepared. } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { @@ -1758,6 +1732,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "delete"); + WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().aboutToDelete(opCtx(), @@ -1771,33 +1746,27 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); - repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); - prepareOpTime = reservedSlots.back().opTime; - txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - prepareOpTime = reservedSlots.back().opTime; - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + { + WriteUnitOfWork wuow(opCtx()); + OplogSlot slot = repl::getNextOpTime(opCtx()); + txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + } - auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); + // TODO (SERVER-39441): Account for prepare oplog entry. + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; for (const auto& oplogEntryObj : oplogEntryObjs) { - checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 2) { - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete); - ASSERT_TRUE(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); - ASSERT_TRUE(oplogEntry.isCommand()); - ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == - oplogEntry.getCommandType()); - } + ASSERT_TRUE(oplogEntry.isCrudOpType()); + ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete); + ASSERT_TRUE(oplogEntry.getInTxn()); ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); @@ -1814,14 +1783,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1)); ASSERT_FALSE(oplogEntries[1].getObject2()); - ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); - ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject()); - ASSERT_FALSE(oplogEntries[2].getObject2()); - - ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); - txnParticipant.stashTransactionResources(opCtx()); - assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); - txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); + // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and + // the durable transaction state is kPrepared. } } // namespace } // namespace mongo diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 6c72a04aa25..b4290305eca 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -144,7 +144,7 @@ public: Timestamp commitTimestamp, const std::vector& statements) noexcept override{}; void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) override{}; void onTransactionAbort(OperationContext* opCtx, boost::optional abortOplogEntryOpTime) override{}; diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index d1c0713ab7d..946ee8893e9 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -277,11 +277,11 @@ public: } void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) override { ReservedTimes times{opCtx}; for (auto& observer : _observers) { - observer->onTransactionPrepare(opCtx, reservedSlots, statements); + observer->onTransactionPrepare(opCtx, prepareOpTime, statements); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b588d7f0adc..b383b1b0565 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -819,6 +819,17 @@ void createOplog(OperationContext* opCtx) { createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName, isReplSet); } +MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTime)(OperationContext* opCtx)->OplogSlot { + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which would + // be a lock order violation. + auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog; + invariant(oplog); + OplogSlot os; + _getNextOpTimes(opCtx, oplog, 1, &os); + return os; +} + OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog; invariant(oplog); @@ -828,8 +839,7 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { return os; } -MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes) -(OperationContext* opCtx, std::size_t count)->std::vector { +std::vector getNextOpTimes(OperationContext* opCtx, std::size_t count) { // The local oplog collection pointer must already be established by this point. // We can't establish it here because that would require locking the local database, which would // be a lock order violation. @@ -841,6 +851,7 @@ MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes) return oplogSlots; } + // ------------------------------------- namespace { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 86be44cc30d..d04dab93a78 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -267,21 +267,15 @@ void createIndexForApplyOps(OperationContext* opCtx, // workaround. struct GetNextOpTimeClass { /** - * Allocates optimes for new entries in the oplog. Returns a vector of OplogSlots, which - * contain the new optimes along with their terms and newly calculated hash fields. + * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of + * OplogSlots, which contain the new optimes along with their terms and newly calculated hash + * fields. */ - static MONGO_DECLARE_SHIM((OperationContext * opCtx, std::size_t count)->std::vector) - getNextOpTimes; -}; - -inline std::vector getNextOpTimes(OperationContext* opCtx, std::size_t count) { - return GetNextOpTimeClass::getNextOpTimes(opCtx, count); + static MONGO_DECLARE_SHIM((OperationContext * opCtx)->OplogSlot) getNextOpTime; }; inline OplogSlot getNextOpTime(OperationContext* opCtx) { - auto slots = getNextOpTimes(opCtx, 1); - invariant(slots.size() == 1); - return slots.back(); + return GetNextOpTimeClass::getNextOpTime(opCtx); } /** @@ -296,6 +290,7 @@ inline OplogSlot getNextOpTime(OperationContext* opCtx) { * prepare generates an oplog entry in a separate unit of work. */ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx); +std::vector getNextOpTimes(OperationContext* opCtx, std::size_t count); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 9b97b178976..41c6df4eee3 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -77,8 +77,6 @@ OplogEntry::CommandType parseCommandType(const BSONObj& objectField) { return OplogEntry::CommandType::kCommitTransaction; } else if (commandString == "abortTransaction") { return OplogEntry::CommandType::kAbortTransaction; - } else if (commandString == "prepareTransaction") { - return OplogEntry::CommandType::kPrepareTransaction; } else { uasserted(ErrorCodes::BadValue, str::stream() << "Unknown oplog entry command type: " << commandString diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index de984061e6b..c385e813552 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -62,7 +62,6 @@ public: kDropIndexes, kCommitTransaction, kAbortTransaction, - kPrepareTransaction, }; // Current oplog version, should be the value of the v field in all oplog entries. diff --git a/src/mongo/db/repl/oplog_shim.cpp b/src/mongo/db/repl/oplog_shim.cpp index f6efbe31683..24840a3aae9 100644 --- a/src/mongo/db/repl/oplog_shim.cpp +++ b/src/mongo/db/repl/oplog_shim.cpp @@ -31,6 +31,6 @@ namespace mongo { namespace repl { -MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTimes); +MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTime); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 08e1dfd0cf4..2ab3ea5e89b 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -167,7 +167,7 @@ public: const std::vector& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) override {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index e39089dbb32..8703db1dfce 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -168,7 +168,7 @@ public: const std::vector& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) override {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 4cf82057024..1f2301e3765 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -551,8 +551,7 @@ void TransactionParticipant::Participant::_setSpeculativeTransactionReadTimestam o(lk).transactionMetricsObserver.onChooseReadTimestamp(timestamp); } -TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx, - int numSlotsToReserve) +TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx) : _opCtx(opCtx) { // Stash the transaction on the OperationContext on the stack. At the end of this function it // will be unstashed onto the OperationContext. @@ -560,7 +559,7 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o // Begin a new WUOW and reserve a slot in the oplog. WriteUnitOfWork wuow(opCtx); - _oplogSlots = repl::getNextOpTimes(opCtx, numSlotsToReserve); + _oplogSlot = repl::getNextOpTime(opCtx); // Release the WUOW state since this WUOW is no longer in use. wuow.release(); @@ -937,13 +936,12 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( opCtx->checkForInterrupt(); o(lk).txnState.transitionTo(TransactionState::kPrepared); } - std::vector reservedSlots; + if (prepareOptime) { // On secondary, we just prepare the transaction and discard the buffered ops. prepareOplogSlot = OplogSlot(*prepareOptime, 0); stdx::lock_guard lk(*opCtx->getClient()); o(lk).prepareOpTime = *prepareOptime; - reservedSlots.push_back(prepareOplogSlot); } else { // On primary, we reserve an optime, prepare the transaction and write the oplog entry. // @@ -952,16 +950,8 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( // being prepared. When the OplogSlotReserver goes out of scope and is destroyed, the // storage-transaction it uses to keep the hole open will abort and the slot (and // corresponding oplog hole) will vanish. - if (!gUseMultipleOplogEntryFormatForTransactions) { - oplogSlotReserver.emplace(opCtx); - } else { - const auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx).size(); - // Reserve an extra slot here for the prepare oplog entry. - oplogSlotReserver.emplace(opCtx, numSlotsToReserve + 1); - invariant(oplogSlotReserver->getSlots().size() >= 1); - } - prepareOplogSlot = oplogSlotReserver->getLastSlot(); - reservedSlots = oplogSlotReserver->getSlots(); + oplogSlotReserver.emplace(opCtx); + prepareOplogSlot = oplogSlotReserver->getReservedOplogSlot(); invariant(o().prepareOpTime.isNull(), str::stream() << "This transaction has already reserved a prepareOpTime at: " << o().prepareOpTime.toString()); @@ -981,8 +971,9 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( } opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.opTime.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); + opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare( - opCtx, reservedSlots, retrieveCompletedTransactionOperations(opCtx)); + opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx)); abortGuard.dismiss(); @@ -1158,7 +1149,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction( if (opCtx->writesAreReplicated()) { invariant(!commitOplogEntryOpTime); oplogSlotReserver.emplace(opCtx); - commitOplogSlot = oplogSlotReserver->getLastSlot(); + commitOplogSlot = oplogSlotReserver->getReservedOplogSlot(); invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp, str::stream() << "Commit oplog entry must be greater than or equal to commit " "timestamp due to causal consistency. commit timestamp: " @@ -1343,7 +1334,7 @@ void TransactionParticipant::Participant::_abortActiveTransaction( boost::optional abortOplogSlot; if (o().txnState.isPrepared() && opCtx->writesAreReplicated()) { oplogSlotReserver.emplace(opCtx); - abortOplogSlot = oplogSlotReserver->getLastSlot(); + abortOplogSlot = oplogSlotReserver->getReservedOplogSlot(); } // Clean up the transaction resources on the opCtx even if the transaction resources on the diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 6d05089b284..a3673ef544d 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -796,29 +796,22 @@ private: */ class OplogSlotReserver { public: - OplogSlotReserver(OperationContext* opCtx, int numSlotsToReserve = 1); + OplogSlotReserver(OperationContext* opCtx); ~OplogSlotReserver(); /** - * Returns the latest oplog slot reserved at construction. + * Returns the oplog slot reserved at construction. */ - OplogSlot getLastSlot() { - invariant(!_oplogSlots.empty()); - invariant(!_oplogSlots.back().opTime.isNull()); - return getSlots().back(); - } - - std::vector& getSlots() { - invariant(!_oplogSlots.empty()); - invariant(!_oplogSlots.back().opTime.isNull()); - return _oplogSlots; + OplogSlot getReservedOplogSlot() const { + invariant(!_oplogSlot.opTime.isNull()); + return _oplogSlot; } private: OperationContext* _opCtx; std::unique_ptr _locker; std::unique_ptr _recoveryUnit; - std::vector _oplogSlots; + OplogSlot _oplogSlot; }; friend std::ostream& operator<<(std::ostream& s, TransactionState txnState) { diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 6f51d4ddf7c..b78c62518a5 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -87,10 +87,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements); + OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index bd92dd240a9..02f9cd938d8 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -93,7 +93,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: void onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) override; bool onTransactionPrepareThrowsException = false; @@ -136,10 +136,10 @@ public: }; void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, - const std::vector& reservedSlots, + const OplogSlot& prepareOpTime, std::vector& statements) { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements); + OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 9cdf8834649..d68a567f1e5 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -2902,96 +2902,6 @@ protected: Timestamp firstOplogEntryTs, secondOplogEntryTs; }; -class PreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest { -public: - PreparedMultiOplogEntryTransaction() - : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") { - gUseMultipleOplogEntryFormatForTransactions = true; - const auto currentTime = _clock->getClusterTime(); - firstOplogEntryTs = currentTime.addTicks(1).asTimestamp(); - secondOplogEntryTs = currentTime.addTicks(2).asTimestamp(); - prepareEntryTs = currentTime.addTicks(3).asTimestamp(); - commitEntryTs = currentTime.addTicks(4).asTimestamp(); - } - - ~PreparedMultiOplogEntryTransaction() { - gUseMultipleOplogEntryFormatForTransactions = false; - } - - void run() { - auto txnParticipant = TransactionParticipant::get(_opCtx); - ASSERT(txnParticipant); - unittest::log() << "PrepareTS: " << prepareEntryTs; - logTimestamps(); - - const auto prepareFilter = BSON("ts" << prepareEntryTs); - const auto commitFilter = BSON("ts" << commitEntryTs); - { - AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS); - auto coll = autoColl.getCollection(); - assertDocumentAtTimestamp(coll, presentTs, BSONObj()); - assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj()); - assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj()); - assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj()); - assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj()); - assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj()); - - assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, firstOplogEntryTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, secondOplogEntryTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, false); - - assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false); - } - txnParticipant.unstashTransactionResources(_opCtx, "insert"); - const BSONObj doc2 = BSON("_id" << 2 << "TestValue" << 2); - { - AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX, LockMode::MODE_IX); - insertDocument(autoColl.getCollection(), InsertStatement(doc2)); - } - txnParticipant.prepareTransaction(_opCtx, {}); - - txnParticipant.stashTransactionResources(_opCtx); - { - AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS); - auto coll = autoColl.getCollection(); - const BSONObj query1 = BSON("_id" << 1); - const BSONObj query2 = BSON("_id" << 2); - assertDocumentAtTimestamp(coll, presentTs, BSONObj()); - assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj()); - assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj()); - assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj()); - assertDocumentAtTimestamp(coll, nullTs, BSONObj()); - - assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false); - assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true); - assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, true); - assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true); - - // We haven't committed the prepared transaction - assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false); - } - - // TODO (SERVER-39442): Commit the prepared transaction and assert existence of oplogs at - // commitTimestamp. - } - -protected: - Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs; -}; - class PreparedMultiDocumentTransaction : public MultiDocumentTransactionTest { public: PreparedMultiDocumentTransaction() @@ -3235,7 +3145,6 @@ public: add(); add(); add(); - add(); add(); add(); } -- cgit v1.2.1