diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/auth/auth_op_observer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/catalog/uuid_catalog.h | 2 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_op_observer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 5 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 177 | ||||
-rw-r--r-- | src/mongo/db/op_observer_noop.h | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer_registry.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_shim.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.h | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 19 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 6 |
20 files changed, 254 insertions, 144 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index a8afc4baefc..a31e09cdfe9 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -166,7 +166,7 @@ public: const std::vector<repl::ReplOperation>& statements) noexcept final {} void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) final {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/catalog/uuid_catalog.h b/src/mongo/db/catalog/uuid_catalog.h index 89dc3bac15a..d302e04354f 100644 --- a/src/mongo/db/catalog/uuid_catalog.h +++ b/src/mongo/db/catalog/uuid_catalog.h @@ -160,7 +160,7 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 9aec975e8ec..0a2ca1ca1e7 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -167,7 +167,7 @@ public: const std::vector<repl::ReplOperation>& statements) noexcept final {} void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) final {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index a6664937df6..dadbe435cc5 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -309,12 +309,13 @@ public: * The onTransactionPrepare method is called when an atomic transaction is prepared. It must be * called when a transaction is active. * - * The 'prepareOpTime' is passed in to be used as the OpTime of the oplog entry. + * 'reservedSlots' is a list of oplog slots reserved for the oplog entries in a transaction. The + * last reserved slot represents the prepareOpTime used for the prepare oplog entry. * * The 'statements' are the list of CRUD operations to be applied in this transaction. */ virtual void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) = 0; /** diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 568d70a504e..88ce25e15fb 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1087,11 +1087,15 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx, return times; } +// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts'. If +// there are more oplog slots than statements, then only the first n slots are used, where n is the +// size of 'stmts'. void logOplogEntriesForTransaction(OperationContext* opCtx, const std::vector<repl::ReplOperation>& stmts, const std::vector<OplogSlot>& oplogSlots) { invariant(!stmts.empty()); - invariant(stmts.size() == oplogSlots.size()); + invariant(stmts.size() <= oplogSlots.size()); + OperationSessionInfo sessionInfo; sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); @@ -1246,6 +1250,7 @@ void OpObserverImpl::onUnpreparedTransactionCommit( auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size() + 1); auto commitSlot = oplogSlots.back(); oplogSlots.pop_back(); + invariant(oplogSlots.size() == statements.size()); logOplogEntriesForTransaction(opCtx, statements, oplogSlots); commitOpTime = logCommitForUnpreparedTransaction( opCtx, statements.size() /* stmtId */, oplogSlots.back().opTime, commitSlot); @@ -1275,9 +1280,48 @@ void OpObserverImpl::onPreparedTransactionCommit( shardObserveTransactionCommit(opCtx, statements, commitOplogEntryOpTime.opTime, true); } +repl::OpTime logPrepareTransaction(OperationContext* opCtx, + StmtId stmtId, + const repl::OpTime& prevOpTime, + const OplogSlot oplogSlot) { + const NamespaceString cmdNss{"admin", "$cmd"}; + + const auto wallClockTime = getWallClockTimeForOpLog(opCtx); + + OperationSessionInfo sessionInfo; + repl::OplogLink oplogLink; + PrepareTransactionOplogObject cmdObj; + sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); + sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = prevOpTime; + + const auto oplogOpTime = logOperation(opCtx, + "c", + cmdNss, + {} /* uuid */, + cmdObj.toBSON(), + nullptr /* o2 */, + false /* fromMigrate */, + wallClockTime, + sessionInfo, + stmtId, + oplogLink, + false /* prepare */, + false /* inTxn */, + oplogSlot); + invariant(oplogSlot.opTime == oplogOpTime); + + onWriteOpCompleted( + opCtx, cmdNss, {stmtId}, oplogOpTime, wallClockTime, DurableTxnStateEnum::kPrepared); + + return oplogSlot.opTime; +} + void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) { + invariant(!reservedSlots.empty()); + const auto prepareOpTime = reservedSlots.back(); invariant(opCtx->getTxnNumber()); invariant(!prepareOpTime.opTime.isNull()); @@ -1291,30 +1335,45 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, // transaction. // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp // and allow this transaction to be continued on failover. - { - TransactionParticipant::SideTransactionBlock sideTxn(opCtx); + TransactionParticipant::SideTransactionBlock sideTxn(opCtx); - writeConflictRetry( - opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { + writeConflictRetry( + opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. - Lock::GlobalLock globalLock(opCtx, MODE_IX); + // Writes to the oplog only require a Global intent lock. + Lock::GlobalLock globalLock(opCtx, MODE_IX); - WriteUnitOfWork wuow(opCtx); - logApplyOpsForTransaction(opCtx, statements, prepareOpTime); - wuow.commit(); - }); - } + WriteUnitOfWork wuow(opCtx); + logApplyOpsForTransaction(opCtx, statements, prepareOpTime); + wuow.commit(); + }); } else { - // It is possible that the transaction resulted in no changes. In that case, we should - // not write any operations other than the prepare oplog entry. - if (!statements.empty()) { - // Reserve all the optimes in advance, so we only need to get the opTime mutex once. - // TODO (SERVER-39441): Reserve an extra slot here for the prepare oplog entry. - TransactionParticipant::SideTransactionBlock sideTxn(opCtx); - auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); - logOplogEntriesForTransaction(opCtx, statements, oplogSlots); - } + // We should have reserved an extra oplog slot for the prepare oplog entry. + invariant(reservedSlots.size() == statements.size() + 1); + TransactionParticipant::SideTransactionBlock sideTxn(opCtx); + + // prevOpTime is a null OpTime if there were no operations written other than the prepare + // oplog entry. + repl::OpTime prevOpTime; + + writeConflictRetry( + opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { + // Writes to the oplog only require a Global intent lock. + Lock::GlobalLock globalLock(opCtx, MODE_IX); + + WriteUnitOfWork wuow(opCtx); + // It is possible that the transaction resulted in no changes, In that case, we + // should not write any operations other than the prepare oplog entry. + if (!statements.empty()) { + logOplogEntriesForTransaction(opCtx, statements, reservedSlots); + + // The prevOpTime is the OpTime of the second last entry in the reserved slots. + prevOpTime = reservedSlots.rbegin()[1].opTime; + } + logPrepareTransaction( + opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime); + wuow.commit(); + }); } } diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 86b47b05b31..2b0f55c9c58 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -145,8 +145,8 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final; void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, - std::vector<repl::ReplOperation>& statments) final; + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>& statements) final; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final; void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final; diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 549bceec71b..ef26aa3d60f 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -746,7 +746,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -821,7 +821,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime); prepareTimestamp = prepareSlot.opTime.getTimestamp(); opObserver().onTransactionPrepare( - opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); commitSlot = repl::getNextOpTime(opCtx()); } @@ -892,7 +892,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { const auto prepareSlot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot.opTime); opObserver().onTransactionPrepare( - opCtx(), prepareSlot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); abortSlot = repl::getNextOpTime(opCtx()); } @@ -972,7 +972,7 @@ TEST_F(OpObserverTransactionTest, PreparingEmptyTransactionLogsEmptyApplyOps) { txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -997,7 +997,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable) txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); prepareOpTime = slot.opTime; opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); } @@ -1030,7 +1030,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction OplogSlot slot = repl::getNextOpTime(opCtx()); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); abortSlot = repl::getNextOpTime(opCtx()); } @@ -1099,7 +1099,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti prepareOpTime = slot.opTime; opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); } @@ -1557,19 +1557,24 @@ TEST_F(OpObserverMultiEntryTransactionTest, auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); repl::OpTime prepareOpTime; - { - WriteUnitOfWork wuow(opCtx()); - OplogSlot slot = repl::getNextOpTime(opCtx()); - txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); - prepareOpTime = slot.opTime; - opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - } - getNOplogEntries(opCtx(), 0); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 1); + prepareOpTime = reservedSlots.back().opTime; + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); + auto prepareEntryObj = oplogEntryObjs.back(); + const auto prepareOplogEntry = assertGet(OplogEntry::parse(prepareEntryObj)); + checkSessionAndTransactionFields(prepareEntryObj, 0); - // TODO (SERVER-39441): Make sure the prepare oplog entry is written and the durable transaction - // state is kPrepared. + ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), prepareOplogEntry.getObject()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { @@ -1580,7 +1585,6 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); - WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); @@ -1594,26 +1598,31 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); - { - WriteUnitOfWork wuow(opCtx()); - OplogSlot slot = repl::getNextOpTime(opCtx()); - txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - } - // TODO (SERVER-39441): Account for prepare oplog entry. - auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 5); + prepareOpTime = reservedSlots.back().opTime; + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 5); StmtId expectedStmtId = 0; std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; for (const auto& oplogEntryObj : oplogEntryObjs) { - checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++); + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_TRUE(oplogEntry.getInTxn()); + if (expectedStmtId++ < 4) { + ASSERT_TRUE(oplogEntry.isCrudOpType()); + ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_TRUE(oplogEntry.getInTxn()); + } else { + ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); + ASSERT_TRUE(oplogEntry.isCommand()); + ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == + oplogEntry.getCommandType()); + } ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); @@ -1640,8 +1649,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { ASSERT_BSONOBJ_EQ(BSON("_id" << 3), oplogEntries[3].getObject()); ASSERT_FALSE(oplogEntries[3].getObject2()); - // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and - // the durable transaction state is kPrepared. + ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[4].getObject()); + ASSERT_FALSE(oplogEntries[4].getObject2()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { @@ -1670,33 +1685,37 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { updateArgs2.criteria = BSON("_id" << 1); OplogUpdateEntryArgs update2(std::move(updateArgs2), nss2, uuid2); - WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); - { - WriteUnitOfWork wuow(opCtx()); - OplogSlot slot = repl::getNextOpTime(opCtx()); - txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - } + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); + prepareOpTime = reservedSlots.back().opTime; + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - // TODO (SERVER-39441): Account for prepare oplog entry. - auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); StmtId expectedStmtId = 0; std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; for (const auto& oplogEntryObj : oplogEntryObjs) { - checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++); + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate); - ASSERT_TRUE(oplogEntry.getInTxn()); + if (expectedStmtId++ < 2) { + ASSERT_TRUE(oplogEntry.isCrudOpType()); + ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate); + ASSERT_TRUE(oplogEntry.getInTxn()); + } else { + ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); + ASSERT_TRUE(oplogEntry.isCommand()); + ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == + oplogEntry.getCommandType()); + } ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); @@ -1719,8 +1738,15 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { ASSERT_TRUE(oplogEntries[1].getObject2()); ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1)); - // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and - // the durable transaction state is kPrepared. + ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject()); + ASSERT_FALSE(oplogEntries[2].getObject2()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { @@ -1732,7 +1758,6 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "delete"); - WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().aboutToDelete(opCtx(), @@ -1746,27 +1771,33 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); - { - WriteUnitOfWork wuow(opCtx()); - OplogSlot slot = repl::getNextOpTime(opCtx()); - txnParticipant.transitionToPreparedforTest(opCtx(), slot.opTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), slot, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - } + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); + prepareOpTime = reservedSlots.back().opTime; + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + prepareOpTime = reservedSlots.back().opTime; + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - // TODO (SERVER-39441): Account for prepare oplog entry. - auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); StmtId expectedStmtId = 0; std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; for (const auto& oplogEntryObj : oplogEntryObjs) { - checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId++); + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete); - ASSERT_TRUE(oplogEntry.getInTxn()); + if (expectedStmtId++ < 2) { + ASSERT_TRUE(oplogEntry.isCrudOpType()); + ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete); + ASSERT_TRUE(oplogEntry.getInTxn()); + } else { + ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); + ASSERT_TRUE(oplogEntry.isCommand()); + ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == + oplogEntry.getCommandType()); + } ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); @@ -1783,8 +1814,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1)); ASSERT_FALSE(oplogEntries[1].getObject2()); - // TODO (SERVER-39441): Test that the last oplog entry has the "prepareTransaction" command and - // the durable transaction state is kPrepared. + ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject()); + ASSERT_FALSE(oplogEntries[2].getObject2()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); } } // namespace } // namespace mongo diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index b4290305eca..6c72a04aa25 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -144,7 +144,7 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept override{}; void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) override{}; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override{}; diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 946ee8893e9..d1c0713ab7d 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -277,11 +277,11 @@ public: } void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) override { ReservedTimes times{opCtx}; for (auto& observer : _observers) { - observer->onTransactionPrepare(opCtx, prepareOpTime, statements); + observer->onTransactionPrepare(opCtx, reservedSlots, statements); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 06dbcdc93a0..6838330d601 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -818,17 +818,6 @@ void createOplog(OperationContext* opCtx) { createOplog(opCtx, localOplogInfo(opCtx->getServiceContext()).oplogName, isReplSet); } -MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTime)(OperationContext* opCtx)->OplogSlot { - // The local oplog collection pointer must already be established by this point. - // We can't establish it here because that would require locking the local database, which would - // be a lock order violation. - auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog; - invariant(oplog); - OplogSlot os; - _getNextOpTimes(opCtx, oplog, 1, &os); - return os; -} - OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { auto oplog = localOplogInfo(opCtx->getServiceContext()).oplog; invariant(oplog); @@ -838,7 +827,8 @@ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx) { return os; } -std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count) { +MONGO_REGISTER_SHIM(GetNextOpTimeClass::getNextOpTimes) +(OperationContext* opCtx, std::size_t count)->std::vector<OplogSlot> { // The local oplog collection pointer must already be established by this point. // We can't establish it here because that would require locking the local database, which would // be a lock order violation. @@ -850,7 +840,6 @@ std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count return oplogSlots; } - // ------------------------------------- namespace { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index d04dab93a78..86be44cc30d 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -267,15 +267,21 @@ void createIndexForApplyOps(OperationContext* opCtx, // workaround. struct GetNextOpTimeClass { /** - * Allocates optimes for new entries in the oplog. Returns an OplogSlot or a vector of - * OplogSlots, which contain the new optimes along with their terms and newly calculated hash - * fields. + * Allocates optimes for new entries in the oplog. Returns a vector of OplogSlots, which + * contain the new optimes along with their terms and newly calculated hash fields. */ - static MONGO_DECLARE_SHIM((OperationContext * opCtx)->OplogSlot) getNextOpTime; + static MONGO_DECLARE_SHIM((OperationContext * opCtx, std::size_t count)->std::vector<OplogSlot>) + getNextOpTimes; +}; + +inline std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count) { + return GetNextOpTimeClass::getNextOpTimes(opCtx, count); }; inline OplogSlot getNextOpTime(OperationContext* opCtx) { - return GetNextOpTimeClass::getNextOpTime(opCtx); + auto slots = getNextOpTimes(opCtx, 1); + invariant(slots.size() == 1); + return slots.back(); } /** @@ -290,7 +296,6 @@ inline OplogSlot getNextOpTime(OperationContext* opCtx) { * prepare generates an oplog entry in a separate unit of work. */ OplogSlot getNextOpTimeNoPersistForTesting(OperationContext* opCtx); -std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 41c6df4eee3..9b97b178976 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -77,6 +77,8 @@ OplogEntry::CommandType parseCommandType(const BSONObj& objectField) { return OplogEntry::CommandType::kCommitTransaction; } else if (commandString == "abortTransaction") { return OplogEntry::CommandType::kAbortTransaction; + } else if (commandString == "prepareTransaction") { + return OplogEntry::CommandType::kPrepareTransaction; } else { uasserted(ErrorCodes::BadValue, str::stream() << "Unknown oplog entry command type: " << commandString diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index c385e813552..de984061e6b 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -62,6 +62,7 @@ public: kDropIndexes, kCommitTransaction, kAbortTransaction, + kPrepareTransaction, }; // Current oplog version, should be the value of the v field in all oplog entries. diff --git a/src/mongo/db/repl/oplog_shim.cpp b/src/mongo/db/repl/oplog_shim.cpp index 24840a3aae9..f6efbe31683 100644 --- a/src/mongo/db/repl/oplog_shim.cpp +++ b/src/mongo/db/repl/oplog_shim.cpp @@ -31,6 +31,6 @@ namespace mongo { namespace repl { -MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTime); +MONGO_DEFINE_SHIM(GetNextOpTimeClass::getNextOpTimes); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 2ab3ea5e89b..08e1dfd0cf4 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -167,7 +167,7 @@ public: const std::vector<repl::ReplOperation>& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) override {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 8703db1dfce..e39089dbb32 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -168,7 +168,7 @@ public: const std::vector<repl::ReplOperation>& statements) noexcept override {} void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) override {} void onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 1f2301e3765..4cf82057024 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -551,7 +551,8 @@ void TransactionParticipant::Participant::_setSpeculativeTransactionReadTimestam o(lk).transactionMetricsObserver.onChooseReadTimestamp(timestamp); } -TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx) +TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx, + int numSlotsToReserve) : _opCtx(opCtx) { // Stash the transaction on the OperationContext on the stack. At the end of this function it // will be unstashed onto the OperationContext. @@ -559,7 +560,7 @@ TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* o // Begin a new WUOW and reserve a slot in the oplog. WriteUnitOfWork wuow(opCtx); - _oplogSlot = repl::getNextOpTime(opCtx); + _oplogSlots = repl::getNextOpTimes(opCtx, numSlotsToReserve); // Release the WUOW state since this WUOW is no longer in use. wuow.release(); @@ -936,12 +937,13 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( opCtx->checkForInterrupt(); o(lk).txnState.transitionTo(TransactionState::kPrepared); } - + std::vector<OplogSlot> reservedSlots; if (prepareOptime) { // On secondary, we just prepare the transaction and discard the buffered ops. prepareOplogSlot = OplogSlot(*prepareOptime, 0); stdx::lock_guard<Client> lk(*opCtx->getClient()); o(lk).prepareOpTime = *prepareOptime; + reservedSlots.push_back(prepareOplogSlot); } else { // On primary, we reserve an optime, prepare the transaction and write the oplog entry. // @@ -950,8 +952,16 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( // being prepared. When the OplogSlotReserver goes out of scope and is destroyed, the // storage-transaction it uses to keep the hole open will abort and the slot (and // corresponding oplog hole) will vanish. - oplogSlotReserver.emplace(opCtx); - prepareOplogSlot = oplogSlotReserver->getReservedOplogSlot(); + if (!gUseMultipleOplogEntryFormatForTransactions) { + oplogSlotReserver.emplace(opCtx); + } else { + const auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx).size(); + // Reserve an extra slot here for the prepare oplog entry. + oplogSlotReserver.emplace(opCtx, numSlotsToReserve + 1); + invariant(oplogSlotReserver->getSlots().size() >= 1); + } + prepareOplogSlot = oplogSlotReserver->getLastSlot(); + reservedSlots = oplogSlotReserver->getSlots(); invariant(o().prepareOpTime.isNull(), str::stream() << "This transaction has already reserved a prepareOpTime at: " << o().prepareOpTime.toString()); @@ -971,9 +981,8 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( } opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.opTime.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); - opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare( - opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx)); + opCtx, reservedSlots, retrieveCompletedTransactionOperations(opCtx)); abortGuard.dismiss(); @@ -1149,7 +1158,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction( if (opCtx->writesAreReplicated()) { invariant(!commitOplogEntryOpTime); oplogSlotReserver.emplace(opCtx); - commitOplogSlot = oplogSlotReserver->getReservedOplogSlot(); + commitOplogSlot = oplogSlotReserver->getLastSlot(); invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp, str::stream() << "Commit oplog entry must be greater than or equal to commit " "timestamp due to causal consistency. commit timestamp: " @@ -1334,7 +1343,7 @@ void TransactionParticipant::Participant::_abortActiveTransaction( boost::optional<OplogSlot> abortOplogSlot; if (o().txnState.isPrepared() && opCtx->writesAreReplicated()) { oplogSlotReserver.emplace(opCtx); - abortOplogSlot = oplogSlotReserver->getReservedOplogSlot(); + abortOplogSlot = oplogSlotReserver->getLastSlot(); } // Clean up the transaction resources on the opCtx even if the transaction resources on the diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index a3673ef544d..6d05089b284 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -796,22 +796,29 @@ private: */ class OplogSlotReserver { public: - OplogSlotReserver(OperationContext* opCtx); + OplogSlotReserver(OperationContext* opCtx, int numSlotsToReserve = 1); ~OplogSlotReserver(); /** - * Returns the oplog slot reserved at construction. + * Returns the latest oplog slot reserved at construction. */ - OplogSlot getReservedOplogSlot() const { - invariant(!_oplogSlot.opTime.isNull()); - return _oplogSlot; + OplogSlot getLastSlot() { + invariant(!_oplogSlots.empty()); + invariant(!_oplogSlots.back().opTime.isNull()); + return getSlots().back(); + } + + std::vector<OplogSlot>& getSlots() { + invariant(!_oplogSlots.empty()); + invariant(!_oplogSlots.back().opTime.isNull()); + return _oplogSlots; } private: OperationContext* _opCtx; std::unique_ptr<Locker> _locker; std::unique_ptr<RecoveryUnit> _recoveryUnit; - OplogSlot _oplogSlot; + std::vector<OplogSlot> _oplogSlots; }; friend std::ostream& operator<<(std::ostream& s, TransactionState txnState) { diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index b78c62518a5..6f51d4ddf7c 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -87,10 +87,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements); + OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 02f9cd938d8..bd92dd240a9 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -93,7 +93,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: void onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) override; bool onTransactionPrepareThrowsException = false; @@ -136,10 +136,10 @@ public: }; void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, - const OplogSlot& prepareOpTime, + const std::vector<OplogSlot>& reservedSlots, std::vector<repl::ReplOperation>& statements) { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime, statements); + OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", |