summary | refs | log | tree | commit | diff
path: root/src/mongo/db
diff options
context:
space:
mode:
author Jason Chan <jason.chan@10gen.com> 2019-03-14 16:36:08 -0400
committer Jason Chan <jason.chan@10gen.com> 2019-03-14 16:52:50 -0400
commit 1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9 (patch)
tree cecbef0ea229a5a876e5affa74ca6d0ff6836c47 /src/mongo/db
parent 96aa8be92f1c6a69f7602aef7ef5be26b4a8a918 (diff)
download mongo-1de61831b1f1f1e780fb7fdd14e10e34a1c5cab9.tar.gz
SERVER-39442 Write the new commit command on primary for large prepared transactions
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- src/mongo/db/commands/txn_cmds.idl 4
-rw-r--r-- src/mongo/db/op_observer_impl.cpp 3
-rw-r--r-- src/mongo/db/op_observer_impl_test.cpp 134
3 files changed, 139 insertions, 2 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl
index f53df962e32..e5f9e6c236d 100644
--- a/src/mongo/db/commands/txn_cmds.idl
+++ b/src/mongo/db/commands/txn_cmds.idl
@@ -68,8 +68,8 @@ structs:
prepared:
type: bool
optional: true
- description: "Set to false for a commit for an unprepared transaction. Implicit
- default is true, do not set explicitly to true"
+ description: "True if the transaction has been prepared. Set to false for a commit
+ for an unprepared transaction."
AbortTransactionOplogObject:
description: A document representing the 'o' field of an 'abortTransaction' oplog entry.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 3acf6aeb42b..860b92c7138 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1277,6 +1277,9 @@ void OpObserverImpl::onPreparedTransactionCommit(
CommitTransactionOplogObject cmdObj;
cmdObj.setCommitTimestamp(commitTimestamp);
+ if (gUseMultipleOplogEntryFormatForTransactions) {
+ cmdObj.setPrepared(true);
+ }
logCommitOrAbortForPreparedTransaction(
opCtx, commitOplogEntryOpTime, cmdObj.toBSON(), DurableTxnStateEnum::kCommitted);
}
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index d9fda193049..3bc545ebba6 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -1823,5 +1823,139 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
}
+
+TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
+ const NamespaceString nss1("testDB", "testColl");
+ auto uuid1 = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
+ prepareOpTime = reservedSlots.back().opTime;
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+
+ const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction);
+
+ // Reserve oplog entry for the commit oplog entry.
+ OplogSlot commitSlot = repl::getNextOpTime(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+
+ // Mimic committing the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+
+ // commitTimestamp must be greater than the prepareTimestamp.
+ auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
+
+ txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ commitTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ const auto commitOplogObj = oplogEntryObjs.back();
+ // Statement id's for the insert and prepare should be 0 and 1 respectively.
+ const auto expectedCommitStmtId = 2;
+ checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId);
+ auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj));
+ auto o = commitEntry.getObject();
+ auto oExpected = BSON(
+ "commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ assertTxnRecord(txnNum(), commitSlot.opTime, DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
+ const NamespaceString nss1("testDB", "testColl");
+ auto uuid1 = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
+ prepareOpTime = reservedSlots.back().opTime;
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+
+ const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction);
+
+ // Reserve oplog entry for the abort oplog entry.
+ OplogSlot abortSlot = repl::getNextOpTime(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+
+ // Mimic aborting the transaction by resetting the WUOW.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
+
+ oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ auto abortOplogObj = oplogEntryObjs.back();
+ // Statement id's for the insert and prepare should be 0 and 1 respectively.
+ const auto expectedAbortStmtId = 2;
+ checkSessionAndTransactionFields(abortOplogObj, expectedAbortStmtId);
+ auto abortEntry = assertGet(OplogEntry::parse(abortOplogObj));
+ auto o = abortEntry.getObject();
+ auto oExpected = BSON("abortTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ assertTxnRecord(txnNum(), abortSlot.opTime, DurableTxnStateEnum::kAborted);
+}
+
} // namespace
} // namespace mongo