summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJason Chan <jason.chan@10gen.com>2019-03-14 16:36:08 -0400
committerJason Chan <jason.chan@10gen.com>2019-03-14 16:52:50 -0400
commit1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9 (patch)
treececbef0ea229a5a876e5affa74ca6d0ff6836c47 /src/mongo
parent96aa8be92f1c6a69f7602aef7ef5be26b4a8a918 (diff)
downloadmongo-1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9.tar.gz
SERVER-39442 Write the new commit command on primary for large prepared transactions
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/txn_cmds.idl4
-rw-r--r--src/mongo/db/op_observer_impl.cpp3
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp134
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp230
4 files changed, 339 insertions, 32 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl
index f53df962e32..e5f9e6c236d 100644
--- a/src/mongo/db/commands/txn_cmds.idl
+++ b/src/mongo/db/commands/txn_cmds.idl
@@ -68,8 +68,8 @@ structs:
prepared:
type: bool
optional: true
- description: "Set to false for a commit for an unprepared transaction. Implicit
- default is true, do not set explicitly to true"
+ description: "True if the transaction has been prepared. Set to false for a commit
+ for an unprepared transaction."
AbortTransactionOplogObject:
description: A document representing the 'o' field of an 'abortTransaction' oplog entry.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 3acf6aeb42b..860b92c7138 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1277,6 +1277,9 @@ void OpObserverImpl::onPreparedTransactionCommit(
CommitTransactionOplogObject cmdObj;
cmdObj.setCommitTimestamp(commitTimestamp);
+ if (gUseMultipleOplogEntryFormatForTransactions) {
+ cmdObj.setPrepared(true);
+ }
logCommitOrAbortForPreparedTransaction(
opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted);
}
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index d9fda193049..3bc545ebba6 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -1823,5 +1823,139 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
}
+
+TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
+ const NamespaceString nss1("testDB", "testColl");
+ auto uuid1 = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
+ prepareOpTime = reservedSlots.back().opTime;
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+
+ const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction);
+
+ // Reserve oplog entry for the commit oplog entry.
+ OplogSlot commitSlot = repl::getNextOpTime(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+
+ // Mimic committing the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+
+ // commitTimestamp must be greater than the prepareTimestamp.
+ auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
+
+ txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ commitTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ const auto commitOplogObj = oplogEntryObjs.back();
+ // Statement id's for the insert and prepare should be 0 and 1 respectively.
+ const auto expectedCommitStmtId = 2;
+ checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId);
+ auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj));
+ auto o = commitEntry.getObject();
+ auto oExpected = BSON(
+ "commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ assertTxnRecord(txnNum(), commitSlot.opTime, DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
+ const NamespaceString nss1("testDB", "testColl");
+ auto uuid1 = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
+ prepareOpTime = reservedSlots.back().opTime;
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+
+ const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction);
+
+ // Reserve oplog entry for the abort oplog entry.
+ OplogSlot abortSlot = repl::getNextOpTime(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+
+ // Mimic aborting the transaction by resetting the WUOW.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
+
+ oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ auto abortOplogObj = oplogEntryObjs.back();
+ // Statement id's for the insert and prepare should be 0 and 1 respectively.
+ const auto expectedAbortStmtId = 2;
+ checkSessionAndTransactionFields(abortOplogObj, expectedAbortStmtId);
+ auto abortEntry = assertGet(OplogEntry::parse(abortOplogObj));
+ auto o = abortEntry.getObject();
+ auto oExpected = BSON("abortTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ assertTxnRecord(txnNum(), abortSlot.opTime, DurableTxnStateEnum::kAborted);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 5160aaf823a..c2800a6a882 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -2747,6 +2747,26 @@ public:
unittest::log() << "Commit entry TS: " << commitEntryTs;
}
+ BSONObj getSessionTxnInfoAtTimestamp(const Timestamp& ts, bool expected) {
+ AutoGetCollection autoColl(
+ _opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IX);
+ const auto sessionId = *_opCtx->getLogicalSessionId();
+ const auto txnNum = *_opCtx->getTxnNumber();
+ BSONObj doc;
+ OneOffRead oor(_opCtx, ts);
+ bool found = Helpers::findOne(_opCtx,
+ autoColl.getCollection(),
+ BSON("_id" << sessionId.toBSON() << "txnNum" << txnNum),
+ doc);
+ if (expected) {
+ ASSERT(found) << "Missing session transaction info at " << ts;
+ } else {
+ ASSERT_FALSE(found) << "Session transaction info at " << ts
+ << " is unexpectedly present " << doc;
+ }
+ return doc;
+ }
+
protected:
NamespaceString nss;
Timestamp presentTs;
@@ -2890,33 +2910,13 @@ public:
}
}
- BSONObj getSessionTxnInfoAtTimestamp(const Timestamp& ts, bool expected) {
- AutoGetCollection autoColl(
- _opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IX);
- const auto sessionId = *_opCtx->getLogicalSessionId();
- const auto txnNum = *_opCtx->getTxnNumber();
- BSONObj doc;
- OneOffRead oor(_opCtx, ts);
- bool found = Helpers::findOne(_opCtx,
- autoColl.getCollection(),
- BSON("_id" << sessionId.toBSON() << "txnNum" << txnNum),
- doc);
- if (expected) {
- ASSERT(found) << "Missing session transaction info at " << ts;
- } else {
- ASSERT_FALSE(found) << "Session transaction info at " << ts
- << " is unexpectedly present " << doc;
- }
- return doc;
- }
-
protected:
Timestamp firstOplogEntryTs, secondOplogEntryTs;
};
-class PreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest {
+class CommitPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest {
public:
- PreparedMultiOplogEntryTransaction()
+ CommitPreparedMultiOplogEntryTransaction()
: MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") {
gUseMultipleOplogEntryFormatForTransactions = true;
const auto currentTime = _clock->getClusterTime();
@@ -2926,7 +2926,7 @@ public:
commitEntryTs = currentTime.addTicks(4).asTimestamp();
}
- ~PreparedMultiOplogEntryTransaction() {
+ ~CommitPreparedMultiOplogEntryTransaction() {
gUseMultipleOplogEntryFormatForTransactions = false;
}
@@ -2970,20 +2970,25 @@ public:
}
txnParticipant.prepareTransaction(_opCtx, {});
+ const BSONObj query1 = BSON("_id" << 1);
+ const BSONObj query2 = BSON("_id" << 2);
+
txnParticipant.stashTransactionResources(_opCtx);
{
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS, LockMode::MODE_IS);
auto coll = autoColl.getCollection();
- const BSONObj query1 = BSON("_id" << 1);
- const BSONObj query2 = BSON("_id" << 2);
assertDocumentAtTimestamp(coll, presentTs, BSONObj());
assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj());
+ assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj());
assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj());
assertDocumentAtTimestamp(coll, commitEntryTs, BSONObj());
assertDocumentAtTimestamp(coll, nullTs, BSONObj());
assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, firstOplogEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, secondOplogEntryTs, false);
assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true);
assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, true);
assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true);
@@ -2991,22 +2996,186 @@ public:
// We haven't committed the prepared transaction
assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, firstOplogEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, secondOplogEntryTs, false);
assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false);
assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, false);
assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, false);
}
- // Temporary until SERVER-39442: abort the prepared transaction and clean up the resources.
+ txnParticipant.unstashTransactionResources(_opCtx, "commitTransaction");
+
+ txnParticipant.commitPreparedTransaction(_opCtx, commitEntryTs, {});
+
+ txnParticipant.stashTransactionResources(_opCtx);
+ {
+ AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);
+ auto coll = autoColl.getCollection();
+ assertDocumentAtTimestamp(coll, presentTs, BSONObj());
+ assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj());
+ assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj());
+ assertDocumentAtTimestamp(coll, prepareEntryTs, BSONObj());
+ assertFilteredDocumentAtTimestamp(coll, query1, commitEntryTs, doc);
+ assertFilteredDocumentAtTimestamp(coll, query2, commitEntryTs, doc2);
+ assertFilteredDocumentAtTimestamp(coll, query1, nullTs, doc);
+ assertFilteredDocumentAtTimestamp(coll, query2, nullTs, doc2);
+
+ // The prepare oplog entry should exist at prepareEntryTs and onwards.
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, commitEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true);
+
+ // The commit oplog entry should exist at commitEntryTs and onwards.
+ assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, prepareEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, true);
+
+ // The first oplog entry should exist at firstOplogEntryTs and onwards.
+ const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op"
+ << "i"
+ << "o"
+ << doc);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, secondOplogEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, prepareEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, commitEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true);
+
+ // The second oplog entry should exist at secondOplogEntryTs and onwards.
+ const auto secondOplogEntryFilter = BSON("ts" << secondOplogEntryTs << "op"
+ << "i"
+ << "o"
+ << doc2);
+ assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, secondOplogEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, prepareEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, commitEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, nullTs, true);
+
+ // The session state should go to inProgress at firstOplogEntryTs, then to prepared at
+ // prepareEntryTs, and then finally to committed at commitEntryTs.
+ auto sessionInfo = getSessionTxnInfoAtTimestamp(firstOplogEntryTs, true);
+ ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
+ ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs);
+
+ sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true);
+ ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
+ ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), secondOplogEntryTs);
+
+ sessionInfo = getSessionTxnInfoAtTimestamp(prepareEntryTs, true);
+ ASSERT_EQ(sessionInfo["state"].String(), "prepared");
+ ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), prepareEntryTs);
+
+ sessionInfo = getSessionTxnInfoAtTimestamp(nullTs, true);
+ ASSERT_EQ(sessionInfo["state"].String(), "committed");
+ ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs);
+ }
+ }
+
+protected:
+ Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs;
+};
+
+class AbortPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest {
+public:
+ AbortPreparedMultiOplogEntryTransaction()
+ : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") {
+ gUseMultipleOplogEntryFormatForTransactions = true;
+ const auto currentTime = _clock->getClusterTime();
+ firstOplogEntryTs = currentTime.addTicks(1).asTimestamp();
+ prepareEntryTs = currentTime.addTicks(2).asTimestamp();
+ abortEntryTs = currentTime.addTicks(3).asTimestamp();
+ }
+
+ ~AbortPreparedMultiOplogEntryTransaction() {
+ gUseMultipleOplogEntryFormatForTransactions = false;
+ }
+
+ void run() {
+ auto txnParticipant = TransactionParticipant::get(_opCtx);
+ ASSERT(txnParticipant);
+ unittest::log() << "PrepareTS: " << prepareEntryTs;
+ unittest::log() << "AbortTS: " << abortEntryTs;
+
+ const auto prepareFilter = BSON("ts" << prepareEntryTs);
+ const auto abortFilter = BSON("ts" << abortEntryTs);
+ {
+ assertOplogDocumentExistsAtTimestamp(abortFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, firstOplogEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, prepareEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, false);
+ }
+ txnParticipant.unstashTransactionResources(_opCtx, "insert");
+
+ txnParticipant.prepareTransaction(_opCtx, {});
+
+ txnParticipant.stashTransactionResources(_opCtx);
+ {
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, abortEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true);
+
+ assertOplogDocumentExistsAtTimestamp(abortFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, firstOplogEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, prepareEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, false);
+ }
+
txnParticipant.unstashTransactionResources(_opCtx, "abortTransaction");
+
txnParticipant.abortActiveTransaction(_opCtx);
+
txnParticipant.stashTransactionResources(_opCtx);
+ const BSONObj query1 = BSON("_id" << 1);
+ {
+ // The prepare oplog entry should exist at prepareEntryTs and onwards.
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, prepareEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, abortEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(prepareFilter, nullTs, true);
+
+ // The abort oplog entry should exist at abortEntryTs and onwards.
+ assertOplogDocumentExistsAtTimestamp(abortFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, prepareEntryTs, false);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, true);
- // TODO (SERVER-39442): Commit the prepared transaction and assert existence of oplogs at
- // commitTimestamp.
+ // The first oplog entry should exist at firstOplogEntryTs and onwards.
+ const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op"
+ << "i"
+ << "o"
+ << doc);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, prepareEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, abortEntryTs, true);
+ assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true);
+
+ // The session state should be "aborted" at abortEntryTs.
+ auto sessionInfo = getSessionTxnInfoAtTimestamp(abortEntryTs, true);
+ ASSERT_EQ(sessionInfo["state"].String(), "aborted");
+ ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), abortEntryTs);
+ }
}
protected:
- Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs;
+ Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs, abortEntryTs;
};
class PreparedMultiDocumentTransaction : public MultiDocumentTransactionTest {
@@ -3252,7 +3421,8 @@ public:
add<CreateCollectionWithSystemIndex>();
add<MultiDocumentTransaction>();
add<MultiOplogEntryTransaction>();
- add<PreparedMultiOplogEntryTransaction>();
+ add<CommitPreparedMultiOplogEntryTransaction>();
+ add<AbortPreparedMultiOplogEntryTransaction>();
add<PreparedMultiDocumentTransaction>();
add<AbortedPreparedMultiDocumentTransaction>();
}