diff options
author | Jason Chan <jason.chan@10gen.com> | 2019-03-14 16:36:08 -0400 |
---|---|---|
committer | Jason Chan <jason.chan@10gen.com> | 2019-03-14 16:52:50 -0400 |
commit | 1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9 (patch) | |
tree | cecbef0ea229a5a876e5affa74ca6d0ff6836c47 /src/mongo | |
parent | 96aa8be92f1c6a69f7602aef7ef5be26b4a8a918 (diff) | |
download | mongo-1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9.tar.gz |
SERVER-39442 Write the new commit command on primary for large prepared transactions
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/txn_cmds.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 134 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 230 |
4 files changed, 339 insertions, 32 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl index f53df962e32..e5f9e6c236d 100644 --- a/src/mongo/db/commands/txn_cmds.idl +++ b/src/mongo/db/commands/txn_cmds.idl @@ -68,8 +68,8 @@ structs: prepared: type: bool optional: true - description: "Set to false for a commit for an unprepared transaction. Implicit - default is true, do not set explicitly to true" + description: "True if the transaction has been prepared. Set to false for a commit + for an unprepared transaction." AbortTransactionOplogObject: description: A document representing the 'o' field of an 'abortTransaction' oplog entry. diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 3acf6aeb42b..860b92c7138 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1277,6 +1277,9 @@ void OpObserverImpl::onPreparedTransactionCommit( CommitTransactionOplogObject cmdObj; cmdObj.setCommitTimestamp(commitTimestamp); + if (gUseMultipleOplogEntryFormatForTransactions) { + cmdObj.setPrepared(true); + } logCommitOrAbortForPreparedTransaction( opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted); } diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index d9fda193049..3bc545ebba6 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -1823,5 +1823,139 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); } + +TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { + const NamespaceString nss1("testDB", "testColl"); + auto uuid1 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, + BSON("_id" << 0 << "data" + << "x")); + + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); + prepareOpTime = reservedSlots.back().opTime; + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + + const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); + ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + + const auto prepareTimestamp = prepareOpTime.getTimestamp(); + + const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1])); + ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction); + + // Reserve oplog entry for the commit oplog entry. + OplogSlot commitSlot = repl::getNextOpTime(opCtx()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + + // Mimic committing the transaction. + opCtx()->setWriteUnitOfWork(nullptr); + opCtx()->lockState()->unsetMaxLockTimeout(); + + // commitTimestamp must be greater than the prepareTimestamp. + auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); + + txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); + opObserver().onPreparedTransactionCommit( + opCtx(), + commitSlot, + commitTimestamp, + txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + oplogEntryObjs = getNOplogEntries(opCtx(), 3); + const auto commitOplogObj = oplogEntryObjs.back(); + // Statement id's for the insert and prepare should be 0 and 1 respectively. + const auto expectedCommitStmtId = 2; + checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId); + auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj)); + auto o = commitEntry.getObject(); + auto oExpected = BSON( + "commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true); + ASSERT_BSONOBJ_EQ(oExpected, o); + + assertTxnRecord(txnNum(), commitSlot.opTime, DurableTxnStateEnum::kCommitted); +} + +TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { + const NamespaceString nss1("testDB", "testColl"); + auto uuid1 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, + BSON("_id" << 0 << "data" + << "x")); + + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); + prepareOpTime = reservedSlots.back().opTime; + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + + const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); + ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + + const auto prepareTimestamp = prepareOpTime.getTimestamp(); + + const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1])); + ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction); + + // Reserve oplog entry for the abort oplog entry. + OplogSlot abortSlot = repl::getNextOpTime(opCtx()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); + + // Mimic aborting the transaction by resetting the WUOW. + opCtx()->setWriteUnitOfWork(nullptr); + opCtx()->lockState()->unsetMaxLockTimeout(); + opObserver().onTransactionAbort(opCtx(), abortSlot); + txnParticipant.transitionToAbortedWithPrepareforTest(opCtx()); + + oplogEntryObjs = getNOplogEntries(opCtx(), 3); + auto abortOplogObj = oplogEntryObjs.back(); + // Statement id's for the insert and prepare should be 0 and 1 respectively. + const auto expectedAbortStmtId = 2; + checkSessionAndTransactionFields(abortOplogObj, expectedAbortStmtId); + auto abortEntry = assertGet(OplogEntry::parse(abortOplogObj)); + auto o = abortEntry.getObject(); + auto oExpected = BSON("abortTransaction" << 1); + ASSERT_BSONOBJ_EQ(oExpected, o); + + assertTxnRecord(txnNum(), abortSlot.opTime, DurableTxnStateEnum::kAborted); +} + } // namespace } // namespace mongo diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 5160aaf823a..c2800a6a882 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -2747,6 +2747,26 @@ public: unittest::log() << "Commit entry TS: " << commitEntryTs; } + BSONObj getSessionTxnInfoAtTimestamp(const Timestamp& ts, bool expected) { + AutoGetCollection autoColl( + _opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IX); + const auto sessionId = *_opCtx->getLogicalSessionId(); + const auto txnNum = *_opCtx->getTxnNumber(); + BSONObj doc; + OneOffRead oor(_opCtx, ts); + bool found = Helpers::findOne(_opCtx, + autoColl.getCollection(), + BSON("_id" << sessionId.toBSON() << "txnNum" << txnNum), + doc); + if (expected) { + ASSERT(found) << "Missing session transaction info at " << ts; + } else { + ASSERT_FALSE(found) << "Session transaction info at " << ts + << " is unexpectedly present " << doc; + } + return doc; + } + protected: NamespaceString nss; Timestamp presentTs; @@ -2890,33 +2910,13 @@ public: } } - BSONObj getSessionTxnInfoAtTimestamp(const Timestamp& ts, bool expected) { - AutoGetCollection autoColl( - _opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IX); - const auto sessionId = *_opCtx->getLogicalSessionId(); - const auto txnNum = *_opCtx->getTxnNumber(); - BSONObj doc; - OneOffRead oor(_opCtx, ts); - bool found = Helpers::findOne(_opCtx, - autoColl.getCollection(), - BSON("_id" << sessionId.toBSON() << "txnNum" << txnNum), - doc); - if (expected) { - ASSERT(found) << "Missing session transaction info at " << ts; - } else { - ASSERT_FALSE(found) << "Session transaction info at " << ts - << " is unexpectedly present " << doc; - } - return doc; - } - protected: Timestamp firstOplogEntryTs, secondOplogEntryTs; }; -class PreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest { +class CommitPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest { public: - PreparedMultiOplogEntryTransaction() + CommitPreparedMultiOplogEntryTransaction() : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") { gUseMultipleOplogEntryFormatForTransactions = true; const auto currentTime = _clock->getClusterTime(); @@ -2926,7 +2926,7 @@ public: commitEntryTs = currentTime.addTicks(4).asTimestamp(); } - ~PreparedMultiOplogEntryTransaction() { + ~CommitPreparedMultiOplogEntryTransaction() { gUseMultipleOplogEntryFormatForTransactions = false; } @@ -2970,20 +2970,25 @@ public: } txnParticipant.prepareTransaction(_opCtx, {}); + const BSONObj query1 = BSON("_id" << 1); + const BSONObj query2 = BSON("_id" << 2); + txnParticipant.stashTransactionResources(_opCtx); { AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS); auto coll = autoColl.getCollection(); - const BSONObj query1 = BSON("_id" << 1); - const BSONObj query2 = BSON("_id" << 2); assertDocumentAtTimestamp(coll, presentTs, BSONObj()); assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj()); + assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj()); + assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj()); assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj()); assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj()); assertDocumentAtTimestamp(coll, nullTs, BSONObj()); assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(prepareFilter, firstOplogEntryTs, false); + assertOplogDocumentExistsAtTimestamp(prepareFilter, secondOplogEntryTs, false); assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true); assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, true); assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true); @@ -2991,22 +2996,186 @@ public: // We haven't committed the prepared transaction assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(commitFilter, firstOplogEntryTs, false); + assertOplogDocumentExistsAtTimestamp(commitFilter, secondOplogEntryTs, false); assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false); assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false); assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false); } - // Temporary until SERVER-39442: abort the prepared transaction and clean up the resources. + txnParticipant.unstashTransactionResources(_opCtx, "commitTransaction"); + + txnParticipant.commitPreparedTransaction(_opCtx, commitEntryTs, {}); + + txnParticipant.stashTransactionResources(_opCtx); + { + AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); + auto coll = autoColl.getCollection(); + assertDocumentAtTimestamp(coll, presentTs, BSONObj()); + assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj()); + assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj()); + assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj()); + assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj()); + assertFilteredDocumentAtTimestamp(coll, query1, commitEntryTs, doc); + assertFilteredDocumentAtTimestamp(coll, query2, commitEntryTs, doc2); + assertFilteredDocumentAtTimestamp(coll, query1, nullTs, doc); + assertFilteredDocumentAtTimestamp(coll, query2, nullTs, doc2); + + // The prepare oplog entry should exist at prepareEntryTs and onwards. + assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true); + assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, true); + assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true); + + // The commit oplog entry should exist at commitEntryTs and onwards. + assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false); + assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, true); + assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, true); + + // The first oplog entry should exist at firstOplogEntryTs and onwards. + const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op" + << "i" + << "o" + << doc); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, secondOplogEntryTs, true); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, prepareEntryTs, true); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, commitEntryTs, true); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true); + + // The second oplog entry should exist at secondOplogEntryTs and onwards. + const auto secondOplogEntryFilter = BSON("ts" << secondOplogEntryTs << "op" + << "i" + << "o" + << doc2); + assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false); + assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, secondOplogEntryTs, true); + assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, prepareEntryTs, true); + assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, commitEntryTs, true); + assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, nullTs, true); + + // The session state should go to inProgress at firstOplogEntryTs, then to prepared at + // prepareEntryTs, and then finally to committed at commitEntryTs. + auto sessionInfo = getSessionTxnInfoAtTimestamp(firstOplogEntryTs, true); + ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); + ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs); + + sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true); + ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); + ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), secondOplogEntryTs); + + sessionInfo = getSessionTxnInfoAtTimestamp(prepareEntryTs, true); + ASSERT_EQ(sessionInfo["state"].String(), "prepared"); + ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), prepareEntryTs); + + sessionInfo = getSessionTxnInfoAtTimestamp(nullTs, true); + ASSERT_EQ(sessionInfo["state"].String(), "committed"); + ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs); + } + } + +protected: + Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs; +}; + +class AbortPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest { +public: + AbortPreparedMultiOplogEntryTransaction() + : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") { + gUseMultipleOplogEntryFormatForTransactions = true; + const auto currentTime = _clock->getClusterTime(); + firstOplogEntryTs = currentTime.addTicks(1).asTimestamp(); + prepareEntryTs = currentTime.addTicks(2).asTimestamp(); + abortEntryTs = currentTime.addTicks(3).asTimestamp(); + } + + ~AbortPreparedMultiOplogEntryTransaction() { + gUseMultipleOplogEntryFormatForTransactions = false; + } + + void run() { + auto txnParticipant = TransactionParticipant::get(_opCtx); + ASSERT(txnParticipant); + unittest::log() << "PrepareTS: " << prepareEntryTs; + unittest::log() << "AbortTS: " << abortEntryTs; + + const auto prepareFilter = BSON("ts" << prepareEntryTs); + const auto abortFilter = BSON("ts" << abortEntryTs); + { + assertOplogDocumentExistsAtTimestamp(abortFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, firstOplogEntryTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, prepareEntryTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, false); + } + txnParticipant.unstashTransactionResources(_opCtx, "insert"); + + txnParticipant.prepareTransaction(_opCtx, {}); + + txnParticipant.stashTransactionResources(_opCtx); + { + assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true); + assertOplogDocumentExistsAtTimestamp(prepareFilter, abortEntryTs, true); + assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true); + + assertOplogDocumentExistsAtTimestamp(abortFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, firstOplogEntryTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, prepareEntryTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, false); + } + txnParticipant.unstashTransactionResources(_opCtx, "abortTransaction"); + txnParticipant.abortActiveTransaction(_opCtx); + txnParticipant.stashTransactionResources(_opCtx); + const BSONObj query1 = BSON("_id" << 1); + { + // The prepare oplog entry should exist at prepareEntryTs and onwards. + assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true); + assertOplogDocumentExistsAtTimestamp(prepareFilter, abortEntryTs, true); + assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true); + + // The abort oplog entry should exist at abortEntryTs and onwards. + assertOplogDocumentExistsAtTimestamp(abortFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, prepareEntryTs, false); + assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, true); + assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, true); - // TODO (SERVER-39442): Commit the prepared transaction and assert existence of oplogs at - // commitTimestamp. + // The first oplog entry should exist at firstOplogEntryTs and onwards. + const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op" + << "i" + << "o" + << doc); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, prepareEntryTs, true); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, abortEntryTs, true); + assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true); + + // The session state should be "aborted" at abortEntryTs. + auto sessionInfo = getSessionTxnInfoAtTimestamp(abortEntryTs, true); + ASSERT_EQ(sessionInfo["state"].String(), "aborted"); + ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), abortEntryTs); + } } protected: - Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs; + Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs, abortEntryTs; }; class PreparedMultiDocumentTransaction : public MultiDocumentTransactionTest { @@ -3252,7 +3421,8 @@ public: add<CreateCollectionWithSystemIndex>(); add<MultiDocumentTransaction>(); add<MultiOplogEntryTransaction>(); - add<PreparedMultiOplogEntryTransaction>(); + add<CommitPreparedMultiOplogEntryTransaction>(); + add<AbortPreparedMultiOplogEntryTransaction>(); add<PreparedMultiDocumentTransaction>(); add<AbortedPreparedMultiDocumentTransaction>(); } |