diff options
author | Jason Chan <jason.chan@10gen.com> | 2019-03-14 16:36:08 -0400 |
---|---|---|
committer | Jason Chan <jason.chan@10gen.com> | 2019-03-14 16:52:50 -0400 |
commit | 1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9 (patch) | |
tree | cecbef0ea229a5a876e5affa74ca6d0ff6836c47 /src/mongo/db | |
parent | 96aa8be92f1c6a69f7602aef7ef5be26b4a8a918 (diff) | |
download | mongo-1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9.tar.gz |
SERVER-39442 Write the new commit command on primary for large prepared transactions
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/txn_cmds.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 134 |
3 files changed, 139 insertions, 2 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl index f53df962e32..e5f9e6c236d 100644 --- a/src/mongo/db/commands/txn_cmds.idl +++ b/src/mongo/db/commands/txn_cmds.idl @@ -68,8 +68,8 @@ structs: prepared: type: bool optional: true - description: "Set to false for a commit for an unprepared transaction. Implicit - default is true, do not set explicitly to true" + description: "True if the transaction has been prepared. Set to false for a commit + for an unprepared transaction." AbortTransactionOplogObject: description: A document representing the 'o' field of an 'abortTransaction' oplog entry. diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 3acf6aeb42b..860b92c7138 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1277,6 +1277,9 @@ void OpObserverImpl::onPreparedTransactionCommit( CommitTransactionOplogObject cmdObj; cmdObj.setCommitTimestamp(commitTimestamp); + if (gUseMultipleOplogEntryFormatForTransactions) { + cmdObj.setPrepared(true); + } logCommitOrAbortForPreparedTransaction( opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted); } diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index d9fda193049..3bc545ebba6 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -1823,5 +1823,139 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); } + +TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { + const NamespaceString nss1("testDB", "testColl"); + auto uuid1 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, + BSON("_id" << 0 << "data" + << "x")); + + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); + prepareOpTime = reservedSlots.back().opTime; + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + + const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); + ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + + const auto prepareTimestamp = prepareOpTime.getTimestamp(); + + const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1])); + ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction); + + // Reserve oplog entry for the commit oplog entry. + OplogSlot commitSlot = repl::getNextOpTime(opCtx()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + + // Mimic committing the transaction. + opCtx()->setWriteUnitOfWork(nullptr); + opCtx()->lockState()->unsetMaxLockTimeout(); + + // commitTimestamp must be greater than the prepareTimestamp. + auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); + + txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); + opObserver().onPreparedTransactionCommit( + opCtx(), + commitSlot, + commitTimestamp, + txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + oplogEntryObjs = getNOplogEntries(opCtx(), 3); + const auto commitOplogObj = oplogEntryObjs.back(); + // Statement id's for the insert and prepare should be 0 and 1 respectively. + const auto expectedCommitStmtId = 2; + checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId); + auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj)); + auto o = commitEntry.getObject(); + auto oExpected = BSON( + "commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true); + ASSERT_BSONOBJ_EQ(oExpected, o); + + assertTxnRecord(txnNum(), commitSlot.opTime, DurableTxnStateEnum::kCommitted); +} + +TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { + const NamespaceString nss1("testDB", "testColl"); + auto uuid1 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, + BSON("_id" << 0 << "data" + << "x")); + + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); + prepareOpTime = reservedSlots.back().opTime; + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + + const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); + ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + + const auto prepareTimestamp = prepareOpTime.getTimestamp(); + + const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1])); + ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction); + + // Reserve oplog entry for the abort oplog entry. + OplogSlot abortSlot = repl::getNextOpTime(opCtx()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); + + // Mimic aborting the transaction by resetting the WUOW. + opCtx()->setWriteUnitOfWork(nullptr); + opCtx()->lockState()->unsetMaxLockTimeout(); + opObserver().onTransactionAbort(opCtx(), abortSlot); + txnParticipant.transitionToAbortedWithPrepareforTest(opCtx()); + + oplogEntryObjs = getNOplogEntries(opCtx(), 3); + auto abortOplogObj = oplogEntryObjs.back(); + // Statement id's for the insert and prepare should be 0 and 1 respectively. + const auto expectedAbortStmtId = 2; + checkSessionAndTransactionFields(abortOplogObj, expectedAbortStmtId); + auto abortEntry = assertGet(OplogEntry::parse(abortOplogObj)); + auto o = abortEntry.getObject(); + auto oExpected = BSON("abortTransaction" << 1); + ASSERT_BSONOBJ_EQ(oExpected, o); + + assertTxnRecord(txnNum(), abortSlot.opTime, DurableTxnStateEnum::kAborted); +} + } // namespace } // namespace mongo |