diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2019-05-06 10:17:00 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2019-05-06 10:17:00 -0400 |
commit | 1b8d3d355363b3b2a5b8565df60f07a08393ced6 (patch) | |
tree | aed490c00c4c7eedd72e8957490ff82edda87d10 /src/mongo | |
parent | 30f602bb9c8799e9b2b0d1c608d13fdfb24d2ce2 (diff) | |
download | mongo-1b8d3d355363b3b2a5b8565df60f07a08393ced6.tar.gz |
SERVER-40725 Pack multiple operations into a single applyOps on primary for large transactions
Diffstat (limited to 'src/mongo')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 115 |
| -rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 347 |
| -rw-r--r-- | src/mongo/db/transaction_participant.idl | 10 |
| -rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 40 |
4 files changed, 444 insertions, 68 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 3ffb4988165..83ef9fe73e4 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -992,17 +992,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, namespace { -OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, - std::vector<repl::ReplOperation> stmts, - const OplogSlot& oplogSlot, - repl::OpTime prevWriteOpTime, - StmtId stmtId, - const bool prepare, - const bool isPartialTxn, - const bool shouldWriteStateField) { +// Logs one applyOps, packing in as many operations as fit in a single applyOps entry. If +// isPartialTxn is not set, all operations are attempted to be packed, regardless of whether or +// not they fit; TransactionTooLarge will be thrown if this is not the case. +// +// Returns an iterator to the first ReplOperation not packed in the batch. +std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApplyOpsForTransaction( + OperationContext* opCtx, + std::vector<repl::ReplOperation>::const_iterator stmtBegin, + std::vector<repl::ReplOperation>::const_iterator stmtEnd, + const OplogSlot& oplogSlot, + repl::OpTime prevWriteOpTime, + StmtId stmtId, + const bool prepare, + const bool isPartialTxn, + const bool shouldWriteStateField) { BSONObjBuilder applyOpsBuilder; BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); - for (auto& stmt : stmts) { + std::vector<repl::ReplOperation>::const_iterator stmtIter; + for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) { + const auto& stmt = *stmtIter; + // Stop packing when either number of transaction operations is reached, or when the next + // one would put the array over the maximum BSON Object User Size. We rely on the + // head room between BSONObjMaxUserSize and BSONObjMaxInternalSize to cover the + // BSON overhead and the other applyOps fields. 
But if the array with a single operation + // exceeds BSONObjMaxUserSize, we still log it, as a single max-length operation + // should be able to be applied. + if (isPartialTxn && + (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry || + (opsArray.arrSize() > 0 && + (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) > + BSONObjMaxUserSize)))) + break; opsArray.append(stmt.toBSON()); } opsArray.done(); @@ -1059,7 +1080,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, txnState, startOpTime); } - return times; + return {times, stmtIter}; } catch (const AssertionException& e) { // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge. uassert(ErrorCodes::TransactionTooLarge, @@ -1099,12 +1120,13 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx, return times; } -// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts'. If -// there are more oplog slots than statements, then only the first n slots are used, where n is the -// size of 'stmts'. -void logOplogEntriesForTransaction(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& stmts, - const std::vector<OplogSlot>& oplogSlots) { +// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts' in +// the worst case, where each operation requires an applyOps entry of its own. If there are more +// oplog slots than applyOps operations are written, the number of oplog slots corresponding to the +// number of applyOps written will be used. The number of oplog entries written is returned. 
+int logOplogEntriesForTransaction(OperationContext* opCtx, + const std::vector<repl::ReplOperation>& stmts, + const std::vector<OplogSlot>& oplogSlots) { invariant(!stmts.empty()); invariant(stmts.size() <= oplogSlots.size()); @@ -1122,23 +1144,23 @@ void logOplogEntriesForTransaction(OperationContext* opCtx, // Note the logged statement IDs are not the same as the user-chosen statement IDs. stmtId = 0; auto oplogSlot = oplogSlots.begin(); - for (const auto& stmt : stmts) { - // Wrap each individual operation in an applyOps entry. This is temporary until we - // implement the logic to pack multiple operations into a single applyOps entry for - // the larger transactions project. - // TODO (SERVER-40725): Modify this comment once packing logic is done. - prevWriteOpTime = logApplyOpsForTransaction(opCtx, - {stmt}, - *oplogSlot++, - prevWriteOpTime.writeOpTime, - stmtId, - false /* prepare */, - true /* isPartialTxn */, - true /* shouldWriteStateField */); + auto stmtsBegin = stmts.begin(); + while (stmtsBegin != stmts.end()) { + std::tie(prevWriteOpTime, stmtsBegin) = + logApplyOpsForTransaction(opCtx, + stmtsBegin, + stmts.end(), + *oplogSlot++, + prevWriteOpTime.writeOpTime, + stmtId, + false /* prepare */, + true /* isPartialTxn */, + true /* shouldWriteStateField */); stmtId++; } wuow.commit(); }); + return stmtId; } void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, @@ -1280,14 +1302,15 @@ void OpObserverImpl::onUnpreparedTransactionCommit( invariant(lastWriteOpTime.isNull()); commitOpTime = logApplyOpsForTransaction( opCtx, - statements, + statements.begin(), + statements.end(), OplogSlot(), lastWriteOpTime, StmtId(0), false /* prepare */, false /* isPartialTxn */, fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) - .writeOpTime; + .first.writeOpTime; } else { // Reserve all the optimes in advance, so we only need to get the optime mutex once. 
We // reserve enough entries for all statements in the transaction, plus one for the commit @@ -1296,12 +1319,10 @@ void OpObserverImpl::onUnpreparedTransactionCommit( auto commitSlot = oplogSlots.back(); oplogSlots.pop_back(); invariant(oplogSlots.size() == statements.size()); - logOplogEntriesForTransaction(opCtx, statements, oplogSlots); - commitOpTime = logCommitForUnpreparedTransaction(opCtx, - statements.size() /* stmtId */, - oplogSlots.back(), - commitSlot, - statements.size()); + int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots); + const auto prevWriteOpTime = oplogSlots[numOplogEntries - 1]; + commitOpTime = logCommitForUnpreparedTransaction( + opCtx, StmtId(numOplogEntries), prevWriteOpTime, commitSlot, statements.size()); } invariant(!commitOpTime.isNull()); shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime); @@ -1410,7 +1431,8 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, invariant(lastWriteOpTime.isNull()); logApplyOpsForTransaction( opCtx, - statements, + statements.begin(), + statements.end(), prepareOpTime, lastWriteOpTime, StmtId(0), @@ -1436,16 +1458,23 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, WriteUnitOfWork wuow(opCtx); // It is possible that the transaction resulted in no changes, In that case, we // should not write any operations other than the prepare oplog entry. + int nOperationOplogEntries = 0; if (!statements.empty()) { - logOplogEntriesForTransaction(opCtx, statements, reservedSlots); - - // The prevOpTime is the OpTime of the second last entry in the reserved slots. - prevOpTime = reservedSlots.rbegin()[1]; + nOperationOplogEntries = + logOplogEntriesForTransaction(opCtx, statements, reservedSlots); + + // We had reserved enough oplog slots for the worst case where each operation + // produced one oplog entry. When operations are smaller and can be packed, + // we will waste the extra slots. 
The prepare will still use the last + // reserved slot, because the transaction participant has already used + // that as the prepare time. So we set the prevOpTime to the last applyOps, + // to make the prevOpTime links work correctly. + prevOpTime = reservedSlots[nOperationOplogEntries - 1]; } auto startTxnSlot = reservedSlots.front(); const auto startOpTime = startTxnSlot; logPrepareTransaction( - opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime, startOpTime); + opCtx, StmtId(nOperationOplogEntries), prevOpTime, prepareOpTime, startOpTime); wuow.commit(); }); } diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 2c4a7bab692..76a2e3d0ca6 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -1326,13 +1326,19 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { class OpObserverMultiEntryTransactionTest : public OpObserverTransactionTest { void setUp() override { gUseMultipleOplogEntryFormatForTransactions = true; + _prevPackingLimit = gMaxNumberOfTransactionOperationsInSingleOplogEntry; + gMaxNumberOfTransactionOperationsInSingleOplogEntry = 1; OpObserverTransactionTest::setUp(); } void tearDown() override { OpObserverTransactionTest::tearDown(); gUseMultipleOplogEntryFormatForTransactions = false; + gMaxNumberOfTransactionOperationsInSingleOplogEntry = _prevPackingLimit; } + +private: + int _prevPackingLimit; }; TEST_F(OpObserverMultiEntryTransactionTest, @@ -1911,11 +1917,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { inserts1.emplace_back(0, BSON("_id" << 0 << "data" << "x")); + inserts1.emplace_back(1, + BSON("_id" << 1 << "data" + << "y")); opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); repl::OpTime prepareOpTime; - auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); prepareOpTime = reservedSlots.back(); 
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); @@ -1923,17 +1932,21 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { opObserver().onTransactionPrepare( opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); - const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); - ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand); - ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps); + const auto insertEntry1 = assertGet(OplogEntry::parse(oplogEntryObjs[0])); + ASSERT_TRUE(insertEntry1.getOpType() == repl::OpTypeEnum::kCommand); + ASSERT_TRUE(insertEntry1.getCommandType() == OplogEntry::CommandType::kApplyOps); - const auto startOpTime = insertEntry.getOpTime(); + const auto insertEntry2 = assertGet(OplogEntry::parse(oplogEntryObjs[1])); + ASSERT_TRUE(insertEntry2.getOpType() == repl::OpTypeEnum::kCommand); + ASSERT_TRUE(insertEntry2.getCommandType() == OplogEntry::CommandType::kApplyOps); + + const auto startOpTime = insertEntry1.getOpTime(); const auto prepareTimestamp = prepareOpTime.getTimestamp(); - const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1])); + const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[2])); ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp()); ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction); @@ -1960,16 +1973,18 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { commitTimestamp, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - oplogEntryObjs = getNOplogEntries(opCtx(), 3); + oplogEntryObjs = getNOplogEntries(opCtx(), 4); const auto commitOplogObj = oplogEntryObjs.back(); - // Statement id's for the insert and prepare should be 0 and 1 respectively. 
- const auto expectedCommitStmtId = 2; + // Statement id's for the inserts and prepare should be [0,1] and 2 respectively. + const auto expectedCommitStmtId = 3; checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId); auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj)); auto o = commitEntry.getObject(); auto oExpected = BSON( "commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true); ASSERT_BSONOBJ_EQ(oExpected, o); + ASSERT_TRUE(commitEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(*commitEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime()); assertTxnRecord(txnNum(), commitSlot, DurableTxnStateEnum::kCommitted); // startTimestamp should no longer be set once the transaction has been committed. @@ -2037,11 +2052,323 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { auto o = abortEntry.getObject(); auto oExpected = BSON("abortTransaction" << 1); ASSERT_BSONOBJ_EQ(oExpected, o); + ASSERT_TRUE(abortEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(*abortEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime()); assertTxnRecord(txnNum(), abortSlot, DurableTxnStateEnum::kAborted); // startOpTime should no longer be set once a transaction has been aborted. 
assertTxnRecordStartOpTime(boost::none); } +TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) { + gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max(); + + const NamespaceString nss1("testDB", "testColl"); + const NamespaceString nss2("testDB2", "testColl2"); + auto uuid1 = CollectionUUID::gen(); + auto uuid2 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, BSON("_id" << 0)); + inserts1.emplace_back(1, BSON("_id" << 1)); + std::vector<InsertStatement> inserts2; + inserts2.emplace_back(0, BSON("_id" << 2)); + inserts2.emplace_back(1, BSON("_id" << 3)); + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + StmtId expectedStmtId = 0; + std::vector<OplogEntry> oplogEntries; + mongo::repl::OpTime expectedPrevWriteOpTime; + for (const auto& oplogEntryObj : oplogEntryObjs) { + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); + oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); + const auto& oplogEntry = oplogEntries.back(); + ASSERT(!oplogEntry.getPrepare()); + ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); + expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + 
expectedStmtId++; + } + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0)) + << BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 1)) + << BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 2)) + << BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 3))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); +} + +TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) { + gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max(); + + const NamespaceString nss1("testDB", "testColl"); + const NamespaceString nss2("testDB2", "testColl2"); + auto uuid1 = CollectionUUID::gen(); + auto uuid2 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, BSON("_id" << 0)); + inserts1.emplace_back(1, BSON("_id" << 1)); + std::vector<InsertStatement> inserts2; + inserts2.emplace_back(0, BSON("_id" << 2)); + inserts2.emplace_back(1, BSON("_id" << 3)); + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); + + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 5); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), 
prepareOpTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + StmtId expectedStmtId = 0; + std::vector<OplogEntry> oplogEntries; + mongo::repl::OpTime expectedPrevWriteOpTime; + for (const auto& oplogEntryObj : oplogEntryObjs) { + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); + oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); + const auto& oplogEntry = oplogEntries.back(); + ASSERT(!oplogEntry.getPrepare()); + ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); + expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; + } + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0)) + << BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 1)) + << BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 2)) + << BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 3))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + oExpected = BSON("prepareTransaction" << 1); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); +} + +TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) { + gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max(); + const NamespaceString nss1("testDB", "testColl"); + auto uuid1 = CollectionUUID::gen(); + auto txnParticipant = 
TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + + std::vector<InsertStatement> inserts1; + inserts1.emplace_back(0, + BSON("_id" << 0 << "data" + << "x")); + inserts1.emplace_back(1, + BSON("_id" << 1 << "data" + << "y")); + + opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); + + repl::OpTime prepareOpTime; + auto reservedSlots = repl::getNextOpTimes(opCtx(), 3); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare( + opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + + const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); + ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand); + ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps); + + const auto startOpTime = insertEntry.getOpTime(); + + const auto prepareTimestamp = prepareOpTime.getTimestamp(); + + const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1])); + ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp()); + ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction); + + // Reserve oplog entry for the commit oplog entry. + OplogSlot commitSlot = repl::getNextOpTime(opCtx()); + + ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); + assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecordStartOpTime(startOpTime); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + + // Mimic committing the transaction. 
+ opCtx()->setWriteUnitOfWork(nullptr); + opCtx()->lockState()->unsetMaxLockTimeout(); + + // commitTimestamp must be greater than the prepareTimestamp. + auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); + + txnParticipant.transitionToCommittingWithPrepareforTest(opCtx()); + opObserver().onPreparedTransactionCommit( + opCtx(), + commitSlot, + commitTimestamp, + txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + + oplogEntryObjs = getNOplogEntries(opCtx(), 3); + const auto commitOplogObj = oplogEntryObjs.back(); + // Statement id is based on number of operations + prepare, not actual number of applyOps. + const auto expectedCommitStmtId = 3; + checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId); + auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj)); + auto o = commitEntry.getObject(); + auto oExpected = BSON( + "commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true); + ASSERT_BSONOBJ_EQ(oExpected, o); + ASSERT_TRUE(commitEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(*commitEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime()); + + assertTxnRecord(txnNum(), commitSlot, DurableTxnStateEnum::kCommitted); + // startTimestamp should no longer be set once the transaction has been committed. + assertTxnRecordStartOpTime(boost::none); +} + +class OpObserverLargeMultiEntryTransactionTest : public OpObserverLargeTransactionTest { + void setUp() override { + gUseMultipleOplogEntryFormatForTransactions = true; + OpObserverTransactionTest::setUp(); + } + + void tearDown() override { + OpObserverTransactionTest::tearDown(); + gUseMultipleOplogEntryFormatForTransactions = false; + } +}; + +// Tests that a large transaction may be committed. This test creates a transaction with two +// operations that together are just big enough to exceed the size limit, which should result in a +// two oplog entry transaction. 
+TEST_F(OpObserverLargeMultiEntryTransactionTest, LargeTransactionCreatesMultipleOplogEntries) { + const NamespaceString nss("testDB", "testColl"); + auto uuid = CollectionUUID::gen(); + + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + + // This size is crafted such that two operations of this size are not too big to fit in a single + // oplog entry, but two operations plus oplog overhead are too big to fit in a single oplog + // entry. + constexpr size_t kHalfTransactionSize = BSONObjMaxInternalSize / 2 - 175; + std::unique_ptr<uint8_t[]> halfTransactionData(new uint8_t[kHalfTransactionSize]()); + auto operation1 = repl::OplogEntry::makeInsertOperation( + nss, + uuid, + BSON( + "_id" << 0 << "data" + << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral))); + auto operation2 = repl::OplogEntry::makeInsertOperation( + nss, + uuid, + BSON( + "_id" << 0 << "data" + << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral))); + txnParticipant.addTransactionOperation(opCtx(), operation1); + txnParticipant.addTransactionOperation(opCtx(), operation2); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); + StmtId expectedStmtId = 0; + std::vector<OplogEntry> oplogEntries; + mongo::repl::OpTime expectedPrevWriteOpTime; + for (const auto& oplogEntryObj : oplogEntryObjs) { + checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); + oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); + const auto& oplogEntry = oplogEntries.back(); + ASSERT(!oplogEntry.getPrepare()); + ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); + 
expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; + } + + auto oExpected = BSON("applyOps" << BSON_ARRAY(operation1.toBSON()) << "partialTxn" << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(operation2.toBSON()) << "partialTxn" << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); + + oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/transaction_participant.idl b/src/mongo/db/transaction_participant.idl index 61a5f990e48..6add17293df 100644 --- a/src/mongo/db/transaction_participant.idl +++ b/src/mongo/db/transaction_participant.idl @@ -64,3 +64,13 @@ server_parameters: cpp_vartype: bool cpp_varname: gUseMultipleOplogEntryFormatForTransactions default: false + + maxNumberOfTransactionOperationsInSingleOplogEntry: + description: >- + Maximum number of operations to pack into a single oplog entry, when multi-oplog + format for transactions is in use. 
+ set_at: startup + cpp_vartype: int + cpp_varname: gMaxNumberOfTransactionOperationsInSingleOplogEntry + default: 2147483647 # INT_MAX + validator: { gte: 1 } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 5803f091224..e83430fb809 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -2823,20 +2823,31 @@ public: } }; +class MultiOplogScopedSettings { +public: + MultiOplogScopedSettings() + : _prevPackingLimit(gMaxNumberOfTransactionOperationsInSingleOplogEntry) { + gUseMultipleOplogEntryFormatForTransactions = true; + gMaxNumberOfTransactionOperationsInSingleOplogEntry = 1; + } + ~MultiOplogScopedSettings() { + gUseMultipleOplogEntryFormatForTransactions = false; + gMaxNumberOfTransactionOperationsInSingleOplogEntry = _prevPackingLimit; + } + +private: + int _prevPackingLimit; +}; + class MultiOplogEntryTransaction : public MultiDocumentTransactionTest { public: MultiOplogEntryTransaction() : MultiDocumentTransactionTest("multiOplogEntryTransaction") { - gUseMultipleOplogEntryFormatForTransactions = true; const auto currentTime = _clock->getClusterTime(); firstOplogEntryTs = currentTime.addTicks(1).asTimestamp(); secondOplogEntryTs = currentTime.addTicks(2).asTimestamp(); commitEntryTs = currentTime.addTicks(3).asTimestamp(); } - ~MultiOplogEntryTransaction() { - gUseMultipleOplogEntryFormatForTransactions = false; - } - void run() { auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); @@ -2953,13 +2964,15 @@ public: protected: Timestamp firstOplogEntryTs, secondOplogEntryTs; + +private: + MultiOplogScopedSettings multiOplogSettings; }; class CommitPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest { public: CommitPreparedMultiOplogEntryTransaction() : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") { - gUseMultipleOplogEntryFormatForTransactions = true; const auto 
currentTime = _clock->getClusterTime(); firstOplogEntryTs = currentTime.addTicks(1).asTimestamp(); secondOplogEntryTs = currentTime.addTicks(2).asTimestamp(); @@ -2967,10 +2980,6 @@ public: commitEntryTs = currentTime.addTicks(4).asTimestamp(); } - ~CommitPreparedMultiOplogEntryTransaction() { - gUseMultipleOplogEntryFormatForTransactions = false; - } - void run() { auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); @@ -3170,23 +3179,21 @@ public: protected: Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs; + +private: + MultiOplogScopedSettings multiOplogSettings; }; class AbortPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest { public: AbortPreparedMultiOplogEntryTransaction() : MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") { - gUseMultipleOplogEntryFormatForTransactions = true; const auto currentTime = _clock->getClusterTime(); firstOplogEntryTs = currentTime.addTicks(1).asTimestamp(); prepareEntryTs = currentTime.addTicks(2).asTimestamp(); abortEntryTs = currentTime.addTicks(3).asTimestamp(); } - ~AbortPreparedMultiOplogEntryTransaction() { - gUseMultipleOplogEntryFormatForTransactions = false; - } - void run() { auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); @@ -3284,6 +3291,9 @@ public: protected: Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs, abortEntryTs; + +private: + MultiOplogScopedSettings multiOplogSettings; }; class PreparedMultiDocumentTransaction : public MultiDocumentTransactionTest { |